2014-07-11 47 views
7

问:正确的模式

什么是在阿卡演员积累状态正确的模式?

语境:

比方说,我有几个服务,所有返回的数据。

class ServiceA extends Actor { 
    def receive = { 
    case _ => sender ! AResponse(100) 
    } 
} 

class ServiceB extends Actor { 
    def receive = { 
    case _ => sender ! BResponse("n") 
    } 
} 

// ... 

我想有一个坐标谈论所有这些服务,并保持他们的反应轨道,然后发送的所有数据恢复到原始发件人的响应一个控制/监控的演员。

class Supervisor extends Actor { 
    def receive = { 
    case "begin" => begin 
    case AResponse(id) => ??? 
    case BResponse(letter) => ??? 
    } 

// end goal: 
def gotEverything(id: Int, letter: String) = 
    originalSender ! (id, letter) 

    def begin = { 
    ServiceA ! "a" 
    ServiceB ! "b" 
    } 
} 

随着服务响应的进入,我该如何将所有状态保持在一起?据我了解,如果我要将AResponse的值分配给例如var aResponse: Int,那么随着接收到不同的消息,该变量不断变化,并且当我等待BResponse消息时,我不可能依靠该var停留。

我意识到我可以用ask和nest/flatMap Future的,但是从我读过的那是一个不好的模式。没有未来的方法来实现这一切吗?

+0

为什么'aResponse'会改变多次?你只发送一条消息到'ServiceA'。你的目标究竟是什么?等到你收到'AResponse'和'BResponse'并用他们的值调用'gotEverything'? – vptheron

+0

>>你的目标是什么?等到你收到AResponse和BResponse,并打电话给他们所有的价值? YES –

+0

您可以将回复存储在带有某个标识符或参与者的列表中 – wedens

回答

15

因为角色不能同时从多个线程访问,所以你可以很容易地存储和改变你想要的状态。例如,您可以这样做:

class Supervisor extends Actor { 
    private var originalSender: Option[ActorRef] = None 
    private var id: Option[WhateverId] = None 
    private var letter: Option[WhateverLetter] = None 

    def everythingReceived = id.isDefined && letter.isDefined 

    def receive = { 
    case "begin" => 
     this.originalSender = Some(sender) 
     begin() 

    case AResponse(id) => 
     this.id = Some(id) 
     if (everythingReceived) gotEverything() 

    case BResponse(letter) => 
     this.letter = Some(letter) 
     if (everythingReceived) gotEverything() 
    } 

    // end goal: 
    def gotEverything(): Unit = { 
    originalSender.foreach(_ ! (id.get, letter.get)) 
    originalSender = None 
    id = None 
    letter = None 
    } 

    def begin(): Unit = { 
    ServiceA ! "a" 
    ServiceB ! "b" 
    } 
} 

但是,还有一种更好的方法。你可以在没有显式状态变量的情况下模拟某种状态机。这是使用become()机制完成的。

class Supervisor extends Actor { 
    def receive = empty 

    def empty: Receive = { 
    case "begin" => 
     AService ! "a" 
     BService ! "b" 
     context become noResponses(sender) 
    } 

    def noResponses(originalSender: ActorRef): Receive = { 
    case AResponse(id) => context become receivedId(originalSender, id) 
    case BResponse(letter) => context become receivedLetter(originalSender, letter) 
    } 

    def receivedId(originalSender: ActorRef, id: WhateverId): Receive = { 
    case AResponse(id) => context become receivedId(originalSender, id) 
    case BResponse(letter) => gotEverything(originalSender, id, letter) 
    } 

    def receivedLetter(originalSender: ActorRef, letter: WhateverLetter): Receive = { 
    case AResponse(id) => gotEverything(originalSender, id, letter) 
    case BResponse(letter) => context become receivedLetter(originalSender, letter) 
    } 

    // end goal: 
    def gotEverything(originalSender: ActorRef, id: Int, letter: String): Unit = { 
    originalSender ! (id, letter) 
    context become empty 
    } 
} 

这可能稍微冗长些,但它不包含显式变量;所有状态都隐含在Receive方法的参数中,并且当这个状态需要更新时,actor的接收函数刚刚切换以反映这个新状态。

请注意,上面的代码非常简单,当可以有很多“原始发件人”时,它将无法正常工作。在这种情况下,您必须为所有消息添加一个id,并使用它来确定哪些响应属于哪个“原始发件人”状态,或者您可以创建多个参与者,每个参与者都针对每个“原始发件人”。

1

我相信Akka的方式是使用actor-per-request模式。通过这种方式,您可以确定哪些响应与什么相对应,您每次获得请求时都会创建一个新的演员。这很便宜,事实上,每当你问()时都会发生。

这些请求处理器(我就是这么称呼他们的)通常具有简单的响应字段。看看请求是否已经到达只是简单的空比较的问题。

使用此方案重试/失败也变得更容易。超时也是如此。