2014-04-26 34 views
1

我有一个应用程序使用Redis发布/订阅在客户端之间使用Java中的Jedis客户端传输消息。我希望能够在用户输入命令时在运行时订阅频道,但是因为订阅是阻止操作,因为它在调用订阅的线程上进行侦听,所以我不确定如何在以后订阅其他频道在原始线程上。订阅同一线程的多个频道Jedis

例子:

private PubSubListener psl = new PubSubListener(); 

public void onCommand(String[] args) { 
    subscribe(args[0]); 
} 

public void subscribe(String channel) { 
    Jedis jedis = jedisPool.getResource(); 

    jedis.subscribe(psl, channel); 
} 

这只是分派则命令将被用于轮询Redis的,我将无法订阅任何更多的渠道与线程的线程工作。

回答

2

我观察到同样的问题,即订阅线程一旦你订阅就会阻塞。为了解决这个问题,我使用Netty实现了优化的pub/sub客户端,并将其合并到Jedis分支here中。这不是一个全面的解决方案,我还没有来得及真正完成它,但它的工作原理基本渠道和模式subscriptions.The基础是:

使用先拿发布订阅实例:

public static OptimizedPubSub getInstance(String host, int port, String auth, long timeout) 

问题/使用取消订阅模式:

public ChannelFuture psubscribe(String... patterns) 
public ChannelFuture punsubscribe(String... patterns) 

可以忽略返回的ChannelFuture除非你想100%确定您的请求获得通过(它的非同步)。

版本/使用取消订阅的频道:

public ChannelFuture subscribe(String... channels) 
public ChannelFuture unsubscribe(String... channels) 

然后实现SubListener实例:使用

public interface SubListener { 
    /** 
    * Callback when a message is published on a subscribed channel 
    * @param channel The channel the message was received on 
    * @param message The received message 
    */ 
    public void onChannelMessage(String channel, String message); 

    /** 
    * Callback when a message is published on a subscribed channel matching a subscribed pattern 
    * @param pattern The pattern that the channel matched 
    * @param channel The channel the message was received on 
    * @param message The received message 
    */ 
    public void onPatternMessage(String pattern, String channel, String message); 
} 

和注册/注销听众:

public void registerListener(SubListener listener) 
public void unregisterListener(SubListener listener) 

OptimizedPubSub从未块和事件以异步方式传递到已注册的子监听器。

现在fork已经有点老了,所以它可能对你目前的形式没有用处,但是你可以很容易地将源代码放在那个包里并且单独构建它。依赖关系是Jedis和Netty。

对不起,我没有一个更全面的解决方案。