2016-09-21 82 views
1

我试图使用流,而不是纯粹的演员来处理HTTP请求和我用下面的代码来:如何绑定akka http与akka流?

trait ImagesRoute { 

    val log = LoggerFactory.getLogger(this.getClass) 

    implicit def actorRefFactory: ActorRefFactory 
    implicit def materializer: ActorMaterializer 

    val source = 
    Source 
     .actorRef[Image](Int.MaxValue, OverflowStrategy.fail) 
     .via(Flow[Image].mapAsync(1)(ImageRepository.add)) 
     .toMat(Sink.asPublisher(true))(Keep.both) 

    val route = { 
    pathPrefix("images") { 
     pathEnd { 
     post { 
      entity(as[Image]) { image => 

      val (ref, publisher) = source.run() 

      val addFuture = Source.fromPublisher(publisher) 

      val future = addFuture.runWith(Sink.head[Option[Image]]) 

      ref ! image 

      onComplete(future.mapTo[Option[Image]]) { 
       case Success(img) => 
       complete(Created, img) 

       case Failure(e) => 
       log.error("Error adding image resource", e) 
       complete(InternalServerError, e.getMessage) 
      } 
      } 
     } 
     } 
    } 
    } 
} 

我不知道这是,或者即使做了正确的方法这是一个好方法,或者如果我应该使用演员与路线进行交互,使用问题模式,然后在演员内部进行流式传输。

任何想法?

+0

如果我没有记错的话,你的情况你根本不需要流。据我所知,'ImageRepository.add'方法返回一个'Future';所有你需要做的就是编写'onComplete(ImageRepository.add(image))',就这些了。 –

+0

@VladimirMatveev是的没错,这只是一个简单的例子,但流管道应该做更多的事情,比如联系外部资源并最终回压东西...... –

回答

4

如果你只从实体预计1幅图像,那么你不需要创建从一个ActorRef和Source你不需要Sink.asPublisher,你可以简单地使用Source.single

def imageToComplete(img : Option[Image]) : StandardRoute = 
    img.map(i => complete(Created, i)) 
    .getOrElse { 
     log error ("Error adding image resource", e) 
     complete(InternalServerError, e.getMessage 
    } 

... 

entity(as[Image]) { image => 

    val future : Future[StandardRoute] = 
    Source.single(image) 
      .via(Flow[Image].mapAsync(1)(ImageRepository.add)) 
      .runWith(Sink.head[Option[Image]]) 
      .map(imageToComplete) 

    onComplete(future) 
} 

简化你的代码进一步,事实上,你只能处理1个图像意味着,俱乐部是不必要的,因为没有必要为背压只有1个元素:

val future : Future[StandardRoute] = ImageRepository.add(image) 
                .map(imageToComplete) 

onComplete(future) 

在评论你表示

“这只是一个简单的例子,但流管道应该是 做大做很多事情想联系的外部资源和 最终回压的东西”

这将仅适用于您的实体是图像流。如果您只处理每个HttpRequest 1个图像,则背压永不适用,并且您创建的任何流都将为slower version of a Future

如果实体其实是在图像流,那么你可以使用它作为流的一部分:

val byteStrToImage : Flow[ByteString, Image, _] = ??? 

val imageToByteStr : Flow[Image, Source[ByteString], _] = ??? 

def imageOptToSource(img : Option[Image]) : Source[Image,_] = 
    Source fromIterator img.toIterator 

val route = path("images") { 
    post { 
    extractRequestEntity { reqEntity => 

     val stream = reqEntity.via(byteStrToImage) 
          .via(Flow[Image].mapAsync(1)(ImageRepository.add)) 
          .via(Flow.flatMapConcat(imageOptToSource)) 
          .via(Flow.flatMapConcat(imageToByteStr)) 

     complete(HttpResponse(status=Created,entity = stream)) 
    } 
    } 
}  
+0

谢谢,这让我很有意义 –

+1

@ ThiagoPereira欢迎您,愉快的黑客入侵。 –