2017-08-09 38 views
0

我的目标是使用kafka以json格式读取字符串,对字符串进行过滤,选择部分消息并下沉消息(仍以json字符串格式)。如何在Flink 1.2中从Kafka中提取部分json格式的字符串

出于测试目的,我的输入字符串信息是这样的:

{"a":1,"b":2,"c":"3"} 

而且我实现的代码是:

def main(args: Array[String]): Unit = { 

val inputProperties = new Properties() 
inputProperties.setProperty("bootstrap.servers", "localhost:9092") 
inputProperties.setProperty("group.id", "myTest2") 
val inputTopic = "test" 

val outputProperties = new Properties() 
outputProperties.setProperty("bootstrap.servers", "localhost:9092") 
val outputTopic = "test2" 


val env = StreamExecutionEnvironment.getExecutionEnvironment 
env.getConfig.disableSysoutLogging 
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000)) 
// create a checkpoint every 5 seconds 
env.enableCheckpointing(5000) 

// create a Kafka streaming source consumer for Kafka 0.10.x 
val kafkaConsumer = new FlinkKafkaConsumer010(
    inputTopic, 
    new JSONDeserializationSchema(), 
    inputProperties) 

val messageStream : DataStream[ObjectNode]= env 
    .addSource(kafkaConsumer).rebalance 

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a") 
    .asText.equals("1") && node.get("b").asText.equals("2")) 

// Need help in this part, how to extract for instance a,c and 
// get something like {"a":"1", "c":"3"}? 
val testStream:DataStream[JsonNode] = filteredStream.map(
    node => { 
    node.get("a") 
    } 
) 

testStream.addSink(new FlinkKafkaProducer010[JsonNode](
    outputTopic, 
    new SerializationSchema[JsonNode] { 
    override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes() 
    }, outputProperties 
)) 

env.execute("Kafka 0.10 Example") 
} 

本规范的注释显示,我不知道怎么样正确选择部分消息。我使用map,但我不知道如何连接整个消息。例如,我在代码中所做的只能给我一个结果为“1”,但我想要的是{“a”:1,“c”:“3”}

或者,不同的方式来解决这个问题。事情是在火花流媒体有一个“选择”API,但我无法在Flink找到它。

非常感谢flink社区的帮助!这是我希望在这个小型项目中实现的最后一个功能。

回答

1

Flink流处理作业每处理一次输入并将其输出到下一个任务或将其保存到外部存储器。

一种方法是将所有输出保存到外部存储中,如HDFS。流式作业完成后,使用批处理作业将它们组合成JSON。

另一种方法是使用state和RichMapFunction来获取包含所有键值的JSON。

stream.map(new MapFunction<String, Tuple2<String, String>>() { 
    public Tuple2<String, String> map(String value) throws Exception { 
     return new Tuple2<String, String>("mock", value); 
    } 
}).keyBy(0).map(new RichMapFunction<Tuple2<String,String>, String>() { 
    @Override 
    public String map(Tuple2<String, String> value) throws Exception { 
     ValueState<String> old = getRuntimeContext().getState(new ValueStateDescriptor<String>("test", String.class)); 
     String newVal = old.value(); 
     if (newVal != null) makeJSON(newVal, value.f1); 
     else newVal = value.f1; 
     old.update(newVal); 
     return newVal; 
    } 
}).print(); 

并使用此映射函数:filteredStream.map(function);

请注意,使用状态时,您将看到如下输出: {“a”:1},{“a”:1,“c”:3}。 最后的输出应该是你想要的。

+0

谢谢! makeJSON是Flink的一个内置函数吗?或者你的意思是我需要自己写一个函数并放在那里? – teddy

+0

@teddy不,Flink不包含此类方法,它是一个用于说明的伪代码。你可以实现一个。不需要很多代码;) – David

+0

我得到一个错误,说键控状态只能用在'键控流'上,也就是说,在这一行上的'keyBy()'操作之后(ValueState state = getRuntimeContext()。 getState(new ValueStateDescriptor (“json”,String.class));) – teddy