From 49a1f2e3af2387d8ec2e31455d52a3e340fce54c Mon Sep 17 00:00:00 2001 From: dbarzin Date: Sat, 25 Feb 2023 11:19:37 +0100 Subject: [PATCH] code quality --- pandora-box.py | 169 ++++++++++++++++++++++++------------------------- 1 file changed, 84 insertions(+), 85 deletions(-) diff --git a/pandora-box.py b/pandora-box.py index 0768b1a..8e19f5a 100755 --- a/pandora-box.py +++ b/pandora-box.py @@ -33,6 +33,7 @@ import psutil import pypandora + class PandoraBox: """The PandoraBox class""" @@ -74,27 +75,25 @@ class PandoraBox: # read the config file config_parser.read('pandora-box.ini') # set values - self.is_fake_scan=config_parser['DEFAULT']['FAKE_SCAN'].lower()=="true" - self.has_usb_auto_mount=config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower()=="true" - self.pandora_root_url=config_parser['DEFAULT']['PANDORA_ROOT_URL'] + self.is_fake_scan = config_parser['DEFAULT']['FAKE_SCAN'].lower() == "true" + self.has_usb_auto_mount = config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower() == "true" + self.pandora_root_url = config_parser['DEFAULT']['PANDORA_ROOT_URL'] # Quarantine - self.has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower()=="true" + self.has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower() == "true" self.quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER'] # Curses - self.has_curses = config_parser['DEFAULT']['CURSES'].lower()=="true" - + self.has_curses = config_parser['DEFAULT']['CURSES'].lower() == "true" # ---------------------------------------------------------- - def _human_readable_size(self,size, decimal_places=1): + def _human_readable_size(self, size, decimal_places=1): """ Convert size to human readble string """ - for unit in ['B','KB','MB','GB','TB']: + for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size < 1024.0: break size /= 1024.0 return f"{size:.{decimal_places}f}{unit}" - # ----------------------------------------------------------- # Image Screen # ----------------------------------------------------------- @@ -102,15 +101,15 @@ class PandoraBox: def display_image(self, status): """ Display image on screen """ if not self.has_curses: - if status=="WAIT": + if status == "WAIT": image = "images/key*.png" - elif status=="WORK": + elif status == "WORK": image = "images/wait*.png" - elif status=="OK": + elif status == "OK": image = "images/ok.png" - elif status=="BAD": + elif status == "BAD": image = "images/bad.png" - elif status=="ERROR": + elif status == "ERROR": image = "images/error.png" else: return @@ -119,24 +118,23 @@ class PandoraBox: # display image if "*" in image: # slide show - os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} "\ - "/dev/null >/dev/null &") - else : + os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} " + "/dev/null >/dev/null &") + else: # only one image os.system(f"fim -qa {image} /dev/null >/dev/null &") - # ----------------------------------------------------------- def wait_mouse_click(self): """ Wait for mouse click event """ - with open("/dev/input/mice", "rb" ) as mouse: + with open("/dev/input/mice", "rb") as mouse: down = False while True: buf = mouse.read(3) - if (buf[0] & 0x1)==1: + if (buf[0] & 0x1) == 1: down = True - if ((buf[0] & 0x1)==0) and down: + if ((buf[0] & 0x1) == 0) and down: break # ----------------------------------------------------------- @@ -172,7 +170,7 @@ class PandoraBox: def _print_used(self, label): """Print FS Used Size""" if self.has_curses: - self.status_win.addstr(3, 1, f"Used : {label:32} ",curses.color_pair(2)) + self.status_win.addstr(3, 1, f"Used : {label:32} ", curses.color_pair(2)) self.status_win.refresh() def _print_fstype(self, label): @@ -274,17 +272,18 @@ class PandoraBox: ) logs = [] + def _log(self, msg): """log something""" if self.has_curses: # display log on screen self.logs.append(msg) - if len(self.logs)>(curses.LINES-22): + if len(self.logs) > (curses.LINES - 22): self.logs.pop(0) self.log_win.clear() self.log_win.border(0) - for i in range(min(curses.LINES-22,len(self.logs))): - self.log_win.addstr(i+1,1,self.logs[i][:curses.COLS-2],curses.color_pair(3)) + for i in range(min(curses.LINES - 22, len(self.logs))): + self.log_win.addstr(i + 1, 1, self.logs[i][:curses.COLS - 2], curses.color_pair(3)) self.log_win.refresh() # ----------------------------------------------------------- @@ -319,7 +318,7 @@ class PandoraBox: os.statvfs(self.mount_point) except Exception as ex: self._log(f"Unexpected error: {ex}") - loop +=1 + loop += 1 continue break @@ -334,18 +333,18 @@ class PandoraBox: def _log_device_info(self, dev): """Log device information""" logging.info( - 'device_name="%s", ' \ - 'path_id="%s", ' \ - 'bus system="%s", ' \ - 'USB_driver="%s", ' \ - 'device_type="%s", ' \ - 'device_usage="%s", ' \ - 'partition type="%s", ' \ - 'fs_type="%s", ' \ - 'partition_label="%s", ' \ - 'device_model="%s", ' \ - 'model_id="%s", ' \ - 'serial_short="%s", '\ + 'device_name="%s", ' + 'path_id="%s", ' + 'bus system="%s", ' + 'USB_driver="%s", ' + 'device_type="%s", ' + 'device_usage="%s", ' + 'partition type="%s", ' + 'fs_type="%s", ' + 'partition_label="%s", ' + 'device_model="%s", ' + 'model_id="%s", ' + 'serial_short="%s", ' 'serial="%s"', dev.get("DEVNAME"), dev.get("ID_PATH"), @@ -366,25 +365,41 @@ class PandoraBox: # pandora # ----------------------------------------------------------- - def scan(self, used): - """Scan a mount point with Pandora""" + def scan(self): + """Scan devce with pypandora""" + + # get device size + try: + statvfs = os.statvfs(self.mount_point) + except Exception as ex: + self._log(f"error={ex}") + logging.info("An exception was thrown!", exc_info=True) + if not self.has_curses: + self.display_image("ERROR") + return "ERROR" + self._print_size(self._human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) + self._print_used(self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) + self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) + used = statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree) + + # scan device self.infected_files = [] scanned = 0 file_count = 0 scan_start_time = time.time() if self.has_quarantine: - qfolder = os.path.join(self.quarantine_folder,datetime.now().strftime("%y%m%d-%H%M")) + qfolder = os.path.join(self.quarantine_folder, datetime.now().strftime("%y%m%d-%H%M")) if not self.is_fake_scan: pandora = pypandora.PyPandora(root_url=self.pandora_root_url) try: for root, _, files in os.walk(self.mount_point): for file in files: status = None - full_path = os.path.join(root,file) + full_path = os.path.join(root, file) file_size = os.path.getsize(full_path) # log("Check %s [%s]" % (file, human_readable_size(file_size))) file_scan_start_time = time.time() - if self.is_fake_scan : + if self.is_fake_scan: time.sleep(0.1) status = "SKIPPED" else: @@ -405,14 +420,14 @@ class PandoraBox: file_scan_end_time = time.time() self._log( - f'Scan {file} '\ - f'[{self._human_readable_size(file_size)}] '\ - '-> '\ + f'Scan {file} ' + f'[{self._human_readable_size(file_size)}] ' + '-> ' f'{status} ({int(file_scan_end_time - file_scan_start_time)}s)') logging.info( - f'file="{file}", '\ - f'size="{file_size}", '\ - f'status="{status}"", '\ + f'file="{file}", ' + f'size="{file_size}", ' + f'status="{status}"", ' f'duration="{int(file_scan_end_time - file_scan_start_time)}"') scanned += os.path.getsize(full_path) file_count += 1 @@ -421,25 +436,25 @@ class PandoraBox: if status == "ALERT": self.infected_files.append(full_path) if self.has_quarantine: - if not os.path.isdir(qfolder) : + if not os.path.isdir(qfolder): os.mkdir(qfolder) - shutil.copyfile(full_path, os.path.join(qfolder,file)) - except Exception as ex : + shutil.copyfile(full_path, os.path.join(qfolder, file)) + except Exception as ex: self._log(f"Unexpected error: {str(ex)}") logging.info(f'error="{str(ex)}"', exc_info=True) return "ERROR" self._update_bar(100) - self._log("Scan done in %ds, %d files scanned, %d files infected" % - ((time.time() - scan_start_time),file_count,len(self.infected_files))) + self._log("Scan done in %ds, %d files scanned, %d files infected" % + ((time.time() - scan_start_time), file_count, len(self.infected_files))) logging.info( - f'duration="{int(time.time() - scan_start_time)}", '\ - f'files_scanned="{file_count}", '\ + f'duration="{int(time.time() - scan_start_time)}", ' + f'files_scanned="{file_count}", ' f'files_infected="{len(self.infected_files)}"') return "CLEAN" # -------------------------------------- - def wait_device(self): + def wait(self): """Wait for insert of remove of USB device""" # Loop context = pyudev.Context() @@ -467,8 +482,7 @@ class PandoraBox: else: # display device type self._print_fslabel(self.device.get("ID_FS_LABEL")) - self._print_fstype(self.device.get("ID_PART_TABLE_TYPE") - + " " + self.device.get("ID_FS_TYPE")) + self._print_fstype(self.device.get("ID_PART_TABLE_TYPE") + " " + self.device.get("ID_FS_TYPE")) self._print_model(self.device.get("ID_MODEL")) self._print_serial(self.device.get("ID_SERIAL_SHORT")) return "INSERTED" @@ -501,7 +515,7 @@ class PandoraBox: return "WAIT" try: os.statvfs(self.mount_point) - except Exception as ex : + except Exception as ex: self._log(f"Unexpected error: {str(ex)}") logging.info(f'error="{str(ex)}"', exc_info=True) if not self.has_curses: @@ -511,23 +525,6 @@ class PandoraBox: # -------------------------------------- - def scan_device(self): - """Scan devce with pypandora""" - try: - statvfs=os.statvfs(self.mount_point) - except Exception as ex : - self._log(f"error={ex}") - logging.info("An exception was thrown!", exc_info=True) - if not self.has_curses: - self.display_image("WAIT") - return "WAIT" - self._print_size(self._human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) - self._print_used( - self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) - return self.scan(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) - - # -------------------------------------- - def error(self): """ Display error message """ if not self.has_curses: @@ -550,12 +547,12 @@ class PandoraBox: # Remove infected files files_removed = 0 for file in self.infected_files: - try : + try: os.remove(file) self._log(f"{file} removed") logging.info(f'removed="{file}"') files_removed += 1 - except Exception as ex : + except Exception as ex: self._log(f"Unexpected error: {str(ex)}") logging.info(f'error="{str(ex)}"', exc_info=True) os.system("sync") @@ -607,11 +604,11 @@ class PandoraBox: case "START": return self.startup() case "WAIT": - return self.wait_device() + return self.wait() case "INSERTED": return self.mount() case "SCAN": - return self.scan_device() + return self.scan() case "CLEAN": return self.clean() case "ERROR": @@ -623,20 +620,22 @@ class PandoraBox: def main(self): """Main entry point""" - try : - state="START" - while state!="STOP": + try: + state = "START" + while state != "STOP": state = self.loop(state) - except Exception as ex : + except Exception as ex: self._log(f"Unexpected error: {str(ex)}") logging.info(f'error="{str(ex)}"', exc_info=True) finally: self._end_curses() + def main(_): """Main entry point""" pandora_box = PandoraBox() pandora_box.main() + if __name__ == "__main__": wrapper(main)