2012-06-21 90 views
12

我在Akka docs中读到,关闭附近演员的变量是危险的。Akka演员,期货和关闭

警告

在这种情况下,你需要仔细避免关闭在 包含演员的参考,即不呼吁 封闭男主角方法从匿名演员类中。这将使 中断演员封装,并可能引入同步错误 和竞争条件,因为其他演员的代码将被同时安排到 到封闭演员。

现在,我有两个演员,其中一个从第二个角色请求某个东西,并对结果做了一些处理。在下面这个例子中,我已经放在一起,演员累加器从演员号码发现者检索号码,并将它们相加,沿途报告总和。

这可以以至少两种不同的方式,因为这例如具有示出了两个不同的接收函数( VS )来完成。两者的区别在于A未关闭计数器变量;相反,它等待一个整数并总结它,而B创建一个未来关闭计数器并做了总和。如果我正确理解这是如何工作的,这发生在为处理onSuccess而创建的匿名actor中。

import com.esotericsoftware.minlog.Log 

import akka.actor.{Actor, Props} 
import akka.pattern.{ask, pipe} 
import akka.util.Timeout 
import akka.util.duration._ 

case object Start 
case object Request 


object ActorTest { 
    var wake = 0 

    val accRef = Main.actorSystem.actorOf(Props[Accumulator], name = "accumulator") 
    val genRef = Main.actorSystem.actorOf(Props[NumberGenerator], name = "generator") 

    Log.info("ActorTest", "Starting !") 

    accRef ! Start 
} 

class Accumulator extends Actor { 
    var counter = 0 

    implicit val timeout = Timeout(5 seconds) 

    // A: WITHOUT CLOSURE 
    def receive = { 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] pipeTo self 
    case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start 
    } 
    // B: WITH CLOSURE 
    def receive = { 
    case Start => ask(ActorTest.genRef, Request).mapTo[Int] onSuccess { 
     case x: Int => counter += x; Log.info("Accumulator", "counter = " + counter); self ! Start 
    } 
    } 
} 

class NumberGenerator extends Actor { 
    val rand = new java.util.Random() 

    def receive = { 
    case Request => sender ! rand.nextInt(11)-5 
    } 
} 

在这种情况下使用闭包是绝对邪恶的吗?当然,我可以使用AtomicInteger而不是Int,或者在某些使用netty的网络场景中,在threadsafe频道上发出写入操作,但这不是我的观点。

要问可笑的风险:有没有对未来的一个的onSuccess方式执行的,而不是一个匿名的中层演员这个演员,确定在的情况下没有接收功能?

编辑

说得比较清楚,我的问题是:有没有办法迫使一系列期货在同一个线程中给定的演员来运行?

回答

5

的问题是,onSuccess是要在不同的线程比线程演员的receive是要在运行运行。你可以使用pipeTo方法,或者使用Agent。制作counterAtomicInteger可以解决问题,但它并不那么干净 - 也就是说,它打破了Actor模型。

+1

建议使用代理的+1 – gsimard

5

实现这样的设计,最简单的方法是使用“发射后不管”的语义:

class Accumulator extends Actor { 
    private[this] var counter = 0 

    def receive = { 
    case Start => ActorTest.genRef ! Request 
    case x: Int => { 
     counter += x 
     Log.info("Accumulator", "counter = " + counter) 
     self ! Start 
    } 
    } 
} 

该解决方案是完全异步的,你不需要任何超时。

+0

是的,如果我放弃使用Futures,这是有效的。在我的例子中,链接期货很容易成为可能,如果它们以_pipeTo self_结尾,但不可能再用“即燃即用”语义。相反,我必须在累加器的接收函数中定义N个中间消息,以保证代码在该Actor的线程中运行。我想我可以再问一次,这次更清楚一点:是否有办法迫使一系列期货在与给定的演员相同的线程中运行? – gsimard

+0

为什么你需要保证'Accumulator' Actor总是在同一个线程中运行?这似乎违背了演员模型哲学。期货也是如此:应该派出一个线程池,以最大化表现。如果他们都在同一个线程中运行,你只需要一个简单的顺序程序,并且你不再需要期货... – paradigmatic

+0

实际上,它是否在同一个线程中运行并不重要,要求是更多的是它应该按顺序运行,就像处理单个actor的消息一样。这不违背演员模型,它是演员模型。 – gsimard