我很想知道其他人如何使用官方RabbitMQ java客户端库来处理从错误连接中恢复的问题。我们正在使用它将我们的应用程序服务器连接到我们的RabbitMQ集群,并且我们已经实现了几种不同的方式来从连接故障中恢复,但是他们中的一些人感觉非常正确。如何处理使用RabbitMQ java客户端库从故障连接中恢复?
想象一下这样的模拟应用:
public class OurClassThatStartsConsumers {
Connection conn;
public void start() {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("someusername");
factory.setPassword("somepassword");
factory.setHost("somehost");
conn = factory.newConnection();
new Thread(new Consumer(conn.createChannel())).start();
}
}
class Consumer1 implements Runnable {
public Consumer1(Channel channel) {
this.channel = channel;
}
@Override
public void run() {
while (true) {
... consume incoming messages on the channel...
// How do we handle that the connection dies?
}
}
}
在现实世界中,我们有几百个消费者。那么,如果连接死亡会发生什么?在上面的例子中,Consumer1无法恢复,当连接关闭时,Channel也关闭,我们无法恢复的状态。所以,让我们看看一些方法来解决这个问题:
解决方案A)
让每一个消费者都有自己的连接,并登记在连接死亡触发的事件,然后重新连接处理。
优点:它的工作原理
缺点:
- 因为我们有很多的消费者,我们可能不希望很多 连接。
- 我们有可能会拥有 重新连接到兔子大量的重复的代码和处理重新连接
溶液B)
让每个消费者使用相同的连接,并订阅它的连接故障事件。
优点:比解决方案A
缺点更少的连接:由于连接被关闭,我们需要重新打开/替换它。 java客户端库似乎没有提供重新打开连接的方式,所以我们必须用新连接来替换它,然后以某种方式通知所有消费者关于这个新连接,并且他们将不得不重新创建渠道和消费者。再一次,我不想在消费者身上看到很多逻辑。
解决方案C)
裹Connection
和Channel
类是处理重新连接逻辑类,消费者只需要了解WrappedChannel
类。在连接失败时,WrappedConnection
将处理重新建立的连接,一旦连接,WrappedConnection
将自动创建新的通道并注册使用者。
优点:它的工作原理 - 这实际上是我们今天使用的解决方案。
缺点:这感觉就像是黑客,我认为这是应该由底层库更优雅地处理的东西。
也许有更好的方法? API文档没有涉及从错误连接恢复的问题。任何输入赞赏:)
你可以给出提示如何编码它。由于似乎没有回调,因此我们可以恢复所有通道,队列和绑定。你能告诉我,我应该如何包裹连接? – jeevs
您必须包装连接和频道,一旦它断开连接,无法恢复频道,因此您必须创建一个新频道。 –