2013-10-20 44 views
5

我有一个使用Play Framework 2.2.0-scala构建的示例,它使用WebSockets将数据传输到客户端。我遇到的问题是,无论出于何种原因,父Actor的子项之一未正确关闭。所有日志都表明它正在停止并且已经关闭,但是我发现它并没有通过向其发布数据而实际关闭。下面是一些代码,先用我的控制器操作:Scala:未在Play Framework中死去的Akka actor 2.2.0

def scores(teamIds: String) = WebSocket.async[JsValue] { request => 
    val teamIdsArr:Array[String] = teamIds.split(",").distinct.map { el => 
     s"nfl-streaming-scores-${el}" 
    } 

    val scoresStream = Akka.system.actorOf(Props(new ScoresStream(teamIdsArr))) 
    ScoresStream.join(scoresStream) 
    } 

所以每一个客户端连接时,他们加入ScoresStream返回各自Iteratee,枚举WebSocket.async需要。实际ScoresStream对象如下所示:

object ScoresStream { 

    implicit val timeout = Timeout(5 seconds) 

    def join(scoresStream:ActorRef):scala.concurrent.Future[(Iteratee[JsValue,_],Enumerator[JsValue])] = { 

    (scoresStream ? BeginStreaming).map { 

     case Connected(enumerator) => 
     val iteratee = Iteratee.foreach[JsValue] { _ => 
      Logger.info("Ignore iteratee input.") 
     }.map { _ => 
      Logger.info("Client quitting - killing Actor.") 
      scoresStream ! UnsubscribeAll 
      scoresStream ! PoisonPill 
     } 
     (iteratee,enumerator) 
} 

这里的想法是要杀死的主要演员,ScoresStream,当客户端断开连接。我通过使用scoresStream ! PoisonPill来做到这一点。

反过来ScoresStream创建PubSub情况下它们是连接到Redis的用于发布/切割到消息的包装,这里的演员代码:

class ScoresStream(teamIds: Array[String]) extends Actor with CreatePubSub with akka.actor.ActorLogging { 

    val (scoresEnumerator, scoresChannel) = Concurrent.broadcast[JsValue] 

    case class Message(kind: String, user: String, message: String) 
    implicit val messageReads = Json.reads[Message] 
    implicit val messageWrites = Json.writes[Message] 

    val sub = context.child("sub") match { 
    case None => createSub(scoresChannel) 
    case Some(c) => c 
    } 

    val pub = context.child("pub") match { 
    case None  => createPub(teamIds) 
    case Some(c) => c 
    } 

    def receive = { 
    case BeginStreaming => { 
     log.info("hitting join...") 
     sub ! RegisterCallback 
     sub ! SubscribeChannel(teamIds) 
     sender ! Connected(scoresEnumerator) 
    } 

    case UnsubscribeAll => { 
     sub ! UnsubscribeChannel(teamIds) 
    } 
    } 

} 

trait CreatePubSub { self:Actor => 
    def createSub(pChannel: Concurrent.Channel[JsValue]) = context.actorOf(Props(new Sub(pChannel)), "sub") 
    def createPub(teamIds: Array[String]) = context.actorOf(Props(new Pub(teamIds)), "pub") 
} 

最后,这里的实际子演员代码:(Pub没有按“T似乎与此有关,因为它是关闭精):

class Sub(pChannel: Concurrent.Channel[JsValue]) extends Actor with CreatePublisherSubscriber with ActorLogging { 
    val s = context.child("subscriber") match { 
    case None => createSubscriber 
    case Some(c) => c 
    } 

    def callback(pubsub: PubSubMessage) = pubsub match { 
    case E(exception) => println("Fatal error caused consumer dead. Please init new consumer reconnecting to master or connect to backup") 
    case S(channel, no) => println("subscribed to " + channel + " and count = " + no) 
    case U(channel, no) => println("unsubscribed from " + channel + " and count = " + no) 
    case M(channel, msg) => 
     msg match { 
     // exit will unsubscribe from all channels and stop subscription service 
     case "exit" => 
      println("unsubscribe all ..") 
      pChannel.end 
      r.unsubscribe 

     // message "+x" will subscribe to channel x 
     case x if x startsWith "+" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('+', rest @ _*) => r.subscribe(rest.toString){ m => } 
      } 

     // message "-x" will unsubscribe from channel x 
     case x if x startsWith "-" => 
      val s: Seq[Char] = x 
      s match { 
      case Seq('-', rest @ _*) => r.unsubscribe(rest.toString) 
             pChannel.end 
      } 

     case x => 
     try { 
      log.info("Just got a message: " + x) 
      pChannel.push(Json.parse(x)) 
      } 
      catch { 
      case ex: com.fasterxml.jackson.core.JsonParseException => { 
       log.info("Malformed JSON sent.") 
      } 
      } 
     } 
    } 

    def receive = { 
    case RegisterCallback => { 
     log.info("Creating a subscriber and registering callback") 
     s ! Register(callback) 
    } 
    case SubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("subscribing to channel " + x + " ") } 
     //sub ! Subscribe(Array("scores-5","scores-6")) 
     s ! Subscribe(teamIds) 
    } 
    case UnsubscribeChannel(teamIds) => { 
     teamIds.foreach { x => log.info("unsubscribing from channel " + x + " ") } 
     s ! Unsubscribe(teamIds) 
    } 
    case true => println("Subscriber successfully received message.") 
    case false => println("Something went wrong.") 
    } 
} 

