1

我将从一个基本解释我如何理解一对事情的工作,然后用tldr完成全部内容开始,如果人们只是想要达到我在这里遇到的实际问题。如果我对这里的任何事情的理解是错误的,请纠正我。并行使用TPL中预期的每个双线程

TPL代表Task Parallel Library,它是.NET 4.0的答案,旨在进一步简化线程以便于开发人员使用。如果你不熟悉它(在一个非常基础的层次上),你启动一个新的Task对象并将它传递给一个委托,然后在从线程池获取的后台线程上运行(通过使用线程池而不是真正地制作新线程,时间和资源通过使用这些现有线程来保存,而不是创建和处理新线程)。

从我所了解的情况来看,C#中的Parallel.ForEach命令会为它应该执行的每个委托生成一个新线程(很可能来自线程池),并且可能会自动执行一个或多个内联如果编译器确定它们会更快地发生以提高效率,那么可能更多的迭代。

最相关的背景资料,我的目标:

我试图做一个快速的程序,开始了一个任务与程序的其他部分同时运行。在这个任务中,Parallel.ForEach运行3次。总的来说,我们预计程序现在可以运行5个线程(最多):主线程为1,实际任务为1,Parallel.ForEach为3。每个线程都有自己的目标来完成(尽管Parallel.ForEach都有相同的目标,其相关itemNumber的值有不同的计算值,当主线程完成所有目标时,它使用Task.Wait()等待。在完成任务,等待对Parallel.ForEach完成以及随后使用的价值和验证

tldr;实际的问题:

当上述理念运行时,Parallel.ForEach出现因为SynchronizationContexts(本质上是其他线程的TPL对象)的初始化速度是我期望的并且运行它们的两倍,但是只是等待它们的预期数量。因为Parallel.ForEach()。Wait()命令在预期的线程运行数Ta完成然后sk也完成了,因为它认为一切都完成了。然后主程序检测到Task已经完成,并且验证当前没有更多的后台线程正在运行,偶尔剩下的Parrallel.ForEach()还没有完成,因此抛出了错误。

线程的数量已经过验证,可以与我在每个SynchronizationContext的post调用(异步方法kicker)上打印到调试窗口中的内容相匹配。每个线程也被一个主线程对象引用,否则它计划在完成任务时被处置掉,但由于尚未真正期望创建未完成的线程,所以引用仍然存在,因此处理无法正常进行。

Thread testThread = Thread.CurrentThread; 
Task backgroundTask = taskFactory.StartNew(() => 
{ 
    Thread rootTaskThread = Thread.CurrentThread; 
    Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline"); 
    Thread.Sleep(TimeSpan.FromSeconds(2)); 

    Parallel.ForEach(new[] { 1, 2, 3, 4 }, 
     new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => { 
     Thread.Sleep(TimeSpan.FromSeconds(1)); 
    }); 
}); 

在上述例子中,主线程,所述backgroundTask任务,和8个Parallel.ForEach线程最终现有,其中最后9是在SynchronizationContexts创建。

中的SynchronizationContext重写我的自定义的唯一方法是邮电如下:

public override void Post(SendOrPostCallback d, object state){ 
    Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null; 
    Request.IAsyncContextData requestData = null; 

    if (requestOrNull != null){ 
     requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount(); 
    } 

    Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS")); 

    base.Post((object internalState) => { 
     // Capture the spawned thread state and restore the originating thread state 
     try{ 
      if (requestData != null){ 
       Request.AttachToAsynchronousContext(requestData); 
      } 
      d(state); 
     } 
     finally{ 
      // Restore original spawned thread state 
      if (requestData != null){ 
      // Disposes the request if this is the last reference to it 
       Request.DetachFromAsynchronousContext(requestData); 
      } 
     Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS")); 
     } 
    }, state); 
} 

的TaskScheduler,我相信是这样做只是它所需要的基本的东西:

private readonly RequestSynchronizationContext context; 
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>(); 

public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext) 
{ 
    this.context = synchronizationContext; 
} 

protected override void QueueTask(Task task){ 
    this.tasks.Enqueue(task); 
    this.context.Post((object state) => { 
     Task nextTask; 
     if (this.tasks.TryDequeue(out nextTask)) 
      this.TryExecuteTask(nextTask); 
    }, null); 
} 

protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){ 
    if (SynchronizationContext.Current == this.context) 
     return this.TryExecuteTask(task); 
    else 
     return false; 
} 

protected override IEnumerable<Task> GetScheduledTasks(){ 
    return this.tasks.ToArray(); 
} 

TaskFactory :

