2017-10-28 130 views
1

我想提取由FlinkKafkaConsumer010生成的消息的时间戳作为数据流中的值。Flink + Kafka 0.10:如何使用Kafka消息时间戳创建表作为字段?

我知道AssignerWithPeriodicWatermarks类,但这似乎只是通过DataStream API为时间聚合的目的提取时间戳。

我想在后面的Table中提供该卡夫卡消息时间戳,我可以在其上使用SQL。

编辑:尝试这样:

val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties) 
    consumer.setStartFromEarliest() 

    val env = StreamExecutionEnvironment.getExecutionEnvironment 
    val tenv = TableEnvironment.getTableEnvironment(env) 

    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) 

    class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] { 
    var maxTs = 0L 
    override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = { 
     maxTs = Math.max(maxTs, previousElementTimestamp) 
     previousElementTimestamp 
    } 
    override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L) 
    } 

    val stream = env 
    .addSource(consumer) 
    .assignTimestampsAndWatermarks(new KafkaAssigner[String]) 
    .flatMap(_.split("\\W+")) 

    val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime) 

它编译,但抛出:

Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested. 
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630) 
    at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241) 
    at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) 
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) 
    at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241) 
    at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186) 
    at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624) 
    at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398) 
    at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85) 

在上面的代码中的最后一行。

EDIT2:感谢@ fabian-hueske指点我的解决方法。 Full code at https://github.com/andrey-savov/flink-kafka

回答

0

如果配置了时间特征EventTime(请参阅docs),Flink的Kafka 0.10使用者会自动将Kafka消息的时间戳设置为生成的记录的事件时间戳记。

你已经摄入了卡夫卡的主题为DataStream与分配的时间戳(仍然不可见)和水印后,您可以用StreamTableEnvironment.fromDataStream(stream, fieldExpr*)方法将其转换成TablefieldExpr*参数是描述生成的表的模式的表达式列表。您可以使用表达式mytime.rowtime添加一个保存流录制时间戳的字段,其中mytime是新字段的名称,而rowtime表示该值是从记录时间戳中提取的。请检查docs for details

注:作为@bfair指出,原子类型(如DataStream[String])的DataStream的转换失败,并在弗林克1.3.2和更早版本的异常。该错误已报告为FLINK-7939,并将在下一个版本中修复。

+0

嗨@fabian,谢谢你的回答。我从你有的链接跟踪Scala示例,但是当我尝试将'.rowtime'注册为'fieldExpr'时,遇到了运行时异常(Flink 1.3.2)。用一个例子更新了这个问题。 – bfair

+0

嗨,我看了一下代码,你遇到了一个与DataStream类型相关的bug。具有原子类型的DataStreams(在您的情况下为单个值,如String)在内部使用单独的代码路径处理。如果你添加一个MapFunction来将String包装在一个元组中('Tuple1 [String]'),它应该按照预期工作。 –

+0

在这里发布对bug的引用是非常好的,这样其他人可以在问题得到解决时进行跟踪。谢谢你的帮助。 – bfair