2017-09-02 94 views
0

我想弄清楚如何处理在你的一个阶段中你需要进行一个返回InputStream的调用,在那里我将处理该流作为舞台的来源进一步下降。流内的Akka-Stream流

例如

Source.map(e => Calls that return an InputStream) 
.via(processingFlow).runwith(sink.ignore) 

我想该元素将处理流程那些从InputStream到来。这基本上是我拖尾一个文件,读取每一行,这一行给我关于一个调用,我需要对一个CLI api进行的信息的情况下,当进行该调用时,我将Stdout作为一个InputStream从中读取结果。结果在大部分时间都会很大,所以我可以只收集记忆中的所有东西。

中号

回答

1
  • 可以使用StreamConverters事业得到Source S和从java.io流Sink秒。更多信息here
  • 您可以使用flatMapConcatflatMapMergeSource s的流平铺为单个流。更多信息here

一个简单的例子可以是:

val source: Source[String, NotUsed] = ??? 
    def gimmeInputStream(name: String): InputStream = ??? 
    val processingFlow: Flow[ByteString, ByteString, NotUsed] = ??? 

    source 
    .map(gimmeInputStream) 
    .flatMapConcat(is ⇒ StreamConverters.fromInputStream(() ⇒ is, chunkSize = 8192)) 
    .via(processingFlow) 
    .runWith(Sink.ignore) 

但是阿卡流提供了一个更地道DSL在FileIO对象为读/写文件。更多信息here

示例变为:

val source: Source[String, NotUsed] = ??? 
    val processingFlow: Flow[ByteString, ByteString, NotUsed] = ??? 

    source 
    .flatMapConcat(name ⇒ FileIO.fromPath(Paths.get(name))) 
    .via(processingFlow) 
    .runWith(Sink.ignore)