public RequestTaskFactory(RequestTaskScheduler taskScheduler) 
    : base(taskScheduler) 
{ } 

关于为什么会发生这种情况的任何想法?

+0

您能发布一个简短但完整的示例来显示您的行为吗?特别是你的'SynchronizationContext'(我假设你正在使用一个自定义的)。因为我没有看到你描述的行为。另外,我真的不明白为什么使用'Post()'超过您的预期会给您带来任何问题?你的'SynchronizationContext'是否有一些依赖于此的特殊行为? – svick 2012-08-08 16:54:28

+0

你想通过这样做完成什么? 'Assert.AreNotEqual'表明你正在编写某种单元测试。但是,'Assert.AreNotEqual'确实验证了TPL,而不是你的代码 - 所以,我没有看到验证第三方代码的意义。 – 2012-08-10 18:47:25

回答

0

任务本身不会创建线程。 TaskScheduler决定如何操作以使操作异步(如果可以的话)。例如一些操作使用异步IO,在这种情况下,硬件使其异步,而不是另一个工作线程。

连续调用的方式取决于同步上下文。该上下文不是另一个线程,它只是抽象操作可以运行的标准。在WPF,WinForms,Silverlight等中,例如有一个UI同步上下文需要在特定线程(UI线程或主线程,以避免异常)上执行操作。

ForEach将尝试来创建线程(更具体地说,它会尝试询问同步上下文以启动多个异步操作)。调度程序确定它是如何做到的。如果你给它三项任务,它可能是创建三个线程或它可能不是。它决定三个并发线程是否是好事。例如,如果您只有两个内核,ForEach不会创建两个以上的线程,因为使用单个线程并由于上下文切换开销而按顺序运行代码可能会更糟糕。

目前还不清楚“初始化SynchronizationContext的两倍”是什么意思。这些不是线程。你是否仅仅意味着它创造的线程超出了你的预期?或者你的意思是邮政被称为超过你的预期?什么是你的SynchronizationContext类基于? (即它的基类是什么)。基地确实在很大程度上定义了Post调用将会如何。它可能觉得需要创建另一个异步操作来跟踪其他操作......您如何让调度程序使用此上下文?

SynchronizationContext在TPL(首先出现在.NET 2.0中)之前就已经存在。它所做的一件事是管理异步操作请求。从您发帖后不清楚您是否理解这一点。

更新: 第一次调用QueueTask来自StartNew。 对QueueTask的第二次调用来自间接调用ForEach 第三次调用QueueTask间接来自QueueTask中的TryExecuteTask 对QueueTask的后续4次调用是将正文传递给ForEach。

根据负载的不同,QueueTask可能最多再调用3次。如果我在QueueTask上调试并中断,则QueueTask只会被调用7次。

此时,由于您在QueueTask中执行与TPL不同的操作(即,TryExecuteTask调用额外的操作)很难说为什么有时会有一些额外的QueueTask调用。这可能来自您实施QueueTask的方式,因为您正在有效地要求调度程序从已异步执行的任务中排队另一个异步任务。我的猜测是,这只是时机。可以快速调用QueueTask(因为这是从另一个异步操作完成的),TryExecuteTask不知道任务已排队,并强制任务执行(强制调用另一个调用QueueTask)。

如果QueueTask事实上正在导致对QueueTask的另一个调用,因为它尚未调度它所在的任务,这就解释了为什么最多有10个对QueueTask的调用。这是TryExecteTask调用导致每个ForEach正文“双”调用...

+0

My SynchronizationContext直接扩展了基本的SynchronizationContext,除了post之外,没有其他方法被覆盖。 是的,帖子被称为超过我的预期。对于ForEach循环中的每次迭代,都会进行2次post调用。或者最起码,它是这样的:一个ForEach大小为3的结果总共有7个post调用,size 4的结果是9个post调用,等等...... 基本的TaskScheduler也用于基本的TaskFactory,没有no重写的方法。 你是对的,我没有意识到SynchronizationContext在TPL之前就已经存在。 – 2012-08-09 15:13:29

+0

TaskScheduler是一个抽象类;你如何实现这些抽象方法?另外,你如何实现TaskFactory?你在做与TPL不同的事情。你如何设置你的TaskFactory的调度器属性?我不知道你是否在使用默认的任务调度器(它是SynchronizationContextTaskScheduler或ThreadPoolTask​​Scheduler),所以它看起来就像你在比较苹果和橙子。 – 2012-08-09 15:53:44

+0

添加了我的TaskScheduler实现,TaskFactory与我添加的一样简单。至于任何TaskFactory调度器属性,这将是我不知道必须做的事情? – 2012-08-09 16:55:36