2014-03-27 141 views
0

我正在骆驼的一个机制,将选择一个标志,可以是true或false的消息端点。这是一种限制机制,在我的上行信道被淹没的情况下,将消息重新路由到批量接收端点(发送到HDFS)。动态路由在骆驼

最终,我的路线是这样的:

from("queue:myqueue").bean("messageParser") 
    .dynamicRoute(bean(ThrottleHelper.class, 'buildEndpoint')); 

from('direct:regular').to('hbase'); 

from('direct:throttle').to('hdfs'); 

我ThrottleHelper类的buildEndpoint方法是这样的:

public static String buildEndpoint() { 
    synchronized(shouldThrottle) { 
     if(shouldThrottle) 
     return "direct:throttle"; 
     else 
     return "direct:regular" 
    } 
} 

目前,我对所谓的checkStatus()类的方法;其中设置的应该是Tottle(一个静态变量)。 checkStatus()每分钟在一个Camel石英计时器上运行。

我注意到了一些奇怪的行为,我想我可能会滥用这种模式。通过对Camel实现该模式的进一步搜索,看起来buildEndpoint()将在消息遍历每个返回的端点之后调用。这是真的?或者,我是否可以预期在“direct:throttle”或“direct:regular”之后路径会终止?

从我在网上收集的信息看,我的方法应该看起来像这样吗?

public static String buildEndpoint(Message message) { 
    if(message.getHeader('throttled') != null) 
     return null; 
    else 
     message.setHeader('throttled', true); 

    synchronized(shouldThrottle) { 
     if(shouldThrottle) 
     return "direct:throttle"; 
     else 
     return "direct:regular" 
    } 
} 

谢谢!

+0

没关系。我用单元测试验证了这种行为。我列出的第二种方法(测试以查看方法是否已被调用)有效。 我的下一个问题 - 为什么我会使用dynamicRouter,而不是让我的ThrottleHelper使用“shouldThrottle”属性来丰富每个消息的头并使用基于该头部属性的基于内容的路由? –

回答

0

official documentation看起来,是的,你的第二个构造更接近正确的用法。基本上,动态路由器可用于通过多个端点路由消息,而不仅仅是立即终止的单个端点。要告诉动态路由器停止将您的消息路由到另一个端点,您的bean必须返回null(如您在最终代码段中所写),以表明此消息的路由已完成。

+0

这里的想法是试图摆脱我的邮件存储额外的属性。这也是为了避免必须迭代过滤器/选择来确定消息应该穿过哪个端点。然而,它确实似乎比它的价值更麻烦,特别是如果我仍然需要在消息上存储状态。我最好在这里使用choice()构造。 –

+0

不够公平,但我认为你的基于内容的路由思想实际上是这种情况的最佳方法。使用附加属性丰富您的消息是一个非常轻量级的选项,它的设计基本上可以帮助您确切的使用情况。也可以使用手头上的工具,以更好地满足您的需求。 – ProgrammerDan

+0

同意。感谢您的答复! –

0

正如ProgrammerDan指出的那样,动态路由器用于通过多个端点路由消息,因此需要显式返回null以指示路由的结束。

如果您只想使用表达式或bean方法选择一个端点,最好使用动态收件人列表(cfr。http://camel.apache.org/recipient-list.html)。如果您在路由生成器中使用.recipientList()而不是.dynamicRoute(),则您的第一个buildEndpoint方法实现将工作得很好。