From 5e23e53c842c2223f92874cfe879fcee848933a9 Mon Sep 17 00:00:00 2001 From: didier Date: Thu, 17 Apr 2025 15:20:06 +0200 Subject: [PATCH] work on code quality --- .github/workflows/ci.yml | 20 ++-- pandora-box.py | 204 ++++++++++++++++++++++++++------------- quality.sh | 8 ++ 3 files changed, 156 insertions(+), 76 deletions(-) create mode 100755 quality.sh diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index fc2eae5..0059f4c 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -1,21 +1,25 @@ -name: flake8 Lint +name: pylint Lint on: [push, pull_request] jobs: - flake8-lint: + pylint-lint: runs-on: ubuntu-latest name: Lint steps: - name: Check out source repository uses: actions/checkout@v3 + - name: Set up Python environment uses: actions/setup-python@v4 with: python-version: "3.11" - - name: flake8 Lint - uses: py-actions/flake8@v2 - with: - max-line-length: "128" - path: "." - plugins: "flake8-bugbear" + + - name: Install dependencies + run: | + python -m pip install --upgrade pip + pip install pylint + + - name: Run pylint + run: | + pylint . --exit-zero diff --git a/pandora-box.py b/pandora-box.py index f633afb..783b53f 100755 --- a/pandora-box.py +++ b/pandora-box.py @@ -24,8 +24,6 @@ import curses import datetime import logging import os -import pyudev -import psutil import queue import shutil import socket @@ -33,25 +31,28 @@ import sys import time import threading import subprocess +from pathlib import Path import pypandora -from pathlib import Path + +import psutil +import pyudev # ----------------------------------------------------------- # Threading variables # ----------------------------------------------------------- threads = [] -exit_flag = False -queue_lock = threading.Lock() -work_queue = None +EXIT_FLAG = False +queueLock = threading.Lock() +workQueue = None # ----------------------------------------------------------- # Config variables # ----------------------------------------------------------- -is_fake_scan = None -has_usb_auto_mount = None +isFakeScan = None +hasUSBAutoMount = None pandora_root_url = None -has_quarantine = None +hasQuarantine = None quarantine_folder = None has_curses = None maxThreads = None @@ -78,6 +79,7 @@ infected_files = None # ----------------------------------------------------------- + class scanThread(threading.Thread): """Scanning thread""" @@ -86,14 +88,14 @@ class scanThread(threading.Thread): self.pandora = pypandora.PyPandora(root_url=pandora_root_url) def run(self): - while not exit_flag: - queue_lock.acquire() - if not work_queue.empty(): - file = work_queue.get() - queue_lock.release() + while not EXIT_FLAG: + queueLock.acquire() + if not workQueue.empty(): + file = workQueue.get() + queueLock.release() self.scan(file) else: - queue_lock.release() + queueLock.release() time.sleep(1) def scan(self, file): @@ -106,12 +108,13 @@ class scanThread(threading.Thread): # log the scan has started logging.info( - f'Scan {file_name} ' - f'[{human_readable_size(file_size)}] ' - f'Thread-{id} ') + f"Scan {file_name} " + f"[{human_readable_size(file_size)}] " + f"Thread-{id} " + ) start_time = time.time() - if is_fake_scan: + if isFakeScan: status = "SKIPPED" logging.info(f"Fake scan - skipped.") else: @@ -120,9 +123,10 @@ class scanThread(threading.Thread): status = "TOO BIG" logging.info(f"File too big.") else: - worker = self.pandora.submit_from_disk(file, seed_expire=6000) - if (not 'taskId' in worker) or (not 'seed' in worker) : - logging.error(f'task_status={worker}') + worker = self.pandora.submit_from_disk( + file, seed_expire=6000) + if ("taskId" not in worker) or ("seed" not in worker): + logging.error(f"task_status={worker}") status = "ERROR" return @@ -130,15 +134,16 @@ class scanThread(threading.Thread): loop = 0 while loop < (1024 * 256): - res = self.pandora.task_status(worker['taskId'], worker['seed']) - logging.info(f'task_status={res}') + res = self.pandora.task_status( + worker["taskId"], worker["seed"]) + logging.info(f"task_status={res}") # Handle response from Pandora - if 'status' in res : - status = res['status'] + if "status" in res: + status = res["status"] if status != "WAITING": break - else : + else: status = "ERROR" return @@ -150,7 +155,13 @@ class scanThread(threading.Thread): end_time = time.time() # log the result - log(f"Scan {file_name} " f"[{human_readable_size(file_size)}] " "-> " f"{status} ({(end_time - start_time):.1f}s)") + log( + f"Scan {file_name} " + f"[{human_readable_size(file_size)}] " + "-> " + f"{status} ({(end_time - start_time):.1f}s)" + ) + logging.info( f'boxname="{boxname}", ' f'file="{file_name}", ' @@ -160,7 +171,7 @@ class scanThread(threading.Thread): ) # Get lock - queue_lock.acquire() + queueLock.acquire() scanned += file_size file_count += 1 @@ -170,39 +181,44 @@ class scanThread(threading.Thread): infected_files.append(file) # Release lock - queue_lock.release() + queueLock.release() # update status bar update_bar(scanned * 100 // f_used) - if has_quarantine and status == "ALERT": + if hasQuarantine and status == "ALERT": if not os.path.isdir(qfolder): os.mkdir(qfolder) shutil.copyfile(file, os.path.join(qfolder, file_name)) except Exception as ex: log(f"Unexpected error: {str(ex)}", flush=True) - logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) + logging.info( + f'boxname="{boxname}", ' + f'error="{str(ex)}"', + exc_info=True) logging.info(f"Start done.") + # ---------------------------------------------------------- def config(): - global is_fake_scan, has_usb_auto_mount, pandora_root_url - global has_quarantine, quarantine_folder, has_curses, maxThreads + global isFakeScan, hasUSBAutoMount, pandora_root_url + global hasQuarantine, quarantine_folder, has_curses, maxThreads """ read configuration file """ # intantiate a ConfirParser config_parser = configparser.ConfigParser() # read the config file config_parser.read("pandora-box.ini") # set values - is_fake_scan = config_parser["DEFAULT"]["FAKE_SCAN"].lower() == "true" - has_usb_auto_mount = config_parser["DEFAULT"]["USB_AUTO_MOUNT"].lower() == "true" + isFakeScan = config_parser["DEFAULT"]["FAKE_SCAN"].lower() == "true" + hasUSBAutoMount = config_parser["DEFAULT"]["USB_AUTO_MOUNT"].lower( + ) == "true" pandora_root_url = config_parser["DEFAULT"]["PANDORA_ROOT_URL"] # Quarantine - has_quarantine = config_parser["DEFAULT"]["QUARANTINE"].lower() == "true" + hasQuarantine = config_parser["DEFAULT"]["QUARANTINE"].lower() == "true" quarantine_folder = config_parser["DEFAULT"]["QUARANTINE_FOLDER"] # Curses has_curses = config_parser["DEFAULT"]["CURSES"].lower() == "true" @@ -247,7 +263,10 @@ def display_image(status): # display image if "*" in image: # slide show - os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image}" "/dev/null >/dev/null &") + os.system( + f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image}" + "/dev/null >/dev/null &" + ) else: # only one image os.system(f"fim -qa {image} /dev/null >/dev/null &") @@ -264,7 +283,8 @@ def init_curses(): if has_curses: screen = curses.initscr() screen.keypad(1) - curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) + curses.mousemask(curses.ALL_MOUSE_EVENTS | + curses.REPORT_MOUSE_POSITION) curses.flushinp() curses.noecho() curses.curs_set(0) @@ -279,7 +299,11 @@ def init_curses(): def print_fslabel(label): """Print FS Label""" if has_curses: - status_win.addstr(1, 1, f"Partition : {label!s:32}", curses.color_pair(2)) + status_win.addstr( + 1, + 1, + f"Partition : {label!s:32}", + curses.color_pair(2)) status_win.refresh() @@ -300,7 +324,11 @@ def print_used(label): def print_fstype(label): """Print device FS type""" if has_curses: - status_win.addstr(1, 50, f"Part / Type : {label!s:32}", curses.color_pair(2)) + status_win.addstr( + 1, + 50, + f"Part / Type : {label!s:32}", + curses.color_pair(2)) status_win.refresh() @@ -314,7 +342,11 @@ def print_model(label): def print_serial(label): """Print device serail number""" if has_curses: - status_win.addstr(3, 50, f"Serial : {label!s:32}", curses.color_pair(2)) + status_win.addstr( + 3, + 50, + f"Serial : {label!s:32}", + curses.color_pair(2)) status_win.refresh() @@ -403,7 +435,10 @@ def initlog(): log_win = curses.newwin(curses.LINES - 20, curses.COLS, 20, 0) log_win.border(0) logging.basicConfig( - filename="/var/log/pandora-box.log", level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%m/%d/%y %H:%M" + filename="/var/log/pandora-box.log", + level=logging.INFO, + format="%(asctime)s - %(message)s", + datefmt="%m/%d/%y %H:%M", ) @@ -438,7 +473,10 @@ def log_update(flush=False): log_win.clear() log_win.border(0) for i in range(min(curses.LINES - 22, len(logs))): - log_win.addstr(i + 1, 1, logs[i][: curses.COLS - 2], curses.color_pair(3)) + log_win.addstr(i + 1, + 1, + logs[i][: curses.COLS - 2], + curses.color_pair(3)) log_win.refresh() @@ -451,7 +489,7 @@ def mount_device(): """Mount USB device""" global mount_point log("Mount device", flush=True) - if has_usb_auto_mount: + if hasUSBAutoMount: mount_point = None loop = 0 while (mount_point is None) and (loop < 15): @@ -476,7 +514,7 @@ def mount_device(): ["pmount", device.device_node, "/media/box"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - check=True + check=True, ) logging.info("Mount successful") except subprocess.CalledProcessError as e: @@ -503,7 +541,7 @@ def mount_device(): def umount_device(): """Unmount USB device""" - if has_usb_auto_mount: + if hasUSBAutoMount: log("Sync partitions", flush=True) os.system("sync") else: @@ -513,7 +551,7 @@ def umount_device(): ["pumount", "/media/box"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, - check=True + check=True, ) logging.info("Umount successful") except subprocess.CalledProcessError as e: @@ -555,7 +593,7 @@ def log_device_info(dev): def scan(): """Scan devce with pypandora""" global pandora, qfolder - global work_queue, exit_flag, threads, scanned + global workQueue, EXIT_FLAG, threads, scanned global mount_point, infected_files, file_count, f_used # get device size @@ -563,7 +601,10 @@ def scan(): statvfs = os.statvfs(mount_point) except Exception as ex: log(f"error={ex}", flush=True) - logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) + logging.info( + f'boxname="{boxname}", ' + f'error="{str(ex)}"', + exc_info=True) if not has_curses: display_image("ERROR") return "ERROR" @@ -583,14 +624,16 @@ def scan(): file_count = 0 scan_start_time = time.time() - if has_quarantine: - qfolder = os.path.join(quarantine_folder, datetime.datetime.now().strftime("%y%m%d-%H%M")) + if hasQuarantine: + qfolder = os.path.join( + quarantine_folder, datetime.datetime.now().strftime("%y%m%d-%H%M") + ) # Instantice work quere - work_queue = queue.Queue(512) + workQueue = queue.Queue(512) # set exit condition to false - exit_flag = False + EXIT_FLAG = False # Instanciate threads for _ in range(maxThreads): @@ -601,20 +644,20 @@ def scan(): # Fill the work queue for root, _, files in os.walk(mount_point): for file in files: - while work_queue.full(): + while workQueue.full(): time.sleep(1) pass - queue_lock.acquire() - work_queue.put(os.path.join(root, file)) - queue_lock.release() + queueLock.acquire() + workQueue.put(os.path.join(root, file)) + queueLock.release() # Wait for queue to empty - while not work_queue.empty(): + while not workQueue.empty(): time.sleep(1) pass # Notify threads it's time to exit - exit_flag = True + EXIT_FLAG = True # Wait for all threads to complete for t in threads: @@ -637,6 +680,7 @@ def scan(): # -------------------------------------- + def wait(): """Wait for insert of remove of USB device""" # handle error - first unmount the device @@ -647,14 +691,18 @@ def wait(): monitor.filter_by("block") try: for dev in iter(monitor.poll, None): - if dev.get("ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd": + if dev.get( + "ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd": if dev.action == "add": return device_inserted(dev) if dev.action == "remove": return device_removed() except Exception as ex: log(f"Unexpected error: {str(ex)}", flush=True) - logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) + logging.info( + f'boxname="{boxname}", ' + f'error="{str(ex)}"', + exc_info=True) return "STOP" @@ -712,7 +760,10 @@ def mount(): os.statvfs(mount_point) except Exception as ex: log(f"Unexpected error: {str(ex)}", flush=True) - logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) + logging.info( + f'boxname="{boxname}", ' + f'error="{str(ex)}"', + exc_info=True) if not has_curses: display_image("WAIT") return "WAIT" @@ -745,7 +796,7 @@ def mouseClickThread(): down = False while not enterEvent.is_set(): buf = mouse.read(3) - if not (buf is None): + if buf is not None: if (buf[0] & 0x1) == 1: down = True if ((buf[0] & 0x1) == 0) and down: @@ -791,7 +842,9 @@ def clean(): if len(infected_files) > 0: # display message log(f"{len(infected_files)} infected files detecetd:") - logging.info(f'boxname="{boxname}", ' f"infeted_files={len(infected_files)}") + logging.info( + f'boxname="{boxname}", ' + f"infeted_files={len(infected_files)}") if not has_curses: display_image("BAD") @@ -822,12 +875,19 @@ def clean(): files_removed += 1 except Exception as ex: log(f"could not remove: {str(ex)}", flush=True) - logging.info(f'boxname="{boxname}", ' f'not_removed="{file}, ' f'error="{str(ex)}"', exc_info=True) + logging.info( + f'boxname="{boxname}", ' + f'not_removed="{file}, ' + f'error="{str(ex)}"', + exc_info=True, + ) has_error = True umount_device() - logging.info(f'boxname="{boxname}", ' f'cleaned="{files_removed}/{len(infected_files)}"') + logging.info( + f'boxname="{boxname}", ' + f'cleaned="{files_removed}/{len(infected_files)}"') if not has_error: if has_curses: @@ -857,11 +917,13 @@ def move_to_script_folder(): # -------------------------------------- + def get_enabled_workers(): config_dir = Path("~/pandora/pandora/workers") yml_files = list(config_dir.glob("*.yml")) return [str(file.stem) for file in yml_files if file.stem is not None] + def wait_for_workers(): pandora = pypandora.PyPandora(root_url=pandora_root_url) target_count = len(get_enabled_workers()) - 1 @@ -872,6 +934,7 @@ def wait_for_workers(): break time.sleep(2) + # -------------------------------------- @@ -939,8 +1002,10 @@ def get_lock(process_name): os.execvp("/usr/bin/bash", ["/usr/bin/bash", "--norc"]) sys.exit() + # -------------------------------------- + def main(_): """Main entry point""" print("main") @@ -954,7 +1019,10 @@ def main(_): except Exception as ex: print({str(ex)}) log(f"Unexpected error: {str(ex)}", flush=True) - logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True) + logging.info( + f'boxname="{boxname}", ' + f'error="{str(ex)}"', + exc_info=True) finally: end_curses() diff --git a/quality.sh b/quality.sh new file mode 100755 index 0000000..95e98ff --- /dev/null +++ b/quality.sh @@ -0,0 +1,8 @@ +#!/usr/bin/bash +# sudo apt install pylint black python3-autopep8 + +black -l 90 ./pandora-box.py + +autopep8 ./pandora-box.py --in-place --aggressive --aggressive + +pylint ./pandora-box.py