我想以异步方式读取文件夹中的许多.CSV文件,并返回一个自定义案例类的Iterable。使用Akka Streams,Scala异步读取多个文件
我可以通过Akka Streams和How?实现吗?
*我试图根据文档以某种方式平衡工作,但它是一个有点难以管理,通过...
或者
这是一个好的做法是使用替代演员?(父母演员带孩子,每个孩子读一个文件,并返回一个Iterable到父母,然后父母结合所有Iterables?)
我想以异步方式读取文件夹中的许多.CSV文件,并返回一个自定义案例类的Iterable。使用Akka Streams,Scala异步读取多个文件
我可以通过Akka Streams和How?实现吗?
*我试图根据文档以某种方式平衡工作,但它是一个有点难以管理,通过...
或者
这是一个好的做法是使用替代演员?(父母演员带孩子,每个孩子读一个文件,并返回一个Iterable到父母,然后父母结合所有Iterables?)
首先你需要阅读/学习Akka流如何与Source,Flow和Sink一起工作。然后你可以开始学习操作员。
要并行执行多个操作,可以使用运算符mapAsync
在其中指定并行度的数量。
/**
* Using mapAsync operator, we pass a function which return a Future, the number of parallel run futures will
* be determine by the argument passed to the operator.
*/
@Test def readAsync(): Unit = {
Source(0 to 10)//-->Your files
.mapAsync(5) { value => //-> It will run in parallel 5 reads
implicit val ec: ExecutionContext = ActorSystem().dispatcher
Future {
//Here read your file
Thread.sleep(500)
println(s"Process in Thread:${Thread.currentThread().getName}")
value
}
}
.runWith(Sink.foreach(value => println(s"Item emitted:$value in Thread:${Thread.currentThread().getName}")))
}
您可以了解更多关于阿卡这里阿卡流https://github.com/politrons/Akka
晴一样@保罗的答案,但有小的改进
def files = new java.io.File("").listFiles().map(_.getAbsolutePath).to[scala.collection.immutable.Iterable]
Source(files).flatMapConcat(filename => //you could use flatMapMerge if you don't bother about line ordering
FileIO.fromPath(Paths.get(filename))
.via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true).map(_.utf8String))
).map { csvLine =>
// parse csv here
println(csvLine)
}
问题还不是很清楚。 1.是否要为所有CSV文件返回一个自定义案例类的单个Iterable,或者每个csv文件要返回一个? 2.如果有成千上万的文件,你想同时阅读它们,还是只想要一定程度的并行性? –