1
0
Fork 0
mirror of https://github.com/dbarzin/pandora-box.git synced 2025-07-19 05:19:40 +02:00
pandora-box/tests/thread.py
2023-03-04 17:51:50 +01:00

62 lines
1.3 KiB
Python
Executable file

#!/usr/bin/python3
import queue
import threading
import time
# Module-level stop signal: workers keep polling while this is False.
exitFlag = False


class myThread(threading.Thread):
    """Worker thread that prints the items it takes from a queue.

    The worker relies on two module-level names: ``queueLock``, which is held
    while taking an item from the queue, and ``exitFlag``, which ends the
    polling loop once it becomes true.
    """

    def __init__(self, id, q):
        """Store the worker's label and the queue it reads from.

        Args:
            id: Value used to label this worker's output lines.
            q: Queue object this worker takes items from.
        """
        super().__init__()
        self.id = id
        self.q = q

    def run(self):
        """Print a start line, process items until stopped, print a done line."""
        print(f"Thread-{self.id} Starting ")
        self.process_data()
        print(f"Thread-{self.id} Done.")

    def process_data(self):
        """Take and print queue items until ``exitFlag`` becomes true.

        Each iteration tries to take one item from ``self.q`` without
        blocking, then sleeps for one second before trying again.
        """
        while not exitFlag:
            try:
                # The ``with`` block releases the lock even if the queue
                # raises, and the non-blocking get means the worker never
                # waits on an empty queue while holding the lock.
                with queueLock:
                    data = self.q.get_nowait()
            except queue.Empty:
                pass
            else:
                # Print outside the lock so other workers are not held up.
                print(f"Thread-{self.id} processing {data}")
            # Pause before the next poll instead of spinning on the lock.
            time.sleep(1)
# Driver: start the workers, hand them the work list, then shut them down.
maxThread = 2
workList = ["One", "Two", "Three", "Four", "Five", "Six", "Seven"]
queueLock = threading.Lock()
workQueue = queue.Queue()
threads = []

# Create new threads
for i in range(maxThread):
    thread = myThread(i, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue. The lock is held for the whole fill, so code that takes
# queueLock before reading the queue sees either none or all of the items.
with queueLock:
    for word in workList:
        workQueue.put(word)

# Wait for queue to empty. Sleep between checks rather than spinning, so this
# loop does not burn a CPU core or starve the worker threads.
while not workQueue.empty():
    time.sleep(0.1)

# Notify threads it's time to exit
exitFlag = True

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")