From ee03e9320267e82a9142dfed8ad397bef3734357 Mon Sep 17 00:00:00 2001 From: dbarzin Date: Sun, 12 Feb 2023 18:10:04 +0100 Subject: [PATCH] add states tests --- pandora-box.py | 225 ++++++++++++++++++++++++++++++++++++++---------- tests/states.py | 41 +++++++++ 2 files changed, 222 insertions(+), 44 deletions(-) mode change 100644 => 100755 pandora-box.py create mode 100755 tests/states.py diff --git a/pandora-box.py b/pandora-box.py old mode 100644 new mode 100755 index 2693c30..94518eb --- a/pandora-box.py +++ b/pandora-box.py @@ -1,8 +1,9 @@ #!/usr/bin/python3 # -# This file is part of the Pandora-box distribution (https://github.com/dbarzin/pandora-box). +# This file is part of the Pandora-box distribution. +# https://github.com/dbarzin/pandora-box # Copyright (c) 2022 Didier Barzin. -# +# # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, version 3. @@ -29,6 +30,173 @@ import configparser import shutil from datetime import datetime +# Abstract Base Class +from abc import ABC, abstractmethod + +# ----------------------------------------------------------- +# States +# ----------------------------------------------------------- + +class Context: + """ + The Context defines the interface of interest to clients. It also maintains + a reference to an instance of a State subclass, which represents the current + state of the Context. + """ + + _state = None + """ + A reference to the current state of the Context. + """ + + def __init__(self, state: State) -> None: + self.transition_to(state) + + def transition_to(self, state: State): + """ + The Context allows changing the State object at runtime. + """ + + print(f"Context: Transition to {type(state).__name__}") + self._state = state + self._state.context = self + + """ + The Context delegates part of its behavior to the current State object. + """ + + def initalize(self): + self._state.handleInitialize() + + def wait(self): + self._state.handleWait() + + def scan(self): + self._state.handleScan() + + def askClean(self): + self._state.() + + def clean(self): + self._state.handleClean() + + +class State(ABC): + """ + The base State class declares methods that all Concrete State should + implement and also provides a backreference to the Context object, + associated with the State. This backreference can be used by States to + transition the Context to another State. + """ + + @property + def context(self) -> Context: + return self._context + + @context.setter + def context(self, context: Context) -> None: + self._context = context + + @abstractmethod + def handleInitialize(self) -> None: + pass + + @abstractmethod + def handleWait(self) -> None: + pass + + @abstractmethod + def handleDeviceInserted(self) -> None: + pass + + @abstractmethod + def handleInsertKey(self) -> None: + pass + + @abstractmethod + def handleScan(self) -> None: + pass + + @abstractmethod + def handleClean(self) -> None: + pass + + +# Initilize +class ConcreteStateInitialize(State): + def handleInitialize(self) -> None: + + moveToScriptFolder() + init_log() + config() + init_curses() + print_screen() + + self.context.transition_to(ConcreteStateWait()) + + +# Wait loop +class ConcreteStateWait(State): + def handleWait(self) -> None: + + # First unmount remaining device + umount_device() + # Loop + context = pyudev.Context() + monitor = pyudev.Monitor.from_netlink(context) + monitor.filter_by("block") + + for device in iter(monitor.poll, None): + if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": + if device.action == "add": + + + + ...... + + self.context.transition_to(ConcreteStatDeviceInserted()) + +class ConcreteStateDeviceInserted(State): + def handleDeviceInserted(self) -> None: + + log("Device inserted") + + log_device_info(device) + if not CURSES: + display_image("WORK") + else: + # display device type + print_fslabel(device.get("ID_FS_LABEL")) + print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) + print_model(device.get("ID_MODEL")) + print_serial(device.get("ID_SERIAL_SHORT")) + # Mount device + mount_point = mount_device(device) + log('Partition mounted at %s' % mount_point) + if mount_point == None: + # no partition + if not CURSES: + display_image("WAIT") + continue + try: + statvfs=os.statvfs(mount_point) + except Exception as e : + log("Unexpected error: %s" % e) + logging.info("An exception was thrown!", exc_info=True) + if not CURSES: + display_image("WAIT") + continue + print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) + print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) + + self.context.transition_to(ConcreteStateScan()) + + +class ConcreteStateScan(State): + + + + # ----------------------------------------------------------- # Config variables # ----------------------------------------------------------- @@ -341,44 +509,7 @@ def umount_device(): """Main device loop""" def device_loop(): - # First unmount remaining device - umount_device() - # Loop - context = pyudev.Context() - monitor = pyudev.Monitor.from_netlink(context) - monitor.filter_by("block") try: - for device in iter(monitor.poll, None): - if device.get("ID_FS_USAGE") == "filesystem" and device.device_node[5:7] == "sd": - if device.action == "add": - log("Device inserted") - log_device_info(device) - if not CURSES: - display_image("WORK") - else: - # display device type - print_fslabel(device.get("ID_FS_LABEL")) - print_fstype(device.get("ID_PART_TABLE_TYPE") + " " + device.get("ID_FS_TYPE")) - print_model(device.get("ID_MODEL")) - print_serial(device.get("ID_SERIAL_SHORT")) - # Mount device - mount_point = mount_device(device) - log('Partition mounted at %s' % mount_point) - if mount_point == None: - # no partition - if not CURSES: - display_image("WAIT") - continue - try: - statvfs=os.statvfs(mount_point) - except Exception as e : - log("Unexpected error: %s" % e) - logging.info("An exception was thrown!", exc_info=True) - if not CURSES: - display_image("WAIT") - continue - print_size(human_readable_size(statvfs.f_frsize * statvfs.f_blocks)) - print_used(human_readable_size(statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree))) # Scan files log("Scan started...........") @@ -518,14 +649,18 @@ def moveToScriptFolder(): dname = os.path.dirname(abspath) os.chdir(dname) +# -------------------------------------- + + """Main entry point""" def main(stdscr): try : - moveToScriptFolder() - init_log() - config() - init_curses() - print_screen() + + context = Context(ConcreteStateA()); + + context.init() + + xxxxxxxx while True: device_loop() except Exception as e : @@ -536,6 +671,8 @@ def main(stdscr): # -------------------------------------- +# TODO: google wrapper main functionnal programming + if __name__ == "__main__": wrapper(main) diff --git a/tests/states.py b/tests/states.py new file mode 100755 index 0000000..bee200c --- /dev/null +++ b/tests/states.py @@ -0,0 +1,41 @@ +#!/usr/bin/python3 +# + +def waitMouseClick(): + print("wait mouse click") + mouse = open( "/dev/input/mice", "rb" ) + down = False; + while True: + buf = mouse.read(3) + if ((buf[0] & 0x1)==1): + down = True + if (((buf[0] & 0x1)==0) and down): + break; + mouse.close() + +def loop(state): + print('loop ' +state) + match state: + case "START": + waitMouseClick() + return "STEP1" + case "STEP1": + waitMouseClick() + return "STEP2" + case "STEP2": + waitMouseClick() + return "STOP" + case _: + print("Unknwn state "+state) + return "STOP" + print("end loop") + + +if __name__ == "__main__": + # The client code. + + state="START" + while (state!="STOP"): + state = loop(state) + +