2016-03-28 88 views
0

我解析.log文件JavaRDD,后整理这JavaRDD,现在我有,例如oldJavaRDD
2016-03-28 | 11:00 | X | object1 | region1
2016-03-28 | 11:01 | Y | object1 | region1
2016-03-28 | 11:05 | X | object1 | region1
2016-03-28 | 11:09 | X | object1 | region1
2016-03-28 | 11:00 | X | object2 | region1
2016-03-28 | 11:01 | Z | object2 | region1如何计算当前和以前的行之间的差异星火JavaRDD

我怎么能得到newJavaRDD f或保存到数据库?
新JavaRDD结构必须是:
2016-03-28 | 9 | object1 | region1
2016-03-28 | 1 | object2 | region1
所以,我现在和以前的行之间的时间来计算(也使用标志X, Y, Z在某些情况下定义,添加时间造成与否),并添加新元素更改为date, objectNameobjectRegion之后的JavaRDD。

我可以使用这种类型的代码(地图)做到这一点,但我认为这是不好的,不是最快的方式

JavaRDD<NewObject> newJavaRDD = oldJavaRDD.map { r -> 
     String datePrev[] = ... 
     if (datePrev != dateCurr ...) { 
      return newJavaRdd; 
     } else { 
      return null; 
     } 
    } 
+0

你能澄清'X,Y,Z'是什么意思?不清楚哪些记录应该包含在输出中,哪些不应该...... –

+0

仅供示例:前一行包含标志“X”,当前行包含“Y”,因此我们有转换“X-> Y”。在这种情况下,我们不能在这些行之间聚合时间,结果'sum(11:01 - 11:00)= 0'。如果'Y-> X',我们必须在行之间聚合时间,结果'sum(11:05 - 11:01)= 4分钟'。如果'X-> X' - 也聚合,则结果为'4分钟+总和(11:09 - 11:05)= 4分钟+4分钟= 8分钟。我还必须认识到其他一些规则,但它们都涉及当前行和预览行之间的区别。 –

回答

0

首先,你的代码示例从转换中引用newJavaRDD创建newJavaRDD - 在几个不同的水平,是不可能的:

  • 不能引用该变量的德的右手侧的可变claration ...
  • 你不能在RDD的转换中使用RDD(同一个或另一个 - 无关紧要) - 转换中的任何内容都必须由Spark序列化,并且Spark不能序列化它自己的RDD(这是没有意义的)

那么,你应该怎么做?

假设

  1. 这里你的目的是要获得一个纪录的date + object + region
  2. 不应该有对每一个这样的组合太多记录,所以它的每个组合安全groupBy这些领域为重点

可以groupBy的重点领域,然后mapValues以获得第一个和最后一个记录之间的“分钟距离”(如果我没有正确理解,传递给mapValues的函数可以更改为包含您的确切逻辑)。我将使用乔达时间库的时间计算:

public static void main(String[] args) { 
    // some setup code for this test: 
    JavaSparkContext sc = new JavaSparkContext("local", "test"); 

    // input: 
    final JavaRDD<String[]> input = sc.parallelize(Lists.newArrayList(
      //    date  time  ? object  region 
      new String[]{"2016-03-28", "11:00", "X", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:01", "Y", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:05", "X", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:09", "X", "object1", "region1"}, 
      new String[]{"2016-03-28", "11:00", "X", "object2", "region1"}, 
      new String[]{"2016-03-28", "11:01", "Z", "object2", "region1"} 
    )); 

    // grouping by key: 
    final JavaPairRDD<String, Iterable<String[]>> byObjectAndDate = input.groupBy(new Function<String[], String>() { 
     @Override 
     public String call(String[] record) throws Exception { 
      return record[0] + record[3] + record[4]; // date, object, region 
     } 
    }); 

    // mapping each "value" (all record matching key) to result 
    final JavaRDD<String[]> result = byObjectAndDate.mapValues(new Function<Iterable<String[]>, String[]>() { 
     @Override 
     public String[] call(Iterable<String[]> records) throws Exception { 
      final Iterator<String[]> iterator = records.iterator(); 
      String[] previousRecord = iterator.next(); 
      int diffMinutes = 0; 

      for (String[] record : records) { 
       if (record[2].equals("X")) { // if I got your intention right... 
        final LocalDateTime prev = getLocalDateTime(previousRecord); 
        final LocalDateTime curr = getLocalDateTime(record); 
        diffMinutes += Period.fieldDifference(prev, curr).toStandardMinutes().getMinutes(); 
       } 
       previousRecord = record; 
      } 

      return new String[]{ 
        previousRecord[0], 
        Integer.toString(diffMinutes), 
        previousRecord[3], 
        previousRecord[4] 
      }; 
     } 
    }).values(); 

    // do whatever with "result"... 
} 

// extracts a Joda LocalDateTime from a "record" 
static LocalDateTime getLocalDateTime(String[] record) { 
    return LocalDateTime.parse(record[0] + " " + record[1], formatter); 
} 

static final DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm"); 

附:在斯卡拉这将需要大约8行...:/

+0

对不起,我用伪代码搞糊涂了,你说的对'newJavaRDD',我的意思是'返回new NewObject(...)'。没关系,你的回答真的很有帮助和工作(幸运的是,我可以使用java8来减少愚蠢的线条)。 –

相关问题