2017-09-04 61 views
2

我想读取ShardRegions邮箱中的消息。在以前的版本阿卡,我们可以使用下面的代码只得到邮箱的大小:访问演员邮箱的消息

getContext().getMailboxSize(); 

有什么办法来获取信息的邮箱类型?

回答

3

下面是解

这样定义为碎片区域的包装邮箱:

class UnboundedMailboxWrapper extends MailboxType with ProducesMessageQueue[UnboundedMailboxWrapper.MessageQueue] { 
    def this(settings: ActorSystem.Settings, config: Config) = this() 

    final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue = 
    new UnboundedMailboxWrapper.MessageQueue 
} 

object UnboundedMailboxWrapper { 
    class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue { 
    val runtime = RuntimeManager.runtime 
    final def queue: Queue[Envelope] = this 
    override def enqueue(receiver: ActorRef, handle: Envelope): Unit = { 
     runtime.queue.add(handle) 
     queue add handle 
    } 
    override def dequeue(): Envelope = { 
     if (!runtime.queue.isEmpty) runtime.mailbox.queue.poll() 
     queue.poll() 
    } 
    } 
} 
此邮箱我们重覆的添加的元素

,到另一个队列,因此,当计算的大小和其它操作就可以了,没有对邮箱队列的性能没有任何影响

在重复的队列中,我们可以计算出消息的数量和对它们进行排序:

def getElemets(): String = { 
    runtime.queue.asScala.toList.groupBy(_.message.getClass.getName) 
    .map(e ⇒ (e._1, e._2.length)).toSeq 
    .sortBy(_._2).foldLeft("") { (a, b) ⇒ 
    b._1 + ":" + b._2 + "\n" + a 
    } 
} 

与JMX,或者我们可以调用此方法在运行时

和finlally此邮箱分配给ShardRegion调度其他任何方式:

monit-dispatcher { 
    mailbox-type = "im.actor.server.cluster.UnboundedMailboxWrapper" 
} 

akka.cluster.sharding { 
    use-dispatcher = "monit-dispatcher" 
} 
+0

警告:你现在已经完全换成了一个默认的实现AbstractNodeQueue java.util.concurrent.ConcurrentLinkedQueue中。请注意,默认的一个针对actor邮箱用例进行了优化,将其替换可能会影响性能。 – johanandren

+0

这是akka.dispatch.UnboundedMailbox的实现,我刚刚复制它。 UnboundedMailbox的akka​​实现使用ConcurrentLinkedQueue。 –

+0

是的,你是对的,不知道为什么我在考虑抽象节点队列(它用在调度器中)。 “对性能没有任何影响”可能是​​不正确的,但是,您正在复制正在完成的工作和额外队列的存储开销,并且额外的队列必须在其他位置轮询,或者元素保存在内存中即使他们被传递给演员。 – johanandren