2015-04-17 53 views
-1

我遇到了3个线程同时运行的问题。我希望“交易”循环,“价格”循环和“停止”循环同时运行,但似乎“停止”循环劫持程序并在其他人等待轮到时运行。我应该如何设置它们以便它们全部同时运行?Python中的并发线程

import Queue 
import threading 
import time 
import json 

from execution import Execution 
from settings import STREAM_DOMAIN, API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID 
from strategy import TestRandomStrategy 
from streaming import StreamingForexPrices 
from event import TickEvent 
from rates import stop 



def trade(events, strategy, execution): 
    """ 
    Carries out an infinite while loop that polls the 
    events queue and directs each event to either the 
    strategy component of the execution handler. The 
    loop will then pause for "heartbeat" seconds and 
    continue. 
    """ 
    while True: 
     try: 
      event = events.get(False) 
     except Queue.Empty: 
      pass 
     else: 
      if event is not None: 
       if event.type == 'TICK': 
        strategy.calculate_signals(event) 
       elif event.type == 'ORDER': 
        print "Executing order!" 
        execution.execute_order(event) 
     time.sleep(heartbeat) 


if __name__ == "__main__": 
    heartbeat = 0 # Half a second between polling 
    events = Queue.Queue() 

    # Trade 1000 unit of EUR/USD 
    instrument = "EUR_USD" 
    units = 1 
    stopLoss = stopper 




    # Create the OANDA market price streaming class 
    # making sure to provide authentication commands 
    prices = StreamingForexPrices(
     STREAM_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID, 
     instrument, events 
    ) 
    #handle stopLoss price 
    stopper = stop() 

    # Create the execution handler making sure to 
    # provide authentication commands 
    execution = Execution(API_DOMAIN, ACCESS_TOKEN, ACCOUNT_ID) 

    # Create the strategy/signal generator, passing the 
    # instrument, quantity of units and the events queue 
    strategy = TestRandomStrategy(instrument, units, events, stopLoss) 

    # Create two separate threads: One for the trading loop 
    # and another for the market price streaming class 
    trade_thread = threading.Thread(target=trade, args=(events, strategy, execution)) 
    price_thread = threading.Thread(target=prices.stream_to_queue, args=[]) 
    rate_thread = threading.Thread(target=stop, args=[]) 

    # Start both threads 
    trade_thread.start() 
    price_thread.start() 
    rate_thread.start() 

只是fyi,一切都很好,直到我试图添加“率”。我添加的唯一东西是一个额外的线程,stopLoss和rate.py文件。

rate.py:

import oandapy 
import time 
oanda = oandapy.API(environment="practice", access_token="xxxxxxxxx") 


while True: 
    response = oanda.get_prices(instruments="EUR_USD") 
    prices = response.get("prices") 
    asking_price = prices[0].get("ask") 
    stop = asking_price - .001 
    print stop 
    time.sleep(1) 

感谢您的帮助提前!

回答

1

首先,一句话:

  • 不使用sleep如果你能避免它;例如,在“以旧换新”循环您 不需要睡觉的,如果你在你的队列进行阻断.get()

然后,一旦“rates.py”导入它开始while循环;你 缺少stop()函数(或者你的代码不完整?)

编辑:如果你想添加在rates.py的stop功能,只是把 的while循环代码def stop():块内这样

def stop(): 
    while True: 
     response = oanda.get_prices(instruments="EUR_USD") 
     prices = response.get("prices") 
     asking_price = prices[0].get("ask") 
     stop = asking_price - .001 
     print stop 
     time.sleep(1) 

(顺便说一句:你真的知道你在做什么)

+0

感谢您的见解!我会看看我是否可以正确地做到这一点。一个问题是,使用睡眠导致同时线程的问题? – MacD

+0

不,它可能不会导致多线程的问题,但它会减慢处理速度 - 一个阻塞等待,直到某些东西到达队列中,所以如果没有睡眠,您正在处理更多事件,更快 – mguijarr

+0

我认为您对我的说法是正确的代码也不完整。我如何将stop()函数添加到这种情况下? – MacD