2009-11-13 66 views
0

我虽然会很有趣,看看线程和队列,所以我已经写了2个脚本,一个将打破文件并加密在一个线程每个块,其他的会做连续。我对python还很陌生,并且不知道为什么treading脚本需要这么长时间。线程和队列VS系列性能

螺纹脚本:

#!/usr/bin/env python 

from Crypto.Cipher import AES 
from optparse import OptionParser 
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue 


BLOCK_SIZE = 32 #32 = 256-bit | 16 = 128-bit 
TFILE = 'mytestfile.bin' 
CHUNK_SIZE = 2048 * 2048 
KEY = os.urandom(32) 

class DataSplit(): 
    def __init__(self,fileObj, chunkSize): 

     self.fileObj = fileObj 
     self.chunkSize = chunkSize 

    def split(self): 
     while True: 
      data = self.fileObj.read(self.chunkSize) 
      if not data: 
       break 
      yield data 

class encThread(threading.Thread): 
    def __init__(self, seg_queue,result_queue, cipher): 
     threading.Thread.__init__(self) 
     self.seg_queue = seg_queue 
     self.result_queue = result_queue 
     self.cipher = cipher 

    def run(self): 
     while True: 
      #Grab a data segment from the queue 
      data = self.seg_queue.get() 
      encSegment = []   
      for lines in data: 
      encSegment.append(self.cipher.encrypt(lines)) 
      self.result_queue.put(encSegment) 
      print "Segment Encrypted" 
      self.seg_queue.task_done() 

start = time.time() 
def main(): 
    seg_queue = Queue.Queue() 
    result_queue = Queue.Queue() 
    estSegCount = (os.path.getsize(TFILE)/CHUNK_SIZE)+1 
    cipher = AES.new(KEY, AES.MODE_CFB) 
    #Spawn threads (one for each segment at the moment) 
    for i in range(estSegCount): 
     eT = encThread(seg_queue, result_queue, cipher) 
     eT.setDaemon(True) 
     eT.start() 
     print ("thread spawned") 

    fileObj = open(TFILE, "rb") 
    splitter = DataSplit(fileObj, CHUNK_SIZE) 
    for data in splitter.split(): 
     seg_queue.put(data) 
     print ("Data sent to thread") 

    seg_queue.join() 
    #result_queue.join() 
    print ("Seg Q: {0}".format(seg_queue.qsize())) 
    print ("Res Q: {0}".format(result_queue.qsize())) 



main() 
print ("Elapsed Time: {0}".format(time.time()-start)) 

串行脚本:使用readlines方法(),而不是读

#!/usr/bin/env python 

from Crypto.Cipher import AES 
from optparse import OptionParser 
import os, base64, time, sys, hashlib, pickle, threading, timeit, Queue 

TFILE = 'mytestfile.bin' 
CHUNK_SIZE = 2048 * 2048 

class EncSeries(): 
    def __init(self): 
     pass 

    def loadFile(self,path): 
     openFile = open(path, "rb") 
     #fileData = openFile.readlines() 
     fileData = openFile.read(CHUNK_SIZE) 
     openFile.close() 
     return fileData 

    def encryptData(self,key, data): 
     cipher = AES.new(key, AES.MODE_CFB) 
     newData = [] 
     for lines in data: 
      newData.append(cipher.encrypt(lines)) 
     return newData 


start = time.time() 
def main(): 
    print ("Start") 
    key = os.urandom(32) 
    run = EncSeries() 
    fileData = run.loadFile(TFILE) 

    encFileData=run.encryptData(key, fileData) 
    print("Finish") 

main() 
print ("Elapsed Time: {0}".format(time.time()-start)) 

似乎对串行版本大大加快东西太多,但它已经是多快比线程版本。

回答

1
  1. 看起来你的第二个版本只能读取一个块,而第一个版本读取整个文件 - 这将解释大加速。 编辑:另一个问题:我只是注意到你无缘无故地运行了for lines in data--这实际上是对单个字符进行加密,这会慢得多。相反,只需将数据直接传递给encrypt即可。

  2. 有正在启动更多的CPU重线程比你的处理器内核是没有意义的。

  3. 的线程可以并行,如果它们调用的解锁GIL同时运行的扩展模块才能正常工作。我不认为PyCrypto这样做,所以你不会在这里完成任何并行的工作。

  4. 如果瓶颈是磁盘性能,那么无论如何你都不会看到很大的改进 - 在这种情况下,最好有一个线程做磁盘I/O,另一个线程做加密。 GIL不会成为问题,因为它在做磁盘I/O时被释放。

+0

很多thx的信息家伙。我将修改线程版本为一个I/O线程和一个加密线程。对于小文件,我是否可以通过创建两个具有唯一密码对象的进程并行加密来获得性能提升? – zyrus001 2009-11-13 12:06:04

+0

如果您看到100%的CPU核心,那么它可以帮助添加另一个进程。如果没有,那么磁盘I/O可能是瓶颈,而另一个进程无济于事。 – interjay 2009-11-13 12:36:36

1

线程不加快程序的神奇的方式 - 拆分工作到线程通常会慢下来,除非该程序会耗费其时间等待I/O一显著部分。每个新线程都会在代码分割工作中增加更多的开销,并且在线程间切换时操作系统开销更大。

从理论上讲,如果你是一个多处理器CPU则线程可能在不同的处理器上运行上运行,以便工作并行进行,但即使再有就是具有比处理器的多个线程是没有意义的。

实际上它是完全不同的,至少对于C版本的Python。 GIL在多处理器上运行得并不好。 David Beazley看到这个presentation的原因是为什么。 IronPython和Jython没有这个问题。

如果你真的想要并行化工作,那么最好是产生多个进程并把它们分配给它们,但是有可能传输大块数据的进程间通信开销会否定并行性的好处。

0

主题有几个不同的用途:

  1. 他们只提供加速,如果他们让你得到多个硬件在同一时间对你的问题的工作,即硬件是否是CPU内核或磁盘头。

  2. 它们允许你跟踪I/O事件的多个序列,这将是复杂得多,没有他们,比如与多个用户同时通话的。

后者不是为了性能,而是为了清晰的代码。

1

我看了戴夫柯比链接到演示文稿,并试图例子计数器,它需要两个多两倍的时间在两个线程运行:

import time 
from threading import Thread 

countmax=100000000 

def count(n): 
    while n>0: 
     n-=1 

def main1(): 
    count(countmax) 
    count(countmax) 

def main2(): 
    t1=Thread(target=count,args=(countmax,)) 
    t2=Thread(target=count,args=(countmax,)) 
    t1.start() 
    t2.start() 
    t1.join() 
    t2.join() 

def timeit(func): 
    start = time.time() 
    func() 
    end=time.time()-start 
    print ("Elapsed Time: {0}".format(end)) 

if __name__ == '__main__': 
    timeit(main1) 
    timeit(main2) 

输出:

Elapsed Time: 21.5470001698 
Elapsed Time: 55.3279998302 

但是,如果我换了Thread for Process:

from multiprocessing import Process 

and

t1=Process(target .... 

等我得到这样的输出:

Elapsed Time: 20.5 
Elapsed Time: 10.4059998989 

现在它好像我的奔腾CPU有两个内核,我敢打赌,它的超线程。任何人都可以在他们的两个或四个核心机器上尝试这种方式并运行2或4个线程?

multiprocessing

0

只是一个快速注蟒蛇2.6.4文档更新这个帖子:蟒蛇3.2有一个新的实现这减轻了不少与多线程相关的开销的GIL的,但并没有消除锁定。 (即它不允许你使用多个核心,但它允许你有效地在该核心上使用多个线程)。