2017-06-13 37 views
4

我目前正在向Web API提出大量请求。我已尝试async这个过程,以便我可以在合理的时间内完成此操作,但是我无法控制连接,因此我不会发送超过10个请求/秒的连接。我正在使用信号量进行调节,但我不完全确定它在这种情况下会如何工作,因为我有一个嵌套循环。使用循环限制并发异步请求

我基本上是得到一个模型列表,每个模型都有一个列表中的天数。我需要为模型内的每一天提出请求。天数可以从1到约50,99%之间的任何时间,它只会是1。所以我想async每个模型,因为会有大约3000他们,但我想async日的情况下,有多天需要完成。我需要停留在或低于10个请求/秒,所以我认为最好的方法是将整个操作的请求限制设置为10。有没有一个地方可以让信号量限制连接整个链条?

每个单独的请求还必须对2不同的数据段提出两个请求,并且此API现在不支持任何类型的批处理。

我对c#很陌生,对async很新,对WebRequests/HttpClient很新,所以对此有所帮助。我试图在这里添加所有相关的代码。如果你需要其他东西,请告诉我。

public static async Task GetWeatherDataAsync(List<Model> models) 
{ 
    SemaphoreSlim semaphore = new SemaphoreSlim(10); 
    var taskList = new List<Task<ComparisonModel>>(); 

    foreach (var x in models) 
    { 
     await semaphore.WaitAsync(); 
     taskList.Add(CompDaysAsync(x)); 
    } 

    try 
    { 
     await Task.WhenAll(taskList.ToArray()); 
    } 
    catch (Exception e) { } 
    finally 
    { 
     semaphore.Release(); 
    } 
} 

public static async Task<Models> CompDaysAsync(Model model) 
{ 
    var httpClient = new HttpClient(); 
    httpClient.DefaultRequestHeaders.Authorization = new 
       Headers.AuthenticationHeaderValue("Token","xxxxxxxx"); 
    httpClient.Timeout = TimeSpan.FromMinutes(5); 
    var taskList = new List<Task<Models.DateTemp>>(); 

    foreach (var item in model.list) 
    { 
     taskList.Add(WeatherAPI.GetResponseForDayAsync(item, 
      httpClient, Latitude, Longitude)); 
    } 
    httpClient.Dispose(); 
    try 
    { 
     await Task.WhenAll(taskList.ToArray()); 
    } 
    catch (Exception e) { } 

    return model; 
} 

public static async Task<DateTemp> GetResponseForDayAsync(DateTemp date, HttpClient httpClient, decimal? Latitude, decimal? Longitude) 
{ 
    var response = await httpClient.GetStreamAsync(request1); 
    StreamReader myStreamReader = new StreamReader(response); 
    string responseData = myStreamReader.ReadToEnd(); 
    double[] data = new double[2]; 
    if (responseData != "[[null, null]]") 
    { 
     data = Array.ConvertAll(responseData.Replace("[", "").Replace("]", "").Split(','), double.Parse); 
    } 
    else { data = null; }; 

    double precipData = 0; 
    var response2 = await httpClient.GetStreamAsync(request2); 
    StreamReader myStreamReader2 = new StreamReader(response2); 
    string responseData2 = myStreamReader2.ReadToEnd(); 
    if (responseData2 != null && responseData2 != "[null]" && responseData2 != "[0.0]") 
    { 
     precipData = double.Parse(responseData2.Replace("[", "").Replace("]", "")); 
    } 
    date.Precip = precipData; 

    if (data != null) 
    { 
     date.minTemp = data[0]; 
     date.maxTemp = data[1]; 
    } 
    return date; 
} 
+0

我只是在使用'Parallel.ForEach'之前做过类似的事情。使用'ParallelOptions'的重载让你设置'MaxDegreeOfParallelism',但是你需要先用'Enumerable.SelectMany'将每个模型中的日子变平。 – Biscuits

+0

因此,如果我选择了很多我的收藏,那么我会得到一个我所有日子没有与模型本身关系的大名单,虽然,不是吗?它是否天生与他们有联系,还是我需要做一些特别的事情来确保? – DevDevDev

+0

有一个'SelectMany'的重载,它允许您指定一个结果选择器来将来自父对象和元素的信息投影到一个新对象中。 'Linq'语法使它更容易工作。请记住,'Parallel.ForEach'让你以异步的方式运行动作(或任务),你仍然可以在每次迭代中“等待”完成它们。 – Biscuits

回答

1

我认为你完全不明白SemaphoreSlim做什么。

  1. 你的信号是一种方法级的基于局部变量,所以GetWeatherDataAsync方法调用将产生10调用您的API,而无需等待其他客户端。
  2. 此外,你的代码就会死锁,如果models.Count > 10,因为你在等待在每个迭代信号,这些请求被堆叠,并为11th你的线程将永远挂,因为你是不是信号释放:

    var semaphore = new SemaphoreSlim(10); 
    
    foreach (var item in Enumerable.Range(0, 15)) 
    { 
        // will stop after 9 
        await semaphore.WaitAsync(); 
        Console.WriteLine(item); 
    } 
    

你真正需要做的是移动信号,以实例级(甚至TYPE-水平static关键字),并等待它GetWeatherDataAsync,并把Releasefinally块。

至于Parallel.Foreach - 你不应该在这种情况下使用它,因为它不知道async方法(这是async/await之前推出),以及你的方法看起来并不像它们是CPU密集型的。

+0

你对使用“Parallel.ForEach”的观点是错误的。框架库如何在C#语言功能有用之前以某种方式依赖于它? – Biscuits

+0

我的意思是说,并行Foreach不适用于异步方法,因为它不使用异步lambda表达式,只是在第一次返回后将方法标记为已完成。 – VMAtm

+1

哦,我明白你的意思了。所以你无法在每次迭代中“等待”。 – Biscuits