2016-01-08 78 views
1

我正试图用Scala实现映射条目侦听器。实现映射条目侦听器

理念:

  1. 我需要订阅来自服务的地图。
  2. 我需要在添加/更新特定密钥的条目时通知所有订户。
  3. 我需要从其他服务访问地图来检查输入值。

我找不到这个现成的解决方案,所以我试图用阿卡实现它:

class TrackingService(system: ActorSystem) extends LazyLogging { 
    private val trackingActor = system.actorOf(TrackingActor.props) 
    private val duration = Duration(15, TimeUnit.SECONDS) 
    private implicit val timeout = Timeout(duration) 

    def fireEvent(key: String): Unit = { 
    TrackingActor ! EventFired(key) 
    } 

    def eventHappened(key: String): Future[Boolean] = { 
    TrackingActor ? DoesEventHappened(key)).mapTo[Boolean] 
    } 

    def registerHiddenCardListener(key: String, data: MyData): Unit = { 
    TrackingActor ! Subscribe(key, data) 
    } 
} 

case class EventFired(key: String) 
case class EventHappened(key: String) 
case class EventHappenedResponse(happened: Boolean) 

case class Subscribe(key: String, data: Data) 
class TrackingActor extends Actor with LazyLogging { 
    var eventMap: Map[String, Boolean] = Map.empty[String, Boolean] 
    var dataMap: Map[String, List[Data]] = Map.empty[String, List[Data]] 

    def receive: Receive = { 
    case Subscribe(key, data)  => 
     val currentData: List[Data] = dataMap.getOrElse(key, Nil) 
     val newData = data :: currentData 
     dataMap = dataMap + (key -> newData) 
    case EventHappened(key)   => sender() ! EventHappenedResponse(eventMap.getOrElse(key, false)) 
    case [email protected](key)   => 
     eventMap = eventMap + (key -> true) 

     for { 
     dataOpt <- dataMap.get(key) 
     data <- dataOpt 
     } { 
     // do callback with data (e.g. send email) 
     } 
    case x => logger.warn(s"Received unknown message: $x") 
    } 
} 

object TrackingActor { 
    def props: Props = Props(classOf[TrackingActor]) 
} 

我并不在此解决方案喜欢什么:我不喜欢问模式,但我需要访问非演员类的条目。另外,我不喜欢有2张地图,但我需要存储某个地方的数据,这些数据应该用于回调。

关于如何改进这一点的任何想法?

回答

1

这里有一个想法:

case class Subscribe[A, B](f: (A, B, NotifyingMap[A,B]) => Any) 

case class Event[A, B](key: A, value: B, originator: NotifyingMap[A,B]) 

case class RegisterObserver(actorRef: ActorRef) 

/** 
    * Subscribes to events 
    */ 
class Subscriber[A,B]{ 

    def register(actorSystem: ActorSystem) = { 
    val actor = actorSystem.actorOf(Props(classOf[Observer[A,B]])) 
    actor ! Subscribe(handleEvent) 
    } 

    def handleEvent(key: A, value: B, notifyingMap: NotifyingMap[A, B]) = { 
    println(s"Saw key $key with value $value") 
    } 
} 

/** 
    * Observer of events that will call a partial function when 
    * an event comes in. 
    */ 
class Observer[A, B] extends Actor{ 
    var f: (A,B,NotifyingMap[A,B]) => Any = _ 

    def receive = { 
    case x: Subscribe[A, B] => 
     f = x.f 
     Notifier() ! RegisterObserver(self) 
    case e: Event[A,B] => 
     f(e.key, e.value, e.originator) 
    } 
} 

/** 
    * Notifier that sends out the event to all registered observers. 
    */ 
class Notifier extends Actor { 
    var observers = List[ActorRef]() 

    def receive = { 
    case x: RegisterObserver => 
     observers = x.actorRef :: observers 
    case x: Event[_,_] => 
     observers.foreach(_ ! Event) 
    } 
} 

/** 
    * Singleton notifier. 
    */ 
object Notifier{ 

    var notifier: ActorRef = _ 

    def create(actorSystem: ActorSystem) = 
    actorSystem.actorOf(Props(classOf[Notifier])) 

    def apply(): ActorRef = notifier 
} 

/** 
    * Class that sends out an event when an item is put. Also allows for 
    * getting an item based on a key. 
    */ 
class NotifyingMap[A, B](){ 
    val map: TrieMap[A,B] = TrieMap[A,B]() 

    // Specific business logic here on when you publish the event. 
    def put(key: A, value: B) = { 
    map.put(key, value).foreach(v => Notifier() ! Event(key, v, this)) 
    } 

    def get(key: A) = map.get(key) 
} 

通过这样做,你可以让你的用户非Actor类,同时仍允许其对事件作出反应。您也可以在您的NotifyingMap上调用简单的旧方法,因为它只是一个类,而不是Actor

我个人喜欢在消息中存储回调信息。通常情况下,您可以通过在案例分类中设置另一个ActorRef来看到这一点。在这个例子中,我们在案例类中有NotifyingMap,所以我们知道事件来自哪里,并且可以在那里适当地调用get方法。

完全披露:我没有运行任何此代码。它编译。