#!/usr/bin/python3 # # This file is part of the Pandora-box distribution. # https://github.com/dbarzin/pandora-box # Copyright (c) 2022 Didier Barzin. # # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. # # This program is distributed in the hope that it will be useful, but # WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU # General Public License for more details. # # You should have received a copy of the GNU General Public License # along with this program. If not, see . # """The Pandora-Box Module.""" import os import time import logging from curses import wrapper from datetime import datetime import configparser import shutil import curses import pyudev import psutil import pypandora class PandoraBox: """The PandoraBox class""" # ----------------------------------------------------------- # Config variables # ----------------------------------------------------------- is_fake_scan = None has_usb_auto_mount = None pandora_root_url = None has_quarantine = None quarantine_folder = None has_curses = None # ----------------------------------------------------------- # Curses # ----------------------------------------------------------- screen = None status_win = None progress_win = None title_win = None log_win = None # Pandora logo logo = None # ----------------------------------------------------------- # Curses # ----------------------------------------------------------- device = None mount_point = None infected_files = None # ---------------------------------------------------------- def config(self): """ read configuration file """ # intantiate a ConfirParser config_parser = configparser.ConfigParser() # read the config file config_parser.read('pandora-box.ini') # set values self.is_fake_scan=config_parser['DEFAULT']['FAKE_SCAN'].lower()=="true" self.has_usb_auto_mount=config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower()=="true" self.pandora_root_url=config_parser['DEFAULT']['PANDORA_ROOT_URL'] # Quarantine self.has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower()=="true" self.quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER'] # Curses self.has_curses = config_parser['DEFAULT']['CURSES'].lower()=="true" # ---------------------------------------------------------- def human_readable_size(self,size, decimal_places=1): """ Convert size to human readble string """ for unit in ['B','KB','MB','GB','TB']: if size < 1024.0: break size /= 1024.0 return f"{size:.{decimal_places}f}{unit}" # ----------------------------------------------------------- # Image Screen # ----------------------------------------------------------- def display_image(self, status): """ Display image on screen """ if not self.has_curses: if status=="WAIT": image = "images/key*.png" elif status=="WORK": image = "images/wait*.png" elif status=="OK": image = "images/ok.png" elif status=="BAD": image = "images/bad.png" elif status=="ERROR": image = "images/error.png" else: return # hide old image os.system("killall -s 9 fim 2>/dev/null") # display image if "*" in image: # slide show os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} "\ "/dev/null >/dev/null &") else : # only one image os.system(f"fim -qa %s /dev/null >/dev/null {image}") # ----------------------------------------------------------- def wait_mouse_click(self): """ Wait for mouse click event """ with open("/dev/input/mice", "rb" ) as mouse: down = False while True: buf = mouse.read(3) if (buf[0] & 0x1)==1: down = True if ((buf[0] & 0x1)==0) and down: break # ----------------------------------------------------------- # has_curses Screen # ----------------------------------------------------------- def init_curses(self): """Initialise curses""" if self.has_curses: self.screen = curses.initscr() self.screen.keypad(1) curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) curses.flushinp() curses.noecho() curses.curs_set(0) else: self.display_image("WAIT") def print_fslabel(self, label): """Print FS Label""" if self.has_curses: self.status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2)) self.status_win.refresh() def print_size(self, label): """Print FS Size""" if self.has_curses: if label is None: self.status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) else: self.status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2)) logging.info("size={label}") self.status_win.refresh() def print_used(self, label): """Print FS Used Size""" if self.has_curses: if label is None: self.status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) else: self.status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2)) logging.info(f'used="{label}') self.status_win.refresh() def print_fstype(self, label): """Print device FS type""" if self.has_curses: self.status_win.addstr(1, 50, "Part / Type : %-32s" % label, curses.color_pair(2)) self.status_win.refresh() def print_model(self, label): """Print device model""" if self.has_curses: self.status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2)) self.status_win.refresh() def print_serial(self, label): """Print device serail number""" if self.has_curses: self.status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2)) self.status_win.refresh() def init_bar(self): """Initialise progress bar""" if self.has_curses: self.progress_win = curses.newwin(3, curses.COLS-12, 17, 5) self.progress_win.border(0) self.progress_win.refresh() def update_bar(self, progress): """Update progress bar""" if self.has_curses: if progress == 0: self.progress_win.clear() self.progress_win.border(0) time.sleep(0) self.progress_win.addstr(0, 1, "Progress:") else: pos = ((curses.COLS-14) * progress) // 100 self.progress_win.addstr(1, 1, "#"*pos) self.progress_win.addstr(0, 1, f"Progress: {progress}%") self.progress_win.refresh() def print_screen(self): """Print main screen""" if self.has_curses: curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK) curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) self.title_win = curses.newwin(12, curses.COLS, 0, 0) # title_win.border(0) title_col = (curses.COLS - len(self.logo[0]))//2 self.title_win.addstr(1, title_col, self.logo[0], curses.color_pair(1)) self.title_win.addstr(2, title_col, self.logo[1], curses.color_pair(1)) self.title_win.addstr(3, title_col, self.logo[2], curses.color_pair(1)) self.title_win.addstr(4, title_col, self.logo[3], curses.color_pair(1)) self.title_win.addstr(5, title_col, self.logo[4], curses.color_pair(1)) self.title_win.addstr(6, title_col, self.logo[5], curses.color_pair(1)) self.title_win.addstr(7, title_col, self.logo[6], curses.color_pair(1)) self.title_win.addstr(8, title_col, self.logo[7], curses.color_pair(1)) self.title_win.addstr(9, title_col, self.logo[8], curses.color_pair(1)) self.title_win.addstr(10, title_col, self.logo[9], curses.color_pair(1)) self.title_win.refresh() self.status_win = curses.newwin(5, curses.COLS, 12, 0) self.status_win.border(0) self.status_win.addstr(0, 1, "USB Key Information") self.print_fslabel("") self.print_size(None) self.print_used(None) self.print_fstype("") self.print_model("") self.print_serial("") self.init_bar() self.update_bar(0) self.log('Ready.') def end_curses(self): """Closes curses""" if self.has_curses: curses.endwin() curses.flushinp() else: # hide old image os.system("killall -s 9 fim 2>/dev/null") # ----------------------------------------------------------- # Logging windows # ----------------------------------------------------------- def init_log(self): """Inititalize logging function""" if self.has_curses: self.log_win = curses.newwin(curses.LINES-20, curses.COLS, 20, 0) self.log_win.border(0) logging.basicConfig( filename='pandora-box.log', level=logging.INFO, format='%(asctime)s - %(message)s', datefmt='%m/%d/%y %H:%M' ) logs = [] def log(self, msg): """log something""" logging.info(msg) if self.has_curses: # display log on screen self.logs.append(msg) if len(self.logs)>(curses.LINES-22): self.logs.pop(0) self.log_win.clear() self.log_win.border(0) for i in range(min(curses.LINES-22,len(self.logs))): self.log_win.addstr(i+1,1,self.logs[i][:curses.COLS-2],curses.color_pair(3)) self.log_win.refresh() # ----------------------------------------------------------- # Device # ----------------------------------------------------------- def mount_device(self): """Mount USB device""" self.log('Try to mount partition') if self.has_usb_auto_mount: found = False loop = 0 while (not found) and (loop < 15): # need to sleep before devide is mounted time.sleep(1) for partition in psutil.disk_partitions(): if partition.device == self.device.device_node: found = True loop += 1 if found: return partition.mountpoint self.log('No partition mounted') return None else: if not os.path.exists("/media/box"): self.log("folder /media/box does not exists") return None os.system(f"pmount {self.device.device_node} /media/box >/dev/null 2>/dev/null") loop = 0 while loop < 10: time.sleep(1) try: os.statvfs(self.mount_point) except Exception as ex : loop +=1 continue break return "/media/box" def umount_device(self): """Unmount USB device""" if self.has_usb_auto_mount: self.log("Sync partitions") os.system("sync") else: self.log("Unmount partitions") os.system("pumount /media/box 2>/dev/null >/dev/null") def log_device_info(self, dev): """Log device information""" logging.info( f'device_name={dev.get("DEVNAME")}, ' \ f'path_id={dev.get("ID_PATH")}, ' \ f'bus system={dev.get("ID_BUS")}, ' \ f'USB_driver={dev.get("ID_USB_DRIVER")}, ' \ f'device_type={dev.get("DEVTYPE")}, ' \ f'device_usage={dev.get("ID_FS_USAGE")}, ' \ f'partition type={dev.get("ID_PART_TABLE_TYPE")}, ' \ f'fs_type={dev.get("ID_FS_TYPE")}, ' \ f'partition_label={dev.get("ID_FS_LABEL")}, ' \ f'device_model={dev.get("ID_MODEL")}, ' \ f'model_id={dev.get("ID_MODEL_ID")}, ' \ f'serial_short={dev.get("ID_SERIAL_SHORT")}, '\ f'serial={dev.get("ID_SERIAL")}') # ----------------------------------------------------------- # pandora # ----------------------------------------------------------- def scan(self, used): """Scan a mount point with Pandora""" self.infected_files = [] scanned = 0 file_count = 0 scan_start_time = time.time() if self.has_quarantine: qfolder = os.path.join(self.quarantine_folder,datetime.now().strftime("%y%m%d-%H%M")) if not self.is_fake_scan: pandora = pypandora.PyPandora(root_url=self.pandora_root_url) try: for root, _, files in os.walk(self.mount_point): for file in files: status = None full_path = os.path.join(root,file) file_size = os.path.getsize(full_path) # log("Check %s [%s]" % (file, human_readable_size(file_size))) file_scan_start_time = time.time() if self.is_fake_scan : time.sleep(0.1) status = "SKIPPED" else: if file_size > (1024*1024*1024): status = "TOO BIG" else: self.log("ppypandora : [%s] " % full_path) res = pandora.submit_from_disk(full_path) time.sleep(0.1) loop = 0 while loop < 960: res = pandora.task_status(res["taskId"]) status = res["status"] if status != "WAITING": break time.sleep(0.5) loop += 1 file_scan_end_time = time.time() self.log( f'file="{file}" , '\ f'size="{self.human_readable_size(file_size)}", '\ f'status="{status}"", '\ f'duration="{int(file_scan_end_time - file_scan_start_time)}"') scanned += os.path.getsize(full_path) file_count += 1 self.update_bar(scanned * 100 // used) if status == "ALERT": self.infected_files.append(full_path) if self.has_quarantine: if not os.path.isdir(qfolder) : os.mkdir(qfolder) shutil.copyfile(full_path, os.path.join(qfolder,file)) except Exception as ex : self.log(f"Unexpected error: {ex}") self.log("Scan failed !") if not self.has_curses: self.display_image("ERROR") raise self.update_bar(100) self.log( f'duration="{int(time.time() - scan_start_time)}s", '\ f'files_scanned="{file_count}", '\ f'files_infected="{len(self.infected_files)}"') return self.infected_files # -------------------------------------- def wait_device(self): """Wait for insert of remove of USB device""" # Loop context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) monitor.filter_by("block") try: for dev in iter(monitor.poll, None): if dev.get("ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd": if dev.action == "add": self.device = dev self.log("Device inserted") self.log_device_info(self.device) if not self.has_curses: self.display_image("WORK") else: # display device type self.print_fslabel(self.device.get("ID_FS_LABEL")) self.print_fstype(self.device.get("ID_PART_TABLE_TYPE") + " " + self.device.get("ID_FS_TYPE")) self.print_model(self.device.get("ID_MODEL")) self.print_serial(self.device.get("ID_SERIAL_SHORT")) return "INSERTED" if dev.action == "remove": self.device = None self.log("Device removed") if not self.has_curses: self.display_image("WAIT") else: self.print_fslabel("") self.print_size(None) self.print_used(None) self.print_fstype("") self.print_model("") self.print_serial("") self.update_bar(0) return "WAIT" except Exception as ex: self.log(f"Unexpected error: {str(ex)}") logging.info("An exception was thrown!", exc_info=True) finally: self.log("Done.") return "STOP" # -------------------------------------- def mount(self): # Mount device self.mount_point = self.mount_device() self.log(f'Partition mounted at {self.mount_point}') if self.mount_point is None: # no partition if not self.has_curses: self.display_image("WAIT") return "WAIT" try: os.statvfs(self.mount_point) except Exception as ex : self.log(f"error={ex}") logging.info("An exception was thrown!", exc_info=True) if not self.has_curses: self.display_image("WAIT") return "WAIT" return "SCAN" # -------------------------------------- def scan_device(self): """Scan devce with pypandora""" try: statvfs=os.statvfs(self.mount_point) except Exception as ex : self.log(f"error={ex}") logging.info("An exception was thrown!", exc_info=True) if not self.has_curses: self.display_image("WAIT") return "WAIT" self.print_size(self.human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) self.print_used( self.human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) self.scan(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) return "CLEAN" # -------------------------------------- def clean(self): """Remove infected files""" # Clean files if len(self.infected_files) > 0: self.log(f"infeted_files={len(self.infected_files)}") if not self.has_curses: self.display_image("BAD") self.wait_mouse_click() else: self.log('PRESS KEY TO CLEAN') self.screen.getch() # Remove infected files for file in self.infected_files: try : os.remove(file) self.log(f"{file} removed") except Exception as ex : self.log(f"Unexpected error: {ex}") logging.info("An exception was thrown!", exc_info=True) os.system("sync") self.log("Clean done.") if not self.has_curses: self.display_image("OK") else: if not self.has_curses: self.display_image("OK") self.umount_device() return "WAIT" # -------------------------------------- def move_to_script_folder(self): """Move to pandora-box folder""" abspath = os.path.abspath(__file__) dname = os.path.dirname(abspath) os.chdir(dname) # -------------------------------------- def startup(self): """Start Pandora-box""" self.config() self.init_curses() self.init_log() self.move_to_script_folder() # Read logo with open('pandora-box.txt', 'r') as file1: self.logo = file1.readlines() # Print logo screen self.print_screen() # First unmount remaining device self.umount_device() return "WAIT" # -------------------------------------- def loop(self, state): """Main event loop""" match state: case "START": return self.startup() case "WAIT": return self.wait_device() case "INSERTED": return self.mount() case "SCAN": return self.scan_device() case "CLEAN": return self.clean() case _: self.log(f"Unknwn state: {state}") return "STOP" # -------------------------------------- def main(self): """Main entry point""" try : state="START" while state!="STOP": state = self.loop(state) except Exception as ex : self.log(f"error={ex}") logging.info("An exception was thrown!", exc_info=True) finally: self.end_curses() def main(_): """Main entry point""" pandora_box = PandoraBox() pandora_box.main() if __name__ == "__main__": wrapper(main)