2011-07-01 57 views
6

在下面的代码中,我想同步任务列表的结果报告。这是因为task.Result阻塞,直到任务完成。但是,任务ID = 3需要很长时间才能完成,并阻止所有其他已完成的任务报告其状态。没有UI线程的任务同步

我认为我可以通过将报告(Console.Write)移动到.ContinueWith指令中,但我没有UI线程,因此如何获取TaskScheduler来同步.ContinueWith任务?

我现在拥有的一切:

static void Main(string[] args) 
{ 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 

    var tasks = new List<Task<int>>(); 

    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     var t = Task<int>.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 5000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      return num; 
     }); 
     tasks.Add(t); 
    } 

    foreach (var task in tasks) 
    { 
     Console.WriteLine("Completed {0} on {1}", task.Result, Thread.CurrentThread.ManagedThreadId); 
    } 

    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
} 

我想移动到这个或类似的东西,但我需要的Console.Write(“已完成...”),以一切发生在同一线程上:

static void Main(string[] args) 
{ 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 

    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     Task<int>.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 10000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      return num; 
     }).ContinueWith(value => 
     { 
      Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId); 
     } 

    /* need syncronization context */); 
    } 

    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
} 

- 解决方案 - 得到一些意见和阅读部分的解决方案,这是完整的解决方案,我想要做什么之后。这里的目标是尽可能快地处理长时间运行的任务,然后对每个任务的结果进行一次处理。

static void Main(string[] args) 
{ 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 

    var results = new BlockingCollection<int>(); 

    Task.Factory.StartNew(() => 
    { 
     while (!results.IsCompleted) 
     { 
      try 
      { 
       var x = results.Take(); 
       Console.WriteLine("Completed {0} on {1}", x, Thread.CurrentThread.ManagedThreadId); 
      } 
      catch (InvalidOperationException) 
      { 
      } 
     } 
     Console.WriteLine("\r\nNo more items to take."); 
    }); 

    var tasks = new List<Task>(); 

    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     var t = Task.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 10000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      results.Add(num); 
     }); 

     tasks.Add(t); 
    } 

    Task.Factory.ContinueWhenAll(tasks.ToArray(), _ => results.CompleteAdding()); 

    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
} 
+0

我假设控制台线程是一个替身的GUI(的WinForms/WPF)。这不是一个好主意,Dispatcher/Messageloop的存在会造成(很大的)差异。 –

+1

否则,请思考“在同一个线程上发生的事情”。除非该线程正在轮询,否则不能这样做。 –

+0

我需要对其进行更改,以便一次只运行一个ContinueWith任务。不管他们是否在同一个线程上运行,但我不能让他们中的两个并行运行。在现实生活中,ContinueWith部分将大量数据写入数据库。 –

回答

1

你必须建立某种形式的作家的任务,但是,请记住,即使任务可以被重新安排到其他本地或托管线程!使用TPL中的默认调度程序,您无法控制哪个托管线程接收工作。

public class ConcurrentConsole 
{ 
    private static BlockingCollection<string> output 
     = new BlockingCollection<string>(); 

    public static Task CreateWriterTask(CancellationToken token) 
    { 
     return new Task(
      () => 
      { 
       while (!token.IsCancellationRequested) 
       { 
        string nextLine = output.Take(token); 
        Console.WriteLine(nextLine); 
       } 
      }, 
      token); 
    } 

    public static void WriteLine(Func<string> writeLine) 
    { 
     output.Add(writeLine()); 
    } 
} 

当我打开你的代码中使用这个我收到了以下的输出:

End of Main 
Done 1 on 6 
Completed 1 on 6 
Done 5 on 9 
Completed 5 on 9 
Done 0 on 4 
Completed 0 on 4 
Done 2 on 5 
Completed 2 on 13 
Done 7 on 10 
Completed 7 on 10 
Done 4 on 8 
Completed 4 on 5 
Done 9 on 12 
Completed 9 on 9 
Done 6 on 6 
Completed 6 on 5 
Done 8 on 11 
Completed 8 on 4 
Done 3 on 7 
Completed 3 on 7 

