2017-08-18 43 views
2

我想以异步方式读取文件夹中的许多.CSV文件,并返回一个自定义案例类的Iterable。使用Akka Streams,Scala异步读取多个文件

我可以通过Akka Streams和How?实现吗?

*我试图根据文档以某种方式平衡工作,但它是一个有点难以管理,通过...

或者

这是一个好的做法是使用替代演员?(父母演员带孩子,每个孩子读一个文件,并返回一个Iterable到父母,然后父母结合所有Iterables?)

+0

问题还不是很清楚。 1.是否要为所有CSV文件返回一个自定义案例类的单个Iterable,或者每个csv文件要返回一个? 2.如果有成千上万的文件,你想同时阅读它们,还是只想要一定程度的并行性? –

回答

1

首先你需要阅读/学习Akka流如何与Source,Flow和Sink一起工作。然后你可以开始学习操作员。

要并行执行多个操作,可以使用运算符mapAsync在其中指定并行度的数量。

/** 
    * Using mapAsync operator, we pass a function which return a Future, the number of parallel run futures will 
    * be determine by the argument passed to the operator. 
    */ 
    @Test def readAsync(): Unit = { 
    Source(0 to 10)//-->Your files 
     .mapAsync(5) { value => //-> It will run in parallel 5 reads 
     implicit val ec: ExecutionContext = ActorSystem().dispatcher 
     Future { 
      //Here read your file 
      Thread.sleep(500) 
      println(s"Process in Thread:${Thread.currentThread().getName}") 
      value 
     } 
     } 
     .runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}"))) 
    } 

您可以了解更多关于阿卡这里阿卡流https://github.com/politrons/Akka

1

晴一样@保罗的答案,但有小的改进

def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable] 

Source(files).flatMapConcat(filename => //you could use flatMapMerge if you don't bother about line ordering 
    FileIO.fromPath(Paths.get(filename)) 
     .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String)) 
).map { csvLine => 
    // parse csv here 
    println(csvLine) 
    }