Python multiprocessing lock mechanism failing when acquiring the lock

I am trying to implement a multiprocessing application in which worker processes access a shared data resource. I am using a lock to make sure the shared resource is accessed safely, but I am getting errors. Surprisingly, if process 1 acquires the lock first, it serves its request, and it is the next process that tries to acquire the lock that fails. However, if a process other than process 1 tries to acquire the lock first, it fails on its very first attempt. I am new to Python and have been working from the documentation, so I don't know whether I am missing some basic safety mechanism here. Any pointers as to why I am seeing this would be a great help.

Code:

#!/usr/bin/python 
from multiprocessing import Process, Manager, Lock 
import os 
import Queue 
import time 
lock = Lock() 
def launch_worker(d,l,index): 
    global lock 
    lock.acquire() 
    d[index] = "new" 
    print "in process"+str(index) 
    print d 
    lock.release() 
    return None 

def dispatcher(): 
    i=1 
    d={} 
    mp = Manager() 
    d = mp.dict() 
    d[1] = "a" 
    d[2] = "b" 
    d[3] = "c" 
    d[4] = "d" 
    d[5] = "e" 
    l = mp.list(range(10)) 
    for i in range(4): 
     p = Process(target=launch_worker, args=(d,l,i)) 
     i = i+1 
     p.start() 
    return None 

if __name__ == '__main__': 
    dispatcher() 

Error when process 1 is served first:

in process0 
{0: 'new', 1: 'a', 2: 'b', 3: 'c', 4: 'd', 5: 'e'} 
Process Process-3: 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run 
    self._target(*self._args, **self._kwargs) 
    File "dispatcher.py", line 10, in launch_worker 
    d[index] = "new" 
    File "<string>", line 2, in __setitem__ 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod 
    self._connect() 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect 
    conn = self._Client(self._token.address, authkey=self._authkey) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 143, in Client 
    c = SocketClient(address) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 263, in SocketClient 
    s.connect(address) 
    File "<string>", line 1, in connect 
error: [Errno 2] No such file or directory 

Error when process 2 is served first:

Process Process-2: 
Traceback (most recent call last): 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 232, in _bootstrap 
    self.run() 
    File "/usr/lib/python2.6/multiprocessing/process.py", line 88, in run 
    self._target(*self._args, **self._kwargs) 
    File "dispatcher.py", line 10, in launch_worker 
    d[index] = "new" 
    File "<string>", line 2, in __setitem__ 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 722, in _callmethod 
    self._connect() 
    File "/usr/lib/python2.6/multiprocessing/managers.py", line 709, in _connect 
    conn = self._Client(self._token.address, authkey=self._authkey) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 150, in Client 
    deliver_challenge(c, authkey) 
    File "/usr/lib/python2.6/multiprocessing/connection.py", line 373, in deliver_challenge 
    response = connection.recv_bytes(256)  # reject large message 
IOError: [Errno 104] Connection reset by peer 

Answer


The dictionary your workers modify is a shared object managed by the dispatching process; modifications made to it by the workers require them to communicate with that process. The error you are seeing comes from the fact that your dispatcher does not wait for the worker processes after starting them; it exits too early, so the managing process may no longer exist when the workers need to talk to it.

The first worker or two that try to update the shared dictionary may succeed, because when they make their modification the process containing the Manager instance may still be alive (for example, it may still be busy creating further workers). That is why you see some successful output in your example. But the managing process soon exits, and the next worker that attempts a modification fails. (The error messages you are seeing are typical of interprocess communication attempts gone wrong; if you ran the program again you would probably also see EOF errors.)

What you need to do is call the join method on the Process objects as a way of waiting for each of them to exit. The following modification of your dispatcher shows the basic idea:

def dispatcher(): 
    mp = Manager() 
    d = mp.dict() 
    d[1] = "a" 
    d[2] = "b" 
    d[3] = "c" 
    d[4] = "d" 
    d[5] = "e" 
    l = mp.list(range(10)) 
    procs = [] 
    for i in range(4): 
        # pass the shared list too, to match launch_worker(d, l, index)
        p = Process(target=launch_worker, args=(d, l, i)) 
        procs.append(p) 
        p.start() 
    # wait for every worker to exit before the dispatcher (and the
    # Manager it owns) goes away
    for p in procs: 
        p.join() 

Makes a lot of sense to me; the parent process is exiting. But I would rather not use join, because it might serialize my program, and I want each process to be handled independently. I might instead have the parent sleep until it is notified by one of the child processes. – pavan
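As an aside that is not part of the original exchange: calling join on each process after all of them have been started does not serialize the work; the workers still run concurrently, and join only blocks the parent until each child has exited. If you still prefer the "sleep until the children are done" approach from the comment, the only real requirement is that the dispatcher (and with it the Manager process) outlives the workers. Below is a minimal sketch of that idea; the lock and the shared list are dropped for brevity, and the 0.1-second polling interval is an arbitrary illustrative choice.

#!/usr/bin/python 
from multiprocessing import Process, Manager 
import time 

def launch_worker(d, index): 
    d[index] = "new" 
    print "in process" + str(index) 

def dispatcher(): 
    mp = Manager() 
    d = mp.dict() 
    procs = [Process(target=launch_worker, args=(d, i)) for i in range(4)] 
    for p in procs: 
        p.start() 
    # Keep the dispatcher (and the Manager it owns) alive until every 
    # worker has exited, without calling join(). 
    while any(p.is_alive() for p in procs): 
        time.sleep(0.1) 

if __name__ == '__main__': 
    dispatcher() 

Either way, the point is the same as in the answer above: the process hosting the Manager must stay alive for as long as the workers need to reach the shared dictionary.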