2017-03-03 53 views
1

我最近开始使用Spark和Java。我目前正在尝试RDD转换和操作。目前,我正在从包含一些日期时间字段的csv读取数据,然后应用过滤器仅保留那些小于2天的行,最后检查结果RDD是否为空。我写了一个简单的代码片段,在最低级别上完成我想要的功能。如何使用非Lambda函数定义Spark RDD转换

Function<List<String>, Boolean> filterPredicate = row -> new DateTime(row.get(1).isAfter(dtThreshold); 

sc.textFile(inputFilePath) 
      .map(text -> Arrays.asList(text.split(","))) 
      .filter(filterPredicate) 
      .isEmpty(); 

在这种简单的情况下,我假定DateTime对象总是位于第一列。我现在想扩展它以使用多列索引。 但要做到这一点,我需要能够定义一个谓词函数与多个行。这就是为什么我将变量代码中的谓词函数定义分开的原因。

我该如何定义这样的功能?

回答

2

使用大括号符号...

Function<List<String>, Boolean> filterPredicate = row -> { 
     boolean isDateAfter = new DateTime(row.get(1)).isAfter(dtThreshold); 
     boolean hasName = row.get(2) != ""; 
     return isDateAfter && hasName; 
    } 
+1

凉爽,由于固定 – Brad

相关问题