2017-07-01 33 views
0

现在Source.actorPublisher已被弃用,我想找到一个适当的替代品。通过Source.fromGraph替换不赞成使用的Source.actorPublisher - 如何限制?

警告:我仍然是阿卡新生,试图找到我的方式!

基本上我所拥有的是一个websocket,服务器每秒都会推送一条新消息。

相关代码:

// OLD, deprecated way 
//val source: Source[TextMessage.Strict, ActorRef] = Source.actorPublisher[String](Props[KeepAliveActor]).map(i => TextMessage(i)) 

// NEW way 
val sourceGraph: Graph[SourceShape[TextMessage.Strict], NotUsed] = new KeepAliveSource 
val source: Source[TextMessage.Strict, NotUsed] = Source.fromGraph(sourceGraph) 

val requestHandler: HttpRequest => HttpResponse = 
{ 
    case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) => 
    req.header[UpgradeToWebSocket] match 
    { 
     case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source) 
     case None => HttpResponse(400, entity = "Not a valid websocket request") 
    } 
    case r: HttpRequest => 
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream 
    HttpResponse(404, entity = "Unknown resource!") 
} 

的老演员:(基本上摘自:Actorpublisher as source in handleMessagesWithSinkSource

case class KeepAlive() 

class KeepAliveActor extends ActorPublisher[String] 
{ 
    import scala.concurrent.duration._ 
    implicit val ec = context.dispatcher 

    val tick = context.system.scheduler.schedule(1 second, 1 second, self, KeepAlive()) 

    var cnt = 0 
    var buffer = Vector.empty[String] 

    override def receive: Receive = 
    { 
    case KeepAlive() => 
    { 
     cnt = cnt + 1 
     if (buffer.isEmpty && totalDemand > 0) 
     { 
     onNext(s"${cnt}th Message from server!") 
     } 
     else { 
     buffer :+= cnt.toString 
     if (totalDemand > 0) { 
      val (use,keep) = buffer.splitAt(totalDemand.toInt) 
      buffer = keep 
      use foreach onNext 
     } 
     } 
    } 
    } 

    override def postStop() = tick.cancel() 
} 

旧的方式就像一个魅力。

现在新代码的基础上,GraphStage

class KeepAliveSource extends GraphStage[SourceShape[TextMessage.Strict]] 
{ 
    import scala.concurrent.duration._ 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
    { 
    new TimerGraphStageLogic(shape) 
    { 
     // All state MUST be inside the GraphStageLogic, 
     // never inside the enclosing GraphStage. 
     // This state is safe to access and modify from all the 
     // callbacks that are provided by GraphStageLogic and the 
     // registered handlers. 

     private var counter = 1 
     setHandler(out, new OutHandler 
     { 
     override def onPull(): Unit = 
     { 
      push(out, TextMessage(counter.toString)) 
      counter += 1 
      schedulePeriodically(None, 1 second) 
     } 
     }) 
    } 
    } 

    val out: Outlet[TextMessage.Strict] = Outlet("KeepAliveSource") 
    override def shape: SourceShape[TextMessage.Strict] = SourceShape(out) 
} 

无论出于何种原因,这仍然淹没了我,虽然我已经承担了schedulePeriodically(None, 1 second)会增加每个消息之间的1秒的延迟。但我显然是错误的。

增加该值不会改变的事实,我那可怜的浏览器无法处理请求和崩溃(我可以看到它的日志simple websocket client中)

回答

1

schedulePeriodically调用不影响的行为,你的阶段。无论何时下游阶段请求消息,都会调用onPull处理程序,并立即删除消息push。这就是为什么你看不到任何节流。

尽管GraphStage DSL(您选择的那个)非常灵活,但它也更难以正确使用。对于这样简单的任务,最好利用Akka提供的更高级别的阶段。像Source.tickdocs)。

val tickingSource: Source[String, Cancellable] = 
    Source.tick(initialDelay = 1.second, interval = 5.seconds, tick = "hello") 

在你的榜样,你需要一个计数予以公布,从而可以连接更多的逻辑,滴答滴答的来源,例如

val tickingSource: Source[Strict, Cancellable] = 
    Source.tick(initialDelay = 1.second, interval = 5.seconds, tick = NotUsed) 
     .zipWithIndex 
     .map{ case (_, counter) ⇒ TextMessage(counter.toString) } 

如果你有兴趣在如何底层GraphStage的作品,你可以随时看看TickSource代码本身(见github)。 主要区别在于TickSourceonTimer回调(其来自TimerGraphStageLogic,您可以覆盖)中调用push

+0

以及如何使用'GraphStage'和'TimerGraphStageLogic'来做到这一点? – Sorona

+0

给答案增加了信息 –

+0

对'TimerGraphStageLogic'不起作用,它表示'尚未初始化:只允许在GraphStageLogic构造函数中设置setHandler' – Sorona