2013-08-21 108 views
3

我有一个包含大量数据的文件。每一行都是一条记录。我正在尝试对整个文件进行一些ETL工作。现在我正在使用标准输入逐行读取数据。这很酷的事情是你的脚本可以非常灵活地与其他脚本和shell命令集成。我将结果写入标准输出。例如。Python Threading stdin/stdout

$ cat input_file 
line1 
line2 
line3 
line4 
... 

我当前的Python代码看起来是这样的 - parse.py

import sys 
for line in sys.stdin: 
    result = ETL(line) # ETL is some self defined function which takes a while to execute. 
    print result 

下面的代码是它是如何工作现在:

cat input_file | python parse.py > output_file 

我已经看过了线程模块的Python,我想知道如果我使用该模块,性能会大大提高。

问题1:我该如何规划每个线程的配额,为什么?

... 
counter = 0 
buffer = [] 
for line in sys.stdin: 
    buffer.append(line) 
    if counter % 5 == 0: # maybe assign 5 rows to each thread? if not, is there a rule of thumb to determine 
     counter = 0 
     thread = parser(buffer) 
     buffer = [] 
     thread.start() 

问题2:多线程可能在同一时间打印出结果返回到标准输出,如何组织他们,避免下面的情况呢?

import threading 
import time 

class parser(threading.Thread): 
    def __init__ (self, data_input): 
     threading.Thread.__init__(self) 
     self.data_input = data_input 

    def run(self): 
     for elem in self.data_input: 
      time.sleep(3) 
      print elem + 'Finished' 

work = ['a', 'b', 'c', 'd', 'e', 'f'] 

thread1 = parser(['a', 'b']) 
thread2 = parser(['c', 'd']) 
thread3 = parser(['e', 'f']) 

thread1.start() 
thread2.start() 
thread3.start() 

的输出是真难看,其中一个行都包含来自两个线程的输出。

aFinished 
cFinishedeFinished 

bFinished 
fFinished 
dFinished 
+0

你能链接“Python的线程模块”吗? 无论如何,线程访问一个文件,恕我直言,是不是一件好事。您需要定义什么内核可以通过锁和信号量以及作品访问什么以及何时访问。 由于大部分工作是I/O工作,而不是CPU工作,可能你不会看到性能提升。 –

回答

4

先问你第二个问题,这是mutexes的用途。您可以通过使用一个锁的解析器之间进行协调得到你想要的干净的输出,并确保只有一个线程在给定的时间段进入到输出流:

class parser(threading.Thread): 
    output_lock = threading.Lock() 

    def __init__ (self, data_input): 
     threading.Thread.__init__(self) 
     self.data_input = data_input 

    def run(self): 
     for elem in self.data_input: 
      time.sleep(3) 
      with self.output_lock: 
       print elem + 'Finished' 

至于你的第一个问题,请注意,多线程可能不会为您的特定工作负载提供好处。这在很大程度上取决于您对每个输入行(您的ETL函数)所做的工作主要是CPU限制还是IO限制。如果前者(我怀疑可能),线程将无济于事,因为global interpreter lock。在这种情况下,您可能希望使用multiprocessing模块在多个进程而不是多个线程之间分配工作。

但您可以通过更容易实现的工作流程获得相同的结果:将输入文件拆分为n个部分(使用例如split命令);分别在每个子文件上调用extract-and-transform脚本;然后连接结果输出文件。

一个挑剔:“使用标准输入逐行读取数据,因为它不会将整个文件加载到内存中”涉及到一个误解。您可以从内部的Python,例如,通过在结构像一个文件对象替换sys.stdin逐行读取一个文件行:

for line in sys.stdin: 

也见文件对象的readline()方法,并注意read()可以作为参数要读取的最大字节数。

+0

很多伟大的东西在你的文章,阿尔卑斯山。我对你的评论CPU限制/ IO限制非常感兴趣。我想知道您是否有办法确定CPU/IO占用了多少时间和资源?顺便说一句,他们的理由是我使用stdIO是因为你可以将你的脚本与Shell命令集成在一起,这使得它非常灵活和方便。感谢关于'记忆中间理解'的更正。 –

0

线程是否会有帮助,您高度依赖于您的情况。特别是,如果您的功能涉及大量磁盘访问,那么线程可能会使您的速度得到显着改善。

回应你的第一个问题,我总是发现它只是依赖。确定理想的线程数时,有很多因素在起作用,其中许多因素都依赖于程序。例如,如果您正在进行大量磁盘访问(这非常缓慢),那么您将需要更多线程利用停机时间来等待磁盘访问。但是,如果程序是CPU绑定的,那么大量的线程可能不是非常有用。因此,尽管可能分析所有因素以获得理想数量的线程,但通常要做出初步猜测并且从那里进行调整要快得多。

更具体地说,虽然为每个线程分配一定数量的线可能并不是分散工作的最佳途径。例如,考虑一条线需要特别长的时间来处理。如果一条线程可以在这一条线上工作,而其他线程可以在此期间再多做几条线,那将是最好的。处理这个问题的最好方法是使用Queue。如果将每行都插入到队列中,则每个线程都可以从队列中拉出一条线,处理它并重复,直到队列为空。通过这种方式,工作得到分配,从而不会有任何线程无需工作(当然,直到最后)。

现在,第二个问题。你绝对正确的做法是一次写入多个线程的stdout并不是一个理想的解决方案。理想情况下,你会安排一些事情,以便写入标准输出只发生在一个地方。一个很好的方法是使用队列。如果每个线程都将其输出写入共享队列,则可以产生一个额外线程,其唯一任务是将项目从该队列中取出并将其输出到stdout。通过将打印限制为只有一个线程,可以避免多个线程尝试一次打印时固有的问题。