2012-05-07 50 views
10

问题:我想使用.NET SDK从AWS S3并行下载100个文件。下载的内容应该存储在100个内存流中(文件足够小,我可以从中取出)。我在Task 4.0,IAsyncResult,Parallel。*和.NET 4.0中的其他不同方法之间混淆不清。使用适用于.NET的AWS S3 SDK从Amazon S3下载并行批处理文件

如果我试图解决自己的问题,把我的头顶部我想是这样的伪代码: (编辑以类型添加到一些变量)

using Amazon; 
using Amazon.S3; 
using Amazon.S3.Model; 

AmazonS3 _s3 = ...; 
IEnumerable<GetObjectRequest> requestObjects = ...; 


// Prepare to launch requests 
var asyncRequests = from rq in requestObjects 
    select _s3.BeginGetObject(rq,null,null); 

// Launch requests 
var asyncRequestsLaunched = asyncRequests.ToList(); 

// Prepare to finish requests 
var responses = from rq in asyncRequestsLaunched 
    select _s3.EndGetRequest(rq); 

// Finish requests 
var actualResponses = responses.ToList(); 

// Fetch data 
var data = actualResponses.Select(rp => { 
    var ms = new MemoryStream(); 
    rp.ResponseStream.CopyTo(ms); 
    return ms; 
}); 

此代码将启动100个请求平行,这是很好的。但是,有两个问题:

  1. 最后一条语句将串行下载文件,而不是并行下载文件。在流上似乎没有BeginCopyTo()/ EndCopyTo()方法...
  2. 在所有请求都响应之前,上述语句不会放开。换句话说,在所有文件开始之前,没有任何文件会开始下载。

所以在这里我开始思考我的标题错了路......

帮助?

回答

19

如果将操作分解为一个将异步处理一个请求然后调用它100次的方法,可能会更容易。

首先,让我们确定你想要的最终结果。因为你将要使用的是MemoryStream这意味着你会想要从你的方法中返回一个Task<MemoryStream>。签名会是这个样子:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 

因为你AmazonS3对象实现Asynchronous Design Pattern,您可以使用FromAsync methodTaskFactory class产生从实现异步设计模式,像这样一类Task<T>

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 
{ 
    Task<GetObjectResponse> response = 
     Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
      s3.BeginGetObject, s3.EndGetObject, request, null); 

    // But what goes here? 

所以,你已经在一个很好的地方,你有一个Task<T>,你可以等待或在呼叫完成时得到一个回调。但是,您需要以某种方式将拨打Task<GetObjectResponse>后返回的GetObjectResponse转换为MemoryStream

为此,您想在Task<T>类上使用ContinueWith method。把它看作是Enumerable class上的Select method的异步版本,它只是对另一个Task<T>的投影,除了每次调用ContinueWith时,您都可能创建一个运行代码部分的新任务。

就这样,你的方法如下所示:

static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3, 
    GetObjectRequest request) 
{ 
    // Start the task of downloading. 
    Task<GetObjectResponse> response = 
     Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
      s3.BeginGetObject, s3.EndGetObject, request, null 
     ); 

    // Translate. 
    Task<MemoryStream> translation = response.ContinueWith(t => { 
     using (Task<GetObjectResponse> resp = t){ 
      var ms = new MemoryStream(); 
      t.Result.ResponseStream.CopyTo(ms); 
      return ms; 
     } 
    }); 

    // Return the full task chain. 
    return translation; 
} 

注意,在上面你可以调用被overload of ContinueWith传递TaskContinuationOptions.ExecuteSynchronously,因为它似乎你在做最少的工作(我也说不清,答案可能是巨大的)。如果您为完成工作而开展一项新任务不利于开展新任务,那么您应该通过TaskContinuationOptions.ExecuteSynchronously,这样您就不会浪费时间为最少的操作创建新任务。

现在你已经可以转化一个请求转换成Task<MemoryStream>,创造了包装,将处理方法任何他们的数量很简单:

static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests) 
{ 
    // Just call Select on the requests, passing our translation into 
    // a Task<MemoryStream>. 
    // Also, materialize here, so that the tasks are "hot" when 
    // returned. 
    return requests.Select(r => GetMemoryStreamAsync(s3, r)). 
     ToArray(); 
} 

在上面,你随便拿一系列GetObjectRequest实例,它将返回一个Task<MemoryStream>的数组。它返回物化序列的事实很重要。如果在返回之前没有实现它,那么直到迭代序列才会创建任务。

当然,如果你想要这种行为,那么通过一切手段,只要删除.ToArray()的调用,让方法返回IEnumerable<Task<MemoryStream>>,然后当你迭代任务时,就会发出请求。

从那里,您可以一次处理它们一个(在回路中使用Task.WaitAny method)或等待它们全部完成(通过调用Task.WaitAll method)。后者的一个例子是:

static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3, 
    IEnumerable<GetObjectRequest> requests) 
{ 
    Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests); 
    Task.WaitAll(tasks); 
    return tasks.Select(t => t.Result).ToList(); 
} 

而且,应该提到的是,这是一个相当不错的适合的Reactive Extensions framework,因为这非常非常适合朝着IObservable<T>实现。

+2

这是一个很好的解决方案,这是一个非常好的描述,并在发布问题后约20分钟内发布。我很开心。 对于我来说,它也工作得很好,在我修正了添加更准确的S3类名称并指定了更具体的FromAsync()方法之后。 卡斯帕,你想让我编辑你的答案的变化? – DenNukem

+1

@DenNukem哦,我没有解决从一个流到另一个流的异步复制问题。这将在.NET 4.5中提供,但它需要一些“异步”/“等待”的好处,使它看起来不像火车残骸。现在,使用['Stream.CopyTo'方法](http://msdn.microsoft.com/en-us/library/system.io.stream.copyto.aspx),但知道在.NET 4.5中可以使用['Stream.CopyToAsync'](http://msdn.microsoft.com/en-us/library/system.io.stream.copytoasync.aspx)以及'async' /'await'做*所有*这个更优雅。 – casperOne

+0

@casperOne我可以看到.NET 4.5的一个例子吗? – user1265146