2014-12-25 100 views
1

我想读取和处理来自外部服务的一些数据。我询问服务是否有任何数据,如果返回了一些信息,我会处理它并再次询问(这样数据可以在可用时立即处理),否则我会等待数据可用的通知。这可以写成一个无限循环:在循环中扭曲等待事件

def loop(self): 
    while True: 
     data = yield self.get_data_nonblocking() 
     if data is not None: 
      yield self.process_data(data) 
     else: 
      yield self.data_available 

def on_data_available(self): 
    self.data_available.fire() 

如何data_available在这里实现?它可能是延迟,但延迟不能重置,只能重新创建。有更好的选择吗?

此循环是否可以集成到Twisted事件循环中?我可以在on_data_available中读取和处理数据,并编写一些代码,而不是循环检查get_data_nonblocking,但我觉得我需要一些锁以确保数据按照到达的顺序进行处理(上面的代码强制执行它,因为它是它处理的唯一地方)。这是一个好主意吗?

回答

3

考虑TCP连接的情况。 TCP连接的接收缓冲区可以有或没有数据。你可以得到这些数据,或者什么也得不到,而不使用非阻塞套接字API拦截:

data = socket.recv(1024) 
if data: 
    self.process_data(data) 

可以等待数据可用使用select()(或任何基本相当的API):

socket.setblocking(False) 
while True: 
    data = socket.recv(1024) 
    if data: 
     self.process_data(data) 
    else: 
     select([socket], [], []) 

其中,只有select()特别扭曲不友好的(尽管扭曲的成语是肯定不会让自己的socket.recv调用)。虽然(用ProtocoldataReceived方法代替select的调用方式可以触发Deferred - 有点像你的on_data_available方法 - 折腾一些收益率并使整个产品成为inlineCallbacks生成器)。

但是,尽管这是一种可以从TCP连接获取数据的方法,但Twisted鼓励您使用的API并不是这样做的。相反,API是:

class SomeProtocol(Protocol): 
    def dataReceived(self, data): 
     # Your logic here 

我不明白你的情况有多大的不同。如果,不是你写的循环,你做了这样的事情:

class YourDataProcessor(object): 
    def process_data(self, data): 
     # Your logic here 

class SomeDataGetter(object): 
    def __init__(self, processor): 
     self.processor = processor 

    def on_available_data(self): 
     data = self.get_data_nonblocking() 
     if data is not None: 
      self.processor.process_data(data) 

现在有根本没有Deferreds(也许除了任何实现on_available_dataget_data_nonblocking,但我看不到代码)。

如果您保持原样,您可以保证按顺序执行,因为Twisted是单线程的(在几个标有非常清晰标记的地方除外),并且在单线程程序中,以前的调用到process_data必须在任何稍后致电process_data之前完成(除了当然,process_data重复地自行调用 - 但这是另一个故事)。

如果您切换回使用inlineCallbacks(或任何等效的“协同”调味饮料组合),那么您可能会引入无序执行的可能性。

例如,如果get_data_nonblocking回报Deferred和你写的是这样的:

@inlineCallbacks 
    def on_available_data(self): 
     data = yield self.get_data_nonblocking() 
     if data is not None: 
      self.processor.process_data(data) 

那么你已经改变on_available_data说,上下文切换是允许呼叫get_data_nonblocking时。在这种情况下,根据具体实施的get_data_nonblockingon_available_data,这是完全有可能的:

  1. on_available_data
  2. get_data_nonblocking被调用,并返回一个Deferred
  3. on_available_data告诉执行切换到另一个上下文(通过yield/inlineCallbacks
  4. on_available_data又被称为
  5. get_data_nonblocking被再次调用并返回一个Deferred(也许是同一个!也许是一个新的!依赖于它是如何实现)
  6. on_available_data第二次调用告诉执行切换到另一个上下文(同样的原因)
  7. 反应器打转了一会儿,最终的事件到来,导致由返回Deferred调用get_data_nonblocking来点燃。
  8. 执行切换回on_available_data框架
  9. process_data被称为与任何数据get_data_nonblocking调用返回
  10. 最终,同样的事情发生在第一组对象和process_data与再次叫什么数据第一get_data_nonblocking调用返回

现在也许你已经处理数据O没有命令 - 这又取决于系统其他部分的更多细节。

如果是这样,您可以随时重新排列顺序。有很多不同的可能的方法来做到这一点。 Twisted本身没有附带任何明确支持此操作的API,因此解决方案涉及编写一些新代码。下面是一个办法一个想法(未经测试) - 它知道对象序列号中的类似队列类:

class SequencedQueue(object): 
    """ 
    A queue-like type which guarantees objects come out of the queue in the order 
    defined by a sequence number associated with the objects when they are put into 
    the queue. 

    Application code manages sequence number assignment so that sequence numbers don't 
    have to have the same order as `put` calls on this type. 
    """ 
    def __init__(self): 
     # The sequence number of the object that should be given out 
     # by the next call to `get` 
     self._next_sequence = 0 

     # The sequence number of the next result that needs to be provided. 
     self._next_result = 0 

     # A holding area for objects past _next_sequence 
     self._queue = {} 

     # A holding area 
     self._waiting = 

    def put(self, sequence, object): 
     """ 
     Put an object into the queue at a particular point in the sequence. 
     """ 
     if sequence < self._next_sequence: 
      # Programming error. The sequence number 
      # of the object being put has already been used. 
      raise ... 

     self._queue[sequence] = object 
     self._check_waiters() 

    def get(self): 
     """ 
     Get an object from the queue which has the next sequence number 
     following whatever was previously gotten. 
     """ 
     result = self._waiters[self._next_sequence] = Deferred() 
     self._next_sequence += 1 
     self._check_waiters() 
     return result 

    def _check_waiters(self): 
     """ 
     Find any Deferreds previously given out by get calls which can now be given 
     their results and give them to them. 
     """ 
     while True: 
      seq = self._next_result 
      if seq in self._queue and seq in self._waiting: 
       self._next_result += 1 
       # XXX Probably a re-entrancy bug here. If a callback calls back in to 
       # put then this loop might run recursively 
       self._waiting.pop(seq).callback(self._queue.pop(seq)) 
      else: 
       break 

预期的行为(模我不小心添加任何错误)是一样的东西:

q = SequencedQueue() 
d1 = q.get() 
d2 = q.get() 
# Nothing in particular happens 
q.put(1, "second result") 
# d1 fires with "first result" and afterwards d2 fires with "second result" 
q.put(0, "first result") 

使用这种方法,只要确保按照您希望分配数据的顺序分配顺序号,而不是按实际顺序显示的顺序。例如:

@inlineCallbacks 
    def on_available_data(self): 
     sequence = self._process_order 
     data = yield self.get_data_nonblocking() 
     if data is not None: 
      self._process_order += 1 
      self.sequenced_queue.put(sequence, data) 

在其他地方,一些代码可以消耗的,如队列排序:

@inlineCallbacks 
def queue_consumer(self): 
    while True: 
     yield self.process_data(yield self.sequenced_queue.get()) 
+0

不幸的是'get_data_nonblocking()'是一个第三方的库函数返回一个延迟(即火灾时,服务器响应)。 – wRAR

+0

这并非天生不幸。这只意味着你需要添加一些逻辑来强加排序。 –