即使你的代码发送() => String.Format("Completed {0} on {1}"...ConcurrentConsole.WriteLine,确保ManagedThreadId将是对ConcurrentConsole任务回升,它仍然会改变它运行的线程。尽管比正在执行的任务更少变化。

+0

这实际上比我的foreach任务循环更糟糕。在报告其中任何一项的结果之前,它等待所有任务完成。我正在寻找的是一种完成任务并完成该任务并将该报告同步到一个线程的方法。我不希望一个长时间运行的任务阻止其他人报告。 –

+0

@Ryan:我已经将报告添加到了特定的任务中,这是您可以在TPL中保证的最好的任务。 – user7116

+0

这不完全是我如何实现系统,但使用BlockingCollection的想法是实现的核心。 –

0

我建议:

1)创建的锁定对象
2)创建一个字符串列表写入
3)因为它产生的循环,睡了一下线程,然后锁定字符串列表,然后如果它不是空的,写入所有这些并清空列表
4)其他线程然后锁定列表,添加它们的状态,解锁并继续。

object writeListLocker = new object(); 
List<string> linesToWrite = new List<string>(); 

// Main thread loop 
for (; ;) 
{ 
    lock (writerListLocker) 
    { 
     foreach (string nextLine in linesToWrite) 
      Console.WriteLine(nextLine); 
     linesToWrite.Clear(); 
    } 
    Thread.Sleep(500); 
} 

// Reporting threads 
lock (writerListLocker) 
{ 
    linesToWrite.Add("Completed (etc.)"); 
} 
+0

我想继续使用TPL,如果我能够。它看起来并不像上面所说的那样,在一系列线程上执行处理,而在一个线程上同步写入结果。 –

+0

但是,这正是这样做。主线程循环完成所有的写操作。其他线程全部添加到linesToWrite对象而不是写入。 –

1

您可以使用OrderedTaskScheduler确保一次只运行一个任务;但是,它们将在线程池线程上运行(不一定全部在同一个线程上)。

如果你真的需要在同一个线程上(而不是一次一个),那么你可以使用ActionThreadNito.Async library。它为其代码提供SynchronizationContext,可以通过FromCurrentSynchronizationContext找到。

+0

这是我想去的方向。我想将ContinueWith操作放到单个线程中,以确保一次只能运行其中的一个。不知道我是否可以在生产中使用OrderedTaskScheduler。我想使用FromCurrentSynchronizationContext,但我不能因为我处于服务环境(a.k.a.没有UI线程)而SyncronizationContext.Current返回null。 –

+0

我认为'OrderedTaskScheduler'和'ActionThread'都是生产质量。听起来他们中的任何一个都可以解决你的问题。 –

0

我想你会期待像下面这样的结果。

Starting on 8 
Done 1 on 11 
Completed 1 on 9 
Done 5 on 11 
Completed 5 on 9 
Done 0 on 10 
Completed 0 on 9 
Done 2 on 12 
Completed 2 on 9 
Done 7 on 16 
Completed 7 on 9 
Done 4 on 14 
Completed 4 on 9 
Done 9 on 18 
Completed 9 on 9 
Done 6 on 15 
Completed 6 on 9 
Done 8 on 17 
Completed 8 on 9 
Done 3 on 13 
Completed 3 on 9 

如下,我用StaSynchronizationContext在我的代码从the Understanding SynchronizationContext其中一个线程同步调用很好地解释。请参考它。

我的代码片断是:

static void Main(string[] args) 
{ 
    StaSynchronizationContext context = new StaSynchronizationContext(); 
    StaSynchronizationContext.SetSynchronizationContext(context); 
    Console.WriteLine("Starting on {0}", Thread.CurrentThread.ManagedThreadId); 
    for (var i = 0; i < 10; i++) 
    { 
     var num = i; 
     Task<int>.Factory.StartNew(() => 
     { 
      if (num == 3) 
      { 
       Thread.Sleep(20000); 
      } 
      Thread.Sleep(new Random(num).Next(1000, 10000)); 
      Console.WriteLine("Done {0} on {1}", num, Thread.CurrentThread.ManagedThreadId); 
      return num; 
     }).ContinueWith(
     value => 
     { 
      Console.WriteLine("Completed {0} on {1}", value.Result, Thread.CurrentThread.ManagedThreadId); 
     } 
     ,TaskScheduler.FromCurrentSynchronizationContext()); 
    } 
    Console.WriteLine("End of Main"); 
    Console.ReadKey(); 
}