2015-08-20 33 views
0

我有一个Spark Streaming应用程序正在处理一系列网站点击事件。每个事件都有一个包含GUID的属性,用于标识事件所属的用户会话。Spark Streaming的countByValueAndWindow如何工作?

我的应用程序向上计数发生为每个会话的事件数,使用窗口:

def countEvents(kafkaStream: DStream[(String, Event)]): DStream[(String, Session)] = { 

    // Get a list of the session GUIDs from the events 
    val sessionGuids = kafkaStream 
    .map(_._2) 
    .map(_.getSessionGuid) 

    // Count up the GUIDs over our sliding window 
    val sessionGuidCountsInWindow = sessionGuids.countByValueAndWindow(Seconds(60), Seconds(1)) 

    // Create new session objects with the event count 
    sessionGuidCountsInWindow 
    .map({ 
     case (guidS, eventCount) => 
     guidS -> new Session().setGuid(guidS).setEventCount(eventCount) 
    }) 
} 

我的理解是,countByValueAndWindow功能只对在其被调用的函数在DSTREAM值。换句话说,在上面的代码中,对countByValueAndWindow的调用应仅为我们调用该函数的sessionGuids DStream中的会话GUID返回事件计数。

但是我正在观察一些不同的东西;对countByValueAndWindow的调用将返回不在sessionGUID中的会话GUID的计数。它似乎是返回前几批中处理的会话GUID的计数。我只是误解了这个功能是如何工作的?我一直无法在网上找到有用的文档。

回答

1

我的一位熟悉Spark的方法的同事比我帮助我做的更好。显然,我误解了countByValueAndWindow函数的工作方式。我认为它只会返回您调用函数的DStream中的值的计数。但是,实际上,它会返回整个窗口中所有值的计数。为了解决我的问题,我只需在输入DStream和由countByValueAndWindow操作产生的DStream之间执行连接。因此,我只能得到输入DStream中值的结果。