2014-12-02 23 views
0

我有一个期货池,每个未来的作品都与同一个AKKA演员系统 - 系统中的一些演员应该是全球性的,有些仅在未来使用。akka演员选择没有竞赛条件

val longFutures = for (i <- 0 until 2) yield Future { 
    val p:Page = PhantomExecutor(isDebug=true) 
    Await.result(p.open("http://www.stackoverflow.com/") ,timeout = 10.seconds) 
    } 

PhantomExecutor tryes使用使用system.actorSelection

def selectActor[T <: Actor : ClassTag](system:ActorSystem,name:String) = { 
    val timeout = Timeout(0.1 seconds) 
    val myFutureStuff = system.actorSelection("akka://"+system.name+"/user/"+name) 
    val aid:ActorIdentity = Await.result(myFutureStuff.ask(Identify(1))(timeout).mapTo[ActorIdentity], 
     0.1 seconds) 

    aid.ref match { 
     case Some(cacher) => 
     cacher 
     case None => 
     system.actorOf(Props[T],name) 
    } 
    } 

一个全球共享的演员(简单的增量计数器),但在并发环境中这种方法并没有因为比赛条件下工作。

我知道这个问题只有一个解决方案 - 在分解到期货之前创建全球主角。但这意味着我无法封装顶级库用户的大量隐藏工作。

回答

1

你是对的,确保全球演员首先被初始化是正确的方法。你不能将它们绑定到伴侣对象并从那里引用它们,所以你知道它们只会被初始化一次吗?如果你真的不能这样做,那么你可以尝试这样的东西来查找或创建演员。它类似于你的代码,但它包含的逻辑通过查找回去/创建,如果比赛条件被击中(最多只能到最大次数)逻辑(递归):

def findOrCreateActor[T <: Actor : ClassTag](system:ActorSystem, name:String, maxAttempts:Int = 5):ActorRef = { 
    import system.dispatcher 
    val timeout = 0.1 seconds 

    def doFindOrCreate(depth:Int = 0):ActorRef = { 
     if (depth >= maxAttempts) 
     throw new RuntimeException(s"Can not create actor with name $name and reached max attempts of $maxAttempts") 

     val selection = system.actorSelection(s"/user/$name") 
     val fut = selection.resolveOne(timeout).map(Some(_)).recover{ 
     case ex:ActorNotFound => None 
     } 
     val refOpt = Await.result(fut, timeout) 

     refOpt match { 
     case Some(ref) => ref 
     case None => util.Try(system.actorOf(Props[T],name)).getOrElse(doFindOrCreate(depth + 1)) 
     } 
    } 

    doFindOrCreate() 
    } 

现在重试逻辑会在创建actor时触发任何异常,因此您可能需要进一步指定(可能通过另一个recover组合器),只在获得InvalidActorNameException时递归,但您明白了。

0

您可能需要考虑创建一个负责创建“反”演员的经理演员。这样你就可以确保counter actor创建请求被序列化。

object CounterManagerActor { 
    case class SelectActorRequest(name : String) 
    case class SelectActorResponse(name : String, actorRef : ActorRef) 
} 

class CounterManagerActor extends Actor { 
    def receive = { 
    case SelectActorRequest(name) => { 
     sender() ! SelectActorResponse(name, selectActor(name)) 
    } 
    } 

    private def selectActor(name : String) = { 
    // a slightly modified version of the original selectActor() method 
    ??? 
    } 
} 
+0

但我应避免两次运行此CounterManager演员,或者运行这个演员的未来之外{...}调用 - 实际上这是我的问题,如何避免这种结构没有小把戏,比如忽略了双演员或连续异常选择。 – Oleg 2014-12-02 18:29:51