2016-12-16 66 views
1
class WSHandler(tornado.websocket.WebSocketHandler): 
    clients = [] 
    def open(self, name): 
     # WSHandler.clients.append(self) 
     # liveWebSockets.add(self) 
     self.id = name 
     self.clients.append(self) 
     # self.application.pc.add_event_listener(self) 
     print 'new connection' 



    def on_message(self, message): 
     print 'message received: %s' % message 
     # Reverse Message and send it back 
     print 'sending back message: %s' % message[::-1] 


     # pika sending message 
     import pika 
     connection = pika.BlockingConnection(pika.ConnectionParameters(
         'localhost')) 
     channel = connection.channel() 
     # clients.append(self) 
     channel.queue_declare(queue='hello') 
     # print dir(self) 
     message_rabbit_mq = { 
           'web_socket': self.id, 
           'message': message 
          } 
     message_rabbit_mq = json.dumps(message_rabbit_mq)      
     channel.basic_publish(exchange='', 
           routing_key='hello', 
           body=message_rabbit_mq) 
     connection.close() 


     self.rabbit_connect() 
     # def rabbit_connect(): 
     # pika receving message 
     connection = pika.BlockingConnection(pika.ConnectionParameters(
     host='localhost')) 
     channel = connection.channel() 

     channel.queue_declare(queue='hello') 

     def callback(ch, method, properties, body): 
      print(" [x] Received %r" % body) 
      self.write_message(body) 
      time.sleep(4) 
      body_obj = json.loads(body) 
      if 'message' in body: 
       if body_obj['message'] == "crack": 
        channel.stop_consuming() 

     channel.basic_consume(callback, 
         queue='hello', 
         no_ack=True) 

     channel.start_consuming() 

     self.write_message("closed reference") 

在上述代码的问题是RabbitMQ的消耗消息是非阻塞与龙卷风

connection = pika.BlockingConnection(pika.ConnectionParameters(
    host='localhost')) 
    channel = connection.channel() 

    channel.queue_declare(queue='hello') 

    def callback(ch, method, properties, body): 
     print(" [x] Received %r" % body) 
     self.write_message(body) 
     time.sleep(4) 
     body_obj = json.loads(body) 
     if 'message' in body: 
      if body_obj['message'] == "crack": 
       channel.stop_consuming() 

    channel.basic_consume(callback, 
        queue='hello', 
        no_ack=True) 

    channel.start_consuming() 

上述部分块中的逻辑的功能ON_MESSAGE的其余部分。我如何获得上述部分与逻辑的其余部分运行异步? 这使得来自客户端的进一步websocket消息没有经过处理。

回答

1

尝试使用此代码:

https://github.com/Gsantomaggio/rabbitmqexample/tree/master/webSocketPython

def threaded_rmq(): channel.queue_declare(queue="my_queue") logging.info('consumer ready, on my_queue') channel.basic_consume(consumer_callback, queue="my_queue", no_ack=True) channel.start_consuming()

if __name__ == "__main__": logging.info('Starting thread RabbitMQ') threadRMQ = Thread(target=threaded_rmq) threadRMQ.start()

+0

谢谢你,那工作 – DeadDjangoDjoker