From f92ce645dca5eec8559f34ce3b26510059d49738 Mon Sep 17 00:00:00 2001 From: dbarzin Date: Wed, 15 Feb 2023 18:03:18 +0100 Subject: [PATCH] code quality --- .github/workflows/ci.yml | 2 +- install.sh | 1 + pandora-box.py | 260 ++++++++++++++++++++------------------- 3 files changed, 138 insertions(+), 125 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 5136623..14829d8 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -21,7 +21,7 @@ jobs: - name: code quality checkout run: | + pip install pyudev psutil pypandora pip install pylint pylint ./pandora-box.py - diff --git a/install.sh b/install.sh index 0806fb6..69a3729 100755 --- a/install.sh +++ b/install.sh @@ -182,3 +182,4 @@ cp pandora-box.ini.curses pandora-dox.ini # Reboot echo "You may reboot the server." + diff --git a/pandora-box.py b/pandora-box.py index f4c73d6..c6decf4 100755 --- a/pandora-box.py +++ b/pandora-box.py @@ -67,7 +67,7 @@ class PandoraBox: # ---------------------------------------------------------- - def config(self): + def _config(self): """ read configuration file """ # intantiate a ConfirParser config_parser = configparser.ConfigParser() @@ -86,7 +86,7 @@ class PandoraBox: # ---------------------------------------------------------- - def human_readable_size(self,size, decimal_places=1): + def _human_readable_size(self,size, decimal_places=1): """ Convert size to human readble string """ for unit in ['B','KB','MB','GB','TB']: if size < 1024.0: @@ -123,7 +123,7 @@ class PandoraBox: "/dev/null >/dev/null &") else : # only one image - os.system(f"fim -qa %s /dev/null >/dev/null {image}") + os.system(f"fim -qa {image} /dev/null >/dev/null &") # ----------------------------------------------------------- @@ -143,7 +143,7 @@ class PandoraBox: # has_curses Screen # ----------------------------------------------------------- - def init_curses(self): + def _init_curses(self): """Initialise curses""" if self.has_curses: self.screen = curses.initscr() @@ -155,58 +155,55 @@ class PandoraBox: else: self.display_image("WAIT") - def print_fslabel(self, label): + def _print_fslabel(self, label): """Print FS Label""" + if label is None: + label = "" if self.has_curses: - self.status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2)) + self.status_win.addstr(1, 1, f"Partition : {label:32}", curses.color_pair(2)) self.status_win.refresh() - def print_size(self, label): + def _print_size(self, label): """Print FS Size""" if self.has_curses: - if label is None: - self.status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) - else: - self.status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2)) - logging.info("size={label}") + self.status_win.addstr(2, 1, f"Size : {label:32} ", curses.color_pair(2)) self.status_win.refresh() + logging.info('fs_size="%s"', label) - def print_used(self, label): + def _print_used(self, label): """Print FS Used Size""" if self.has_curses: - if label is None: - self.status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) - else: - self.status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2)) - logging.info(f'used="{label}') + self.status_win.addstr(3, 1, f"Used : {label:32} ",curses.color_pair(2)) self.status_win.refresh() + logging.info('fs_usage="%s"',label) - def print_fstype(self, label): + def _print_fstype(self, label): """Print device FS type""" if self.has_curses: - self.status_win.addstr(1, 50, "Part / Type : %-32s" % label, curses.color_pair(2)) + self.status_win.addstr(1, 50, f"Part / Type : {label:32}", curses.color_pair(2)) self.status_win.refresh() + logging.info('fs_type="%s"',label) - def print_model(self, label): + def _print_model(self, label): """Print device model""" if self.has_curses: - self.status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2)) + self.status_win.addstr(2, 50, f"Model : {label:32}", curses.color_pair(2)) self.status_win.refresh() - def print_serial(self, label): + def _print_serial(self, label): """Print device serail number""" if self.has_curses: - self.status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2)) + self.status_win.addstr(3, 50, f"Serial : {label:32}", curses.color_pair(2)) self.status_win.refresh() - def init_bar(self): + def _init_bar(self): """Initialise progress bar""" if self.has_curses: self.progress_win = curses.newwin(3, curses.COLS-12, 17, 5) self.progress_win.border(0) self.progress_win.refresh() - def update_bar(self, progress): + def _update_bar(self, progress): """Update progress bar""" if self.has_curses: if progress == 0: @@ -220,7 +217,7 @@ class PandoraBox: self.progress_win.addstr(0, 1, f"Progress: {progress}%") self.progress_win.refresh() - def print_screen(self): + def _print_screen(self): """Print main screen""" if self.has_curses: curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) @@ -243,17 +240,17 @@ class PandoraBox: self.status_win = curses.newwin(5, curses.COLS, 12, 0) self.status_win.border(0) self.status_win.addstr(0, 1, "USB Key Information") - self.print_fslabel("") - self.print_size(None) - self.print_used(None) - self.print_fstype("") - self.print_model("") - self.print_serial("") - self.init_bar() - self.update_bar(0) - self.log('Ready.') + self._print_fslabel("") + self._print_size("") + self._print_used("") + self._print_fstype("") + self._print_model("") + self._print_serial("") + self._init_bar() + self._update_bar(0) + self._log('Ready.') - def end_curses(self): + def _end_curses(self): """Closes curses""" if self.has_curses: curses.endwin() @@ -266,7 +263,7 @@ class PandoraBox: # Logging windows # ----------------------------------------------------------- - def init_log(self): + def _init_log(self): """Inititalize logging function""" if self.has_curses: self.log_win = curses.newwin(curses.LINES-20, curses.COLS, 20, 0) @@ -279,7 +276,7 @@ class PandoraBox: ) logs = [] - def log(self, msg): + def _log(self, msg): """log something""" logging.info(msg) if self.has_curses: @@ -299,24 +296,22 @@ class PandoraBox: def mount_device(self): """Mount USB device""" - self.log('Try to mount partition') + self._log('Try to mount partition') + self.mount_point = None if self.has_usb_auto_mount: - found = False loop = 0 - while (not found) and (loop < 15): + while (self.mount_point is None) and (loop < 15): # need to sleep before devide is mounted time.sleep(1) for partition in psutil.disk_partitions(): if partition.device == self.device.device_node: - found = True + self.mount_point = partition.mountpoint loop += 1 - if found: - return partition.mountpoint - self.log('No partition mounted') - return None + if self.mount_device is None: + self._log('No partition mounted') else: if not os.path.exists("/media/box"): - self.log("folder /media/box does not exists") + self._log("folder /media/box does not exists") return None os.system(f"pmount {self.device.device_node} /media/box >/dev/null 2>/dev/null") loop = 0 @@ -324,37 +319,51 @@ class PandoraBox: time.sleep(1) try: os.statvfs(self.mount_point) - except Exception as ex : + except Exception as ex: + self._log(f"Unexpected error: {ex}") loop +=1 continue break - return "/media/box" + self.mount_point = "/media/box" def umount_device(self): """Unmount USB device""" if self.has_usb_auto_mount: - self.log("Sync partitions") + self._log("Sync partitions") os.system("sync") else: - self.log("Unmount partitions") os.system("pumount /media/box 2>/dev/null >/dev/null") - def log_device_info(self, dev): + def _log_device_info(self, dev): """Log device information""" logging.info( - f'device_name={dev.get("DEVNAME")}, ' \ - f'path_id={dev.get("ID_PATH")}, ' \ - f'bus system={dev.get("ID_BUS")}, ' \ - f'USB_driver={dev.get("ID_USB_DRIVER")}, ' \ - f'device_type={dev.get("DEVTYPE")}, ' \ - f'device_usage={dev.get("ID_FS_USAGE")}, ' \ - f'partition type={dev.get("ID_PART_TABLE_TYPE")}, ' \ - f'fs_type={dev.get("ID_FS_TYPE")}, ' \ - f'partition_label={dev.get("ID_FS_LABEL")}, ' \ - f'device_model={dev.get("ID_MODEL")}, ' \ - f'model_id={dev.get("ID_MODEL_ID")}, ' \ - f'serial_short={dev.get("ID_SERIAL_SHORT")}, '\ - f'serial={dev.get("ID_SERIAL")}') + 'device_name="%s", ' \ + 'path_id="%s", ' \ + 'bus system="%s", ' \ + 'USB_driver="%s", ' \ + 'device_type="%s", ' \ + 'device_usage="%s", ' \ + 'partition type="%s", ' \ + 'fs_type="%s", ' \ + 'partition_label="%s", ' \ + 'device_model="%s", ' \ + 'model_id="%s", ' \ + 'serial_short="%s", '\ + 'serial="%s"', + dev.get("DEVNAME"), + dev.get("ID_PATH"), + dev.get("ID_BUS"), + dev.get("ID_USB_DRIVER"), + dev.get("DEVTYPE"), + dev.get("ID_FS_USAGE"), + dev.get("ID_PART_TABLE_TYPE"), + dev.get("ID_FS_TYPE"), + dev.get("ID_FS_LABEL"), + dev.get("ID_MODEL"), + dev.get("ID_MODEL_ID"), + dev.get("ID_SERIAL_SHORT"), + dev.get("ID_SERIAL") + ) # ----------------------------------------------------------- # pandora @@ -385,7 +394,7 @@ class PandoraBox: if file_size > (1024*1024*1024): status = "TOO BIG" else: - self.log("ppypandora : [%s] " % full_path) + self._log(f'scan="{full_path}]"') res = pandora.submit_from_disk(full_path) time.sleep(0.1) loop = 0 @@ -397,14 +406,14 @@ class PandoraBox: time.sleep(0.5) loop += 1 file_scan_end_time = time.time() - self.log( + self._log( f'file="{file}" , '\ - f'size="{self.human_readable_size(file_size)}", '\ + f'size="{self._human_readable_size(file_size)}", '\ f'status="{status}"", '\ f'duration="{int(file_scan_end_time - file_scan_start_time)}"') scanned += os.path.getsize(full_path) file_count += 1 - self.update_bar(scanned * 100 // used) + self._update_bar(scanned * 100 // used) if status == "ALERT": self.infected_files.append(full_path) @@ -413,13 +422,13 @@ class PandoraBox: os.mkdir(qfolder) shutil.copyfile(full_path, os.path.join(qfolder,file)) except Exception as ex : - self.log(f"Unexpected error: {ex}") - self.log("Scan failed !") + self._log(f"Unexpected error: {ex}") + self._log("Scan failed !") if not self.has_curses: self.display_image("ERROR") raise - self.update_bar(100) - self.log( + self._update_bar(100) + self._log( f'duration="{int(time.time() - scan_start_time)}s", '\ f'files_scanned="{file_count}", '\ f'files_infected="{len(self.infected_files)}"') @@ -437,46 +446,49 @@ class PandoraBox: for dev in iter(monitor.poll, None): if dev.get("ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd": if dev.action == "add": - self.device = dev - self.log("Device inserted") - self.log_device_info(self.device) - if not self.has_curses: - self.display_image("WORK") - else: - # display device type - self.print_fslabel(self.device.get("ID_FS_LABEL")) - self.print_fstype(self.device.get("ID_PART_TABLE_TYPE") - + " " + self.device.get("ID_FS_TYPE")) - self.print_model(self.device.get("ID_MODEL")) - self.print_serial(self.device.get("ID_SERIAL_SHORT")) - return "INSERTED" + return self._device_inserted(dev) if dev.action == "remove": - self.device = None - self.log("Device removed") - if not self.has_curses: - self.display_image("WAIT") - else: - self.print_fslabel("") - self.print_size(None) - self.print_used(None) - self.print_fstype("") - self.print_model("") - self.print_serial("") - self.update_bar(0) - return "WAIT" + return self._device_removed() except Exception as ex: - self.log(f"Unexpected error: {str(ex)}") + self._log(f"Unexpected error: {str(ex)}") logging.info("An exception was thrown!", exc_info=True) finally: - self.log("Done.") + self._log("Done.") return "STOP" + def _device_inserted(self, dev): + self.device = dev + self._log_device_info(self.device) + if not self.has_curses: + self.display_image("WORK") + else: + # display device type + self._print_fslabel(self.device.get("ID_FS_LABEL")) + self._print_fstype(self.device.get("ID_PART_TABLE_TYPE") + + " " + self.device.get("ID_FS_TYPE")) + self._print_model(self.device.get("ID_MODEL")) + self._print_serial(self.device.get("ID_SERIAL_SHORT")) + return "INSERTED" + + def _device_removed(self): + self.device = None + if not self.has_curses: + self.display_image("WAIT") + else: + self._print_fslabel("") + self._print_size("") + self._print_used("") + self._print_fstype("") + self._print_model("") + self._print_serial("") + self._update_bar(0) + return "WAIT" # -------------------------------------- def mount(self): - # Mount device - self.mount_point = self.mount_device() - self.log(f'Partition mounted at {self.mount_point}') + """ Mount device """ + self.mount_device() + self._log(f'Partition mounted at {self.mount_point}') if self.mount_point is None: # no partition if not self.has_curses: @@ -485,7 +497,7 @@ class PandoraBox: try: os.statvfs(self.mount_point) except Exception as ex : - self.log(f"error={ex}") + self._log(f"error={ex}") logging.info("An exception was thrown!", exc_info=True) if not self.has_curses: self.display_image("WAIT") @@ -499,14 +511,14 @@ class PandoraBox: try: statvfs=os.statvfs(self.mount_point) except Exception as ex : - self.log(f"error={ex}") + self._log(f"error={ex}") logging.info("An exception was thrown!", exc_info=True) if not self.has_curses: self.display_image("WAIT") return "WAIT" - self.print_size(self.human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) - self.print_used( - self.human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) + self._print_size(self._human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) + self._print_used( + self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) self.scan(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) return "CLEAN" @@ -514,25 +526,23 @@ class PandoraBox: def clean(self): """Remove infected files""" - # Clean files if len(self.infected_files) > 0: - self.log(f"infeted_files={len(self.infected_files)}") + self._log(f"infeted_files={len(self.infected_files)}") if not self.has_curses: self.display_image("BAD") self.wait_mouse_click() else: - self.log('PRESS KEY TO CLEAN') + self._log('PRESS KEY TO CLEAN') self.screen.getch() # Remove infected files for file in self.infected_files: try : os.remove(file) - self.log(f"{file} removed") + self._log(f"{file} removed") except Exception as ex : - self.log(f"Unexpected error: {ex}") + self._log(f"Unexpected error: {ex}") logging.info("An exception was thrown!", exc_info=True) os.system("sync") - self.log("Clean done.") if not self.has_curses: self.display_image("OK") else: @@ -553,15 +563,18 @@ class PandoraBox: def startup(self): """Start Pandora-box""" - self.config() - self.init_curses() - self.init_log() self.move_to_script_folder() + # read config + self._config() + # Initialize curesrs + self._init_curses() + # Initilize log + self._init_log() # Read logo - with open('pandora-box.txt', 'r') as file1: + with open('pandora-box.txt', mode='r', encoding='utf-8') as file1: self.logo = file1.readlines() # Print logo screen - self.print_screen() + self._print_screen() # First unmount remaining device self.umount_device() return "WAIT" @@ -582,7 +595,6 @@ class PandoraBox: case "CLEAN": return self.clean() case _: - self.log(f"Unknwn state: {state}") return "STOP" # -------------------------------------- @@ -594,10 +606,10 @@ class PandoraBox: while state!="STOP": state = self.loop(state) except Exception as ex : - self.log(f"error={ex}") + self._log(f"error={ex}") logging.info("An exception was thrown!", exc_info=True) finally: - self.end_curses() + self._end_curses() def main(_):