2017-10-06 52 views
1

我试图从Kafka做结构化流式处理。我打算将检查点存储在HDFS中。我读了一个cloudera博客,建议不要在HDFS中为Spark流存储检查点。结构流式检查点是同样的问题吗? https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/Kafka结构化流式检查点

在结构化流媒体中,如果我的火花程序停机一段时间,如何从检查点目录获取最新偏移量,并在该偏移量之后加载数据。 我将检查点存储在如下所示的目录中。

df.writeStream\ 
     .format("text")\ 
     .option("path", '\files') \ 
     .option("checkpointLocation", 'checkpoints\chkpt') \ 
     .start() 

更新:

这是我的结构化数据流的程序读取卡夫卡消息,解压缩并写入到HDFS。

df = spark \ 
     .readStream \ 
     .format("kafka") \ 
     .option("kafka.bootstrap.servers", KafkaServer) \ 
     .option("subscribe", KafkaTopics) \ 
     .option("failOnDataLoss", "false")\ 
     .load() 
Transaction_DF = df.selectExpr("CAST(value AS STRING)") 
Transaction_DF.printSchema() 

decomp = Transaction_DF.select(zip_extract("value").alias("decompress")) 
#zip_extract is a UDF to decompress the stream 

query = decomp.writeStream\ 
    .format("text")\ 
    .option("path", \Data_directory_inHDFS) \ 
    .option("checkpointLocation", \pathinDHFS\) \ 
    .start() 

query.awaitTermination() 
+0

你确定博客的建议你不要检查点存储在HDFS?这很奇怪。你有链接吗?对于结构化流式问题,只需使用相同的检查点目录运行相同的代码,结构化流将拾取最后一个失败偏移并从中重新启动。 – zsxwing

+0

@zsxwing这是cloudera博客链接https://blog.cloudera.com/blog/2017/06/offset-management-for-apache-kafka-with-apache-spark-streaming/我手动杀死了我的流媒体节目分钟,然后再次启动它,并在启动后才开始处理收到的消息。它忽略了错过的消息,当它发生故障并且它没有再处理它们时 –

+0

你可以看看驱动程序日志,并查找由logInfo(s“GetBatch调用start = $ start,end = $ end”)输出的日志吗?它应该告诉你查询处理了什么。 – zsxwing

回答

0

在您的查询,尝试将一个检查点,而写结果像实木复合地板的一些格式像HDFS一些持久性存储。它对我有好处。

您可以分享您的代码,以便我们可以更深入地了解一下吗?

+0

我在问题中添加了完整的代码作为更新。你是如何得到检查点文件的最新偏移量的? –

+0

虽然这样做,但您如何防止HDFS中的检查点随着时间的推移使用越来越多的存储?有没有可用于管理的“清理”配置? –

2

在长期存储(HDFS,AWS S3等)上存储检查点是最优选的。我想在此添加一点,即属性“failOnDataLoss”不应该设置为false,因为它不是最佳做法。数据丢失是任何人都不愿意承担的。休息你在正确的道路上。

+0

虽然这样做,你如何防止HDFS中的检查点随着时间的推移使用越来越多的存储?有没有可用于管理的“清理”配置? –

+0

据我所知,检查点不存储太多的数据,它像Kafka一样存储偏移量,所以你不必担心存储问题,如果你想清除检查点,你可以在维护过程中做到这一点,或者你可以把这是一个调度器。 –

+0

我在SparkConf上使用“spark.cleaner.referenceTracking.cleanCheckpoints”,“true”,为我工作干净的检查点。 –

0

正如我所理解的那样,它建议在Hbase,Kafka,HDFS或Zookeeper中维护偏移量管理。

“值得一提的是,你还可以存储偏移在存储 系统,如HDFS。HDFS中存储的偏移量是一个不太常用的方法 比上述选项HDFS有着较高的延迟相比 其他像ZooKeeper和HBase这样的系统。“

您可以在星火文档找到如何从现有的检查站重新启动的查询:http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#recovering-from-failures-with-checkpointing

相关问题