我试图从Kafka做结构化流式处理。我打算将检查点存储在HDFS中。我读了一个cloudera博客,建议不要在HDFS中为Spark流存储检查点。结构流式检查点是同样的问题吗? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/。Kafka结构化流式检查点
在结构化流媒体中,如果我的火花程序停机一段时间,如何从检查点目录获取最新偏移量,并在该偏移量之后加载数据。 我将检查点存储在如下所示的目录中。
df.writeStream\
.format("text")\
.option("path", '\files') \
.option("checkpointLocation", 'checkpoints\chkpt') \
.start()
更新:
这是我的结构化数据流的程序读取卡夫卡消息,解压缩并写入到HDFS。
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", KafkaServer) \
.option("subscribe", KafkaTopics) \
.option("failOnDataLoss", "false")\
.load()
Transaction_DF = df.selectExpr("CAST(value AS STRING)")
Transaction_DF.printSchema()
decomp = Transaction_DF.select(zip_extract("value").alias("decompress"))
#zip_extract is a UDF to decompress the stream
query = decomp.writeStream\
.format("text")\
.option("path", \Data_directory_inHDFS) \
.option("checkpointLocation", \pathinDHFS\) \
.start()
query.awaitTermination()
你确定博客的建议你不要检查点存储在HDFS?这很奇怪。你有链接吗?对于结构化流式问题,只需使用相同的检查点目录运行相同的代码,结构化流将拾取最后一个失败偏移并从中重新启动。 – zsxwing
@zsxwing这是cloudera博客链接https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/我手动杀死了我的流媒体节目分钟,然后再次启动它,并在启动后才开始处理收到的消息。它忽略了错过的消息,当它发生故障并且它没有再处理它们时 –
你可以看看驱动程序日志,并查找由logInfo(s“GetBatch调用start = $ start,end = $ end”)输出的日志吗?它应该告诉你查询处理了什么。 – zsxwing