2016-04-15 91 views
3

我想知道是否有可能在整个应用程序中保持完全不同的状态?例如,第一个状态的update function是否从第二个状态调用?处理不同的状态

我不记得经历过任何这样的例子,也没有找到任何计数器指示......基于https://docs.cloud.databricks.com/docs/spark/1.6/examples/Streaming%20mapWithState.html的例子,我知道没有理由为什么我不能有不同的trackStateFunc s与不同State s,进一步更新那些由于他们的Key,如下图所示:

def firstTrackStateFunc(batchTime: Time, 
         key: String, 
         value: Option[Int], 
         state: State[Long]): Option[(String, Long)] = { 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val output = (key, sum) 
    state.update(sum) 
    Some(output) 
} 

def secondTrackStateFunc(batchTime: Time, 
         key: String, 
         value: Option[Int], 
         state: State[Int]): Option[(String, Long)] = { 
    // disregard problems this example would cause 
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, dif) 
    state.update(dif) 
    Some(output) 
} 

我认为这是可能的,但仍然不能确定。我希望有人验证或无效这个假设...

+0

你想要输入是相同的,但有能力更新一个状态在另一个?或者你想从第一个状态中获取状态,并用它来更新两个状态? –

+0

我不一定要输入是相同的,但状态是完全不同的(如一个字段状态和多字段其他)。能够在第一个状态中更新第二个状态会很好,但不是主要目的。实际上,我想知道是否可以在同一应用程序中更新完全不同的状态(通过不同的更新功能)?它对你更清楚吗? – wipman

+0

我这么认为。国家是孤立的,你不能在不同的'mapWithState'函数内在它们之间进行交互。你可以做的就是将这些状态链接在一起,并将它们作为值传递给下一个'mapWithState',但我不认为这就是你想要做的。 –

回答

2

我想知道是否有可能在整个应用程序中保持完全不同的 状态?

DStream[(Key, Value)]mapWithState每次调用可以容纳一个State[T]对象。对于mapWithState的每个调用,此T需要相同。为了使用不同的状态,您可以链接mapWithState呼叫,其中的Option[U]是另一个输入,或者您可以拆分DStream并将不同的mapWithState呼叫应用到每个呼叫。但是,您不能在另一个对象内调用另一个对象,因为它们彼此隔离,而且不能改变另一个对象的状态。

1

@Yuval给出了链式mapWithState函数的一个很好的答案。但是,我有另一种方法。而不是有两个mapWithState调用,你可以把sum和diff放在同一个状态[(Int,Int)]中。

在这种情况下,您只需要一个mapWithState函数,您可以在其中更新这两件事情。就像这样:

def trackStateFunc(batchTime: Time, 
        key: String, 
        value: Option[Int], 
        state: State[(Long, Int)]): Option[(String, (Long, Int))] = 
{ 
    val sum = value.getOrElse(0).toLong + state.getOption.getOrElse(0L) 
    val dif = value.getOrElse(0) - state.getOption.getOrElse(0L) 
    val output = (key, (sum, diff)) 
    state.update((sum, diff)) 
    Some(output) 
}