0

我正在使用以下配置来将activemq与kafka集成。我从activemq收到消息并将其转发给kafka。但是,我注意到消息正从JMS队列中出列,但消息不会发送到卡夫卡。Spring集成 - Apache ActiveMQ到Kafka

<?xml version="1.0" encoding="UTF-8"?> 
<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:jms="http://www.springframework.org/schema/integration/jms" 
    xmlns:integration="http://www.springframework.org/schema/integration" 
    xmlns:int-kafka="http://www.springframework.org/schema/integration/kafka" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xsi:schemaLocation="http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration/jms 
    http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd 
    http://www.springframework.org/schema/integration/kafka 
    http://www.springframework.org/schema/integration/kafka/spring-integration-kafka.xsd"> 

    <jms:message-driven-channel-adapter 
     id="helloJMSAdapater" destination="helloJMSQueue" connection-factory="jmsConnectionfactory" 
     channel="helloChannel" extract-payload="true" /> 

    <integration:channel id="helloChannel" /> 

    <integration:service-activator id="sayHelloServiceActivator" 
     input-channel="helloChannel" ref="sayHelloService" method="sayHello" /> 

    <int-kafka:outbound-channel-adapter 
     id="kafkaOutboundChannelAdapter" kafka-template="template" 
     auto-startup="false" sync="true" channel="helloChannel" topic="test1234" 
     > 
    </int-kafka:outbound-channel-adapter> 

    <bean id="template" class="org.springframework.kafka.core.KafkaTemplate"> 
     <constructor-arg> 
      <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory"> 
       <constructor-arg> 
        <map> 
         <entry key="bootstrap.servers" value="localhost:9092" /> 
         <!--entry key="retries" value="5" /> <entry key="batch.size" value="16384" 
          /> <entry key="linger.ms" value="1" /> <entry key="buffer.memory" value="33554432" 
          /> < entry key="key.serializer" value="org.apache.kafka.common.serialization.StringSerializer" 
          /> <entry key="value.serializer" value="org.apache.kafka.common.serialization.StringSerializer" 
          /--> 
        </map> 
       </constructor-arg> 
      </bean> 
     </constructor-arg> 
    </bean> 



</beans> 

此外,如果有任何来自Kafka的问题,它甚至不报告任何异常堆栈跟踪。

我错过了什么吗?

+0

我不太了解Spring,但在Kafka中,您需要配置acks = all以便在经纪商存储您的消息时获得确认。如果acks = 0,您将不会确认成功或失败提交消息。 –

回答

0

您的消息由sayHelloServiceActivator消耗。

所以你helloChannel渠道类型更改为

<publish-subscribe-channel id="helloChannel"/> 

默认为DirectChannel

的DirectChannel具有点至点语义但除此之外更 类似PublishSubscribeChannel比任何队列基于上述的信道实现。它实现了SubscribableChannel接口 而不是PollableChannel接口,因此它将消息直接发送给订阅者。但是,作为 点对点信道,它不同于 PublishSubscribeChannel,因为它只会将每个消息发送到 单订阅MessageHandler。

+0

感谢您的评论。我注意到我的serviceActivator是无效类型,因此,它不会前进。我现在可以将消息从jms发送到kafka。感谢您的投入。 –

0

正如@哈森Bennour说,如果你想发送消息给两个消费者,你需要一个发布/订阅频道。

也就是说,你在kafka适配器上有auto-startup="false",所以它甚至不会订阅该频道。

如果它已启动,则使用您当前的配置消息将循环发送到服务激活器和适配器。

+0

感谢您的评论。我注意到我的serviceActivator是无效类型,因此,它不会前进。我现在可以将消息从jms发送到kafka。感谢您的投入。 –