2014-10-16 22 views
0

我已经实现了一个Python套接字服务器。它将来自多台摄像机的图像数据发送到客户端。我请求处理程序类的样子:在Python中多次并行运行类方法

class RequestHandler(SocketServer.BaseRequestHandler): 
    def handle(self): 
     while True: 
      data = self.request.recv(1024) 
      if data.endswith('0000000050'): # client requests data 

       for camera_id, camera_path in _video_devices.iteritems(): 
        message = self.create_image_transfer_message(camera_id, camera_path) 
        self.request.sendto(message, self.client_address) 

    def create_image_transfer_message(self, camera_id, camera_path): 
     # somecode ... 

我不得不坚持,因为客户端的服务器socket。它可以工作,但问题在于它可以顺序工作,所以上传的照相机图像之间存在很大的延迟。我想创建传输消息并行通话之间的一个小的延迟。

我试图用pool classmultiprocessing

import multiprocessing 

class RequestHandler(SocketServer.BaseRequestHandler): 
    def handle(self): 

    ... 

    pool = multiprocessing.Pool(processes=4) 
    messages = [pool.apply(self.create_image_transfer_message, args=(camera_id, camera_path)) for camId, camPath in _video_devices.iteritems()] 

但这抛出:

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed 

我想知道如果有另一种方式来创建一个并行的传输消息定义的通话间延迟?

编辑:

我使用来自多个相机的数据创建响应消息。问题是,如果我运行图像抓取程序太靠近彼此,我会得到图像伪像,因为USB总线过载。我发现,以0.2秒的时间顺序调用图像抓取将解决问题。摄像头在图像抓取功能运行的整个过程中都不会发送数据,因此延迟的并行校准会生成良好的图像,并且两者之间只有很小的延迟。

+0

你已经遇到了Python的multiproces唱衰弱。 Python进程通过酸洗对象并通过管道来回发送(幕后)。 'pickle'对它可以处理的内容有限制(https://docs.python.org/2/library/pickle.html#what-c​​an-be-pickled-and-unpickled)。在你的情况下,你有一个函数或类在模块级别没有定义的地方,它正在扼杀pickler。 – 2014-10-17 13:52:09

回答

1

我认为你已经走上了正确的道路,没有必要丢掉你的工作。

这里有一个答案how to use a class method with multiprocessing我通过谷歌找到搜索“multiprocessing class method

from multiprocessing import Pool 
import time 

pool = Pool(processes=2) 

def unwrap_self_f(arg, **kwarg): 
    return C.create_image_transfer_message(*arg, **kwarg) 

class RequestHandler(SocketServer.BaseRequestHandler): 

    @classmethod 
    def create_image_transfer_message(cls, camera_id, camera_path): 
     # your logic goes here 

    def handle(self): 
     while True: 
      data = self.request.recv(1024) 
      if not data.endswith('0000000050'): # client requests data 
       continue 

      pool.map(unwrap_self_f, 
       (
        (camera_id, camera_path) 
        for camera_id, camera_path in _video_devices.iteritems() 
       ) 
      ) 

注后,如果您想从工人返回值,那么你就需要探索using a shared resource在这里看到这样的回答 - How can I recover the return value of a function passed to multiprocessing.Process?

+0

非常感谢您的提示,我会很乐意检查出来。我唯一担心的是,如果这个“解决方法”不是针对“好人”的话。我想知道是否有另一种方法,比将计时器放入方法中。 – karlitos 2014-10-16 16:58:46

+0

如果您不使用该类,只需将该方法移出课程。有一个快速的解决方法。如果你正在使用该类,那么看看这个现有的答案 - http://stackoverflow.com/a/1816969/6084 – mjallday 2014-10-16 17:16:15

+0

我仍然不确定的时间。我想要实现的是在指定的时间间隔内运行带有不同参数的类方法。我尝试了你的代码,并使用不同的设置(特别是process =参数)进行播放,但我无法获得令人满意的结果。该方法立即被称为处理时间,然后代码等待,直到处理完成。我想有这样的事情:调用一个方法,等待xx时间,调用一个方法而不必等待前一个结果... – karlitos 2014-10-16 20:41:15

0

此代码为我做的伎俩:

class RequestHandler(SocketServer.BaseRequestHandler): 
    def handle(self): 
     while True: 
      data = self.request.recv(1024) 
      if data.endswith('0000000050'): # client requests data 

       process_manager = multiprocessing.Manager() 
       messaging_queue = process_manager.Queue() 
       jobs = [] 

       for camId, camPath in _video_devices.iteritems(): 
        p = multiprocessing.Process(target=self.create_image_transfer_message, 
               args=(camera_id, camera_path, messaging_queue)) 
        jobs.append(p) 
        p.start() 
        time.sleep(0.3) 

       # wait for all processes to finish 
       for p in jobs: 
        p.join() 

       while not messaging_queue.empty(): 
        self.request.sendto(messaging_queue.get(), self.client_address)