2017-02-20 28 views
0

我正在实施一个有多个状态的演员,并且使用Stash不会丢失任何消息。我的状态正在初始化(从数据库中获取某些内容),运行(处理请求)和更新(更新我的状态)。 我的问题是,当我尝试unstashAll()未来解决时,我输了消息。akka unstashAll不工作

def initializing: Receive = { 
case Initialize => 
    log.info("initializing") 
    (for { 
    items1 <- db.getItems("1") 
    items2 <- db.getItems("2") 
    } yield items1 ::: items2) map {items => 
    unstashAll() 
    context.become(running(items)) 
    } 
case r => 
    log.debug(s"actor received message: $r while initializing and stashed it for further processing") 
    stash()} 

我改变我实现这个

def initializing: Receive = { 
case Initialize => 
    log.info("initializing") 
    (for { 
    items1 <- db.getItems("1") 
    items2 <- db.getItems("2") 
    } yield items1 ::: items2) pipeTo self 
    context.become({ 
    case items: List[Item] => 
     unstashAll() 
     context.become(running(items)) 
    case r => 
     log.debug(s"actor received message: $r while initializing and stashed it for further processing") 
     stash() 
    }) 
case r => 
    log.debug(s"actor received message: $r while initializing and stashed it for further processing") 
    stash()} 

任何人都可以解释为什么第一个没有工作固定呢?

回答

1

我认为unstashAll部分工作正常。问题在于,您正在将其与您的context.become一起运行 - 作为未来回调的一部分。

这意味着您的map块中的代码会转移您演员顺序处理的可预测性。换句话说,这可能发生:

  1. 你叫unstashAll
  2. 您的消息放回到演员的邮箱
  3. 演员拿起你的消息一个接一个。上下文仍然没有改变,所以他们再次藏起来
  4. 你的背景下终于成为running(但为时已晚)

的解决方案是 - 当你发现 - 在pipeTo模式,它本质上发送Future的结果给演员作为消息。这使得它都是连续的和可预测的。