1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-23 07:19:42 +02:00

code quality

This commit is contained in:
dbarzin 2023-02-15 13:22:59 +01:00
parent 6df98a8910
commit c9f692f3c9

View file

@ -17,6 +17,8 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>. # along with this program. If not, see <http://www.gnu.org/licenses/>.
# #
"""The Pandora-Box Module."""
import os import os
import time import time
import logging import logging
@ -31,39 +33,62 @@ import psutil
import pypandora import pypandora
class PandoraBox:
"""The PandoraBox class"""
# ----------------------------------------------------------- # -----------------------------------------------------------
# Config variables # Config variables
# ----------------------------------------------------------- # -----------------------------------------------------------
is_fake_scan = None
has_usb_auto_mount = None
pandora_root_url = None
has_quarantine = None
quarantine_folder = None
has_curses = None
USB_AUTO_MOUNT = False # -----------------------------------------------------------
PANDORA_ROOT_URL = "http://127.0.0.1:6100"
FAKE_SCAN = False
QUARANTINE = False
CURSES = True
""" read configuration file """
def config():
global USB_AUTO_MOUNT, PANDORA_ROOT_URL
global FAKE_SCAN, QUARANTINE, QUARANTINE_FOLDER
global CURSES
# intantiate a ConfirParser
config = configparser.ConfigParser()
# read the config file
config.read('pandora-box.ini')
# set values
FAKE_SCAN=config['DEFAULT']['FAKE_SCAN'].lower()=="true"
USB_AUTO_MOUNT=config['DEFAULT']['USB_AUTO_MOUNT'].lower()=="true"
PANDORA_ROOT_URL=config['DEFAULT']['PANDORA_ROOT_URL']
# Quarantine
QUARANTINE = config['DEFAULT']['QUARANTINE'].lower()=="true"
QUARANTINE_FOLDER = config['DEFAULT']['QUARANTINE_FOLDER']
# Curses # Curses
CURSES = config['DEFAULT']['CURSES'].lower()=="true" # -----------------------------------------------------------
screen = None
status_win = None
progress_win = None
title_win = None
log_win = None
# Pandora logo
logo = None
# -----------------------------------------------------------
# Curses
# -----------------------------------------------------------
device = None
mount_point = None
infected_files = None
# ---------------------------------------------------------- # ----------------------------------------------------------
def config(self):
""" read configuration file """
# intantiate a ConfirParser
config_parser = configparser.ConfigParser()
# read the config file
config_parser.read('pandora-box.ini')
# set values
self.is_fake_scan=config_parser['DEFAULT']['FAKE_SCAN'].lower()=="true"
self.has_usb_auto_mount=config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower()=="true"
self.pandora_root_url=config_parser['DEFAULT']['PANDORA_ROOT_URL']
# Quarantine
self.has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower()=="true"
self.quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER']
# Curses
self.has_curses = config_parser['DEFAULT']['CURSES'].lower()=="true"
# ----------------------------------------------------------
def human_readable_size(self,size, decimal_places=1):
""" Convert size to human readble string """ """ Convert size to human readble string """
def human_readable_size(size, decimal_places=1):
for unit in ['B','KB','MB','GB','TB']: for unit in ['B','KB','MB','GB','TB']:
if size < 1024.0: if size < 1024.0:
break break
@ -75,8 +100,9 @@ def human_readable_size(size, decimal_places=1):
# Image Screen # Image Screen
# ----------------------------------------------------------- # -----------------------------------------------------------
def display_image(status): def display_image(self, status):
if not CURSES: """ Display image on screen """
if not self.has_curses:
if status=="WAIT": if status=="WAIT":
image = "images/key*.png" image = "images/key*.png"
elif status=="WORK": elif status=="WORK":
@ -94,18 +120,18 @@ def display_image(status):
# display image # display image
if "*" in image: if "*" in image:
# slide show # slide show
os.system("fim -qa -c 'while(1){display;sleep 1;next;}' %s "\ os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} "\
"</dev/null 2>/dev/null >/dev/null &" "</dev/null 2>/dev/null >/dev/null &")
% image)
else : else :
# only one image # only one image
os.system("fim -qa %s </dev/null 2>/dev/null >/dev/null &" % image) os.system(f"fim -qa %s </dev/null 2>/dev/null >/dev/null {image}")
# ----------------------------------------------------------- # -----------------------------------------------------------
def waitMouseClick(): def wait_mouse_click(self):
mouse = open( "/dev/input/mice", "rb" ) """ Wait for mouse click event """
with open("/dev/input/mice", "rb" ) as mouse:
down = False down = False
while True: while True:
buf = mouse.read(3) buf = mouse.read(3)
@ -113,147 +139,124 @@ def waitMouseClick():
down = True down = True
if ((buf[0] & 0x1)==0) and down: if ((buf[0] & 0x1)==0) and down:
break break
mouse.close()
# ----------------------------------------------------------- # -----------------------------------------------------------
# CURSES Screen # has_curses Screen
# ----------------------------------------------------------- # -----------------------------------------------------------
def init_curses(self):
"""Initialise curses""" """Initialise curses"""
def init_curses(): if self.has_curses:
global screen self.screen = curses.initscr()
if CURSES: self.screen.keypad(1)
screen = curses.initscr()
screen.keypad(1)
curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION)
curses.flushinp() curses.flushinp()
curses.noecho() curses.noecho()
curses.curs_set(0) curses.curs_set(0)
else: else:
display_image("WAIT") self.display_image("WAIT")
def print_fslabel(self, label):
"""Print FS Label""" """Print FS Label"""
def print_fslabel(label): if self.has_curses:
global status_win self.status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2))
if CURSES: self.status_win.refresh()
status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2))
status_win.refresh()
def print_size(self, label):
"""Print FS Size""" """Print FS Size"""
def print_size(label): if self.has_curses:
global status_win
if CURSES:
if label == None: if label == None:
status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) self.status_win.addstr(2, 1, "Size : ",curses.color_pair(2))
else: else:
status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2)) self.status_win.addstr(2, 1, "Size : %s " % label,curses.color_pair(2))
logging.info("size=%s" % label) logging.info("size={label}")
status_win.refresh() self.status_win.refresh()
def print_used(self, label):
"""Print FS Used Size""" """Print FS Used Size"""
def print_used(label): if self.has_curses:
global status_win
if CURSES:
if label == None: if label == None:
status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) self.status_win.addstr(3, 1, "Used : ",curses.color_pair(2))
else: else:
status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2)) self.status_win.addstr(3, 1, "Used : %s " % label,curses.color_pair(2))
logging.info("used=%s" % label) logging.info("used=%s" % label)
status_win.refresh() self.status_win.refresh()
def print_fstype(label): def print_fstype(self, label):
global status_win """Print device FS type"""
if CURSES: if self.has_curses:
status_win.addstr(1, 50, "Part / Type : %-32s" % label, curses.color_pair(2)) self.status_win.addstr(1, 50, "Part / Type : %-32s" % label, curses.color_pair(2))
status_win.refresh() self.status_win.refresh()
def print_model(label): def print_model(self, label):
global status_win """Print device model"""
if CURSES: if self.has_curses:
status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2)) self.status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2))
status_win.refresh() self.status_win.refresh()
def print_serial(label): def print_serial(self, label):
global status_win """Print device serail number"""
if CURSES: if self.has_curses:
status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2)) self.status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2))
status_win.refresh() self.status_win.refresh()
def init_bar(self):
"""Initialise progress bar""" """Initialise progress bar"""
def init_bar(): if self.has_curses:
global progress_win self.progress_win = curses.newwin(3, curses.COLS-12, 17, 5)
if CURSES: self.progress_win.border(0)
progress_win = curses.newwin(3, curses.COLS-12, 17, 5) self.progress_win.refresh()
progress_win.border(0)
progress_win.refresh()
def update_bar(self, progress):
"""Update progress bar""" """Update progress bar"""
def update_bar(progress): if self.has_curses:
global progress_win
if CURSES:
if progress == 0: if progress == 0:
progress_win.clear() self.progress_win.clear()
progress_win.border(0) self.progress_win.border(0)
time.sleep(0) time.sleep(0)
progress_win.addstr(0, 1, "Progress:") self.progress_win.addstr(0, 1, "Progress:")
else: else:
pos = ((curses.COLS-14) * progress) // 100 pos = ((curses.COLS-14) * progress) // 100
progress_win.addstr(1, 1, "#"*pos) self.progress_win.addstr(1, 1, "#"*pos)
progress_win.addstr(0, 1, "Progress: %d%%" % progress) self.progress_win.addstr(0, 1, "Progress: %d%%" % progress)
progress_win.refresh() self.progress_win.refresh()
"""Splash screen"""
s = [None] * 10
s[0] = " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒"
s[1] = " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░"
s[2] = " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░"
s[3] = " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ "
s[4] = " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒"
s[5] = " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░"
s[6] = " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░"
s[7] = " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ "
s[8] = " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ "
s[9] = " ░ ░ "
#curses.LINES, curses.COLS
def print_screen(self):
"""Print main screen""" """Print main screen"""
def print_screen(): if self.has_curses:
global status_win
if CURSES:
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK) curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK)
title_win = curses.newwin(12, curses.COLS, 0, 0) self.title_win = curses.newwin(12, curses.COLS, 0, 0)
# title_win.border(0) # title_win.border(0)
title_col = (curses.COLS - len(s[0]))//2 title_col = (curses.COLS - len(self.logo[0]))//2
title_win.addstr(1, title_col, s[0], curses.color_pair(1)) self.title_win.addstr(1, title_col, self.logo[0], curses.color_pair(1))
title_win.addstr(2, title_col, s[1], curses.color_pair(1)) self.title_win.addstr(2, title_col, self.logo[1], curses.color_pair(1))
title_win.addstr(3, title_col, s[2], curses.color_pair(1)) self.title_win.addstr(3, title_col, self.logo[2], curses.color_pair(1))
title_win.addstr(4, title_col, s[3], curses.color_pair(1)) self.title_win.addstr(4, title_col, self.logo[3], curses.color_pair(1))
title_win.addstr(5, title_col, s[4], curses.color_pair(1)) self.title_win.addstr(5, title_col, self.logo[4], curses.color_pair(1))
title_win.addstr(6, title_col, s[5], curses.color_pair(1)) self.title_win.addstr(6, title_col, self.logo[5], curses.color_pair(1))
title_win.addstr(7, title_col, s[6], curses.color_pair(1)) self.title_win.addstr(7, title_col, self.logo[6], curses.color_pair(1))
title_win.addstr(8, title_col, s[7], curses.color_pair(1)) self.title_win.addstr(8, title_col, self.logo[7], curses.color_pair(1))
title_win.addstr(9, title_col, s[8], curses.color_pair(1)) self.title_win.addstr(9, title_col, self.logo[8], curses.color_pair(1))
title_win.addstr(10, title_col, s[9], curses.color_pair(1)) self.title_win.addstr(10, title_col, self.logo[9], curses.color_pair(1))
title_win.refresh() self.title_win.refresh()
status_win = curses.newwin(5, curses.COLS, 12, 0) self.status_win = curses.newwin(5, curses.COLS, 12, 0)
status_win.border(0) self.status_win.border(0)
status_win.addstr(0, 1, "USB Key Information") self.status_win.addstr(0, 1, "USB Key Information")
print_fslabel("") self.print_fslabel("")
print_size(None) self.print_size(None)
print_used(None) self.print_used(None)
print_fstype("") self.print_fstype("")
print_model("") self.print_model("")
print_serial("") self.print_serial("")
init_bar() self.init_bar()
update_bar(0) self.update_bar(0)
log('Ready.') self.log('Ready.')
def end_curses(self):
"""Closes curses""" """Closes curses"""
def end_curses(): if self.has_curses:
if CURSES:
curses.endwin() curses.endwin()
curses.flushinp() curses.flushinp()
else: else:
@ -264,11 +267,11 @@ def end_curses():
# Logging windows # Logging windows
# ----------------------------------------------------------- # -----------------------------------------------------------
def init_log(): def init_log(self):
global log_win, logging """Inititalize logging function"""
if CURSES: if self.has_curses:
log_win = curses.newwin(curses.LINES-20, curses.COLS, 20, 0) self.log_win = curses.newwin(curses.LINES-20, curses.COLS, 20, 0)
log_win.border(0) self.log_win.border(0)
logging.basicConfig( logging.basicConfig(
filename='pandora-box.log', filename='pandora-box.log',
level=logging.INFO, level=logging.INFO,
@ -277,118 +280,113 @@ def init_log():
) )
logs = [] logs = []
def log(str): def log(self, str):
global log_win, logging """log something"""
logging.info(str) logging.info(str)
if CURSES: if self.has_curses:
# display log on screen # display log on screen
logs.append(str) self.logs.append(str)
if len(logs)>(curses.LINES-22): if len(self.logs)>(curses.LINES-22):
logs.pop(0) self.logs.pop(0)
log_win.clear() self.log_win.clear()
log_win.border(0) self.log_win.border(0)
for i in range(min(curses.LINES-22,len(logs))): for i in range(min(curses.LINES-22,len(self.logs))):
log_win.addstr(i+1,1,logs[i][:curses.COLS-2],curses.color_pair(3)) self.log_win.addstr(i+1,1,self.logs[i][:curses.COLS-2],curses.color_pair(3))
log_win.refresh() self.log_win.refresh()
# else:
# print(str,end="\n\r")
# ----------------------------------------------------------- # -----------------------------------------------------------
# Device # Device
# ----------------------------------------------------------- # -----------------------------------------------------------
def mount_device(self):
"""Mount USB device""" """Mount USB device"""
def mount_device(): self.log('Try to mount partition')
global device if self.has_usb_auto_mount:
log('Try to mount partition')
if USB_AUTO_MOUNT:
found = False found = False
loop = 0 loop = 0
while (not found) and (loop < 15): while (not found) and (loop < 15):
# need to sleep before devide is mounted # need to sleep before devide is mounted
time.sleep(1) time.sleep(1)
for partition in psutil.disk_partitions(): for partition in psutil.disk_partitions():
if partition.device == device.device_node: if partition.device == self.device.device_node:
found = True found = True
loop += 1 loop += 1
if found: if found:
return partition.mountpoint return partition.mountpoint
else: self.log('No partition mounted')
log('No partition mounted')
return None return None
else: else:
if not os.path.exists("/media/box"): if not os.path.exists("/media/box"):
log("folder /media/box does not exists") self.log("folder /media/box does not exists")
return None return None
res = os.system("pmount " + device.device_node + " /media/box >/dev/null 2>/dev/null") os.system(f"pmount {self.device.device_node} /media/box >/dev/null 2>/dev/null")
found = False
loop = 0 loop = 0
while (not found) and (loop < 10): while loop < 10:
time.sleep(1) time.sleep(1)
try: try:
statvfs=os.statvfs(mount_point) os.statvfs(self.mount_point)
except Exception as e : except Exception as e :
loop +=1 loop +=1
continue continue
break break
return "/media/box" return "/media/box"
def umount_device(self):
"""Unmount USB device""" """Unmount USB device"""
def umount_device(): if self.has_usb_auto_mount:
if USB_AUTO_MOUNT: self.log("Sync partitions")
log("Sync partitions") os.system("sync")
res = os.system("sync")
else: else:
log("Unmount partitions") self.log("Unmount partitions")
res = os.system("pumount /media/box 2>/dev/null >/dev/null") os.system("pumount /media/box 2>/dev/null >/dev/null")
def log_device_info(dev): def log_device_info(self, dev):
"""Log device information"""
logging.info( logging.info(
"device_name=%s, " % dev.get("DEVNAME") + f'device_name={dev.get("DEVNAME")}, ' \
"path_id=%s, " % dev.get("ID_PATH") + f'path_id={dev.get("ID_PATH")}, ' \
"bus system=%s, " % dev.get("ID_BUS") + f'bus system={dev.get("ID_BUS")}, ' \
"USB_driver=%s, " % dev.get("ID_USB_DRIVER") + f'USB_driver={dev.get("ID_USB_DRIVER")}, ' \
"device_type=%s, " % dev.get("DEVTYPE") + f'device_type={dev.get("DEVTYPE")}, ' \
"device_usage=%s, " % dev.get("ID_FS_USAGE") + f'device_usage={dev.get("ID_FS_USAGE")}, ' \
"partition type=%s, " % dev.get("ID_PART_TABLE_TYPE") + f'partition type={dev.get("ID_PART_TABLE_TYPE")}, ' \
"fs_type=%s, " % dev.get("ID_FS_TYPE") + f'fs_type={dev.get("ID_FS_TYPE")}, ' \
"partition_label: %s, " % dev.get("ID_FS_LABEL") + f'partition_label={dev.get("ID_FS_LABEL")}, ' \
"device_model=%s, " % dev.get("ID_MODEL") + f'device_model={dev.get("ID_MODEL")}, ' \
'model_id=%s, ' % dev.get("ID_MODEL_ID") + f'model_id={dev.get("ID_MODEL_ID")}, ' \
'serial_short=%s, ' % dev.get("ID_SERIAL_SHORT") + f'serial_short={dev.get("ID_SERIAL_SHORT")}, '\
'serial=%s' % dev.get("ID_SERIAL")) f'serial={dev.get("ID_SERIAL")}')
# ----------------------------------------------------------- # -----------------------------------------------------------
# pandora # pandora
# ----------------------------------------------------------- # -----------------------------------------------------------
def scan(self, used):
"""Scan a mount point with Pandora""" """Scan a mount point with Pandora"""
def scan(mount_point, used): self.infected_files = []
global device, infected_filed
infected_files = []
scanned = 0 scanned = 0
file_count = 0 file_count = 0
scan_start_time = time.time() scan_start_time = time.time()
if QUARANTINE: if self.has_quarantine:
quanrantine_folder = os.path.join(QUARANTINE_FOLDER,datetime.now().strftime("%y%m%d-%H%M")) qfolder = os.path.join(self.quarantine_folder,datetime.now().strftime("%y%m%d-%H%M"))
if not FAKE_SCAN: if not self.is_fake_scan:
pandora = pypandora.PyPandora(root_url=PANDORA_ROOT_URL) pandora = pypandora.PyPandora(root_url=pandora_root_url)
try: try:
for root, dirs, files in os.walk(mount_point): for root, dirs, files in os.walk(self.mount_point):
for file in files: for file in files:
status = None status = None
full_path = os.path.join(root,file) full_path = os.path.join(root,file)
file_size = os.path.getsize(full_path) file_size = os.path.getsize(full_path)
# log("Check %s [%s]" % (file, human_readable_size(file_size))) # log("Check %s [%s]" % (file, human_readable_size(file_size)))
file_scan_start_time = time.time() file_scan_start_time = time.time()
if FAKE_SCAN : if self.is_fake_scan :
time.sleep(0.1) time.sleep(0.1)
status = "SKIPPED" status = "SKIPPED"
else: else:
if file_size > (1024*1024*1024): if file_size > (1024*1024*1024):
status = "TOO BIG" status = "TOO BIG"
else: else:
log("ppypandora : [%s] " % full_path) self.log("ppypandora : [%s] " % full_path)
res = pandora.submit_from_disk(full_path) res = pandora.submit_from_disk(full_path)
time.sleep(0.1) time.sleep(0.1)
loop = 0 loop = 0
@ -400,196 +398,211 @@ def scan(mount_point, used):
time.sleep(0.5) time.sleep(0.5)
loop += 1 loop += 1
file_scan_end_time = time.time() file_scan_end_time = time.time()
log( self.log(
f"file = { file } , "\ f'file="{file}" , '\
f"size={human_readable_size(file_size)}, "\ f'size="{self.human_readable_size(file_size)}", '\
f"status={status}, "\ f'status="{status}"", '\
f"duration={int(file_scan_end_time - file_scan_start_time)}") f'duration="{int(file_scan_end_time - file_scan_start_time)}"')
scanned += os.path.getsize(full_path) scanned += os.path.getsize(full_path)
file_count += 1 file_count += 1
update_bar(scanned * 100 // used) self.update_bar(scanned * 100 // used)
if status == "ALERT": if status == "ALERT":
infected_files.append(full_path) self.infected_files.append(full_path)
if QUARANTINE: if self.has_quarantine:
if not os.path.isdir(quanrantine_folder) : if not os.path.isdir(qfolder) :
os.mkdir(quanrantine_folder) os.mkdir(qfolder)
shutil.copyfile(full_path, os.path.join(quanrantine_folder,file)) shutil.copyfile(full_path, os.path.join(qfolder,file))
except Exception as e : except Exception as e :
log(f"Unexpected error: {e}") self.log(f"Unexpected error: {e}")
log("Scan failed !") self.log("Scan failed !")
if not CURSES: if not self.has_curses:
display_image("ERROR") self.display_image("ERROR")
raise raise
update_bar(100) self.update_bar(100)
log("duration=%ds, files_scanned=%d, files_infected=%d" % self.log(
((time.time() - scan_start_time),file_count,len(infected_files))) f'duration="{int(time.time() - scan_start_time)}s", '\
return infected_files f'files_scanned="{file_count}", '\
f'files_infected="{len(self.infected_files)}"')
return self.infected_files
# -------------------------------------- # --------------------------------------
def wait_device(): def wait_device(self):
global device """Wait for insert of remove of USB device"""
# Loop # Loop
context = pyudev.Context() context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context) monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("block") monitor.filter_by("block")
try: try:
for device in iter(monitor.poll, None): for dev in iter(monitor.poll, None):
if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": if dev.get("ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd":
if device.action == "add": if dev.action == "add":
log("Device inserted") self.device = dev
log_device_info(device) self.log("Device inserted")
if not CURSES: self.log_device_info(self.device)
display_image("WORK") if not self.has_curses:
self.display_image("WORK")
else: else:
# display device type # display device type
print_fslabel(device.get("ID_FS_LABEL")) self.print_fslabel(self.device.get("ID_FS_LABEL"))
print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) self.print_fstype(self.device.get("ID_PART_TABLE_TYPE")
print_model(device.get("ID_MODEL")) + " " + self.device.get("ID_FS_TYPE"))
print_serial(device.get("ID_SERIAL_SHORT")) self.print_model(self.device.get("ID_MODEL"))
self.print_serial(self.device.get("ID_SERIAL_SHORT"))
return "INSERTED" return "INSERTED"
if device.action == "remove": if dev.action == "remove":
log("Device removed") self.device = None
if not CURSES: self.log("Device removed")
display_image("WAIT") if not self.has_curses:
self.display_image("WAIT")
else: else:
print_fslabel("") self.print_fslabel("")
print_size(None) self.print_size(None)
print_used(None) self.print_used(None)
print_fstype("") self.print_fstype("")
print_model("") self.print_model("")
print_serial("") self.print_serial("")
update_bar(0) self.update_bar(0)
return "WAIT" return "WAIT"
except Exception as e: except Exception as e:
log("Unexpected error: %s" % str(e) ) self.log(f"Unexpected error: {str(e)}")
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
finally: finally:
log("Done.") self.log("Done.")
return "STOP" return "STOP"
# -------------------------------------- # --------------------------------------
def mount(): def mount(self):
global mount_point
# Mount device # Mount device
mount_point = mount_device() self.mount_point = self.mount_device()
log('Partition mounted at %s' % mount_point) self.log(f'Partition mounted at {self.mount_point}')
if mount_point is None: if self.mount_point is None:
# no partition # no partition
if not CURSES: if not self.has_curses:
display_image("WAIT") self.display_image("WAIT")
return "WAIT" return "WAIT"
try: try:
os.statvfs(mount_point) os.statvfs(self.mount_point)
except Exception as e : except Exception as e :
log(f"error={e}") self.log(f"error={e}")
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
if not CURSES: if not self.has_curses:
display_image("WAIT") self.display_image("WAIT")
return "WAIT" return "WAIT"
return "SCAN" return "SCAN"
# -------------------------------------- # --------------------------------------
def scan_device(): def scan_device(self):
global infected_files """Scan devce with pypandora"""
try: try:
statvfs=os.statvfs(mount_point) statvfs=os.statvfs(self.mount_point)
except Exception as e : except Exception as e :
log(f"error={e}") self.log(f"error={e}")
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
if not CURSES: if not self.has_curses:
display_image("WAIT") self.display_image("WAIT")
return "WAIT" return "WAIT"
print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) self.print_size(self.human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) self.print_used(
infected_files = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) self.human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
self.scan(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
return "CLEAN" return "CLEAN"
# -------------------------------------- # --------------------------------------
def clean(): def clean(self):
global infected_files """Remove infected files"""
# Clean files # Clean files
if len(infected_files) > 0: if len(self.infected_files) > 0:
log(f"infeted_files={len(infected_files)}") self.log(f"infeted_files={len(infected_files)}")
if not CURSES: if not self.has_curses:
display_image("BAD") self.display_image("BAD")
waitMouseClick() self.wait_mouse_click()
else: else:
log('PRESS KEY TO CLEAN') self.log('PRESS KEY TO CLEAN')
screen.getch() self.screen.getch()
# Remove infected files # Remove infected files
for file in infected_files: for file in self.infected_files:
try : try :
os.remove(file) os.remove(file)
log(f"{file} removed") self.log(f"{file} removed")
except Exception as e : except Exception as e :
log(f"Unexpected error: {e}") self.log(f"Unexpected error: {e}")
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
os.system("sync") os.system("sync")
log("Clean done.") log("Clean done.")
if not CURSES: if not self.has_curses:
display_image("OK") self.display_image("OK")
else: else:
if not CURSES: if not self.has_curses:
display_image("OK") self.display_image("OK")
umount_device() self.umount_device()
return "WAIT" return "WAIT"
# -------------------------------------- # --------------------------------------
def moveToScriptFolder(): def moveToScriptFolder(self):
abspath = os.path.abspath(__file__) abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath) dname = os.path.dirname(abspath)
os.chdir(dname) os.chdir(dname)
# -------------------------------------- # --------------------------------------
def startup(): def startup(self):
moveToScriptFolder() """Start Pandora-box"""
init_log() self.config()
config() self.init_curses()
init_curses() self.init_log()
print_screen() self.moveToScriptFolder()
# Read logo
with open('pandora-box.txt', 'r') as file1:
self.logo = file1.readlines()
# Print logo screen
self.print_screen()
# First unmount remaining device # First unmount remaining device
umount_device() self.umount_device()
return "WAIT" return "WAIT"
# -------------------------------------- # --------------------------------------
def loop(state): def loop(self, state):
"""Main event loop"""
match state: match state:
case "START": case "START":
return startup() return self.startup()
case "WAIT": case "WAIT":
return wait_device() return self.wait_device()
case "INSERTED": case "INSERTED":
return mount() return self.mount()
case "SCAN": case "SCAN":
return scan_device() return self.scan_device()
case "CLEAN": case "CLEAN":
return clean() return self.clean()
case _: case _:
print("Unknwn state "+state) self.log(f"Unknwn state: {state}")
return "STOP" return "STOP"
# -------------------------------------- # --------------------------------------
def main(self):
"""Main entry point""" """Main entry point"""
def main(unused):
try : try :
state="START" state="START"
while state!="STOP": while state!="STOP":
state = loop(state) state = self.loop(state)
except Exception as e : except Exception as ex :
log("error=%s" % e) self.log(f"error={ex}")
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
finally: finally:
end_curses() self.end_curses()
def main(unused):
pandora_box = PandoraBox()
pandora_box.main()
if __name__ == "__main__": if __name__ == "__main__":
wrapper(main) wrapper(main)