2015-04-03 62 views
11

我设置一个简单的测试,流从S3的文本文件,当我试图像星火流textFileStream不支持通配符

val input = ssc.textFileStream("s3n://mybucket/2015/04/03/") 

得到它的工作,并在桶我本来的日志文件去那里一切都会正常工作。

但是,如果他们是一个子文件夹,它不会发现,得到了投入的子文件夹的任何文件(是的,我知道HDFS实际上并未使用的文件夹结构)

val input = ssc.textFileStream("s3n://mybucket/2015/04/") 

所以,我想简单地做通配符像我曾与一个标准的火花应用程序之前完成

val input = ssc.textFileStream("s3n://mybucket/2015/04/*") 

但当我尝试这个,它抛出一个错误

java.io.FileNotFoundException: File s3n://mybucket/2015/04/* does not exist. 
at org.apache.hadoop.fs.s3native.NativeS3FileSystem.listStatus(NativeS3FileSystem.java:506) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1483) 
at org.apache.hadoop.fs.FileSystem.listStatus(FileSystem.java:1523) 
at org.apache.spark.streaming.dstream.FileInputDStream.findNewFiles(FileInputDStream.scala:176) 
at org.apache.spark.streaming.dstream.FileInputDStream.compute(FileInputDStream.scala:134) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:300) 
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:299) 
at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:287) 
at scala.Option.orElse(Option.scala:257) 
..... 

我知道一个事实,即在读标准Spark应用程序的fileInput时可以使用通配符,但似乎在进行流输入时,它不会这样做,也不会自动处理子文件夹中的文件。有什么我在这里失踪?

最终什么,我需要的是全天候运行streaming作业将被监视具有按日期放在它的日志的S3桶

因此,像

s3n://mybucket/<YEAR>/<MONTH>/<DAY>/<LogfileName> 

有什么方式将其交给最顶层的文件夹,并自动读取显示在任何文件夹中的文件(因为显然日期会每天增加)?

编辑

所以在挖掘到的文档在http://spark.apache.org/docs/latest/streaming-programming-guide.html#basic-sources它指出嵌套目录,不支持。

谁能说出为什么会出现这种情况?

另外,由于我的文件将根据日期进行嵌套,因此在流式传输应用程序中解决此问题的好方法是什么?这有点复杂,因为日志需要几分钟才能写入S3,所以即使我们在新的一天进入了几分钟,也可以将前一天写入的最后一个文件写入前一天的文件夹。

+0

其实我不确定s3是否支持通配符...... – eliasah 2015-04-03 09:19:24

+1

它当然是。过去8个月,我的工作一直使用通配符。另外,仅仅为了理智检查,我只是用通配符输入了一个工作,工作得很好。 我也注意到,这是一个有点挑剔要求,你不这样做 S3N:// mybucket/2015/04 * 如说 异常线程“main”产生java.io.IOException :不是一个文件:S3N:// mybucket/2015/4月1日 这是有道理的,因为它不是一个文件 但是,如果你 S3N:// mybucket/2015/04/* 它正确解析天子文件夹中的所有文件.... 这种感觉对我来说就像一个错误。 – 2015-04-03 17:00:07

+1

我要投票回答这个问题。我记得有类似的问题,但我不记得我是如何解决它的。 – eliasah 2015-04-03 17:03:09

回答

0

我们有同样的问题。我们用逗号加入了子文件夹名称。

List<String> paths = new ArrayList<>(); 
SimpleDateFormat sdf = new SimpleDateFormat("yyyy/MM/dd"); 

try {   
    Date start = sdf.parse("2015/02/01"); 
    Date end = sdf.parse("2015/04/01"); 

    Calendar calendar = Calendar.getInstance(); 
    calendar.setTime(start);   

    while (calendar.getTime().before(end)) { 
     paths.add("s3n://mybucket/" + sdf.format(calendar.getTime())); 
     calendar.add(Calendar.DATE, 1); 
    }     
} catch (ParseException e) { 
    e.printStackTrace(); 
} 

String joinedPaths = StringUtils.join(",", paths.toArray(new String[paths.size()])); 
val input = ssc.textFileStream(joinedPaths); 

我希望以这种方式解决您的问题。

+0

很酷。你如何处理更大的结束日期?通过编译和重新启动程序?或者我错过了什么? – 2016-02-24 22:34:46

6

通过扩展FileInputDStream可以创建一些“丑陋但工作的解决方案”。 写作sc.textFileStream(d)相当于

new FileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

您可以创建CustomFileInputDStream,将延长FileInputDStream。自定义类将从FileInputDStream类复制计算方法,并根据需要调整findNewFiles方法。

