2017-09-22 36 views
0

目前,我有一个用例,我需要从RabbitMQ消息总线获取消息,使用HDFS接收器附加消息大小(以字节为单位)并输出消息。Spring云数据流(兔|处理器| hdfs)输出二进制

首先,我创建了自己的处理器,它将大小附加到消息中。我这样做的原因是因为编码需要是Google Protocol Buffer的编码。

我的应用程序看起来如下:

stream create --name rabbit-to-hdfs --definition "rabbit | delim-protobuf | hdfs " 

当HDFS水槽输出我看到[B @ 12768762该消息。我已经Google'd四周,看到建议增加以下内容:

spring.cloud.stream.bindings.input.consumer.headerMode=raw 

然而,这并没有看到帮我了!这就是说,如果我更改应用程序中使用去一个文件中:

[input | processor ] | file --binary=true 

然后,一切工作正常。但是,我喜欢HDFS接收器提供的翻转功能。

任何想法?

回答

0

文件正在工作,因为它只是倾倒它收到的字节,但看着HDFS接收器,它似乎需要使用java.io.Serializable对象作为输入。但在你的情况下,你是从protobuf对象发送一个字节数组(我假设这是怎么回事)

+0

是的,我实现的处理器返回一个GPB的字节数组。你是说它应该返回一个java.io.Serializable? –

+0

我试过“stream deploy --name rabbit-to-log --properties”app.log.spring.cloud.stream.bindings.input.content-type = application/x-java-object; type = java.io。可序列化的“”没有工作。你是说处理器应该返回一个java.io.Serializable而不是byte [] –

+0

我重写了处理器以返回一个从com.google.protobuf.GeneratedMessageV3继承的Envelope对象。这个类继承自Serializable。当取消注册/注册新应用程序并开始处理数据时,我收到一条错误消息“使用contentType [application/x-java-object; type = CUAVProtos $ Envelope] CUAVProtos $ Envelope”无法反序列化[CUAVProtos $ Envelope] –

0

这些类型不兼容,这就是问题所在。通过在SCS中设置contentType,您只是要求框架使用java序列化来调用writeObject。但是由于您使用的是已经是序列化框架的protobuf,它不会起作用。这里的问题是,水槽真的看起来(我不熟悉接收器代码)期望一个序列化,但你没有提供一个。你可以做的是修改接收器应用程序或提供知道如何从protobuf转换为Serializable的自定义转换器,甚至不知道这是否有道理是诚实的。