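My understanding is that concurrent.futures relies on pickling the arguments so that they can run in a different process (or thread). Shouldn't pickling create a copy of the arguments? On Linux it doesn't seem to, i.e., I have to pass a copy explicitly. Why doesn't concurrent.futures work on copies of the arguments?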

I'm trying to make sense of the following results:

<0> rands before submission: [17, 72, 97, 8, 32, 15, 63, 97, 57, 60] 
<1> rands before submission: [97, 15, 97, 32, 60, 17, 57, 72, 8, 63] 
<2> rands before submission: [15, 57, 63, 17, 97, 97, 8, 32, 60, 72] 
<3> rands before submission: [32, 97, 63, 72, 17, 57, 97, 8, 15, 60] 
in function 0 [97, 15, 97, 32, 60, 17, 57, 72, 8, 63] 
in function 1 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] 
in function 2 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] 
in function 3 [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] 

Here is the code:

from __future__ import print_function 
import time 
import random 
try: 
    from concurrent import futures 
except ImportError: 
    import futures 


def work_with_rands(i, rands): 
    print('in function', i, rands) 


def main(): 
    random.seed(1) 
    rands = [random.randrange(100) for _ in range(10)] 

    # sequence 1 and sequence 2 should give the same results but they don't;
    # the only difference is that sequence 2 passes a copy of rands (rands[:])
    # sequence 1
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            print("<{}> rands before submission: {}".format(i, rands))
            ex.submit(work_with_rands, i, rands)
            random.shuffle(rands)

    print('-' * 30)
    random.seed(1)
    rands = [random.randrange(100) for _ in range(10)]
    # sequence 2
    print("initial sequence: ", rands)
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            print("<{}> rands before submission: {}".format(i, rands))
            ex.submit(work_with_rands, i, rands[:])
            random.shuffle(rands)

if __name__ == "__main__": 
    main() 

Where on earth does [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] come from? It isn't even one of the sequences passed to submit.

These results were obtained with Python 2.

Answers

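Basically, ProcessPoolExecutor.submit() puts the function and its arguments into a "work items" dict (without any pickling), which is shared with another thread (_queue_management_worker). That thread takes the WorkItems from the dict and passes them on to a queue that is read by the actual worker processes.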
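Pickling does make a copy; what matters is *when* it happens. A minimal sketch using only the standard pickle module (pickled_copy is a hypothetical helper name) showing that the copy reflects the object's state at the moment of pickling, not at the moment submit() was called:

```python
import pickle


def pickled_copy(obj):
    # A pickle round-trip yields a new, independent object.
    return pickle.loads(pickle.dumps(obj))


rands = [17, 72, 97, 8]
early = pickled_copy(rands)   # snapshot taken before the mutation
rands.reverse()               # in-place mutation, like random.shuffle(rands)
late = pickled_copy(rands)    # snapshot taken after the mutation

print(early)                  # [17, 72, 97, 8]
print(late)                   # [8, 97, 72, 17]
print(early is rands, late is rands)  # False False
```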

There is a comment in the source code that describes the architecture of the module: http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l6

It turns out that, between your submit calls, _queue_management_worker does not get enough time to be notified about the new items.

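So the thread is always waiting here (http://hg.python.org/cpython/file/16207b8495bf/Lib/concurrent/futures/process.py#l226) and only wakes up on ProcessPoolExecutor.shutdown, i.e., when the with block around the ProcessPoolExecutor exits. Calls that are still pending at that point are pickled only then, so they all see rands in whatever state the loop left it.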
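To make the consequence concrete, here is a toy, single-threaded model of the pipeline described above. It is not the real concurrent.futures implementation: ToyProcessPool, pending_work_items and drain_on_submit are made-up names, and draining on every submit merely stands in for the management thread getting a chance to run between calls.

```python
import pickle


class ToyProcessPool:
    """Hypothetical toy model, NOT the real concurrent.futures code.
    submit() only records references; pickling happens in _drain(),
    which stands in for _queue_management_worker getting to run."""

    def __init__(self, drain_on_submit=False):
        self.drain_on_submit = drain_on_submit
        self.pending_work_items = {}   # work_id -> (fn_name, args), stored by reference
        self.call_queue = []           # pickled bytes, as a worker would read them
        self._next_id = 0

    def submit(self, fn_name, *args):
        self.pending_work_items[self._next_id] = (fn_name, args)  # no copy here
        self._next_id += 1
        if self.drain_on_submit:       # roughly what time.sleep() after submit allows
            self._drain()

    def _drain(self):
        # Pickle whatever is pending, in submission order.
        for work_id in sorted(self.pending_work_items):
            item = self.pending_work_items.pop(work_id)
            self.call_queue.append(pickle.dumps(item))

    def shutdown(self):
        self._drain()                  # anything still pending is pickled only now
        # The args tuple each "worker" would unpickle.
        return [pickle.loads(data)[1] for data in self.call_queue]


def run(drain_on_submit):
    rands = [17, 72, 97, 8]
    pool = ToyProcessPool(drain_on_submit)
    for i in range(3):
        pool.submit("work_with_rands", i, rands)
        rands.append(rands.pop(0))     # mutate in place, like random.shuffle(rands)
    return pool.shutdown()


print(run(drain_on_submit=False))
print(run(drain_on_submit=True))
```

With drain_on_submit=False, all three calls unpickle [8, 17, 72, 97], the state the loop leaves behind, which is not the state at any of the submit calls. With drain_on_submit=True, each call sees the list as it was when it was submitted.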

If you add a small delay to your first sequence, like this:

with futures.ProcessPoolExecutor() as ex:
    for i in range(4):
        print("<{}> rands before submission: {}".format(i, rands))
        ex.submit(work_with_rands, i, rands)
        random.shuffle(rands)
        time.sleep(0.01)

you'll see that _queue_management_worker wakes up and passes the calls on to the worker processes in time, and work_with_rands will print different values.
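The delay only makes it likely that the management thread runs between submissions; it does not guarantee it. A sketch that removes the timing dependence by copying the arguments at submit time. submit_snapshot is a hypothetical helper (not part of concurrent.futures), and ThreadPoolExecutor is used here only so the sketch runs without process start-up concerns; the helper can wrap a ProcessPoolExecutor the same way.

```python
import copy
from concurrent.futures import ThreadPoolExecutor


def submit_snapshot(executor, fn, *args):
    """Hypothetical helper: deep-copy the arguments at submit time, so later
    in-place changes by the caller can never reach the task."""
    return executor.submit(fn, *copy.deepcopy(args))


def work_with_rands(i, rands):
    return i, list(rands)


def main():
    rands = [17, 72, 97, 8]
    expected, pending = [], []
    with ThreadPoolExecutor(max_workers=2) as ex:
        for i in range(4):
            expected.append((i, list(rands)))   # what the caller saw at submit time
            pending.append(submit_snapshot(ex, work_with_rands, i, rands))
            rands.append(rands.pop(0))          # mutate in place, like random.shuffle
    results = [f.result() for f in pending]
    return expected, results


if __name__ == "__main__":
    expected, results = main()
    print(results == expected)   # True
```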

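A slightly different take: you are sharing the same list across all threads and mutating it. This is hard to debug, because it behaves differently as soon as you add a print. But [97, 32, 17, 15, 57, 97, 63, 72, 60, 8] must be a state from inside shuffle. shuffle holds the list (the same list that exists in all threads) and changes it more than once. When the thread got called, the state was [97, 32, 17, 15, 57, 97, 63, 72, 60, 8]. The values are not copied up front; they are copied over in another thread, so you have no guarantee of when the copy happens.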
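With a ThreadPoolExecutor nothing is pickled at all: the worker receives the very same list object the caller holds. A minimal sketch (read_when_told and demo are made-up names) that uses a threading.Event to force the unlucky ordering deterministically, so the worker reads the list only after the caller has mutated it:

```python
import threading
from concurrent.futures import ThreadPoolExecutor


def read_when_told(go, rands):
    go.wait()             # block until the caller sets the event
    return list(rands)    # read the list only at this point


def demo(pass_copy):
    rands = [1, 2, 3]
    go = threading.Event()
    with ThreadPoolExecutor(max_workers=1) as ex:
        arg = rands[:] if pass_copy else rands
        fut = ex.submit(read_when_told, go, arg)
        rands.reverse()   # caller mutates after submit
        go.set()          # only now let the worker read
        return fut.result()


print(demo(pass_copy=False))  # [3, 2, 1]: the worker saw the mutation
print(demo(pass_copy=True))   # [1, 2, 3]: the copy was taken at submit time
```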

An example of what shuffle produces before it has finished:

[31, 64, 88, 7, 68, 85, 69, 3, 15, 47] # initial value (rands) 
# ex.submit() is called here 
# shuffle() is called here 
# shuffle starts changing rands to: 
[31, 64, 88, 47, 68, 85, 69, 3, 15, 7] 
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7] 
[31, 64, 15, 47, 68, 85, 69, 3, 88, 7] 
[31, 64, 69, 47, 68, 85, 15, 3, 88, 7] 
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # threads may be called here 
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here 
[31, 64, 85, 47, 68, 69, 15, 3, 88, 7] # or here 
[31, 85, 64, 47, 68, 69, 15, 3, 88, 7] 
[85, 31, 64, 47, 68, 69, 15, 3, 88, 7] # value when the shuffle has finished 

The source code of shuffle:

def shuffle(self, x, random=None):
    if random is None:
        randbelow = self._randbelow
        for i in reversed(range(1, len(x))):
            # pick an element in x[:i+1] with which to exchange x[i]
            j = randbelow(i+1)
            x[i], x[j] = x[j], x[i]
            # added this print here. that's what prints the output above
            # your threads are probably being called when this is still pending
            print(x)
    # ... other stuff here
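A standalone sketch of the same Fisher-Yates loop that records every intermediate state instead of printing it. It uses the public randrange rather than the private _randbelow, so it will not reproduce the exact values of random.shuffle; shuffle_with_trace is a made-up name.

```python
import random


def shuffle_with_trace(x, rng=None):
    """Shuffle x in place (Fisher-Yates, same loop shape as the excerpt above)
    and return a snapshot of x after every swap."""
    if rng is None:
        rng = random.Random()
    states = []
    for i in reversed(range(1, len(x))):
        j = rng.randrange(i + 1)   # pick an index in x[:i+1]
        x[i], x[j] = x[j], x[i]
        states.append(list(x))     # any of these could be observed mid-shuffle
    return states


rands = [31, 64, 88, 7, 68, 85, 69, 3, 15, 47]
for state in shuffle_with_trace(rands, random.Random(1)):
    print(state)
```

Every snapshot except the last is a state that another thread reading the shared list could observe before the shuffle has finished.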

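So if your input was [17, 72, 97, 8, 32, 15, 63, 97, 57, 60] and your output was [97, 15, 97, 32, 60, 17, 57, 72, 8, 63], shuffle went through "intermediate steps" in between, and your threads were called during one of them.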

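Here is a version without mutation, so there are no "intermediate steps" to observe. In general, try to avoid sharing data between threads, because it is really hard to get right: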

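from __future__ import print_function
import random
try:
    from concurrent import futures
except ImportError:
    import futures


def work_with_rands(i, rands):
    print('in function', i, rands)


def foo(a):
    # Return a NEW shuffled list instead of mutating `a` in place:
    # each element of `a` is dropped into a random free slot of `x`.
    random.seed(random.randrange(999912) / 9)
    x = [None] * len(a)
    for i in a:
        _rand = random.randrange(len(a))
        while x[_rand] is not None:  # slot already taken, pick another
            _rand = random.randrange(len(a))
        x[_rand] = i
    return x


def main():
    rands = [random.randrange(100) for _ in range(10)]
    with futures.ProcessPoolExecutor() as ex:
        for i in range(4):
            new_rands = foo(rands)  # fresh list per call; rands is never mutated
            print("<{}> rands before submission: {}".format(i, new_rands))
            ex.submit(work_with_rands, i, new_rands)


if __name__ == "__main__":
    main()

Output:
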
<0> rands before submission: [84, 12, 93, 47, 40, 53, 74, 38, 52, 62] 
<1> rands before submission: [74, 53, 93, 12, 38, 47, 52, 40, 84, 62] 
<2> rands before submission: [84, 12, 93, 38, 62, 52, 53, 74, 47, 40] 
<3> rands before submission: [53, 62, 52, 12, 84, 47, 93, 40, 74, 38] 
in function 0 [84, 12, 93, 47, 40, 53, 74, 38, 52, 62] 
in function 1 [74, 53, 93, 12, 38, 47, 52, 40, 84, 62] 
in function 2 [84, 12, 93, 38, 62, 52, 53, 74, 47, 40] 
in function 3 [53, 62, 52, 12, 84, 47, 93, 40, 74, 38] 
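The standard library already has a non-mutating shuffle: random.sample(seq, len(seq)) returns a new list with all elements in random order and leaves seq untouched (the random module documentation recommends it for this purpose). A sketch of using it in place of foo; shuffled_copy is a hypothetical name.

```python
import random


def shuffled_copy(seq, rng=random):
    """Return a new, shuffled list; seq itself is left untouched."""
    return rng.sample(seq, len(seq))


rands = [84, 12, 93, 47, 40, 53, 74, 38, 52, 62]
new_rands = shuffled_copy(rands, random.Random(1))
print(rands)      # unchanged
print(new_rands)  # same elements, new order, new list object
```

Each call to shuffled_copy(rands) could stand in for foo(rands) in main(), and it never exposes a half-shuffled list.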

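I have a solution (an explicit copy). I'm just confused by the results I got: (1) why doesn't it make a copy when pickling, if it is pickling, and (2) where "[97, 32, 17, 15, 57, 97, 63, 72, 60, 8]" comes from. – ariddell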

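It comes from shuffle: shuffle changes the list several times before it has actually finished. I updated the answer to explain this, since it may have been confusing. –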

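My reading of the documentation was that no data is shared, i.e., each argument is pickled and sent separately. That was the source of my confusion and what prompted the question. – ariddell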
