From 1a29d021cf5a1bd75534973e98beb042dc1ecafd Mon Sep 17 00:00:00 2001 From: dbarzin Date: Sat, 11 Jun 2022 21:06:59 +0200 Subject: [PATCH] reformating --- pandora-box.py | 224 ++++++++++++++++++++++-------------------- tests/detect-usb.py | 96 +++++++++--------- tests/mouse-click.py | 7 +- tests/progress-bar.py | 6 +- tests/scan.py | 18 ++-- tests/screen.py | 22 ++--- 6 files changed, 192 insertions(+), 181 deletions(-) diff --git a/pandora-box.py b/pandora-box.py index 5ce2e59..01220fa 100755 --- a/pandora-box.py +++ b/pandora-box.py @@ -2,168 +2,184 @@ import curses import pypandora -import time +import time import sys import pyudev import psutil +import os -#----------------------------------------------------------- -# Variables -#----------------------------------------------------------- +# ----------------------------------------------------------- +# Config variables +# ----------------------------------------------------------- -NO_SCAN=True -USB_AUTO_MOUNT=True -PANDORA_ROOT_URL="http://127.0.0.1:6100" +NO_SCAN = True +USB_AUTO_MOUNT = True +PANDORA_ROOT_URL = "http://127.0.0.1:6100" -#----------------------------------------------------------- +# ----------------------------------------------------------- # Screen -#----------------------------------------------------------- +# ----------------------------------------------------------- + def intitCurses(): - global screen - screen = curses.initscr() - screen.keypad(1) - curses.curs_set(0) - curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) - curses.flushinp() - curses.noecho() - screen.clear() + global screen + screen = curses.initscr() + screen.keypad(1) + curses.curs_set(0) + curses.mousemask(curses.ALL_MOUSE_EVENTS | curses.REPORT_MOUSE_POSITION) + curses.flushinp() + curses.noecho() + screen.clear() + def printStatus(strStatus): - screen.addstr(12,0,'Status : %-32s' % strStatus) - screen.refresh() + screen.addstr(12, 0, "Status : %-32s" % strStatus) + screen.refresh() + def printFSLabel(strLabel): - screen.addstr(13,0,'Device : %-32s' % strLabel) - screen.refresh() + screen.addstr(13, 0, "Device : %-32s" % strLabel) + screen.refresh() + def printAction(strAction): - screen.addstr(14,0,'Action : %-64s' % strAction) - screen.refresh() + screen.addstr(14, 0, "Action : %-64s" % strAction) + screen.refresh() + def initBar(): global progress_win progress_win = curses.newwin(3, 62, 3, 16) progress_win.border(0) + def updateBar(progress): global progress_win rangex = (60 / float(100)) * progress pos = int(rangex) - display = '#' + display = "#" if pos != 0: progress_win.addstr(1, pos, "{}".format(display)) progress_win.refresh() + def printScreen(): - screen.addstr(1,0," ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒") - screen.addstr(2,0," ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░") - screen.addstr(3,0," ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░") - screen.addstr(4,0," ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ") - screen.addstr(5,0," ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒") - screen.addstr(6,0," ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░") - screen.addstr(7,0," ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░") - screen.addstr(8,0," ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ") - screen.addstr(9,0," ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ") - screen.addstr(10,0," ░ ░ ") - printStatus("WAITING") - printFSLabel('') - printAction('') - initBar() - updateBar(1) + screen.addstr(1, 0, " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒") + screen.addstr(2, 0, " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░") + screen.addstr(3, 0, " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░") + screen.addstr(4, 0, " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ") + screen.addstr(5, 0, " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒") + screen.addstr(6, 0, " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░") + screen.addstr(7, 0, " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░") + screen.addstr(8, 0, " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ") + screen.addstr(9, 0, " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ") + screen.addstr(10, 0, " ░ ░ ") + printStatus("WAITING") + printFSLabel("") + printAction("") + initBar() + updateBar(1) + def endCurses(): - curses.endwin() - curses.flushinp() + curses.endwin() + curses.flushinp() -#----------------------------------------------------------- + +# ----------------------------------------------------------- # device -#----------------------------------------------------------- +# ----------------------------------------------------------- + -# mount device def mountDevice(device): if USB_AUTO_MOUNT: - found=False - loop=0 - while (not found) and (loop<10): - # need to sleep before devide is mounted + found = False + loop = 0 + while (not found) and (loop < 10): + # need to sleep before devide is mounted time.sleep(1) for partition in psutil.disk_partitions(): - if partition.device == device.device_node: - printAction("Mounted at {}".format(partition.mountpoint)) - found=True - loop+=1 + if partition.device == device.device_node: + printAction("Mounted at {}".format(partition.mountpoint)) + found = True + loop += 1 else: - printAction("mount device to /media/box") - res=os.system("pmount "+device.device_node+" box") - #print("Return type: ", res) + printAction("mount device to /media/box") + res = os.system("pmount " + device.device_node + " box") + # print("Return type: ", res) + -# unmount device def umountDevice(): - if not USB_AUTO_MOUNT: - printAction("unmount device /media/box") - res = os.system("pumount /media/box") - # print("Return type: ", res) + if not USB_AUTO_MOUNT: + printAction("unmount device /media/box") + res = os.system("pumount /media/box") + # print("Return type: ", res) + def deviceLoop(): - context = pyudev.Context() - monitor = pyudev.Monitor.from_netlink(context) - monitor.filter_by('block') - for device in iter(monitor.poll, None): - if 'ID_FS_TYPE' in device: - if device.action == 'add': - if device.device_node[5:7] == 'sd' and device.get('DEVTYPE')=='partition': - #print("New device {}".format(device.device_node)) - mountDevice(device) - # display device type - printStatus("KEY INSERTED") - printFSLabel(device.get('ID_FS_LABEL')) + context = pyudev.Context() + monitor = pyudev.Monitor.from_netlink(context) + monitor.filter_by("block") + for device in iter(monitor.poll, None): + if "ID_FS_TYPE" in device: + if device.action == "add": + if device.device_node[5:7] == "sd" and device.get("DEVTYPE") == "partition": + # print("New device {}".format(device.device_node)) + mountDevice(device) + # display device type + printStatus("KEY INSERTED") + printFSLabel(device.get("ID_FS_LABEL")) - if device.action == 'remove': - if device.device_node[5:7] == 'sd' and device.get('DEVTYPE')=='partition': - printStatus("WAITING") - printAction('Device removed') - printFSLabel('') - umountDevice() + if device.action == "remove": + if device.device_node[5:7] == "sd" and device.get("DEVTYPE") == "partition": + printStatus("WAITING") + printAction("Device removed") + printFSLabel("") + umountDevice() -#----------------------------------------------------------- + +# ----------------------------------------------------------- # pandora -#----------------------------------------------------------- +# ----------------------------------------------------------- + -# scan device at mountPoint def scan(mountPoint): - pp = pypandora.PyPandora(root_url= PANDORA_ROOT_URL) + pp = pypandora.PyPandora(root_url=PANDORA_ROOT_URL) - for arg in sys.argv[1:]: - print(arg,end='',flush=True) - print(":",end='',flush=True) + for arg in sys.argv[1:]: + print(arg, end="", flush=True) + print(":", end="", flush=True) - res = pp.submit_from_disk(arg) + res = pp.submit_from_disk(arg) - while True: - print('.',end='',flush=True) - time.sleep(1) + while True: + print(".", end="", flush=True) + time.sleep(1) - res = pp.task_status(res['taskId']) + res = pp.task_status(res["taskId"]) - if res['status']!='WAITING': - break + if res["status"] != "WAITING": + break - print(res['status']) + print(res["status"]) -#-------------------------------------- -intitCurses() -printScreen() -deviceLoop() +# -------------------------------------- -while True: - key = screen.getch() - if key == curses.KEY_MOUSE: - break - if key == 27: - break -endCurses() +def pandoraBox(): + try: + intitCurses() + printScreen() + deviceLoop() + finally: + endCurses() + + +# -------------------------------------- + + +if __name__ == "__main__": + pandoraBox() diff --git a/tests/detect-usb.py b/tests/detect-usb.py index 7e99cd8..616c97a 100755 --- a/tests/detect-usb.py +++ b/tests/detect-usb.py @@ -7,59 +7,57 @@ import os context = pyudev.Context() monitor = pyudev.Monitor.from_netlink(context) -monitor.filter_by('block') +monitor.filter_by("block") + +autoMount = False -autoMount=False def printDeviceInfo(dev): - print('') - print('') - print('Device name: %s' % dev.get('DEVNAME')) - print('Device type: %s' % dev.get('DEVTYPE')) - print('Bus system: %s' % dev.get('ID_BUS')) - print('Partition label: %s' % dev.get('ID_FS_LABEL')) - print('FS: %s' % dev.get('ID_FS_SYSTEM_ID')) - print('FS type: %s' % dev.get('ID_FS_TYPE')) - print('Device usage: %s' % dev.get('ID_FS_USAGE')) - print('Device model: %s' % dev.get('ID_MODEL')) - print('Partition type: %s' % dev.get('ID_PART_TABLE_TYPE')) - print('USB driver: %s' % dev.get('ID_USB_DRIVER')) - print('Path id: %s' % dev.get('ID_PATH')) - #print('Capacity: %s' % stdOut[0].strip()) - print('') + print("") + print("") + print("Device name: %s" % dev.get("DEVNAME")) + print("Device type: %s" % dev.get("DEVTYPE")) + print("Bus system: %s" % dev.get("ID_BUS")) + print("Partition label: %s" % dev.get("ID_FS_LABEL")) + print("FS: %s" % dev.get("ID_FS_SYSTEM_ID")) + print("FS type: %s" % dev.get("ID_FS_TYPE")) + print("Device usage: %s" % dev.get("ID_FS_USAGE")) + print("Device model: %s" % dev.get("ID_MODEL")) + print("Partition type: %s" % dev.get("ID_PART_TABLE_TYPE")) + print("USB driver: %s" % dev.get("ID_USB_DRIVER")) + print("Path id: %s" % dev.get("ID_PATH")) + # print('Capacity: %s' % stdOut[0].strip()) + print("") + # enumerate at device connection for device in iter(monitor.poll, None): - if 'ID_FS_TYPE' in device: - if device.action == 'add': - if device.device_node[5:7] == 'sd' and device.get('DEVTYPE')=='partition': - printDeviceInfo(device) - print("New device {}".format(device.device_node)) - # loop until device is mounted - if autoMount: - found=False - loop=0 - while (not found) and (loop<10): - # need to sleep before devide is mounted - time.sleep(1) - for partition in psutil.disk_partitions(): - if partition.device == device.device_node: - print("Mounted at {}".format(partition.mountpoint)) - found=True - loop+=1 - else: - print("mount device to /media/box") - res=os.system("pmount "+device.device_node+" box") - print("Return type: ", res) - - if device.action == 'remove': - if device.device_node[5:7] == 'sd' and device.get('DEVTYPE')=='partition': - print('Device removed') - if not autoMount: - print("unmount device /media/box") - res = os.system("pumount /media/box") - print("Return type: ", res) - - - + if "ID_FS_TYPE" in device: + if device.action == "add": + if device.device_node[5:7] == "sd" and device.get("DEVTYPE") == "partition": + printDeviceInfo(device) + print("New device {}".format(device.device_node)) + # loop until device is mounted + if autoMount: + found = False + loop = 0 + while (not found) and (loop < 10): + # need to sleep before devide is mounted + time.sleep(1) + for partition in psutil.disk_partitions(): + if partition.device == device.device_node: + print("Mounted at {}".format(partition.mountpoint)) + found = True + loop += 1 + else: + print("mount device to /media/box") + res = os.system("pmount " + device.device_node + " box") + print("Return type: ", res) + if device.action == "remove": + if device.device_node[5:7] == "sd" and device.get("DEVTYPE") == "partition": + print("Device removed") + if not autoMount: + print("unmount device /media/box") + res = os.system("pumount /media/box") + print("Return type: ", res) diff --git a/tests/mouse-click.py b/tests/mouse-click.py index 22dcc4e..49f2273 100755 --- a/tests/mouse-click.py +++ b/tests/mouse-click.py @@ -13,15 +13,12 @@ screen.clear() while True: key = screen.getch() screen.clear() - screen.addstr(0, 0, 'key: {}'.format(key)) + screen.addstr(0, 0, "key: {}".format(key)) if key == curses.KEY_MOUSE: _, x, y, _, button = curses.getmouse() - screen.addstr(1, 0, 'x, y, button = {}, {}, {}'.format(x, y, button)) + screen.addstr(1, 0, "x, y, button = {}, {}, {}".format(x, y, button)) elif key == 27: break curses.endwin() curses.flushinp() - - - diff --git a/tests/progress-bar.py b/tests/progress-bar.py index b0da933..5bc272e 100755 --- a/tests/progress-bar.py +++ b/tests/progress-bar.py @@ -5,20 +5,23 @@ import time curses.initscr() + def initBar(): global progress_win progress_win = curses.newwin(3, 62, 3, 10) progress_win.border(0) + def updateBar(progress): global progress_win rangex = (60 / float(100)) * progress pos = int(rangex) - display = '#' + display = "#" if pos != 0: progress_win.addstr(1, pos, "{}".format(display)) progress_win.refresh() + initBar() loading = 0 while loading < 100: @@ -30,4 +33,3 @@ time.sleep(1) curses.endwin() curses.flushinp() - diff --git a/tests/scan.py b/tests/scan.py index e5d585b..3ce7a90 100755 --- a/tests/scan.py +++ b/tests/scan.py @@ -1,26 +1,24 @@ #!/usr/bin/python3 import pypandora -import time +import time import sys -pp = pypandora.PyPandora(root_url= 'http://127.0.0.1:6100') +pp = pypandora.PyPandora(root_url="http://127.0.0.1:6100") for arg in sys.argv[1:]: - print(arg,end='',flush=True) - print(":",end='',flush=True) + print(arg, end="", flush=True) + print(":", end="", flush=True) res = pp.submit_from_disk(arg) while True: - print('.',end='',flush=True) + print(".", end="", flush=True) time.sleep(1) - res = pp.task_status(res['taskId']) + res = pp.task_status(res["taskId"]) - if res['status']!='WAITING': + if res["status"] != "WAITING": break - print(res['status']) - - + print(res["status"]) diff --git a/tests/screen.py b/tests/screen.py index 92b1a5c..c3553a8 100755 --- a/tests/screen.py +++ b/tests/screen.py @@ -10,17 +10,17 @@ curses.flushinp() curses.noecho() screen.clear() -screen.addstr(1,0," ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒") -screen.addstr(2,0," ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░") -screen.addstr(3,0," ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░") -screen.addstr(4,0," ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ") -screen.addstr(5,0," ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒") -screen.addstr(6,0," ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░") -screen.addstr(7,0," ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░") -screen.addstr(8,0," ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ") -screen.addstr(9,0," ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ") -screen.addstr(10,0," ░ ░ ") -screen.addstr(11,0,"READY."); +screen.addstr(1, 0, " ██▓███ ▄▄▄ ███▄ █ ▓█████▄ ▒█████ ██▀███ ▄▄▄ ▄▄▄▄ ▒█████ ▒██ ██▒") +screen.addstr(2, 0, " ▓██░ ██▒▒████▄ ██ ▀█ █ ▒██▀ ██▌▒██▒ ██▒▓██ ▒ ██▒▒████▄ ▓█████▄ ▒██▒ ██▒▒▒ █ █ ▒░") +screen.addstr(3, 0, " ▓██░ ██▓▒▒██ ▀█▄ ▓██ ▀█ ██▒░██ █▌▒██░ ██▒▓██ ░▄█ ▒▒██ ▀█▄ ▒██▒ ▄██▒██░ ██▒░░ █ ░") +screen.addstr(4, 0, " ▒██▄█▓▒ ▒░██▄▄▄▄██ ▓██▒ ▐▌██▒░▓█▄ ▌▒██ ██░▒██▀▀█▄ ░██▄▄▄▄██ ▒██░█▀ ▒██ ██░ ░ █ █ ▒ ") +screen.addstr(5, 0, " ▒██▒ ░ ░ ▓█ ▓██▒▒██░ ▓██░░▒████▓ ░ ████▓▒░░██▓ ▒██▒ ▓█ ▓██▒ ░▓█ ▀█▓░ ████▓▒░▒██▒ ▒██▒") +screen.addstr(6, 0, " ▒▓▒░ ░ ░ ▒▒ ▓▒█░░ ▒░ ▒ ▒ ▒▒▓ ▒ ░ ▒░▒░▒░ ░ ▒▓ ░▒▓░ ▒▒ ▓▒█░ ░▒▓███▀▒░ ▒░▒░▒░ ▒▒ ░ ░▓ ░") +screen.addstr(7, 0, " ░▒ ░ ▒ ▒▒ ░░ ░░ ░ ▒░ ░ ▒ ▒ ░ ▒ ▒░ ░▒ ░ ▒░ ▒ ▒▒ ░ ▒░▒ ░ ░ ▒ ▒░ ░░ ░▒ ░") +screen.addstr(8, 0, " ░░ ░ ▒ ░ ░ ░ ░ ░ ░ ░ ░ ░ ▒ ░░ ░ ░ ▒ ░ ░ ░ ░ ░ ▒ ░ ░ ") +screen.addstr(9, 0, " ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ░ ") +screen.addstr(10, 0, " ░ ░ ") +screen.addstr(11, 0, "READY.") while True: key = screen.getch()