What is causing the program to run once but not continuously?

I am experimenting with multiprocessing.ThreadPool and Queues, and am getting some odd behaviour that I don't understand. The program runs through the queue once but does not keep running continuously as it should.

At first I thought it was a PyCharm issue, so I started by updating PyCharm and my OS, but that had no luck. Then I changed the run configuration by adding the option "Run with Python Console", but that just runs through once. I also tried "Adding To Process" as per . Then I tried adding a Lock to the code, thinking it was a race issue, but that also hasn't solved it.

The only thing that works is running the program in debug mode and stepping through the code; this gets the behaviour I want but is impractical.

worker_core.py

#import common_imports as ci

import logging
from queue import Queue, Empty
from multiprocessing.pool import ThreadPool
from multiprocessing import Event
from threading import Lock
import time

class MultithreadedPoolAndQueue:
    def __init__(self, available_modules):
        self.logger = logging.getLogger("Test_Program.MultithreadedPoolAndQueue")
        self.available_modules = available_modules
        self.logger.info("Multithreaded Pool and Queue Management Initialised")
        self.queue = Queue()
        self.pool = ThreadPool(processes=len(self.available_modules))
        self.all_tasks_completed = Event()
        self.lockymclockface = Lock()

    def unpack_results(self, results):
        filtered_results = []
        for key, val in results.items():
            if isinstance(val, list):
                for item in val:
                    filtered_results.append({key: item})
            else:
                filtered_results.append({key: val})

        return filtered_results

    def fill_queue(self, data):
        for d in data:
            # Each data will be {DATA_TYPE: Data}
            self.queue.put(d)

    def parse_results(self, results):
        """
        This function will need the filtered results.
        It will put the data onto the queue like this: {DATA_TYPE: Data}


        :param results:
        :return:
        """
        self.logger.info(f"Entering parse_results with: {results}")
        if results is not None:
            unpacked_results = self.unpack_results(results)
            for result in unpacked_results:
                self.logger.info(f"Result added: {result}")
                self.queue.put(result)

    def error_call(self, e):
        self.logger.error(e)

    def thread_worker(self):
        max_check_count = 10
        check = 0
        loop_around = 0

        while self.queue.empty() is False or loop_around < 2 and check < max_check_count:
            try:
                item = self.queue.get_nowait()

                # Assuming item is in the format {data_type: data}
                data_type = next(iter(item))  # Get the first (and only) key from the dictionary
                query = item[data_type]

                # For each module in available modules, dispatch the work asynchronously
                for module in self.available_modules:
                    for module_name, module_class in module.items():
                        # Check if the module can handle this data type
                        if data_type in module_class.accepted_data:
                            with self.lockymclockface:
                                module_instance = module_class()
                                self.pool.apply_async(
                                    module_instance.event,
                                    args=(query, data_type),
                                    callback=self.parse_results,
                                    error_callback=self.error_call,
                                )

                loop_around = 0
                check = 0  # Reset check counter on successful item processing

            except Empty:
                # If the queue is empty, wait for a short time before checking again
                time.sleep(0.1)
                check += 1
                loop_around += 1

            except Exception as e:
                self.logger.exception(e)
                loop_around += 1

        # Wait for all async tasks to complete
        self.pool.close()
        self.pool.join()
        self.all_tasks_completed.set()

    def wait_for_completion(self):
        self.all_tasks_completed.wait()
        self.logger.info(f"Queue item: {self.queue.get()}")
        self.logger.info("All tasks have been completed.")


How I am using it:

threaded_core = worker_core.MultithreadedPoolAndQueue(self.available_modules)
threaded_core.fill_queue(self.targets)
threaded_core.thread_worker()
threaded_core.wait_for_completion()
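
For reference, here is a minimal, self-contained sketch of that wiring, with a hypothetical EchoModule standing in for a real module (its interface is inferred from thread_worker and parse_results above, so treat it as an assumption rather than project code):

import logging

import worker_core

logging.basicConfig(level=logging.INFO)

class EchoModule:
    # Hypothetical stand-in: accepts "domain" items and emits an "ip" result,
    # which parse_results will unpack and put back on the queue.
    accepted_data = ["domain"]

    def event(self, query, data_type):
        # A real module would do actual work; this just fabricates a result.
        return {"ip": f"resolved({query})"}

# available_modules is a list of {module_name: module_class} dicts,
# matching how thread_worker iterates it.
available_modules = [{"EchoModule": EchoModule}]
targets = [{"domain": "example.com"}]

threaded_core = worker_core.MultithreadedPoolAndQueue(available_modules)
threaded_core.fill_queue(targets)
threaded_core.thread_worker()
# threaded_core.wait_for_completion() is omitted here: its queue.get()
# call blocks forever if the queue has already been drained.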

Complete project files can be found at:

NOTE: The modules folder is meant to be in the consumer_producer folder. When running, don't forget to add the path to the config.ini file, and to pass the config.ini file to Test_Program.py.
