2013-07-05 17 views
4

我想从Queue.Queue或TCP套接字中读取消息,以先到者为准。 如何在不使用2个线程的情况下实现? 平台是CPython的2.7.5在WindowsPython - 如何在同一时间在队列和套接字上等待

+0

你不能...轻松。类似于'select(2)'的东西可以用于两个阻塞套接字,但不适用于“队列”。如果可以接受的话,可以得到最接近的就是在循环中使用非阻塞方法。 – Aya

+0

这是个坏消息。任何建议如何解决这个问题? – GabiMe

回答

1

做到在一个单独的线程,你将不得不使用非阻塞方法,并将它们合并到单个事件循环中。实际上,我使用select代替非阻塞套接字的I/O在这里,因为它是,如果你需要从多个插槽读取稍微干净...

import socket 
import select 
import Queue 
import time 

TIMEOUT = 0.1 # 100ms 


def process_queue_item(item): 
    print 'Got queue item: %r' % item 


def process_socket_data(data): 
    print 'Got socket data: %r' % data 


def main(): 

    # Build queue 
    queue = Queue.Queue() 
    for i in range(10): 
     queue.put(i) 
    queue.put(None) # Using None to indicate no more data on queue 
    queue_active = True 

    # Build socket 
    sock = socket.socket() 
    sock.connect(('www.google.com', 80)) 
    sock.send('GET/HTTP/1.0\r\n\r\n') 
    socket_active = True 

    # Main event loop 
    while 1: 

     # If there's nothing to read, bail out 
     if not (socket_active or queue_active): 
      break 

     # By default, sleep at the end of the loop 
     do_sleep = True 

     # Get data from socket without blocking if possible 
     if socket_active: 
      r, w, x = select.select([sock], [], [], TIMEOUT) 
      if r: 
       data = sock.recv(64) 
       if not data: # Hit EOF 
        socket_active = False 
       else: 
        do_sleep = False 
        process_socket_data(data) 

     # Get item from queue without blocking if possible 
     if queue_active: 
      try: 
       item = queue.get_nowait() 
       if item is None: # Hit end of queue 
        queue_active = False 
       else: 
        do_sleep = False 
        process_queue_item(item) 
      except Queue.Empty: 
       pass 

     # If we didn't get anything on this loop, sleep for a bit so we 
     # don't max out CPU time 
     if do_sleep: 
      time.sleep(TIMEOUT) 


if __name__ == '__main__': 
    main() 

输出看起来像......

Got socket data: 'HTTP/1.0 302 Found\r\nLocation: http://www.google.co.uk/\r\nCache-Co' 
Got queue item: 0 
Got socket data: 'ntrol: private\r\nContent-Type: text/html; charset=UTF-8\r\nSet-Cook' 
Got queue item: 1 
Got socket data: 'ie: PREF=ID=a192ab09b4c13176:FF=0:TM=1373055330:LM=1373055330:S=' 
Got queue item: 2 
etc. 
+1

每秒钟迭代10次左右,并进行轮询。不是非常有效,但我想没有其他办法。如何使用2个线程并将数据推送到单个队列?这是更好的方法吗? – GabiMe

+0

@GabiMe你可以调整'TIMEOUT'值来品尝。如果您准备使用多个线程,则有很多选项。如果每个线程都可以处理自己收到的数据,那么我只需要将每个阻塞调用放在它自己的线程中。 – Aya

0

你可以做下面几行内容:

def check_for_message(queue,socket,sock_accept_size=512): 
    socket.setblocking(0) 
    while True: 
     try: 
      sock_msg=socket.recv(sock_accept_size) 
     except socket.error: 
      """Do stuff if there is no message""" 
      sock_msg=None 
     try: 
      que_msg=queue.get() 
     except Queue.Empty: 
      """Do stuff if there is no message""" 
      que_msg=None 
     yield (que_msg,sock_msg) 

然后你就可以使用遍历它:

for que_message,sock_message in check_for_message(que_instance,socket_instance): 
    print que_message,sock_message 
2

有一个很好的技巧来做到这一点here适用于您的问题。

import queue 
import socket 
import os 

class PollableQueue(queue.Queue): 
    def __init__(self): 
     super().__init__() 
     # Create a pair of connected sockets 
     if os.name == 'posix': 
      self._putsocket, self._getsocket = socket.socketpair() 
     else: 
      # Compatibility on non-POSIX systems 
      server = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
      server.bind(('127.0.0.1', 0)) 
      server.listen(1) 
      self._putsocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
      self._putsocket.connect(server.getsockname()) 
      self._getsocket, _ = server.accept() 
      server.close() 

    def fileno(self): 
     return self._getsocket.fileno() 

    def put(self, item): 
     super().put(item) 
     self._putsocket.send(b'x') 

    def get(self): 
     self._getsocket.recv(1) 
     return super().get() 
相关问题