目前,我有一个用例,我需要从RabbitMQ消息总线获取消息,使用HDFS接收器附加消息大小(以字节为单位)并输出消息。Spring云数据流(兔|处理器| hdfs)输出二进制
首先,我创建了自己的处理器,它将大小附加到消息中。我这样做的原因是因为编码需要是Google Protocol Buffer的编码。
我的应用程序看起来如下:
stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs "
当HDFS水槽输出我看到[B @ 12768762该消息。我已经Google'd四周,看到建议增加以下内容:
spring.cloud.stream.bindings.input.consumer.headerMode=raw
然而,这并没有看到帮我了!这就是说,如果我更改应用程序中使用去一个文件中:
[input | processor ] | file --binary=true
然后,一切工作正常。但是,我喜欢HDFS接收器提供的翻转功能。
任何想法?
是的,我实现的处理器返回一个GPB的字节数组。你是说它应该返回一个java.io.Serializable? –
我试过“stream deploy --name rabbit-to-log --properties”app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io。可序列化的“”没有工作。你是说处理器应该返回一个java.io.Serializable而不是byte [] –
我重写了处理器以返回一个从com.google.protobuf.GeneratedMessageV3继承的Envelope对象。这个类继承自Serializable。当取消注册/注册新应用程序并开始处理数据时,我收到一条错误消息“使用contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope”无法反序列化[CUAVProtos $ Envelope] –