2017-06-15 134 views
0

我有多个Java进程。每个进程将生成一个定期包含新数据的数据文件。Spark多个文件处理

我需要Spark来读取这些文件并将数据转储到数据库中。 我有以下相对于上述要求的问题 -

  1. 有什么办法星星之火可以知道它已处理1000条记录,并有从1001每次挑选处理文件的时间开始?
  2. 或者我必须清除文件一旦Spark处理它1000个记录,每次文件应该只包含未处理的记录。在这种情况下,我怎样才能在同一个文件中读写记录。

由于我有连续的数据生成,我不能等待Spark完成它的工作,然后让文件加载新数据。

回答

0

在阅读完您的问题后,我认为您应该使用Spark Streaming并指定HDFS/S3,Kafka或flume作为源代码。

最好的办法是改变你的Java进程发布卡夫卡的记录,并写一个火花流代码来读取这些记录。

你可以在这里阅读更多的细节: -

https://spark.apache.org/docs/latest/streaming-programming-guide.html

如果这是不可能的,有一两件事你可以做的是检查每个文件的最后修改日期和只读,其最后的修改过的文件日期距离当前时间少于2分钟。如果当前日期时间为2017年6月15日上午8:00您需要确保您的代码只读取上次修改日期小于06/15/2017 07:58 AM的文件。通过这种方式,您可以确保只读取那些未被Java进程处理的文件。即您目前不附加新数据。

您只会阅读最近2分钟内没有任何活动的文件。 2分钟的时间差是可以改变的,你可以根据你的代码逻辑改变它。

0

谢谢,考虑到使用基于上次修改时间戳的读取文件的逻辑将无法解决我在达到要处理的文件中的记录的要求。

我发现的一种方法是使用Java 1.7引入的SeekableByteChannel,使用它可以跟踪文件中您正在编写新记录并将其传递给Spark层的位置。 Spark然后可以从该位置读取记录并处理它。

+0

我假设你只会读取一次文件。如果这是真的,那么你不需要从特定位置读取文件。只有当进程停止写入文件时,才会读取该文件。 – Max08

+0

您是否正在阅读HDFS的这些文件?你可以使用水槽和火花流? – Max08

+0

不,我有一些限制,因为我不能通过卡夫卡整合我的过程。使用火花流媒体是次要的事情..我不认为我需要使用,因为我可以使用Spark RDD API读取文件。正如我所说,这个文件将不断获得新的数据,我的Spark程序必须不断从同一个文件中读取新的数据 – Techie