2012-11-14 51 views
5

我很难找到一个任务调度程序,我可以在该任务调度程序上安排优先任务,但也可以处理“包装”任务。它类似于Task.Run试图解决的问题,但不能将任务调度程序指定为Task.Run。 我一直在使用Parallel Extensions Extras Samples中的QueuedTaskScheduler来解决任务优先级要求(也由此post建议)。处理包装任务的有限并发级别任务调度程序(带任务优先级)

这是我的例子:

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    static void Main(string[] args) 
    { 
     ts_priority1 = queueScheduler.ActivateNewQueue(1); 
     ts_priority2 = queueScheduler.ActivateNewQueue(2); 

     QueueValue(1, ts_priority2); 
     QueueValue(2, ts_priority2); 
     QueueValue(3, ts_priority2); 
     QueueValue(4, ts_priority1); 
     QueueValue(5, ts_priority1); 
     QueueValue(6, ts_priority1); 

     Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
     return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
     return QueueTask(async() => 
     { 
      Console.WriteLine("Start {0}", i); 
      await Task.Delay(1000); 
      Console.WriteLine("End {0}", i); 
     }, ts); 
    } 
} 

上面的例子的典型输出是:

Start 4 
Start 5 
Start 6 
Start 1 
Start 2 
Start 3 
End 4 
End 3 
End 5 
End 2 
End 1 
End 6 

我要的是:

Start 4 
End 4 
Start 5 
End 5 
Start 6 
End 6 
Start 1 
End 1 
Start 2 
End 2 
Start 3 
End 3 

编辑:

我想我正在寻找一个类似于QueuedTaskScheduler的任务调度程序,它可以解决这个问题。但欢迎任何其他建议。

+0

那么,你想要的是处理任务的优先级,但不能在并行模式下运行它们?你能不能限制调度程序中并发线程的数量? – Kek

+0

@Kek'new QueuedTaskScheduler(targetScheduler:TaskScheduler.Default,maxConcurrencyLevel:1);'上面完全是这样(限制并发线程的数量为1) –

回答

2

我能找到的最好的解决办法是让我自己QueuedTaskScheduler(在Parallel Extensions Extras Samples源代码中发现原)的版本。

我在QueuedTaskScheduler的构造函数中添加了一个bool awaitWrappedTasks参数。

public QueuedTaskScheduler(
     TaskScheduler targetScheduler, 
     int maxConcurrencyLevel, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 
    ... 
} 

public QueuedTaskScheduler(
     int threadCount, 
     string threadName = "", 
     bool useForegroundThreads = false, 
     ThreadPriority threadPriority = ThreadPriority.Normal, 
     ApartmentState threadApartmentState = ApartmentState.MTA, 
     int threadMaxStackSize = 0, 
     Action threadInit = null, 
     Action threadFinally = null, 
     bool awaitWrappedTasks = false) 
{ 
    ... 
    _awaitWrappedTasks = awaitWrappedTasks; 

    // code starting threads (removed here in example) 
    ... 
} 

我然后修改ProcessPrioritizedAndBatchedTasks()方法为async

private async void ProcessPrioritizedAndBatchedTasks() 

我然后只是其中执行预定任务的部件之后修改的代码:

private async void ProcessPrioritizedAndBatchedTasks() 
{ 
    bool continueProcessing = true; 
    while (!_disposeCancellation.IsCancellationRequested && continueProcessing) 
    { 
     try 
     { 
      // Note that we're processing tasks on this thread 
      _taskProcessingThread.Value = true; 

      // Until there are no more tasks to process 
      while (!_disposeCancellation.IsCancellationRequested) 
      { 
       // Try to get the next task. If there aren't any more, we're done. 
       Task targetTask; 
       lock (_nonthreadsafeTaskQueue) 
       { 
        if (_nonthreadsafeTaskQueue.Count == 0) break; 
        targetTask = _nonthreadsafeTaskQueue.Dequeue(); 
       } 

       // If the task is null, it's a placeholder for a task in the round-robin queues. 
       // Find the next one that should be processed. 
       QueuedTaskSchedulerQueue queueForTargetTask = null; 
       if (targetTask == null) 
       { 
        lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 
       } 

       // Now if we finally have a task, run it. If the task 
       // was associated with one of the round-robin schedulers, we need to use it 
       // as a thunk to execute its task. 
       if (targetTask != null) 
       { 
        if (queueForTargetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
        else TryExecuteTask(targetTask); 

        // ***** MODIFIED CODE START **** 
        if (_awaitWrappedTasks) 
        { 
         var targetTaskType = targetTask.GetType(); 
         if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
         { 
          dynamic targetTaskDynamic = targetTask; 
          // Here we await the completion of the proxy task. 
          // We do not await the proxy task directly, because that would result in that await will throw the exception of the wrapped task (if one existed) 
          // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
          await TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously); 
         } 
        } 
        // ***** MODIFIED CODE END **** 
       } 
      } 
     } 
     finally 
     { 
      // Now that we think we're done, verify that there really is 
      // no more work to do. If there's not, highlight 
      // that we're now less parallel than we were a moment ago. 
      lock (_nonthreadsafeTaskQueue) 
      { 
       if (_nonthreadsafeTaskQueue.Count == 0) 
       { 
        _delegatesQueuedOrRunning--; 
        continueProcessing = false; 
        _taskProcessingThread.Value = false; 
       } 
      } 
     } 
    } 
} 