改变findNewFiles方法来自:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

    // Calculate ignore threshold 
    val modTimeIgnoreThreshold = math.max(
    initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
    currentTime - durationToRemember.milliseconds // trailing end of the remember window 
) 
    logDebug(s"Getting new files for time $currentTime, " + 
    s"ignoring files older than $modTimeIgnoreThreshold") 
    val filter = new PathFilter { 
    def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
    } 
    val newFiles = fs.listStatus(directoryPath, filter).map(_.getPath.toString) 
    val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
    logInfo("Finding new files took " + timeTaken + " ms") 
    logDebug("# cached file times = " + fileToModTime.size) 
    if (timeTaken > slideDuration.milliseconds) { 
    logWarning(
     "Time taken to find new files exceeds the batch size. " + 
     "Consider increasing the batch size or reducing the number of " + 
     "files in the monitored directory." 
    ) 
    } 
    newFiles 
} catch { 
    case e: Exception => 
    logWarning("Error finding new files", e) 
    reset() 
    Array.empty 
} 

}

到:

private def findNewFiles(currentTime: Long): Array[String] = { 
    try { 
     lastNewFileFindingTime = clock.getTimeMillis() 

     // Calculate ignore threshold 
     val modTimeIgnoreThreshold = math.max(
     initialModTimeIgnoreThreshold, // initial threshold based on newFilesOnly setting 
     currentTime - durationToRemember.milliseconds // trailing end of the remember window 
    ) 
     logDebug(s"Getting new files for time $currentTime, " + 
     s"ignoring files older than $modTimeIgnoreThreshold") 
     val filter = new PathFilter { 
     def accept(path: Path): Boolean = isNewFile(path, currentTime, modTimeIgnoreThreshold) 
     } 
     val directories = fs.listStatus(directoryPath).filter(_.isDirectory) 
     val newFiles = ArrayBuffer[FileStatus]() 

     directories.foreach(directory => newFiles.append(fs.listStatus(directory.getPath, filter) : _*)) 

     val timeTaken = clock.getTimeMillis() - lastNewFileFindingTime 
     logInfo("Finding new files took " + timeTaken + " ms") 
     logDebug("# cached file times = " + fileToModTime.size) 
     if (timeTaken > slideDuration.milliseconds) { 
     logWarning(
      "Time taken to find new files exceeds the batch size. " + 
      "Consider increasing the batch size or reducing the number of " + 
      "files in the monitored directory." 
     ) 
     } 
     newFiles.map(_.getPath.toString).toArray 
    } catch { 
     case e: Exception => 
     logWarning("Error finding new files", e) 
     reset() 
     Array.empty 
    } 
    } 

会检查文件中的所有一级子文件夹,你可以调整它使用批处理时间戳以访问相关的“子目录”。

我创建的CustomFileInputDStream正如我所提到,并通过调用激活它:

new CustomFileInputDStream[LongWritable, Text, TextInputFormat](streamingContext, d).map(_._2.toString) 

这似乎表现我们的预期。

当我写的解决方案这样我必须补充几点考虑:

  • 您正在打破星火封装和创造,你将不得不完全支持随着时间的循环中的自定义类。

  • 我相信像这样的解决方案是最后的选择。如果你的用例可以通过不同的方式实现,通常避免这样的解决方案会更好。

  • 如果你在S3上会有很多“子目录”,并且会检查它们中的每一个,这会花费你。

  • 理解Databricks是否仅仅因为可能的性能损失而不支持嵌套文件会非常有趣,也许还有更深层次的原因我没有想过。

+0

我有一个类似的用例,如果我找不到替代方案,我正在考虑沿着这条路走下去。我使用格式为YYYY-MM-DD-HH的日期分区子文件夹。每个小时都会创建一个新文件夹并将文件上传到其中。所以我不一定要扫描所有的子文件夹(只有最后三个),并不会遇到性能问题。我更担心这种代码和状态管理的重新启动(哪个小时文件夹+文件最后一次扫描等)的可维护性。看看你是否可以分享你的想法,甚至可以为你的自定义FileDstream工作的代码。 – Cheeko 2016-03-01 21:11:52

+0

如果您在流中使用检查点目录,那么当您重新启动应用程序时,您将首先重新安排应用程序停机期间应执行的所有批次。例如,如果您的流式传输间隔为1分钟,并且您的应用程序在10:00下班并在10:30后备份,那么当应用程序启动时,该应用程序将尝试执行批次为10:01,10:02等。 现在,如果您执行findNewFiles(currentTime)的方式是您扫描的文件夹是从当前时间派生的,那么您将能够在重新启动后扫描“正确”的文件。 – 2016-03-02 12:01:53

+0

请注意currentTime实际上不是CURRENT时间,而是批次的时间。 我能想到的唯一问题就是如果你的文件不是不可变的。例如,你在10:10将一些数据写入文件A并在10:20覆盖这些数据,那么如果你的应用程序在10:10-10:20之间停止工作,那么你将失去对A的第一次写入。确实是一个问题,但我并不熟悉在这种情况下使用可变文件的许多组织。 – 2016-03-02 12:05:07