Python: locking a critical section

I want to use Python's multiprocessing library to process "tests" concurrently. I have a list of tests stored in a variable test_files. I want the workers to each remove a test from test_files and call the process_test function on it. However, when I run this code, both processes run the same test. It seems I'm not accessing test_files in a process-safe way. What am I doing wrong?

Code

import multiprocessing

def process_worker(lock, test_files):
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section of code
        lock.acquire()
        try:
            if len(test_files) != 0:
                test_file = test_files.pop()
        finally:
            lock.release()
        # End critical section of code

        # If there is another test in the queue, process it
        if test_file is not None:
            print "Running test {0} on worker {1}".format(test_file, multiprocessing.current_process().name)
            process_test(test_file)
        else:
            # No more tests to process
            return

# Mutex for workers
lock = multiprocessing.Lock()

# Declare our workers
p1 = multiprocessing.Process(target=process_worker, name="Process 1", args=(lock, test_files))
p2 = multiprocessing.Process(target=process_worker, name="Process 2", args=(lock, test_files))

# Start processing
p1.start()
p2.start()

# Block until both workers finish
p1.join()
p2.join()

Output

Running test "BIT_Test" on worker Process 1 
Running test "BIT_Test" on worker Process 2 

I think it's because you're copying the list when you pass it to the new processes. You need some kind of shared object, such as [Value](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Value) or [Array](https://docs.python.org/2/library/multiprocessing.html#multiprocessing.Array); both are part of the multiprocessing module. See also https://docs.python.org/2/library/multiprocessing.html#sharing-state-between-processes
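For illustration, here is a minimal sketch of the shared-state idea from that comment, using a multiprocessing.Value as a counter both workers can safely update (the names here are illustrative, not from the original post):

import multiprocessing

def worker(counter):
    # Each increment goes through the shared ctypes object,
    # so all processes see the same value
    for _ in range(1000):
        with counter.get_lock():
            counter.value += 1

counter = multiprocessing.Value('i', 0)  # shared int, initially 0
workers = [multiprocessing.Process(target=worker, args=(counter,)) for _ in range(2)]
for p in workers:
    p.start()
for p in workers:
    p.join()
print "Final count: %d" % counter.value  # 2000: no lost updates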

Answers


Trying to share a list like this is not the right approach here. You should use a process-safe data structure, such as multiprocessing.Queue, or, better yet, use a multiprocessing.Pool and let it handle the queuing for you. What you're doing is a perfect fit for Pool.map:

import multiprocessing 

def process_worker(test_file): 
    print "Running test {0} on worker {1}".format(test_file, multiprocessing.current_process().name) 
    process_test(test_file) 


p = multiprocessing.Pool(2) # 2 processes in the pool 
# map puts each item from test_files in a Queue, lets the 
# two processes in our pool pull each item from the Queue, 
# and then execute process_worker with that item as an argument. 
p.map(process_worker, test_files) 
p.close() 
p.join() 
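Note that Pool.map blocks until every item has been processed; close() and join() then shut the worker processes down cleanly.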

Much simpler!
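For comparison, here is a minimal sketch of the Queue-based approach mentioned above; it assumes test_files and process_test are defined as in the question:

import multiprocessing

def process_worker(queue):
    # Queue.get is process-safe, so no explicit lock is needed
    while True:
        test_file = queue.get()
        if test_file is None:
            # Sentinel value: no more tests to process
            return
        print "Running test {0} on worker {1}".format(test_file, multiprocessing.current_process().name)
        process_test(test_file)  # assumed defined, as in the question

queue = multiprocessing.Queue()
for test_file in test_files:  # test_files as in the question
    queue.put(test_file)

# One sentinel per worker so each one knows when to stop
queue.put(None)
queue.put(None)

p1 = multiprocessing.Process(target=process_worker, name="Process 1", args=(queue,))
p2 = multiprocessing.Process(target=process_worker, name="Process 2", args=(queue,))

p1.start()
p2.start()

p1.join()
p2.join()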


You can also use multiprocessing.Manager. A manager.list is a proxy object: every operation on it is forwarded to a single manager process, so all workers share one underlying list instead of private copies:

import multiprocessing 

def process_worker(lock, test_files):
    # Keep going until we run out of tests
    while True:
        test_file = None
        # Critical section of code
        lock.acquire()
        try:
            if len(test_files) != 0:
                test_file = test_files.pop()
        finally:
            lock.release()
        # End critical section of code

        # If there is another test in the queue, process it
        if test_file is not None:
            print "Running test %s on worker %s" % (test_file, multiprocessing.current_process().name)
            #process_test(test_file)
        else:
            # No more tests to process
            return

# Mutex for workers
lock = multiprocessing.Lock()
manager = multiprocessing.Manager()

test_files = manager.list(['f1', 'f2', 'f3'])

# Declare our workers
p1 = multiprocessing.Process(target=process_worker, name="Process 1", args=(lock, test_files))
p2 = multiprocessing.Process(target=process_worker, name="Process 2", args=(lock, test_files))

# Start processing
p1.start()
p2.start()

# Block until both workers finish
p1.join()
p2.join()