我试图使用流,而不是纯粹的演员来处理HTTP请求和我用下面的代码来:如何绑定akka http与akka流?
trait ImagesRoute {
val log = LoggerFactory.getLogger(this.getClass)
implicit def actorRefFactory: ActorRefFactory
implicit def materializer: ActorMaterializer
val source =
Source
.actorRef[Image](Int.MaxValue, OverflowStrategy.fail)
.via(Flow[Image].mapAsync(1)(ImageRepository.add))
.toMat(Sink.asPublisher(true))(Keep.both)
val route = {
pathPrefix("images") {
pathEnd {
post {
entity(as[Image]) { image =>
val (ref, publisher) = source.run()
val addFuture = Source.fromPublisher(publisher)
val future = addFuture.runWith(Sink.head[Option[Image]])
ref ! image
onComplete(future.mapTo[Option[Image]]) {
case Success(img) =>
complete(Created, img)
case Failure(e) =>
log.error("Error adding image resource", e)
complete(InternalServerError, e.getMessage)
}
}
}
}
}
}
}
我不知道这是,或者即使做了正确的方法这是一个好方法,或者如果我应该使用演员与路线进行交互,使用问题模式,然后在演员内部进行流式传输。
任何想法?
如果我没有记错的话,你的情况你根本不需要流。据我所知,'ImageRepository.add'方法返回一个'Future';所有你需要做的就是编写'onComplete(ImageRepository.add(image))',就这些了。 –
@VladimirMatveev是的没错,这只是一个简单的例子,但流管道应该做更多的事情,比如联系外部资源并最终回压东西...... –