我的目标是使用kafka读取json格式的字符串,对字符串进行过滤,然后将消息下沉(仍在json字符串中格式)。在下沉kafka流时看不到消息,无法在flink 1.2中看到打印消息
出于测试目的,我的输入字符串信息是这样的:
{"a":1,"b":2}
而且我实现的代码是:
def main(args: Array[String]): Unit = {
// parse input arguments
val params = ParameterTool.fromArgs(args)
if (params.getNumberOfParameters < 4) {
println("Missing parameters!\n"
+ "Usage: Kafka --input-topic <topic> --output-topic <topic> "
+ "--bootstrap.servers <kafka brokers> "
+ "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
return
}
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new JSONKeyValueDeserializationSchema(false),
params.getProperties)
val messageStream = env.addSource(kafkaConsumer)
val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1")
&& node.get("b").asText.equals("2"))
messageStream.print()
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
params.getRequired("output-topic"),
new SerializationSchema[ObjectNode] {
override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
}, params.getProperties
))
env.execute("Kafka 0.10 Example")
}
可以看出,我想打印信息流控制台并将过滤的消息下载到kafka。但是,我看不到他们。
有趣的是,如果我将KafkaConsumer的模式从JSONKeyValueDeserializationSchema修改为SimpleStringSchema,我可以看到messageStream打印到控制台。代码如下所示:
val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topic"),
new SimpleStringSchema,
params.getProperties)
val messageStream = env.addSource(kafkaConsumer)
messageStream.print()
这让我觉得,如果我用JSONKeyValueDeserializationSchema,我输入消息实际上是不被接受的卡夫卡。但这似乎很奇怪,并且与在线文档(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html)
有很大区别希望有人能帮助我!
我其实曾试过。我的程序将在大约30秒内无问题地启动,但仍然没有输出(控制台和卡夫卡)。然后,我会得到一个错误:线程“main”中的异常org.apache.flink.runtime.client.JobExecutionException:作业执行失败。引起:com.fasterxml.jackson.core.JsonParseException:无法识别的标记'PREFIX':期待('true','false'或'null') at [Source:[B @ 68d8025;行:1,列:8] – teddy
你确定你喂的JSON是正确的吗?因为我用示例代码运行了示例,它在我的最后工作。你可以检查你的JSON的有效性:https://jsonlint.com/ –
是的,{“a”:1,“b”:2}绝对是一个有效的json(我也检查过)。我想知道你如何测试我的代码?我所做的就是使用当地的卡夫卡消费者进行投入,当地的卡夫卡生产者进行输出。我无法看到flink程序的输出。 – teddy