1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-19 13:29:42 +02:00

code review

This commit is contained in:
dbarzin 2023-02-13 15:45:28 +01:00
parent 4492d6be8a
commit e3cdee01ec

View file

@ -30,15 +30,6 @@ import configparser
import shutil import shutil
from datetime import datetime from datetime import datetime
# Abstract Base Class
from abc import ABC, abstractmethod
# -----------------------------------------------------------
# States
# -----------------------------------------------------------
# ----------------------------------------------------------- # -----------------------------------------------------------
# Config variables # Config variables
# ----------------------------------------------------------- # -----------------------------------------------------------
@ -153,7 +144,7 @@ def print_size(label):
status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) status_win.addstr(2, 1, "Size : ",curses.color_pair(2))
else: else:
status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2)) status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2))
logging.info("Size: %s" % label) logging.info("size=%s" % label)
status_win.refresh() status_win.refresh()
"""Print FS Used Size""" """Print FS Used Size"""
@ -164,7 +155,7 @@ def print_used(label):
status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) status_win.addstr(3, 1, "Used : ",curses.color_pair(2))
else: else:
status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2)) status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2))
logging.info("Used: %s" % label) logging.info("used=%s" % label)
status_win.refresh() status_win.refresh()
def print_fstype(label): def print_fstype(label):
@ -304,7 +295,8 @@ def log(str):
# ----------------------------------------------------------- # -----------------------------------------------------------
"""Mount USB device""" """Mount USB device"""
def mount_device(device): def mount_device():
global device
log('Try to mount partition') log('Try to mount partition')
if USB_AUTO_MOUNT: if USB_AUTO_MOUNT:
found = False found = False
@ -314,7 +306,6 @@ def mount_device(device):
time.sleep(1) time.sleep(1)
for partition in psutil.disk_partitions(): for partition in psutil.disk_partitions():
if partition.device == device.device_node: if partition.device == device.device_node:
log("Partition mounted at {}".format(partition.mountpoint))
found = True found = True
loop += 1 loop += 1
if loop < 10: if loop < 10:
@ -337,7 +328,6 @@ def mount_device(device):
loop +=1 loop +=1
continue continue
break; break;
log("Partition mounted at /media/box")
return "/media/box" return "/media/box"
"""Unmount USB device""" """Unmount USB device"""
@ -349,109 +339,21 @@ def umount_device():
log("Unmount partitions") log("Unmount partitions")
res = os.system("pumount /media/box 2>/dev/null >/dev/null") res = os.system("pumount /media/box 2>/dev/null >/dev/null")
"""Main device loop"""
def device_loop():
# First unmount remaining device
umount_device()
# Loop
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("block")
try:
for device in iter(monitor.poll, None):
if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd":
if device.action == "add":
log("Device inserted")
log_device_info(device)
if not CURSES:
display_image("WORK")
else:
# display device type
print_fslabel(device.get("ID_FS_LABEL"))
print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE"))
print_model(device.get("ID_MODEL"))
print_serial(device.get("ID_SERIAL_SHORT"))
# Mount device
mount_point = mount_device(device)
log('Partition mounted at %s' % mount_point)
if mount_point == None:
# no partition
if not CURSES:
display_image("WAIT")
continue
try:
statvfs=os.statvfs(mount_point)
except Exception as e :
log("Unexpected error: %s" % e)
logging.info("An exception was thrown!", exc_info=True)
if not CURSES:
display_image("WAIT")
continue
print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
# Scan files
log("Scan started...........")
infected_files = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
# Clean files
if len(infected_files) > 0:
log('%d infected files found !' % len(infected_files))
if not CURSES:
display_image("BAD")
waitMouseClick()
else:
log('PRESS KEY TO CLEAN')
screen.getch()
# Remove infected files
for file in infected_files:
try :
os.remove(file)
log('%s removed' % file)
except Exception as e :
log("Unexpected error: %s" % str(e))
logging.info("An exception was thrown!", exc_info=True)
os.system("sync")
log("Clean done.")
if not CURSES:
display_image("OK")
else:
if not CURSES:
display_image("OK")
umount_device()
if device.action == "remove":
log("Device removed")
if not CURSES:
display_image("WAIT")
else:
print_fslabel("")
print_size(None)
print_used(None)
print_fstype("")
print_model("")
print_serial("")
update_bar(0)
except Exception as e:
log("Unexpected error: %s" % str(e) )
logging.info("An exception was thrown!", exc_info=True)
finally:
log("Done.")
def log_device_info(dev): def log_device_info(dev):
logging.info("Device name: %s" % dev.get("DEVNAME")) logging.info(
logging.info("Path id: %s" % dev.get("ID_PATH")) "device_name=%s, " % dev.get("DEVNAME") +
logging.info("Bus system: %s" % dev.get("ID_BUS")) "path_id=%s, " % dev.get("ID_PATH") +
logging.info("USB driver: %s" % dev.get("ID_USB_DRIVER")) "bus system=%s, " % dev.get("ID_BUS") +
logging.info("Device type: %s" % dev.get("DEVTYPE")) "USB_driver=%s, " % dev.get("ID_USB_DRIVER") +
logging.info("Device usage: %s" % dev.get("ID_FS_USAGE")) "device_type=%s, " % dev.get("DEVTYPE") +
logging.info("Partition type: %s" % dev.get("ID_PART_TABLE_TYPE")) "device_usage=%s, " % dev.get("ID_FS_USAGE") +
logging.info("FS type: %s" % dev.get("ID_FS_TYPE")) "partition type=%s, " % dev.get("ID_PART_TABLE_TYPE") +
logging.info("Partition label: %s" % dev.get("ID_FS_LABEL")) "fs_type=%s, " % dev.get("ID_FS_TYPE") +
logging.info("Device model: %s" % dev.get("ID_MODEL")) "partition_label: %s, " % dev.get("ID_FS_LABEL") +
logging.info('Model: %s' % dev.get("ID_MODEL_ID")) "device_model=%s, " % dev.get("ID_MODEL") +
logging.info('Serial short: %s' % dev.get("ID_SERIAL_SHORT")) 'model_id=%s, ' % dev.get("ID_MODEL_ID") +
logging.info('Serial: %s' % dev.get("ID_SERIAL")) 'serial_short=%s, ' % dev.get("ID_SERIAL_SHORT") +
'serial=%s' % dev.get("ID_SERIAL"))
# ----------------------------------------------------------- # -----------------------------------------------------------
# pandora # pandora
@ -459,7 +361,7 @@ def log_device_info(dev):
"""Scan a mount point with Pandora""" """Scan a mount point with Pandora"""
def scan(mount_point, used): def scan(mount_point, used):
global infected_filed global device, infected_filed
infected_files = [] infected_files = []
scanned = 0 scanned = 0
file_count = 0 file_count = 0
@ -495,7 +397,7 @@ def scan(mount_point, used):
time.sleep(0.5) time.sleep(0.5)
loop += 1 loop += 1
file_scan_end_time = time.time() file_scan_end_time = time.time()
log("Scan %s [%s] -> %s (%ds)" % ( log("file=%s, size=%s, status=%s, duration=%ds" % (
file, file,
human_readable_size(file_size), human_readable_size(file_size),
status, status,
@ -517,12 +419,125 @@ def scan(mount_point, used):
display_image("ERROR") display_image("ERROR")
raise raise
update_bar(100) update_bar(100)
log("Scan done in %ds, %d files scanned, %d files infected" % log("duration=%ds, files_scanned=%d, files_infected=%d" %
((time.time() - scan_start_time),file_count,len(infected_files))) ((time.time() - scan_start_time),file_count,len(infected_files)))
return infected_files return infected_files
# -------------------------------------- # --------------------------------------
def wait_device():
global device
# Loop
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("block")
try:
for device in iter(monitor.poll, None):
if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd":
if device.action == "add":
log("Device inserted")
log_device_info(device)
if not CURSES:
display_image("WORK")
else:
# display device type
print_fslabel(device.get("ID_FS_LABEL"))
print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE"))
print_model(device.get("ID_MODEL"))
print_serial(device.get("ID_SERIAL_SHORT"))
return "INSERTED"
if device.action == "remove":
log("Device removed")
if not CURSES:
display_image("WAIT")
else:
print_fslabel("")
print_size(None)
print_used(None)
print_fstype("")
print_model("")
print_serial("")
update_bar(0)
return "WAIT"
except Exception as e:
log("Unexpected error: %s" % str(e) )
logging.info("An exception was thrown!", exc_info=True)
finally:
log("Done.")
return "STOP"
# --------------------------------------
def mount():
global device, mount_point
# Mount device
mount_point = mount_device()
log('Partition mounted at %s' % mount_point)
if mount_point == None:
# no partition
if not CURSES:
display_image("WAIT")
return "WAIT"
try:
statvfs=os.statvfs(mount_point)
except Exception as e :
log("error=%s" % e)
logging.info("An exception was thrown!", exc_info=True)
if not CURSES:
display_image("WAIT")
return "WAIT"
return "SCAN"
# --------------------------------------
def scan_device():
global mount_point, infected_files
try:
statvfs=os.statvfs(mount_point)
except Exception as e :
log("error=%s" % e)
logging.info("An exception was thrown!", exc_info=True)
if not CURSES:
display_image("WAIT")
return "WAIT"
print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
infected_files = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
return "CLEAN"
# --------------------------------------
def clean():
global infected_files
# Clean files
if len(infected_files) > 0:
log('%d infected files found !' % len(infected_files))
if not CURSES:
display_image("BAD")
waitMouseClick()
else:
log('PRESS KEY TO CLEAN')
screen.getch()
# Remove infected files
for file in infected_files:
try :
os.remove(file)
log('%s removed' % file)
except Exception as e :
log("Unexpected error: %s" % str(e))
logging.info("An exception was thrown!", exc_info=True)
os.system("sync")
log("Clean done.")
if not CURSES:
display_image("OK")
else:
if not CURSES:
display_image("OK")
umount_device()
return "WAIT"
# --------------------------------------
def moveToScriptFolder(): def moveToScriptFolder():
abspath = os.path.abspath(__file__) abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath) dname = os.path.dirname(abspath)
@ -530,27 +545,48 @@ def moveToScriptFolder():
# -------------------------------------- # --------------------------------------
def startup():
moveToScriptFolder()
init_log()
config()
init_curses()
print_screen()
# First unmount remaining device
umount_device()
return "WAIT"
# --------------------------------------
def loop(state):
match state:
case "START":
return startup()
case "WAIT":
return wait_device()
case "INSERTED":
return mount()
case "SCAN":
return scan_device()
case "CLEAN":
return clean()
case _:
print("Unknwn state "+state)
return "STOP"
# --------------------------------------
"""Main entry point""" """Main entry point"""
def main(stdscr): def main(stdscr):
try : try :
moveToScriptFolder() state="START"
init_log() while (state!="STOP"):
config() state = loop(state)
init_curses()
print_screen()
while True:
device_loop()
except Exception as e : except Exception as e :
log("Unexpected error: %s" % e) log("error=%s" % e)
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
finally: finally:
end_curses() end_curses()
# --------------------------------------
# TODO: google wrapper main functionnal programming
if __name__ == "__main__": if __name__ == "__main__":
wrapper(main) wrapper(main)