2015-08-28 33 views
2

我有这种情况:演员是否可以在一定条件下阅读信息?

  • ActorA发送每30-40秒
  • ActorA发送ActorB字符串打印(总是)
  • ActorB必须打印,他收到的琴弦ActorB开始/停止的消息,但只如果ActorA派只是一个开始消息

现在我不知道如果我可以做以下的事情:

  • ActorB只能在特定条件下(如果布尔值设置为true)读取消息而不丢失在布尔值设置为false时收到的消息?
  • ActorB可以在其他字符串消息之前从ActorA读取开始/停止消息吗?我想有这种情况:ActorA发送一条开始消息给ActorB,ActorB开始打印他在开始消息之前收到的字符串(并且仍在接收),然后一旦收到停止消息就立即停止?

我不知道我是否解释得好。

编辑:谢谢你,答案很好,但我仍然有一些疑虑。

  • 是否成为消息的顺序?我的意思是,如果我有“Start-M1-M2-Stop-M3-M4-M5-Start-M6-M7-Stop”,打印顺序是“M1-M2”,然后是“M3-M4-M5-M6 -M7“,或者可以在M3,M4和M5之前读取M6(如果M6在变成后立即被接收)?

  • 我可以给予更高的优先级来启动/停止消息吗?如果ActorB收到“M1-M2-M3”,然后在打印“M1”时收到停止消息,我希望ActorB再次保存M2和M3。

回答

2

您可以准确地解决您的问题,其中Stash特征和become/unbecome功能的Akka。这个想法如下:

当您收到一个Stop消息,然后你切换到一种行为,你藏匿所有不是Start的邮件。当您收到Start消息,然后切换到您打印所有收到的消息,另外你unstash其在此期间已经到达所有消息的行为。

case object Start 
case object Stop 
case object TriggerStateChange 
case object SendMessage 

class ActorB extends Actor with Stash { 
    override def receive: Receive = { 
    case Start => 
     context.become(printingBehavior, false) 
     unstashAll() 
    case x => stash() 
    } 

    def printingBehavior: Receive = { 
    case msg: String => println(msg) 
    case Stop => context.unbecome() 
    } 
} 

class ActorA(val actorB: ActorRef) extends Actor { 

    var counter = 0 
    var started = false 

    override def preStart: Unit = { 
    import context.dispatcher 

    this.context.system.scheduler.schedule(0 seconds, 5 seconds, self, TriggerStateChange) 
    this.context.system.scheduler.schedule(0 seconds, 1 seconds, self, SendMessage) 
    } 

    override def receive: Actor.Receive = { 
    case SendMessage => 
     actorB ! "Message: " + counter 
     counter += 1 
    case TriggerStateChange => 
     actorB ! (if (started) { 
     started = false 
     Stop 
     } else { 
     started = true 
     Start 
     }) 
    } 
} 

object Akka { 
    def main(args: Array[String]) = { 
    val system = ActorSystem.create("TestActorSystem") 

    val actorB = system.actorOf(Props(classOf[ActorB]), "ActorB") 
    val actorA = system.actorOf(Props(classOf[ActorA], actorB), "ActorA") 

    system.awaitTermination() 
    } 
} 
+0

'unstashAll()'是否保留了消息的顺序?我的意思是,如果我有 “开始-M1-M2-停止-M3-M4-M5-开始-M6-M7停止”,将打印顺序是 “M1-M2-M3-M4-M5-M6-M7”或者可以在M3,M4和M5之前读取M6(如果在'unstashAll()'之后接收到M6)? –

+0

@AsAs,内部的'unstashAll()'方法调用邮箱上的enqueueFirst。因此,我假设你将首先按照它们被隐藏的顺序接收你的未压缩的消息,然后接收在'unstashAll()'调用期间或之后收到的任何消息。 –

+0

谢谢。另一件事。我可以给予更高的优先级来启动/停止消息吗?如果ActorB收到“Start-M1-M2-M3”,然后在打印“M1”时收到停止消息,我希望ActorB再次保存M2和M3。可能吗? –

2

检查Become/Unbecome的功能。它可以让你改变演员的行为。

如果我理解正确,您希望您的ActorB具有两种不同的状态。在第一个状态中,它应该缓存它收到的消息。在第二个状态中,它应该打印所有缓存的消息并开始打印所有新消息。

像这样:

case class Start 
case class Stop 
case class Message(val: String) 

class ActorB extends Actor { 

    var cache = Queue() 

    def initial: Receive = { 
    case Message(msg) => cache += msg 
    case Start => 
     for (m <- cache) self ! Message(m) 
     context.become(printing) 
    } 

    def printing: Receive = { 
    case Message(msg) => println(msg) 
    case Stop => context.become(initial) //or stop the actor 
    } 

    def receive = initial 
} 
+0

1)不应该'变成'线后的'换行'? 2)是否'become'十个分量消息的订单?我的意思是,如果我有 “开始-M1-M2-停止-M3-M4-M5-开始-M6-M7停止”,将打印顺序是 “M1-M2-M3-M4-M5-M6-M7”或者可以在M3,M4和M5之前读取M6(如果它是在'成为'之后收到的)? –

0

具有两个状态(两个不同的行为)之间演员乙交替。在最初的“挂起”状态中,它等待“开始”消息,而其他消息则为stashing

在收到'开始'消息后,取消所有存储的消息并且become'激活',等待'停止'消息并写出接收到的其他消息(其将包括未消除的消息)。在收到“停止”信息时,请回复“待处理”行为。

0

我的一些想法

  1. 是如果布尔标志从像数据库或配置文件中的一些系统资源了,但我不认为它应该是依赖于任何外部消息,因为演员接收来自多个其他演员的消息。如果ActorB仅用于ActorA,两者可以合并到一个

  2. 为1类似,如何处理来自多个参与者的消息?如果只有一个演员A,则两个演员可以合并。如果有多个,可以在数据库中设置标志,actorA将数据库中的标志更改为“开始”或“停止”。演员B将根据该标志打印或不打印。

一个演员应该对其他演员的状态非常独立。启动和停止实际上是ActorA而不是ActorB的某种状态

0

您已经有很多很好的答案,但不知何故,我觉得有必要提供一些更简单,因为你需要的是不一定的状态机:

class ActorB extends Actor { 
    def receive: Receive = caching(Nil) 

    def caching(cached: List[String]): Receive = { 
    case msg: String => 
     context.become(caching(msg :: cached)) 
    case Start => 
     cached.reverse.foreach(println) 
     context.become(caching(Nil)) 
    } 
}