2
我想读取ShardRegions邮箱中的消息。在以前的版本阿卡,我们可以使用下面的代码只得到邮箱的大小:访问演员邮箱的消息
getContext().getMailboxSize();
有什么办法来获取信息的邮箱类型?
我想读取ShardRegions邮箱中的消息。在以前的版本阿卡,我们可以使用下面的代码只得到邮箱的大小:访问演员邮箱的消息
getContext().getMailboxSize();
有什么办法来获取信息的邮箱类型?
下面是解
这样定义为碎片区域的包装邮箱:
class UnboundedMailboxWrapper extends MailboxType with ProducesMessageQueue[UnboundedMailboxWrapper.MessageQueue] {
def this(settings: ActorSystem.Settings, config: Config) = this()
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
new UnboundedMailboxWrapper.MessageQueue
}
object UnboundedMailboxWrapper {
class MessageQueue extends ConcurrentLinkedQueue[Envelope] with UnboundedQueueBasedMessageQueue {
val runtime = RuntimeManager.runtime
final def queue: Queue[Envelope] = this
override def enqueue(receiver: ActorRef, handle: Envelope): Unit = {
runtime.queue.add(handle)
queue add handle
}
override def dequeue(): Envelope = {
if (!runtime.queue.isEmpty) runtime.mailbox.queue.poll()
queue.poll()
}
}
}
此邮箱我们重覆的添加的元素
,到另一个队列,因此,当计算的大小和其它操作就可以了,没有对邮箱队列的性能没有任何影响
在重复的队列中,我们可以计算出消息的数量和对它们进行排序:
def getElemets(): String = {
runtime.queue.asScala.toList.groupBy(_.message.getClass.getName)
.map(e ⇒ (e._1, e._2.length)).toSeq
.sortBy(_._2).foldLeft("") { (a, b) ⇒
b._1 + ":" + b._2 + "\n" + a
}
}
与JMX,或者我们可以调用此方法在运行时
和finlally此邮箱分配给ShardRegion调度其他任何方式:
monit-dispatcher {
mailbox-type = "im.actor.server.cluster.UnboundedMailboxWrapper"
}
akka.cluster.sharding {
use-dispatcher = "monit-dispatcher"
}
警告:你现在已经完全换成了一个默认的实现AbstractNodeQueue java.util.concurrent.ConcurrentLinkedQueue中。请注意,默认的一个针对actor邮箱用例进行了优化,将其替换可能会影响性能。 – johanandren
这是akka.dispatch.UnboundedMailbox的实现,我刚刚复制它。 UnboundedMailbox的akka实现使用ConcurrentLinkedQueue。 –
是的,你是对的,不知道为什么我在考虑抽象节点队列(它用在调度器中)。 “对性能没有任何影响”可能是不正确的,但是,您正在复制正在完成的工作和额外队列的存储开销,并且额外的队列必须在其他位置轮询,或者元素保存在内存中即使他们被传递给演员。 – johanandren