2012-07-06 124 views
2

我正在编写一个应用程序来收集UDP消息并每隔1秒处理它们。这是python代码线程安全(线程与扭曲)?

应用原型的样子:

from twisted.internet.protocol import DatagramProtocol 
from twisted.internet import reactor 
import threading 
import time 

class UdpListener(DatagramProtocol): 

    messages = [] 

    def datagramReceived(self, data, (host, port)): 
     self.messages.append(data) 

class Messenger(threading.Thread): 

    listener = None 

    def __init__(self): 
     threading.Thread.__init__(self) 

    def run(self): 
     while True: 
      time.sleep(1) 
      recivedMessages = self.listener.messages 
      length = len(recivedMessages) 
      messagesToProccess = recivedMessages[0:length] 
      #doSomethingWithMessages(messagesToProccess) 
      del self.listener.messages[0:length] 
      print(length) 

listener = UdpListener() 

messenger = Messenger() 
messenger.listener = listener 
messenger.start() 

reactor.listenUDP(5556, listener) 
reactor.run() 

我不知道我是否可以很容易地从列表中删除开始值(德尔self.listener.messages [0:长度])没有任何风险传入消息更改列表和应用程序崩溃。

更新 - 版本锁

class Messenger(threading.Thread): 

listener = None 
lock = threading.Lock() 

def __init__(self): 
    threading.Thread.__init__(self) 

def run(self): 
    while True: 
     time.sleep(1) 
     recivedMessages = self.listener.messages 
     self.lock.acquire() 
     try: 
      length = len(recivedMessages) 
      messagesToProccess = recivedMessages[0:length] 
      del self.listener.messages[0:length] 
     except Exception as e: 
      raise e 
     finally: 
      self.lock.release() 

     #doSomethingWithMessages(messagesToProccess) 
     print(length) 
+3

在扭曲启动线程* *通常意味着你正在做的事情的方式错了,这是这里的情况 – 2012-07-06 21:44:06

回答

6

你的代码是不是线程安全的,没有。您需要锁定messages

但是,这里不需要线程。为什么不这样做?

from twisted.internet.protocol import DatagramProtocol 
from twisted.internet import reactor 

class UdpListener(DatagramProtocol): 
    callingLater = False 

    messages = [] 

    def process(self): 
     doSomethingWithMessages(self.messages) 
     self.messages = [] 
     self.callingLater = False 

    def datagramReceived(self, data, (host, port)): 
     self.messages.append(data) 
     if not self.callingLater: 
      reactor.callLater(1.0, self.process) 
      self.callingLater = True 

listener = UdpListener() 

reactor.listenUDP(5556, listener) 
reactor.run() 

更新:这是怎样的原始版本将与锁的工作,只为教育目的。请注意,这不是有效的,也更容易出现错误。编辑:将所有消息逻辑分离为UdpListener,以便使用它的类不需要知道其粘性内部细节。

from twisted.internet.protocol import DatagramProtocol 
from twisted.internet import reactor 
import threading 
import time 

class UdpListener(DatagramProtocol): 
    message_lock = threading.Lock() 
    messages = [] 

    def datagramReceived(self, data, (host, port)): 
     with self.message_lock: 
      self.messages.append(data) 

    def getAndClearMessages(self): 
     with self.message_lock: 
      res = self.messages 
      self.messages = [] 
     return res 

class Messenger(threading.Thread): 

    listener = None 

    def __init__(self): 
     threading.Thread.__init__(self) 

    def run(self): 
     while True: 
      time.sleep(1) 
      recivedMessages = self.listener.getAndClearMessages() 
      length = len(recivedMessages) 
      #doSomethingWithMessages(recivedMessages) 
      print(length) 

listener = UdpListener() 

messenger = Messenger() 
messenger.listener = listener 
messenger.start() 

reactor.listenUDP(5556, listener) 
reactor.run() 
+0

感谢您与callingLater绝招 - 我将使用它。 – mrok 2012-07-06 22:01:48

+0

只是为了提高我的技能,我增加了锁定版本“问题”(我们更新)。你能告诉我现在好不好吗? – mrok 2012-07-06 22:06:37

+1

@ mrok:你还需要'锁定'。追加(数据)',否则锁实际上不会做任何事情。我已经更新了我的帖子,看起来像是用锁来看的(也用'用'表示法而不用'try/finally')。 – Claudiu 2012-07-06 22:15:44

0

为什么不用DeferredQueue来实现,这正是为了这个目的。如果你想使用线程,你需要格外小心。

这是一个带DeferredQueue一个例子,允许线程:

class UdpListener(DatagramProtocol): 

    def __init__(self) 
     self._messages = DeferredQueue() 

    def datagramReceived(self, data, (host, port)): 
     self._messages.put(message) 

    @inlineCallbacks 
    def _wait_for_and_process_next_message(self): 

     # Get message from queue through a deferred call from the DeferredQueue 
     # Here we use @inlineCallbacks, so we assign the result from yield 
     # which is the new message, and will "block" (actually releasing control to Twisted) until a message gets in 
     message = yield self._message_queue.get() 

     # Do something with your message here, and ensure you catch any exceptions! 
     # If your message processing may be long, you may wish to run it in another thread, 
     # and because of @inlineCallbacks, this call will "block" until your function finishes. 
     # In case you did this, ensure you read the notes below. 
     yield threads.deferToThread(my_long_function, message) 

     # Schedule an immediate call to this method again in order to process next message 
     self.wait_for_and_process_next_message() 

    def wait_for_and_process_next_message(self): 
     reactor.callLater(0, self._wait_for_and_process_next_message) 

    def initialize(self): 
     # Call this during your application bootstrapping, so you start processing messages 
     self.wait_for_and_process_next_message() 

要注意,如果你选择你的消息处理推迟到扭曲的线程池是非常重要的(使用threads.deferToThread)您的代码将在不同的线程中运行。您可能会对来自其他线程的消息作出响应,并且在Twisted中,协议不是线程安全的对象(http://twistedmatrix.com/documents/13.2.0/core/howto/threading.html#auto0)。

对于这种情况你会使用reactor.callFromThread()保护关键资源transport,如本例:做

def _send_message_critical_section(self, message): 
    self.transport.write(message, (self.host, self.port)) 

def send_message(self, message): 
    reactor.callFromThread(self._send_message_critical_section, message) 

其他变化:

  • 改名为messages变量_messages,因为它应该被认为是完全私人的。
  • __init__()方法内移动_messages初始化并分配给self._messages,否则消息列表将在所有实例之间共享!我猜你有这个类的一个实例,但...(Variables inside and outside of a class __init__() function