Spark Streaming: time-series processing of data from a UDP socket (data partitioned by time intervals)

I receive a stream of data (an nginx online log) with the following structure:

date    | ip  | mac | objectName | rate | size 
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 | book1  | 10 | 121 
2016-04-05 11:17:34 | 10.0.0.2 | a5:a8 | book2351 | 8 | 2342 
2016-04-05 11:17:34 | 10.0.0.3 | d1:b56| bookA5  | 10 | 12 

2016-04-05 11:17:35 | 10.0.0.1 | e1:e2 | book67  | 10 | 768 
2016-04-05 11:17:35 | 10.0.0.2 | a5:a8 | book2351 | 8 | 897 
2016-04-05 11:17:35 | 10.0.0.3 | d1:b56| bookA5  | 9 | 34 
2016-04-05 11:17:35 | 10.0.0.4 | c7:c2 | book99  | 9 | 924 
... 
2016-04-05 11:18:01 | 10.0.0.1 | e1:e2 | book-10 | 8 | 547547 
2016-04-05 11:18:17 | 10.0.0.4 | c7:c2 | book99  | 10 | 23423 
2016-04-05 11:18:18 | 10.0.0.3 | d1:b56| bookA5  | 10 | 1138 

I have to:

  • aggregate the data, partitioned by minute - one result row per (minute, ip, mac) 
  • objectName - it can change within the minute; I have to take the first one, e.g. for 2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 it changes from book1 to book67, so the result must be book1 
  • changes - the number of times rate changed during the minute 
  • size - the difference between sizes for the same key (previous time inside the partition vs. current time inside the partition), e.g. for 10.0.0.1 | e1:e2 it is 768 - 121 

So, the result (with the size column left uncomputed for clarity):

date    | ip  | mac | objectName | changes | size 
2016-04-05 11:17:00 | 10.0.0.1 | e1:e2 | book1  | 0  | 768 - 121 
2016-04-05 11:17:00 | 10.0.0.2 | a5:a8 | book2351 | 0  | 897 - 2342 
2016-04-05 11:17:00 | 10.0.0.3 | d1:b56| bookA5  | 1  | 34 - 12  
2016-04-05 11:17:00 | 10.0.0.4 | c7:c2 | book99  | 0  | 924 
... 
2016-04-05 11:18:00 | 10.0.0.1 | e1:e2 | book-10 | 0  | 547547 
2016-04-05 11:18:00 | 10.0.0.4 | c7:c2 | book99  | 0  | 23423 
2016-04-05 11:18:00 | 10.0.0.3 | d1:b56| bookA5  | 0  | 1138 
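
To make these rules concrete, here is a minimal standalone sketch in plain Java (independent of Spark; the Row/Result types and the aggregate helper are illustrative only, not part of my actual code) of how one minute of rows for a single (minute, ip, mac) key collapses into one result row:

import java.time.LocalDateTime; 
import java.util.Comparator; 
import java.util.List; 

class MinuteAggregator { 
    // illustrative row/result types mirroring the fields above 
    static class Row { LocalDateTime time; String objectName; int rate; int size; } 
    static class Result { String objectName; int changes; int size; } 

    // collapse one minute of rows (all sharing the same ip and mac) into one result row 
    static Result aggregate(List<Row> rows) { 
        rows.sort(Comparator.comparing((Row r) -> r.time));    // order by full timestamp 
        Result res = new Result(); 
        res.objectName = rows.get(0).objectName;               // the first objectName of the minute wins 
        res.changes = 0; 
        for (int i = 1; i < rows.size(); i++) {                // count how many times rate changed 
            if (rows.get(i).rate != rows.get(i - 1).rate) res.changes++; 
        } 
        // size: last size minus first size inside the minute; 
        // a single row keeps its own size (as book99 -> 924 in the expected output) 
        res.size = rows.size() > 1 
                ? rows.get(rows.size() - 1).size - rows.get(0).size 
                : rows.get(0).size; 
        return res; 
    } 
} 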

Here is my code. I know about updateStateByKey and window, but I don't understand exactly how I can flush the data to a database or the file system when the period (minute) changes.

private static final Duration SLIDE_INTERVAL = Durations.seconds(10); 
private static final String nginxLogHost = "localhost"; 
private static final int nginxLogPort = 9999; 
private static class Raw { 
    LocalDateTime time; // full time with seconds 
    String ip; 
    String mac; 
    String objectName; 
    int rate; 
    int size; 
} 
private static class Key { 
    LocalDateTime time; // time truncated to the minute (seconds = 00) 
    String ip; 
    String mac; 
} 
private static class RawValue { 
    LocalDateTime time; // full time with seconds 
    String objectName; 
    int rate; 
    int size; 
} 
private static class Value { 
    String objectName; 
    int changes; 
    int size; 
} 
public static void main(String[] args) { 
    SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("TestNginxLog"); 
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
    JavaStreamingContext jssc = new JavaStreamingContext(conf, SLIDE_INTERVAL); 
    jssc.checkpoint("/tmp"); 
JavaReceiverInputDStream<Raw> logRecords = jssc.receiverStream(new NginxUDPReceiver(nginxLogHost, nginxLogPort)); 
PairFunction<Raw, Key, RawValue> pairFunction = (PairFunction<Raw, Key, RawValue>) rawLine -> { 
     LocalDateTime time = rawLine.getDateTime(); 
     Key k = new Key(time.withSecond(0).withNano(0), rawLine.getIp(), rawLine.getMac()); // truncate to the minute 
     RawValue v = new RawValue(time, rawLine.getObjectName(), rawLine.getRate(), rawLine.getSize()); 
     return new Tuple2<>(k, v); 
    }; 
    JavaPairDStream<Key, RawValue> logDStream = logRecords.mapToPair(pairFunction); 

Answers


So, I solved this problem myself; here is my answer. You can use the function below as the argument to updateStateByKey. The key pieces of this code are: Optional.absent() to eliminate data that has already been saved, Optional.of(... to accumulate the grouped data, and setAggregateReady(true).
The last one is used to save the data to an external target (a DB or the file system) through a filter on getAggregateReady() == true and one of the Spark Streaming output operations, for example foreachRDD.
After that, in the next batch, these rows come into updateStateByKey again and are excluded by removeIf(T::isAggregateReady).

/** 
* It aggregates data between batches. 
* <p> 
* currentBatchValues - values received in the current batch 
* previousBatchesState - values accumulated over all previous batches 
* You have to clear data (return Optional.absent() for it) to eliminate it from the DStream. 
* First batch: data is checked for aggregateReady. 
* Second batch: data flagged aggregateReady=true is removed from the DStream (you have to save it to a DB or another target before this cleaning). 
*/ 
protected Function2<List<Iterable<T>>, Optional<List<T>>, Optional<List<T>>> updateDataRowsFunc = (currentBatchValues, previousBatchesState) -> { 

    Optional<List<T>> res; 

    //log.debug("previousBatchesState isPresent {}", previousBatchesState.isPresent()); 
    //log.debug("previousBatchesState {}", previousBatchesState); 
    //log.debug("currentBatchValues isEmpty {}", currentBatchValues.isEmpty()); 
    //log.debug("currentBatchValues {}", currentBatchValues); 

    // previous data that was aggregateReady already saved 
    if (previousBatchesState.isPresent()) { 
     log.debug("count before remove = {}", previousBatchesState.get().size()); 
     previousBatchesState.get().removeIf(T::isAggregateReady); 
     // absent previous state if all of it's data was aggregated already 
     int cntAfterRemove = previousBatchesState.get().size(); 
     if (cntAfterRemove == 0) previousBatchesState = Optional.absent(); 
    } 

    // warn: can't define the comparator outside this function, otherwise Spark throws "Task not serializable" 
    Comparator<T> dataRowByAggGroupComparator = (o1, o2) -> o1.getAggregateGroup().compareTo(o2.getAggregateGroup()); 

    // no data was collected at previous batches && data exists in current batch 
    if (!previousBatchesState.isPresent() && !currentBatchValues.isEmpty()) { 

     log.debug("algorithm 1"); 

     // the list currentBatchValues contains only 1 value (an Iterable of 1..N NginxDataRow records), so we take it as an Iterable and convert it to a List 
     // warn: there may be another way to compare Iterable elements without using a List 
     List<T> listDataRow = new ArrayList<>(); 
     currentBatchValues.get(0).forEach(listDataRow::add); 

     // in one batch we can get data for 2 aggregateGroups, if the batch was split between groups 
     LocalDateTime minAggGroup = listDataRow.stream().min(dataRowByAggGroupComparator).get().getAggregateGroup(); 
     LocalDateTime maxAggGroup = listDataRow.stream().max(dataRowByAggGroupComparator).get().getAggregateGroup(); 

     // batch was split between groups 
     if (!minAggGroup.equals(maxAggGroup)) { 
      log.debug("batch was split between groups {} and {}", minAggGroup, maxAggGroup); 
      // set ready to aggregate for previous group of data, because aggregate group was changed 
      listDataRow.stream().filter(z -> z.getAggregateGroup().equals(minAggGroup)).forEach(z -> z.setAggregateReady(true)); 
     } 

     res = Optional.of(listDataRow); 
     //log.debug("agg res = {}", res); 

     // data exist in both: previous and current batches 
    } else if (previousBatchesState.isPresent() && !currentBatchValues.isEmpty()) { 

     log.debug("algorithm 2"); 

     List<T> listCurrentBatchDataRow = new ArrayList<>(); 
     currentBatchValues.get(0).forEach(listCurrentBatchDataRow::add); 

     LocalDateTime previousBatchAggGroup = previousBatchesState.get().stream().findFirst().get().getAggregateGroup(); 

     // in one batch we can get data for 2 aggregateGroups, if the batch was split between groups 
     LocalDateTime minCurrentBatchAggGroup = listCurrentBatchDataRow.stream().min(dataRowByAggGroupComparator).get().getAggregateGroup(); 
     LocalDateTime maxCurrentBatchAggGroup = listCurrentBatchDataRow.stream().max(dataRowByAggGroupComparator).get().getAggregateGroup(); 

     // previous and current data in different groups 
     if (!previousBatchAggGroup.equals(maxCurrentBatchAggGroup)) { 

      log.debug("previous batch needed to save, because agg group was changed from {} to {}", previousBatchAggGroup, maxCurrentBatchAggGroup); 
      // set ready to aggregate for previous group of data, because aggregate group was changed 
      previousBatchesState.get().stream().forEach(z -> z.setAggregateReady(true)); 

      // batch was split between groups 
      if (!minCurrentBatchAggGroup.equals(maxCurrentBatchAggGroup)) { 

       log.debug("batch was split between groups {} and {}", minCurrentBatchAggGroup, maxCurrentBatchAggGroup); 
       listCurrentBatchDataRow.stream().filter(z -> z.getAggregateGroup().equals(minCurrentBatchAggGroup)).forEach(z -> z.setAggregateReady(true)); 

      } 
     } 

     // union previous and current batches data 
     previousBatchesState.get().addAll(listCurrentBatchDataRow); 

     res = Optional.of(previousBatchesState.get()); 
     //log.debug("agg res = {}", res); 

     // data exist in previous batch but current batch is empty 
    } else if (previousBatchesState.isPresent() && currentBatchValues.isEmpty()) { 

     log.debug("algorithm 3"); 

     res = previousBatchesState; 
     //log.debug("agg res = {}", res); 

     // all of previous data was aggregated and absent() already 
    } else if (!previousBatchesState.isPresent() && currentBatchValues.isEmpty()) { 

     log.debug("algorithm 4"); 

     res = Optional.absent(); 

    } else { 

     log.error("Strange situation, you have to check log-file"); 
     res = null; 

    } 

    // if a subscriber's data arrived during one minute and the subscriber then closed the connection, those rows would stay in the DStream forever 
    // so after some period we force them to be saved 
    if (res != null && res.isPresent()) { 
     res.get().stream().filter(z -> Math.abs(java.time.Duration.between(z.getAggregateGroup(), LocalDateTime.now()).getSeconds()/60) > FORCED_SAVE_INTERVAL).forEach(z -> z.setAggregateReady(true)); 
    } 

    return res; 
}; 
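
For completeness, here is a rough sketch of how this update function gets wired into the stream and how the flagged rows are saved exactly once. The variable names (grouped, state, ready) and the NginxDataRow accessors isAggregateReady()/setAggregateReady() are assumptions based on the description above, not the exact production code:

// grouped is the JavaPairDStream<Key, Iterable<NginxDataRow>> produced by groupByKey() on the mapped records 
JavaPairDStream<Key, List<NginxDataRow>> state = grouped.updateStateByKey(updateDataRowsFunc); 

// keep only the rows whose aggregate group is finished (flagged by the function above) 
JavaPairDStream<Key, List<NginxDataRow>> ready = state 
    .mapValues(rows -> rows.stream() 
        .filter(NginxDataRow::isAggregateReady) 
        .collect(java.util.stream.Collectors.toList())) 
    .filter(pair -> !pair._2().isEmpty()); 

// save them once; on the next batch updateDataRowsFunc drops them via removeIf(T::isAggregateReady) 
ready.foreachRDD(rdd -> { 
    if (rdd.count() > 0) { 
        rdd.saveAsTextFile("/tmp/aggregated/" + System.currentTimeMillis()); 
    } 
}); 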

This is a partial answer, and the problem is not fully solved yet. After mapToPair I used:

// 1 key - N values 
    JavaPairDStream<Key, Iterable<Value>> abonentConnects = logDStream.groupByKey(); 

    // Accumulate data 
    Function2<List<Iterable<Value>>, Optional<List<Value>>, Optional<List<Value>>> updateFunc = (Function2<List<Iterable<Value>>, Optional<List<Value>>, Optional<List<Value>>>) (values, previousState) -> { 
     List<Value> sum = previousState.or(new ArrayList<>()); 
     for (Iterable<Value> v : values) { 
      v.forEach(sum::add); 
     } 
     return Optional.of(sum); 
    }; 
    JavaPairDStream<Key, List<Value>> state = abonentConnects.updateStateByKey(updateFunc); 

    // filter data (previous minute) 
    Function<Tuple2<Key, List<Value>>, Boolean> filterFunc = (Function<Tuple2<Key, List<Value>>, Boolean>) v1 -> { 
     LocalDateTime previousTime = LocalDateTime.now().minusMinutes(1).withSecond(0).withNano(0); 
     LocalDateTime valueTime = v1._1().getTime(); 
     return valueTime.compareTo(previousTime) == 0; 
    }; 
    JavaPairDStream<Key, List<Value>> filteredRecords = state.filter(filterFunc); 

    // save data 
    filteredRecords.foreachRDD(x -> { 
     if (x.count() > 0) { 
      x.saveAsTextFile("/tmp/xxx/grouped/" + LocalDateTime.now().toString().replace(":", "-").replace(".", "-")); 
     } 
    }); 

    jssc.start(); 
    jssc.awaitTermination(); 

As for the result: data is produced, but since the output operation executes every 5 seconds, I get the same data for the minute duplicated every 5 seconds.
I know that I have to use Optional.absent() to clear the already-saved data from the stream. I tried to use it, but I could not combine both things in one snippet: saving the data to the file system (or a HashMap) and immediately clearing what was saved.
Question: how can I do that?
