From 6c831b1e01b5d6fc44d66be34d8e30322f5a8075 Mon Sep 17 00:00:00 2001 From: dbarzin Date: Sun, 12 Jun 2022 00:26:34 +0200 Subject: [PATCH] work in progress --- pandorabox.py | 168 +++++++++++++++++++++++++++++++----------- tests/detect-usb.py | 8 ++ tests/progress-bar.py | 10 +-- 3 files changed, 138 insertions(+), 48 deletions(-) diff --git a/pandorabox.py b/pandorabox.py index 48be1d5..e7f5012 100755 --- a/pandorabox.py +++ b/pandorabox.py @@ -3,6 +3,7 @@ """Pandora-Box is a USB scaning station based on Pandora.""" import curses +from curses import wrapper import pypandora import time import sys @@ -18,7 +19,6 @@ NO_SCAN = True USB_AUTO_MOUNT = True PANDORA_ROOT_URL = "http://127.0.0.1:6100" - # ----------------------------------------------------------- # Screen # ----------------------------------------------------------- @@ -32,73 +32,132 @@ def intit_curses(): curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) curses.flushinp() curses.noecho() - screen.clear() +# screen.clear() """Print status string""" def print_status(strStatus): - screen.addstr(12, 0, "Status : %-32s" % strStatus) - screen.refresh() - - -"""Print FS Label""" -def print_fslabel(strLabel): - screen.addstr(13, 0, "Device : %-32s" % strLabel) - screen.refresh() - + global status_win + #status_win.addstr(1, 1, "Status : %-32s" % strStatus, curses.color_pair(2)) + #status_win.refresh() """Print current action""" -def print_action(strAction): - screen.addstr(14, 0, "Action : %-64s" % strAction) - screen.refresh() +def print_action(label): + global status_win + #status_win.addstr(3, 1, "Action : %-32s" % label, curses.color_pair(2)) + #status_win.refresh() +"""Print FS Label""" +def print_fslabel(label): + global status_win + status_win.addstr(1, 1, "Partition : %-32s" % label, curses.color_pair(2)) + status_win.refresh() + +"""Print FS Size""" +def print_size(label): + global status_win + if label == 0.0: + status_win.addstr(2, 1, "Size : ",curses.color_pair(2)) + else: + status_win.addstr(2, 1, "Size : %4.1fGB " % label,curses.color_pair(2)) + status_win.refresh() + +"""Print FS Used Size""" +def print_used(label): + global status_win + if label == 0.0: + status_win.addstr(3, 1, "Used : ",curses.color_pair(2)) + else: + status_win.addstr(3, 1, "Used : %4.1fGB " % label,curses.color_pair(2)) + status_win.refresh() + +def print_fstype(label): + global status_win + status_win.addstr(1, 50, "FS Type : %-32s" % label, curses.color_pair(2)) + status_win.refresh() + +def print_model(label): + global status_win + status_win.addstr(2, 50, "Model : %-32s" % label, curses.color_pair(2)) + status_win.refresh() + +def print_serial(label): + global status_win + status_win.addstr(3, 50, "Serial : %-32s" % label, curses.color_pair(2)) + status_win.refresh() """Initialise progress bar""" def init_bar(): global progress_win - progress_win = curses.newwin(3, 62, 3, 16) - progress_win.border(0) - + progress_win = curses.newwin(3, 83, 18, 10) + # progress_win.border(1) + progress_win.refresh() """Update progress bar""" def update_bar(progress): global progress_win - rangex = (60 / float(100)) * progress - pos = int(rangex) - display = "#" - if pos != 0: - progress_win.addstr(1, pos, "{}".format(display)) - progress_win.refresh() + if progress == 0: + #progress_win.clear() + progress_win.border(1) + else: + pos = (80 * progress) // 100 + progress_win.addstr(1, pos+1, "#") + progress_win.refresh() +def init_log(): + global log_win + log_win = curses.newwin(10, 101, 22, 0) + log_win.border(0) +def log(str): + log_win.addstr(1,1,str,curses.color_pair(3)) + log_win.refresh() + +"""Print main screen""" def print_screen(): - screen.addstr(1, 0, " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒") - screen.addstr(2, 0, " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░") - screen.addstr(3, 0, " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░") - screen.addstr(4, 0, " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ") - screen.addstr(5, 0, " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒") - screen.addstr(6, 0, " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░") - screen.addstr(7, 0, " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░") - screen.addstr(8, 0, " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ") - screen.addstr(9, 0, " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ") - screen.addstr(10, 0, " ░ ░ ") + global status_win + curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK) + curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK) + curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK) + title_win = curses.newwin(12, 101, 0, 0) + title_win.border(0) + title_win.addstr(1, 1, " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒",curses.color_pair(1)) + title_win.addstr(2, 1, " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░",curses.color_pair(1)) + title_win.addstr(3, 1, " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░",curses.color_pair(1)) + title_win.addstr(4, 1, " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ",curses.color_pair(1)) + title_win.addstr(5, 1, " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒",curses.color_pair(1)) + title_win.addstr(6, 1, " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░",curses.color_pair(1)) + title_win.addstr(7, 1, " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░",curses.color_pair(1)) + title_win.addstr(8, 1, " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ",curses.color_pair(1)) + title_win.addstr(9, 1, " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ",curses.color_pair(1)) + title_win.addstr(10, 1, " ░ ░ ",curses.color_pair(1)) + title_win.refresh() + status_win = curses.newwin(5, 101, 12, 0) + status_win.border(0) print_status("WAITING") print_fslabel("") + print_size(0.0) + print_used(0.0) + print_fstype("") print_action("") + print_model("") + print_serial("") init_bar() - update_bar(1) - + update_bar(0) + init_log() + log('Ready.') +"""Closes curses""" def end_curses(): curses.endwin() curses.flushinp() # ----------------------------------------------------------- -# device +# Device # ----------------------------------------------------------- - +"""Mount USB device""" def mount_device(device): if USB_AUTO_MOUNT: found = False @@ -111,12 +170,20 @@ def mount_device(device): print_action("Mounted at {}".format(partition.mountpoint)) found = True loop += 1 + if loop < 10: + return partition.mountpoint + else: + return "" else: print_action("mount device to /media/box") res = os.system("pmount " + device.device_node + " box") - # print("Return type: ", res) + if res == 1: + return "/media/box" + else: + return "" +"""Unmount USB device""" def umount_device(): if not USB_AUTO_MOUNT: print_action("unmount device /media/box") @@ -124,6 +191,7 @@ def umount_device(): # print("Return type: ", res) +"""Main device loop""" def device_loop(): context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) @@ -131,17 +199,31 @@ def device_loop(): for device in iter(monitor.poll, None): if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": if device.action == "add": - # print("New device {}".format(device.device_node)) - mount_device(device) # display device type print_status("KEY INSERTED") print_fslabel(device.get("ID_FS_LABEL")) + print_fstype(device.get("ID_PART_TABLE_TYPE")) + print_model(device.get("ID_MODEL")) + print_serial(device.get("ID_SERIAL_SHORT")) + # Mount device + mount_point = mount_device(device) + statvfs=os.statvfs(mount_point) + print_size(statvfs.f_frsize * statvfs.f_blocks // 1024 // 1024 / 1024) + print_used(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree) // 1024 // 1024 / 1024) + # fake scan + loading = 0 + while loading < 100: + loading += 1 + time.sleep(0.03) + update_bar(loading) + if device.action == "remove": print_status("WAITING") print_action("Device removed") print_fslabel("") umount_device() + update_bar(0) # ----------------------------------------------------------- @@ -149,6 +231,7 @@ def device_loop(): # ----------------------------------------------------------- +"""Scan a mount point with Pandora""" def scan(mountPoint): pp = pypandora.PyPandora(root_url=PANDORA_ROOT_URL) @@ -173,7 +256,8 @@ def scan(mountPoint): # -------------------------------------- -def pandorabox(): +"""Main entry point""" +def main(stdscr): try: intit_curses() print_screen() @@ -187,4 +271,4 @@ def pandorabox(): if __name__ == "__main__": - pandorabox() + wrapper(main) diff --git a/tests/detect-usb.py b/tests/detect-usb.py index cfca7cd..e583bb4 100755 --- a/tests/detect-usb.py +++ b/tests/detect-usb.py @@ -27,6 +27,11 @@ def printDeviceInfo(dev): print("USB driver: %s" % dev.get("ID_USB_DRIVER")) print("Path id: %s" % dev.get("ID_PATH")) print('Usage: %s' % dev.get("ID_FS_USAGE")) + print('Serial short: %s' % dev.get("ID_SERIAL_SHORT")) + print('Serial: %s' % dev.get("ID_SERIAL")) + print('Model: %s' % dev.get("ID_MODEL_ID")) + print(os.stat(dev.get("DEVNAME"))) + print("") print("") @@ -47,6 +52,9 @@ for device in iter(monitor.poll, None): for partition in psutil.disk_partitions(): if partition.device == device.device_node: print("Mounted at {}".format(partition.mountpoint)) + statvfs=os.statvfs(partition.mountpoint) + print("size %4.1fGB"% (statvfs.f_frsize * statvfs.f_blocks // 1024 // 1024 / 1024)) + print("used %4.1fGB"% (statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree) // 1024 // 1024 / 1024)) found = True loop += 1 else: diff --git a/tests/progress-bar.py b/tests/progress-bar.py index 5bc272e..2dc1dfc 100755 --- a/tests/progress-bar.py +++ b/tests/progress-bar.py @@ -14,11 +14,9 @@ def initBar(): def updateBar(progress): global progress_win - rangex = (60 / float(100)) * progress - pos = int(rangex) - display = "#" - if pos != 0: - progress_win.addstr(1, pos, "{}".format(display)) + pos = (60 * progress) // 100 + if pos != 0 : + progress_win.addstr(1, pos, "{}".format("#")) progress_win.refresh() @@ -31,5 +29,5 @@ while loading < 100: time.sleep(1) -curses.endwin() +#curses.endwin() curses.flushinp()