mirror of
https://github.com/dbarzin/pandora-box.git
synced 2025-07-19 05:19:40 +02:00
code quality
This commit is contained in:
parent
ceb0bdb64e
commit
f92ce645dc
3 changed files with 138 additions and 125 deletions
2
.github/workflows/ci.yml
vendored
2
.github/workflows/ci.yml
vendored
|
@ -21,7 +21,7 @@ jobs:
|
|||
|
||||
- name: code quality checkout
|
||||
run: |
|
||||
pip install pyudev psutil pypandora
|
||||
pip install pylint
|
||||
pylint ./pandora-box.py
|
||||
|
||||
|
||||
|
|
|
@ -182,3 +182,4 @@ cp pandora-box.ini.curses pandora-dox.ini
|
|||
|
||||
# Reboot
|
||||
echo "You may reboot the server."
|
||||
|
||||
|
|
246
pandora-box.py
246
pandora-box.py
|
@ -67,7 +67,7 @@ class PandoraBox:
|
|||
|
||||
# ----------------------------------------------------------
|
||||
|
||||
def config(self):
|
||||
def _config(self):
|
||||
""" read configuration file """
|
||||
# intantiate a ConfirParser
|
||||
config_parser = configparser.ConfigParser()
|
||||
|
@ -86,7 +86,7 @@ class PandoraBox:
|
|||
|
||||
# ----------------------------------------------------------
|
||||
|
||||
def human_readable_size(self,size, decimal_places=1):
|
||||
def _human_readable_size(self,size, decimal_places=1):
|
||||
""" Convert size to human readble string """
|
||||
for unit in ['B','KB','MB','GB','TB']:
|
||||
if size < 1024.0:
|
||||
|
@ -123,7 +123,7 @@ class PandoraBox:
|
|||
"</dev/null 2>/dev/null >/dev/null &")
|
||||
else :
|
||||
# only one image
|
||||
os.system(f"fim -qa %s </dev/null 2>/dev/null >/dev/null {image}")
|
||||
os.system(f"fim -qa {image} </dev/null 2>/dev/null >/dev/null &")
|
||||
|
||||
|
||||
# -----------------------------------------------------------
|
||||
|
@ -143,7 +143,7 @@ class PandoraBox:
|
|||
# has_curses Screen
|
||||
# -----------------------------------------------------------
|
||||
|
||||
def init_curses(self):
|
||||
def _init_curses(self):
|
||||
"""Initialise curses"""
|
||||
if self.has_curses:
|
||||
self.screen = curses.initscr()
|
||||
|
@ -155,58 +155,55 @@ class PandoraBox:
|
|||
else:
|
||||
self.display_image("WAIT")
|
||||
|
||||
def print_fslabel(self, label):
|
||||
def _print_fslabel(self, label):
|
||||
"""Print FS Label"""
|
||||
if label is None:
|
||||
label = ""
|
||||
if self.has_curses:
|
||||
self.status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2))
|
||||
self.status_win.addstr(1, 1, f"Partition : {label:32}", curses.color_pair(2))
|
||||
self.status_win.refresh()
|
||||
|
||||
def print_size(self, label):
|
||||
def _print_size(self, label):
|
||||
"""Print FS Size"""
|
||||
if self.has_curses:
|
||||
if label is None:
|
||||
self.status_win.addstr(2, 1, "Size : ",curses.color_pair(2))
|
||||
else:
|
||||
self.status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2))
|
||||
logging.info("size={label}")
|
||||
self.status_win.addstr(2, 1, f"Size : {label:32} ", curses.color_pair(2))
|
||||
self.status_win.refresh()
|
||||
logging.info('fs_size="%s"', label)
|
||||
|
||||
def print_used(self, label):
|
||||
def _print_used(self, label):
|
||||
"""Print FS Used Size"""
|
||||
if self.has_curses:
|
||||
if label is None:
|
||||
self.status_win.addstr(3, 1, "Used : ",curses.color_pair(2))
|
||||
else:
|
||||
self.status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2))
|
||||
logging.info(f'used="{label}')
|
||||
self.status_win.addstr(3, 1, f"Used : {label:32} ",curses.color_pair(2))
|
||||
self.status_win.refresh()
|
||||
logging.info('fs_usage="%s"',label)
|
||||
|
||||
def print_fstype(self, label):
|
||||
def _print_fstype(self, label):
|
||||
"""Print device FS type"""
|
||||
if self.has_curses:
|
||||
self.status_win.addstr(1, 50, "Part / Type : %-32s" % label, curses.color_pair(2))
|
||||
self.status_win.addstr(1, 50, f"Part / Type : {label:32}", curses.color_pair(2))
|
||||
self.status_win.refresh()
|
||||
logging.info('fs_type="%s"',label)
|
||||
|
||||
def print_model(self, label):
|
||||
def _print_model(self, label):
|
||||
"""Print device model"""
|
||||
if self.has_curses:
|
||||
self.status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2))
|
||||
self.status_win.addstr(2, 50, f"Model : {label:32}", curses.color_pair(2))
|
||||
self.status_win.refresh()
|
||||
|
||||
def print_serial(self, label):
|
||||
def _print_serial(self, label):
|
||||
"""Print device serail number"""
|
||||
if self.has_curses:
|
||||
self.status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2))
|
||||
self.status_win.addstr(3, 50, f"Serial : {label:32}", curses.color_pair(2))
|
||||
self.status_win.refresh()
|
||||
|
||||
def init_bar(self):
|
||||
def _init_bar(self):
|
||||
"""Initialise progress bar"""
|
||||
if self.has_curses:
|
||||
self.progress_win = curses.newwin(3, curses.COLS-12, 17, 5)
|
||||
self.progress_win.border(0)
|
||||
self.progress_win.refresh()
|
||||
|
||||
def update_bar(self, progress):
|
||||
def _update_bar(self, progress):
|
||||
"""Update progress bar"""
|
||||
if self.has_curses:
|
||||
if progress == 0:
|
||||
|
@ -220,7 +217,7 @@ class PandoraBox:
|
|||
self.progress_win.addstr(0, 1, f"Progress: {progress}%")
|
||||
self.progress_win.refresh()
|
||||
|
||||
def print_screen(self):
|
||||
def _print_screen(self):
|
||||
"""Print main screen"""
|
||||
if self.has_curses:
|
||||
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
|
||||
|
@ -243,17 +240,17 @@ class PandoraBox:
|
|||
self.status_win = curses.newwin(5, curses.COLS, 12, 0)
|
||||
self.status_win.border(0)
|
||||
self.status_win.addstr(0, 1, "USB Key Information")
|
||||
self.print_fslabel("")
|
||||
self.print_size(None)
|
||||
self.print_used(None)
|
||||
self.print_fstype("")
|
||||
self.print_model("")
|
||||
self.print_serial("")
|
||||
self.init_bar()
|
||||
self.update_bar(0)
|
||||
self.log('Ready.')
|
||||
self._print_fslabel("")
|
||||
self._print_size("")
|
||||
self._print_used("")
|
||||
self._print_fstype("")
|
||||
self._print_model("")
|
||||
self._print_serial("")
|
||||
self._init_bar()
|
||||
self._update_bar(0)
|
||||
self._log('Ready.')
|
||||
|
||||
def end_curses(self):
|
||||
def _end_curses(self):
|
||||
"""Closes curses"""
|
||||
if self.has_curses:
|
||||
curses.endwin()
|
||||
|
@ -266,7 +263,7 @@ class PandoraBox:
|
|||
# Logging windows
|
||||
# -----------------------------------------------------------
|
||||
|
||||
def init_log(self):
|
||||
def _init_log(self):
|
||||
"""Inititalize logging function"""
|
||||
if self.has_curses:
|
||||
self.log_win = curses.newwin(curses.LINES-20, curses.COLS, 20, 0)
|
||||
|
@ -279,7 +276,7 @@ class PandoraBox:
|
|||
)
|
||||
|
||||
logs = []
|
||||
def log(self, msg):
|
||||
def _log(self, msg):
|
||||
"""log something"""
|
||||
logging.info(msg)
|
||||
if self.has_curses:
|
||||
|
@ -299,24 +296,22 @@ class PandoraBox:
|
|||
|
||||
def mount_device(self):
|
||||
"""Mount USB device"""
|
||||
self.log('Try to mount partition')
|
||||
self._log('Try to mount partition')
|
||||
self.mount_point = None
|
||||
if self.has_usb_auto_mount:
|
||||
found = False
|
||||
loop = 0
|
||||
while (not found) and (loop < 15):
|
||||
while (self.mount_point is None) and (loop < 15):
|
||||
# need to sleep before devide is mounted
|
||||
time.sleep(1)
|
||||
for partition in psutil.disk_partitions():
|
||||
if partition.device == self.device.device_node:
|
||||
found = True
|
||||
self.mount_point = partition.mountpoint
|
||||
loop += 1
|
||||
if found:
|
||||
return partition.mountpoint
|
||||
self.log('No partition mounted')
|
||||
return None
|
||||
if self.mount_device is None:
|
||||
self._log('No partition mounted')
|
||||
else:
|
||||
if not os.path.exists("/media/box"):
|
||||
self.log("folder /media/box does not exists")
|
||||
self._log("folder /media/box does not exists")
|
||||
return None
|
||||
os.system(f"pmount {self.device.device_node} /media/box >/dev/null 2>/dev/null")
|
||||
loop = 0
|
||||
|
@ -325,36 +320,50 @@ class PandoraBox:
|
|||
try:
|
||||
os.statvfs(self.mount_point)
|
||||
except Exception as ex:
|
||||
self._log(f"Unexpected error: {ex}")
|
||||
loop +=1
|
||||
continue
|
||||
break
|
||||
return "/media/box"
|
||||
self.mount_point = "/media/box"
|
||||
|
||||
def umount_device(self):
|
||||
"""Unmount USB device"""
|
||||
if self.has_usb_auto_mount:
|
||||
self.log("Sync partitions")
|
||||
self._log("Sync partitions")
|
||||
os.system("sync")
|
||||
else:
|
||||
self.log("Unmount partitions")
|
||||
os.system("pumount /media/box 2>/dev/null >/dev/null")
|
||||
|
||||
def log_device_info(self, dev):
|
||||
def _log_device_info(self, dev):
|
||||
"""Log device information"""
|
||||
logging.info(
|
||||
f'device_name={dev.get("DEVNAME")}, ' \
|
||||
f'path_id={dev.get("ID_PATH")}, ' \
|
||||
f'bus system={dev.get("ID_BUS")}, ' \
|
||||
f'USB_driver={dev.get("ID_USB_DRIVER")}, ' \
|
||||
f'device_type={dev.get("DEVTYPE")}, ' \
|
||||
f'device_usage={dev.get("ID_FS_USAGE")}, ' \
|
||||
f'partition type={dev.get("ID_PART_TABLE_TYPE")}, ' \
|
||||
f'fs_type={dev.get("ID_FS_TYPE")}, ' \
|
||||
f'partition_label={dev.get("ID_FS_LABEL")}, ' \
|
||||
f'device_model={dev.get("ID_MODEL")}, ' \
|
||||
f'model_id={dev.get("ID_MODEL_ID")}, ' \
|
||||
f'serial_short={dev.get("ID_SERIAL_SHORT")}, '\
|
||||
f'serial={dev.get("ID_SERIAL")}')
|
||||
'device_name="%s", ' \
|
||||
'path_id="%s", ' \
|
||||
'bus system="%s", ' \
|
||||
'USB_driver="%s", ' \
|
||||
'device_type="%s", ' \
|
||||
'device_usage="%s", ' \
|
||||
'partition type="%s", ' \
|
||||
'fs_type="%s", ' \
|
||||
'partition_label="%s", ' \
|
||||
'device_model="%s", ' \
|
||||
'model_id="%s", ' \
|
||||
'serial_short="%s", '\
|
||||
'serial="%s"',
|
||||
dev.get("DEVNAME"),
|
||||
dev.get("ID_PATH"),
|
||||
dev.get("ID_BUS"),
|
||||
dev.get("ID_USB_DRIVER"),
|
||||
dev.get("DEVTYPE"),
|
||||
dev.get("ID_FS_USAGE"),
|
||||
dev.get("ID_PART_TABLE_TYPE"),
|
||||
dev.get("ID_FS_TYPE"),
|
||||
dev.get("ID_FS_LABEL"),
|
||||
dev.get("ID_MODEL"),
|
||||
dev.get("ID_MODEL_ID"),
|
||||
dev.get("ID_SERIAL_SHORT"),
|
||||
dev.get("ID_SERIAL")
|
||||
)
|
||||
|
||||
# -----------------------------------------------------------
|
||||
# pandora
|
||||
|
@ -385,7 +394,7 @@ class PandoraBox:
|
|||
if file_size > (1024*1024*1024):
|
||||
status = "TOO BIG"
|
||||
else:
|
||||
self.log("ppypandora : [%s] " % full_path)
|
||||
self._log(f'scan="{full_path}]"')
|
||||
res = pandora.submit_from_disk(full_path)
|
||||
time.sleep(0.1)
|
||||
loop = 0
|
||||
|
@ -397,14 +406,14 @@ class PandoraBox:
|
|||
time.sleep(0.5)
|
||||
loop += 1
|
||||
file_scan_end_time = time.time()
|
||||
self.log(
|
||||
self._log(
|
||||
f'file="{file}" , '\
|
||||
f'size="{self.human_readable_size(file_size)}", '\
|
||||
f'size="{self._human_readable_size(file_size)}", '\
|
||||
f'status="{status}"", '\
|
||||
f'duration="{int(file_scan_end_time - file_scan_start_time)}"')
|
||||
scanned += os.path.getsize(full_path)
|
||||
file_count += 1
|
||||
self.update_bar(scanned * 100 // used)
|
||||
self._update_bar(scanned * 100 // used)
|
||||
|
||||
if status == "ALERT":
|
||||
self.infected_files.append(full_path)
|
||||
|
@ -413,13 +422,13 @@ class PandoraBox:
|
|||
os.mkdir(qfolder)
|
||||
shutil.copyfile(full_path, os.path.join(qfolder,file))
|
||||
except Exception as ex :
|
||||
self.log(f"Unexpected error: {ex}")
|
||||
self.log("Scan failed !")
|
||||
self._log(f"Unexpected error: {ex}")
|
||||
self._log("Scan failed !")
|
||||
if not self.has_curses:
|
||||
self.display_image("ERROR")
|
||||
raise
|
||||
self.update_bar(100)
|
||||
self.log(
|
||||
self._update_bar(100)
|
||||
self._log(
|
||||
f'duration="{int(time.time() - scan_start_time)}s", '\
|
||||
f'files_scanned="{file_count}", '\
|
||||
f'files_infected="{len(self.infected_files)}"')
|
||||
|
@ -437,46 +446,49 @@ class PandoraBox:
|
|||
for dev in iter(monitor.poll, None):
|
||||
if dev.get("ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd":
|
||||
if dev.action == "add":
|
||||
return self._device_inserted(dev)
|
||||
if dev.action == "remove":
|
||||
return self._device_removed()
|
||||
except Exception as ex:
|
||||
self._log(f"Unexpected error: {str(ex)}")
|
||||
logging.info("An exception was thrown!", exc_info=True)
|
||||
finally:
|
||||
self._log("Done.")
|
||||
return "STOP"
|
||||
|
||||
def _device_inserted(self, dev):
|
||||
self.device = dev
|
||||
self.log("Device inserted")
|
||||
self.log_device_info(self.device)
|
||||
self._log_device_info(self.device)
|
||||
if not self.has_curses:
|
||||
self.display_image("WORK")
|
||||
else:
|
||||
# display device type
|
||||
self.print_fslabel(self.device.get("ID_FS_LABEL"))
|
||||
self.print_fstype(self.device.get("ID_PART_TABLE_TYPE")
|
||||
self._print_fslabel(self.device.get("ID_FS_LABEL"))
|
||||
self._print_fstype(self.device.get("ID_PART_TABLE_TYPE")
|
||||
+ " " + self.device.get("ID_FS_TYPE"))
|
||||
self.print_model(self.device.get("ID_MODEL"))
|
||||
self.print_serial(self.device.get("ID_SERIAL_SHORT"))
|
||||
self._print_model(self.device.get("ID_MODEL"))
|
||||
self._print_serial(self.device.get("ID_SERIAL_SHORT"))
|
||||
return "INSERTED"
|
||||
if dev.action == "remove":
|
||||
|
||||
def _device_removed(self):
|
||||
self.device = None
|
||||
self.log("Device removed")
|
||||
if not self.has_curses:
|
||||
self.display_image("WAIT")
|
||||
else:
|
||||
self.print_fslabel("")
|
||||
self.print_size(None)
|
||||
self.print_used(None)
|
||||
self.print_fstype("")
|
||||
self.print_model("")
|
||||
self.print_serial("")
|
||||
self.update_bar(0)
|
||||
self._print_fslabel("")
|
||||
self._print_size("")
|
||||
self._print_used("")
|
||||
self._print_fstype("")
|
||||
self._print_model("")
|
||||
self._print_serial("")
|
||||
self._update_bar(0)
|
||||
return "WAIT"
|
||||
except Exception as ex:
|
||||
self.log(f"Unexpected error: {str(ex)}")
|
||||
logging.info("An exception was thrown!", exc_info=True)
|
||||
finally:
|
||||
self.log("Done.")
|
||||
return "STOP"
|
||||
|
||||
# --------------------------------------
|
||||
|
||||
def mount(self):
|
||||
# Mount device
|
||||
self.mount_point = self.mount_device()
|
||||
self.log(f'Partition mounted at {self.mount_point}')
|
||||
""" Mount device """
|
||||
self.mount_device()
|
||||
self._log(f'Partition mounted at {self.mount_point}')
|
||||
if self.mount_point is None:
|
||||
# no partition
|
||||
if not self.has_curses:
|
||||
|
@ -485,7 +497,7 @@ class PandoraBox:
|
|||
try:
|
||||
os.statvfs(self.mount_point)
|
||||
except Exception as ex :
|
||||
self.log(f"error={ex}")
|
||||
self._log(f"error={ex}")
|
||||
logging.info("An exception was thrown!", exc_info=True)
|
||||
if not self.has_curses:
|
||||
self.display_image("WAIT")
|
||||
|
@ -499,14 +511,14 @@ class PandoraBox:
|
|||
try:
|
||||
statvfs=os.statvfs(self.mount_point)
|
||||
except Exception as ex :
|
||||
self.log(f"error={ex}")
|
||||
self._log(f"error={ex}")
|
||||
logging.info("An exception was thrown!", exc_info=True)
|
||||
if not self.has_curses:
|
||||
self.display_image("WAIT")
|
||||
return "WAIT"
|
||||
self.print_size(self.human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
|
||||
self.print_used(
|
||||
self.human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
|
||||
self._print_size(self._human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
|
||||
self._print_used(
|
||||
self._human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
|
||||
self.scan(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
|
||||
return "CLEAN"
|
||||
|
||||
|
@ -514,25 +526,23 @@ class PandoraBox:
|
|||
|
||||
def clean(self):
|
||||
"""Remove infected files"""
|
||||
# Clean files
|
||||
if len(self.infected_files) > 0:
|
||||
self.log(f"infeted_files={len(self.infected_files)}")
|
||||
self._log(f"infeted_files={len(self.infected_files)}")
|
||||
if not self.has_curses:
|
||||
self.display_image("BAD")
|
||||
self.wait_mouse_click()
|
||||
else:
|
||||
self.log('PRESS KEY TO CLEAN')
|
||||
self._log('PRESS KEY TO CLEAN')
|
||||
self.screen.getch()
|
||||
# Remove infected files
|
||||
for file in self.infected_files:
|
||||
try :
|
||||
os.remove(file)
|
||||
self.log(f"{file} removed")
|
||||
self._log(f"{file} removed")
|
||||
except Exception as ex :
|
||||
self.log(f"Unexpected error: {ex}")
|
||||
self._log(f"Unexpected error: {ex}")
|
||||
logging.info("An exception was thrown!", exc_info=True)
|
||||
os.system("sync")
|
||||
self.log("Clean done.")
|
||||
if not self.has_curses:
|
||||
self.display_image("OK")
|
||||
else:
|
||||
|
@ -553,15 +563,18 @@ class PandoraBox:
|
|||
|
||||
def startup(self):
|
||||
"""Start Pandora-box"""
|
||||
self.config()
|
||||
self.init_curses()
|
||||
self.init_log()
|
||||
self.move_to_script_folder()
|
||||
# read config
|
||||
self._config()
|
||||
# Initialize curesrs
|
||||
self._init_curses()
|
||||
# Initilize log
|
||||
self._init_log()
|
||||
# Read logo
|
||||
with open('pandora-box.txt', 'r') as file1:
|
||||
with open('pandora-box.txt', mode='r', encoding='utf-8') as file1:
|
||||
self.logo = file1.readlines()
|
||||
# Print logo screen
|
||||
self.print_screen()
|
||||
self._print_screen()
|
||||
# First unmount remaining device
|
||||
self.umount_device()
|
||||
return "WAIT"
|
||||
|
@ -582,7 +595,6 @@ class PandoraBox:
|
|||
case "CLEAN":
|
||||
return self.clean()
|
||||
case _:
|
||||
self.log(f"Unknwn state: {state}")
|
||||
return "STOP"
|
||||
|
||||
# --------------------------------------
|
||||
|
@ -594,10 +606,10 @@ class PandoraBox:
|
|||
while state!="STOP":
|
||||
state = self.loop(state)
|
||||
except Exception as ex :
|
||||
self.log(f"error={ex}")
|
||||
self._log(f"error={ex}")
|
||||
logging.info("An exception was thrown!", exc_info=True)
|
||||
finally:
|
||||
self.end_curses()
|
||||
self._end_curses()
|
||||
|
||||
|
||||
def main(_):
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue