-1

如果我将maxConcurrency作为10传递,下面的代码将最大并行任务限制为10?如何验证一次运行的任务数量?在下面的代码上下文中,SemaphoreSlim的用法是否正确?它会限制最大运行线程吗?

public BlockingCollection<Task> _workTaskQueue; 

public void DequeueTask(int maxConcurrency) 
{ 
    var tasks = new List<Task>(); 
    using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
    { 
     foreach (var task in _workTaskQueue.GetConsumingEnumerable()) 
     { 
      concurrencySemaphore.Wait(); 
      if (!(task.IsCanceled) && task.Status == TaskStatus.Created) 
      { 
       task.ContinueWith((t) => { concurrencySemaphore.Release(); }); 
       tasks.Add(task); 
       task.Start(); 
      } 
     } 
    } 
    Task.WaitAll(tasks.ToArray()); 
} 
+1

参见https://stackoverflow.com/questions/14075029/have-a-set-of-tasks-with-only-x-running-at-a-time –

+0

非常感谢它帮助! – Nitheesh

+0

[有一组任务只能运行一次X](https://stackoverflow.com/questions/14075029/have-a-set-of-tasks-with-only-x-running-at -a-time) – Cheesebaron

回答

1

由于您利用my other answer改变了一点逻辑,我为您准备了测试代码。 (无需var tasks = new List<Task>();

Random rnd = new Random(); 
int maxConcurrency = 5; 
var _workTaskQueue = new System.Collections.Concurrent.BlockingCollection<Task>(); 
for (int i = 0; i < 250; i++) 
{ 
    //Tasks running 250=500ms 
    _workTaskQueue.Add(new Task(()=> { Task.Delay(250 + rnd.Next(250)).Wait(); })); 
} 
_workTaskQueue.CompleteAdding(); 
int runningTaks = 0; 

using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency)) 
{ 
    foreach (var task in _workTaskQueue.GetConsumingEnumerable()) 
    { 
     Console.WriteLine("LoopStart: " + runningTaks); 

     await concurrencySemaphore.WaitAsync(); 

     Console.WriteLine("GotASem : " + runningTaks); 

     task.Start(); 
     Interlocked.Increment(ref runningTaks); 
     task.ContinueWith(t => 
      { 
       Interlocked.Decrement(ref runningTaks); 
       concurrencySemaphore.Release(); 
      }); 

     if (runningTaks > maxConcurrency) throw new Exception("ERROR"); 
     Console.WriteLine("LoopEnd : " + runningTaks + Environment.NewLine); 
    } 

    Console.WriteLine("Finalizing: " + runningTaks); 
    //Make sure all all tasks have ended. 
    for (int i = 0; i < maxConcurrency; i++) 
    { 
     await concurrencySemaphore.WaitAsync(); 
    } 

    Console.WriteLine("Finished: " + runningTaks); 
} 
+0

谢谢L.B.太棒了。我有一个小问题:万一如果我的maxConcurrency设置为100将循环“await concurrencySemaphore.WaitAsync();”仍然有效的表现明智或我以前的task.waitall()更好? – Nitheesh

+0

设置你想要* maxConcurrency *。没问题。 –

0

否。等待呼叫(进入互斥块)缺失。

+0

Sunil,我修改了包含wait的代码,它会工作吗?我如何验证其工作? – Nitheesh

+0

创建比共享并发更多的线程,> 10并在任务中诱发大约一秒或更长时间的睡眠并进行控制台打印。 10打印报表将被看到并发布,他们将被推迟 –

相关问题