长时间潜伏者,第一次海报。请让我知道如何改善我的帖子!在pygmo中使用队列进行功能评估
我试图通过使用安装Anaconda3优化pygmo 2.5图书馆现有的一些代码,它们通过做弹道优化(这是POST2如果有人有兴趣)可执行包装异步和参数向量的分布评价。为了促进这一点,我使用多处理.SyncManager和multiprocessing.Queues通过网络传递输入并接收输出和日志消息。因此,在这种情况下,pygmo将选择向量来尝试,支持代码会将其传递到一个分布式工作人员将抓取的输入队列中,通过可执行文件评估并将结果传递回去,最终将返回给任何pygmo.algorithm用于评估
我的问题是,当pygmo初始化一个问题时,它会提供所提供的类的深层副本,在我的情况和下面提供的示例代码中,它包含多个队列。在执行深度拷贝时,我得到错误
File "pygmo_testing.py", line 121, in <module>
main()
File "pygmo_testing.py", line 108, in main
prob = pg.problem(my_prob)
File "C:\Anaconda3\lib\copy.py", line 180, in deepcopy
y = _reconstruct(x, memo, *rv)
File "C:\Anaconda3\lib\copy.py", line 280, in _reconstruct
state = deepcopy(state, memo)
File "C:\Anaconda3\lib\copy.py", line 150, in deepcopy
y = copier(x, memo)
File "C:\Anaconda3\lib\copy.py", line 240, in _deepcopy_dict
y[deepcopy(key, memo)] = deepcopy(value, memo)
File "C:\Anaconda3\lib\copy.py", line 169, in deepcopy
rv = reductor(4)
File "C:\Anaconda3\lib\multiprocessing\queues.py", line 58, in __getstate__
context.assert_spawning(self)
File "C:\Anaconda3\lib\multiprocessing\context.py", line 356, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: Queue objects should only be shared between processes through inheritance
有没有办法解决这个问题?我需要保持执行风格异步,并分发给代码使用的其他方法。我也尝试过queue.Queue和multiprocessing.Manager.Queue(两者都不能与其他现有的代码一起工作)以获得完整性,但它总是归结为深层次的复制。
谢谢大家!
"""
****************************** Import Statements ******************************
"""
import pygmo as pg
from multiprocessing import Pool, Queue
"""
****************************** Utility Functions ******************************
"""
def sphere_fitness(x):
return sum(x*x)
def worker(inp_q, out_q):
while True:
x = inp_q.get()
print("got {}".format(x))
if x == False:
break
else:
fit = sphere_fitness(x)
print("x: {} f: {}".format(x, fit))
out_q.put_nowait(fit)
print("submitted {}".format(x))
"""
********************************** Class(es) ************************************
"""
class distributed_submit(object):
""" Class for pygmo Problem"""
def __init__(self, dim, inp_q, out_q):
self.dim = dim
self._inp_q = inp_q
self._out_q = out_q
def _submit(self, inp_q, x):
self._inp_q.put_nowait(x)
print("x delivered")
def _receive(self, out_q):
return self._out_q.get()
def fitness(self, x):
self._submit(x)
print("put in {}".format(x))
fit = self._receive()
print("got {}".format(fit))
return [fit]
def get_bounds(self):
return ([-1]*self.dim, [1]*self.dim)
def get_name(self):
return "Sphere Function"
def get_extra_info(self):
return "\tDimensions: {}".format(self.dim)
"""
******************************* Main Function ********************************
"""
def main():
# Queues from multiprocessing
_inp_q = Queue()
_out_q = Queue()
_workers = Pool(initializer=worker,
initargs=(_inp_q, _out_q))
_workers.close()
my_prob = distributed_submit(3, _inp_q, _out_q)
prob = pg.problem(my_prob)
algo = pg.algorithm(pg.bee_colony(gen=20, limit=20))
pop = pg.population(prob, 10)
print(pop)
pop = algo.evolve(pop)
print(pop.champion_f)
if __name__ == "__main__":
main()
我在读的所有东西都说Queue是跨线程或进程(分别为collections.Queue和multiprocessing.Queue)的同步,并且它们不能被复制。像发电机一样,复制它将会耗尽它。如果pygmo必须复制它给出的内容,那么您需要找到一种方法来处理我认为可复制的数据结构。我将不得不再看看它... – flutefreak7