2011-07-19 50 views
8

我正在尝试使用AMQP,Websockets和Ruby构建一个简单的聊天应用程序。我明白,这可能不是理解AMQP的最佳用例,但我想知道我错在哪里。AMQP创建订阅动态队列

以下是我的AMQP的服务器代码

require 'rubygems' 
require 'amqp' 
require 'mongo' 
require 'em-websocket' 
require 'json' 

class MessageParser 
    # message format => "room:harry_potter, nickname:siddharth, room:members" 
    def self.parse(message) 
    parsed_message = JSON.parse(message) 

    response = {} 
    if parsed_message['status'] == 'status' 
     response[:status] = 'STATUS' 
     response[:username] = parsed_message['username'] 
     response[:roomname] = parsed_message['roomname'] 
    elsif parsed_message['status'] == 'message' 
     response[:status] = 'MESSAGE' 
     response[:message] = parsed_message['message'] 
     response[:roomname] = parsed_message['roomname'].split().join('_') 
    end 

    response 
    end 
end 

class MongoManager 
    def self.establish_connection(database) 
    @db ||= Mongo::Connection.new('localhost', 27017).db(database) 
    @db.collection('rooms') 

    @db 
    end 
end 


@sockets = [] 
EventMachine.run do 
    connection = AMQP.connect(:host => '127.0.0.1') 
    channel = AMQP::Channel.new(connection) 

    puts "Connected to AMQP broker. #{AMQP::VERSION} " 

    mongo = MongoManager.establish_connection("trackertalk_development") 

    EventMachine::WebSocket.start(:host => '127.0.0.1', :port => 8080) do |ws| 
    socket_detail = {:socket => ws} 
    ws.onopen do 
     @sockets << socket_detail 

    end 

    ws.onmessage do |message| 

     status = MessageParser.parse(message)   
     exchange = channel.fanout(status[:roomname].split().join('_')) 

     if status[:status] == 'STATUS'    
     queue = channel.queue(status[:username], :durable => true) 

     unless queue.subscribed? 
     puts "--------- SUBSCRIBED --------------" 
     queue.bind(exchange).subscribe do |payload| 
      puts "PAYLOAD : #{payload}" 
      ws.send(payload) 
      end 
     else 
      puts "----ALREADY SUBSCRIBED" 
     end     

     # only after 0.8.0rc14 
     #queue = channel.queue(status[:username], :durable => true)  
     #AMQP::Consumer.new(channel, queue)   

     elsif status[:status] == 'MESSAGE' 
     puts "********************* Message- published ******************************" 
     exchange.publish(status[:message) 
     end     
    end 

    ws.onclose do 
     @sockets.delete ws 
    end 
    end  
end 

我使用状态指示输入消息是否是正在进行的聊天或需要我来处理家务事像订阅队列中的状态消息的消息。

我所面临的问题是,当我发送邮件一样 socket.send(JSON.stringify({status:'message', message:'test', roomname:'Harry Potter'}))

exchange.publish' is called but it still doesn't get pushed via the ws.send`到浏览器。

我对EventMachine和AMQP的理解有没有根本性的错误?

下面是相同的代码pastie http://pastie.org/private/xosgb8tw1w5vuroa4w7a

我的代码似乎希望在工作的时候取出durable => truequeue = channel.queue(status[:username], :durable => true)

以下是我的Rails的片段查看标识用户的用户名和房间名称并通过Websockets将其作为消息的一部分发送。

虽然代码似乎工作,当我删除durable => true我不明白为什么会影响被传递的消息。请忽略mongo的一部分,因为它还没有发挥任何作用。

我也想知道,如果我的方法来AMQP和它的使用是正确的

<script> 
    $(document).ready(function(){ 
     var username = '<%= @user.email %>'; 
     var roomname = 'Bazingaa'; 

     socket = new WebSocket('ws://127.0.0.1:8080/'); 

     socket.onopen = function(msg){ 
      console.log('connected'); 
      socket.send(JSON.stringify({status:'status', username:username, roomname:roomname})); 
     } 

     socket.onmessage = function(msg){ 
      $('#chat-log').append(msg.data); 

     } 

    }); 

</script> 
<div class='block'> 
    <div class='content'> 
    <h2 class='title'><%= @room.name %></h2> 
    <div class='inner'> 
     <div id="chat-log"> 
     </div> 

     <div id="chat-console"> 
     <textarea rows="5" cols="40"></textarea> 
     </div> 
    </div> 
    </div> 
</div> 

<style> 
    #chat-log{ 
     color:#000; 
     font-weight:bold; 
     margin-top:1em; 
     width:900px; 
     overflow:auto; 
     height:300px; 
    } 
    #chat-console{ 
     bottom:10px; 
    } 

    textarea{ 
     width:100%; 
     height:60px; 
    } 
</style> 
+0

如果我将amqp代码作为守护程序运行,是否有人可以告诉我如何在生产环境中组织我的代码。任何能够帮助我组织代码的示例代码都会有很大的帮助。 – Sid

回答

1

我认为你的问题可能是队列挂在ws.onmessage调用之间的代理上。当客户端重新连接队列并且绑定已经存在时,ws.send()不会被调用。

默认情况下,当您创建队列时,该队列及其所具有的任何绑定都将挂起,直到代理重新启动,或者明确告诉代理将其删除。

有两种办法可以改变:

  • 添加耐用标志,当你创建队列,这将导致队列坚持围绕即使代理重新启动
  • 添加auto_delete标志,这将导致经纪人在没有消费者附加的短时间后自动删除该实体

如果您可以控制使用rabbitmq代理的代理,则反省代理上发生的事情的简单方法是安装management plugin,该代理为代理上的交换,绑定和队列提供Web界面。

0

在第一次看AMQP位似乎是美好的,但我不想设置所有依赖。如果您只提供AMQP部分的最小示例,我会检查它。

+0

在我的Rails代码中,我标识用户的电子邮件ID和Roomname,并通过Websockets将它传递给AMQP服务器代码。 虽然代码似乎工作,当我删除持久=>真我不明白为什么会影响被传递的消息。请忽略mongo的一部分,因为它还没有发挥任何作用。 我也想知道我的AMQP方法及其用法是否正确 – Sid