1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-19 05:19:40 +02:00
pandora-box/pandorabox.py

394 lines
14 KiB
Python
Raw Normal View History

2022-06-11 18:25:10 +02:00
#!/usr/bin/python3
2022-06-11 21:50:01 +02:00
"""Pandora-Box is a USB scanning station based on Pandora."""
2022-06-11 18:25:10 +02:00
import curses
2022-06-12 00:26:34 +02:00
from curses import wrapper
2022-06-11 18:25:10 +02:00
import pypandora
2022-06-11 21:06:59 +02:00
import time
2022-06-11 18:25:10 +02:00
import sys
2022-06-11 20:06:30 +02:00
import pyudev
import psutil
2022-06-11 21:06:59 +02:00
import os
2022-06-12 11:14:38 +02:00
import logging
2022-06-12 20:37:49 +02:00
import time
2022-06-11 20:06:30 +02:00
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
# Config variables
# -----------------------------------------------------------
2022-06-11 20:06:30 +02:00
2022-06-11 21:06:59 +02:00
# Not referenced anywhere in the visible code.
# NOTE(review): confirm it is unused elsewhere before removing.
NO_SCAN = True
# When true, mount_device() waits for the system to mount the device and
# umount_device() does nothing; when false, pmount/pumount are invoked.
USB_AUTO_MOUNT = True
# Base URL handed to pypandora.PyPandora in scan().
PANDORA_ROOT_URL = "http://127.0.0.1:6100"
2022-06-12 12:50:04 +02:00
# When true, scan() only walks the files and logs each one as SKIPPED instead
# of submitting anything to Pandora.
FAKE_SCAN = True
# ----------------------------------------------------------
""" Convert size to human readable string """
def human_readable_size(size, decimal_places=1):
    """Format a byte count as a short human-readable string.

    The value is divided by 1024 until it drops below 1024 or the largest
    known unit is reached, so anything of 1024 TB or more is still expressed
    in TB instead of being divided once more and mislabelled.

    Args:
        size: Number of bytes (int or float).
        decimal_places: Number of digits printed after the decimal point.

    Returns:
        The scaled value immediately followed by its unit, e.g. ``"1.5KB"``.
    """
    units = ['B', 'KB', 'MB', 'GB', 'TB']
    unit = units[0]
    for unit in units:
        # Stop on the last unit too: there is no larger suffix to move to.
        if size < 1024.0 or unit == units[-1]:
            break
        size /= 1024.0
    return f"{size:.{decimal_places}f}{unit}"
2022-06-11 20:06:30 +02:00
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
2022-06-11 18:25:10 +02:00
# Screen
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
2022-06-11 21:50:01 +02:00
"""Initialise curses"""
def intit_curses():
    """Start curses and keep the root window in the global ``screen``.

    Keypad translation and mouse reporting are switched on, the cursor is
    hidden, pending input is discarded and key echo is turned off.
    """
    global screen
    screen = curses.initscr()
    screen.keypad(1)
    curses.curs_set(0)
    mouse_events = curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION
    curses.mousemask(mouse_events)
    curses.flushinp()
    curses.noecho()
2022-06-11 21:06:59 +02:00
2022-06-11 18:25:10 +02:00
2022-06-11 21:50:01 +02:00
"""Print status string"""
def print_status(strStatus):
    """Accept a status text without displaying it.

    The status line is currently disabled, so the call has no effect.

    Args:
        strStatus: Status text (ignored).
    """
    return None
2022-06-11 21:06:59 +02:00
2022-06-12 00:26:34 +02:00
"""Print current action"""
def print_action(label):
    """Accept an action text without displaying it.

    The action line is currently disabled, so the call has no effect.

    Args:
        label: Action text (ignored).
    """
    return None
2022-06-11 20:06:30 +02:00
2022-06-11 21:50:01 +02:00
"""Print FS Label"""
2022-06-12 00:26:34 +02:00
def print_fslabel(label):
    """Show the partition label on row 1 of the status window.

    Args:
        label: Text to display, left-justified in a 32-character field.
    """
    line = "Partition : %-32s" % label
    status_win.addstr(1, 1, line, curses.color_pair(2))
    status_win.refresh()
