2012-09-06 32 views
8

我有一个应用程序对客户端发送的消息作出反应。一条消息是reload_credentials,应用程序在任何时候收到新客户端注册的消息。然后,该消息将连接到PostgreSQL数据库,对所有凭据进行查询,然后将它们存储在常规Ruby哈希(client_id => client_token)中。如何使用EventMachine处理此用例?

应用程序可能收到的一些其他消息是start,stop,pause,这些消息用于跟踪某些会话时间。我的观点是我设想应用程序以如下方式发挥作用:

  • 客户端将消息发送
  • 消息被排队
  • 队列正在处理

然而,例如,我不不想阻止反应堆。此外,我们假设我有一个reload_credentials消息,它是队列中的下一个。我不希望队列中的任何其他消息被处理,直到证书从数据库重新加载。另外,当我正在处理某个消息(如等待凭证查询完成)时,我想让其他消息排入队列。

你能指导我解决这样的问题吗?我想我可能不得不使用em-synchrony,但我不确定。

回答

7

使用Postgresql EM驱动程序之一或EM.defer,以便不会阻塞反应堆。

当您收到'reload_credentials'消息时,只需翻转一个标志,导致所有后续消息被排入队列。 'reload_credentials'完成后,处理队列中的所有消息。在队列为空之后,翻转导致消息在接收时被处理的标志。

EM司机对PostgreSQL这里列出:https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server 
    def post_init 
    @queue    = [] 
    @loading_credentials = false 
    end 

    def recieve_message(type, data) 
    return @queue << [type, data] if @loading_credentials || [email protected]? 
    return process_msg(type, data) unless :reload_credentials == type 
    @loading_credentials = true 
    reload_credentials do 
     @loading_credentials = false 
     process_queue 
    end 
    end 

    def reload_credentials(&when_done) 
    EM.defer(proc { query_and_load_credentials }, when_done) 
    end 


    def process_queue 
    while (type, data = @queue.shift) 
     process_msg(type, data) 
    end 
    end 

    # lots of other methods 
end 

EM.start_server(HOST, PORT, Server) 

如果希望所有连接排队,只要任何连接接收“reload_connections”消息,你必须通过eigenclass协调消息。

+0

但是,可以多次接收reload_credentials消息。不应该像2线程?保持排队和正在处理的一个? – Geo

+0

是的,如果在处理另一个reload_credentials时收到reload_credentials,它将像其他消息一样排队。 – simulacre

+0

应该像第一个那样处理多个reload_credentials消息。通过将reload_credentials放入EM.defer块中,您可以在另一个线程中执行它。只要你的'处理'代码是非阻塞的,你就会不断收到消息。使用EM兼容库来确保您不会阻止。或者使用EM.defer进行处理。 – simulacre

4

以下是我想,像你的当前实现:

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 


    q = EM::Queue.new 

    workers = Array.new(10) { Worker.new q } 

上面的问题,如果我理解正确的话,就是你不想工人职位(工种的工作是已经在制作人时间表早些时候到达),而不是任何reload_credentials作业。以下内容应该为此服务(最后要谨慎的话)。

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 

    class LockingDispatcher 
     def initialize channel, queue 
     @channel = channel 
     @queue = queue 

     @backlog = [] 
     @channel.subscribe method(:dispatch_with_locking) 

     @locked = false 
     end 

     def dispatch_with_locking item 
     if locked? 
      @backlog << item 
     else 
      # You probably want to move the specialization here out into a method or 
      # block that's passed into the constructor, to make the lockingdispatcher 
      # more of a generic processor 
      case item.type 
      when :reload_credentials 
      lock 
      deferrable = CredentialReloader.new(item).start 
      deferrable.callback { unlock } 
      deferrable.errback { unlock } 
      else 
      dispatch_without_locking item 
      end 
     end 
     end 

     def dispatch_without_locking item 
     @queue << item 
     end 

     def locked? 
     @locked 
     end 

     def lock 
     @locked = true 
     end 

     def unlock 
     @locked = false 
     bl = @backlog.dup 
     @backlog.clear 
     bl.each { |item| dispatch_with_locking item } 
     end 

    end 

    channel = EM::Channel.new 
    queue = EM::Queue.new 

    dispatcher = LockingDispatcher.new channel, queue 

    workers = Array.new(10) { Worker.new queue } 

因此,输入到第一系统自带在q,但在这个新系统中谈到在channelqueue仍然用于工作人员之间的工作分配,但queue未在刷新凭证操作进行时填充。不幸的是,由于我没有花更多的时间,我还没有推广LockingDispatcher,因此它没有与调度CredentialsReloader的物品类型和代码相结合。我会把它留给你。

你应该在这里注意到,虽然这服务于我对原始请求的理解,但通常放松这种要求会更好。有迹象表明,本质上不能没有在要求改变被消灭几个突出问题:

  • 系统不等待执行作业开始凭据工作
  • 前完成系统将处理凭证的工作非常糟糕的爆发 - 其他可能可处理的项目将不会。
  • 如果证书代码存在错误,积压可能会填满内存并导致失败。一个简单的超时可能足以避免灾难性后果,如果代码是可中止的,并且后续消息可以充分处理以避免进一步的死锁。

它实际上听起来像你有一些在系统中的userid的概念。如果您仔细考虑了需求,则很可能您只需要将与属于凭据处于刷新状态的用户ID相关的项目积压起来。这是一个不同的问题,涉及不同类型的调度。尝试为那些用户锁定待办事项的哈希值,回调凭证完成以将这些待办事项排除在工作人员身上或类似的安排。

祝你好运!