4
我在Spark Streaming应用程序中使用updateStateByKey函数来保存并更新每个密钥的状态。问题是我想知道更新函数里面的“key”。Spark Streaming - 如何获取updateStateByKey函数中的“密钥”
input.updateStateByKey(updateStateByKeyOfUsers)
def updateStateByKeyOfUsers(newUsers: Seq[Set[String]],
userStatus: Option[(#####)]
): Option[(#####)] = {
//How to get the "Key"
}
-Tao
你不觉得如果你分享了'input'的类型会有帮助吗?基于[这个例子](https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala#L74)我认为关键是第55行中的“t._1”,尽管你的方法签名看起来非常不同。它是否编译? –
是的,这个工程。谢谢你的帮助。 –