2016-03-15 43 views
0

我正在把 STOMP 消费者实现为一个库。其他应用程序调用该库后,应该能够获取 ActiveMQ 中的数据。我的实现如下,但在返回 frame.body 时遇到了问题:我无法从类的外部获取数据。该 STOMP 消费者使用了 defer.inlineCallbacks。

import json
from socket import gethostname
from uuid import uuid1

from stompest.async import Stomp
from stompest.async.listener import SubscriptionListener
from stompest.config import StompConfig
from stompest.error import StompConnectionError, StompProtocolError
from stompest.protocol import StompSpec
from twisted.internet import defer

class Consumer(object):
    """Twisted-based STOMP consumer that decodes each message body as JSON.

    Message handling happens inside the reactor, driven by the subscription
    listener, so a value cannot simply be "returned" to the code that called
    ``run``. Pass ``on_message`` to receive every decoded message instead.
    """

    def __init__(self, amq_uri, on_message=None):
        """Store the connection settings.

        Args:
            amq_uri: Broker URI, e.g. ``'tcp://localhost:61613'``.
            on_message: Optional callable invoked with the decoded JSON
                payload of every received frame. Defaults to ``None``
                (messages are only printed).
        """
        self.amq_uri = amq_uri
        self.hostname = gethostname()
        self.config = StompConfig(uri=self.amq_uri)
        self.on_message = on_message

    @defer.inlineCallbacks
    def run(self, in_queue):
        """Connect, subscribe to ``in_queue`` and wait for the connection to end.

        If waiting for the disconnect fails with ``StompConnectionError``,
        the client is reconnected and re-subscribed, and the method keeps
        waiting in a loop.

        Args:
            in_queue: Destination to subscribe to, e.g. ``'/queue/test_queue'``.
        """
        client = Stomp(self.config)
        headers = {
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            StompSpec.ID_HEADER: self.hostname,
            'activemq.prefetchSize': '1000',
        }

        yield client.connect(headers=self._return_client_id())
        self._subscribe(client, in_queue, headers)

        try:
            yield client.disconnected
        except StompConnectionError:
            yield client.connect(headers=self._return_client_id())
            self._subscribe(client, in_queue, headers)

            # NOTE(review): this loop re-subscribes on every pass, including
            # after a StompProtocolError and after a clean disconnect --
            # confirm that this is the intended recovery behaviour.
            while True:
                try:
                    yield client.disconnected
                except StompProtocolError:
                    pass
                except StompConnectionError:
                    yield client.connect(headers=self._return_client_id())
                self._subscribe(client, in_queue, headers)

    def _subscribe(self, client, in_queue, headers):
        """Subscribe ``client`` to ``in_queue`` with ``consume`` as the handler."""
        client.subscribe(
            in_queue,
            headers,
            listener=SubscriptionListener(self.consume),
        )

    def _return_client_id(self):
        """Return connect headers carrying a fresh ``client-id`` (hostname + UUID1)."""
        return {'client-id': gethostname() + '-' + str(uuid1())}

    def consume(self, client, frame):
        """Decode the frame body as JSON, print it and forward it to ``on_message``.

        Args:
            client: The STOMP client that received the frame (unused here).
            frame: The received frame; ``frame.body`` must contain JSON text.
        """
        data = json.loads(frame.body)
        print('Received Message Type {}'.format(type(data)))
        print('Received Message {}'.format(data))
        # Hand the payload to the application; this is how data leaves the class.
        if self.on_message is not None:
            self.on_message(data)


# Call from another application
import Queue

from twisted.internet import reactor


def handle_message(data):
    """Receive each decoded message outside of the Consumer class."""
    print('data is from outside function {}'.format(data))


amq_uri = 'tcp://localhost:61613'
in_queue = '/queue/test_queue'

c = Consumer(amq_uri, on_message=handle_message)
c.run(in_queue)
# Messages only arrive once the reactor is running; handle_message is called
# from within the reactor for each of them.
reactor.run()

有人能告诉我该如何实现这一点吗?谢谢。

回答

0

我找到了解决办法:改用同步的 stomp 库,而不是异步的 stomp 库。实现如下:

class Consumer(object):
    """Blocking STOMP consumer that fetches a single message from a queue."""

    def __init__(self, amq_uri):
        """Store the connection settings.

        Args:
            amq_uri: Broker URI, e.g. ``'tcp://localhost:61613'``.
        """
        self.amq_uri = amq_uri
        self.hostname = gethostname()
        self.config = StompConfig(uri=self.amq_uri)

    def run(self, in_queue, return_dict):
        """Receive one frame from ``in_queue``, acknowledge it and return its body.

        Args:
            in_queue: Destination to subscribe to.
            return_dict: Mutable mapping; the result is also stored under
                the ``'data'`` key.

        Returns:
            The frame body passed through ``json.dumps``.

        Raises:
            Exception: Whatever receiving or encoding the frame raises; the
                error is printed and re-raised after the client has been
                disconnected.
        """
        # The synchronous client is required here; imported locally so it
        # cannot be confused with the asynchronous ``Stomp`` class.
        from stompest.sync import Stomp

        client = Stomp(self.config)
        headers = {
            StompSpec.ACK_HEADER: StompSpec.ACK_CLIENT_INDIVIDUAL,
            StompSpec.ID_HEADER: self.hostname,
        }
        client.connect()
        try:
            client.subscribe(in_queue, headers)
            try:
                frame = client.receiveFrame()
                # NOTE(review): json.dumps re-encodes the body as a JSON
                # string literal; if the body already is JSON text,
                # json.loads may have been intended -- confirm with callers.
                data = json.dumps(frame.body)
            except Exception as exc:
                print(exc)
                # Previously execution fell through to ack() with ``frame``
                # unbound, hiding the real error behind an UnboundLocalError.
                raise
            client.ack(frame)
        finally:
            # Always release the connection, on the error paths as well.
            client.disconnect()
        return_dict['data'] = data
        return data