2016-12-29 97 views
1

我已经流数据来降低如下星火由

id, date, value 
i1, 12-01-2016, 10 
i2, 12-02-2016, 20 
i1, 12-01-2016, 30 
i2, 12-05-2016, 40 

想通过ID,以减少度日日期总值信息像RDD需要

输出对于给定的ID和列表(天365) 我必须把值的基础上一年的一天是2016年12月1日列表中的位置是336,因为有与同一日期设备I1两个实例,他们应该聚集

id, List [0|1|2|3|...    |336| 337| |340| |365] 
i1,        |10+30|  - this goes to 336 position 

i2,          20  40 -- this goes to 337 and 340 position 

请指导减少或转换组来做到这一点。

+0

这是Spark Streaming还是Structured Streaming?你试过什么了?问题在哪里? –

+0

问题是动态列表更新和如何减少如果我通过编号减少所有的值将汇总不管一年中的哪一天 –

+1

你已经有什么代码?这是Spark Streaming吗? –

回答

0

我会为您提供基本代码片断,但您没有指定有关语言,数据源或数据格式的几个假设。

JavaDStream<String> lineStream = //Your data source for stream 
JavaPairDStream<String, Long> firstReduce = lineStream.mapToPair(line -> { 
    String[] fields = line.split(","); 
    String idDate = fields[0] + fields[1]; 
    Long value = Long.valueOf(fields[2]); 
    return new Tuple2<String, Long>(idDate, value); 
}).reduceByKey((v1, v2) -> { 
    return (v1+v2); 
}); 
firstReduce.map(idDateValueTuple -> { 
    String idDate = idDateValueTuple._1(); 
    Long valueSum = idDateValueTuple._2(); 
    String id = idDate.split(",")[0]; 
    String date = idDate.split(",")[]; 
    //TODO parse date and put the sumValue in array as you wish 
} 
+0

抱歉忘记提及我正在使用scala –

+0

没关系。同样的上面的代码可以很容易地转换为斯卡拉 – code

+0

我不认为上述逻辑将适用于id = id2的情况下,因为在给定的示例数据中id2的每个条目的日期不同。 – Phoenix

0

只能达到这么远。我不确定如何在最后一步添加数组的每个元素。希望这有助于!!!如果你得到了最后一步或任何其他方式,欣赏你是否在这里发布!

def getDateDifference(dateStr:String):Int = { 
val startDate = "01-01-2016" 
val formatter = DateTimeFormatter.ofPattern("MM-dd-yyyy") 
val oldDate = LocalDate.parse(startDate, formatter) 
val currentDate = dateStr 
val newDate = LocalDate.parse(currentDate, formatter) 
return newDate.toEpochDay().toInt - oldDate.toEpochDay().toInt 
} 
def getArray(numberofDays:Int,data:Int):Iterable[Int] = { 
val daysArray = new Array[Int](366) 
daysArray(numberofDays) = data 
return daysArray 
} 
val idRDD = <read from stream> 
val idRDDMap = idRDD.map { rec => ((rec.split(",")(0),rec.split(",")(1)), 
     (getDateDifference(rec.split(",")(1)),rec.split(",")(2).toInt))} 
val idRDDconsiceMap = idRDDMap.map { rec => (rec._1._1,getArray(rec._2._1, rec._2._2)) } 
val finalRDD = idRDDconsiceMap.reduceByKey((acc,value)=>(???add each element of the arrays????))