I am trying to speed up a process I have been running for a long time. Currently I download all the files, then convert them all to CSV, then use bokeh to create an interactive chart for looking at the data. Instead, as soon as the first file downloads, I would like to start converting it to CSV while the remaining files are still downloading. Then, as soon as each CSV is created, I would like to immediately start creating the interactive chart while files are still downloading and new CSVs are being created.
Is this possible in Python?
The data files range from 100-500 MB each and there are generally about 40 to process daily. The process currently takes about 15 minutes to complete, and if this new approach could cut that by a third it would help greatly.
I'm not sure whether multiprocessing/multi-threading or async/await would help.
asked Mar 8 at 16:10 by Brent Hodges

Comment from furas (Mar 8 at 17:56): maybe first try multiprocessing/multi-threading or async/await and you will see if it can help in your situation. We have no access to your code, so we can't check it.

1 Answer
You need to build a pipeline, so that each stage starts working as soon as the previous stage produces output.
First, use the standard-library queue module to set up a separate queue for each processing stage. That allows files to flow through the pipeline as soon as they're ready.
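A minimal sketch of that handoff (the file names and the string-replace "conversion" are stand-ins, not your real code): a worker thread pulls each file from the queue the moment it is put there, with None used as a shutdown sentinel.

```python
import queue
import threading

# One queue hands finished downloads to the converter stage.
download_queue = queue.Queue()
results = []

def convert_worker():
    # Pull filenames as soon as they arrive; None is a shutdown sentinel.
    while True:
        filename = download_queue.get()
        if filename is None:
            download_queue.task_done()
            break
        results.append(filename.replace(".dat", ".csv"))  # stand-in for real conversion
        download_queue.task_done()

worker = threading.Thread(target=convert_worker)
worker.start()

for name in ["a.dat", "b.dat"]:   # stand-in for the download loop
    download_queue.put(name)      # each file is handed off immediately
download_queue.put(None)          # signal that no more work is coming
worker.join()
print(results)                    # ['a.csv', 'b.csv']
```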
Then use ThreadPoolExecutor from concurrent.futures. Threads are a good fit for I/O-bound work such as downloading, because a thread that is blocked waiting on the network lets the other threads keep running.
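To see why threads help here, consider this sketch where time.sleep stands in for network wait (a real download would use your HTTP client of choice): four 0.2-second "downloads" overlap instead of running back to back.

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Stand-in for a download: a sleeping thread yields to the others,
# so several "downloads" run concurrently.
def fake_download(url):
    time.sleep(0.2)
    return f"{url} done"

urls = [f"file{i}" for i in range(4)]

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    results = list(executor.map(fake_download, urls))
elapsed = time.perf_counter() - start

print(results)
print(f"{elapsed:.2f}s")  # close to 0.2s, not 0.8s, because the waits overlap
```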
Also, each stage should have a few worker threads that pull items from the stage's input queue and push results onto the next stage's queue.
If everything is wired up correctly, a file becomes available for conversion the moment its download finishes, while the other downloads continue. Likewise, each converted CSV is immediately handed on for chart creation.
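The pull-from-one-queue, push-to-the-next pattern can be written once as a generic stage worker; the lambda "converter" and the list of day names below are placeholders for your real conversion and charting steps. Note how the None sentinel is propagated downstream so every stage shuts down cleanly.

```python
import queue
import threading

def stage_worker(in_q, out_q, work):
    # Generic stage: pull from the input queue, process, push to the next queue.
    while True:
        item = in_q.get()
        if item is None:              # sentinel: propagate shutdown downstream
            in_q.task_done()
            if out_q is not None:
                out_q.put(None)
            break
        result = work(item)
        if out_q is not None:
            out_q.put(result)
        in_q.task_done()

convert_q, chart_q, done = queue.Queue(), queue.Queue(), []
threading.Thread(target=stage_worker,
                 args=(convert_q, chart_q, lambda f: f + ".csv")).start()
threading.Thread(target=stage_worker,
                 args=(chart_q, None, done.append)).start()

for f in ["day1", "day2"]:
    convert_q.put(f)                  # files flow from stage to stage as they finish
convert_q.put(None)
convert_q.join()
chart_q.join()
print(done)  # ['day1.csv', 'day2.csv']
```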
It should look like this:
```python
import queue
from concurrent.futures import ThreadPoolExecutor


class ProcessingPipeline:
    def __init__(self, urls, download_dir, csv_dir, chart_dir, max_workers):
        self.urls = urls
        self.download_dir = download_dir
        self.csv_dir = csv_dir
        self.chart_dir = chart_dir
        self.max_workers = max_workers
        self.download_queue = queue.Queue()
        self.convert_queue = queue.Queue()
        self.chart_queue = queue.Queue()

    # Below should be your worker methods that process the data:
    # download_worker, convert_worker and chart_worker, each pulling
    # from its input queue and pushing onto the next stage's queue.
    ...

    def run(self):
        # The pool must be large enough to run every stage's workers at
        # once; with only max_workers slots, the convert and chart workers
        # would never get scheduled and the joins below would deadlock.
        with ThreadPoolExecutor(max_workers=self.max_workers * 3) as executor:
            download_futures = [executor.submit(self.download_worker)
                                for _ in range(min(self.max_workers, len(self.urls)))]
            convert_futures = [executor.submit(self.convert_worker)
                               for _ in range(self.max_workers)]
            chart_futures = [executor.submit(self.chart_worker)
                             for _ in range(self.max_workers)]
            # Wait until every queue has been fully processed.
            self.download_queue.join()
            self.convert_queue.join()
            self.chart_queue.join()


if __name__ == "__main__":
    res = ProcessingPipeline(
        urls=["some_urls"],          # list of URLs to download
        download_dir="your_data",
        csv_dir="your_files",
        chart_dir="your_charts",
        max_workers=3,
    )
    res.run()
```
Hope this helps.