"""Print FS Size"""
def print_size(label):
    """Show the partition size on row 2 of the status window.

    The value is padded to a fixed 32-character field so that a shorter
    value, or a blank one, fully overwrites whatever was shown before.

    Args:
        label: Human-readable size, or ``None`` to blank the field. A
            non-``None`` value is also written to the log file.
    """
    text = "" if label is None else label
    status_win.addstr(2, 1, "Size : %-32s" % text, curses.color_pair(2))
    if label is not None:
        logging.info("Size: %s", label)
    status_win.refresh()
"""Print FS Used Size"""
def print_used(label):
    """Show the used space on row 3 of the status window.

    The value is padded to a fixed 32-character field so that a shorter
    value, or a blank one, fully overwrites whatever was shown before.

    Args:
        label: Human-readable size, or ``None`` to blank the field. A
            non-``None`` value is also written to the log file.
    """
    text = "" if label is None else label
    status_win.addstr(3, 1, "Used : %-32s" % text, curses.color_pair(2))
    if label is not None:
        logging.info("Used: %s", label)
    status_win.refresh()
2022-06-11 21:06:59 +02:00
2022-06-12 00:26:34 +02:00
def print_fstype(label):
    """Show the partition-table and filesystem type on row 1, column 50.

    Args:
        label: Text to display, left-justified in a 32-character field.
    """
    line = "Part / Type : %-32s" % label
    status_win.addstr(1, 50, line, curses.color_pair(2))
    status_win.refresh()
2022-06-11 20:06:30 +02:00
2022-06-12 00:26:34 +02:00
def print_model(label):
    """Show the device model on row 2, column 50 of the status window.

    Args:
        label: Text to display, left-justified in a 32-character field.
    """
    line = "Model : %-32s" % label
    status_win.addstr(2, 50, line, curses.color_pair(2))
    status_win.refresh()
2022-06-11 21:06:59 +02:00
2022-06-12 00:26:34 +02:00
def print_serial(label):
    """Show the device serial number on row 3, column 50 of the status window.

    Args:
        label: Text to display, left-justified in a 32-character field.
    """
    line = "Serial : %-32s" % label
    status_win.addstr(3, 50, line, curses.color_pair(2))
    status_win.refresh()
2022-06-11 20:06:30 +02:00
2022-06-11 21:50:01 +02:00
"""Initialise progress bar"""
def init_bar():
    """Create the bordered progress-bar window and draw it.

    The window is 3 rows by 82 columns, placed at row 17, column 10, and is
    kept in the global ``progress_win``.
    """
    global progress_win
    height, width, top, left = 3, 82, 17, 10
    progress_win = curses.newwin(height, width, top, left)
    progress_win.border(0)
    progress_win.refresh()
2022-06-11 21:06:59 +02:00
2022-06-11 21:50:01 +02:00
"""Update progress bar"""
def update_bar(progress):
    """Redraw the progress bar for a completion percentage.

    Args:
        progress: Percentage done. Values outside 0..100 are clamped so the
            bar never grows beyond the 80 columns the fill formula is scaled
            to; a longer run of "#" would overwrite the border or run off
            the window. ``0`` wipes the bar and shows the bare "Progress:"
            title.
    """
    progress = max(0, min(100, int(progress)))
    if progress == 0:
        progress_win.clear()
        progress_win.border(0)
        progress_win.addstr(0, 1, "Progress:")
    else:
        filled = (80 * progress) // 100
        progress_win.addstr(1, 1, "#" * filled)
        progress_win.addstr(0, 1, "Progress: %d%%" % progress)
    progress_win.refresh()
def init_log():
    """Create the log window and configure logging to ``pandorabox.log``.

    The bordered window (16 rows by 101 columns at row 20, column 0) is kept
    in the global ``log_win``. Records of level INFO and above go to the file
    with a ``month/day/year hour:minute`` timestamp prefix.
    """
    global log_win
    log_win = curses.newwin(16, 101, 20, 0)
    log_win.border(0)
    log_settings = {
        "filename": 'pandorabox.log',
        "level": logging.INFO,
        "format": '%(asctime)s - %(message)s',
        "datefmt": '%m/%d/%y %H:%M',
    }
    logging.basicConfig(**log_settings)
