1
前言:这是我在使用这些工具线程队列和多援助
上下文第一次尝试:我有我想处理一个非常大的文件。所以我试图将文件分成更小的块。然后将这些文件加载到队列中进行处理。
目标是加速什么是一个非常缓慢的过程。
代码:
import lifetimes
import os
import pandas
import Queue
import threading
import multiprocessing
import glob
import subprocess
#move master to processing dir
os.system("cp /data/ltv-testing1.csv /data/out")
#break master csv into 1 million row chunks
subprocess.call(['bash', '/home/ddewberry/LTV_CSV_Split.sh'])
#remove master file
os.remove("/data/out/ltv-testing1.csv")
os.chdir("/data/out")
# Create List of Files
worker_data = glob.glob('split_*')
#build queue with file list
q = Queue.Queue(worker_data)
#import tools for data processing
from lifetimes.utils import summary_data_from_transaction_data
#define worker for threads
def worker(outfile = '/data/in/Worker.csv'):
while True:
item = q.get()
data = pandas.read_csv(item)
summary = summary_data_from_transaction_data(data, data[[2]], data[[1]])
summary.to_csv(outfile%s % (item))
q.task_done()
cpus=multiprocessing.cpu_count() #detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
q.join()
#clean up
for row in worker_data:
os.remove(row)
问题:
我没有得到任何错误消息但它并不在所有的工作。 (它基本上什么也没有)
我很困惑我做错了什么或我需要修复什么。
对于初学者来说,'Queue.Queue'有一个参数'maxsize',而不是一个迭代,所以'q.get()'将无限期地阻止,因为没有东西在里面......此外,对于这种线程问题不会给你提速。 – mata
谢谢你的解释。你有没有建议我可以采取什么方法来帮助加快这一进程? – ddewber
所以我应该这样做︰q = Queue.Queue()for worker_data中的文件:q.put(文件) – ddewber