
I am trying to speed up a process I have been doing for a long time. Currently I download all the files, then convert them all to CSV, then use bokeh to create an interactive chart for looking at the data. I would like to start converting the first file to CSV as soon as it finishes downloading, while the remaining files are still downloading, and then start creating the interactive chart as soon as each CSV is ready, while downloads and new CSV conversions continue.

Is this possible in Python?

The data files range from 100-500 MB each and there are generally about 40 to process daily. The process currently takes about 15 minutes to complete, and if this new approach could cut that by a third it would help greatly.

I'm not sure whether multiprocessing/multi-threading or async/await would help.

asked Mar 8 at 16:10 by Brent Hodges
  • Maybe first try to use multiprocessing/multi-threading or async/await and you will see whether it helps in your situation. We have no access to your code, so we can't check it. – furas, Mar 8 at 17:56

1 Answer

You need to use a pipeline (this blog explains what a pipeline is).
First, use the standard-library queue module and set up a separate queue for each processing stage. That lets files flow through the pipeline as soon as they're ready.

Then use ThreadPoolExecutor from concurrent.futures. It is well suited to I/O-bound tasks because it lets the program keep doing work while it waits on downloads (or whatever else your program is doing).
In each stage you should have some worker threads that pull from the stage's input queue and push results onto the next stage's queue.
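
For example, here is a minimal sketch of the download stage on its own, just to show why a thread pool helps with I/O-bound work. The URL list and the fetch helper are hypothetical, not from the question:

import urllib.request
from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch(url):
    # Download one file into the current directory; runs in a worker thread.
    filename = url.rsplit("/", 1)[-1]
    urllib.request.urlretrieve(url, filename)
    return filename

urls = ["https://example.com/a.dat", "https://example.com/b.dat"]  # hypothetical

with ThreadPoolExecutor(max_workers=4) as executor:
    futures = [executor.submit(fetch, url) for url in urls]
    for future in as_completed(futures):
        # Each file becomes available here as soon as its own download
        # finishes, so the next stage can start without waiting for the rest.
        print("downloaded:", future.result())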

If you wire everything up correctly, then as soon as a file is downloaded it is immediately available for conversion while other downloads continue; likewise, converted files are immediately handed off for chart creation.

It should look like this:

import queue
from concurrent.futures import ThreadPoolExecutor

class ProcessingPipeline:
    def __init__(self, urls, download_dir, csv_dir, chart_dir, max_workers):
        self.urls = urls
        self.download_dir = download_dir
        self.csv_dir = csv_dir
        self.chart_dir = chart_dir
        self.max_workers = max_workers

        self.download_queue = queue.Queue()
        self.convert_queue = queue.Queue()
        self.chart_queue = queue.Queue()

    # Below should be your functions that process the data:
    # download_worker pulls a URL from download_queue, saves the file to
    # download_dir and puts its path on convert_queue; convert_worker writes
    # the CSV and puts its path on chart_queue; chart_worker builds the chart.
    # Each worker must call task_done() for every item it pulls, and should
    # exit when it receives a None sentinel (see the sketch below).
    ...

    def run(self):
        # Seed the first stage so the download workers have items to pull.
        for url in self.urls:
            self.download_queue.put(url)

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            download_futures = [executor.submit(self.download_worker)
                                for _ in range(min(self.max_workers, len(self.urls)))]
            convert_futures = [executor.submit(self.convert_worker)
                               for _ in range(self.max_workers)]
            chart_futures = [executor.submit(self.chart_worker)
                             for _ in range(self.max_workers)]

            # Wait until every queue has been fully processed, then signal
            # the workers to stop so the executor can shut down cleanly.
            self.download_queue.join()
            self.convert_queue.join()
            self.chart_queue.join()

if __name__ == "__main__":
    res = ProcessingPipeline(
        urls=["some_url_1", "some_url_2"],
        download_dir="your_data",
        csv_dir="your_files",
        chart_dir="your_charts",
        max_workers=3,
    )
    res.run()
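
The worker methods elided above each follow the same pull/process/push loop, stopping on a None sentinel. Here is a minimal, self-contained sketch of that pattern with two stages; fake_download and fake_convert are placeholder functions standing in for the real download and CSV code, not real APIs:

import queue
from concurrent.futures import ThreadPoolExecutor

def worker(in_q, out_q, process):
    # Pull from in_q, process, push to out_q; stop on a None sentinel.
    while True:
        item = in_q.get()
        if item is None:
            in_q.task_done()
            break
        result = process(item)
        if out_q is not None:
            out_q.put(result)
        in_q.task_done()

# Placeholder stage functions standing in for real download/CSV logic:
def fake_download(url):
    return url + ".raw"

def fake_convert(path):
    return path + ".csv"

download_q, convert_q = queue.Queue(), queue.Queue()

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.submit(worker, download_q, convert_q, fake_download)
    executor.submit(worker, convert_q, None, fake_convert)

    for url in ["file1", "file2", "file3"]:
        download_q.put(url)

    download_q.put(None)   # no more downloads
    download_q.join()      # wait until every download has been handed off
    convert_q.put(None)    # then stop the convert stage
    convert_q.join()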

Hope this helps.
