如果将操作分解为一个将异步处理一个请求然后调用它100次的方法,可能会更容易。
首先,让我们确定你想要的最终结果。因为你将要使用的是MemoryStream
这意味着你会想要从你的方法中返回一个Task<MemoryStream>
。签名会是这个样子:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
因为你AmazonS3
对象实现Asynchronous Design Pattern,您可以使用FromAsync
method上TaskFactory
class产生从实现异步设计模式,像这样一类Task<T>
:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null);
// But what goes here?
所以,你已经在一个很好的地方,你有一个Task<T>
,你可以等待或在呼叫完成时得到一个回调。但是,您需要以某种方式将拨打Task<GetObjectResponse>
后返回的GetObjectResponse
转换为MemoryStream
。
为此,您想在Task<T>
类上使用ContinueWith
method。把它看作是Enumerable
class上的Select
method的异步版本,它只是对另一个Task<T>
的投影,除了每次调用ContinueWith
时,您都可能创建一个运行即代码部分的新任务。
就这样,你的方法如下所示:
static Task<MemoryStream> GetMemoryStreamAsync(AmazonS3 s3,
GetObjectRequest request)
{
// Start the task of downloading.
Task<GetObjectResponse> response =
Task.Factory.FromAsync<GetObjectRequest,GetObjectResponse>(
s3.BeginGetObject, s3.EndGetObject, request, null
);
// Translate.
Task<MemoryStream> translation = response.ContinueWith(t => {
using (Task<GetObjectResponse> resp = t){
var ms = new MemoryStream();
t.Result.ResponseStream.CopyTo(ms);
return ms;
}
});
// Return the full task chain.
return translation;
}
注意,在上面你可以调用被overload of ContinueWith
传递TaskContinuationOptions.ExecuteSynchronously
,因为它似乎你在做最少的工作(我也说不清,答案可能是巨大的)。如果您为完成工作而开展一项新任务不利于开展新任务,那么您应该通过TaskContinuationOptions.ExecuteSynchronously
,这样您就不会浪费时间为最少的操作创建新任务。
现在你已经可以转化一个请求转换成Task<MemoryStream>
,创造了包装,将处理方法任何他们的数量很简单:
static Task<MemoryStream>[] GetMemoryStreamsAsync(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
// Just call Select on the requests, passing our translation into
// a Task<MemoryStream>.
// Also, materialize here, so that the tasks are "hot" when
// returned.
return requests.Select(r => GetMemoryStreamAsync(s3, r)).
ToArray();
}
在上面,你随便拿一系列GetObjectRequest
实例,它将返回一个Task<MemoryStream>
的数组。它返回物化序列的事实很重要。如果在返回之前没有实现它,那么直到迭代序列才会创建任务。
当然,如果你想要这种行为,那么通过一切手段,只要删除.ToArray()
的调用,让方法返回IEnumerable<Task<MemoryStream>>
,然后当你迭代任务时,就会发出请求。
从那里,您可以一次处理它们一个(在回路中使用Task.WaitAny
method)或等待它们全部完成(通过调用Task.WaitAll
method)。后者的一个例子是:
static IList<MemoryStream> GetMemoryStreams(AmazonS3 s3,
IEnumerable<GetObjectRequest> requests)
{
Task<MemoryStream>[] tasks = GetMemoryStreamsAsync(s3, requests);
Task.WaitAll(tasks);
return tasks.Select(t => t.Result).ToList();
}
而且,应该提到的是,这是一个相当不错的适合的Reactive Extensions framework,因为这非常非常适合朝着IObservable<T>
实现。
这是一个很好的解决方案,这是一个非常好的描述,并在发布问题后约20分钟内发布。我很开心。 对于我来说,它也工作得很好,在我修正了添加更准确的S3类名称并指定了更具体的FromAsync()方法之后。 卡斯帕,你想让我编辑你的答案的变化? – DenNukem
@DenNukem哦,我没有解决从一个流到另一个流的异步复制问题。这将在.NET 4.5中提供,但它需要一些“异步”/“等待”的好处,使它看起来不像火车残骸。现在,使用['Stream.CopyTo'方法](http://msdn.microsoft.com/en-us/library/system.io.stream.copyto.aspx),但知道在.NET 4.5中可以使用['Stream.CopyToAsync'](http://msdn.microsoft.com/en-us/library/system.io.stream.copytoasync.aspx)以及'async' /'await'做*所有*这个更优雅。 – casperOne
@casperOne我可以看到.NET 4.5的一个例子吗? – user1265146