Spark Streaming processes a time series read from a UDP socket (data separated by time intervals). The incoming data stream (nginx online log) has this structure:
date | ip | mac | objectName | rate | size
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 | book1 | 10 | 121
2016-04-05 11:17:34 | 10.0.0.2 | a5:a8 | book2351 | 8 | 2342
2016-04-05 11:17:34 | 10.0.0.3 | d1:b56 | bookA5 | 10 | 12
2016-04-05 11:17:35 | 10.0.0.1 | e1:e2 | book67 | 10 | 768
2016-04-05 11:17:35 | 10.0.0.2 | a5:a8 | book2351 | 8 | 897
2016-04-05 11:17:35 | 10.0.0.3 | d1:b56 | bookA5 | 9 | 34
2016-04-05 11:17:35 | 10.0.0.4 | c7:c2 | book99 | 9 | 924
...
2016-04-05 11:18:01 | 10.0.0.1 | e1:e2 | book-10 | 8 | 547547
2016-04-05 11:18:17 | 10.0.0.4 | c7:c2 | book99 | 10 | 23423
2016-04-05 11:18:18 | 10.0.0.3 | d1:b56 | bookA5 | 10 | 1138
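For reference, a minimal sketch of parsing one pipe-delimited line of this format into its fields (the `LogLineParser` class name and the plain `Object[]` return type are illustrative, not part of my code):

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class LogLineParser {
    static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    // Parses "date | ip | mac | objectName | rate | size" into an Object[] for brevity.
    static Object[] parse(String line) {
        String[] f = line.split("\\|");
        return new Object[] {
            LocalDateTime.parse(f[0].trim(), FMT), // date with seconds
            f[1].trim(),                           // ip
            f[2].trim(),                           // mac
            f[3].trim(),                           // objectName
            Integer.parseInt(f[4].trim()),         // rate
            Integer.parseInt(f[5].trim())          // size
        };
    }

    public static void main(String[] args) {
        Object[] r = parse("2016-04-05 11:17:34 | 10.0.0.1 | e1:e2 | book1 | 10 | 121");
        System.out.println(r[1] + " " + r[3] + " " + r[5]); // prints "10.0.0.1 book1 121"
    }
}
```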
I have to:
- aggregate the data, partitioned by minute: one result row per (minute, ip, mac)
- objectName: it can change within the minute, and I must take the first one. E.g. for
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2
book1 changes to book67, so the result must be book1
- changes: the count of rate changes during the minute
- size: the difference between sizes for
2016-04-05 11:17:34 | 10.0.0.1 | e1:e2
(the previous time inside the partition vs. the current time inside the partition), i.e. = ... 768 - 121
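The per-key reduction described above can be sketched in plain Java (the `MinuteAgg` and `Rec` names are illustrative stand-ins, not the classes from my code): given the records of one (minute, ip, mac) group sorted by time, keep the first objectName, count rate changes, and take last size minus first size (or the size itself if the group has a single record, as for book99):

```java
import java.util.Arrays;
import java.util.List;

public class MinuteAgg {
    // One raw record inside a (minute, ip, mac) group; the list is assumed time-ordered.
    record Rec(String objectName, int rate, int size) {}

    // Aggregates a time-ordered group into (firstObjectName, rateChanges, sizeDiff).
    static Object[] aggregate(List<Rec> group) {
        Rec first = group.get(0);
        Rec last = group.get(group.size() - 1);
        int changes = 0;
        for (int i = 1; i < group.size(); i++) {
            if (group.get(i).rate() != group.get(i - 1).rate()) changes++; // count rate changes
        }
        // Single-record groups keep their size as-is (e.g. book99 -> 924).
        int sizeDiff = group.size() > 1 ? last.size() - first.size() : first.size();
        return new Object[] { first.objectName(), changes, sizeDiff };
    }

    public static void main(String[] args) {
        // 10.0.0.3 / d1:b56 in minute 11:17 from the sample: rate 10 -> 9, size 12 -> 34
        Object[] a = aggregate(Arrays.asList(new Rec("bookA5", 10, 12), new Rec("bookA5", 9, 34)));
        System.out.println(a[0] + " " + a[1] + " " + a[2]); // prints "bookA5 1 22"
    }
}
```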
So, the result (without calculating the sizes):
date | ip | mac | objectName | changes | size
2016-04-05 11:17:00 | 10.0.0.1 | e1:e2 | book1 | 0 | 768 - 121
2016-04-05 11:17:00 | 10.0.0.2 | a5:a8 | book2351 | 0 | 897 - 2342
2016-04-05 11:17:00 | 10.0.0.3 | d1:b56 | bookA5 | 1 | 34 - 12
2016-04-05 11:17:00 | 10.0.0.4 | c7:c2 | book99 | 0 | 924
...
2016-04-05 11:18:00 | 10.0.0.1 | e1:e2 | book-10 | 0 | 547547
2016-04-05 11:18:00 | 10.0.0.4 | c7:c2 | book99 | 0 | 23423
2016-04-05 11:18:00 | 10.0.0.3 | d1:b56 | bookA5 | 0 | 1138
Here is my code so far. I know about updateStateByKey and about window, but I don't understand, in particular, how I can flush the data to a database or file system when the period (the minute) changes:
private static final Duration SLIDE_INTERVAL = Durations.seconds(10);
private static final String nginxLogHost = "localhost";
private static final int nginxLogPort = 9999;
private static class Raw implements Serializable { // static + Serializable so Spark can ship it to executors
    LocalDateTime time; // full time with seconds
    String ip;
    String mac;
    String objectName;
    int rate;
    int size;
}
private static class Key implements Serializable {
    LocalDateTime time; // time truncated to the minute (seconds = 00)
    String ip;
    String mac;
}
private static class RawValue implements Serializable {
    LocalDateTime time; // full time with seconds
    String objectName;
    int rate;
    int size;
}
private static class Value implements Serializable {
    String objectName;
    int changes;
    int size;
}
public static void main(String[] args) {
SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("TestNginxLog");
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
JavaStreamingContext jssc = new JavaStreamingContext(conf, SLIDE_INTERVAL);
jssc.checkpoint("/tmp");
JavaReceiverInputDStream<Raw> logRecords = jssc.receiverStream(new NginxUDPReceiver(nginxLogHost, nginxLogPort));
PairFunction<Raw, Key, RawValue> pairFunction = (PairFunction<Raw, Key, RawValue>) rawLine -> {
LocalDateTime time = rawLine.getDateTime();
Key k = new Key(time.withSecond(0).withNano(0), rawLine.getIp(), rawLine.getMac()); // zero out the seconds but keep the date
RawValue v = new RawValue(time, rawLine.getObjectName(), rawLine.getRate(), rawLine.getSize());
return new Tuple2<>(k, v);
};
JavaPairDStream<Key, RawValue> logDStream = logRecords.mapToPair(pairFunction);
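What I am considering as a continuation (an untested sketch; `RawValue.merge` and the storage write are hypothetical): instead of updateStateByKey, use a one-minute tumbling window via reduceByKeyAndWindow, so that each emitted RDD holds exactly one completed minute, and flush it in foreachRDD. For taking the "first" objectName correctly, RawValue would have to carry its timestamps, since the merge order inside a window is not guaranteed to be chronological:

```java
// Sketch (untested): 1-minute tumbling window, then flush each completed window.
// Assumes RawValue.merge(a, b) keeps the earlier objectName (by timestamp),
// accumulates the rate-change count, and tracks first/last size -- hypothetical helper.
JavaPairDStream<Key, RawValue> perMinute = logDStream.reduceByKeyAndWindow(
        (a, b) -> RawValue.merge(a, b),  // combine two partial aggregates for one key
        Durations.minutes(1),            // window length: one minute
        Durations.minutes(1));           // slide = window length -> tumbling window

perMinute.foreachRDD(rdd -> {
    // Each RDD here corresponds to one completed minute; write it out.
    rdd.foreachPartition(it -> {
        // open a DB/file connection once per partition, write rows, close it
        while (it.hasNext()) {
            Tuple2<Key, RawValue> row = it.next();
            // e.g. INSERT INTO results VALUES (row._1.time, row._1.ip, row._1.mac, ...)
        }
    });
});

jssc.start();
jssc.awaitTermination();
```

The 1-minute slide is a multiple of the 10-second batch interval, which Spark requires, and the checkpoint directory set above is needed for window operations.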