What is causing the program to run once but not continuously?
I am experimenting with multiprocessing.ThreadPool and Queues, and am getting some odd behaviour that I don't understand: the program runs through once but does not keep running continuously as it should.
At first I thought it was a PyCharm issue, so I started by updating PyCharm and my OS, but that didn't help. Then I changed the run configuration by adding the option "Run with Python Console", but that also only runs through once. I also tried PyCharm's "Attach to Process" as suggested elsewhere. Then I added a Lock to the code, thinking it was a race condition, but that hasn't solved the issue either.
The only thing that works is running the program in debug mode and stepping through the code; this produces the behaviour I want, but it is impractical.
worker_core.py
# import common_imports as ci
import logging
from queue import Queue, Empty
from multiprocessing.pool import ThreadPool
from multiprocessing import Event
from threading import Lock
import time


class MultithreadedPoolAndQueue:
    def __init__(self, available_modules):
        self.logger = logging.getLogger("Test_Program.MultithreadedPoolAndQueue")
        self.available_modules = available_modules
        self.logger.info("Multithreaded Pool and Queue Management Initialised")
        self.queue = Queue()
        self.pool = ThreadPool(processes=len(self.available_modules))
        self.all_tasks_completed = Event()
        self.lockymclockface = Lock()

    def unpack_results(self, results):
        filtered_results = []
        for key, val in results.items():
            if isinstance(val, list):
                for item in val:
                    filtered_results.append({key: item})
            else:
                filtered_results.append({key: val})
        return filtered_results

    def fill_queue(self, data):
        for d in data:
            # Each data will be {DATA_TYPE: Data}
            self.queue.put(d)

    def parse_results(self, results):
        """
        This function will need the filtered results.
        It will put the data onto the queue like this: {DATA_TYPE: Data}
        :param results:
        :return:
        """
        self.logger.info(f"Entering parse_results with: {results}")
        if results is not None:
            unpacked_results = self.unpack_results(results)
            for result in unpacked_results:
                self.logger.info(f"Result added: {result}")
                self.queue.put(result)

    def error_call(self, e):
        self.logger.error(e)

    def thread_worker(self):
        max_check_count = 10
        check = 0
        loop_around = 0
        while self.queue.empty() is False or loop_around < 2 and check < max_check_count:
            try:
                item = self.queue.get_nowait()
                # Assuming item is in the format {data_type: data}
                data_type = next(iter(item))  # Get the first (and only) key from the dictionary
                query = item[data_type]
                # Then for each module in available modules, apply an asynchronous system
                for module in self.available_modules:
                    for module_name, module_class in module.items():
                        # Check if the module can handle this data type
                        if data_type in module_class.accepted_data:
                            with self.lockymclockface:
                                module_instance = module_class()
                                self.pool.apply_async(module_instance.event,
                                                      args=(query, data_type),
                                                      callback=self.parse_results,
                                                      error_callback=self.error_call)
                loop_around = 0
                check = 0  # Reset check counter on successful item processing
            except Empty:
                # If the queue is empty, wait for a short time before checking again
                time.sleep(0.1)
                check += 1
                loop_around += 1
            except Exception as e:
                self.logger.exception(e)
                loop_around += 1
        # Wait for all async tasks to complete
        self.pool.close()
        self.pool.join()
        self.all_tasks_completed.set()

    def wait_for_completion(self):
        self.all_tasks_completed.wait()
        self.logger.info(f"Queue item: {self.queue.get()}")
        self.logger.info("All tasks have been completed.")
How I am using it:
threaded_core = worker_core.MultithreadedPoolAndQueue(self.available_modules)
threaded_core.fill_queue(self.targets)
threaded_core.thread_worker()
threaded_core.wait_for_completion()
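To illustrate the behaviour I'm after, here is a minimal self-contained sketch of the same submit/callback/re-queue loop. The EchoModule class, its accepted_data list, and the seed item are toy stand-ins for my real modules, and the in-flight counter is just one way of tracking completion for the sketch, not what my code currently does:

import logging
import threading
import time
from queue import Queue, Empty
from multiprocessing.pool import ThreadPool

logging.basicConfig(level=logging.INFO, format="%(message)s")
log = logging.getLogger("sketch")


class EchoModule:
    # Toy stand-in for a real module: trims the string and feeds it back.
    accepted_data = ["txt"]

    def event(self, query, data_type):
        time.sleep(0.05)               # simulate work
        if len(query) > 1:
            return {"txt": query[1:]}  # follow-up work for the queue
        return None                    # chain ends here


queue = Queue()
pool = ThreadPool(processes=2)
outstanding = 0                        # tasks submitted but not yet called back
lock = threading.Lock()


def on_result(result):
    # Runs on the pool's result-handler thread.
    global outstanding
    if result is not None:
        queue.put(result)   # put BEFORE decrementing, so the main loop never
    with lock:              # sees zero in-flight tasks while a result is
        outstanding -= 1    # still waiting to be queued


queue.put({"txt": "hello"})

while True:
    try:
        item = queue.get_nowait()
    except Empty:
        with lock:
            # Done only when nothing is queued and nothing is in flight.
            done = outstanding == 0 and queue.empty()
        if done:
            break
        time.sleep(0.01)
        continue
    data_type = next(iter(item))
    if data_type in EchoModule.accepted_data:
        with lock:
            outstanding += 1
        pool.apply_async(EchoModule().event, args=(item[data_type], data_type),
                         callback=on_result)
        log.info(f"Submitted: {item}")

pool.close()
pool.join()
log.info("Drained: queue empty and no tasks in flight")

This sketch keeps consuming follow-up results until the queue is empty and no task is still in flight, which is what I expect thread_worker to do continuously.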
Complete project files can be found at:
NOTE: The modules folder is meant to be in the consumer_producer folder. When running, don't forget to add the path to the config.ini file, and to supply the config.ini file to Test_Program.py.