2013-01-16 36 views
3

我现在处于需要决定如何构建异步代码的地步。 我需要从我的函数Func1()一次调用20个不同的Web服务,当他们都返回一个XML答案时,将所有结果加入到一个大的XML。并行任务vs代表

我想过使用TPL任务。这样的事情:

var task = Task.Factory.StartNew (call the web service1...); 
var task2 = Task.Factory.StartNew (call the web service2...); 
var task3 = Task.Factory.StartNew (call the web service3...); 

task.WaitAll(); 

这听起来不错,或者有更好的方法来完成工作吗?

+3

是有一个更好的办法 - 打的电话异步 - 现在你做这些平行的,但块在等待结果时,多达3(或20)个线程 - 如果你进行异步调用,你将不会阻止任何 – Carsten

+1

我不会说“异步”必然会更好*,而是它不同,它的优越性将取决于具体情况。在这种情况下,“调用Web服务”大概是一个相当轻的操作,所以一个线程('Task.Factory.StartNew')的开销可能不合理。 – Snixtor

+0

根据问题,xml的组合应该作为输出,所以调用必须等到每个请求都应该完成。在最大时,所有进程都可以推送到背景线程。因为这部分已经平行无关。我不认为asyc会在这里工作。 – kunjee

回答

1

我能想到的只有2这里主要的方法:

  1. 有一些地方,你将尽快,因为它是接收到的集合。这可以通过ContinueWith()方法来完成。但是,您需要在聚合代码中处理同步,最后仍需要等到所有任务完成。因此,如果聚合需要很长时间并且可以并行完成,则这种方法才有意义。

  2. 你做的方式 - 简单好用:)

至于使用异步,我会投票支持使用TaskFactory.ContinueWhenAll()方法。这样你就不会阻塞任何线程,代码看起来会比有多个(取决于品味)更好,并且可能具有更少的开销(可能取决于实现)。

2

我有两种想法。

A。你现在正在做的方式,但使用/ContinueWhenAll,如this answerarticale中所述。因此,对于你的情况,你可以使用一个单一的延续使用的子任务,所以

TaskCreationoptions op = TaskCreationOptions.AttachedToParent; 
Task.Factory.StartNew(() => 
{ 
    var task1 = Task.Factory.StartNew (CallService(1)); 
    var task2 = Task.Factory.StartNew (CallService(2)); 
    var task3 = Task.Factory.StartNew (CallService(3)); 
}) 
.ContinueWith(ant => { SomeOtherselegate }); 

,或者你可以链上的延续作为解释here。另一种方法是使用ContinueWhenAll

var task1 = Task.Factory.StartNew (CallService(1)); 
var task2 = Task.Factory.StartNew (CallService(2)); 
var task3 = Task.Factory.StartNew (CallService(3)); 
var continuation = Task.Factory.ContinueWhenAll(
    new[] { task1, task2, task3 }, tasks => Console.WriteLine("Done!")); 

值得考虑的唯一的事情是,你可以有可变数量任务的方式,但是这是容易的,我会让你的工作,一出。

B。另一种方法是使用.NET4.5 +和async/await。所以,你的代码会是这样

private async void CallAllServicesAsync() 
{ 
    await CallServiceAsync(1); 
    await CallServiceAsync(2); 
    await CallServiceAsync(3); 
} 

其中

private Task CallServiceAsync(int serviceNumber) 
{ 
    return Task.Run(() = > { SomeMethod(); }); 
} 

以上是equivelent所示第一代码,但框架下引擎盖照顾一切的为您服务。

我希望这会有所帮助。

0

使用async正确的方法是,首先定义一个CallServiceAsync即自然异步(即,使用HttpClient或为TaskFactory.FromAsync包装纸围绕Begin/End方法)。它应该而不是使用Task.Run

一旦你有一个自然的异步CallServiceAsync,那么你就可以发出20个同时通话和(异步)等待它们的方式:

