2016-01-22 133 views
0

基本上我试图能够限制迭代列表的执行。c#限制每次迭代

我真的很喜欢使用RX的想法,因为我可以建立它的顶部,并有一个更优雅的解决方案,但它不必使用RX来完成。

我已经制定了这个在很多比我聪明的帮助下。我的问题是,我想能够说someCollection.RateLimitedForEach(速度,函数),并最终阻止它,直到我们完成处理...或者让它成为异步方法。

该函数下面的演示,在控制台应用程序中工作,但如果我在foreach后关闭,它会立即返回。

我只是有种不知所措,这是否是可以解决,或者我应该去了解它完全不同

public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action) 
{ 
    list.ToObservable().Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v) 
    .Do(action).Subscribe(); 
} 

//rate limits iteration of foreach... keep in mind this is not the same thing as just sleeping for a second 
//between each iteration, this is saying at the start of the next iteration, if minimum delay time hasnt past, hold until it has 
var maxRequestsPerMinute = 60; 
requests.RateLimitedForeach(60/maxRequestsPerMinute,(request) => SendRequest(request)); 
+0

把东西,并让消费者把他们按照自己的速度(与任何异步机制选择,选择包括 - 但不限于 - 线程池:

试试这个,任务,计时器,后台线程...)。 – Theraot

回答

0

你的代码是再好不过了。在队列

public static void RateLimitedForEach<T>(this List<T> list, double minumumDelay, Action<T> action) 
{ 
    list 
     .ToObservable() 
     .Zip(Observable.Interval(TimeSpan.FromSeconds(minumumDelay)), (v, _) => v) 
     .Do(action) 
     .ToArray() 
     .Wait(); 
} 
+0

谢谢!这工作,现在只有当我真正了解到底发生了什么=) – Ronnyek

+0

@Ronnyek - 无后顾之忧。我可以帮忙解释一下,但是你知道些什么? – Enigmativity

4

,但它不会被使用RX

完成

这里是你如何能做到这同步:

public static void RateLimitedForEach<T>(
    this List<T> list, 
    double minumumDelay, 
    Action<T> action) 
{ 
    foreach (var item in list) 
    { 
     Stopwatch sw = Stopwatch.StartNew(); 

     action(item); 

     double left = minumumDelay - sw.Elapsed.TotalSeconds; 

     if(left > 0) 
      Thread.Sleep(TimeSpan.FromSeconds(left)); 
    } 
} 

这里是你如何能做到这异步(唯一潜在的等待都是异步):

public static async Task RateLimitedForEachAsync<T>(
    this List<T> list, 
    double minumumDelay, 
    Action<T> action) 
{ 
    foreach (var item in list) 
    { 
     Stopwatch sw = Stopwatch.StartNew(); 

     action(item); 

     double left = minumumDelay - sw.Elapsed.TotalSeconds; 

     if (left > 0) 
      await Task.Delay(TimeSpan.FromSeconds(left)); 
    } 
}  

请注意,您可以更改异步版本中做出的动作是自我异步像这样:

public static async Task RateLimitedForEachAsync<T>(
    this List<T> list, 
    double minumumDelay, 
    Func<T,Task> async_task_func) 
{ 
    foreach (var item in list) 
    { 
     Stopwatch sw = Stopwatch.StartNew(); 

     await async_task_func(item); 

     double left = minumumDelay - sw.Elapsed.TotalSeconds; 

     if (left > 0) 
      await Task.Delay(TimeSpan.FromSeconds(left)); 

    } 
}  

这是有益的,如果动作需要在每个项目上运行是异步的。

的最后一个版本,可以使用这样的:

List<string> list = new List<string>(); 

list.Add("1"); 
list.Add("2"); 

var task = list.RateLimitedForEachAsync(1.0, async str => 
{ 
    //Do something asynchronous here, e.g.: 
    await Task.Delay(500); 
    Console.WriteLine(DateTime.Now + ": " + str); 
}); 

现在你应该等待task完成。如果是这样的Main方法,那么你就需要同步等待这样的:

task.Wait(); 

在另一方面,如果你是一个异步方法里面,那么你就需要异步等待这样的:

await task; 
+0

@Theraot,我更新了答案。 –

+0

我注意到,删除了我的评论 - 因为它不再相关,并投票。 – Theraot

0

,你需要得到翻过这一概念,是主线程未在等待你的RateLimitedForEach调用来完成。另外 - 在您的控制台应用程序中 - 一旦主线程结束,过程结束。

这是什么意思?这意味着这个过程将会结束,不管观察者是否在RateLimitedForEach已经完成执行。

注意:用户可能仍然强制执行你的应用程序完成,,这是一件好事。如果您希望能够在不挂断用户界面的情况下等待,则可以使用表单应用程序,如果您不希望用户关闭与该流程相关的窗口,则可以使用服务。


使用任务是一个superios solution我下面介绍。

请注意,在控制台应用程序上使用任务时,您仍需要等待任务以防止主线程在RateLimitedForEach完成其作业之前完成。仍然建议从控制台应用程序移开。


如果您在使用代码继续坚持,你可以调整它为它挂调用线程,直到完成:

public static void RateLimitedForEach<T> 
(
    this List<T> list, 
    double minumumDelay, 
    Action<T> action 
) 
{ 
    using (var waitHandle = new ManualResetEventSlim(false)) 
    { 

     var mainObservable = list.ToObservable(); 
     var intervalObservable = Observable.Interval(TimeSpan.FromSeconds(minumumDelay)); 
     var zipObservable = mainObservable .Zip(intervalObservable, (v, _) => v); 
     zipObservable.Subscribe 
     (
      action, 
      error => GC.KeepAlive(error), // Ingoring them, as you already were 
      () => waitHandle.Set() // <-- "Done signal" 
     ); 

     waitHandle.Wait(); // <--- Waiting on the observer to complete 
    } 
}