您可以通过在案件引入一个中间演员解决这个问题,当你需要等待两个消息。
该角色会是这个样子:
class AggregationActor(aActor: ActorRef) extends Actor {
var awaitForM2: Option[M2] = None
var awaitForM3: Option[M3] = None
var originalSender: Option[ActorRef] = None
def receive: Receive = {
case M1 =>
// We save the sender
originalSender = Some(sender())
// Proxy the message
aActor ! M1
case M2 =>
awaitForM2 = Some(M2)
checkIfBothMessagesHaveArrived()
case M3 =>
awaitForM3 = Some(M3)
checkIfBothMessagesHaveArrived()
}
private def checkIfBothMessagesHaveArrived() = {
for {
m2 <- awaitForM2
m3 <- awaitForM3
s <- originalSender
} {
// Send as a tuple
s ! (m2, m3)
// Shutdown, our task is done
context.stop(self)
}
}
}
本质上,它具有内部状态和跟踪M1
和M2
反应是如何到达的。
您可以使用此类似:
def awaitBothMessages(input: M1, underlyingAActor: ActorRef, system: ActorSystem): Future[(M2, M3)] = {
val aggregationActor = system.actorOf(Props(new AggregationActor(aActor)))
(aggregationActor ? input).mapTo[(M2, M3)]
}
val system = ActorSystem("test")
val aActor = system.actorOf(Props(new A), name = "aActor")
// Awaiting the first message only:
val firstMessage = aActor ? M1
val first = Await.result(firstMessage, Duration.Inf)
// Awaiting both messages:
val bothMessages: Future[(M2, M3)] = awaitBothMessages(M1, aActor, system)
val both = Await.result(firstMessage, Duration.Inf)
你开到创建另一个演员来代理M1和等待都M2和M3? –
是的,但又是另一个演员,我们需要等到M2和M3都收到。有什么办法做到这一点?如果你的意思是异步等待M2和M3那很棘手,因为我需要确保M3是从相应的演员A – sbeliakov
的实例中发出的,我从同事那里听说它要么太棘手,要么根本不可能,因为akka并非为此设计的,方法是重新设计解决方案以异步接收消息。 – sbeliakov