trait CreatePublisherSubscriber { self:Actor => 
    def r = new RedisClient("localhost", 6379) 
    def createSubscriber = context.actorOf(Props(new Subscriber(r)), "subscriber") 
    def createPublisher = context.actorOf(Props(new Publisher(r)), "publisher") 
} 

现在,当一个客户端连接,启动消息看起来很健康:

[DEBUG] [10/20/2013 00:35:53.618] [application-akka.actor.default-dispatcher-12] [akka://application/user] now supervising Actor[akka://application/user/$c#-54456921] 
[DEBUG] [10/20/2013 00:35:53.619] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.620] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/sub#1376180991] 
[DEBUG] [10/20/2013 00:35:53.621] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/pub/publisher] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.622] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] started ([email protected]) 
Subscriber successfully received message. 
Subscriber successfully received message. 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-19] [akka://application/user/$c/sub] now supervising Actor[akka://application/user/$c/sub/subscriber#-1562348862] 
subscribed to nfl-streaming-scores-5 and count = 1 
[DEBUG] [10/20/2013 00:35:53.699] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] now supervising Actor[akka://application/user/$c/pub#-707418539] 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-12] [akka://application/user/$c] hitting join... 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] Creating a subscriber and registering callback 
[INFO] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-23] [akka://application/user/$c/sub] subscribing to channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:35:53.700] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] started ([email protected]) 
[DEBUG] [10/20/2013 00:35:53.703] [application-akka.actor.default-dispatcher-18] [akka://application/user/$c/pub] now supervising Actor[akka://application/user/$c/pub/publisher#1509054514] 

和断开看起来健康:

[info] application - Client quitting - killing Actor. 
unsubscribed from nfl-streaming-scores-5 and count = 0 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] received AutoReceiveMessage Envelope(PoisonPill,Actor[akka://application/deadLetters]) 
[INFO] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] unsubscribing from channel nfl-streaming-scores-5 
[DEBUG] [10/20/2013 00:37:51.696] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/sub] stopping 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-25] [akka://application/user/$c/pub/publisher] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub/subscriber] stopped 
[DEBUG] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] stopped 
[INFO] [10/20/2013 00:37:51.697] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c/sub] Message [java.lang.Boolean] from Actor[akka://application/user/$c/sub/subscriber#-1562348862] to Actor[akka://application/user/$c/sub#1376180991] was not delivered. [2] dead letters encountered. This logging can be turned off or adjusted with configuration settings 'akka.log-dead-letters' and 'akka.log-dead-letters-during-shutdown'. 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopping 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-26] [akka://application/user/$c/pub] stopped 
[DEBUG] [10/20/2013 00:37:51.699] [application-akka.actor.default-dispatcher-17] [akka://application/user/$c] stopped 

而这里的问题,客户端已经断开后,我要发送当前关机演员被订阅的消息:

redis-cli publish "nfl-streaming-scores-5" "{\"test\":\"message\"}" 

这里它显示了,当它不应该,这个演员应该在技术上是死的。其他参与者也收到邮件,也就是标有$ a/$ b的邮件。我可以确认没有其他客户连接。

[INFO] [10/20/2013 00:38:33.097] [Thread-7] [akka://application/user/$c/sub] Just got a message: {"test":"message"} 

什么也是一个奇怪的指标是,地址名称永远不会被重复使用。我不断看到这样的名字的下面趋势产卵,当我断开/连接:

akka://application/user/$c 
akka://application/user/$d 
akka://application/user/$e 

再也看不到老引用得到重新使用。

这里我的假设是与Redis的连接并未完全关闭。这并不能解释为什么日志表示Actor已经停止但仍然存在,但是在运行netstat之后,即使在所有Actor都可能已经死亡后,我仍然可以看到建立到redis的连接。当我完全停止运行应用程序时,这些连接就会清除。就好像退订程序默默无闻,并且保持了活动者以及连接的连接,由于多种原因,这是非常糟糕的,因为最终系统将耗尽文件描述符和/或存在内存泄漏。这里有什么明显的我做错了吗?

+1

'def r = new RedisClient',我认为lazy val会更好,因此您只需创建一个RedisClient实例,而不是每次创建新实例时调用'r.doSomeThing'。 – Schleichardt

+0

我确实尝试过。我甚至将它从特质中移出并直接放入演员。尽管如此,最终的结果是一样的。使用'redis-cli',我看到UNSUBSCRIBE也进来了。但是,连接仍然建立。我在这里设置/拆除演员的方式我觉得很麻烦。 – randombits

+0

自动生成的演员姓名将永远不会被重复使用,这是让他们独一无二的最安全的方式。我怀疑是,订阅者actor(你没有显示)将回调传递给redis客户端库,在邮件到达时执行 - 它独立于actor。 –

回答

3

仅仅因为你停止演员并不意味着该演员拥有的任何资源都会自动清理。如果有与该演员实例绑定的RedisClient,并且此连接需要停止才能正确清理,那么您应该在postStop方法中执行类似的操作。我也同意@Schleichardt在你应该改变你的def r = new RedisClient为一个val或一个懒惰的val(取决于初始化顺序和需求)。这样你就知道每个用户实例只有一个RedisClient清理。我不知道您使用的RedisClient的API,但我们假设它有一个shutdown方法,该方法将终止其连接并清理其资源。然后你可以简单的添加postStop给用户的演员,像这样:

override def postStop { 
    r.shutdown 
} 

假设你做高清到VAL的变化,这可能是你在找什么。