Trouble with locks when using multiprocessing: pickling error

2013-07-31

I am building a Python module to extract tags from a large text corpus, and while its results are high quality, it runs very slowly. I am trying to speed things up with multiprocessing, and that was working too, until I tried to introduce a lock so that only one process at a time connects to our database. I cannot for the life of me figure out how to make this work - despite a lot of searching and tweaking I still get PicklingError: Can't pickle <type 'thread.lock'>: attribute lookup thread.lock failed. Here is the offending code - it worked fine until I tried to pass a lock object as an argument to f:

from multiprocessing import Manager
from functools import partial

def make_network(initial_tag, max_tags=2, max_iter=3):
    manager = Manager()
    lock = manager.Lock()
    pool = manager.Pool(8)

    # this is a very expensive function that I would like to parallelize
    # over a list of tags. It involves a (relatively cheap) call to an external
    # database, which needs a lock to avoid simultaneous queries. It takes a list
    # of strings (tags) as its sole argument, and returns a list of sets with entries
    # corresponding to the input list.
    f = partial(get_more_tags, max_tags=max_tags, lock=lock)

    def _recursively_find_more_tags(tags, level):
        if level >= max_iter:
            raise StopIteration
        new_tags = pool.map(f, tags)
        to_search = []
        for i, s in zip(tags, new_tags):
            for t in s:
                joined = ' '.join(t)
                print i + "|" + joined
                to_search.append(joined)
        try:
            return _recursively_find_more_tags(to_search, level + 1)
        except StopIteration:
            return None

    _recursively_find_more_tags([initial_tag], 0)
Are you running on Windows or on Linux? – Jonathan

I'm on Linux - sorry, I forgot to add that! – sbrother

Answer


Your problem is that lock objects cannot be pickled. I can see two possible solutions in this case.

  • To avoid this, you can make your lock variable a global variable. Then you will be able to reference it directly as a global inside your pool worker function, instead of passing it as an argument. This works because Python uses the OS fork mechanism when creating the pool processes, so the entire contents of the process that creates the pool are copied into them. This is the only way to pass a lock to Python processes created with the multiprocessing package. Incidentally, there is no need to use the Manager class just for this lock. With this change your code would look like this:

    import multiprocessing
    from functools import partial

    lock = None  # Global definition of lock
    pool = None  # Global definition of pool


    def get_more_tags(tags, max_tags=2):
        # this is a very expensive function that I would like to parallelize
        # over a list of tags. It involves a (relatively cheap) call to an external
        # database, which needs a lock to avoid simultaneous queries. It takes a
        # list of strings (tags) as its sole argument, and returns a list of sets
        # with entries corresponding to the input list.
        global lock
        pass


    def make_network(initial_tag, max_tags=2, max_iter=3):
        global lock
        global pool
        lock = multiprocessing.Lock()
        pool = multiprocessing.Pool(8)

        f = partial(get_more_tags, max_tags=max_tags)

        def _recursively_find_more_tags(tags, level):
            global pool
            if level >= max_iter:
                raise StopIteration
            new_tags = pool.map(f, tags)
            to_search = []
            for i, s in zip(tags, new_tags):
                for t in s:
                    joined = ' '.join(t)
                    print(i + "|" + joined)
                    to_search.append(joined)
            try:
                return _recursively_find_more_tags(to_search, level + 1)
            except StopIteration:
                return None

        _recursively_find_more_tags([initial_tag], 0)
    

In your real code, it is possible that the lock and pool variables are class instance variables.
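A closely related pattern, which also works on platforms that create processes without fork semantics, is to hand the lock to each worker through the Pool's initializer argument: the initializer runs once per worker and stores the lock in that worker's own global. A minimal sketch, where get_more_tags is a trivial stand-in for the real database-backed lookup:

```python
import multiprocessing
from functools import partial

lock = None  # set in each pool worker by the initializer


def init_worker(shared_lock):
    # Runs once in every worker process; stores the inherited lock globally.
    global lock
    lock = shared_lock


def get_more_tags(tag, max_tags=2):
    # Hypothetical stand-in for the real, lock-protected database query.
    with lock:  # only one worker "queries the database" at a time
        return [tag + "_related"][:max_tags]


def make_network(tags, max_tags=2):
    shared_lock = multiprocessing.Lock()
    # The lock is passed at worker creation via initargs, never pickled
    # through pool.map, so no PicklingError is raised.
    with multiprocessing.Pool(2, initializer=init_worker,
                              initargs=(shared_lock,)) as pool:
        return pool.map(partial(get_more_tags, max_tags=max_tags), tags)


if __name__ == '__main__':
    print(make_network(["python", "linux"]))
```

The key point is that synchronization primitives may be passed to workers at process-creation time (initargs), just not through map arguments.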

  • The second option, which avoids the use of a lock entirely but may have slightly higher overhead, is to create another process with multiprocessing.Process and connect it to each of the pool processes through a multiprocessing.Queue. This process would be responsible for running your database queries. You would use the queue to let the pool processes send parameters to the process that manages the database query. Since all the pool processes share the same request queue, access to the database is serialized automatically. The additional overhead comes from pickling/unpickling the query parameters and the query responses. Note that a queue created with multiprocessing.Manager().Queue() can be passed as an argument to a pool process (a plain multiprocessing.Queue cannot be pickled into pool.map arguments). Note also that the multiprocessing.Lock-based solution above will not work on Windows, where processes are not created with fork semantics.
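That second scheme can be sketched as follows. Here db_worker's query.upper() is a placeholder for the real database call, and each request carries its own Manager-backed reply queue so the reply reaches the pool worker that asked for it; all names are illustrative, not from the original code:

```python
import multiprocessing


def db_worker(request_q):
    # The only process that touches the "database". Requests arrive on a
    # single queue, so queries are serialized automatically.
    while True:
        item = request_q.get()
        if item is None:               # sentinel: shut down
            break
        query, reply_q = item
        reply_q.put(query.upper())     # placeholder for the real DB call


def query_db(args):
    # Runs inside a pool worker: submit the query, then block for the reply.
    query, request_q, reply_q = args
    request_q.put((query, reply_q))
    return reply_q.get()


def run_queries(queries):
    manager = multiprocessing.Manager()
    request_q = manager.Queue()
    db = multiprocessing.Process(target=db_worker, args=(request_q,))
    db.start()
    with multiprocessing.Pool(2) as pool:
        # Manager queues are proxies, so they pickle cleanly into map args.
        jobs = [(q, request_q, manager.Queue()) for q in queries]
        results = pool.map(query_db, jobs)
    request_q.put(None)                # stop the db worker
    db.join()
    return results


if __name__ == '__main__':
    print(run_queries(["select a", "select b"]))
```

Because only db_worker ever touches the database, no lock is needed at all; the single request queue provides the serialization.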