2015-06-10 42 views
0

假设我有演员A. A预计接收消息,一旦它收到消息,它会发回两条消息。是否有可能等待Scala中的第二个响应

A extends Actor { 
    def receive: Receive = { 
    case M1 => 
     context.sender ! M2 
     context.sender ! M3 
    } 
} 

而在演员A中,我想发送消息,然后等待两个响应。 我知道这很容易像

val task = A ? M1 
Await.result(task, timeout) 

的方式一个响应,但我不知道是否有可能有两个连续的消息。

重要的是发送两个单独的消息,因为我需要等待其他地方只有他们的第一个。

+1

你开到创建另一个演员来代理M1和等待都M2和M3? –

+0

是的,但又是另一个演员,我们需要等到M2和M3都收到。有什么办法做到这一点?如果你的意思是异步等待M2和M3那很棘手,因为我需要确保M3是从相应的演员A – sbeliakov

+0

的实例中发出的,我从同事那里听说它要么太棘手,要么根本不可能,因为akka并非为此设计的,方法是重新设计解决方案以异步接收消息。 – sbeliakov

回答

1

您可以通过在案件引入一个中间演员解决这个问题,当你需要等待两个消息。

该角色会是这个样子:

class AggregationActor(aActor: ActorRef) extends Actor { 

    var awaitForM2: Option[M2] = None 
    var awaitForM3: Option[M3] = None 
    var originalSender: Option[ActorRef] = None 

    def receive: Receive = { 
    case M1 => 
     // We save the sender 
     originalSender = Some(sender()) 
     // Proxy the message 
     aActor ! M1 
    case M2 => 
     awaitForM2 = Some(M2) 
     checkIfBothMessagesHaveArrived() 
    case M3 => 
     awaitForM3 = Some(M3) 
     checkIfBothMessagesHaveArrived() 
    } 

    private def checkIfBothMessagesHaveArrived() = { 
    for { 
     m2 <- awaitForM2 
     m3 <- awaitForM3 
     s <- originalSender 
    } { 
     // Send as a tuple 
     s ! (m2, m3) 
     // Shutdown, our task is done 
     context.stop(self) 
    } 
    } 

} 

本质上,它具有内部状态和跟踪M1M2反应是如何到达的。

您可以使用此类似:

def awaitBothMessages(input: M1, underlyingAActor: ActorRef, system: ActorSystem): Future[(M2, M3)] = { 
    val aggregationActor = system.actorOf(Props(new AggregationActor(aActor))) 
    (aggregationActor ? input).mapTo[(M2, M3)] 
} 

val system = ActorSystem("test") 
val aActor = system.actorOf(Props(new A), name = "aActor") 

// Awaiting the first message only: 
val firstMessage = aActor ? M1 
val first = Await.result(firstMessage, Duration.Inf) 

// Awaiting both messages: 
val bothMessages: Future[(M2, M3)] = awaitBothMessages(M1, aActor, system) 
val both = Await.result(firstMessage, Duration.Inf) 
1

如何返回发件人包含M2和M3的元组?

import akka.pattern.ask 
import akka.actor.{Props, ActorSystem, Actor} 
import akka.util.Timeout 
import com.test.A.{M1, M2, M3} 

import scala.concurrent.Await 
import scala.concurrent.duration._ 

object Test extends App { 

    implicit val timeout = Timeout(5 seconds) 

    val system = ActorSystem("test-system") 
    val actor = system.actorOf(Props[A], name = "a-actor") 
    val future = actor ? M1 
    val await = Await.result(future, Duration.Inf) 
    println(await) 

} 

class A extends Actor { 
    override def receive: Receive = { 
    case M1 => sender() ! (M2, M3) 
    } 
} 

object A { 
    case object M1 
    case object M2 
    case object M3 
} 

运行,这将导致:

(M2,M3) 
+0

感谢您的回答,但正如我所提到的,发送两封单独的邮件非常重要,因为在另一个地方,我希望只在收到第二封邮件时才会收到第一封邮件 – sbeliakov

相关问题