2016-05-20 29 views
2

我有一个我想要使用HttpClient并发下载的页面的URL列表。 URL列表可能很大(100以上!)使用Rx和SelectMany限制并发请求

目前我已经有这样的代码:

var urls = new List<string> 
      { 
       @"http:\\www.amazon.com", 
       @"http:\\www.bing.com", 
       @"http:\\www.facebook.com", 
       @"http:\\www.twitter.com", 
       @"http:\\www.google.com" 
      }; 

var client = new HttpClient(); 

var contents = urls 
    .ToObservable() 
    .SelectMany(uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute))); 

contents.Subscribe(Console.WriteLine); 

问题:由于SelectMany使用,任务的一大束创建几乎在同一时间。看来,如果URL的列表足够大,很多任务会给超时(我得到“任务被取消”例外)。

所以,我认为应该有一种方法,可能使用某种调度程序来限制并发任务的数量,在给定时间不允许超过5或6个任务。

通过这种方式,我可以获得并发下载,而无需启动太多可能会失速的任务,就像他们现在所做的那样。

如何做到这一点,所以我不饱和大量的超时任务?

非常感谢。

+1

你可能要考虑使用[数据流](https://msdn.microsoft.com/en-us/library/hh228603%28v= vs.110%29.aspx)API。 –

+0

你可以使用我的代码来整合它吗?我忽略了如何使用DataFlow来完成它。 TBH,我从来没有用过,但看一些样品会有很大的帮助。 – SuperJMN

回答

10

还记得SelectMany()实际上是Select().Merge()。虽然SelectMany没有maxConcurrent参数,Merge()的确如此。所以你可以使用它。

从你的例子,你可以这样做:

var urls = new List<string> 
    { 
     @"http:\\www.amazon.com", 
     @"http:\\www.bing.com", 
     @"http:\\www.facebook.com", 
     @"http:\\www.twitter.com", 
     @"http:\\www.google.com" 
    }; 

var client = new HttpClient(); 

var contents = urls 
    .ToObservable() 
    .Select(uri => Observable.FromAsync(() => client.GetStringAsync(uri))) 
    .Merge(2); // 2 maximum concurrent requests! 

contents.Subscribe(Console.WriteLine); 
1

下面是如何你可以用DataFlow API做一个例子:

private static Task DoIt() 
{ 
    var urls = new List<string> 
    { 
     @"http:\\www.amazon.com", 
     @"http:\\www.bing.com", 
     @"http:\\www.facebook.com", 
     @"http:\\www.twitter.com", 
     @"http:\\www.google.com" 
    }; 

    var client = new HttpClient(); 

    //Create a block that takes a URL as input 
    //and produces the download result as output 
    TransformBlock<string,string> downloadBlock = 
     new TransformBlock<string, string>(
      uri => client.GetStringAsync(new Uri(uri, UriKind.Absolute)), 
      new ExecutionDataflowBlockOptions 
      { 
       //At most 2 download operation execute at the same time 
       MaxDegreeOfParallelism = 2 
      }); 

    //Create a block that prints out the result 
    ActionBlock<string> doneBlock = 
     new ActionBlock<string>(x => Console.WriteLine(x)); 

    //Link the output of the first block to the input of the second one 
    downloadBlock.LinkTo(
     doneBlock, 
     new DataflowLinkOptions { PropagateCompletion = true}); 

    //input the urls into the first block 
    foreach (var url in urls) 
    { 
     downloadBlock.Post(url); 
    } 

    downloadBlock.Complete(); //Mark completion of input 

    //Allows consumer to wait for the whole operation to complete 
    return doneBlock.Completion; 
} 

static void Main(string[] args) 
{ 
    DoIt().Wait(); 
    Console.WriteLine("Done"); 
    Console.ReadLine(); 
} 
+0

哇。它看起来非常好,但我想知道如何使用Rx做同样的事情。提前致谢! – SuperJMN

1

你能看到,如果这有助于?

var urls = new List<string> 
     { 
      @"http:\\www.amazon.com", 
      @"http:\\www.bing.com", 
      @"http:\\www.google.com", 
      @"http:\\www.twitter.com", 
      @"http:\\www.google.com" 
     }; 

var contents = 
    urls 
     .ToObservable() 
     .SelectMany(uri => 
      Observable 
       .Using(
        () => new System.Net.Http.HttpClient(), 
        client => 
         client 
          .GetStringAsync(new Uri(uri, UriKind.Absolute)) 
          .ToObservable())); 
+0

对不起,它不能很好地工作。在超时后取消了100个任务:( – SuperJMN

+0

)您可以尝试使用'EventLoopScheduler'吗? – Enigmativity

+0

谢谢!我已经尝试过了,它的表现相同。请看@Dorus的答案,因为它很简单并且按预期工作没有太多的麻烦。 – SuperJMN