您可能想要做的一件事是尝试扩展通用协议/传输独立性。即使你需要一个具有长时间运行循环的线程,你也可以从协议中隐藏它。好处与往常一样:协议变得更容易测试,如果您设法读取USB事件的非线程实现,则可以在不更改协议的情况下更改传输。
from threading import Thread
class USBThingy(Thread):
def __init__(self, reactor, device, protocol):
self._reactor = reactor
self._device = device
self._protocol = protocol
def run(self):
while True:
for line in self._device.streamData():
self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line)
使用callFromThread
是使该解决方案可用的部分原因。它确保usbStreamLineReceived
方法在反应器线程中被调用,而不是在从USB设备读取的线程中调用。所以从协议对象的角度来看,关于线程没有什么特别的地方:它只是在需要处理一些数据时偶尔调用它的方法。
你的协议,那么就需要实现usbStreamLineReceived
不知何故,和实现其他应用程序特定的逻辑,喜欢养观察员名单:
class SomeUSBProtocol(object):
def __init__(self):
self.observers = []
def usbStreamLineReceived(self, line):
data = MyBrandSpankingNewUSBDeviceData(line)
# broadcast the data
for obs in self.observers[:]:
obs(output)
然后观测者可以自己注册这个类的一个实例,做任何他们想做的数据:
class USBObserverThing(Protocol):
def connectionMade(self):
self.factory.usbProto.observers.append(self.emit)
def connectionLost(self):
self.factory.usbProto.observers.remove(self.emit)
def emit(self, output):
# parse the data, convert to JSON
output = convertDataToJSON(data)
self.transport.write(output)
胡克一起:
usbDevice = ...
usbProto = SomeUSBProtocol()
thingy = USBThingy(reactor, usbDevice, usbProto)
thingy.start()
factory = ServerFactory()
factory.protocol = USBObserverThing
factory.usbProto = usbProto
reactor.listenTCP(12345, factory)
reactor.run()
你可以想象一个更好的观察者注册/取消注册API(如使用实际方法而不是直接访问该列表)。你也可以想象给USBThingy
一个关闭的方法,这样SomeUSBProtocol
可以控制它什么时候停止运行(所以你的进程实际上能够退出)。