我想实现一个定制的Akka接收器,但是我找不到处理它内部未来的方法。如何处理定制的阿卡水槽内的未来?
class EventSink(...) {
val in: Inlet[EventEnvelope2] = Inlet("EventSink")
override val shape: SinkShape[EventEnvelope2] = SinkShape(in)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = {
new GraphStageLogic(shape) {
// This requests one element at the Sink startup.
override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
val future = handle(grab(in))
Await.ready(future, Duration.Inf)
/*
future.onComplete {
case Success(_) =>
logger.info("pulling next events")
pull(in)
case Failure(failure) =>
logger.error(failure.getMessage, failure)
throw failure
}*/
pull(in)
}
})
}
}
private def handle(envelope: EventEnvelope2): Future[Unit] = {
val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope
...
db.run(statements.transactionally)
}
}
我必须去阻止未来,这看起来不好。我评论过的非封锁内容仅适用于第一项活动。任何人都可以请帮忙?
更新感谢@ViktorKlang。它现在似乎在工作。
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
{
new GraphStageLogic(shape) {
val callback = getAsyncCallback[Try[Unit]] {
case Success(_) =>
//completeStage()
pull(in)
case Failure(error) =>
failStage(error)
}
// This requests one element at the Sink startup.
override def preStart(): Unit = {
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
val future = handle(grab(in))
future.onComplete { result =>
callback.invoke(result)
}
}
})
}
}
我想实现一个Rational DB事件接收器连接到ReadJournal.eventsByTag。所以这是一个连续的流,除非有错误,否则它将永不终止 - 这就是我想要的。我的方法是否正确?
两个问题:
请问GraphStage永远不会结束,除非我手动调用completeStage或failStage?
我是否正确或正常地声明preStart方法之外的回调?在这种情况下,我是否有权在preStart中调用pull(in)?
感谢, 程
http://doc.akka.io/docs/akka/current/scala/流/流自定义。html#使用异步端通道 –
感谢@ViktorKlang,我已经阅读过。没有找到有用的东西。 – Cheng
为什么我链接的细分市场没有用?你试过了吗? –