2013-08-05 22 views
3

我正在构建一个Rack中间件,该中间件订阅Redis通道并使用Server Sent Events将消息推送到客户端。 Sinatra provides a nice DSL for doing this.我有一个工作的例子,但是,我遇到的问题是,一旦我到达7或8个客户端,性能就会大大降低。在尝试重用请求之间的Redis连接时,我也遇到了“锁定”服务器的问题。如何提高使用Sinatra Server发送事件流时的并发性

我使用Thin来提供应用程序(它使用EventMachine)。我认为Sinatra DSL已经处理了EventMachine的并发性,但也许这是我需要实现的东西?我不想只限于基于EventMachine的服务器(Thin,Rainbows!),以防有人想使用像Puma这样的多线程服务器。我应该怎样做才能提高我的代码的并发性?

require 'redis' 
require 'sinatra/base' 

class SSE < Sinatra::Base 

    def send_message(json) 
    "id: #{Time.now}\n" + 
    "data: #{json}" + 
    "\r\n\n" 
    end 

    get '/channels/:id/subscribe', provides: 'text/event-stream' do 
    channel_id = params['id'] 
    stream(:keep_open) do |connection| 
     Redis.new.subscribe("channels:#{channel_id}") do |on| 
     on.message do |channel, json| 
      connection << send_message(json) 
     end 
     end 
    end 
    end 

end 

回答

1

想到了一些东西,所以我会重复这些没有部分顺序。

我使用Thin来为应用程序提供服务(它使用EventMachine在 下)。我认为Sinatra DSL已经使用EventMachine处理了并发性 ,但是也许这是我需要执行的东西,我需要执行 ?

你是对的,Thin使用EventMachine。但是,EventMachine(或任何其他反应堆)的事情是,一旦执行同步操作,就会停止整个反应器。因此,要真正获得并发性,您需要在整个应用程序中继续使用EventMachine。

结帐em-hiredis用于支持pub/sub的支持EventMachine的Redis客户端。

我不想限制自己只EventMachine的基于服务器 (薄,彩虹!)如果有人想使用多线程服务器 像彪马

从未尝试过我可以说,但我不认为在没有服务器的服务器中使用EventMachine会有问题。只记得开始你自己的EM。也许在config.ru?

我想 时,重用请求之间的连接Redis的运行也与“死锁”问题服务器

我相信您遇到这种情况的原因是因为每次调用'/ channels /:id/subscribe'正在与Redis建立新的连接。你只能有那么多人打开。考虑将Redis.new重构为您的应用程序的共享连接。只打开一次。单个Redis连接应该能够处理多个pub/subs。

只是有些想法,我希望他们帮助。

1

大量的研究和实验后,这里是我使用的是西纳特拉+西纳特拉上交所创业板代码:

class EventServer < Sinatra::Base 
include Sinatra::SSE 
set :connections, [] 
. 
. 
. 
get '/channel/:channel' do 
. 
. 
. 
    sse_stream do |out| 
    settings.connections << out 
    out.callback { 
     puts 'Client disconnected from sse'; 
     settings.connections.delete(out); 
    } 
    redis.subscribe(channel) do |on| 
     on.subscribe do |channel, subscriptions| 
     puts "Subscribed to redis ##{channel}\n" 
     end 
     on.message do |channel, message| 
     puts "Message from redis ##{channel}: #{message}\n" 
     message = JSON.parse(message) 
     . 
     . 
     . 
     if settings.connections.include?(out) 
      out.push(message) 
     else 
      puts 'closing orphaned redis connection' 
      redis.unsubscribe 
     end 
     end 
    end 
    end 
end 

Redis的连接块on.message和只接受(P)认购/(P)退订命令。一旦您取消订阅,Redis连接不再被阻止,并且可以由初始sse请求实例化的Web服务器对象释放。当您在redis上收到消息时会自动清除,并且sse到浏览器的连接不再存在于收集数组中。

+0

谢谢你的好例子,我解决了很多问题,只有第一次连接按预期工作。经过研究,我发现Redis.new应该在每个请求中被调用,否则只有首先会被处理。 –