我正在骆驼的一个机制,将选择一个标志,可以是true或false的消息端点。这是一种限制机制,在我的上行信道被淹没的情况下,将消息重新路由到批量接收端点(发送到HDFS)。动态路由在骆驼
最终,我的路线是这样的:
from("queue:myqueue").bean("messageParser")
.dynamicRoute(bean(ThrottleHelper.class, 'buildEndpoint'));
from('direct:regular').to('hbase');
from('direct:throttle').to('hdfs');
我ThrottleHelper类的buildEndpoint方法是这样的:
public static String buildEndpoint() {
synchronized(shouldThrottle) {
if(shouldThrottle)
return "direct:throttle";
else
return "direct:regular"
}
}
目前,我对所谓的checkStatus()类的方法;其中设置的应该是Tottle(一个静态变量)。 checkStatus()每分钟在一个Camel石英计时器上运行。
我注意到了一些奇怪的行为,我想我可能会滥用这种模式。通过对Camel实现该模式的进一步搜索,看起来buildEndpoint()将在消息遍历每个返回的端点之后调用。这是真的?或者,我是否可以预期在“direct:throttle”或“direct:regular”之后路径会终止?
从我在网上收集的信息看,我的方法应该看起来像这样吗?
public static String buildEndpoint(Message message) {
if(message.getHeader('throttled') != null)
return null;
else
message.setHeader('throttled', true);
synchronized(shouldThrottle) {
if(shouldThrottle)
return "direct:throttle";
else
return "direct:regular"
}
}
谢谢!
没关系。我用单元测试验证了这种行为。我列出的第二种方法(测试以查看方法是否已被调用)有效。 我的下一个问题 - 为什么我会使用dynamicRouter,而不是让我的ThrottleHelper使用“shouldThrottle”属性来丰富每个消息的头并使用基于该头部属性的基于内容的路由? –