2017-09-18 66 views
0

使用Ruby的“bunny”RabbitMQ客户端,我希望我的生产者(某些Ruby代码)向消费者(使用“sneakers”gem)的工作者发送消息,并且我希望我的生产者不要执行另一行Ruby代码,直到生产者收到消费者收到我的消息并做了一些工作的确认。等待消息生产者收到消费者完成的工作确认

在我的消费者中,我正在做一些工作,然后调用运动鞋'ack!方法来确认邮件已收到并且工作已完成。

在我的制片人,我对我的Bunny::Channel实例调用confirm_select付诸确认码,并publish后-ing我的短信,我打电话wait_for_confirms通道上理应等到我所有的消息都被ack!由-ed消费者。 (我尝试过在兔子文档中发现的东西here。)

但是,似乎我的制作人并没有等待消费者拨打ack!。我正在登录我的制片人和我的消费者,并发现我的制作人似乎认为在消费者实际承认他们之前已经确认了消息。

如何让RabbitMQ生产者等到消费者在Ruby中完成其工作?

Ruby 2.3.3,RabbitMQ 3.6.12,Erlang 17.3。

这里是我的锁文件:

GEM 
    specs: 
    amq-protocol (2.2.0) 
    bunny (2.7.0) 
     amq-protocol (>= 2.2.0) 
    concurrent-ruby (1.0.5) 
    serverengine (1.5.11) 
     sigdump (~> 0.2.2) 
    sigdump (0.2.4) 
    sneakers (2.6.0) 
     bunny (~> 2.7.0) 
     concurrent-ruby (~> 1.0) 
     serverengine (~> 1.5.11) 
     thor 
    thor (0.20.0) 

PLATFORMS 
    ruby 

DEPENDENCIES 
    bunny 
    sneakers 

BUNDLED WITH 
    1.14.6 

这里是我的消费/工人(consumer_worker.rb):

class ConsumerWorker 
    include Sneakers::Worker 

    from_queue 'do-work-here', 
      exchange: 'do-work-here', 
      exchange_type: :direct, 
      durable: true, 
      prefetch: 1, 
      arguments: { 
       :'x-dead-letter-exchange' => 'do-work-here-retry' 
      }, 
      timeout_job_after: 5, 
      retry_timeout: 60000, 
      ack: true 

    def work(msg) 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "message received: #{msg}" 
    end 
    sleep 1 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "acknowledging message at: #{Time.now.to_i}" 
    end 
    ack! 
    open('ruby-debug.log', 'a') do |f| 
     f.puts "acknowledged message at: #{Time.now.to_i}" 
    end 
    end 
end 

在一个终端选项卡,我正在这名工人用:

bundle exec sneakers work ConsumerWorker --require consumer_worker.rb 

这是我的出版商(publisher.rb):

require 'bunny' 
connection = Bunny.new('amqp://guest:[email protected]:5672').tap(&:start) 
channel = connection.create_channel 
channel.confirm_select 
queue = channel.queue('do-work-here', 
         {arguments: {:'x-dead-letter-exchange' => 'do-work-here-retry'}, 
         durable: true}) 
queue.publish('hello world', persistent: true) 
channel.wait_for_confirms 
open('ruby-debug.log', 'a') do |f| 
    f.puts "messages confirmed at: #{Time.now.to_i}" 
end 

当我在另一个选项卡中运行以下命令:

ruby ./publisher.rb 

然后我的日志文件(./ruby-debug.log)包含以下行:

message received: hello world 
messages confirmed at: 1505774819 
acknowledging message at: 1505774820 
acknowledged message at: 1505774820 

我想是对的顺序事件如下:

message received 
acknowledging message 
acknowledged message 
messages confirmed 

我如何解决这个问题?

+0

我认为我对出版商如何确认的理解应该是有效的是错误的。请参阅https://lists.rabbitmq.com/pipermail/rabbitmq-discuss/2014-January/033269.html。本教程似乎可能与解决我的问题有关:https://www.rabbitmq.com/tutorials/tutorial-six-ruby.html – Jackson

回答

0

发布者确认仅涵盖发布者与RabbitMQ的沟通。出版商不知道消费者。

如果请求/响应模式为 ​​,请参阅教程6。

ConditionVariable是一个常用的并发原语,用于延迟进一步的操作,直到发生事件,并有可选的超时。

发布商确认和消费者确认记录在http://www.rabbitmq.com/confirms.htm

+0

任何想法如何执行RPC与运动鞋? – Jackson

相关问题