From e3cdee01ecfb6fae9c3b73aa4825f920cabec19b Mon Sep 17 00:00:00 2001 From: dbarzin Date: Mon, 13 Feb 2023 15:45:28 +0100 Subject: [PATCH] code review --- pandora-box.py | 298 +++++++++++++++++++++++++++---------------------- 1 file changed, 167 insertions(+), 131 deletions(-) diff --git a/pandora-box.py b/pandora-box.py index 659f4ee..6f48a63 100755 --- a/pandora-box.py +++ b/pandora-box.py @@ -30,15 +30,6 @@ import configparser import shutil from datetime import datetime -# Abstract Base Class -from abc import ABC, abstractmethod - -# ----------------------------------------------------------- -# States -# ----------------------------------------------------------- - - - # ----------------------------------------------------------- # Config variables # ----------------------------------------------------------- @@ -153,7 +144,7 @@ def print_size(label): status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) else: status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2)) - logging.info("Size: %s" % label) + logging.info("size=%s" % label) status_win.refresh() """Print FS Used Size""" @@ -164,7 +155,7 @@ def print_used(label): status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) else: status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2)) - logging.info("Used: %s" % label) + logging.info("used=%s" % label) status_win.refresh() def print_fstype(label): @@ -304,7 +295,8 @@ def log(str): # ----------------------------------------------------------- """Mount USB device""" -def mount_device(device): +def mount_device(): + global device log('Try to mount partition') if USB_AUTO_MOUNT: found = False @@ -314,7 +306,6 @@ def mount_device(device): time.sleep(1) for partition in psutil.disk_partitions(): if partition.device == device.device_node: - log("Partition mounted at {}".format(partition.mountpoint)) found = True loop += 1 if loop < 10: @@ -337,7 +328,6 @@ def mount_device(device): loop +=1 continue break; - log("Partition mounted at /media/box") return "/media/box" """Unmount USB device""" @@ -349,109 +339,21 @@ def umount_device(): log("Unmount partitions") res = os.system("pumount /media/box 2>/dev/null >/dev/null") -"""Main device loop""" -def device_loop(): - # First unmount remaining device - umount_device() - # Loop - context = pyudev.Context() - monitor = pyudev.Monitor.from_netlink(context) - monitor.filter_by("block") - try: - for device in iter(monitor.poll, None): - if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": - if device.action == "add": - log("Device inserted") - log_device_info(device) - if not CURSES: - display_image("WORK") - else: - # display device type - print_fslabel(device.get("ID_FS_LABEL")) - print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) - print_model(device.get("ID_MODEL")) - print_serial(device.get("ID_SERIAL_SHORT")) - # Mount device - mount_point = mount_device(device) - log('Partition mounted at %s' % mount_point) - if mount_point == None: - # no partition - if not CURSES: - display_image("WAIT") - continue - try: - statvfs=os.statvfs(mount_point) - except Exception as e : - log("Unexpected error: %s" % e) - logging.info("An exception was thrown!", exc_info=True) - if not CURSES: - display_image("WAIT") - continue - print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) - print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) - - # Scan files - log("Scan started...........") - infected_files = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) - - # Clean files - if len(infected_files) > 0: - log('%d infected files found !' % len(infected_files)) - if not CURSES: - display_image("BAD") - waitMouseClick() - else: - log('PRESS KEY TO CLEAN') - screen.getch() - # Remove infected files - for file in infected_files: - try : - os.remove(file) - log('%s removed' % file) - except Exception as e : - log("Unexpected error: %s" % str(e)) - logging.info("An exception was thrown!", exc_info=True) - os.system("sync") - log("Clean done.") - if not CURSES: - display_image("OK") - else: - if not CURSES: - display_image("OK") - umount_device() - - if device.action == "remove": - log("Device removed") - if not CURSES: - display_image("WAIT") - else: - print_fslabel("") - print_size(None) - print_used(None) - print_fstype("") - print_model("") - print_serial("") - update_bar(0) - except Exception as e: - log("Unexpected error: %s" % str(e) ) - logging.info("An exception was thrown!", exc_info=True) - finally: - log("Done.") - def log_device_info(dev): - logging.info("Device name: %s" % dev.get("DEVNAME")) - logging.info("Path id: %s" % dev.get("ID_PATH")) - logging.info("Bus system: %s" % dev.get("ID_BUS")) - logging.info("USB driver: %s" % dev.get("ID_USB_DRIVER")) - logging.info("Device type: %s" % dev.get("DEVTYPE")) - logging.info("Device usage: %s" % dev.get("ID_FS_USAGE")) - logging.info("Partition type: %s" % dev.get("ID_PART_TABLE_TYPE")) - logging.info("FS type: %s" % dev.get("ID_FS_TYPE")) - logging.info("Partition label: %s" % dev.get("ID_FS_LABEL")) - logging.info("Device model: %s" % dev.get("ID_MODEL")) - logging.info('Model: %s' % dev.get("ID_MODEL_ID")) - logging.info('Serial short: %s' % dev.get("ID_SERIAL_SHORT")) - logging.info('Serial: %s' % dev.get("ID_SERIAL")) + logging.info( + "device_name=%s, " % dev.get("DEVNAME") + + "path_id=%s, " % dev.get("ID_PATH") + + "bus system=%s, " % dev.get("ID_BUS") + + "USB_driver=%s, " % dev.get("ID_USB_DRIVER") + + "device_type=%s, " % dev.get("DEVTYPE") + + "device_usage=%s, " % dev.get("ID_FS_USAGE") + + "partition type=%s, " % dev.get("ID_PART_TABLE_TYPE") + + "fs_type=%s, " % dev.get("ID_FS_TYPE") + + "partition_label: %s, " % dev.get("ID_FS_LABEL") + + "device_model=%s, " % dev.get("ID_MODEL") + + 'model_id=%s, ' % dev.get("ID_MODEL_ID") + + 'serial_short=%s, ' % dev.get("ID_SERIAL_SHORT") + + 'serial=%s' % dev.get("ID_SERIAL")) # ----------------------------------------------------------- # pandora @@ -459,7 +361,7 @@ def log_device_info(dev): """Scan a mount point with Pandora""" def scan(mount_point, used): - global infected_filed + global device, infected_filed infected_files = [] scanned = 0 file_count = 0 @@ -495,7 +397,7 @@ def scan(mount_point, used): time.sleep(0.5) loop += 1 file_scan_end_time = time.time() - log("Scan %s [%s] -> %s (%ds)" % ( + log("file=%s, size=%s, status=%s, duration=%ds" % ( file, human_readable_size(file_size), status, @@ -517,12 +419,125 @@ def scan(mount_point, used): display_image("ERROR") raise update_bar(100) - log("Scan done in %ds, %d files scanned, %d files infected" % + log("duration=%ds, files_scanned=%d, files_infected=%d" % ((time.time() - scan_start_time),file_count,len(infected_files))) return infected_files # -------------------------------------- +def wait_device(): + global device + # Loop + context = pyudev.Context() + monitor = pyudev.Monitor.from_netlink(context) + monitor.filter_by("block") + try: + for device in iter(monitor.poll, None): + if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": + if device.action == "add": + log("Device inserted") + log_device_info(device) + if not CURSES: + display_image("WORK") + else: + # display device type + print_fslabel(device.get("ID_FS_LABEL")) + print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) + print_model(device.get("ID_MODEL")) + print_serial(device.get("ID_SERIAL_SHORT")) + return "INSERTED" + if device.action == "remove": + log("Device removed") + if not CURSES: + display_image("WAIT") + else: + print_fslabel("") + print_size(None) + print_used(None) + print_fstype("") + print_model("") + print_serial("") + update_bar(0) + return "WAIT" + except Exception as e: + log("Unexpected error: %s" % str(e) ) + logging.info("An exception was thrown!", exc_info=True) + finally: + log("Done.") + return "STOP" + +# -------------------------------------- + +def mount(): + global device, mount_point + # Mount device + mount_point = mount_device() + log('Partition mounted at %s' % mount_point) + if mount_point == None: + # no partition + if not CURSES: + display_image("WAIT") + return "WAIT" + try: + statvfs=os.statvfs(mount_point) + except Exception as e : + log("error=%s" % e) + logging.info("An exception was thrown!", exc_info=True) + if not CURSES: + display_image("WAIT") + return "WAIT" + return "SCAN" + +# -------------------------------------- + +def scan_device(): + global mount_point, infected_files + try: + statvfs=os.statvfs(mount_point) + except Exception as e : + log("error=%s" % e) + logging.info("An exception was thrown!", exc_info=True) + if not CURSES: + display_image("WAIT") + return "WAIT" + print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) + print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) + infected_files = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) + return "CLEAN" + +# -------------------------------------- + +def clean(): + global infected_files + # Clean files + if len(infected_files) > 0: + log('%d infected files found !' % len(infected_files)) + if not CURSES: + display_image("BAD") + waitMouseClick() + else: + log('PRESS KEY TO CLEAN') + screen.getch() + # Remove infected files + for file in infected_files: + try : + os.remove(file) + log('%s removed' % file) + except Exception as e : + log("Unexpected error: %s" % str(e)) + logging.info("An exception was thrown!", exc_info=True) + os.system("sync") + log("Clean done.") + if not CURSES: + display_image("OK") + else: + if not CURSES: + display_image("OK") + umount_device() + return "WAIT" + +# -------------------------------------- + def moveToScriptFolder(): abspath = os.path.abspath(__file__) dname = os.path.dirname(abspath) @@ -530,27 +545,48 @@ def moveToScriptFolder(): # -------------------------------------- +def startup(): + moveToScriptFolder() + init_log() + config() + init_curses() + print_screen() + # First unmount remaining device + umount_device() + return "WAIT" + +# -------------------------------------- + +def loop(state): + match state: + case "START": + return startup() + case "WAIT": + return wait_device() + case "INSERTED": + return mount() + case "SCAN": + return scan_device() + case "CLEAN": + return clean() + case _: + print("Unknwn state "+state) + return "STOP" + +# -------------------------------------- """Main entry point""" def main(stdscr): try : - moveToScriptFolder() - init_log() - config() - init_curses() - print_screen() - while True: - device_loop() + state="START" + while (state!="STOP"): + state = loop(state) except Exception as e : - log("Unexpected error: %s" % e) + log("error=%s" % e) logging.info("An exception was thrown!", exc_info=True) finally: end_curses() -# -------------------------------------- - -# TODO: google wrapper main functionnal programming if __name__ == "__main__": wrapper(main) -