2013-12-21

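IPython cluster and PicklingError

My problem seems similar to This Thread. However, although I think I am following the suggested approach, I still get a PicklingError. When I run the process locally, without sending it to the IPython cluster engines, the function works fine.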

I am using zipline with the IPython notebook, so I first create a class based on zipline.TradingAlgorithm:

Cell [1]

from IPython.parallel import Client 
rc = Client() 
lview = rc.load_balanced_view() 

Cell [2]

%%px --local
# This ensures that the class and modules exist on each engine (and, with --local, in the notebook too)
import zipline as zpl
import numpy as np

class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
    def initialize(self):
        self.valueHistory = []  # a list, so testSystem can append to it

    def handle_data(self, data):
        for security in data.keys():
            ## Just randomly buy/sell/hold for each security
            coinflip = np.random.random()
            if coinflip < .25:
                self.order(security, 100)
            elif coinflip > .75:
                self.order(security, -100)

Cell [3]

from zipline.utils.factory import load_from_yahoo
import matplotlib.pyplot as plt

start = '2013-04-01'
end = '2013-06-01'
sidList = ['SPY','GOOG']
data = load_from_yahoo(stocks=sidList,start=start,end=end)

agentList = []
for i in range(3):
    agentList.append(Agent())

def testSystem(agent,data):
    results = agent.run(data) #-- This is how the zipline based class is executed
    #-- next I'm just storing the final value of the test so I can plot later
    agent.valueHistory.append(results['portfolio_value'].iloc[-1])
    return agent

for i in range(10):
    tasks = []
    for agent in agentList:
        #agent = testSystem(agent,data) ## On its own, this works!
        #-- To Test, uncomment the above line and comment out the next two
        tasks.append(lview.apply_async(testSystem,agent,data))
    agentList = [ar.get() for ar in tasks]

for agent in agentList:
    plt.plot(agent.valueHistory)

Here is the error that is raised:

PicklingError                             Traceback (most recent call last)
/Library/Python/2.7/site-packages/IPython/kernel/zmq/serialize.pyc in serialize_object(obj, buffer_threshold, item_threshold)
    100         buffers.extend(_extract_buffers(cobj, buffer_threshold))
    101 
--> 102     buffers.insert(0, pickle.dumps(cobj,-1))
    103     return buffers
    104 

PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
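The last line shows the plain pickle.dumps call failing on a function object. The standard pickle module stores functions by reference (module plus name), not by value, so a function without an importable name, such as a lambda or a function created at run time and kept on an object, cannot be pickled. A minimal stdlib-only sketch of that rule in Python 3; the helper is_picklable is hypothetical, and Python 3's exception text differs from the 2.7 message above:

```python
import math
import pickle

def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj, -1)  # -1 = highest protocol, as in serialize_object above
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        # The exception type and message differ between Python versions.
        return False

# A function with an importable name is pickled *by reference*:
# the payload holds only the module and attribute name, not the code.
payload = pickle.dumps(math.sqrt, -1)
print(b"sqrt" in payload)        # True

# A lambda has no importable name, so the lookup by name fails.
anonymous = lambda x: x * 2
print(is_picklable(math.sqrt))   # True
print(is_picklable(anonymous))   # False
```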

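If I override the run() method inherited from zipline.TradingAlgorithm with something like:

def run(self, data): 
    return 1 

then passing the agents to the engines works, but obviously the guts of the test are not executed. Trying something like this...

def run(self, data): 
    return zpl.TradingAlgorithm.run(self,data) 

results in the same PicklingError.

Since run is an internal method of zipline.TradingAlgorithm and I don't know everything it does, how do I make sure the agent can still be passed through after it runs?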
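One way to find out what run() leaves behind, without reading zipline's source, is to try pickling each instance attribute on its own and report the ones that fail. A sketch assuming the object keeps its state in `__dict__`; `unpicklable_attributes` and `Demo` are hypothetical names, and `Demo` is a toy stand-in rather than zipline:

```python
import pickle

def unpicklable_attributes(obj):
    """Return the names of instance attributes that pickle cannot serialize.

    Only the top level of obj.__dict__ is checked; a failing attribute may
    itself contain the real offender further down.
    """
    bad = []
    for name, value in vars(obj).items():
        try:
            pickle.dumps(value, -1)
        except (pickle.PicklingError, AttributeError, TypeError):
            bad.append(name)
    return bad

class Demo:
    """Hypothetical object holding a mix of picklable and unpicklable state."""
    def __init__(self):
        self.prices = [1.0, 2.0]                  # plain data: picklable
        self.callback = lambda bar: bar           # local function: not picklable
        self.stream = (p for p in self.prices)    # generator: not picklable

print(unpicklable_attributes(Demo()))  # ['callback', 'stream']
```

In the notebook, calling `unpicklable_attributes(agent)` after `agent.run(data)` would narrow the failure down to specific attribute names that can then be inspected.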

Answer


It looks like zipline TradingAlgorithm objects are not picklable after they have been run:

import pickle
import zipline as zpl

class Agent(zpl.TradingAlgorithm): # must define initialize and handle_data methods
    def handle_data(self, data):
        pass

agent = Agent()
pickle.dumps(agent)[:32] # ok

agent.run(data)  # data as loaded with load_from_yahoo in the question
pickle.dumps(agent)[:32] # fails
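The same before/after pattern can be reproduced without zipline. In this sketch, `ToyAlgorithm` is hypothetical and simply assumes that run() leaves a generator on the instance; what zipline actually stores during run() is not shown here:

```python
import pickle

class ToyAlgorithm:
    """Hypothetical stand-in for a TradingAlgorithm subclass (not zipline)."""

    def __init__(self):
        self.value_history = []  # plain data: picklable

    def run(self, prices):
        # Assumption for illustration: run() leaves live runtime state on the
        # instance, here a generator, which pickle cannot serialize.
        self._feed = (p for p in prices)
        final_value = sum(self._feed)
        self.value_history.append(final_value)
        return final_value

def is_picklable(obj):
    """Return True if pickle can serialize obj, False otherwise."""
    try:
        pickle.dumps(obj, -1)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False

agent = ToyAlgorithm()
print(is_picklable(agent))   # True: only a list so far
agent.run([1.0, 2.0, 3.0])
print(is_picklable(agent))   # False: the exhausted generator is still attached
```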

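But this suggests to me that you should create the agents on the engines, and only pass data and results back and forth (ideally not passing the data at all, or at most once).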

Minimizing data transfer could look something like this:

Define the class:

%%px
import zipline as zpl
import numpy as np

class Agent(zpl.TradingAlgorithm):  # must define initialize and handle_data methods
    def initialize(self):
        self.valueHistory = []

    def handle_data(self, data):
        for security in data.keys():
            ## Just randomly buy/sell/hold for each security
            coinflip = np.random.random()
            if coinflip < .25:
                self.order(security, 100)
            elif coinflip > .75:
                self.order(security, -100)

Load the data on each engine:

%%px 
from zipline.utils.factory import load_from_yahoo 

start = '2013-04-01' 
end = '2013-06-01' 
sidList = ['SPY','GOOG'] 

data = load_from_yahoo(stocks=sidList,start=start,end=end) 
agent = Agent() 

And run the code:

from IPython import parallel

def testSystem(agent, data):
    results = agent.run(data) #-- This is how the zipline based class is executed
    #-- next I'm just storing the final value of the test so I can plot later
    agent.valueHistory.append(results['portfolio_value'].iloc[-1])

# create references to the remote agent/data objects
agent_ref = parallel.Reference('agent')
data_ref = parallel.Reference('data')

tasks = []
for i in range(10):
    for j in range(len(rc)):
        tasks.append(lview.apply_async(testSystem, agent_ref, data_ref))
# wait for the tasks to complete
[ t.get() for t in tasks ]

And plot the results, without ever fetching the agents themselves:

%matplotlib inline 
import matplotlib.pyplot as plt 

for history in rc[:].apply_async(lambda : agent.valueHistory): 
    plt.plot(history) 

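This is not quite the same as the code you shared: yours has three agents bouncing back and forth across all the engines, whereas this has one agent per engine. I don't know enough about zipline to say whether that is useful to you.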