public async Task<XDocument> GetAllDataAsync() 
{ 
    var task1 = CallServiceAsync(1); 
    var task2 = CallServiceAsync(2); 
    var task3 = CallServiceAsync(3); 
    ... 
    XDocument[] results = await task.WhenAll(task1, task2, task3, ...); 
    return JoinXmlDocuments(results); 
} 

这种做法完全无法阻止任何线程。

你可以把它稍微更好的性能,通过使用ConfigureAwait(false)这样:

XDocument[] results = await task.WhenAll(task1, task2, task3, ...).ConfigureAwait(false); 
3

我们几个月前需要这样的事情,可以同时处理多个远程URL的。我们通过从SemaphoreSlim类派生我们自己的类来实现这一点。

您可以实现这样的事情:

/// <summary> 
    /// Can be used to process multiple URL's concurrently. 
    /// </summary> 
    public class ConcurrentUrlProcessor : SemaphoreSlim 
    { 
     private int initialCount; 
     private int maxCount; 
     private readonly HttpClient httpClient; 

     /// <summary> 
     /// Initializes a new instance of the <see cref="T:System.Threading.SemaphoreSlim" /> class, specifying the initial number of requests that can be granted concurrently. 
     /// </summary> 
     /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param> 
     public ConcurrentUrlProcessor(int initialCount) 
      :base(initialCount) 
     { 
      this.initialCount = initialCount; 
      this.maxCount = int.MaxValue; 
      this.httpClient = new HttpClient(); 
     } 
     /// <summary> 
     /// Initializes a new instance of the <see cref="T:System.Threading.SemaphoreSlim" /> class, specifying the initial and maximum number of requests that can be granted concurrently. 
     /// </summary> 
     /// <param name="initialCount">The initial number of requests for the semaphore that can be granted concurrently.</param> 
     /// <param name="maxCount">The maximum number of requests for the semaphore that can be granted concurrently.</param> 
     public ConcurrentUrlProcessor(int initialCount, int maxCount) 
      : base(initialCount, maxCount) 
     { 
      this.initialCount = initialCount; 
      this.maxCount = maxCount; 
      this.httpClient = new HttpClient(); 
     } 
     /// <summary> 
     /// Starts the processing. 
     /// </summary> 
     /// <param name="urls">The urls.</param> 
     /// <returns>Task{IEnumerable{XDocument}}.</returns> 
     public virtual async Task<IEnumerable<XDocument>> StartProcessing(params string[] urls) 
     { 
      List<Task> tasks = new List<Task>(); 
      List<XDocument> documents = new List<XDocument>(); 

      SemaphoreSlim throttler = new SemaphoreSlim(initialCount, maxCount); 
      foreach (string url in urls) 
      { 
       await throttler.WaitAsync(); 

       tasks.Add(Task.Run(async() => 
       { 
        try 
        { 
         string xml = await this.httpClient.GetStringAsync(url); 

         //move on to the next page if no xml is returned. 
         if (string.IsNullOrWhiteSpace(xml)) 
          return; 

         var document = XDocument.Parse(xml); 
         documents.Add(document); 
        } 
        catch (Exception) 
        { 
         //TODO: log the error or do something with it. 
        } 
        finally 
        { 
         throttler.Release(); 
        } 
       })); 
      } 

      await Task.WhenAll(tasks); 

      return documents; 
     } 
    } 

而且根据单元测试:

[Test] 
    public async void CanProcessMultipleUrlsTest() 
    { 
     string[] urls = new string[] 
     { 
      "http://google.nl", 
      "http://facebook.com", 
      "http://linkedin.com", 
      "http://twitter.com" 
     }; 

     IEnumerable<XDocument> documents = null; 
     ConcurrentUrlProcessor processor = new ConcurrentUrlProcessor(100); 

     documents = await processor.StartProcessing(urls); 

     Assert.AreEqual(4, documents.Count()); 
    } 
+0

+1漂亮的小班! – MoonKnight

+0

我没有收到+1 :-p,谢谢! –