1

长时间潜伏者,第一次海报。请让我知道如何改善我的帖子!在pygmo中使用队列进行功能评估

我试图通过使用安装Anaconda3优化pygmo 2.5图书馆现有的一些代码,它们通过做弹道优化(这是POST2如果有人有兴趣)可执行包装异步和参数向量的分布评价。为了促进这一点,我使用多处理.SyncManager和multiprocessing.Queues通过网络传递输入并接收输出和日志消息。因此,在这种情况下,pygmo将选择向量来尝试,支持代码会将其传递到一个分布式工作人员将抓取的输入队列中,通过可执行文件评估并将结果传递回去,最终将返回给任何pygmo.algorithm用于评估

我的问题是,当pygmo初始化一个问题时,它会提供所提供的类的深层副本,在我的情况和下面提供的示例代码中,它包含多个队列。在执行深度拷贝时,我得到错误

File "pygmo_testing.py", line 121, in <module> 
    main() 
    File "pygmo_testing.py", line 108, in main 
    prob = pg.problem(my_prob) 
    File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy 
    y = _reconstruct(x, memo, *rv) 
    File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct 
    state = deepcopy(state, memo) 
    File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy 
    y = copier(x, memo) 
    File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict 
    y[deepcopy(key, memo)] = deepcopy(value, memo) 
    File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy 
    rv = reductor(4) 
    File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__ 
    context.assert_spawning(self) 
    File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning 
    ' through inheritance' % type(obj).__name__ 
RuntimeError: Queue objects should only be shared between processes through inheritance 

有没有办法解决这个问题?我需要保持执行风格异步,并分发给代码使用的其他方法。我也尝试过queue.Queue和multiprocessing.Manager.Queue(两者都不能与其他现有的代码一起工作)以获得完整性,但它总是归结为深层次的复制。

谢谢大家!


""" 
****************************** Import Statements ****************************** 
""" 
import pygmo as pg 
from multiprocessing import Pool, Queue 

""" 
****************************** Utility Functions ****************************** 
""" 
def sphere_fitness(x): 
    return sum(x*x) 

def worker(inp_q, out_q): 

    while True: 

     x = inp_q.get() 
     print("got {}".format(x)) 
     if x == False: 
      break 
     else: 
      fit = sphere_fitness(x) 
      print("x: {} f: {}".format(x, fit)) 
      out_q.put_nowait(fit) 
      print("submitted {}".format(x))   
""" 
********************************** Class(es) ************************************ 
""" 
class distributed_submit(object): 
    """ Class for pygmo Problem""" 

    def __init__(self, dim, inp_q, out_q): 
     self.dim = dim 
     self._inp_q = inp_q 
     self._out_q = out_q 

    def _submit(self, inp_q, x): 
     self._inp_q.put_nowait(x) 
     print("x delivered") 

    def _receive(self, out_q): 
     return self._out_q.get() 

    def fitness(self, x): 
     self._submit(x) 
     print("put in {}".format(x)) 
     fit = self._receive() 
     print("got {}".format(fit)) 
     return [fit] 

    def get_bounds(self): 
     return ([-1]*self.dim, [1]*self.dim) 

    def get_name(self): 
     return "Sphere Function" 

    def get_extra_info(self): 
     return "\tDimensions: {}".format(self.dim) 


""" 
******************************* Main Function ******************************** 
""" 
def main(): 

    # Queues from multiprocessing 
    _inp_q = Queue() 
    _out_q = Queue() 

    _workers = Pool(initializer=worker, 
        initargs=(_inp_q, _out_q)) 
    _workers.close() 

    my_prob = distributed_submit(3, _inp_q, _out_q) 

    prob = pg.problem(my_prob) 

    algo = pg.algorithm(pg.bee_colony(gen=20, limit=20)) 

    pop = pg.population(prob, 10) 
    print(pop) 

    pop = algo.evolve(pop) 

    print(pop.champion_f) 


if __name__ == "__main__": 
    main() 
+0

我在读的所有东西都说Queue是跨线程或进程(分别为collections.Queue和multiprocessing.Queue)的同步,并且它们不能被复制。像发电机一样,复制它将会耗尽它。如果pygmo必须复制它给出的内容,那么您需要找到一种方法来处理我认为可复制的数据结构。我将不得不再看看它... – flutefreak7

回答