2016-11-23 20 views
1

还有就是流数据卡夫卡,连续浮点数:如何使用星火流计算均值和流数据的变化,在过去25天,从卡夫卡

2016年11月23日11:00:00 | 12.2

2016年11月23日11:03:00 | 13.2

2016年11月23日11:05:00 | 15.1

......

我想计算这些浮点数的均值和方差在过去的25天里,bers在上午11点到12点之间。

火花流适合处理这个问题?

非常感谢!

+0

什么是您的数据看起来像什么呢?你怎么知道它的时间也存在于数据中? –

+0

还有一件事,你想多长时间计算一次,平均值和差异,这是单次还是每一秒或每一天? –

+0

@ShivanshSrivastava我已经编辑了数据格式,请看问题详情。数据格式:时间|价值。 – Ming

回答

0

@Ming,您可以使用此作为一个抽象的

val sparkConf = new SparkConf().setAppName("StreamCount") 
    val ssc = new StreamingContext(sparkConf, Seconds(2)) 
    //update the time according to your need 

    // Create direct kafka stream with brokers and topics 
    val topicsSet = topics.split(",").toSet 
    val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers) 
    val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
     ssc, kafkaParams, topicsSet) 

    // Get the lines, and timestamp data along with the float values 


    SELECT float_number 
FROM  [YourTable] 
WHERE  [YourDate] BETWEEN DATEADD(DAY, DATEDIFF(DAY, 0, GETDATE()), 0) + '11:00' AND DATEADD(DAY, DATEDIFF(DAY, 0, GETDATE()), 0) + '12:00' 
//store it to a data frame 

df.select(avg($"float_number")).show() 
+0

数据连续来自kafka,因此存储过去25天数据的位置?如何更新这些数据,即第二天如何删除最早一天的数据并添加今天的数据? – Ming

+0

请问您是否同意上述解决方案适合您的问题。请计划再提问另一个问题,如果你有更多的问题,但没有一个问题的一切。 :) – JustCoder

+0

对不起,我只是一个新来的人。我已经同意你的答案。 – Ming

相关问题