2016-03-21 143 views
1

我需要Apache Spark帮助实现一个工作流程。我的任务在下:分别在SPARK处理几个文件

  1. 我有几个CSV文件作为源数据。注意:这些文件可能有不同的布局

  2. 我有信息,我多么需要分析每个文件的元数据(这不是问题)

  3. 主要目标:结果是与几个其他列的源文件。我必须更新每个源文件而不加入一个输出范围。例如:源10个文件 - > 10个结果文件,每个结果文件只有来自相应源文件的数据。

据我所知星火可以通过面具打开多个文件:

var source = sc.textFile("/source/data*.gz"); 

但在这种情况下,我不能识别文件的哪一行。如果我得到的源文件的列表,并尝试过程以下情形:

JavaSparkContext sc = new JavaSparkContext(...); 
List<String> files = new ArrayList() //list of source files full name's 
for(String f : files) 
{ 
    JavaRDD<String> data = sc.textFile(f); 
    //process this file with Spark 
    outRdd.coalesce(1, true).saveAsTextFile(f + "_out"); 
} 

但在这种情况下,我会处理在连续模式下的所有文件。

我的问题是下一个:我怎么可以在并行模式下处理多个文件?例如:一个文件 - 一个执行者?

我试图通过简单的代码与源数据来实现这一点:

//JSON file with paths to 4 source files, saved in inData variable 
{ 
"files": [ 
    { 
     "name": "/mnt/files/DigilantDaily_1.gz", 
     "layout": "layout_1" 
    }, 
    { 
     "name": "/mnt/files/DigilantDaily_2.gz", 
     "layout": "layout_2" 
    }, 
    { 
     "name": "/mnt/files/DigilantDaily_3.gz", 
     "layout": "layout_3" 
    }, 
    { 
     "name": "/mnt/files/DigilantDaily_4.gz", 
     "layout": "layout_4" 
    } 
    ] 
} 

sourceFiles= new ArrayList<>(); 
    JSONObject jsFiles = (JSONObject) new JSONParser().parse(new FileReader(new File(inData))); 
    Iterator<JSONObject> iterator = ((JSONArray)jsFiles.get("files")).iterator(); 
    while (iterator.hasNext()){ 
     SourceFile sf = new SourceFile(); 
     JSONObject js = iterator.next(); 
     sf.FilePath = (String) js.get("name"); 
     sf.MetaPath = (String) js.get("layout"); 
     sourceFiles.add(sf); 
    } 

    SparkConf sparkConf = new SparkConf() 
      .setMaster("local[*]") 
      .setAppName("spark-app"); 
    final JavaSparkContext sparkContext = new JavaSparkContext(sparkConf); 

    try { 

     final Validator validator = new Validator(); 

     ExecutorService pool = Executors.newFixedThreadPool(4); 

     for(final SourceFile f : sourceFiles) 
     { 
      pool.execute(new Runnable() { 

       @Override 
       public void run() { 

        final Path inFile = Paths.get(f.FilePath); 

        JavaRDD<String> d1 = sparkContext 
          .textFile(f.FilePath) 
          .filter(new Function<String, Boolean>() { 
           @Override 
           public Boolean call(String s) throws Exception { 
            return validator.parseRow(s); 
           } 
          }); 

        JavaPairRDD<String, Integer> d2 = d1.mapToPair(new PairFunction<String, String, Integer>() { 
         @Override 
         public Tuple2<String, Integer> call(String s) throws Exception { 
          String userAgent = validator.getUserAgent(s); 
          return new Tuple2<>(DeviceType.deviceType(userAgent), 1); 
         } 
        }); 

        JavaPairRDD<String, Integer> d3 = d2.reduceByKey(new Function2<Integer, Integer, Integer>() { 
         @Override 
         public Integer call(Integer val1, Integer val2) throws Exception { 
          return val1 + val2; 
         } 
        }); 

        d3.coalesce(1, true) 
          .saveAsTextFile(outFolder + "/" + inFile.getFileName().toString());//, org.apache.hadoop.io.compress.GzipCodec.class); 
       } 
      }); 
     } 
     pool.shutdown(); 
     pool.awaitTermination(60, TimeUnit.MINUTES); 
    } catch (Exception e) { 
     throw e; 
    } finally { 
     if (sparkContext != null) { 
      sparkContext.stop(); 
     } 
    } 

但这个代码失败,异常:

Exception in thread "pool-13-thread-2" Exception in thread "pool-13-thread-3" Exception in thread "pool-13-thread-1" Exception in thread "pool-13-thread-4" java.lang.Error: org.apache.spark.SparkException: Task not serializable 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1151) 
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
at java.lang.Thread.run(Thread.java:745) 
    Caused by: org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:304) 
at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:294) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:122) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:2032) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:335) 
at org.apache.spark.rdd.RDD$$anonfun$filter$1.apply(RDD.scala:334) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:108) 
at org.apache.spark.rdd.RDD.withScope(RDD.scala:310) 
at org.apache.spark.rdd.RDD.filter(RDD.scala:334) 
at org.apache.spark.api.java.JavaRDD.filter(JavaRDD.scala:78) 
at append.dev.App$1.run(App.java:87) 
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
... 2 more 

我想知道我有错了吗?

谢谢你的帮助!

回答

0

你可以使用sc.wholeTextFiles(目录名)获得的(文件名,内容)对放射性散布并映射了该

+0

然后呢?你将如何从磁盘加载每个文件?你不能'fileRdd.flatMap(file => sc.textFile(file))',因为你不应该尝试序列化上下文。 –

+0

true,thx :)我更新了 –

+0

'wholeTextFiles'也是有风险的 - 如果其中一个文件大于2GB,您将得到一个异常(因为它将每个文件加载为单个记录,并且存在2GB的限制Spark中分区的大小)。即使你没有得到例外 - 你会得到糟糕的表现。 –

0

我用了良好的效果类似的多线程技术。我相信这个问题位于你定义的内部类中。

在一个单独的类上创建您的runnable/callable,并确保它可以通过Spark提交给您的jar。此外,实现序列化,因为你隐式地将状态传递给你的函数(f.FilePath)。

+0

2 ** Ioannis Deligiannis **嗨!你认为我应该将我的匿名Runnable类从main方法移到外部jar中的类中吗?你能给我提供一个简短的例子吗? – Yustas

+0

是的。这也将表明你基本上通过“状态”给你的功能。内部类和lambda表达式在spark中使用要复杂一些。先试试这个,序列化异常应该消失。根据您将代码发送给Spark的方式,您可能不需要额外的jar。 –

+0

对不起,您应该首先移动匿名函数<>。验证器是可序列化的吗? –