logs = []


def log(str):
    """Record a message in the log file and show it in the log window.

    Only the 14 most recent messages are kept; they are redrawn oldest first.

    Args:
        str: Message text. (The name shadows the builtin but is kept so
            existing callers are unaffected.)
    """
    logging.info(str)
    logs.append(str)
    if len(logs) > 14:
        logs.pop(0)
    log_win.clear()
    log_win.border(0)
    for row, message in enumerate(logs, start=1):
        # Cut each line to the 99 columns between the borders of the
        # 101-column window; longer text (e.g. a long file name) would spill
        # over the border into the following rows or off the window.
        line = ("%-80s" % message)[:99]
        log_win.addstr(row, 1, line, curses.color_pair(3))
    log_win.refresh()
"""Splash screen"""
# The ten rows of the splash-screen banner, top to bottom.
s = [
    " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒",
    " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░",
    " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░",
    " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ",
    " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒",
    " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░",
    " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░",
    " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ",
    " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ",
    " ░ ░ ",
]
2022-06-11 21:06:59 +02:00
2022-06-12 00:26:34 +02:00
"""Print main screen"""
2022-06-11 21:50:01 +02:00
def print_screen():
    """Draw the static layout: splash title, status panel, bar and log.

    Registers colour pairs 1 (red), 2 (blue) and 3 (green) on black, paints
    the splash lines in a 12x101 window at the top, creates the bordered
    status window kept in the global ``status_win`` with every field blanked,
    then initialises the progress bar at 0 and logs 'Ready.'.
    """
    global status_win
    palette = (
        (1, curses.COLOR_RED),
        (2, curses.COLOR_BLUE),
        (3, curses.COLOR_GREEN),
    )
    for pair_id, foreground in palette:
        curses.init_pair(pair_id, foreground, curses.COLOR_BLACK)

    title_win = curses.newwin(12, 101, 0, 0)
    for row, art_line in enumerate(s, start=1):
        title_win.addstr(row, 1, art_line, curses.color_pair(1))
    title_win.refresh()

    status_win = curses.newwin(5, 101, 12, 0)
    status_win.border(0)
    status_win.addstr(0, 1, "USB Key Information")
    print_fslabel("")
    print_size(None)
    print_used(None)
    print_fstype("")
    print_action("")
    print_model("")
    print_serial("")

    init_bar()
    update_bar(0)
    log('Ready.')
2022-06-11 18:25:10 +02:00
2022-06-12 00:26:34 +02:00
"""Closes curses"""
2022-06-11 21:50:01 +02:00
def end_curses():
    """Leave curses mode, then discard any input still waiting."""
    curses.endwin()
    curses.flushinp()
2022-06-11 18:25:10 +02:00
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
2022-06-12 00:26:34 +02:00
# Device
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
2022-06-12 00:26:34 +02:00
"""Mount USB device"""
2022-06-11 21:50:01 +02:00
def mount_device(device):
    """Find or create the mount point of a newly inserted partition.

    With ``USB_AUTO_MOUNT`` set, the system is expected to mount the device
    by itself; the partition list is polled once per second, at most 10
    times, until the device shows up. Otherwise the device is mounted with
    ``pmount`` under the label ``box``.

    Args:
        device: Object whose ``device_node`` attribute is the device path.

    Returns:
        The mount point, or ``""`` when the device could not be mounted.
    """
    if USB_AUTO_MOUNT:
        for _attempt in range(10):
            # The device needs a moment to get mounted, so wait before
            # every look-up.
            time.sleep(1)
            for partition in psutil.disk_partitions():
                if partition.device == device.device_node:
                    log("Device mounted at {}".format(partition.mountpoint))
                    # Return the matching entry itself rather than whichever
                    # partition happens to be listed last.
                    return partition.mountpoint
        return ""

    # os.system reports 0 when the command succeeded.
    if os.system("pmount " + device.device_node + " box") == 0:
        log("Device mounted at /media/box")
        return "/media/box"
    return ""
