考虑TCP连接的情况。 TCP连接的接收缓冲区可以有或没有数据。你可以得到这些数据,或者什么也得不到,而不使用非阻塞套接字API拦截:
data = socket.recv(1024)
if data:
self.process_data(data)
可以等待数据可用使用select()
(或任何基本相当的API):
socket.setblocking(False)
while True:
data = socket.recv(1024)
if data:
self.process_data(data)
else:
select([socket], [], [])
其中,只有select()
是特别扭曲不友好的(尽管扭曲的成语是肯定不会让自己的socket.recv
调用)。虽然(用Protocol
用dataReceived
方法代替select
的调用方式可以触发Deferred
- 有点像你的on_data_available
方法 - 折腾一些收益率并使整个产品成为inlineCallbacks
生成器)。
但是,尽管这是一种可以从TCP连接获取数据的方法,但Twisted鼓励您使用的API并不是这样做的。相反,API是:
class SomeProtocol(Protocol):
def dataReceived(self, data):
# Your logic here
我不明白你的情况有多大的不同。如果,不是你写的循环,你做了这样的事情:
class YourDataProcessor(object):
def process_data(self, data):
# Your logic here
class SomeDataGetter(object):
def __init__(self, processor):
self.processor = processor
def on_available_data(self):
data = self.get_data_nonblocking()
if data is not None:
self.processor.process_data(data)
现在有根本没有Deferreds(也许除了任何实现on_available_data
或get_data_nonblocking
,但我看不到代码)。
如果您保持原样,您可以保证按顺序执行,因为Twisted是单线程的(在几个标有非常清晰标记的地方除外),并且在单线程程序中,以前的调用到process_data
必须在任何稍后致电process_data
之前完成(除了当然,process_data
重复地自行调用 - 但这是另一个故事)。
如果您切换回使用inlineCallbacks
(或任何等效的“协同”调味饮料组合),那么您可能会引入无序执行的可能性。
例如,如果get_data_nonblocking
回报Deferred
和你写的是这样的:
@inlineCallbacks
def on_available_data(self):
data = yield self.get_data_nonblocking()
if data is not None:
self.processor.process_data(data)
那么你已经改变on_available_data
说,上下文切换是允许呼叫get_data_nonblocking
时。在这种情况下,根据具体实施的get_data_nonblocking
和on_available_data
,这是完全有可能的:
on_available_data
叫
get_data_nonblocking
被调用,并返回一个Deferred
on_available_data
告诉执行切换到另一个上下文(通过yield
/inlineCallbacks
)
on_available_data
又被称为
get_data_nonblocking
被再次调用并返回一个Deferred
(也许是同一个!也许是一个新的!依赖于它是如何实现)
- 的
on_available_data
第二次调用告诉执行切换到另一个上下文(同样的原因)
- 反应器打转了一会儿,最终的事件到来,导致由秒返回
Deferred
调用get_data_nonblocking
来点燃。
- 执行切换回秒
on_available_data
框架
process_data
被称为与任何数据秒get_data_nonblocking
调用返回
- 最终,同样的事情发生在第一组对象和
process_data
与再次叫什么数据第一get_data_nonblocking
调用返回
现在也许你已经处理数据O没有命令 - 这又取决于系统其他部分的更多细节。
如果是这样,您可以随时重新排列顺序。有很多不同的可能的方法来做到这一点。 Twisted本身没有附带任何明确支持此操作的API,因此解决方案涉及编写一些新代码。下面是一个办法一个想法(未经测试) - 它知道对象序列号中的类似队列类:
class SequencedQueue(object):
"""
A queue-like type which guarantees objects come out of the queue in the order
defined by a sequence number associated with the objects when they are put into
the queue.
Application code manages sequence number assignment so that sequence numbers don't
have to have the same order as `put` calls on this type.
"""
def __init__(self):
# The sequence number of the object that should be given out
# by the next call to `get`
self._next_sequence = 0
# The sequence number of the next result that needs to be provided.
self._next_result = 0
# A holding area for objects past _next_sequence
self._queue = {}
# A holding area
self._waiting =
def put(self, sequence, object):
"""
Put an object into the queue at a particular point in the sequence.
"""
if sequence < self._next_sequence:
# Programming error. The sequence number
# of the object being put has already been used.
raise ...
self._queue[sequence] = object
self._check_waiters()
def get(self):
"""
Get an object from the queue which has the next sequence number
following whatever was previously gotten.
"""
result = self._waiters[self._next_sequence] = Deferred()
self._next_sequence += 1
self._check_waiters()
return result
def _check_waiters(self):
"""
Find any Deferreds previously given out by get calls which can now be given
their results and give them to them.
"""
while True:
seq = self._next_result
if seq in self._queue and seq in self._waiting:
self._next_result += 1
# XXX Probably a re-entrancy bug here. If a callback calls back in to
# put then this loop might run recursively
self._waiting.pop(seq).callback(self._queue.pop(seq))
else:
break
预期的行为(模我不小心添加任何错误)是一样的东西:
q = SequencedQueue()
d1 = q.get()
d2 = q.get()
# Nothing in particular happens
q.put(1, "second result")
# d1 fires with "first result" and afterwards d2 fires with "second result"
q.put(0, "first result")
使用这种方法,只要确保按照您希望分配数据的顺序分配顺序号,而不是按实际顺序显示的顺序。例如:
@inlineCallbacks
def on_available_data(self):
sequence = self._process_order
data = yield self.get_data_nonblocking()
if data is not None:
self._process_order += 1
self.sequenced_queue.put(sequence, data)
在其他地方,一些代码可以消耗的,如队列排序:
@inlineCallbacks
def queue_consumer(self):
while True:
yield self.process_data(yield self.sequenced_queue.get())
不幸的是'get_data_nonblocking()'是一个第三方的库函数返回一个延迟(即火灾时,服务器响应)。 – wRAR
这并非天生不幸。这只意味着你需要添加一些逻辑来强加排序。 –