1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-18 12:59:40 +02:00
pandora-box/pandora-box.py
2025-04-22 12:50:37 +02:00

1033 lines
29 KiB
Python
Executable file

#!/usr/bin/python3
#
# This file is part of the Pandora-box distribution.
# https://github.com/dbarzin/pandora-box
# Copyright (c) 2022 Didier Barzin.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, version 3.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
"""The Pandora-Box Module."""
import configparser
import curses
import datetime
import glob
import logging
import os
import queue
import shutil
import socket
import sys
import time
import threading
import subprocess
from pathlib import Path
import pypandora
import psutil
import pyudev
# -----------------------------------------------------------
# Threading variables
# -----------------------------------------------------------
threads = []
EXIT_FLAG = False
queueLock = threading.Lock()
workQueue = None
# -----------------------------------------------------------
# Config variables
# -----------------------------------------------------------
isFakeScan = None
hasUSBAutoMount = None
pandora_root_url = None
hasQuarantine = None
quarantine_folder = None
has_curses = None
maxThreads = None
boxname = socket.gethostname()
maxFileSize = None
# -----------------------------------------------------------
# Curses
# -----------------------------------------------------------
screen = None
status_win = None
progress_win = None
title_win = None
log_win = None
# Pandora logo
logo = None
# -----------------------------------------------------------
# Interval box variables
# -----------------------------------------------------------
device = None
mount_point = None
infected_files = None
# -----------------------------------------------------------
class scanThread(threading.Thread):
"""Scanning thread"""
def __init__(self):
threading.Thread.__init__(self)
self.pandora = pypandora.PyPandora(root_url=pandora_root_url)
def run(self):
while not EXIT_FLAG:
queueLock.acquire()
if not workQueue.empty():
file = workQueue.get()
queueLock.release()
self.scan(file)
else:
queueLock.release()
time.sleep(1)
def scan(self, file):
global infected_files, scanned, file_count, f_used, maxFileSize
logging.info(f'{"Start scan."}')
try:
# get file information
file_name = os.path.basename(file)
file_size = os.path.getsize(file)
# log the scan has started
logging.info(
f"Scan {file_name} "
f"[{human_readable_size(file_size)}] "
f"Thread-{id} "
)
start_time = time.time()
if isFakeScan:
status = "SKIPPED"
logging.info(f'{"Fake scan - skipped."}')
else:
# do not scan files bigger than 1G
if file_size > maxFileSize :
status = "TOO BIG"
logging.info(f'{"File too big."}')
else:
worker = self.pandora.submit_from_disk(
file, seed_expire=6000)
if ("taskId" not in worker) or ("seed" not in worker):
logging.error(f"task_status={worker}")
status = "ERROR"
return
time.sleep(1)
loop = 0
while loop < (1024 * 256):
res = self.pandora.task_status(
worker["taskId"], worker["seed"])
logging.info(f"task_status={res}")
# Handle response from Pandora
if "status" in res:
status = res["status"]
if status != "WAITING":
break
else:
status = "ERROR"
return
# wait a little
pass
time.sleep(1)
loop += 1
end_time = time.time()
# log the result
log(
f"Scan {file_name} "
f"[{human_readable_size(file_size)}] "
"-> "
f"{status} ({(end_time - start_time):.1f}s)"
)
logging.info(
f'boxname="{boxname}", '
f'file="{file_name}", '
f'size="{file_size}", '
f'status="{status}"", '
f'duration="{int(end_time - start_time)}"'
)
# Get lock
queueLock.acquire()
scanned += file_size
file_count += 1
if status == "ALERT":
# add file to list
infected_files.append(file)
# Release lock
queueLock.release()
# update status bar
update_bar(scanned * 100 // f_used)
if hasQuarantine and status == "ALERT":
if not os.path.isdir(qfolder):
os.mkdir(qfolder)
shutil.copyfile(file, os.path.join(qfolder, file_name))
except Exception as ex:
log(f"Unexpected error: {str(ex)}", flush=True)
logging.info(
f'boxname="{boxname}", '
f'error="{str(ex)}"',
exc_info=True)
logging.info(f'{"Start done."}')
# ----------------------------------------------------------
def config():
global isFakeScan, hasUSBAutoMount, pandora_root_url
global hasQuarantine, quarantine_folder, has_curses, maxThreads, maxFileSize
""" read configuration file """
# intantiate a ConfirParser
config_parser = configparser.ConfigParser()
# read the config file
config_parser.read("pandora-box.ini")
# set values
isFakeScan = config_parser["DEFAULT"]["FAKE_SCAN"].lower() == "true"
hasUSBAutoMount = config_parser["DEFAULT"]["USB_AUTO_MOUNT"].lower(
) == "true"
pandora_root_url = config_parser["DEFAULT"]["PANDORA_ROOT_URL"]
# Quarantine
hasQuarantine = config_parser["DEFAULT"]["QUARANTINE"].lower() == "true"
quarantine_folder = config_parser["DEFAULT"]["QUARANTINE_FOLDER"]
# Curses
has_curses = config_parser["DEFAULT"]["CURSES"].lower() == "true"
# MaxThreads
maxThreads = int(config_parser["DEFAULT"]["THREADS"])
# MaxFileSize
maxFileSize = int(config_parser["DEFAULT"]["MAX_FILE_SIZE"])
# ----------------------------------------------------------
def human_readable_size(size, decimal_places=1):
"""Convert size to human readble string"""
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size < 1024.0:
return f"{size:.{decimal_places}f}{unit}"
size /= 1024.0
return None
# -----------------------------------------------------------
# Image Screen
# -----------------------------------------------------------
def display_image(status):
"""Display image on screen"""
if not has_curses:
if status == "WAIT":
image = "images/key*.png"
elif status == "WORK":
image = "images/wait*.png"
elif status == "OK":
image = "images/ok.png"
elif status == "BAD":
image = "images/bad.png"
elif status == "ERROR":
image = "images/error.png"
else:
return
# hide old image
os.system("killall -s 9 fim 2>/dev/null")
# display image
if "*" in image:
# slide show
os.system(
f"fim -qa -c 'while(1){{display;sleep 1;next;}}' {image}"
"</dev/null 2>/dev/null >/dev/null &"
)
else:
# only one image
os.system(f"fim -qa {image} </dev/null 2>/dev/null >/dev/null &")
# -----------------------------------------------------------
# has_curses Screen
# -----------------------------------------------------------
def init_curses():
global screen
"""Initialise curses"""
if has_curses:
screen = curses.initscr()
screen.keypad(1)
curses.mousemask(curses.ALL_MOUSE_EVENTS |
curses.REPORT_MOUSE_POSITION)
curses.flushinp()
curses.noecho()
curses.curs_set(0)
else:
# hide blinking cursor
sys.stdout.write("\033[?1;0;0c")
sys.stdout.flush()
# display wait
display_image("WAIT")
def print_fslabel(label):
"""Print FS Label"""
if has_curses:
status_win.addstr(
1,
1,
f"Partition : {label!s:32}",
curses.color_pair(2))
status_win.refresh()
def print_size(label):
"""Print FS Size"""
if has_curses:
status_win.addstr(2, 1, f"Size : {label!s:32} ", curses.color_pair(2))
status_win.refresh()
def print_used(label):
"""Print FS Used Size"""
if has_curses:
status_win.addstr(3, 1, f"Used : {label!s:32} ", curses.color_pair(2))
status_win.refresh()
def print_fstype(label):
"""Print device FS type"""
if has_curses:
status_win.addstr(
1,
50,
f"Part / Type : {label!s:32}",
curses.color_pair(2))
status_win.refresh()
def print_model(label):
"""Print device model"""
if has_curses:
status_win.addstr(2, 50, f"Model : {label!s:32}", curses.color_pair(2))
status_win.refresh()
def print_serial(label):
"""Print device serail number"""
if has_curses:
status_win.addstr(
3,
50,
f"Serial : {label!s:32}",
curses.color_pair(2))
status_win.refresh()
def init_bar():
"""Initialise progress bar"""
global progress_win
if has_curses:
progress_win = curses.newwin(3, curses.COLS - 12, 17, 5)
progress_win.border(0)
progress_win.refresh()
def update_bar(progress, flush=False):
global last_update_time
"""Update progress bar"""
if flush or ((time.time() - last_update_time) >= 1):
last_update_time = time.time()
if has_curses:
if progress == 0:
progress_win.clear()
progress_win.border(0)
time.sleep(1)
progress_win.addstr(0, 1, "Progress:")
else:
pos = ((curses.COLS - 14) * progress) // 100
progress_win.addstr(1, 1, "#" * pos)
progress_win.addstr(0, 1, f"Progress: {progress}%")
progress_win.refresh()
def print_screen():
global status_win
"""Print main screen"""
if has_curses:
curses.init_pair(1, curses.COLOR_RED, curses.COLOR_BLACK)
curses.init_pair(2, curses.COLOR_BLUE, curses.COLOR_BLACK)
curses.init_pair(3, curses.COLOR_GREEN, curses.COLOR_BLACK)
title_win = curses.newwin(12, curses.COLS, 0, 0)
# title_win.border(0)
title_col = (curses.COLS - len(logo[0])) // 2
title_win.addstr(1, title_col, logo[0], curses.color_pair(1))
title_win.addstr(2, title_col, logo[1], curses.color_pair(1))
title_win.addstr(3, title_col, logo[2], curses.color_pair(1))
title_win.addstr(4, title_col, logo[3], curses.color_pair(1))
title_win.addstr(5, title_col, logo[4], curses.color_pair(1))
title_win.addstr(6, title_col, logo[5], curses.color_pair(1))
title_win.addstr(7, title_col, logo[6], curses.color_pair(1))
title_win.addstr(8, title_col, logo[7], curses.color_pair(1))
title_win.addstr(9, title_col, logo[8], curses.color_pair(1))
title_win.addstr(10, title_col, logo[9], curses.color_pair(1))
title_win.refresh()
status_win = curses.newwin(5, curses.COLS, 12, 0)
status_win.border(0)
status_win.addstr(0, 1, "USB Key Information")
print_fslabel("")
print_size("")
print_used("")
print_fstype("")
print_model("")
print_serial("")
init_bar()
update_bar(0, flush=True)
logging.info(f'boxname="{boxname}", ' "pandora-box-start")
def end_curses():
"""Closes curses"""
if has_curses:
curses.endwin()
curses.flushinp()
else:
# hide old image
os.system("killall -s 9 fim 2>/dev/null")
# -----------------------------------------------------------
# Logging windows
# -----------------------------------------------------------
def initlog():
"""Inititalize logging function"""
global log_win
if has_curses:
log_win = curses.newwin(curses.LINES - 20, curses.COLS, 20, 0)
log_win.border(0)
logging.basicConfig(
filename="/var/log/pandora-box.log",
level=logging.INFO,
format="%(asctime)s - %(message)s",
datefmt="%m/%d/%y %H:%M",
)
logs = []
last_update_time = 0
def log(msg, flush=False):
"""log a message with a new line"""
if has_curses:
# display log on screen
logs.append(msg)
if len(logs) > (curses.LINES - 22):
logs.pop(0)
log_update(flush)
def log_msg(msg):
"""update last message -> no new line"""
if has_curses:
# display log on screen
logs[-1] = msg
log_update()
def log_update(flush=False):
"""Update the log screen"""
global last_update_time
# do not refresh the screen too often
if flush or ((time.time() - last_update_time) >= 1):
last_update_time = time.time()
log_win.clear()
log_win.border(0)
for i in range(min(curses.LINES - 22, len(logs))):
log_win.addstr(i + 1,
1,
logs[i][: curses.COLS - 2],
curses.color_pair(3))
log_win.refresh()
# -----------------------------------------------------------
# Device
# -----------------------------------------------------------
def mount_device():
"""Mount USB device"""
global mount_point
log("Mount device", flush=True)
if hasUSBAutoMount:
mount_point = None
loop = 0
while (mount_point is None) and (loop < 15):
# need to sleep before devide is mounted
time.sleep(1)
for partition in psutil.disk_partitions():
if partition.device == device.device_node:
mount_point = partition.mountpoint
loop += 1
if mount_device is None:
log("No partition mounted", flush=True)
else:
mount_point = "/media/box"
if not os.path.exists("/media/box"):
log("folder /media/box does not exists", flush=True)
logging.error("folder /media/box does not exists")
return None
# Mount device
try:
subprocess.run(
["sudo",
"mount", "-o", "uid=1000,gid=1000,dmask=0000,fmask=0000",
device.device_node, "/media/box"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
logging.info("Mount successful")
except subprocess.CalledProcessError as e:
logging.error(f"Mount failed: return code {e.returncode}")
return None
except FileNotFoundError:
logging.error("Command 'mount' not found")
return None
except Exception as e:
logging.error(f"Unexpected error: {e}")
return None
loop = 0
while loop < 10:
time.sleep(1)
try:
os.statvfs(mount_point)
except Exception as ex:
log(f"Mount - Unexpected error: {ex}", flush=True)
loop += 1
continue
break
def umount_device():
"""Unmount USB device"""
if hasUSBAutoMount:
log("Sync partitions", flush=True)
os.system("sync")
else:
try:
subprocess.run(
["sudo", "umount", "/media/box"],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
check=True,
)
logging.info("Umount successful")
except subprocess.CalledProcessError as e:
logging.error(f"umount failed: return code {e.returncode}")
return None
except FileNotFoundError:
logging.error("Command 'umount' not found")
return None
except Exception as e:
logging.error(f"Umount - Unexpected error: {e}")
return None
def log_device_info(dev):
"""Log device information"""
logging.info(
f'boxname="{boxname}", '
f'device_name="{dev.get("DEVNAME")}, '
f'path_id="{dev.get("ID_PATH")}", '
f'bus system="{dev.get("ID_BUS")}", '
f'USB_driver="{dev.get("ID_USB_DRIVER")}", '
f'device_type="{dev.get("DEVTYPE")}", '
f'device_usage="{dev.get("ID_FS_USAGE")}", '
f'partition type="{dev.get("ID_PART_TABLE_TYPE")}", '
f'fs_type="{dev.get("ID_FS_TYPE")}", '
f'partition_label="{dev.get("ID_FS_LABEL")}", '
f'device_model="{dev.get("ID_MODEL")}", '
f'model_id="{dev.get("ID_MODEL_ID")}", '
f'serial_short="{dev.get("ID_SERIAL_SHORT")}", '
f'serial="{dev.get("ID_SERIAL")}"'
)
# -----------------------------------------------------------
# pandora
# -----------------------------------------------------------
def scan():
"""Scan devce with pypandora"""
global pandora, qfolder
global workQueue, EXIT_FLAG, threads, scanned
global mount_point, infected_files, file_count, f_used
# get device size
logging.info(f'start scan')
try:
statvfs = os.statvfs(mount_point)
except Exception as ex:
log(f"error={ex}", flush=True)
logging.info(
f'boxname="{boxname}", '
f'error="{str(ex)}"',
exc_info=True)
if not has_curses:
display_image("ERROR")
return "ERROR"
# Print device information
f_size = human_readable_size(statvfs.f_frsize * statvfs.f_blocks)
print_size(f_size)
logging.info(f'size="{f_size}"')
f_used = statvfs.f_frsize * (statvfs.f_blocks - statvfs.f_bfree)
print_used(human_readable_size(f_used))
logging.info(f'used="{human_readable_size(f_used)}"')
# scan device
infected_files = []
scanned = 0
file_count = 0
scan_start_time = time.time()
if hasQuarantine:
qfolder = os.path.join(
quarantine_folder, datetime.datetime.now().strftime("%y%m%d-%H%M")
)
# Instantice work quere
workQueue = queue.Queue(512)
# set exit condition to false
EXIT_FLAG = False
# Instanciate threads
for _ in range(maxThreads):
thread = scanThread()
thread.start()
threads.append(thread)
# Fill the work queue
for root, _, files in os.walk(mount_point):
for file in files:
while workQueue.full():
time.sleep(1)
pass
queueLock.acquire()
workQueue.put(os.path.join(root, file))
queueLock.release()
# Wait for queue to empty
while not workQueue.empty():
time.sleep(1)
pass
# Notify threads it's time to exit
EXIT_FLAG = True
# Wait for all threads to complete
for t in threads:
t.join()
update_bar(100, flush=True)
log(
"Scan done in %.1fs, %d files scanned, %d files infected"
% ((time.time() - scan_start_time), file_count, len(infected_files)),
flush=True,
)
logging.info(
f'boxname="{boxname}", '
f'duration="{int(time.time() - scan_start_time)}", '
f'files_scanned="{file_count}", '
f'files_infected="{len(infected_files)}"'
)
return "CLEAN"
# --------------------------------------
def wait():
"""Wait for insert of remove of USB device"""
# handle error - first unmount the device
umount_device()
# Loop
context = pyudev.Context()
monitor = pyudev.Monitor.from_netlink(context)
monitor.filter_by("block")
try:
for dev in iter(monitor.poll, None):
if dev.get(
"ID_FS_USAGE") == "filesystem" and dev.device_node[5:7] == "sd":
if dev.action == "add":
return device_inserted(dev)
if dev.action == "remove":
return device_removed()
except Exception as ex:
log(f"Unexpected error: {str(ex)}", flush=True)
logging.info(
f'boxname="{boxname}", '
f'error="{str(ex)}"',
exc_info=True)
return "STOP"
def device_inserted(dev):
global device
log("Device inserted", flush=True)
logging.info(f'boxname="{boxname}", ' "device-inserted")
device = dev
log_device_info(device)
if not has_curses:
display_image("WORK")
else:
# display device type
print_fslabel(device.get("ID_FS_LABEL"))
print_fstype(device.get("ID_FS_TYPE"))
print_model(device.get("ID_MODEL"))
print_serial(device.get("ID_SERIAL_SHORT"))
return "INSERTED"
def device_removed():
global device
log("Device removed", flush=True)
logging.info(f'boxname="{boxname}", ' "device-removed")
device = None
if not has_curses:
display_image("WAIT")
else:
print_fslabel("")
print_size("")
print_used("")
print_fstype("")
print_model("")
print_serial("")
update_bar(0, flush=True)
return "WAIT"
# --------------------------------------
def mount():
"""Mount device"""
global mount_point
mount_device()
log(f"Partition mounted at {mount_point}", flush=True)
logging.info(f"Partition mounted at {mount_point}")
if mount_point is None:
# no partition
if not has_curses:
display_image("WAIT")
return "WAIT"
try:
os.statvfs(mount_point)
except Exception as ex:
log(f"Unexpected error: {str(ex)}", flush=True)
logging.info(
f'boxname="{boxname}", '
f'error="{str(ex)}"',
exc_info=True)
if not has_curses:
display_image("WAIT")
return "WAIT"
return "SCAN"
# --------------------------------------
def error():
"""Display error message"""
if not has_curses:
display_image("ERROR")
return "WAIT"
# -----------------------------------------------------------
# Wait for mouse click or enter
# -----------------------------------------------------------
mouseEvent = threading.Event()
enterEvent = threading.Event()
mouseOrEnterCondition = threading.Condition()
def mouseClickThread():
mouse = open("/dev/input/mice", "rb")
os.set_blocking(mouse.fileno(), False)
down = False
while not enterEvent.is_set():
buf = mouse.read(3)
if buf is not None:
if (buf[0] & 0x1) == 1:
down = True
if ((buf[0] & 0x1) == 0) and down:
break
time.sleep(1)
mouse.close()
mouseEvent.set()
with mouseOrEnterCondition:
mouseOrEnterCondition.notify()
def enterKeyThread():
os.set_blocking(sys.stdin.fileno(), False)
while not mouseEvent.is_set():
input = sys.stdin.readline()
if len(input) > 0:
break
time.sleep(0.1)
enterEvent.set()
with mouseOrEnterCondition:
mouseOrEnterCondition.notify()
def waitMouseOrEnter():
with mouseOrEnterCondition:
threading.Thread(target=mouseClickThread, args=()).start()
threading.Thread(target=enterKeyThread, args=()).start()
mouseEvent.clear()
enterEvent.clear()
while not (mouseEvent.is_set() or enterEvent.is_set()):
mouseOrEnterCondition.wait()
# --------------------------------------
def clean():
"""Remove infected files"""
if len(infected_files) > 0:
# display message
log(f"{len(infected_files)} infected files detecetd:")
logging.info(
f'boxname="{boxname}", '
f"infeted_files={len(infected_files)}")
if not has_curses:
display_image("BAD")
else:
# print list of files
cnt = 0
for file in infected_files:
log(file)
cnt = cnt + 1
if cnt >= 10:
log("...")
break
# wait for clean
log("PRESS KEY TO CLEAN", flush=True)
waitMouseOrEnter()
# TODO: check device is still present
# Remove infected files
files_removed = 0
has_error = False
for file in infected_files:
try:
os.remove(file)
log(f"{file} removed")
logging.info(f'boxname="{boxname}", ' f'removed="{file}"')
files_removed += 1
except Exception as ex:
log(f"could not remove: {str(ex)}", flush=True)
logging.info(
f'boxname="{boxname}", '
f'not_removed="{file}, '
f'error="{str(ex)}"',
exc_info=True,
)
has_error = True
umount_device()
logging.info(
f'boxname="{boxname}", '
f'cleaned="{files_removed}/{len(infected_files)}"')
if not has_error:
if has_curses:
log("Device cleaned !", flush=True)
else:
display_image("OK")
else:
if has_curses:
log("Device not cleaned !", flush=True)
else:
display_image("WAIT")
else:
if not has_curses:
display_image("OK")
return "WAIT"
# --------------------------------------
def move_to_script_folder():
"""Move to pandora-box folder"""
abspath = os.path.abspath(__file__)
dname = os.path.dirname(abspath)
os.chdir(dname)
# --------------------------------------
def wait_for_workers():
log(f"Starting..............", flush=True)
time.sleep(10)
pandora = pypandora.PyPandora(root_url=pandora_root_url)
workers = pandora.get_enabled_workers()
while not pandora.is_up:
log(f"Waiting for Pandora", flush=True)
time.sleep(1)
for worker in workers:
log(f"Worker: {worker}", flush=True)
# --------------------------------------
def startup():
"""Start Pandora-box"""
global logo
# Move to script folder
move_to_script_folder()
# read config
config()
# Initialize curses
init_curses()
# Initilize log
initlog()
# Read logo
with open("pandora-box.txt", mode="r", encoding="utf-8") as file1:
logo = file1.readlines()
# Print logo screen
print_screen()
# Wait for workers to start
wait_for_workers()
# Now Ready
log("Ready.", flush=True)
return "WAIT"
# --------------------------------------
def loop(state):
"""Main event loop"""
match state:
case "START":
return startup()
case "WAIT":
return wait()
case "INSERTED":
return mount()
case "SCAN":
return scan()
case "CLEAN":
return clean()
case "ERROR":
return error()
case _:
return "STOP"
# --------------------------------------
def get_lock(process_name):
"""Get a lock to check that Pandora-box is not already running"""
# Without holding a reference to our socket somewhere it gets garbage
# collected when the function exits
get_lock._lock_socket = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
try:
# The null byte (\0) means the socket is created
# in the abstract namespace instead of being created
# on the file system itself.
# Works only in Linux
get_lock._lock_socket.bind("\0" + process_name)
except socket.error:
print("Pandora-box is already running !", file=sys.stderr)
os.execvp("/usr/bin/bash", ["/usr/bin/bash", "--norc"])
sys.exit()
# --------------------------------------
def main(_):
"""Main entry point"""
print("main")
try:
# Enter the mail loop
state = "START"
while state != "STOP":
state = loop(state)
#except Exception as ex:
# print({str(ex)})
# log(f"Unexpected error: {str(ex)}", flush=True)
# logging.info(
# f'boxname="{boxname}", '
# f'error="{str(ex)}"',
# exc_info=True)
finally:
end_curses()
if __name__ == "__main__":
print("Start")
get_lock("pandora-box")
curses.wrapper(main)
print("Done.")