2022-06-11 21:06:59 +02:00
2022-06-11 18:25:10 +02:00
2022-06-12 00:26:34 +02:00
"""Unmount USB device"""
2022-06-11 21:50:01 +02:00
def umount_device():
    """Unmount ``/media/box`` with ``pumount`` when automounting is off.

    Nothing is done when ``USB_AUTO_MOUNT`` is set. The exit status of the
    command is not checked.
    """
    if USB_AUTO_MOUNT:
        return
    log("Unmounting device /media/box")
    os.system("pumount /media/box")
2022-06-11 18:25:10 +02:00
2022-06-12 00:26:34 +02:00
"""Main device loop"""
2022-06-11 21:50:01 +02:00
def _on_device_added(device):
    """Show, mount and scan a partition that has just been plugged in."""
    log("Device inserted")
    log_device_info(device)
    print_status("KEY INSERTED")
    # A property that is not set comes back as None from get(); substitute an
    # empty string so the text can be built and displayed without a
    # TypeError or a literal "None" on screen.
    print_fslabel(device.get("ID_FS_LABEL") or "")
    print_fstype("%s %s" % (device.get("ID_PART_TABLE_TYPE") or "",
                            device.get("ID_FS_TYPE") or ""))
    print_model(device.get("ID_MODEL") or "")
    print_serial(device.get("ID_SERIAL_SHORT") or "")

    mount_point = mount_device(device)
    try:
        statvfs = os.statvfs(mount_point)
    except OSError as e:
        # Typically an empty mount point because mounting failed.
        logging.error("Unexpected error: %s", e)
        return

    used = statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)
    print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks))
    print_used(human_readable_size(used))

    log("Scan started...........")
    if scan(mount_point, used):
        log("Scan done.")
    else:
        log("Scan failed !")


def _on_device_removed():
    """Blank the status fields, unmount and reset the progress bar."""
    log("Device removed")
    print_action("Device removed")
    print_fslabel("")
    print_size(None)
    print_used(None)
    print_fstype("")
    print_action("")
    print_model("")
    print_serial("")
    umount_device()
    update_bar(0)


def device_loop():
    """Watch udev block-device events and process USB partitions.

    Blocks on the udev monitor and reacts to "add" and "remove" events for
    devices that carry a filesystem and whose node looks like ``/dev/sd*``.
    Any exception ends the loop after being logged, and "Done." is always
    logged on exit.
    """
    context = pyudev.Context()
    monitor = pyudev.Monitor.from_netlink(context)
    monitor.filter_by("block")
    try:
        for device in iter(monitor.poll, None):
            if device.get("ID_FS_USAGE") != "filesystem":
                continue
            # Characters 5-6 of a node such as "/dev/sdb1" are "sd".
            if (device.device_node or "")[5:7] != "sd":
                continue
            if device.action == "add":
                _on_device_added(device)
            elif device.action == "remove":
                _on_device_removed()
    except Exception as e:
        log("Unexpected error: %s" % e)
    finally:
        log("Done.")
