2014-01-29

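Task sequencing and re-entrancy

I have the following scenario, which I think might be quite common:

  1. There is a task (a UI command handler) which can complete either synchronously or asynchronously.

  2. Commands may arrive faster than they are being processed.

  3. If there is already a pending task for a command, the new command handler task should be queued and processed sequentially.

  4. Each new task's result may depend on the result of the previous task.

Cancellation should be observed, but I'd like to leave it outside the scope of this question for simplicity. Also, thread safety (concurrency) is not a requirement, but re-entrancy must be supported.

Here is a basic example of what I'm trying to achieve (as a console app, for simplicity):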

using System; 
using System.Threading.Tasks; 

namespace ConsoleApp 
{ 
    class Program 
    { 
     static void Main(string[] args) 
     { 
      var asyncOp = new AsyncOp<int>(); 

      Func<int, Task<int>> handleAsync = async (arg) => 
      { 
       Console.WriteLine("this task arg: " + arg); 

       //await Task.Delay(arg); // make it async 

       return await Task.FromResult(arg); // sync 
      }; 

      Console.WriteLine("Test #1..."); 
      asyncOp.RunAsync(() => handleAsync(1000)); 
      asyncOp.RunAsync(() => handleAsync(900)); 
      asyncOp.RunAsync(() => handleAsync(800)); 
      asyncOp.CurrentTask.Wait(); 

      Console.WriteLine("\nPress any key to continue to test #2..."); 
      Console.ReadLine(); 

      asyncOp.RunAsync(() => 
      { 
       asyncOp.RunAsync(() => handleAsync(200)); 
       return handleAsync(100); 
      }); 

      asyncOp.CurrentTask.Wait(); 
      Console.WriteLine("\nPress any key to exit..."); 
      Console.ReadLine(); 
     } 

     // AsyncOp 
     class AsyncOp<T> 
     { 
      Task<T> _pending = Task.FromResult(default(T)); 

      public Task<T> CurrentTask { get { return _pending; } } 

      public Task<T> RunAsync(Func<Task<T>> handler) 
      { 
       var pending = _pending; 
       Func<Task<T>> wrapper = async() => 
       { 
        // await the prev task 
        var prevResult = await pending; 
        Console.WriteLine("\nprev task result: " + prevResult); 
        // start and await the handler 
        return await handler(); 
       }; 

       _pending = wrapper(); 
       return _pending; 
      } 
     } 

    } 
} 

Output:

 
Test #1... 

prev task result: 0 
this task arg: 1000 

prev task result: 1000 
this task arg: 900 

prev task result: 900 
this task arg: 800 

Press any key to continue to test #2... 


prev task result: 800 

prev task result: 800 
this task arg: 200 
this task arg: 100 

Press any key to exit... 

It works as required until re-entrancy is introduced in test #2:

asyncOp.RunAsync(() => 
{ 
    asyncOp.RunAsync(() => handleAsync(200)); 
    return handleAsync(100); 
}); 

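The desired output is 100, 200 rather than 200, 100, because there is already a pending outer task for 100. This obviously happens because the inner task executes synchronously, breaking the var pending = _pending; /* ... */ _pending = wrapper() logic of the outer task: the nested call reads _pending before the outer call has assigned its own wrapper task to it.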
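The synchronous execution mentioned above can be seen in isolation. The following is a minimal sketch (the names SyncStartDemo and Run are made up for illustration) showing that calling an async lambda runs its body on the caller's stack until the first await on a task that has not yet completed. Since awaiting an already completed task does not yield, the whole body finishes before the call returns:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

static class SyncStartDemo
{
    // Records the order in which the steps actually happen.
    public static List<string> Run()
    {
        var log = new List<string>();

        Func<Task<int>> wrapper = async () =>
        {
            // Awaiting an already completed task does not yield,
            // so execution continues synchronously in the caller.
            await Task.FromResult(0);
            log.Add("body ran");
            return 42;
        };

        log.Add("before call");
        Task<int> task = wrapper(); // the whole body runs right here
        log.Add("after call, completed=" + task.IsCompleted);
        return log;
    }

    static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

In AsyncOp.RunAsync this means that handler() (and any nested RunAsync call it makes) executes inside wrapper(), before the outer assignment to _pending takes place.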

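How can I make it work for test #2, too?

One solution would be to enforce asynchrony for every task with Task.Factory.StartNew(..., TaskScheduler.FromCurrentSynchronizationContext()). However, I don't want to impose asynchronous execution on command handlers that might be synchronous internally. Also, I don't want to depend on the behavior of any particular synchronization context (i.e., rely on Task.Factory.StartNew returning before the created task has actually started).

In the real-life project, I'm responsible for what AsyncOp is above, but I have no control over the command handlers (i.e., whatever is inside handleAsync).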

+1

I've been noticing your name on a lot of good questions and answers lately; it's good to have new quality users come to the site.

Answer


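I almost forgot that it's possible to construct a Task manually, without starting or scheduling it. Then, "Task.Factory.StartNew" vs "new Task(...).Start" put me back on track. I think this is one of those few cases where the Task<TResult> constructor may actually be useful, along with nested tasks (Task<Task<T>>) and Task.Unwrap():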

// AsyncOp 
class AsyncOp<T> 
{ 
    Task<T> _pending = Task.FromResult(default(T)); 

    public Task<T> CurrentTask { get { return _pending; } } 

    public Task<T> RunAsync(Func<Task<T>> handler, bool useSynchronizationContext = false) 
    { 
     var pending = _pending; 
     Func<Task<T>> wrapper = async() => 
     { 
      // await the prev task 
      var prevResult = await pending; 
      Console.WriteLine("\nprev task result: " + prevResult); 
      // start and await the handler 
      return await handler(); 
     }; 

     var task = new Task<Task<T>>(wrapper); 
     var inner = task.Unwrap(); 
     _pending = inner; 

     task.RunSynchronously(useSynchronizationContext ? 
      TaskScheduler.FromCurrentSynchronizationContext() : 
      TaskScheduler.Current); 

     return inner; 
    } 
} 

Output:

 
Test #1... 

prev task result: 0 
this task arg: 1000 

prev task result: 1000 
this task arg: 900 

prev task result: 900 
this task arg: 800 

Press any key to continue to test #2... 


prev task result: 800 
this task arg: 100 

prev task result: 100 
this task arg: 200 
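The key point of this approach is that the unwrapped proxy task exists, and can be stored, before the wrapper body starts running. Here is a minimal sketch of just that mechanism, outside of AsyncOp (ColdTaskDemo and its members are hypothetical names used only for illustration):

```csharp
using System;
using System.Threading.Tasks;

static class ColdTaskDemo
{
    public static bool ProxyExistedBeforeBodyRan()
    {
        Task<int> proxy = null;
        bool proxyPendingInsideBody = false;

        // A "cold" task: constructed, but not started or scheduled yet.
        var outer = new Task<Task<int>>(async () =>
        {
            // When the body runs, 'proxy' has already been assigned
            // and is still pending, so re-entrant code can chain onto it.
            proxyPendingInsideBody = proxy != null && !proxy.IsCompleted;
            return await Task.FromResult(1);
        });

        proxy = outer.Unwrap();   // get the Task<int> before running the body
        outer.RunSynchronously(); // now run the body on the current thread

        return proxyPendingInsideBody && proxy.Result == 1;
    }

    static void Main()
    {
        Console.WriteLine(ProxyExistedBeforeBodyRan());
    }
}
```

In AsyncOp.RunAsync the same ordering lets _pending be updated first, so a nested RunAsync call made from inside the handler awaits the outer task instead of the stale one.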

It's now also very easy to make AsyncOp thread-safe, if needed, by adding a lock to protect _pending.
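One possible way to add that lock is sketched below. This is an illustration under assumptions rather than a tested production version: the names LockedAsyncOp and LockedAsyncOpDemo are hypothetical, and the sketch chooses to run the task outside the lock so that handler code never executes while the lock is held.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical name: a lock-protected variant of the AsyncOp<T> idea.
class LockedAsyncOp<T>
{
    readonly object _lock = new object();
    Task<T> _pending = Task.FromResult(default(T));

    public Task<T> CurrentTask
    {
        get { lock (_lock) return _pending; }
    }

    public Task<T> RunAsync(Func<Task<T>> handler)
    {
        Task<Task<T>> task;
        Task<T> inner;

        // Read the previous task and publish the new one atomically.
        lock (_lock)
        {
            var pending = _pending;
            task = new Task<Task<T>>(async () =>
            {
                await pending;          // wait for the previous task
                return await handler(); // then start and await the handler
            });
            inner = task.Unwrap();
            _pending = inner;
        }

        // Run outside the lock; the await chain preserves the ordering.
        task.RunSynchronously(TaskScheduler.Current);
        return inner;
    }
}

static class LockedAsyncOpDemo
{
    // Repeats the re-entrant scenario of test #2 and returns the handler order.
    public static List<int> RunReentrant()
    {
        var op = new LockedAsyncOp<int>();
        var order = new List<int>();
        Func<int, Task<int>> handle = arg =>
        {
            order.Add(arg);
            return Task.FromResult(arg);
        };

        op.RunAsync(() =>
        {
            op.RunAsync(() => handle(200)); // nested (re-entrant) call
            return handle(100);
        });
        op.CurrentTask.Wait();
        return order;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", RunReentrant()));
    }
}
```

The lock only guards the read and update of _pending; the sequencing itself still comes from each wrapper awaiting the task that was pending when it was created.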


Update: below is the most recent version of this pattern, which uses TaskCompletionSource and is thread-safe:

using System;
using System.Threading;
using System.Threading.Tasks;

/// <summary> 
/// AsyncOperation 
/// By Noseratio - http://stackoverflow.com/a/21427264 
/// </summary> 
/// <typeparam name="T">Task result type</typeparam> 
class AsyncOperation<T> 
{ 
    readonly object _lock = new Object(); 
    Task<T> _currentTask = null; 
    CancellationTokenSource _currentCts = null; 

    // a client of this class (e.g. a ViewModel) has an option 
    // to handle TaskSucceeded or TaskFailing, if needed 
    public event EventHandler<TaskEventArgs> TaskSucceeded = null; 
    public event EventHandler<TaskEventArgs> TaskFailing = null; 

    public Task<T> CurrentTask 
    { 
     get 
     { 
      lock (_lock) 
       return _currentTask; 
     } 
    } 

    public bool IsCurrent(Task task) 
    { 
     lock (_lock) 
      return task == _currentTask; 
    } 

    public bool IsPending 
    { 
     get 
     { 
      lock (_lock) 
       return _currentTask != null && !_currentTask.IsCompleted; 
     } 
    } 

    public bool IsCancellationRequested 
    { 
     get 
     { 
      lock (_lock) 
       return _currentCts != null && _currentCts.IsCancellationRequested; 
     } 
    } 

    public void Cancel() 
    { 
     lock (_lock) 
     { 
      if (_currentTask != null && !_currentTask.IsCompleted) 
       _currentCts.Cancel(); 
     } 
    } 

    /// <summary> 
    /// Start the task routine and observe the result of the previous task routine 
    /// </summary> 
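    /// <param name="routine">The task routine to start; it receives this operation's cancellation token</param> 
    /// <param name="token">An external cancellation token, linked to this operation's own token source</param> 
    /// <param name="cancelPrevious">If true, request cancellation of the previous task before awaiting it</param> 
    /// <param name="throwImmediately">If true, also rethrow an unhandled exception on the current synchronization context</param> 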
    public Task<T> StartAsync(
     Func<CancellationToken, Task<T>> routine, 
     CancellationToken token, 
     bool cancelPrevious = true, 
     bool throwImmediately = true) 
    { 
     Task<T> previousTask = null; // pending instance 
     CancellationTokenSource previousCts = null; // pending instance CTS 

     CancellationTokenSource thisCts = CancellationTokenSource.CreateLinkedTokenSource(token); 
     TaskCompletionSource<T> thisTcs = new TaskCompletionSource<T>(); // this task 
     CancellationToken thisToken; // this task's cancellation Token 
     Task<T> routineTask = null; // as returned by routine 

     lock (_lock) 
     { 
      // remember the _currentTask as previousTask 
      previousTask = _currentTask; 
      previousCts = _currentCts; 

      thisToken = thisCts.Token; 

      // set the new _currentTask 
      _currentTask = thisTcs.Task; 
      _currentCts = thisCts; 
     } 

     Action startAsync = async() => 
     { 
      // because startAsync is "async void" method, 
      // any exception not handled inside it 
      // will be immediately thrown on the current synchronization context, 
      // more details: http://stackoverflow.com/a/22395161/1768303 

      // run and await this task 
      try 
      { 
       // await the previous task instance 
       if (previousTask != null) 
       { 
        if (cancelPrevious) 
         previousCts.Cancel(); 
        try 
        { 
         await previousTask; 
        } 
        catch (OperationCanceledException) 
        { 
         // ignore previous cancellations 
        } 
       } 

       thisToken.ThrowIfCancellationRequested(); 

       routineTask = routine(thisToken); 
       await routineTask; 
      } 
      catch (Exception ex) 
      { 
       // ignore cancellation 
       if (ex is OperationCanceledException) 
       { 
        System.Diagnostics.Debug.Print("Task cancelled, id={0}", thisTcs.Task.Id); 
        thisTcs.SetCanceled(); 
        return; 
       } 

       // fire TaskFailing 
       System.Diagnostics.Debug.Print("Task failing, id={0}", thisTcs.Task.Id); 
       if (this.TaskFailing != null) 
       { 
        var args = new TaskEventArgs(thisTcs.Task, ex); 
        this.TaskFailing(this, args); 
        if (args.Handled) 
        { 
         // exception handled 
         // make thisTcs cancelled rather than faulted 
         thisTcs.SetCanceled(); 
         return; 
        } 
       } 

       // exception unhandled 
       thisTcs.SetException(ex); 
       if (throwImmediately) 
        throw; // rethrow on the current synchronization context 

       // exception should be observed via CurrentTask.Exception 
       return; 
      } 

      // success, fire TaskSucceeded 
      System.Diagnostics.Debug.Print("Task succeeded, id={0}", thisTcs.Task.Id); 
      thisTcs.SetResult(routineTask.Result); 
      if (this.TaskSucceeded != null) 
       this.TaskSucceeded(this, new TaskEventArgs(thisTcs.Task)); 
     }; 

     startAsync(); 
     return thisTcs.Task; 
    } 

    // StartAsync with CancellationToken.None 
    public Task<T> StartAsync(
     Func<CancellationToken, Task<T>> routine, 
     bool cancelPrevious = true, 
     bool throwImmediately = true) 
    { 
     // pass the caller's flags through instead of hard-coding true 
     return StartAsync(routine, CancellationToken.None, cancelPrevious: cancelPrevious, throwImmediately: throwImmediately); 
    } 

    /// <summary> 
    /// TaskEventArgs 
    /// </summary> 
    public class TaskEventArgs : EventArgs 
    { 
     public Task<T> Task { get; private set; } 
     public Exception Exception { get; private set; } 
     public bool Handled { get; set; } 

     public TaskEventArgs(Task<T> task, Exception exception = null) 
     { 
      this.Task = task; 
      this.Exception = exception; 
     } 
    } 
}