我所做的是读取来自kafka的消息以json格式。例如。 {"a":1,"b":2}
然后我应用的滤波器此消息,以确保对应于a的值是1,b的值是2。最后,我想以输出结果流至下游卡夫卡。但是,我不知道编译器为什么说类型不匹配。 我的代码如下: val kafkaConsumer = new FlinkKafkaConsumer010(
params.getRequired("input-topi
我的目标是使用kafka以json格式读取字符串,对字符串进行过滤,选择部分消息并下沉消息(仍以json字符串格式)。 出于测试目的,我的输入字符串信息是这样的: {"a":1,"b":2,"c":"3"}
而且我实现的代码是: def main(args: Array[String]): Unit = {
val inputProperties = new Properties()
i