2022-06-11 21:06:59 +02:00
2022-06-12 11:14:38 +02:00
def log_device_info(dev):
    """Write the identifying properties of a device to the log file.

    Args:
        dev: Object with a mapping-style ``get(key)`` method; whatever it
            returns for each property is logged.
    """
    # (message template, property name) pairs, in output order.
    fields = (
        ("Device name: %s", "DEVNAME"),
        ("Path id: %s", "ID_PATH"),
        ("Bus system: %s", "ID_BUS"),
        ("USB driver: %s", "ID_USB_DRIVER"),
        ("Device type: %s", "DEVTYPE"),
        ("Device usage: %s", "ID_FS_USAGE"),
        ("Partition type: %s", "ID_PART_TABLE_TYPE"),
        ("FS type: %s", "ID_FS_TYPE"),
        ("Partition label: %s", "ID_FS_LABEL"),
        ("Device model: %s", "ID_MODEL"),
        ('Model: %s', "ID_MODEL_ID"),
        ('Serial short: %s', "ID_SERIAL_SHORT"),
        ('Serial: %s', "ID_SERIAL"),
    )
    for template, key in fields:
        logging.info(template % dev.get(key))
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
2022-06-11 18:25:10 +02:00
# pandora
2022-06-11 21:06:59 +02:00
# -----------------------------------------------------------
2022-06-11 18:25:10 +02:00
2022-06-12 00:26:34 +02:00
"""Scan a mount point with Pandora"""
2022-06-12 12:50:04 +02:00
def scan(mount_point, used):
    """Scan the files of a mounted device and report progress.

    With ``FAKE_SCAN`` set, the tree below ``mount_point`` is only walked:
    each file is logged as "SKIPPED" after a short pause and the progress
    bar advances by file size. Otherwise the paths given on the command line
    are submitted to the Pandora server at ``PANDORA_ROOT_URL`` and polled
    once per second until they leave the "WAITING" state.

    Args:
        mount_point: Directory to walk in fake-scan mode.
        used: Number of bytes in use on the device; the denominator of the
            progress percentage.

    Returns:
        ``True`` on completion, ``False`` as soon as a file cannot be
        processed in fake-scan mode.
    """
    global infected_files
    # Reset the result list for this run (nothing in this function fills it
    # yet).
    infected_files = []
    scanned = 0
    file_count = 0
    scan_start_time = time.time()
    if FAKE_SCAN:
        for root, _dirs, files in os.walk(mount_point):
            for name in files:
                try:
                    full_path = os.path.join(root, name)
                    file_size = os.path.getsize(full_path)
                    log("Check %-s [%s]" % (name, human_readable_size(file_size)))
                    file_scan_start_time = time.time()
                    time.sleep(0.1)
                    file_scan_end_time = time.time()
                    log("Check %s (%ds) -> %-s" % (
                        name,
                        file_scan_end_time - file_scan_start_time,
                        "SKIPPED",
                    ))
                    scanned += file_size
                    file_count += 1
                    # File sizes can add up to more than the used-block
                    # figure, and ``used`` can be 0: keep the percentage in
                    # range and avoid dividing by zero.
                    if used > 0:
                        percent = min(100, scanned * 100 // used)
                    else:
                        percent = 100
                    update_bar(percent)
                except Exception as e:
                    log("Unexpected error: %s" % e)
                    return False
        update_bar(100)
        log("Scan done in %ds" % (time.time() - scan_start_time))
        log("%d files scanned" % file_count)
    else:
        # NOTE(review): this branch scans the command-line arguments, not the
        # files below mount_point — confirm that is intended.
        pp = pypandora.PyPandora(root_url=PANDORA_ROOT_URL)
        for arg in sys.argv[1:]:
            log("Scan %-80s" % arg)
            res = pp.submit_from_disk(arg)
            # NOTE(review): the original had a second, stray `break` after
            # this loop's exit test; it is assumed to be redundant.
            while True:
                time.sleep(1)
                res = pp.task_status(res["taskId"])
                if res["status"] != "WAITING":
                    break
            log("Scan %s -> %s" % (arg, res["status"]))
    return True
2022-06-11 18:25:10 +02:00
2022-06-11 21:06:59 +02:00
# --------------------------------------
2022-06-11 18:25:10 +02:00
2022-06-12 00:26:34 +02:00
"""Main entry point"""
def main(stdscr):
    """Run the station: set up logging and the screen, then watch devices.

    Any exception is written to the log file together with its traceback,
    and curses is always shut down before returning.

    Args:
        stdscr: Root window passed in by ``wrapper``; not used here.
    """
    try:
        init_log()
        intit_curses()
        print_screen()
        while True:
            device_loop()
    except Exception as e:
        # A %s placeholder is required for the exception to be formatted
        # into the message; logging.exception also records the traceback.
        logging.exception("Unexpected error: %s", e)
    finally:
        end_curses()
2022-06-11 18:25:10 +02:00
2022-06-11 21:06:59 +02:00
# --------------------------------------
2022-06-11 18:25:10 +02:00
2022-06-11 21:06:59 +02:00
if __name__ == "__main__":
    # Run main() under the curses wrapper when executed as a script.
    curses.wrapper(main)