2014-01-30 52 views
6

说我正在发送消息给一个Actor,当它正在处理一条消息时可能会出现更多的消息。现在,当它准备处理下一条消息时,我希望它只处理最新的消息,因为以前的消息已经过时。我怎样才能最好地实现这一点?Akka演员如何处理最新消息

使用Scala的演员库,我能够通过第一检查从我的发件人来实现这一如下:

if (myActor.getState != Runnable) 
    myActor ! message  

但我不认为我可以在阿卡系统做这样的测试

+1

什么消息处理保证你试图确保? –

+0

我相信阿卡也有优先收件箱。这可能会做你想要的,但这会导致问题:如何处理旧的消息?因此,您可以按照建议使用自定义收件箱,或让您的演员存储最后处理的消息(必须存储在消息中)的时间戳,然后删除之前的所有消息。 –

回答

5

你可以实现自己的邮箱,这种方式不会影响你的actor的实现。请参阅this answer以获取解决方案,以更改执行者实现而不是定制邮箱实施。

实现邮箱的上enqueue下降旧邮件:

package akka.actor.test 

import akka.actor.{ ActorRef, ActorSystem } 
import com.typesafe.config.Config 
import akka.dispatch.{Envelope, MessageQueue} 

class SingleMessageMailbox extends akka.dispatch.MailboxType { 

    // This constructor signature must exist, it will be called by Akka 
    def this(settings: ActorSystem.Settings, config: Config) = this() 

    // The create method is called to create the MessageQueue 
    final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = 
    new MessageQueue { 
     val message = new java.util.concurrent.atomic.AtomicReference[Envelope] 

     final def cleanUp(owner: ActorRef, deadLetters: MessageQueue): Unit = 
     Option(message.get) foreach {deadLetters.enqueue(owner, _)} 

     def enqueue(receiver: ActorRef, handle: Envelope): Unit = 
     for {e <- Option(message.getAndSet(handle))} 
      receiver.asInstanceOf[InternalActorRef]. 
      provider.deadLetters. 
      tell(DeadLetter(e.message, e.sender, receiver), e.sender) 

     def dequeue(): Envelope = message.getAndSet(null) 

     def numberOfMessages: Int = Option(message.get).size 

     def hasMessages: Boolean = message.get != null 
    } 
} 

请注意,我有这个类添加到包akka.actor送旧邮件使用InternalActorRefimplementedBoundedQueueBasedMessageQueue一纸空文。

如果你想直接跳过你可以实现enqueue像这样的旧邮件:

def enqueue(receiver: ActorRef, handle: Envelope): Unit = message.set(handle) 

用法:

object Test extends App { 
    import akka.actor._ 
    import com.typesafe.config.ConfigFactory 

    // you should use your config file instead of ConfigFactory.parseString 
    val actorSystem: ActorSystem = 
    ActorSystem("default", ConfigFactory.parseString(
""" 
    akka.daemonic=on 
    myMailbox.mailbox-type = "akka.actor.test.SingleMessageMailbox" 
""")) 

    class EchoActor extends Actor { 
    def receive = { 
     case m => println(m); Thread.sleep(500) 
    } 
    } 

    val actor = actorSystem.actorOf(Props[EchoActor].withMailbox("myMailbox")) 

    for {i <- 1 to 10} { 
    actor ! i 
    Thread.sleep(100) 
    } 

    Thread.sleep(1000) 

} 

测试:

$ sbt run 
1 
[INFO] <dead letters log> 
[INFO] <dead letters log> 
[INFO] <dead letters log> 
5 
[INFO] <dead letters log> 
[INFO] <dead letters log> 
[INFO] <dead letters log> 
[INFO] <dead letters log> 
10 

akka/Mailboxes见。

10

没有必要实现自己的邮箱。完全一样。

去掉大量的文字,让这段代码为自己说话:

// Either implement "equals" so that every job is unique (by default) or do another comparison in the match. 
class Work 
case class DoWork(work: Work) 

class WorkerActor extends Actor { 
    // Left as an exercise for the reader, it clearly should do some work. 
    def perform(work: Work): Unit =() 

    def lookingForWork: Receive = { 
    case w: Work => 
     self forward DoWork(w) 
     context become prepareToDoWork(w) 
    } 

    def prepareToDoWork(work: Work): Receive = { 
    case DoWork(`work`) => 
     // No new work, so perform this one 
     perform(work) 
     // Now we're ready to look for new work 
     context become lookingForWork 
    case DoWork(_) => 
     // Discard work that we don't need to do anymore 
    case w2: Work => 
     // Prepare to do this newer work instead 
     context become prepareToDoWork(w2) 
    } 

    //We start out as looking for work 
    def receive = lookingForWork 
} 

这意味着,如果在邮箱中没有新的工作,工作仅进行。

+0

这是一个好主意,但是在你的实现中存在一个错误:你不应该继续工作的平等:让我们假设我们在邮箱中有2个工作:a从s1和a从s2 = (a)来自s1','DoWork(a)来自s1' =>'DoWork(a)来自s1','DoWork(a)来自s2'。所以我们将从s1处理'a而不是从s2'处理a。所以你要么放弃关于发件人的信息,要么修复你的实现。如果{s来自s1,b来自s1,a来自s2},这可能很重要。 – senia

+0

这是[没有这个bug的实现](http://pastebin.com/4Awz9VVb)。 – senia

+0

的确,我假定实施等同的人考虑到工作项目是唯一的。这是除了一般意义上的点。我会添加一个免责声明。 –