2012-01-17 20 views
2

我有一个扭曲的项目,旨在通过TCP以JSON重新广播收集的数据。我基本上是有我需要订阅,并同步读取的,而无限循环,像这样一个USB库:无限期地在所有连接的单独线程中生成内容?

while True: 
    for line in usbDevice.streamData(): 
     data = MyBrandSpankingNewUSBDeviceData(line) 
     # parse the data, convert to JSON 
     output = convertDataToJSON(data) 
     # broadcast the data 
     ... 

的问题,当然,是...。实质上,我需要在服务器启动时立即启动此过程,并在服务器结束时立即结束(Protocol.doStartProtocol.doStop),并让它始终运行并将output广播到每个连接的传输。

如何在Twisted中做到这一点?显然,我需要让while循环运行在自己的线程中,但是我怎样才能“订阅”客户端来监听输出呢? USB数据收集只能运行一次也很重要,因为它可能会严重地干扰它,使其运行不止一次。

概括地说,这里是我的架构:

  1. 服务器有其流数据的USB集线器所有的时间。服务器不断订阅此USB集线器并不断读取数据。
  2. 客户会来去去,随意连接和断开连接。

我们希望在所有连接的客户端可用时发送数据。我怎么能在Twisted中做到这一点?

回答

2

您可能想要做的一件事是尝试扩展通用协议/传输独立性。即使你需要一个具有长时间运行循环的线程,你也可以从协议中隐藏它。好处与往常一样:协议变得更容易测试,如果您设法读取USB事件的非线程实现,则可以在不更改协议的情况下更改传输。

from threading import Thread 

class USBThingy(Thread): 
    def __init__(self, reactor, device, protocol): 
     self._reactor = reactor 
     self._device = device 
     self._protocol = protocol 

    def run(self): 
     while True: 
      for line in self._device.streamData(): 
       self._reactor.callFromThread(self._protocol.usbStreamLineReceived, line) 

使用callFromThread是使该解决方案可用的部分原因。它确保usbStreamLineReceived方法在反应器线程中被调用,而不是在从USB设备读取的线程中调用。所以从协议对象的角度来看,关于线程没有什么特别的地方:它只是在需要处理一些数据时偶尔调用它的方法。

你的协议,那么就需要实现usbStreamLineReceived不知何故,和实现其他应用程序特定的逻辑,喜欢养观察员名单:

class SomeUSBProtocol(object): 
    def __init__(self): 
     self.observers = [] 

    def usbStreamLineReceived(self, line): 
     data = MyBrandSpankingNewUSBDeviceData(line) 
     # broadcast the data 
     for obs in self.observers[:]: 
      obs(output) 

然后观测者可以自己注册这个类的一个实例,做任何他们想做的数据:

class USBObserverThing(Protocol): 
    def connectionMade(self): 
     self.factory.usbProto.observers.append(self.emit) 

    def connectionLost(self): 
     self.factory.usbProto.observers.remove(self.emit) 

    def emit(self, output): 
     # parse the data, convert to JSON 
     output = convertDataToJSON(data) 
     self.transport.write(output) 

胡克一起:

usbDevice = ... 
usbProto = SomeUSBProtocol() 
thingy = USBThingy(reactor, usbDevice, usbProto) 
thingy.start() 

factory = ServerFactory() 
factory.protocol = USBObserverThing 
factory.usbProto = usbProto 
reactor.listenTCP(12345, factory) 
reactor.run() 

你可以想象一个更好的观察者注册/取消注册API(如使用实际方法而不是直接访问该列表)。你也可以想象给USBThingy一个关闭的方法,这样SomeUSBProtocol可以控制它什么时候停止运行(所以你的进程实际上能够退出)。

相关问题