1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-08-04 13:25:27 +02:00
This commit is contained in:
dbarzin 2023-03-03 21:15:32 +01:00
parent 80f40aa9c8
commit 9b46054494

View file

@ -208,19 +208,21 @@ class PandoraBox:
self.progress_win.border(0) self.progress_win.border(0)
self.progress_win.refresh() self.progress_win.refresh()
def _update_bar(self, progress): def _update_bar(self, progress, flush=False):
"""Update progress bar""" """Update progress bar"""
if self.has_curses: if (flush or ((time.time() - self.last_update_time) >= 1)):
if progress == 0: self.last_update_time = time.time()
self.progress_win.clear() if self.has_curses:
self.progress_win.border(0) if progress == 0:
time.sleep(0) self.progress_win.clear()
self.progress_win.addstr(0, 1, "Progress:") self.progress_win.border(0)
else: time.sleep(0)
pos = ((curses.COLS - 14) * progress) // 100 self.progress_win.addstr(0, 1, "Progress:")
self.progress_win.addstr(1, 1, "#" * pos) else:
self.progress_win.addstr(0, 1, f"Progress: {progress}%") pos = ((curses.COLS - 14) * progress) // 100
self.progress_win.refresh() self.progress_win.addstr(1, 1, "#" * pos)
self.progress_win.addstr(0, 1, f"Progress: {progress}%")
self.progress_win.refresh()
def _print_screen(self): def _print_screen(self):
"""Print main screen""" """Print main screen"""
@ -252,8 +254,8 @@ class PandoraBox:
self._print_model("") self._print_model("")
self._print_serial("") self._print_serial("")
self._init_bar() self._init_bar()
self._update_bar(0) self._update_bar(0, flush=True)
self._log('Ready.') self._log('Ready.', flush=True)
logging.info("pandora-box-start") logging.info("pandora-box-start")
def _end_curses(self): def _end_curses(self):
@ -282,15 +284,16 @@ class PandoraBox:
) )
logs = [] logs = []
last_update_time = 0
def _log(self, msg): def _log(self, msg, flush=False):
"""log a message with a new line""" """log a message with a new line"""
if self.has_curses: if self.has_curses:
# display log on screen # display log on screen
self.logs.append(msg) self.logs.append(msg)
if len(self.logs) > (curses.LINES - 22): if len(self.logs) > (curses.LINES - 22):
self.logs.pop(0) self.logs.pop(0)
self._log_update() self._log_update(flush)
def _log_msg(self, msg): def _log_msg(self, msg):
"""update last message -> no new line""" """update last message -> no new line"""
@ -299,13 +302,16 @@ class PandoraBox:
self.logs[-1] = msg self.logs[-1] = msg
self._log_update() self._log_update()
def _log_update(self): def _log_update(self, flush=False):
"""Update the log screen""" """Update the log screen"""
self.log_win.clear() # do not refresh the screen too often
self.log_win.border(0) if (flush or ((time.time() - self.last_update_time) >= 1)):
for i in range(min(curses.LINES - 22, len(self.logs))): self.last_update_time = time.time()
self.log_win.addstr(i + 1, 1, self.logs[i][:curses.COLS - 2], curses.color_pair(3)) self.log_win.clear()
self.log_win.refresh() self.log_win.border(0)
for i in range(min(curses.LINES - 22, len(self.logs))):
self.log_win.addstr(i + 1, 1, self.logs[i][:curses.COLS - 2], curses.color_pair(3))
self.log_win.refresh()
# ----------------------------------------------------------- # -----------------------------------------------------------
# Device # Device
@ -313,7 +319,7 @@ class PandoraBox:
def mount_device(self): def mount_device(self):
"""Mount USB device""" """Mount USB device"""
self._log('Mount device') self._log('Mount device', flush=True)
if self.has_usb_auto_mount: if self.has_usb_auto_mount:
self.mount_point = None self.mount_point = None
loop = 0 loop = 0
@ -325,11 +331,11 @@ class PandoraBox:
self.mount_point = partition.mountpoint self.mount_point = partition.mountpoint
loop += 1 loop += 1
if self.mount_device is None: if self.mount_device is None:
self._log('No partition mounted') self._log('No partition mounted', flush=True)
else: else:
self.mount_point = "/media/box" self.mount_point = "/media/box"
if not os.path.exists("/media/box"): if not os.path.exists("/media/box"):
self._log("folder /media/box does not exists") self._log("folder /media/box does not exists", flush=True)
return None return None
os.system(f"pmount {self.device.device_node} /media/box >/dev/null 2>/dev/null") os.system(f"pmount {self.device.device_node} /media/box >/dev/null 2>/dev/null")
loop = 0 loop = 0
@ -338,7 +344,7 @@ class PandoraBox:
try: try:
os.statvfs(self.mount_point) os.statvfs(self.mount_point)
except Exception as ex: except Exception as ex:
self._log(f"Unexpected error: {ex}") self._log(f"Unexpected error: {ex}", flush=True)
loop += 1 loop += 1
continue continue
break break
@ -346,7 +352,7 @@ class PandoraBox:
def umount_device(self): def umount_device(self):
"""Unmount USB device""" """Unmount USB device"""
if self.has_usb_auto_mount: if self.has_usb_auto_mount:
self._log("Sync partitions") self._log("Sync partitions", flush=True)
os.system("sync") os.system("sync")
else: else:
os.system("pumount /media/box 2>/dev/null >/dev/null") os.system("pumount /media/box 2>/dev/null >/dev/null")
@ -393,7 +399,7 @@ class PandoraBox:
try: try:
statvfs = os.statvfs(self.mount_point) statvfs = os.statvfs(self.mount_point)
except Exception as ex: except Exception as ex:
self._log(f"error={ex}") self._log(f"error={ex}", flush=True)
logging.info("An exception was thrown!", exc_info=True) logging.info("An exception was thrown!", exc_info=True)
if not self.has_curses: if not self.has_curses:
self.display_image("ERROR") self.display_image("ERROR")
@ -480,12 +486,13 @@ class PandoraBox:
os.mkdir(qfolder) os.mkdir(qfolder)
shutil.copyfile(full_path, os.path.join(qfolder, file)) shutil.copyfile(full_path, os.path.join(qfolder, file))
except Exception as ex: except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}", flush=True)
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"', exc_info=True)
return "ERROR" return "ERROR"
self._update_bar(100) self._update_bar(100, flush=True)
self._log("Scan done in %ds, %d files scanned, %d files infected" % self._log("Scan done in %ds, %d files scanned, %d files infected" %
((time.time() - scan_start_time), file_count, len(self.infected_files))) ((time.time() - scan_start_time), file_count, len(self.infected_files)),
flush=True)
logging.info( logging.info(
f'duration="{int(time.time() - scan_start_time)}", ' f'duration="{int(time.time() - scan_start_time)}", '
f'files_scanned="{file_count}", ' f'files_scanned="{file_count}", '
@ -508,12 +515,12 @@ class PandoraBox:
if dev.action == "remove": if dev.action == "remove":
return self._device_removed() return self._device_removed()
except Exception as ex: except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}", flush=True)
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"')
return "STOP" return "STOP"
def _device_inserted(self, dev): def _device_inserted(self, dev):
self._log("Device inserted") self._log("Device inserted", flush=True)
logging.info("device-inserted") logging.info("device-inserted")
self.device = dev self.device = dev
self._log_device_info(self.device) self._log_device_info(self.device)
@ -528,7 +535,7 @@ class PandoraBox:
return "INSERTED" return "INSERTED"
def _device_removed(self): def _device_removed(self):
self._log("Device removed") self._log("Device removed", flush=True)
logging.info("device-removed") logging.info("device-removed")
self.device = None self.device = None
if not self.has_curses: if not self.has_curses:
@ -540,14 +547,14 @@ class PandoraBox:
self._print_fstype("") self._print_fstype("")
self._print_model("") self._print_model("")
self._print_serial("") self._print_serial("")
self._update_bar(0) self._update_bar(0, flush=True)
return "WAIT" return "WAIT"
# -------------------------------------- # --------------------------------------
def mount(self): def mount(self):
""" Mount device """ """ Mount device """
self.mount_device() self.mount_device()
self._log(f'Partition mounted at {self.mount_point}') self._log(f'Partition mounted at {self.mount_point}', flush=True)
if self.mount_point is None: if self.mount_point is None:
# no partition # no partition
if not self.has_curses: if not self.has_curses:
@ -577,7 +584,7 @@ class PandoraBox:
"""Remove infected files""" """Remove infected files"""
if len(self.infected_files) > 0: if len(self.infected_files) > 0:
# display message # display message
self._log(f"{len(self.infected_files)} infected files detecetd:") self._log(f"{len(self.infected_files)} infected files detecetd:", flush=True)
logging.info(f"infeted_files={len(self.infected_files)}") logging.info(f"infeted_files={len(self.infected_files)}")
if not self.has_curses: if not self.has_curses:
@ -600,7 +607,7 @@ class PandoraBox:
try: try:
os.statvfs(self.mount_point) os.statvfs(self.mount_point)
except Exception: except Exception:
self._log("Device not cleaned !") self._log("Device not cleaned !", flush=True)
logging.info('device_not_cleaned') logging.info('device_not_cleaned')
return "WAIT" return "WAIT"
@ -619,7 +626,7 @@ class PandoraBox:
if not self.has_curses: if not self.has_curses:
self.display_image("OK") self.display_image("OK")
else: else:
self._log('Device cleaned !') self._log('Device cleaned !', flush=True)
logging.info(f'cleaned="{files_removed}/{len(self.infected_files)}"') logging.info(f'cleaned="{files_removed}/{len(self.infected_files)}"')
else: else:
if not self.has_curses: if not self.has_curses:
@ -685,7 +692,7 @@ class PandoraBox:
while state != "STOP": while state != "STOP":
state = self.loop(state) state = self.loop(state)
except Exception as ex: except Exception as ex:
self._log(f"Unexpected error: {str(ex)}") self._log(f"Unexpected error: {str(ex)}", flush=True)
logging.info(f'error="{str(ex)}"', exc_info=True) logging.info(f'error="{str(ex)}"', exc_info=True)
finally: finally:
self._end_curses() self._end_curses()