2012-10-22 55 views
1

可能重复:
Python multiprocessing pool.map for multiple arguments是否可以将多个参数传递给multiprocessing.pool?

我想两个参数输送给在multiprocessing.Pool子进程?我感觉我正在向上推进。它有可能是2个参数,还是一个元组,或者... ...?它似乎对一些工作正常,正确地传出两个文件名(进出),但然后barfs意外地在一个可变的点。可悲的是,它并没有真正的工作,因为输出文件都是空的 - 如果我直接调用它,或者单个处理,这不会发生什么。还有一个复杂的因素,被调用的例程在另一个导入的模块中。将本地化为'foo'存根模块确实解决了一些问题,但它只能打印参数,而不是尝试做任何实际的工作。

这可能是一个固执地抵制学习如何使用队列的很长的路,但我只想确认我不会在任何地方推进我所处的道路。

fixtures/txt_data/AAD.txt obj/txt_data/AAD.txt 
fixtures/txt_data/ANZSMW.txt obj/txt_data/ANZSMW.txt 
fixtures/txt_data/BENPA.txt obj/txt_data/BENPA.txt 
fixtures/txt_data/CBAIZQ.txt obj/txt_data/CBAIZQ.txt 
Traceback (most recent call last): 
    File "./jobflow.py", line 60, in <module> 
    main() 
    File "./jobflow.py", line 57, in main 
    args.func(args) 
    File "./jobflow.py", line 40, in market 
    pool.map(foo, market_files()) 
    File "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 225, in map 
    return self.map_async(func, iterable, chunksize).get() 
File  "/System/Library/Frameworks/Python.framework/Versions/2.7/lib/python2.7/multiprocessing/pool.py", line 522, in get 
raise self._value 
TypeError: function takes exactly 1 argument (2 given) 
fixtures/txt_data/CSDO.txt obj/txt_data/CSDO.txt 
fixtures/txt_data/EMB.txt obj/txt_data/EMB.txt 
fixtures/txt_data/GGG.txt obj/txt_data/GGG.txt 
fixtures/txt_data/IDL.txt obj/txt_data/IDL.txt 

这是一个错误示例。它在文件耗尽之前暂停。它或者呻吟声,它要2个ARGS,但只得到了1当我改变它,试图通过有两个参数:

def foo(c): 
    a, b, = c 
    print a, b 
    market2.file_main((a, b)) # does comment/uncommenting this break it only because it's in another python file? 

def market(args): 
    """ 
    read raw ticker data files and output nice, clean, more valid ticker data files 
    """ 
    pool = multiprocessing.Pool() 

    class market_files(object): 
     for infile in args.infiles: 
      outfile = os.path.join(args.outdir, os.path.basename(infile)) 
      yield (infile, outfile) 

    pool.map(foo, market_files()) 
+0

哦。很好的发现。谢谢。 –

回答

2

哦,等等,它的工作,但不是通过直接传递多个ARGS,但将它们放入一个元组。

我通过在每次迭代时产生一个新的Processp.start()来实现它,这个迭代产生了一个荒谬的进程数;但是确实吞下了多个参数。

从那里开始工作我把迭代器简化成了一个列表(现在我已经确定了一个迭代器可能没问题),但我认为最主要的是将args作为元组传递。一定是在切割房间地板上看到太多混乱的情况之一,以查看已经工作的解决方案。

所以在控制器我有:

arglist = [] 
    for infile in args.infiles: 
     outfile = os.path.join(args.outdir, os.path.basename(infile)) 
     arglist.append((infile, outfile)) 

    pool = multiprocessing.Pool() 
    p = pool.map(func=market2.process, iterable=arglist) 

和模块中:

def process(x): 
     infile, outfile = x 
     instream = open(infile, 'rB') 
     outstream = open(outfile, 'wB') 
     main(instream, outstream) 
     instream.close() 
     outstream.close() 

4芯性能(分):

  • 单线程= 3:54
  • 使用子进程= 4:52(我认为它默认情况下会阻止这个数字)
  • 使用Process的的squillions同时= 2:41(饱和所有内核与每个进程1-4%CPU)
  • 使用普尔= 2:13
+0

编辑:lols。我认为它正在循环推动一切 - 所以运行真正想要的笛卡尔产品。让我解决所有的问题! –

相关问题