我将从一个基本解释我如何理解一对事情的工作,然后用tldr完成全部内容开始,如果人们只是想要达到我在这里遇到的实际问题。如果我对这里的任何事情的理解是错误的,请纠正我。并行使用TPL中预期的每个双线程
TPL代表Task Parallel Library,它是.NET 4.0的答案,旨在进一步简化线程以便于开发人员使用。如果你不熟悉它(在一个非常基础的层次上),你启动一个新的Task对象并将它传递给一个委托,然后在从线程池获取的后台线程上运行(通过使用线程池而不是真正地制作新线程,时间和资源通过使用这些现有线程来保存,而不是创建和处理新线程)。
从我所了解的情况来看,C#中的Parallel.ForEach命令会为它应该执行的每个委托生成一个新线程(很可能来自线程池),并且可能会自动执行一个或多个内联如果编译器确定它们会更快地发生以提高效率,那么可能更多的迭代。
最相关的背景资料,我的目标:
我试图做一个快速的程序,开始了一个任务与程序的其他部分同时运行。在这个任务中,Parallel.ForEach运行3次。总的来说,我们预计程序现在可以运行5个线程(最多):主线程为1,实际任务为1,Parallel.ForEach为3。每个线程都有自己的目标来完成(尽管Parallel.ForEach都有相同的目标,其相关itemNumber的值有不同的计算值,当主线程完成所有目标时,它使用Task.Wait()等待。在完成任务,等待对Parallel.ForEach完成以及随后使用的价值和验证
tldr;实际的问题:
当上述理念运行时,Parallel.ForEach出现因为SynchronizationContexts(本质上是其他线程的TPL对象)的初始化速度是我期望的并且运行它们的两倍,但是只是等待它们的预期数量。因为Parallel.ForEach()。Wait()命令在预期的线程运行数Ta完成然后sk也完成了,因为它认为一切都完成了。然后主程序检测到Task已经完成,并且验证当前没有更多的后台线程正在运行,偶尔剩下的Parrallel.ForEach()还没有完成,因此抛出了错误。
线程的数量已经过验证,可以与我在每个SynchronizationContext的post调用(异步方法kicker)上打印到调试窗口中的内容相匹配。每个线程也被一个主线程对象引用,否则它计划在完成任务时被处置掉,但由于尚未真正期望创建未完成的线程,所以引用仍然存在,因此处理无法正常进行。
Thread testThread = Thread.CurrentThread;
Task backgroundTask = taskFactory.StartNew(() =>
{
Thread rootTaskThread = Thread.CurrentThread;
Assert.AreNotEqual(testThread, rootTaskThread, "First task should not inline");
Thread.Sleep(TimeSpan.FromSeconds(2));
Parallel.ForEach(new[] { 1, 2, 3, 4 },
new ParallelOptions { TaskScheduler = taskFactory.Scheduler }, (int item) => {
Thread.Sleep(TimeSpan.FromSeconds(1));
});
});
在上述例子中,主线程,所述backgroundTask任务,和8个Parallel.ForEach线程最终现有,其中最后9是在SynchronizationContexts创建。
中的SynchronizationContext重写我的自定义的唯一方法是邮电如下:
public override void Post(SendOrPostCallback d, object state){
Request requestOrNull = Request.ExistsForCurrentThread() ? Request.GetForCurrentThread() as Request : null;
Request.IAsyncContextData requestData = null;
if (requestOrNull != null){
requestData = requestOrNull.CaptureDataForNewThreadAndIncrementReferenceCount();
}
Debug.WriteLine("Task started - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
base.Post((object internalState) => {
// Capture the spawned thread state and restore the originating thread state
try{
if (requestData != null){
Request.AttachToAsynchronousContext(requestData);
}
d(state);
}
finally{
// Restore original spawned thread state
if (requestData != null){
// Disposes the request if this is the last reference to it
Request.DetachFromAsynchronousContext(requestData);
}
Debug.WriteLine("Task completed - request data " + (requestData == null ? "DOES NOT EXIST" : "EXISTS"));
}
}, state);
}
的TaskScheduler,我相信是这样做只是它所需要的基本的东西:
private readonly RequestSynchronizationContext context;
private readonly ConcurrentQueue<Task> tasks = new ConcurrentQueue<Task>();
public RequestTaskScheduler(RequestSynchronizationContext synchronizationContext)
{
this.context = synchronizationContext;
}
protected override void QueueTask(Task task){
this.tasks.Enqueue(task);
this.context.Post((object state) => {
Task nextTask;
if (this.tasks.TryDequeue(out nextTask))
this.TryExecuteTask(nextTask);
}, null);
}
protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued){
if (SynchronizationContext.Current == this.context)
return this.TryExecuteTask(task);
else
return false;
}
protected override IEnumerable<Task> GetScheduledTasks(){
return this.tasks.ToArray();
}
TaskFactory :
public RequestTaskFactory(RequestTaskScheduler taskScheduler)
: base(taskScheduler)
{ }
关于为什么会发生这种情况的任何想法?
您能发布一个简短但完整的示例来显示您的行为吗?特别是你的'SynchronizationContext'(我假设你正在使用一个自定义的)。因为我没有看到你描述的行为。另外,我真的不明白为什么使用'Post()'超过您的预期会给您带来任何问题?你的'SynchronizationContext'是否有一些依赖于此的特殊行为? – svick 2012-08-08 16:54:28
你想通过这样做完成什么? 'Assert.AreNotEqual'表明你正在编写某种单元测试。但是,'Assert.AreNotEqual'确实验证了TPL,而不是你的代码 - 所以,我没有看到验证第三方代码的意义。 – 2012-08-10 18:47:25