1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-25 00:09:40 +02:00

work in progress

This commit is contained in:
dbarzin 2022-06-12 20:37:49 +02:00
parent 414beccf04
commit 9f68da0094
2 changed files with 84 additions and 58 deletions

View file

@ -6,6 +6,7 @@ Pandorabox is a USB scaning station base on Pandora
Mouse terminal Mouse terminal
--------------- ---------------
sudo apt install gpm sudo apt install gpm
User mount device User mount device

View file

@ -11,6 +11,7 @@ import pyudev
import psutil import psutil
import os import os
import logging import logging
import time
# ----------------------------------------------------------- # -----------------------------------------------------------
# Config variables # Config variables
@ -114,9 +115,11 @@ def update_bar(progress):
progress_win.clear() progress_win.clear()
progress_win.border(0) progress_win.border(0)
time.sleep(0) time.sleep(0)
progress_win.addstr(0, 1, "Progress:")
else: else:
pos = (80 * progress) // 100 pos = (80 * progress) // 100
progress_win.addstr(1, 1, "#"*pos) progress_win.addstr(1, 1, "#"*pos)
progress_win.addstr(0, 1, "Progress: %d%%" % progress)
progress_win.refresh() progress_win.refresh()
def init_log(): def init_log():
@ -179,6 +182,7 @@ def print_screen():
title_win.refresh() title_win.refresh()
status_win = curses.newwin(5, 101, 12, 0) status_win = curses.newwin(5, 101, 12, 0)
status_win.border(0) status_win.border(0)
status_win.addstr(0, 1, "USB Key Information")
# print_status("WAITING") # print_status("WAITING")
print_fslabel("") print_fslabel("")
print_size(None) print_size(None)
@ -240,51 +244,59 @@ def device_loop():
context = pyudev.Context() context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context) monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("block") monitor.filter_by("block")
for device in iter(monitor.poll, None): try:
if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": for device in iter(monitor.poll, None):
if device.action == "add": if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd":
log("Device inserted") if device.action == "add":
log_device_info(device) log("Device inserted")
# display device type log_device_info(device)
print_status("KEY INSERTED") # display device type
print_fslabel(device.get("ID_FS_LABEL")) print_status("KEY INSERTED")
print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) print_fslabel(device.get("ID_FS_LABEL"))
print_model(device.get("ID_MODEL")) print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE"))
print_serial(device.get("ID_SERIAL_SHORT")) print_model(device.get("ID_MODEL"))
# Mount device print_serial(device.get("ID_SERIAL_SHORT"))
mount_point = mount_device(device) # Mount device
try: mount_point = mount_device(device)
statvfs=os.statvfs(mount_point) try:
except: statvfs=os.statvfs(mount_point)
log("Unexpected error: %-80s" % sys.exc_info()[0]) except Exception as e :
continue logging.error("Unexpected error: ", e)
print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) continue
print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
log("Scan started...........") print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)))
# fake scan log("Scan started...........")
if False: # fake scan
loading = 0 if False:
while loading < 100: loading = 0
loading += 1 while loading < 100:
time.sleep(0.03) loading += 1
update_bar(loading) time.sleep(0.03)
else: update_bar(loading)
scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)) else:
log("Scan done.") res = scan(mount_point, statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))
if res:
log("Scan done.")
else:
log("Scan failed !")
if device.action == "remove": if device.action == "remove":
log("Device removed") log("Device removed")
#print_status("WAITING") #print_status("WAITING")
print_action("Device removed") print_action("Device removed")
print_fslabel("") print_fslabel("")
print_size(None) print_size(None)
print_used(None) print_used(None)
print_fstype("") print_fstype("")
print_action("") print_action("")
print_model("") print_model("")
print_serial("") print_serial("")
umount_device() umount_device()
update_bar(0) update_bar(0)
except Exception as e:
log("Unexpected error: %s" % e )
finally:
log("Done.")
def log_device_info(dev): def log_device_info(dev):
@ -312,20 +324,31 @@ def log_device_info(dev):
"""Scan a mount point with Pandora""" """Scan a mount point with Pandora"""
def scan(mount_point, used): def scan(mount_point, used):
global infected_filed
infected_files = array()
scanned = 0 scanned = 0
file_count = 0
scan_start_time = time.time()
if FAKE_SCAN: if FAKE_SCAN:
for root, dirs, files in os.walk(mount_point): for root, dirs, files in os.walk(mount_point):
for file in files: for file in files:
full_path = os.path.join(root,file) try :
file_size = os.path.getsize(full_path) full_path = os.path.join(root,file)
log("Check %-s [%s]" % (file, human_readable_size(file_size))) file_size = os.path.getsize(full_path)
time.sleep(0.05) log("Check %-s [%s]" % (file, human_readable_size(file_size)))
log("Check %s -> %-s" % (file,"SKIPPED")) file_scan_start_time = time.time()
time.sleep(0.1)
scanned += os.path.getsize(full_path) file_scan_end_time = time.time()
update_bar(scanned * 100 // used) log("Check %s (%ds) -> %-s" % (file,(file_scan_end_time - file_scan_start_time),"SKIPPED"))
scanned += os.path.getsize(full_path)
file_count += 1
update_bar(scanned * 100 // used)
except Exception as e :
log("Unexpected error: %s" % e)
return False
update_bar(100) update_bar(100)
log("Scan done in %ds" % (time.time() - scan_start_time))
log("%d files scanned" % file_count)
else: else:
pp = pypandora.PyPandora(root_url=PANDORA_ROOT_URL) pp = pypandora.PyPandora(root_url=PANDORA_ROOT_URL)
for arg in sys.argv[1:]: for arg in sys.argv[1:]:
@ -343,6 +366,7 @@ def scan(mount_point, used):
break; break;
log("Scan %s -> %s" % (arg,res["status"])) log("Scan %s -> %s" % (arg,res["status"]))
return True
# -------------------------------------- # --------------------------------------
@ -350,14 +374,15 @@ def scan(mount_point, used):
"""Main entry point""" """Main entry point"""
def main(stdscr): def main(stdscr):
try: try :
init_log() init_log()
intit_curses() intit_curses()
print_screen() print_screen()
device_loop() while True:
except: device_loop()
logging.error("Unexpected error:", sys.exc_info()[0]) except Exception as e :
logging.error(traceback.format_exc()) logging.error("Unexpected error: ", e)
# logging.error(traceback.format_exc())
finally: finally:
end_curses() end_curses()