2016-01-20

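How to download an HTTP resource to a file with Akka Streams and HTTP?

For the past few days I have been trying to figure out the best way to download an HTTP resource to a file using Akka Streams and HTTP.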

I started with the Future-Based Variant, which looks like this:

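import java.io.File
import scala.concurrent.Future
import akka.http.scaladsl.Http
import akka.http.scaladsl.client.RequestBuilding.Get
import akka.http.scaladsl.model.Uri
import akka.stream.scaladsl.FileIO

// Assumes an implicit ActorSystem, Materializer and ExecutionContext in scope.
def downloadViaFutures(uri: Uri, file: File): Future[Long] = {
    val request = Get(uri)
    val responseFuture = Http().singleRequest(request)
    responseFuture.flatMap { response =>
        val source = response.entity.dataBytes
        source.runWith(FileIO.toFile(file))
    }
}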

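That works well enough, but once I learned more about pure Akka Streams I wanted to try the Flow-Based Variant and build a stream starting from a Source[HttpRequest]. At first this completely stumped me, until I came across the available flow transformations. This version ended up a bit more verbose: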

def responseOrFail[T](in: (Try[HttpResponse], T)): (HttpResponse, T) = in match { 
    case (responseTry, context) => (responseTry.get, context) 
} 

def responseToByteSource[T](in: (HttpResponse, T)): Source[ByteString, Any] = in match { 
    case (response, _) => response.entity.dataBytes 
} 

def downloadViaFlow(uri: Uri, file: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSource). 
    runWith(FileIO.toFile(file)) 
} 

Then I wanted to get a bit trickier and use the Content-Disposition header.

Going back to the Future-Based Variant, that looks like this:

def destinationFile(downloadDir: File, response: HttpResponse): File = { 
    val fileName = response.header[ContentDisposition].get.value 
    val file = new File(downloadDir, fileName) 
    file.createNewFile() 
    file 
} 

def downloadViaFutures2(uri: Uri, downloadDir: File): Future[Long] = {
    val request = Get(uri)
    val responseFuture = Http().singleRequest(request)
    responseFuture.flatMap { response =>
        val file = destinationFile(downloadDir, response)
        val source = response.entity.dataBytes
        source.runWith(FileIO.toFile(file))
    }
}
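One detail in destinationFile is worth checking: as far as I know, `value` on an Akka HTTP header is the whole rendered header value (something like `attachment; filename="report.pdf"`), not just the file name, so the file created inside downloadDir would likely get that whole string as its name. Below is a minimal plain-Scala sketch, assuming you only have that raw header string; `filenameFromContentDisposition` is a hypothetical helper, not an Akka API, and it deliberately ignores RFC 6266 details such as `filename*` and semicolons inside quoted names.

```scala
import java.io.File

// Hypothetical helper (not part of Akka HTTP): pulls the plain `filename`
// parameter out of a raw Content-Disposition value such as
//   attachment; filename="report.pdf"
// Simplifications: `filename*=` (RFC 5987 encoding) is ignored, and a ';'
// inside a quoted file name is not supported.
def filenameFromContentDisposition(headerValue: String): Option[String] =
  headerValue
    .split(';')
    .iterator
    .map(_.trim)
    .collectFirst {
      case param if param.toLowerCase.startsWith("filename=") =>
        param.drop("filename=".length).trim.stripPrefix("\"").stripSuffix("\"")
    }
    // Keep only the last path segment so a server-supplied name such as
    // "../../x" cannot escape the download directory.
    .map(name => new File(name).getName)
    .filter(_.nonEmpty)
```

In destinationFile this could replace `.get.value`, e.g. `response.header[ContentDisposition].flatMap(h => filenameFromContentDisposition(h.value))`, with a fallback name when the header or parameter is missing.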

But now I have no idea how to do this with the Flow-Based Variant. This is as far as I got:

def responseToByteSourceWithDest[T](in: (HttpResponse, T), downloadDir: File): Source[(ByteString, File), Any] = in match { 
    case (response, _) => 
    val source = responseToByteSource(in) 
    val file = destinationFile(downloadDir, response) 
    source.map((_, file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File): Future[Long] = { 
    val request = Get(uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 
    val sourceWithDest: Source[(ByteString, File), Unit] = source. 
    via(requestResponseFlow). 
    map(responseOrFail). 
    flatMapConcat(responseToByteSourceWithDest(_, downloadDir)) 
    sourceWithDest.runWith(???) 
} 

So now I have a Source that emits one or more (ByteString, File) elements for each File (I say each File because there is no reason the original Source has to be a single HttpRequest).

Is there any way to take these and route them to a dynamic Sink?

I'm thinking of something like flatMapConcat, for example:

def runWithMap[T, Mat2](f: T => Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = ??? 

so that I could complete downloadViaFlow2 with:

def destToSink(destination: File): Sink[(ByteString, File), Future[Long]] = { 
    val sink = FileIO.toFile(destination, true) 
    Flow[(ByteString, File)].map(_._1).toMat(sink)(Keep.right) 
} 
sourceWithDest.runWithMap { 
    case (_, file) => destToSink(file) 
} 
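The routing being asked for can be pictured outside Akka with a plain, blocking Scala sketch: consume the (chunk, destination) pairs in order, keep one open file at a time, and switch files whenever the destination changes. `writeRouted` is a hypothetical helper and `Array[Byte]` stands in for `ByteString`; it assumes chunks for a file arrive in order, which flatMapConcat preserves, and it reopens a file in append mode if that file shows up again later.

```scala
import java.io.{File, FileOutputStream, OutputStream}

// Hypothetical helper, not an Akka API: consumes (chunk, destination) pairs
// in order and writes each chunk to its destination file, switching files
// whenever the destination changes. Returns the bytes written per file.
def writeRouted(elements: Iterator[(Array[Byte], File)]): Map[File, Long] = {
  var current: Option[(File, OutputStream)] = None
  var written = Map.empty[File, Long]
  try {
    for ((chunk, file) <- elements) {
      if (!current.exists(_._1 == file)) {
        // Destination changed: close the previous file, open the new one.
        current.foreach(_._2.close())
        // Append if this file was already started earlier in the stream,
        // otherwise start it fresh (truncating any existing content).
        current = Some(file -> new FileOutputStream(file, written.contains(file)))
      }
      current.foreach(_._2.write(chunk))
      written = written.updated(file, written.getOrElse(file, 0L) + chunk.length)
    }
  } finally {
    // Close the last open file even if a write failed.
    current.foreach(_._2.close())
  }
  written
}
```

Because a write error propagates out of the loop, a failure on any file also fails the whole routing step, which is the behaviour you would usually want from a "dynamic sink".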

Answer


The solution does not need flatMapConcat. If you don't need any return values from writing the files, you can use Sink.foreach:

def writeFile(downloadDir : File)(httpResponse : HttpResponse) : Future[Long] = { 
    val file = destinationFile(downloadDir, httpResponse) 
    httpResponse.entity.dataBytes.runWith(FileIO.toFile(file)) 
} 

def downloadViaFlow2(uri: Uri, downloadDir: File) : Future[Unit] = { 
    val request = HttpRequest(uri=uri) 
    val source = Source.single((request,())) 
    val requestResponseFlow = Http().superPool[Unit]() 

    source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .runWith(Sink.foreach(writeFile(downloadDir))) 
} 

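Note that Sink.foreach creates Futures from the writeFile function, so there is not much back-pressure involved. Writing the files may be slowed down by the hard drive, but the stream will keep producing Futures. To control this, you can use Flow.mapAsyncUnordered (or Flow.mapAsync):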

val parallelism = 10 

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.ignore) 
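The effect of the `parallelism` bound can be illustrated without Akka. This is a minimal sketch with plain `scala.concurrent` Futures, assuming a hypothetical `boundedTraverse` helper: a `java.util.concurrent.Semaphore` ensures at most `parallelism` tasks are running at once, which is roughly what mapAsync(parallelism) guarantees inside a stream. Unlike Akka, it blocks the calling thread while waiting for a free slot, and, like mapAsync rather than mapAsyncUnordered, it returns results in input order.

```scala
import java.util.concurrent.Semaphore
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: runs `task` for every input, but never has more
// than `parallelism` of the resulting futures in flight at the same time.
def boundedTraverse[A, B](inputs: List[A], parallelism: Int)(task: A => Future[B])(
    implicit ec: ExecutionContext): Future[List[B]] = {
  val permits = new Semaphore(parallelism)
  val futures = inputs.map { a =>
    // Blocks the caller until a slot is free: a crude form of back-pressure.
    permits.acquire()
    val f =
      try task(a)
      catch { case e: Throwable => permits.release(); throw e }
    // Free the slot once this task has finished, successfully or not.
    f.onComplete(_ => permits.release())
    f
  }
  // Results keep the input order, like mapAsync.
  Future.sequence(futures)
}
```

In a real Akka stream mapAsync gives this bound without blocking a thread, which is why the answer uses it rather than something like the above.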

If you want to accumulate the Long values, you need a Sink.fold to combine them into a total:

source.via(requestResponseFlow) 
     .map(responseOrFail) 
     .map(_._1) 
     .mapAsyncUnordered(parallelism)(writeFile(downloadDir)) 
     .runWith(Sink.fold(0L)(_ + _)) 

The fold keeps a running sum and emits the final value once the source of requests has been exhausted.
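For comparison, the same "sum the per-file byte counts" idea with plain Futures could look like the sketch below; `totalBytes` is a hypothetical helper built on the standard library's `Future.foldLeft` (Scala 2.12+), not an Akka API. As with the fold, the total is only available after every write has completed, and a single failed write fails the total.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical helper: stdlib analogue of Sink.fold(0L)(_ + _).
// Completes once every write's byte count is known; fails if any write failed.
def totalBytes(writes: List[Future[Long]])(implicit ec: ExecutionContext): Future[Long] =
  Future.foldLeft(writes)(0L)(_ + _)
```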


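Hmm, I was hoping for a better way than this. I'm not really sure this actually works correctly. writeFile returns as soon as the FileIO stream has been materialized. If the response is chunked, the chunks need to be written to the file in order. There is a similar problem with `mapAsync`. The `append` parameter also needs to be set. Also, it seems that an error while writing a file will not cause the outer stream to receive an error signal. – Steiny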


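@Steiny Breaking down my answer to your multiple comments: (a) correct, writing the file via a Future returns immediately, but mapAsync handles this; (b) no solution can correct a chunked source, and that was not part of the original question/requirements either; (c) append is only needed when writing to the same file; (d) forcing the outer stream to fail when any file write fails was not part of the original question. You asked: "Is there any way to take these and route them to a dynamic Sink?", and my answer answers **that** question. I wrote my reply in the context of your example code... –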