2015-05-12 37 views
0

是否有可能在春季xd运行时将kafka源模块用作处理器模块?任何代码示例?如何让kafka在spring xd运行时使用http流数据?

我想实现这样的事情:http(xd source)| kafka源(xd处理器)| kafka消费者(xd接收器)

我正在尝试这样做,因为我有通过http传输的流数据,我想用kafka消息总线进行管理。

我的流定义是这样的:

stream create kafkaSourceTest --definition "http --outputType=application/json | kafka --zkconnect=localhost:2181 --topic=kafkaTestTopic | log " --deploy 

扑灭在弹簧XD结果的处理器模块的盒实现卡夫卡源模块执行的一个错误是这样的:

2015-05-12 11:18:52,914 1.1.1.RELEASE ERROR pool-13-thread-4 http.NettyHttpInboundChannelAdapter - Error sending message 

有机springframework.messaging.MessageDeliveryException:调度程序没有用户通道'admin:default,admin,singlenode,hsqldbServer:9393.kafkaSourceTest.0'。嵌套异常是org.springframework.integration.MessageDispatchingException:分派器没有订户 at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:81) at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel的.java:277) 在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:239) 在org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) 在org.springframework .messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)

回答

3

我想d这是因为我有通过http发送的流数据,我想用kafka消息总线进行管理。

如果您使用kafka作为messagebus(设置传输之后),则类似“http | log”的流将通过kafka messagebus传输http消息。在这种情况下,卡夫卡经纪人的话题将由XD内部人员定义。

是否有可能在春季xd运行时将kafka源模块作为处理器模块工作?

不,源模块不能充当处理器模块。如果您希望消息在Kafka中流过特定主题,那么您可以有一个流,该流具有一个从http源接收数据的kafka接收器模块,以及另一个使用相同主题配置kafka源模块的流。

和,这可以这样来实现:

流创建KafkaSink --definition的 “http --outputType =应用/ JSON |卡夫卡--brokerList = --topic = kafkaTestTopic” --deploy

stream create KafkaSource --definition“kafka --zkconnect = localhost:2181 --topic = kafkaTestTopic | log”--deploy

相关问题