我想提取由FlinkKafkaConsumer010
生成的消息的时间戳作为数据流中的值。Flink + Kafka 0.10:如何使用Kafka消息时间戳创建表作为字段?
我知道AssignerWithPeriodicWatermarks
类,但这似乎只是通过DataStream
API为时间聚合的目的提取时间戳。
我想在后面的Table
中提供该卡夫卡消息时间戳,我可以在其上使用SQL。
编辑:尝试这样:
val consumer = new FlinkKafkaConsumer010("test", new SimpleStringSchema, properties)
consumer.setStartFromEarliest()
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tenv = TableEnvironment.getTableEnvironment(env)
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
class KafkaAssigner[T] extends AssignerWithPeriodicWatermarks[T] {
var maxTs = 0L
override def extractTimestamp(element: T, previousElementTimestamp: Long): Long = {
maxTs = Math.max(maxTs, previousElementTimestamp)
previousElementTimestamp
}
override def getCurrentWatermark: Watermark = new Watermark(maxTs - 1L)
}
val stream = env
.addSource(consumer)
.assignTimestampsAndWatermarks(new KafkaAssigner[String])
.flatMap(_.split("\\W+"))
val tbl = tenv.fromDataStream(stream, 'w, 'ts.rowtime)
它编译,但抛出:
Exception in thread "main" org.apache.flink.table.api.TableException: Field reference expression requested.
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:630)
at org.apache.flink.table.api.TableEnvironment$$anonfun$1.apply(TableEnvironment.scala:624)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
at scala.collection.mutable.ArrayOps$ofRef.flatMap(ArrayOps.scala:186)
at org.apache.flink.table.api.TableEnvironment.getFieldInfo(TableEnvironment.scala:624)
at org.apache.flink.table.api.StreamTableEnvironment.registerDataStreamInternal(StreamTableEnvironment.scala:398)
at org.apache.flink.table.api.scala.StreamTableEnvironment.fromDataStream(StreamTableEnvironment.scala:85)
在上面的代码中的最后一行。
EDIT2:感谢@ fabian-hueske指点我的解决方法。 Full code at https://github.com/andrey-savov/flink-kafka
嗨@fabian,谢谢你的回答。我从你有的链接跟踪Scala示例,但是当我尝试将'.rowtime'注册为'fieldExpr'时,遇到了运行时异常(Flink 1.3.2)。用一个例子更新了这个问题。 – bfair
嗨,我看了一下代码,你遇到了一个与DataStream类型相关的bug。具有原子类型的DataStreams(在您的情况下为单个值,如String)在内部使用单独的代码路径处理。如果你添加一个MapFunction来将String包装在一个元组中('Tuple1 [String]'),它应该按照预期工作。 –
在这里发布对bug的引用是非常好的,这样其他人可以在问题得到解决时进行跟踪。谢谢你的帮助。 – bfair