2016-01-25 49 views
1

前言:这是我在使用这些工具线程队列和多援助

上下文第一次尝试:我有我想处理一个非常大的文件。所以我试图将文件分成更小的块。然后将这些文件加载​​到队列中进行处理。

目标是加速什么是一个非常缓慢的过程。

代码:

import lifetimes 
import os 
import pandas 
import Queue 
import threading 
import multiprocessing 
import glob 
import subprocess 


#move master to processing dir 
os.system("cp /data/ltv-testing1.csv /data/out") 

#break master csv into 1 million row chunks 
subprocess.call(['bash', '/home/ddewberry/LTV_CSV_Split.sh']) 

#remove master file 
os.remove("/data/out/ltv-testing1.csv") 

os.chdir("/data/out") 


# Create List of Files 
worker_data = glob.glob('split_*') 

#build queue with file list 
q = Queue.Queue(worker_data) 

#import tools for data processing 
from lifetimes.utils import summary_data_from_transaction_data 

#define worker for threads 

def worker(outfile = '/data/in/Worker.csv'): 
    while True: 
     item = q.get() 
     data = pandas.read_csv(item) 
     summary = summary_data_from_transaction_data(data, data[[2]], data[[1]]) 
     summary.to_csv(outfile%s % (item)) 
     q.task_done() 

cpus=multiprocessing.cpu_count() #detect number of cores 
print("Creating %d threads" % cpus) 
for i in range(cpus): 
    t = threading.Thread(target=worker) 
    t.daemon = True 
    t.start() 

q.join() 

#clean up 
for row in worker_data: 
    os.remove(row) 

问题:

我没有得到任何错误消息但它并不在所有的工作。 (它基本上什么也没有)

我很困惑我做错了什么或我需要修复什么。

+0

对于初学者来说,'Queue.Queue'有一个参数'maxsize',而不是一个迭代,所以'q.get()'将无限期地阻止,因为没有东西在里面......此外,对于这种线程问题不会给你提速。 – mata

+0

谢谢你的解释。你有没有建议我可以采取什么方法来帮助加快这一进程? – ddewber

+0

所以我应该这样做︰q = Queue.Queue()for worker_data中的文件:q.put(文件) – ddewber

回答

0
import lifetimes 
import os 
import pandas 
import Queue 
import threading 
import multiprocessing 
import glob 
import subprocess 


#move master to processing dir 
os.system("cp /data/ltv-testing1.csv /data/out") 

#break master csv into 1 million row chunks 
subprocess.call(['bash', '/home/ddewberry/LTV_CSV_Split.sh']) 

#remove master file 
os.remove("/data/out/ltv-testing1.csv") 

os.chdir("/data/out") 


# Create List of Files 
worker_data = glob.glob('split_*') 

# rename to csv 
for row in worker_data: 
    os.rename(row, row+'.csv') 

worker_data1 = glob.glob('split_*') 

#build queue with file list 
q = Queue.Queue() 
for files in worker_data1: 
    q.put(files)