2017-08-05 37 views
0

我想实现一个定制的Akka接收器,但是我找不到处理它内部未来的方法。如何处理定制的阿卡水槽内的未来?

class EventSink(...) { 

    val in: Inlet[EventEnvelope2] = Inlet("EventSink") 
    override val shape: SinkShape[EventEnvelope2] = SinkShape(in) 

    override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = { 
    new GraphStageLogic(shape) { 

     // This requests one element at the Sink startup. 
     override def preStart(): Unit = pull(in) 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val future = handle(grab(in)) 
      Await.ready(future, Duration.Inf) 
      /* 
      future.onComplete { 
      case Success(_) => 
       logger.info("pulling next events") 
       pull(in) 
      case Failure(failure) => 
       logger.error(failure.getMessage, failure) 
       throw failure 
      }*/ 
      pull(in) 
     } 
     }) 
    } 
    } 

    private def handle(envelope: EventEnvelope2): Future[Unit] = { 
    val EventEnvelope2(query.Sequence(offset), _/*persistenceId*/, _/*sequenceNr*/, event) = envelope 
    ... 
    db.run(statements.transactionally) 
    } 
} 

我必须去阻止未来,这看起来不好。我评论过的非封锁内容仅适用于第一项活动。任何人都可以请帮忙?


更新感谢@ViktorKlang。它现在似乎在工作。

override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = 
{ 
    new GraphStageLogic(shape) { 
     val callback = getAsyncCallback[Try[Unit]] { 
     case Success(_) => 
      //completeStage() 
      pull(in) 
     case Failure(error) => 
      failStage(error) 
     } 

     // This requests one element at the Sink startup. 
     override def preStart(): Unit = { 
     pull(in) 
     } 

     setHandler(in, new InHandler { 
     override def onPush(): Unit = { 
      val future = handle(grab(in)) 
      future.onComplete { result => 
      callback.invoke(result) 
      } 
     } 
     }) 
    } 
    } 

我想实现一个Rational DB事件接收器连接到ReadJournal.eventsByTag。所以这是一个连续的流,除非有错误,否则它将永不终止 - 这就是我想要的。我的方法是否正确?

两个问题:

  1. 请问GraphStage永远不会结束,除非我手动调用completeStage或failStage?

  2. 我是否正确或正常地声明preStart方法之外的回调?在这种情况下,我是否有权在preStart中调用pull(in)?

感谢, 程

+0

http://doc.akka.io/docs/akka/current/scala/流/流自定义。html#使用异步端通道 –

+0

感谢@ViktorKlang,我已经阅读过。没有找到有用的东西。 – Cheng

+0

为什么我链接的细分市场没有用?你试过了吗? –

回答

0

免俗阶段

在一般情况下,你应该尽量吸尽与图书馆的SourceFlowSink的给出的方法所有的可能性。自定义阶段几乎不需要,并且使代码难以维护。

编写你的“自定义”阶段使用标准方法

基于对你的问题的示例代码我看不出有任何理由,你为什么会使用自定义Sink开始与细节。

鉴于你的handle方法,你可以稍微修改它这样做,你在问题中指定的日志记录:

val loggedHandle : (EventEnvelope2) => Future[Unit] = 
    handle(_) transform { 
    case Success(_)  => { 
     logger.info("pulling next events") 
     Success(Unit) 
    } 
    case Failure(failure) => { 
     logger.error(failure.getMessage, failure) 
     Failure(failure) 
    } 
    } 

那么就使用Sink.foreachParallel处理信封:

val createEventEnvelope2Sink : (Int) => Sink[EventEnvelope2, Future[Done]] = 
    (parallelism) => 
    Sink[EventEnvelope2].foreachParallel(parallelism)(handle _) 

现在,即使您希望将每个EventEnvelope2发送到db,以便您只需使用1进行并行:

val inOrderDBInsertSink : Sink[EventEnvelope2, Future[Done]] = 
    createEventEnvelope2Sink(1) 

而且,如果数据库抛出一个异常,你仍然可以得到它保持在foreachParallel完成:

val someEnvelopeSource : Source[EventEnvelope2, _] = ??? 

someEnvelopeSource 
    .to(createEventEnvelope2Sink(1)) 
    .run() 
    .andThen { 
    case Failure(throwable) => { /*deal with db exception*/ } 
    case Success(_)   => { /*all inserts succeeded*/ } 
    } 
相关问题