2015-05-20 29 views
4

我在Spark Streaming应用程序中使用updateStateByKey函数来保存并更新每个密钥的状态。问题是我想知道更新函数里面的“key”Spark Streaming - 如何获取updateStateByKey函数中的“密钥”

input.updateStateByKey(updateStateByKeyOfUsers) 

def updateStateByKeyOfUsers(newUsers: Seq[Set[String]], 
          userStatus: Option[(#####)] 
          ): Option[(#####)] = { 
    //How to get the "Key" 
} 

-Tao

+2

你不觉得如果你分享了'input'的类型会有帮助吗?基于[这个例子](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala#L74)我认为关键是第55行中的“t._1”,尽管你的方法签名看起来非常不同。它是否编译? –

+0

是的,这个工程。谢谢你的帮助。 –

回答

4

一般来说,星火API不会让你获得一个关键。这很令人伤心。您有两种选择:在每个输入中包含密钥或将其包含在一个状态中。

相关问题