现在Source.actorPublisher
已被弃用,我想找到一个适当的替代品。通过Source.fromGraph替换不赞成使用的Source.actorPublisher - 如何限制?
警告:我仍然是阿卡新生,试图找到我的方式!
基本上我所拥有的是一个websocket,服务器每秒都会推送一条新消息。
相关代码:
// OLD, deprecated way
//val source: Source[TextMessage.Strict, ActorRef] = Source.actorPublisher[String](Props[KeepAliveActor]).map(i => TextMessage(i))
// NEW way
val sourceGraph: Graph[SourceShape[TextMessage.Strict], NotUsed] = new KeepAliveSource
val source: Source[TextMessage.Strict, NotUsed] = Source.fromGraph(sourceGraph)
val requestHandler: HttpRequest => HttpResponse =
{
case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
req.header[UpgradeToWebSocket] match
{
case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source)
case None => HttpResponse(400, entity = "Not a valid websocket request")
}
case r: HttpRequest =>
r.discardEntityBytes() // important to drain incoming HTTP Entity stream
HttpResponse(404, entity = "Unknown resource!")
}
的老演员:(基本上摘自:Actorpublisher as source in handleMessagesWithSinkSource)
case class KeepAlive()
class KeepAliveActor extends ActorPublisher[String]
{
import scala.concurrent.duration._
implicit val ec = context.dispatcher
val tick = context.system.scheduler.schedule(1 second, 1 second, self, KeepAlive())
var cnt = 0
var buffer = Vector.empty[String]
override def receive: Receive =
{
case KeepAlive() =>
{
cnt = cnt + 1
if (buffer.isEmpty && totalDemand > 0)
{
onNext(s"${cnt}th Message from server!")
}
else {
buffer :+= cnt.toString
if (totalDemand > 0) {
val (use,keep) = buffer.splitAt(totalDemand.toInt)
buffer = keep
use foreach onNext
}
}
}
}
override def postStop() = tick.cancel()
}
旧的方式就像一个魅力。
现在新代码的基础上,GraphStage
class KeepAliveSource extends GraphStage[SourceShape[TextMessage.Strict]]
{
import scala.concurrent.duration._
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
{
new TimerGraphStageLogic(shape)
{
// All state MUST be inside the GraphStageLogic,
// never inside the enclosing GraphStage.
// This state is safe to access and modify from all the
// callbacks that are provided by GraphStageLogic and the
// registered handlers.
private var counter = 1
setHandler(out, new OutHandler
{
override def onPull(): Unit =
{
push(out, TextMessage(counter.toString))
counter += 1
schedulePeriodically(None, 1 second)
}
})
}
}
val out: Outlet[TextMessage.Strict] = Outlet("KeepAliveSource")
override def shape: SourceShape[TextMessage.Strict] = SourceShape(out)
}
无论出于何种原因,这仍然淹没了我,虽然我已经承担了schedulePeriodically(None, 1 second)
会增加每个消息之间的1秒的延迟。但我显然是错误的。
增加该值不会改变的事实,我那可怜的浏览器无法处理请求和崩溃(我可以看到它的日志simple websocket client
中)
以及如何使用'GraphStage'和'TimerGraphStageLogic'来做到这一点? – Sorona
给答案增加了信息 –
对'TimerGraphStageLogic'不起作用,它表示'尚未初始化:只允许在GraphStageLogic构造函数中设置setHandler' – Sorona