方法的变化ThreadBasedDispatchLoop有点不同,因为我们不能使用async关键字,否则我们会打破ex的功能在专用线程中处理预定任务。所以这里的ThreadBasedDispatchLoop

private void ThreadBasedDispatchLoop(Action threadInit, Action threadFinally) 
{ 
    _taskProcessingThread.Value = true; 
    if (threadInit != null) threadInit(); 
    try 
    { 
     // If the scheduler is disposed, the cancellation token will be set and 
     // we'll receive an OperationCanceledException. That OCE should not crash the process. 
     try 
     { 
      // If a thread abort occurs, we'll try to reset it and continue running. 
      while (true) 
      { 
       try 
       { 
        // For each task queued to the scheduler, try to execute it. 
        foreach (var task in _blockingTaskQueue.GetConsumingEnumerable(_disposeCancellation.Token)) 
        { 
         Task targetTask = task; 
         // If the task is not null, that means it was queued to this scheduler directly. 
         // Run it. 
         if (targetTask != null) 
         { 
          TryExecuteTask(targetTask); 
         } 
         // If the task is null, that means it's just a placeholder for a task 
         // queued to one of the subschedulers. Find the next task based on 
         // priority and fairness and run it. 
         else 
         { 
          // Find the next task based on our ordering rules...          
          QueuedTaskSchedulerQueue queueForTargetTask; 
          lock (_queueGroups) FindNextTask_NeedsLock(out targetTask, out queueForTargetTask); 

          // ... and if we found one, run it 
          if (targetTask != null) queueForTargetTask.ExecuteTask(targetTask); 
         } 

         if (_awaitWrappedTasks) 
         { 
          var targetTaskType = targetTask.GetType(); 
          if (targetTaskType.IsConstructedGenericType && typeof(Task).IsAssignableFrom(targetTaskType.GetGenericArguments()[0])) 
          { 
           dynamic targetTaskDynamic = targetTask; 
           // Here we wait for the completion of the proxy task. 
           // We do not wait for the proxy task directly, because that would result in that Wait() will throw the exception of the wrapped task (if one existed) 
           // In the continuation we then simply return the value of the exception object so that the exception (stored in the proxy task) does not go totally unobserved (that could cause the process to crash) 
           TaskExtensions.Unwrap(targetTaskDynamic).ContinueWith((Func<Task, Exception>)(t => t.Exception), TaskContinuationOptions.ExecuteSynchronously).Wait(); 
          } 
         } 
        } 
       } 
       catch (ThreadAbortException) 
       { 
        // If we received a thread abort, and that thread abort was due to shutting down 
        // or unloading, let it pass through. Otherwise, reset the abort so we can 
        // continue processing work items. 
        if (!Environment.HasShutdownStarted && !AppDomain.CurrentDomain.IsFinalizingForUnload()) 
        { 
         Thread.ResetAbort(); 
        } 
       } 
      } 
     } 
     catch (OperationCanceledException) { } 
    } 
    finally 
    { 
     // Run a cleanup routine if there was one 
     if (threadFinally != null) threadFinally(); 
     _taskProcessingThread.Value = false; 
    } 
} 

我已经测试这个修改后的版本,它提供了所需的输出。这种技术也可以用于其他任何调度器。例如。 LimitedConcurrencyLevelTaskSchedulerOrderedTaskScheduler

+0

等待调度程序中的任务将破坏异步IO的值。如果你不需要异步IO,你可以切换到同步任务主体。 – usr

+0

+1。我在这个问题上学到了很多东西。不完全相信这种解决方案比“AsyncSemaphore”更可取,但我会考虑这一点。 – usr

+0

您正在执行'TaskScheduler'实现中的'async-void'方法吗?可怕的是,我不知道@StephenCleary帽子对此没有什么可说的。 – springy76

0

