我观察到同样的问题,即订阅线程一旦你订阅就会阻塞。为了解决这个问题,我使用Netty实现了优化的pub/sub客户端,并将其合并到Jedis分支here中。这不是一个全面的解决方案,我还没有来得及真正完成它,但它的工作原理基本渠道和模式subscriptions.The基础是:
使用先拿发布订阅实例:
public static OptimizedPubSub getInstance(String host, int port, String auth, long timeout)
问题/使用取消订阅模式:
public ChannelFuture psubscribe(String... patterns)
public ChannelFuture punsubscribe(String... patterns)
可以忽略返回的ChannelFuture除非你想100%确定您的请求获得通过(它的非同步)。
版本/使用取消订阅的频道:
public ChannelFuture subscribe(String... channels)
public ChannelFuture unsubscribe(String... channels)
然后实现SubListener实例:使用
public interface SubListener {
/**
* Callback when a message is published on a subscribed channel
* @param channel The channel the message was received on
* @param message The received message
*/
public void onChannelMessage(String channel, String message);
/**
* Callback when a message is published on a subscribed channel matching a subscribed pattern
* @param pattern The pattern that the channel matched
* @param channel The channel the message was received on
* @param message The received message
*/
public void onPatternMessage(String pattern, String channel, String message);
}
和注册/注销听众:
public void registerListener(SubListener listener)
public void unregisterListener(SubListener listener)
OptimizedPubSub从未块和事件以异步方式传递到已注册的子监听器。
现在fork已经有点老了,所以它可能对你目前的形式没有用处,但是你可以很容易地将源代码放在那个包里并且单独构建它。依赖关系是Jedis和Netty。
对不起,我没有一个更全面的解决方案。