2013-08-06 216 views
1

假设我有一个“生产者 - 消费者”问题:生产者向消费者发送消息,消费者使用Scala Futures异步处理它们:例如future { /* do the processing */ }Scala生产者 - 消费者

现在假设生产者每秒产生100条消息。但消费者每秒只能处理10条消息。会发生什么 ?我猜会有内存泄漏。将会有很多Future对象,并且线程池的内部消息队列也会增长。是否有意义 ?

处理它的最好方法是什么?

+0

注 - 我很困惑 - 这个问题是关于期货而不是演员。 – JasonG

回答

1

您可以设置最大队列大小。实际上,我认为认为默认情况下Akka演员队列有限,尽管我在这里可能是错的。

这并不能真正解决问题,但最终,如果您没有足够的后端参与者来执行处理,您将无法处理所有内容。

我喜欢Netflix所做的:所有请求都通过代理来监视后端的健康状况。如果后端花费的时间过长,则会丢弃请求并提供回退:或者是合理的默认值或错误消息。他们谈论了很多关于他们的架构,例如this presentation

+0

谢谢。假设现在我在生产者和消费者之间有一个有限的队列。它解决了问题吗?你不会改变消费者逻辑吗? – Michael

0

有多个消费者 - 使用演员池。根据泳池的压力,您可以动态调整尺寸。见http://doc.akka.io/docs/akka/snapshot/scala/routing.html

+0

谢谢。假设我不能添加更多的演员/线程。例如,处理是CPU限制的,我只有一个CPU。 – Michael

+0

然后你会耗尽你的资源。如果你生产更多,那么你可以消耗,你将不得不放弃消息在某个时刻(使用与丹尼尔建议的大小限制的有界队列)。或者使用更多的资源,也许是一种集群方法(参见http://doc.akka.io/docs/akka/snapshot/common/cluster.html) –

+0

假设我只有一台主机,只有一个CPU。假设生产者和消费者平均工作得很好。问题是处理_bursts_。如果爆发比计划更大,则允许消费者丢弃一些消息。我知道如何用线程实现它并阻止有界的消息队列。现在我想知道如何使用'scala.concurrent.future'来实现。 – Michael

2

在阿卡,执行上下文中使用,但似乎没有邮箱 - 这将是值得阅读的源代码,但我可以通过实验来回答你的问题:

未来的不具有“邮箱“和我不是100%肯定正是阿卡执行上下文实际上包含发动机罩或下什么做的,但我们可以看到,阿卡会出现内存不足的直接使用期货时:

scala> import scala.concurrent.Future 
import scala.concurrent.Future 

scala> import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.ExecutionContext.Implicits.global    ^

scala> while(1==1) Future(Thread.sleep(100)) 
java.lang.OutOfMemoryError: Java heap space 

如果我们”重新讨论消息,然后有一个邮箱描述了参与者消息队列的行为ld填充,因为一次只处理一条消息) - 我将在下面解释这一点。

假设一个有界的邮箱(例如一个大小限制的邮箱)会发生什么消息。 答案取决于邮箱。 首先,有界邮箱有一些设置,如大小限制:

bounded-mailbox { 
    mailbox-type = "akka.dispatch.BoundedMailbox" 
    mailbox-capacity = 1000 
    mailbox-push-timeout-time = 10s 
} 

现在,当这个限制被击中,阿卡要么删除旧的或者根据邮箱的配置方式新的信息 - 例如用此设置

# whether to drop older items (instead of newer) when the queue is full 
discard-old-when-full = on 

显然,如果有喜欢的内存不足然后您的应用程序可能会崩溃意味着该消息将丢失,因为它们存储在内存中的其他资源问题。无界邮箱将继续堆叠邮件,直到出现错误情况,这就是为什么您可能想使用有界邮箱。

如果错误情况下的消息丢失是不可取的,还有另一种选择=可以使用持久邮箱,它将消息存储在更持久的地方,例如文件中。以下是一个示例邮箱配置,它使用文件提供更持久的邮件存储。

akka { 
    actor { 
    mailbox { 
     file-based { 
     # directory below which this queue resides 
     directory-path = "./_mb" 

     # attempting to add an item after the queue reaches this size (in items) 
     # will fail. 
     max-items = 2147483647 

     # attempting to add an item after the queue reaches this size (in bytes) 
     # will fail. 
     max-size = 2147483647 bytes 

     # attempting to add an item larger than this size (in bytes) will fail. 
     max-item-size = 2147483647 bytes 

     # maximum expiration time for this queue (seconds). 
     max-age = 0s 

     # maximum journal size before the journal should be rotated. 
     max-journal-size = 16 MiB 

     # maximum size of a queue before it drops into read-behind mode. 
     max-memory-size = 128 MiB 

     # maximum overflow (multiplier) of a journal file before we re-create it. 
     max-journal-overflow = 10 

     # absolute maximum size of a journal file until we rebuild it, 
     # no matter what. 
     max-journal-size-absolute = 9223372036854775807 bytes 

     # whether to drop older items (instead of newer) when the queue is full 
     discard-old-when-full = on 

     # whether to keep a journal file at all 
     keep-journal = on 

     # whether to sync the journal after each transaction 
     sync-journal = off 

     # circuit breaker configuration 
     circuit-breaker { 
      # maximum number of failures before opening breaker 
      max-failures = 3 

      # duration of time beyond which a call is assumed to be timed out and 
      # considered a failure 
      call-timeout = 3 seconds 

      # duration of time to wait until attempting to reset the breaker during 
      # which all calls fail-fast 
      reset-timeout = 30 seconds 
     } 
     } 
    } 
    } 
} 
+0

感谢您的详细解释,但我的问题是关于“期货”。我认为,当我编写'future {...}'时,我实际上创建了一个对象并将对象引用放入线程池邮箱。是否有意义 ?如果是这样,我会询问邮箱是否已绑定,以及如果邮箱已满,“future {...}”会发生什么情况。 – Michael

+0

啊是的 - 我很抱歉 - 使用邮箱这个词让我觉得这是一个阿卡问题。 – JasonG

+0

我很困惑,因为所有其他的答案都在谈论演员/调度员,但是你正在专门讨论Future。 – JasonG