我认为实现这个目标是不可能的。一个核心问题似乎是TaskScheduler只能用于运行代码。但是有些任务不运行代码,例如IO任务或计时器任务。我不认为TaskScheduler基础设施可以用来安排这些。

从的TaskScheduler的角度看,它看起来像这样:

1. Select a registered task for execution 
2. Execute its code on the CPU 
3. Repeat 

步骤(2)是同步的,这意味着Task要执行必须开始和结束作为步骤的部分(2)。这意味着这个Task不能做异步IO,因为那是非阻塞的。从这个意义上说,TaskScheduler只支持阻塞码。

我想你会得到最好的实施你自己的版本AsyncSemaphore释放服务员在优先顺序,并进行限制。您的异步方法可以以非阻塞方式等待该信号量。所有CPU工作都可以在默认线程池上运行,因此不需要在自定义TaskScheduler内部启动自定义线程。 IO任务可以继续使用非阻塞IO。

+0

你在这里解释了我已经试过了,它基本上有相同的输出如原来的问题)。在你的建议中'firstPartTask'在排队的任务调度器上被调度,但是一旦它碰到第一个'await'并且调度器简单地执行队列中的下一个“第一部分”,即使前面的“内部任务”(在第一次“等待”之后重置任务)尚未完成。我只能认为这将通过一个**调度程序**来解决,这个调度程序处理我正在寻找的这个场景,并且无法通过调度程序外的某些魔法来解决。 –

+0

我来相信你是对的。我添加了一些想法和建议。请让我知道你在想什么。 – usr

+0

感谢您的更新。您使用信号量锁的建议正是用户在以下[答案](http://stackoverflow.com/a/13379980/1514235)中建议的内容(请参阅我的意见)。您建议调度程序仅同步执行其任务的方式有些不正确,但如果调度程序在执行任何其他任务之前等待每个任务的“包装”任务,该怎么办?我认为这给了我一个想法...谢谢(如果我想出点什么,会让你知道)。 –

3

不幸的是,这不能用TaskScheduler解决,因为他们总是在Task水平工作,以及async方法几乎总是包含多个Task秒。

您应该将SemaphoreSlim与优先级调度程序结合使用。或者,您可以使用AsyncLock(这也包含在我的AsyncEx library中)。

class Program 
{ 
    private static QueuedTaskScheduler queueScheduler = new QueuedTaskScheduler(targetScheduler: TaskScheduler.Default, maxConcurrencyLevel: 1); 
    private static TaskScheduler ts_priority1; 
    private static TaskScheduler ts_priority2; 
    private static SemaphoreSlim semaphore = new SemaphoreSlim(1); 
    static void Main(string[] args) 
    { 
    ts_priority1 = queueScheduler.ActivateNewQueue(1); 
    ts_priority2 = queueScheduler.ActivateNewQueue(2); 

    QueueValue(1, ts_priority2); 
    QueueValue(2, ts_priority2); 
    QueueValue(3, ts_priority2); 
    QueueValue(4, ts_priority1); 
    QueueValue(5, ts_priority1); 
    QueueValue(6, ts_priority1); 

    Console.ReadLine();   
    } 

    private static Task QueueTask(Func<Task> f, TaskScheduler ts) 
    { 
    return Task.Factory.StartNew(f, CancellationToken.None, TaskCreationOptions.HideScheduler | TaskCreationOptions.DenyChildAttach, ts).Unwrap(); 
    } 

    private static Task QueueValue(int i, TaskScheduler ts) 
    { 
    return QueueTask(async() => 
    { 
     await semaphore.WaitAsync(); 
     try 
     { 
     Console.WriteLine("Start {0}", i); 
     await Task.Delay(1000); 
     Console.WriteLine("End {0}", i); 
     } 
     finally 
     { 
     semaphore.Release(); 
     } 
    }, ts); 
    } 
} 
+1

这看起来像一个有趣的解决方案。但是,我看到了这个问题。尽管解决方案(首先)会导致正确的输出(如在这个问题中),但它会破坏已执行任务的优先级。调度程序将执行所有任务(以正确的优先级),直到'await semaphore.WaitAsync()',但具有较高优先级的任务将不会从优先级较低的任务之前的锁中释放。如果在优先级较低的任务(仍在等待从锁释放)之后安排更高优先级的任务,则尤其如此。 –

+0

在这种情况下,您将需要一个实际的基于优先级的锁定,因为AFAIK没有其他人需要锁定,所以这个锁定不存在。你必须建立自己的。 –

+0

我已经添加了我自己的[答案](http://stackoverflow.com/a/13414364/1514235)。请看看你的想法。 –

相关问题