From 35ff1922b2f4a742613763df99ca62c5d0501b97 Mon Sep 17 00:00:00 2001 From: Didier Barzin Date: Tue, 28 Jun 2022 19:05:44 +0200 Subject: [PATCH] Delete pandorabox.py --- pandorabox.py | 422 -------------------------------------------------- 1 file changed, 422 deletions(-) delete mode 100755 pandorabox.py diff --git a/pandorabox.py b/pandorabox.py deleted file mode 100755 index 99c4586..0000000 --- a/pandorabox.py +++ /dev/null @@ -1,422 +0,0 @@ -#!/usr/bin/python3 - -"""Pandora-box is a USB scaning station based on Pandora.""" - -import curses -from curses import wrapper -import pypandora -import time -import sys -import pyudev -import psutil -import os -import logging -import time -import configparser -import shutil -from datetime import datetime - -# ----------------------------------------------------------- -# Config variables -# ----------------------------------------------------------- - -USB_AUTO_MOUNT = False -PANDORA_ROOT_URL = "http://127.0.0.1:6100" -FAKE_SCAN = False -QUARANTINE = False - -""" read configuration file """ -def config(): - global USB_AUTO_MOUNT, PANDORA_ROOT_URL - global FAKE_SCAN, QUARANTINE, QUARANTINE_FOLDER - # intantiate a ConfirParser - config = configparser.ConfigParser() - # read the config file - config.read('pandorabox.ini') - # set values - FAKE_SCAN=config['DEFAULT']['FAKE_SCAN'].lower()=="true" - USB_AUTO_MOUNT=config['DEFAULT']['USB_AUTO_MOUNT'].lower()=="true" - PANDORA_ROOT_URL=config['DEFAULT']['PANDORA_ROOT_URL'] - # Quarantine - QUARANTINE = config['DEFAULT']['QUARANTINE'].lower()=="true" - QUARANTINE_FOLDER = config['DEFAULT']['QUARANTINE_FOLDER'] - -# ---------------------------------------------------------- - -""" Convert size to human readble string """ -def human_readable_size(size, decimal_places=1): - for unit in ['B','KB','MB','GB','TB']: - if size < 1024.0: - break - size /= 1024.0 - return f"{size:.{decimal_places}f}{unit}" - -# ----------------------------------------------------------- -# Screen -# ----------------------------------------------------------- - -"""Initialise curses""" -def intit_curses(): - global screen - screen = curses.initscr() - screen.keypad(1) - curses.curs_set(0) - curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) - curses.flushinp() - curses.noecho() -# screen.clear() - -"""Print FS Label""" -def print_fslabel(label): - global status_win - status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2)) - status_win.refresh() - -"""Print FS Size""" -def print_size(label): - global status_win - if label == None: - status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) - else: - status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2)) - logging.info("Size: %s" % label) - status_win.refresh() - -"""Print FS Used Size""" -def print_used(label): - global status_win - if label == None: - status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) - else: - status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2)) - logging.info("Used: %s" % label) - status_win.refresh() - -def print_fstype(label): - global status_win - status_win.addstr(1, 50, "Part / Type : %-32s" % label, curses.color_pair(2)) - status_win.refresh() - -def print_model(label): - global status_win - status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2)) - status_win.refresh() - -def print_serial(label): - global status_win - status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2)) - status_win.refresh() - -"""Initialise progress bar""" -def init_bar(): - global progress_win - progress_win = curses.newwin(3, curses.COLS-12, 17, 5) - progress_win.border(0) - progress_win.refresh() - -"""Update progress bar""" -def update_bar(progress): - global progress_win - if progress == 0: - progress_win.clear() - progress_win.border(0) - time.sleep(0) - progress_win.addstr(0, 1, "Progress:") - else: - pos = ((curses.COLS-14) * progress) // 100 - progress_win.addstr(1, 1, "#"*pos) - progress_win.addstr(0, 1, "Progress: %d%%" % progress) - progress_win.refresh() - -"""Splash screen""" -s = [None] * 10; -s[0] = " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒" -s[1] = " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░" -s[2] = " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░" -s[3] = " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ " -s[4] = " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒" -s[5] = " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░" -s[6] = " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░" -s[7] = " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ " -s[8] = " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ " -s[9] = " ░ ░ " - -#curses.LINES, curses.COLS - -"""Print main screen""" -def print_screen(): - global status_win - curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) - curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK) - curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) - title_win = curses.newwin(12, curses.COLS, 0, 0) - # title_win.border(0) - title_col = (curses.COLS - len(s[0]))//2 - title_win.addstr(1, title_col, s[0], curses.color_pair(1)) - title_win.addstr(2, title_col, s[1], curses.color_pair(1)) - title_win.addstr(3, title_col, s[2], curses.color_pair(1)) - title_win.addstr(4, title_col, s[3], curses.color_pair(1)) - title_win.addstr(5, title_col, s[4], curses.color_pair(1)) - title_win.addstr(6, title_col, s[5], curses.color_pair(1)) - title_win.addstr(7, title_col, s[6], curses.color_pair(1)) - title_win.addstr(8, title_col, s[7], curses.color_pair(1)) - title_win.addstr(9, title_col, s[8], curses.color_pair(1)) - title_win.addstr(10, title_col, s[9], curses.color_pair(1)) - title_win.refresh() - status_win = curses.newwin(5, curses.COLS, 12, 0) - status_win.border(0) - status_win.addstr(0, 1, "USB Key Information") - print_fslabel("") - print_size(None) - print_used(None) - print_fstype("") - print_model("") - print_serial("") - init_bar() - update_bar(0) - log('Ready.') - -"""Closes curses""" -def end_curses(): - curses.endwin() - curses.flushinp() - -# ----------------------------------------------------------- -# Logging windows -# ----------------------------------------------------------- - -def init_log(): - global log_win, logging - log_win = curses.newwin(curses.LINES-20, curses.COLS, 20, 0) - log_win.border(0) - logging.basicConfig( - filename='pandorabox.log', - level=logging.INFO, - format='%(asctime)s - %(message)s', - datefmt='%m/%d/%y %H:%M' - ) - -logs = [] -def log(str): - global log_win, logging - logging.info(str) - logs.append(str) - if len(logs)>(curses.LINES-22): - logs.pop(0) - log_win.clear() - log_win.border(0) - for i in range(min(curses.LINES-22,len(logs))): - log_win.addstr(i+1,1,logs[i][:curses.COLS-2],curses.color_pair(3)) - log_win.refresh() - -# ----------------------------------------------------------- -# Device -# ----------------------------------------------------------- - -"""Mount USB device""" -def mount_device(device): - if USB_AUTO_MOUNT: - found = False - loop = 0 - while (not found) and (loop < 10): - # need to sleep before devide is mounted - time.sleep(1) - for partition in psutil.disk_partitions(): - if partition.device == device.device_node: - log("Device mounted at {}".format(partition.mountpoint)) - found = True - loop += 1 - if loop < 10: - return partition.mountpoint - else: - return None - else: - res = os.system("pmount " + device.device_node + " /media/box") - found = False - loop = 0 - while (not found) and (loop < 10): - time.sleep(1) - try: - statvfs=os.statvfs(mount_point) - except Exception as e : - loop +=1 - continue - break; - log("Device mounted at /media/box") - return "/media/box" - -"""Unmount USB device""" -def umount_device(): - if not USB_AUTO_MOUNT: - log("Unmounting device /media/box") - res = os.system("pumount /media/box") - # print("Return type: ", res) - - -"""Main device loop""" -def device_loop(): - context = pyudev.Context() - monitor = pyudev.Monitor.from_netlink(context) - monitor.filter_by("block") - try: - for device in iter(monitor.poll, None): - if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": - if device.action == "add": - log("Device inserted") - log_device_info(device) - # display device type - print_fslabel(device.get("ID_FS_LABEL")) - print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) - print_model(device.get("ID_MODEL")) - print_serial(device.get("ID_SERIAL_SHORT")) - # Mount device - mount_point = mount_device(device) - if mount_point == None: - # no partition - continue - try: - statvfs=os.statvfs(mount_point) - except Exception as e : - log("Unexpected error: %s" % e) - continue - print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) - print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) - - # Scan files - log("Scan started...........") - infected_files = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) - - # Clean files - if len(infected_files) > 0: - log('%d infected files found !' % len(infected_files)) - log('PRESS KEY TO CLEAN') - screen.getch() - # Remove infected files - for file in infected_files: - try : - os.remove(file) - log('%s removed' % file) - except Exception as e : - log("Unexpected error: %s" % str(e)) - log("Clean done.") - - if device.action == "remove": - log("Device removed") - print_fslabel("") - print_size(None) - print_used(None) - print_fstype("") - print_model("") - print_serial("") - umount_device() - update_bar(0) - except Exception as e: - log("Unexpected error: %s" % str(e) ) - finally: - log("Done.") - - -def log_device_info(dev): - logging.info("Device name: %s" % dev.get("DEVNAME")) - logging.info("Path id: %s" % dev.get("ID_PATH")) - logging.info("Bus system: %s" % dev.get("ID_BUS")) - logging.info("USB driver: %s" % dev.get("ID_USB_DRIVER")) - logging.info("Device type: %s" % dev.get("DEVTYPE")) - logging.info("Device usage: %s" % dev.get("ID_FS_USAGE")) - logging.info("Partition type: %s" % dev.get("ID_PART_TABLE_TYPE")) - logging.info("FS type: %s" % dev.get("ID_FS_TYPE")) - logging.info("Partition label: %s" % dev.get("ID_FS_LABEL")) - # logging.info("FS: %s" % dev.get("ID_FS_SYSTEM_ID")) - logging.info("Device model: %s" % dev.get("ID_MODEL")) - # logging.info('Usage: %s' % dev.get("ID_FS_USAGE")) - logging.info('Model: %s' % dev.get("ID_MODEL_ID")) - logging.info('Serial short: %s' % dev.get("ID_SERIAL_SHORT")) - logging.info('Serial: %s' % dev.get("ID_SERIAL")) - # logging.info(os.stat(dev.get("DEVNAME"))) - -# ----------------------------------------------------------- -# pandora -# ----------------------------------------------------------- - -"""Scan a mount point with Pandora""" -def scan(mount_point, used): - global infected_filed - infected_files = [] - scanned = 0 - file_count = 0 - scan_start_time = time.time() - if QUARANTINE: - quanrantine_folder = os.path.join(QUARANTINE_FOLDER,datetime.now().strftime("%y%m%d-%H%M")) - if not FAKE_SCAN: - pandora = pypandora.PyPandora(root_url=PANDORA_ROOT_URL) - for root, dirs, files in os.walk(mount_point): - for file in files: - try : - status = None - full_path = os.path.join(root,file) - file_size = os.path.getsize(full_path) - # log("Check %s [%s]" % (file, human_readable_size(file_size))) - file_scan_start_time = time.time() - if FAKE_SCAN : - time.sleep(0.1) - status = "SKIPPED" - else: - if file_size > (1024*1024*1024): - status = "TOO BIG" - else: - res = pandora.submit_from_disk(full_path) - time.sleep(0.1) - loop = 0 - while True and (loop < 60): - res = pandora.task_status(res["taskId"]) - status = res["status"] - if status != "WAITING": - break - time.sleep(0.5) - loop += 1 - file_scan_end_time = time.time() - log("Scan %s [%s] -> %s (%ds)" % ( - file, - human_readable_size(file_size), - status, - (file_scan_end_time - file_scan_start_time))) - scanned += os.path.getsize(full_path) - file_count += 1 - update_bar(scanned * 100 // used) - - if status == "ALERT": - infected_files.append(full_path) - if QUARANTINE: - if not os.path.isdir(quanrantine_folder) : - os.mkdir(quanrantine_folder) - shutil.copyfile(full_path, os.path.join(quanrantine_folder,file)) - - except Exception as e : - log("Unexpected error: %s" % e) - update_bar(100) - log("Scan done in %ds, %d files scanned, %d files infected" % - ((time.time() - scan_start_time),file_count,len(infected_files))) - return infected_files - -# -------------------------------------- - -"""Main entry point""" -def main(stdscr): - try : - init_log() - config() - intit_curses() - print_screen() - while True: - device_loop() - except Exception as e : - end_curses() - print("Unexpected error: ", e) - finally: - end_curses() - -# -------------------------------------- - - -if __name__ == "__main__": - wrapper(main)