1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-19 21:39:40 +02:00

code quality

This commit is contained in:
dbarzin 2023-02-25 11:19:37 +01:00
parent 73f5d0c5d3
commit 49a1f2e3af

View file

@ -33,6 +33,7 @@ import psutil
import pypandora import pypandora
class PandoraBox: class PandoraBox:
"""The PandoraBox class""" """The PandoraBox class"""
@ -74,27 +75,25 @@ class PandoraBox:
# read the config file # read the config file
config_parser.read('pandora-box.ini') config_parser.read('pandora-box.ini')
# set values # set values
self.is_fake_scan=config_parser['DEFAULT']['FAKE_SCAN'].lower()=="true" self.is_fake_scan = config_parser['DEFAULT']['FAKE_SCAN'].lower() == "true"
self.has_usb_auto_mount=config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower()=="true" self.has_usb_auto_mount = config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower() == "true"
self.pandora_root_url=config_parser['DEFAULT']['PANDORA_ROOT_URL'] self.pandora_root_url = config_parser['DEFAULT']['PANDORA_ROOT_URL']
# Quarantine # Quarantine
self.has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower()=="true" self.has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower() == "true"
self.quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER'] self.quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER']
# Curses # Curses
self.has_curses = config_parser['DEFAULT']['CURSES'].lower()=="true" self.has_curses = config_parser['DEFAULT']['CURSES'].lower() == "true"
# ---------------------------------------------------------- # ----------------------------------------------------------
def _human_readable_size(self,size, decimal_places=1): def _human_readable_size(self, size, decimal_places=1):
""" Convert size to human readble string """ """ Convert size to human readble string """
for unit in ['B','KB','MB','GB','TB']: for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size < 1024.0: if size < 1024.0:
break break
size /= 1024.0 size /= 1024.0
return f"{size:.{decimal_places}f}{unit}" return f"{size:.{decimal_places}f}{unit}"
# ----------------------------------------------------------- # -----------------------------------------------------------
# Image Screen # Image Screen
# ----------------------------------------------------------- # -----------------------------------------------------------
@ -102,15 +101,15 @@ class PandoraBox:
def display_image(self, status): def display_image(self, status):
""" Display image on screen """ """ Display image on screen """
if not self.has_curses: if not self.has_curses:
if status=="WAIT": if status == "WAIT":
image = "images/key*.png" image = "images/key*.png"
elif status=="WORK": elif status == "WORK":
image = "images/wait*.png" image = "images/wait*.png"
elif status=="OK": elif status == "OK":
image = "images/ok.png" image = "images/ok.png"
elif status=="BAD": elif status == "BAD":
image = "images/bad.png" image = "images/bad.png"
elif status=="ERROR": elif status == "ERROR":
image = "images/error.png" image = "images/error.png"
else: else:
return return
@ -119,24 +118,23 @@ class PandoraBox:
# display image # display image
if "*" in image: if "*" in image:
# slide show # slide show
os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} "\ os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} "
"</dev/null 2>/dev/null >/dev/null &") "</dev/null 2>/dev/null >/dev/null &")
else : else:
# only one image # only one image
os.system(f"fim -qa {image} </dev/null 2>/dev/null >/dev/null &") os.system(f"fim -qa {image} </dev/null 2>/dev/null >/dev/null &")
# ----------------------------------------------------------- # -----------------------------------------------------------
def wait_mouse_click(self): def wait_mouse_click(self):
""" Wait for mouse click event """ """ Wait for mouse click event """
with open("/dev/input/mice", "rb" ) as mouse: with open("/dev/input/mice", "rb") as mouse:
down = False down = False
while True: while True:
buf = mouse.read(3) buf = mouse.read(3)
if (buf[0] & 0x1)==1: if (buf[0] & 0x1) == 1:
down = True down = True
if ((buf[0] & 0x1)==0) and down: if ((buf[0] & 0x1) == 0) and down:
break break
# ----------------------------------------------------------- # -----------------------------------------------------------
@ -172,7 +170,7 @@ class PandoraBox:
def _print_used(self, label): def _print_used(self, label):
"""Print FS Used Size""" """Print FS Used Size"""
if self.has_curses: if self.has_curses:
self.status_win.addstr(3, 1, f"Used : {label:32} ",curses.color_pair(2)) self.status_win.addstr(3, 1, f"Used : {label:32} ", curses.color_pair(2))
self.status_win.refresh() self.status_win.refresh()
def _print_fstype(self, label): def _print_fstype(self, label):
@ -274,17 +272,18 @@ class PandoraBox:
) )
logs = [] logs = []
def _log(self, msg): def _log(self, msg):
"""log something""" """log something"""
if self.has_curses: if self.has_curses:
# display log on screen # display log on screen
self.logs.append(msg) self.logs.append(msg)
if len(self.logs)>(curses.LINES-22): if len(self.logs) > (curses.LINES - 22):
self.logs.pop(0) self.logs.pop(0)
self.log_win.clear() self.log_win.clear()
self.log_win.border(0) self.log_win.border(0)
for i in range(min(curses.LINES-22,len(self.logs))): for i in range(min(curses.LINES - 22, len(self.logs))):
self.log_win.addstr(i+1,1,self.logs[i][:curses.COLS-2],curses.color_pair(3)) self.log_win.addstr(i + 1, 1, self.logs[i][:curses.COLS - 2], curses.color_pair(3))
self.log_win.refresh() self.log_win.refresh()
# ----------------------------------------------------------- # -----------------------------------------------------------
@ -319,7 +318,7 @@ class PandoraBox:
os.statvfs(self.mount_point) os.statvfs(self.mount_point)
except Exception as ex: except Exception as ex:
self._log(f"Unexpected error: {ex}") self._log(f"Unexpected error: {ex}")
loop +=1 loop += 1
continue continue
break break
@ -334,18 +333,18 @@ class PandoraBox:
def _log_device_info(self, dev): def _log_device_info(self, dev):
"""Log device information""" """Log device information"""
logging.info( logging.info(
'device_name="%s", ' \ 'device_name="%s", '
'path_id="%s", ' \ 'path_id="%s", '
'bus system="%s", ' \ 'bus system="%s", '
'USB_driver="%s", ' \ 'USB_driver="%s", '
'device_type="%s", ' \ 'device_type="%s", '
'device_usage="%s", ' \ 'device_usage="%s", '
'partition type="%s", ' \ 'partition type="%s", '
'fs_type="%s", ' \ 'fs_type="%s", '
'partition_label="%s", ' \ 'partition_label="%s", '
'device_model="%s", ' \ 'device_model="%s", '
'model_id="%s", ' \ 'model_id="%s", '
'serial_short="%s", '\ 'serial_short="%s", '
'serial="%s"', 'serial="%s"',
dev.get("DEVNAME"), dev.get("DEVNAME"),
dev.get("ID_PATH"), dev.get("ID_PATH"),
@ -366,25 +365,41 @@ class PandoraBox:
# pandora # pandora
# ----------------------------------------------------------- # -----------------------------------------------------------
def scan(self, used): def scan(self):
"""Scan a mount point with Pandora""" """Scan devce with pypandora"""
# get device size
try:
statvfs = os.statvfs(self.mount_point)
except Exception as ex:
self._log(f"error={ex}")
logging.info("An exception was thrown!", exc_info=True)
if not self.has_curses:
self.display_image("ERROR")
return "ERROR"
self._print_size(self._human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
self._print_used(self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
used = statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)
# scan device
self.infected_files = [] self.infected_files = []
scanned = 0 scanned = 0
file_count = 0 file_count = 0
scan_start_time = time.time() scan_start_time = time.time()
if self.has_quarantine: if self.has_quarantine:
qfolder = os.path.join(self.quarantine_folder,datetime.now().strftime("%y%m%d-%H%M")) qfolder = os.path.join(self.quarantine_folder, datetime.now().strftime("%y%m%d-%H%M"))
if not self.is_fake_scan: if not self.is_fake_scan:
pandora = pypandora.PyPandora(root_url=self.pandora_root_url) pandora = pypandora.PyPandora(root_url=self.pandora_root_url)
try: try:
for root, _, files in os.walk(self.mount_point): for root, _, files in os.walk(self.mount_point):
for file in files: for file in files:
status = None status = None
full_path = os.path.join(root,file) full_path = os.path.join(root, file)
file_size = os.path.getsize(full_path) file_size = os.path.getsize(full_path)
# log("Check %s [%s]" % (file, human_readable_size(file_size))) # log("Check %s [%s]" % (file, human_readable_size(file_size)))
file_scan_start_time = time.time() file_scan_start_time = time.time()
if self.is_fake_scan : if self.is_fake_scan:
time.sleep(0.1) time.sleep(0.1)
status = "SKIPPED" status = "SKIPPED"
else: else:
@ -405,14 +420,14 @@ class PandoraBox:
file_scan_end_time = time.time() file_scan_end_time = time.time()
self._log( self._log(
f'Scan {file} '\ f'Scan {file} '
f'[{self._human_readable_size(file_size)}] '\ f'[{self._human_readable_size(file_size)}] '
'-> '\ '-> '
f'{status} ({int(file_scan_end_time - file_scan_start_time)}s)') f'{status} ({int(file_scan_end_time - file_scan_start_time)}s)')
logging.info( logging.info(
f'file="{file}", '\ f'file="{file}", '
f'size="{file_size}", '\ f'size="{file_size}", '
f'status="{status}"", '\ f'status="{status}"", '
f'duration="{int(file_scan_end_time - file_scan_start_time)}"') f'duration="{int(file_scan_end_time - file_scan_start_time)}"')
scanned += os.path.getsize(full_path) scanned += os.path.getsize(full_path)
file_count += 1 file_count += 1
@ -421,25 +436,25 @@ class PandoraBox:
if status == "ALERT": if status == "ALERT":
self.infected_files.append(full_path) self.infected_files.append(full_path)
if self.has_quarantine: if self.has_quarantine:
if not os.path.isdir(qfolder) : if not os.path.isdir(qfolder):
os.mkdir(qfolder) os.mkdir(qfolder)
shutil.copyfile(full_path, os.path.join(qfolder,file)) shutil.copyfile(full_path, os.path.join(qfolder, file))
except Exception as ex : except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}")
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"', exc_info=True)
return "ERROR" return "ERROR"
self._update_bar(100) self._update_bar(100)
self._log("Scan done in %ds, %d files scanned, %d files infected" % self._log("Scan done in %ds, %d files scanned, %d files infected" %
((time.time() - scan_start_time),file_count,len(self.infected_files))) ((time.time() - scan_start_time), file_count, len(self.infected_files)))
logging.info( logging.info(
f'duration="{int(time.time() - scan_start_time)}", '\ f'duration="{int(time.time() - scan_start_time)}", '
f'files_scanned="{file_count}", '\ f'files_scanned="{file_count}", '
f'files_infected="{len(self.infected_files)}"') f'files_infected="{len(self.infected_files)}"')
return "CLEAN" return "CLEAN"
# -------------------------------------- # --------------------------------------
def wait_device(self): def wait(self):
"""Wait for insert of remove of USB device""" """Wait for insert of remove of USB device"""
# Loop # Loop
context = pyudev.Context() context = pyudev.Context()
@ -467,8 +482,7 @@ class PandoraBox:
else: else:
# display device type # display device type
self._print_fslabel(self.device.get("ID_FS_LABEL")) self._print_fslabel(self.device.get("ID_FS_LABEL"))
self._print_fstype(self.device.get("ID_PART_TABLE_TYPE") self._print_fstype(self.device.get("ID_PART_TABLE_TYPE") + " " + self.device.get("ID_FS_TYPE"))
+ " " + self.device.get("ID_FS_TYPE"))
self._print_model(self.device.get("ID_MODEL")) self._print_model(self.device.get("ID_MODEL"))
self._print_serial(self.device.get("ID_SERIAL_SHORT")) self._print_serial(self.device.get("ID_SERIAL_SHORT"))
return "INSERTED" return "INSERTED"
@ -501,7 +515,7 @@ class PandoraBox:
return "WAIT" return "WAIT"
try: try:
os.statvfs(self.mount_point) os.statvfs(self.mount_point)
except Exception as ex : except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}")
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"', exc_info=True)
if not self.has_curses: if not self.has_curses:
@ -511,23 +525,6 @@ class PandoraBox:
# -------------------------------------- # --------------------------------------
def scan_device(self):
"""Scan devce with pypandora"""
try:
statvfs=os.statvfs(self.mount_point)
except Exception as ex :
self._log(f"error={ex}")
logging.info("An exception was thrown!", exc_info=True)
if not self.has_curses:
self.display_image("WAIT")
return "WAIT"
self._print_size(self._human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
self._print_used(
self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
return self.scan(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
# --------------------------------------
def error(self): def error(self):
""" Display error message """ """ Display error message """
if not self.has_curses: if not self.has_curses:
@ -550,12 +547,12 @@ class PandoraBox:
# Remove infected files # Remove infected files
files_removed = 0 files_removed = 0
for file in self.infected_files: for file in self.infected_files:
try : try:
os.remove(file) os.remove(file)
self._log(f"{file} removed") self._log(f"{file} removed")
logging.info(f'removed="{file}"') logging.info(f'removed="{file}"')
files_removed += 1 files_removed += 1
except Exception as ex : except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}")
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"', exc_info=True)
os.system("sync") os.system("sync")
@ -607,11 +604,11 @@ class PandoraBox:
case "START": case "START":
return self.startup() return self.startup()
case "WAIT": case "WAIT":
return self.wait_device() return self.wait()
case "INSERTED": case "INSERTED":
return self.mount() return self.mount()
case "SCAN": case "SCAN":
return self.scan_device() return self.scan()
case "CLEAN": case "CLEAN":
return self.clean() return self.clean()
case "ERROR": case "ERROR":
@ -623,20 +620,22 @@ class PandoraBox:
def main(self): def main(self):
"""Main entry point""" """Main entry point"""
try : try:
state="START" state = "START"
while state!="STOP": while state != "STOP":
state = self.loop(state) state = self.loop(state)
except Exception as ex : except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}")
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"', exc_info=True)
finally: finally:
self._end_curses() self._end_curses()
def main(_): def main(_):
"""Main entry point""" """Main entry point"""
pandora_box = PandoraBox() pandora_box = PandoraBox()
pandora_box.main() pandora_box.main()
if __name__ == "__main__": if __name__ == "__main__":
wrapper(main) wrapper(main)