1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-23 15:29:43 +02:00

fix flake8

This commit is contained in:
Didier 2024-01-19 07:30:25 +01:00
parent bcedaea36e
commit 4a9a35c3b2
2 changed files with 77 additions and 93 deletions

View file

@ -19,4 +19,3 @@ jobs:
max-line-length: "128" max-line-length: "128"
path: "." path: "."
plugins: "flake8-bugbear" plugins: "flake8-bugbear"

View file

@ -79,6 +79,7 @@ infected_files = None
class scanThread(threading.Thread): class scanThread(threading.Thread):
"""Scanning thread""" """Scanning thread"""
def __init__(self): def __init__(self):
threading.Thread.__init__(self) threading.Thread.__init__(self)
self.pandora = pypandora.PyPandora(root_url=pandora_root_url) self.pandora = pypandora.PyPandora(root_url=pandora_root_url)
@ -107,11 +108,10 @@ class scanThread (threading.Thread):
# f'[{human_readable_size(file_size)}] ' # f'[{human_readable_size(file_size)}] '
# f'Thread-{id} ') # f'Thread-{id} ')
file_scan_start_time = time.time() start_time = time.time()
if is_fake_scan: if is_fake_scan:
status = "SKIPPED" status = "SKIPPED"
else: else:
# do not scan files bigger than 1G # do not scan files bigger than 1G
if file_size > (1024 * 1024 * 1024): if file_size > (1024 * 1024 * 1024):
status = "TOO BIG" status = "TOO BIG"
@ -134,20 +134,17 @@ class scanThread (threading.Thread):
time.sleep(0.1) time.sleep(0.1)
loop += 1 loop += 1
file_scan_end_time = time.time() end_time = time.time()
# log the result # log the result
log( log(f"Scan {file_name} " f"[{human_readable_size(file_size)}] " "-> " f"{status} ({(end_time - start_time):.1f}s)")
f'Scan {file_name} '
f'[{human_readable_size(file_size)}] '
'-> '
f'{status} ({(file_scan_end_time - file_scan_start_time):.1f}s)')
logging.info( logging.info(
f'boxname="{boxname}", ' f'boxname="{boxname}", '
f'file="{file_name}", ' f'file="{file_name}", '
f'size="{file_size}", ' f'size="{file_size}", '
f'status="{status}"", ' f'status="{status}"", '
f'duration="{int(file_scan_end_time - file_scan_start_time)}"') f'duration="{int(end_time - start_time)}"'
)
# Get lock # Get lock
queue_lock.acquire() queue_lock.acquire()
@ -172,13 +169,12 @@ class scanThread (threading.Thread):
except Exception as ex: except Exception as ex:
log(f"Unexpected error: {str(ex)}", flush=True) log(f"Unexpected error: {str(ex)}", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True)
f'boxname="{boxname}", '
f'error="{str(ex)}"', exc_info=True)
# ---------------------------------------------------------- # ----------------------------------------------------------
def config(): def config():
global is_fake_scan, has_usb_auto_mount, pandora_root_url global is_fake_scan, has_usb_auto_mount, pandora_root_url
global has_quarantine, quarantine_folder, has_curses, maxThreads global has_quarantine, quarantine_folder, has_curses, maxThreads
@ -186,25 +182,26 @@ def config():
# intantiate a ConfirParser # intantiate a ConfirParser
config_parser = configparser.ConfigParser() config_parser = configparser.ConfigParser()
# read the config file # read the config file
config_parser.read('pandora-box.ini') config_parser.read("pandora-box.ini")
# set values # set values
is_fake_scan = config_parser['DEFAULT']['FAKE_SCAN'].lower() == "true" is_fake_scan = config_parser["DEFAULT"]["FAKE_SCAN"].lower() == "true"
has_usb_auto_mount = config_parser['DEFAULT']['USB_AUTO_MOUNT'].lower() == "true" has_usb_auto_mount = config_parser["DEFAULT"]["USB_AUTO_MOUNT"].lower() == "true"
pandora_root_url = config_parser['DEFAULT']['PANDORA_ROOT_URL'] pandora_root_url = config_parser["DEFAULT"]["PANDORA_ROOT_URL"]
# Quarantine # Quarantine
has_quarantine = config_parser['DEFAULT']['QUARANTINE'].lower() == "true" has_quarantine = config_parser["DEFAULT"]["QUARANTINE"].lower() == "true"
quarantine_folder = config_parser['DEFAULT']['QUARANTINE_FOLDER'] quarantine_folder = config_parser["DEFAULT"]["QUARANTINE_FOLDER"]
# Curses # Curses
has_curses = config_parser['DEFAULT']['CURSES'].lower() == "true" has_curses = config_parser["DEFAULT"]["CURSES"].lower() == "true"
# MaxThreads # MaxThreads
maxThreads = int(config_parser['DEFAULT']['THREADS']) maxThreads = int(config_parser["DEFAULT"]["THREADS"])
# ---------------------------------------------------------- # ----------------------------------------------------------
def human_readable_size(size, decimal_places=1): def human_readable_size(size, decimal_places=1):
"""Convert size to human readble string""" """Convert size to human readble string"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']: for unit in ["B", "KB", "MB", "GB", "TB"]:
if size < 1024.0: if size < 1024.0:
return f"{size:.{decimal_places}f}{unit}" return f"{size:.{decimal_places}f}{unit}"
size /= 1024.0 size /= 1024.0
@ -215,6 +212,7 @@ def human_readable_size(size, decimal_places=1):
# Image Screen # Image Screen
# ----------------------------------------------------------- # -----------------------------------------------------------
def display_image(status): def display_image(status):
"""Display image on screen""" """Display image on screen"""
if not has_curses: if not has_curses:
@ -235,8 +233,7 @@ def display_image(status):
# display image # display image
if "*" in image: if "*" in image:
# slide show # slide show
os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image} " os.system(f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image}" "</dev/null 2>/dev/null >/dev/null &")
"</dev/null 2>/dev/null >/dev/null &")
else: else:
# only one image # only one image
os.system(f"fim -qa {image} </dev/null 2>/dev/null >/dev/null &") os.system(f"fim -qa {image} </dev/null 2>/dev/null >/dev/null &")
@ -319,7 +316,7 @@ def init_bar():
def update_bar(progress, flush=False): def update_bar(progress, flush=False):
global last_update_time global last_update_time
"""Update progress bar""" """Update progress bar"""
if (flush or ((time.time() - last_update_time) >= 1)): if flush or ((time.time() - last_update_time) >= 1):
last_update_time = time.time() last_update_time = time.time()
if has_curses: if has_curses:
if progress == 0: if progress == 0:
@ -366,10 +363,8 @@ def print_screen():
print_serial("") print_serial("")
init_bar() init_bar()
update_bar(0, flush=True) update_bar(0, flush=True)
log('Ready.', flush=True) log("Ready.", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' "pandora-box-start")
f'boxname="{boxname}", '
"pandora-box-start")
def end_curses(): def end_curses():
@ -386,6 +381,7 @@ def end_curses():
# Logging windows # Logging windows
# ----------------------------------------------------------- # -----------------------------------------------------------
def initlog(): def initlog():
"""Inititalize logging function""" """Inititalize logging function"""
global log_win global log_win
@ -393,10 +389,7 @@ def initlog():
log_win = curses.newwin(curses.LINES - 20, curses.COLS, 20, 0) log_win = curses.newwin(curses.LINES - 20, curses.COLS, 20, 0)
log_win.border(0) log_win.border(0)
logging.basicConfig( logging.basicConfig(
filename='/var/log/pandora-box.log', filename="/var/log/pandora-box.log", level=logging.INFO, format="%(asctime)s - %(message)s", datefmt="%m/%d/%y %H:%M"
level=logging.INFO,
format='%(asctime)s - %(message)s',
datefmt='%m/%d/%y %H:%M'
) )
@ -426,7 +419,7 @@ def log_update(flush=False):
"""Update the log screen""" """Update the log screen"""
global last_update_time global last_update_time
# do not refresh the screen too often # do not refresh the screen too often
if (flush or ((time.time() - last_update_time) >= 1)): if flush or ((time.time() - last_update_time) >= 1):
last_update_time = time.time() last_update_time = time.time()
log_win.clear() log_win.clear()
log_win.border(0) log_win.border(0)
@ -439,10 +432,11 @@ def log_update(flush=False):
# Device # Device
# ----------------------------------------------------------- # -----------------------------------------------------------
def mount_device(): def mount_device():
"""Mount USB device""" """Mount USB device"""
global mount_point global mount_point
log('Mount device', flush=True) log("Mount device", flush=True)
if has_usb_auto_mount: if has_usb_auto_mount:
mount_point = None mount_point = None
loop = 0 loop = 0
@ -454,7 +448,7 @@ def mount_device():
mount_point = partition.mountpoint mount_point = partition.mountpoint
loop += 1 loop += 1
if mount_device is None: if mount_device is None:
log('No partition mounted', flush=True) log("No partition mounted", flush=True)
else: else:
mount_point = "/media/box" mount_point = "/media/box"
if not os.path.exists("/media/box"): if not os.path.exists("/media/box"):
@ -506,6 +500,7 @@ def log_device_info(dev):
# pandora # pandora
# ----------------------------------------------------------- # -----------------------------------------------------------
def scan(): def scan():
"""Scan devce with pypandora""" """Scan devce with pypandora"""
global pandora, qfolder global pandora, qfolder
@ -517,10 +512,7 @@ def scan():
statvfs = os.statvfs(mount_point) statvfs = os.statvfs(mount_point)
except Exception as ex: except Exception as ex:
log(f"error={ex}", flush=True) log(f"error={ex}", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True)
f'boxname="{boxname}", '
f'error="{str(ex)}"',
exc_info=True)
if not has_curses: if not has_curses:
display_image("ERROR") display_image("ERROR")
return "ERROR" return "ERROR"
@ -574,19 +566,23 @@ def scan():
t.join() t.join()
update_bar(100, flush=True) update_bar(100, flush=True)
log("Scan done in %.1fs, %d files scanned, %d files infected" % log(
((time.time() - scan_start_time), file_count, len(infected_files)), "Scan done in %.1fs, %d files scanned, %d files infected"
flush=True) % ((time.time() - scan_start_time), file_count, len(infected_files)),
flush=True,
)
logging.info( logging.info(
f'boxname="{boxname}", ' f'boxname="{boxname}", '
f'duration="{int(time.time() - scan_start_time)}", ' f'duration="{int(time.time() - scan_start_time)}", '
f'files_scanned="{file_count}", ' f'files_scanned="{file_count}", '
f'files_infected="{len(infected_files)}"') f'files_infected="{len(infected_files)}"'
)
return "CLEAN" return "CLEAN"
# -------------------------------------- # --------------------------------------
def wait(): def wait():
"""Wait for insert of remove of USB device""" """Wait for insert of remove of USB device"""
# handle error - first unmount the device # handle error - first unmount the device
@ -604,18 +600,14 @@ def wait():
return device_removed() return device_removed()
except Exception as ex: except Exception as ex:
log(f"Unexpected error: {str(ex)}", flush=True) log(f"Unexpected error: {str(ex)}", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True)
f'boxname="{boxname}", '
f'error="{str(ex)}"', exc_info=True)
return "STOP" return "STOP"
def device_inserted(dev): def device_inserted(dev):
global device global device
log("Device inserted", flush=True) log("Device inserted", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' "device-inserted")
f'boxname="{boxname}", '
"device-inserted")
device = dev device = dev
log_device_info(device) log_device_info(device)
if not has_curses: if not has_curses:
@ -632,9 +624,7 @@ def device_inserted(dev):
def device_removed(): def device_removed():
global device global device
log("Device removed", flush=True) log("Device removed", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' "device-removed")
f'boxname="{boxname}", '
"device-removed")
device = None device = None
if not has_curses: if not has_curses:
display_image("WAIT") display_image("WAIT")
@ -651,11 +641,12 @@ def device_removed():
# -------------------------------------- # --------------------------------------
def mount(): def mount():
"""Mount device""" """Mount device"""
global mount_point global mount_point
mount_device() mount_device()
log(f'Partition mounted at {mount_point}', flush=True) log(f"Partition mounted at {mount_point}", flush=True)
if mount_point is None: if mount_point is None:
# no partition # no partition
if not has_curses: if not has_curses:
@ -665,9 +656,7 @@ def mount():
os.statvfs(mount_point) os.statvfs(mount_point)
except Exception as ex: except Exception as ex:
log(f"Unexpected error: {str(ex)}", flush=True) log(f"Unexpected error: {str(ex)}", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True)
f'boxname="{boxname}", '
f'error="{str(ex)}"', exc_info=True)
if not has_curses: if not has_curses:
display_image("WAIT") display_image("WAIT")
return "WAIT" return "WAIT"
@ -676,6 +665,7 @@ def mount():
# -------------------------------------- # --------------------------------------
def error(): def error():
"""Display error message""" """Display error message"""
if not has_curses: if not has_curses:
@ -700,9 +690,9 @@ def mouseClickThread():
while not enterEvent.is_set(): while not enterEvent.is_set():
buf = mouse.read(3) buf = mouse.read(3)
if not (buf is None): if not (buf is None):
if ((buf[0] & 0x1) == 1): if (buf[0] & 0x1) == 1:
down = True down = True
if (((buf[0] & 0x1) == 0) and down): if ((buf[0] & 0x1) == 0) and down:
break break
time.sleep(0.1) time.sleep(0.1)
mouse.close() mouse.close()
@ -717,7 +707,7 @@ def enterKeyThread():
while not mouseEvent.is_set(): while not mouseEvent.is_set():
input = sys.stdin.readline() input = sys.stdin.readline()
if (len(input) > 0): if len(input) > 0:
break break
time.sleep(0.1) time.sleep(0.1)
@ -739,14 +729,13 @@ def waitMouseOrEnter():
# -------------------------------------- # --------------------------------------
def clean(): def clean():
"""Remove infected files""" """Remove infected files"""
if len(infected_files) > 0: if len(infected_files) > 0:
# display message # display message
log(f"{len(infected_files)} infected files detecetd:") log(f"{len(infected_files)} infected files detecetd:")
logging.info( logging.info(f'boxname="{boxname}", ' f"infeted_files={len(infected_files)}")
f'boxname="{boxname}", '
f"infeted_files={len(infected_files)}")
if not has_curses: if not has_curses:
display_image("BAD") display_image("BAD")
@ -757,10 +746,10 @@ def clean():
log(file) log(file)
cnt = cnt + 1 cnt = cnt + 1
if cnt >= 10: if cnt >= 10:
log('...') log("...")
break break
# wait for clean # wait for clean
log('PRESS KEY TO CLEAN', flush=True) log("PRESS KEY TO CLEAN", flush=True)
waitMouseOrEnter() waitMouseOrEnter()
@ -773,32 +762,25 @@ def clean():
try: try:
os.remove(file) os.remove(file)
log(f"{file} removed") log(f"{file} removed")
logging.info( logging.info(f'boxname="{boxname}", ' f'removed="{file}"')
f'boxname="{boxname}", '
f'removed="{file}"')
files_removed += 1 files_removed += 1
except Exception as ex: except Exception as ex:
log(f"could not remove: {str(ex)}", flush=True) log(f"could not remove: {str(ex)}", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' f'not_removed="{file}, ' f'error="{str(ex)}"', exc_info=True)
f'boxname="{boxname}", '
f'not_removed="{file}, '
f'error="{str(ex)}"', exc_info=True)
has_error = True has_error = True
umount_device() umount_device()
logging.info( logging.info(f'boxname="{boxname}", ' f'cleaned="{files_removed}/{len(infected_files)}"')
f'boxname="{boxname}", '
f'cleaned="{files_removed}/{len(infected_files)}"')
if not has_error: if not has_error:
if has_curses: if has_curses:
log('Device cleaned !', flush=True) log("Device cleaned !", flush=True)
else: else:
display_image("OK") display_image("OK")
else: else:
if has_curses: if has_curses:
log('Device not cleaned !', flush=True) log("Device not cleaned !", flush=True)
else: else:
display_image("WAIT") display_image("WAIT")
else: else:
@ -809,6 +791,7 @@ def clean():
# -------------------------------------- # --------------------------------------
def move_to_script_folder(): def move_to_script_folder():
"""Move to pandora-box folder""" """Move to pandora-box folder"""
abspath = os.path.abspath(__file__) abspath = os.path.abspath(__file__)
@ -818,6 +801,7 @@ def move_to_script_folder():
# -------------------------------------- # --------------------------------------
def startup(): def startup():
"""Start Pandora-box""" """Start Pandora-box"""
global logo global logo
@ -830,7 +814,7 @@ def startup():
# Initilize log # Initilize log
initlog() initlog()
# Read logo # Read logo
with open('pandora-box.txt', mode='r', encoding='utf-8') as file1: with open("pandora-box.txt", mode="r", encoding="utf-8") as file1:
logo = file1.readlines() logo = file1.readlines()
# Print logo screen # Print logo screen
print_screen() print_screen()
@ -840,6 +824,7 @@ def startup():
# -------------------------------------- # --------------------------------------
def loop(state): def loop(state):
"""Main event loop""" """Main event loop"""
match state: match state:
@ -861,6 +846,7 @@ def loop(state):
# -------------------------------------- # --------------------------------------
def get_lock(process_name): def get_lock(process_name):
"""Get a lock to check that Pandora-box is not already running""" """Get a lock to check that Pandora-box is not already running"""
@ -873,16 +859,17 @@ def get_lock(process_name):
# in the abstract namespace instead of being created # in the abstract namespace instead of being created
# on the file system itself. # on the file system itself.
# Works only in Linux # Works only in Linux
get_lock._lock_socket.bind('\0' + process_name) get_lock._lock_socket.bind("\0" + process_name)
except socket.error: except socket.error:
print('Pandora-box is already running !', file=sys.stderr) print("Pandora-box is already running !", file=sys.stderr)
os.execvp('/usr/bin/bash', ['/usr/bin/bash', '--norc']) os.execvp("/usr/bin/bash", ["/usr/bin/bash", "--norc"])
sys.exit() sys.exit()
# -------------------------------------- # --------------------------------------
def main(_): def main(_):
"""Main entry point""" """Main entry point"""
try: try:
@ -892,13 +879,11 @@ def main(_):
except Exception as ex: except Exception as ex:
print({str(ex)}) print({str(ex)})
log(f"Unexpected error: {str(ex)}", flush=True) log(f"Unexpected error: {str(ex)}", flush=True)
logging.info( logging.info(f'boxname="{boxname}", ' f'error="{str(ex)}"', exc_info=True)
f'boxname="{boxname}", '
f'error="{str(ex)}"', exc_info=True)
finally: finally:
end_curses() end_curses()
if __name__ == "__main__": if __name__ == "__main__":
get_lock('pandora-box') get_lock("pandora-box")
curses.wrapper(main) curses.wrapper(main)