2016-08-15 26 views
1

I am trying to use spring.cloud.stream.kafka.binder.headers to transport a custom header that I set, based on a previous question, but the property does not seem to work as I expected.

In the documentation I read:

spring.cloud.stream.kafka.binder.headers 
The list of custom headers that will be transported by the binder. 

Default: empty. 

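This seems to indicate that setting the list (comma-separated?) causes the custom headers to be transported in the Message<>, but the header is lost as soon as the Kafka write completes.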
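To make my reading of the property concrete, here is a minimal sketch of what I assume a header whitelist means: only the headers named in the list are carried across, and everything else is dropped. This is plain Java written for illustration, not the binder's actual code. The class and method names (`HeaderWhitelistSketch`, `parseHeaderNames`, `transportable`) are hypothetical, and the comma-separated format is an assumption based on how Spring Boot usually binds list-valued properties.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Illustration only: NOT the Kafka binder's implementation.
public class HeaderWhitelistSketch {

    // Split a property value such as "orderId, bar" into trimmed, non-empty header names.
    // Assumption: the property is a comma-separated list.
    public static Set<String> parseHeaderNames(String propertyValue) {
        Set<String> names = new LinkedHashSet<>();
        if (propertyValue == null) {
            return names;
        }
        for (String name : propertyValue.split(",")) {
            String trimmed = name.trim();
            if (!trimmed.isEmpty()) {
                names.add(trimmed);
            }
        }
        return names;
    }

    // Keep only the headers whose names appear in the whitelist.
    public static Map<String, Object> transportable(Map<String, Object> headers,
                                                    Set<String> whitelist) {
        Map<String, Object> kept = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : headers.entrySet()) {
            if (whitelist.contains(entry.getKey())) {
                kept.put(entry.getKey(), entry.getValue());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new LinkedHashMap<>();
        headers.put("orderId", "create");
        headers.put("id", "5dccea6f");
        headers.put("timestamp", 1471288144882L);

        // With the whitelist "orderId", only that header would survive the hop.
        System.out.println(transportable(headers, parseHeaderNames("orderId")));
        // prints {orderId=create}
    }
}
```

Under that reading, `orderId` should still be present on the consumer side, while `id` and `timestamp` would be regenerated by the framework.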

My annotation creates the header as part of invoking the MessagingGateway:

@MessagingGateway(name = "redemptionGateway", defaultRequestChannel = Channels.GATEWAY_OUTPUT, defaultHeaders = @GatewayHeader(name = "orderId", expression = "#gatewayMethod.name")) 
public interface RedemptionGateway { 
    ... 
} 

I can see that the header is created correctly in the first preSend debug output:

2016-08-15 15:09:04 http-nio-8080-exec-2 DEBUG DirectChannel:430 - preSend on channel 'gatewayOutput', message: GenericMessage [[email protected][orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={orderId=create, id=5dccea6f-266e-82b9-54c6-57ec441a26ac, timestamp=1471288144882}] - {applicationSystemCode=x, clientIP=0:0:0:0:0:0:0:1, clusterId=Cluster-Id-NA, containerId=Container-Id-NA, correlationId=UNDEFINED, domainName=defaultDomain, hostName=Host-NA, messageId=10.113.21.144-eb8404d0-de93-4f94-80cb-e5b638e8aeef, userId=anonymous, webAnalyticsCorrelationId=|} 

But in the next preSend, the header is missing:

2016-08-15 15:09:05 kafka-binder- DEBUG DirectChannel:430 - preSend on channel 'enrichingInput', message: GenericMessage [[email protected][orderId=f72b2d9b-4e60-43fa-95d4-1b0b368fe49f], headers={kafka_offset=10, orderId=create, kafka_messageKey=null, kafka_topic=received, kafka_partitionId=0, kafka_nextOffset=11, contentType=application/x-java-object;type=x.TrivialRedemption}] - {} 

My properties contain:

 

    spring.cloud.stream.kafka.binder.headers=orderId 

Answers

3

What version of spring-cloud-stream are you using?

I just wrote a quick test case and it works fine...

spring.cloud.stream.kafka.binder.headers=bar 
spring.cloud.stream.bindings.output.destination=foobar 
spring.cloud.stream.bindings.input.destination=foobar 
spring.cloud.stream.bindings.input.group=foo 

Application:

package com.example; 

import org.springframework.beans.factory.annotation.Autowired; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.cloud.stream.annotation.EnableBinding; 
import org.springframework.cloud.stream.messaging.Processor; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 
import org.springframework.integration.support.MessageBuilder; 
import org.springframework.messaging.Message; 
import org.springframework.messaging.MessageHandler; 
import org.springframework.messaging.MessagingException; 

@SpringBootApplication 
@EnableBinding(Processor.class) 
public class So38961697Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So38961697Application.class, args); 
     Foo foo = context.getBean(Foo.class); 
     foo.start(); 
     foo.send(); 
     Thread.sleep(30000); 
     context.close(); 
    } 

    @Bean 
    public Foo foo() { 
     return new Foo(); 
    } 

    private static class Foo { 

     @Autowired 
     Processor processor; 

     public void send() { 
      Message<?> m = MessageBuilder.withPayload("foo") 
        .setHeader("bar", "baz") 
        .build(); 
      processor.output().send(m); 
     } 

     public void start() { 
      this.processor.input().subscribe(new MessageHandler() { 

       @Override 
       public void handleMessage(Message<?> m) throws MessagingException { 
        System.out.println(m); 
       } 

      }); 
     } 

    } 

} 

Result:

GenericMessage [payload=foo, headers={bar=baz, kafka_offset=0, kafka_messageKey=null, kafka_topic=foobar, kafka_partitionId=0, kafka_nextOffset=1, contentType=text/plain}] 

The complete project is here.

EDIT: See the comments; upgrading to 1.0.2.RELEASE solved the problem.
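Since the cause turned out to be a version mismatch, a quick way to see which version of a library is actually on the classpath can help. The following is a minimal sketch using only the JDK. It assumes the library's jar manifest carries an `Implementation-Version` entry, which is not guaranteed, and the class name `LibraryVersionSketch` is hypothetical.

```java
// Illustration only: reports the manifest version of the jar a class was loaded from.
public class LibraryVersionSketch {

    // Returns the Implementation-Version of the class's package,
    // or "unknown" when the package or the manifest entry is not available.
    public static String versionOf(Class<?> type) {
        Package pkg = type.getPackage();
        String version = (pkg != null) ? pkg.getImplementationVersion() : null;
        return (version != null) ? version : "unknown";
    }

    public static void main(String[] args) {
        // Any class from the library of interest can be passed here. In the
        // application above that could be, for example, EnableBinding.class
        // (an assumption; it requires spring-cloud-stream on the classpath).
        System.out.println(versionOf(String.class));
    }
}
```

If this prints "unknown" for a library class, checking the resolved dependencies in the build tool is the more reliable route.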

EDIT

Added a group to ensure that the consumer starts from the earliest message. See the comments below.

+0

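I checked your dependencies and found that you are using 1.0.2.RELEASE, whereas I was using 1.0.0.RELEASE. Upgrading my project to 1.0.2.RELEASE solved the problem. Next time I will make sure I am on the latest version. –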

+0

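Also note that the documentation link in the question points to the current snapshot, which may be ahead of the current release. The correct link for the current (1.0.2) release is [here](http://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#_kafka_binder_properties); it will always point to the documentation for the latest released version. –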

+0

In 1.2.0.RELEASE – Savash