请原谅我的这个帖子作为一个答案,因为它不是真的回答你的问题,但是,它涉及到你的问题,因为它涉及CorrelationManager行为和线程/任务/等。我一直在考虑使用CorrelationManager的LogicalOperationStack
(和StartLogicalOperation/StopLogicalOperation
方法)在多线程场景中提供额外的上下文。
我把你的例子稍加修改,添加使用Parallel.For并行执行工作的能力。此外,我使用StartLogicalOperation/StopLogicalOperation
括号(内部)DoLongRunningWork
。从概念上讲,DoLongRunningWork
做这样的事情是每次执行:
DoLongRunningWork
StartLogicalOperation
Thread.Sleep(3000)
StopLogicalOperation
我发现,如果我添加这些逻辑操作代码(或多或少是),所有的逻辑operatins的保持同步(总是预期的堆栈操作次数和堆栈操作的值始终如预期)。
在我的一些测试中,我发现并非总是如此。逻辑操作堆栈正在“损坏”。我可以想到的最好的解释是,当“子”线程退出时,将“CallContext”信息“合并”回“父”线程上下文导致“旧”子线程上下文信息(逻辑操作)为“继承“由另一个兄弟的子线程。
这个问题也可能与以下事实有关:Parallel.For显然使用主线程(至少在示例代码中,如写入的那样)作为“工作线程”之一(或者任何它们应该在并行域)。无论何时执行DoLongRunningWork,都会启动一个新的逻辑操作(开始时)并停止(结束时)(也就是将其压入LogicalOperationStack并从中弹出)。如果主线程已经具有逻辑操作并且DoLongRunningWork在主线程上执行,则启动新的逻辑操作,因此主线程的LogicalOperationStack现在具有两个操作。任何后续的DoLongRunningWork执行(只要DoLongRunningWork的这个“迭代”在主线程上执行)将(显然)继承主线程的LogicalOperationStack(现在有两个操作,而不仅仅是一个预期的操作)。
我花了很长时间才弄清楚为什么LogicalOperationStack的行为在我的示例中与我的示例的修改版本中的行为不同。最后,我看到在我的代码中,我用逻辑操作括住了整个程序,而在我的修改版本的测试程序中,我没有。其含义是,在我的测试程序中,每次执行我的“工作”(类似于DoLongRunningWork),已经有一个合理的操作。在我的测试程序的修改版本中,我没有将逻辑操作中的整个程序括起来。
所以,当我修改你的测试程序来将逻辑操作中的整个程序括起来,并且如果我使用的是Parallel.For,我遇到了完全相同的问题。
使用上面的概念模型,这将成功运行:
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
虽然这最终将导致断言到一个明显的不同步LogicalOperationStack的:
StartLogicalOperation
Parallel.For
DoLongRunningWork
StartLogicalOperation
Sleep(3000)
StopLogicalOperation
StopLogicalOperation
这里是我的示例程序。它和你的类似,它有一个操作ActivityId和LogicalOperationStack的DoLongRunningWork方法。我也有两种踢DoLongRunningWork的风格。一种风味使用任务,一种使用Parallel.For。也可以执行每种风格,以使得整个并行操作被封闭在逻辑操作中或不被操作。所以,总共有4种方式来执行并行操作。要尝试每一个,只需取消注释所需的“使用...”方法,重新编译并运行。 UseTasks
,UseTasks(true)
和UseParallelFor
应该全部运行完成。 UseParallelFor(true)
将在某个时刻断言,因为LogicalOperationStack没有预期的条目数。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
namespace CorrelationManagerParallelTest
{
class Program
{
static void Main(string[] args)
{
//UseParallelFor(true) will assert because LogicalOperationStack will not have expected
//number of entries, all others will run to completion.
UseTasks(); //Equivalent to original test program with only the parallelized
//operation bracketed in logical operation.
////UseTasks(true); //Bracket entire UseTasks method in logical operation
////UseParallelFor(); //Equivalent to original test program, but use Parallel.For
//rather than Tasks. Bracket only the parallelized
//operation in logical operation.
////UseParallelFor(true); //Bracket entire UseParallelFor method in logical operation
}
private static List<int> threadIds = new List<int>();
private static object locker = new object();
private static int mainThreadId = Thread.CurrentThread.ManagedThreadId;
private static int mainThreadUsedInDelegate = 0;
// baseCount is the expected number of entries in the LogicalOperationStack
// at the time that DoLongRunningWork starts. If the entire operation is bracketed
// externally by Start/StopLogicalOperation, then baseCount will be 1. Otherwise,
// it will be 0.
private static void DoLongRunningWork(int baseCount)
{
lock (locker)
{
//Keep a record of the managed thread used.
if (!threadIds.Contains(Thread.CurrentThread.ManagedThreadId))
threadIds.Add(Thread.CurrentThread.ManagedThreadId);
if (Thread.CurrentThread.ManagedThreadId == mainThreadId)
{
mainThreadUsedInDelegate++;
}
}
Guid lo1 = Guid.NewGuid();
Trace.CorrelationManager.StartLogicalOperation(lo1);
Guid g1 = Guid.NewGuid();
Trace.CorrelationManager.ActivityId = g1;
Thread.Sleep(3000);
Guid g2 = Trace.CorrelationManager.ActivityId;
Debug.Assert(g1.Equals(g2));
//This assert, LogicalOperation.Count, will eventually fail if there is a logical operation
//in effect when the Parallel.For operation was started.
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Count == baseCount + 1, string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Count, baseCount + 1));
Debug.Assert(Trace.CorrelationManager.LogicalOperationStack.Peek().Equals(lo1), string.Format("MainThread = {0}, Thread = {1}, Count = {2}, ExpectedCount = {3}", mainThreadId, Thread.CurrentThread.ManagedThreadId, Trace.CorrelationManager.LogicalOperationStack.Peek(), lo1));
Trace.CorrelationManager.StopLogicalOperation();
}
private static void UseTasks(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
TaskCreationOptions taskCreationOpt = TaskCreationOptions.None;
Task task = null;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Task[] allTasks = new Task[totalThreads];
for (int i = 0; i < totalThreads; i++)
{
task = Task.Factory.StartNew(() =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
}, taskCreationOpt);
allTasks[i] = task;
}
Task.WaitAll(allTasks);
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
private static void UseParallelFor(bool encloseInLogicalOperation = false)
{
int totalThreads = 100;
Stopwatch stopwatch = new Stopwatch();
stopwatch.Start();
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StartLogicalOperation();
}
Parallel.For(0, totalThreads, i =>
{
DoLongRunningWork(encloseInLogicalOperation ? 1 : 0);
});
if (encloseInLogicalOperation)
{
Trace.CorrelationManager.StopLogicalOperation();
}
stopwatch.Stop();
Console.WriteLine(String.Format("Completed {0} tasks in {1} milliseconds", totalThreads, stopwatch.ElapsedMilliseconds));
Console.WriteLine(String.Format("Used {0} threads", threadIds.Count));
Console.WriteLine(String.Format("Main thread used in delegate {0} times", mainThreadUsedInDelegate));
Console.ReadKey();
}
}
}
这整个如果LogicalOperationStack可以用的Parallel.For使用的问题(和/或其他线程/任务构建体)或它如何被使用的可能是值得它自己的问题。也许我会发布一个问题。同时,我想知道你是否对此有任何想法(或者,我想知道是否考虑过使用LogicalOperationStack,因为ActivityId看起来是安全的)。
[编辑]
见我的回答this question有关使用LogicalOperationStack和/或CallContext.LogicalSetData与一些不同的线程/线程池/任务/并行contstructs的更多信息。
也看到这里,我的问题上SO约LogicalOperationStack和并行扩展: Is CorrelationManager.LogicalOperationStack compatible with Parallel.For, Tasks, Threads, etc
最后,还看到我的问题在这里对微软的并行扩展论坛: http://social.msdn.microsoft.com/Forums/en-US/parallelextensions/thread/7c5c3051-133b-4814-9db0-fc0039b4f9d9
在我的测试中,它看起来像跟踪。如果您在主线程中启动逻辑操作,然后在委托中启动/停止逻辑操作,则CorrelationManager.LogicalOperationStack可能会在使用Parallel.For或Parallel.Invoke时损坏。在我的测试(见上述两种链接)的LogicalOperationStack应该总是恰好有2项时DoLongRunningWork正在执行(如果我使用各种技术DoLongRunningWork的踢前开始在主线程的逻辑运算)。所以,通过“损坏”我的意思是说,LogicalOperationStack最终会有超过2个条目。
从我可以告诉,这可能是因为和的Parallel.For使用Parallel.Invoke主线程为“工人”的一个线程来执行DoLongRunningWork动作。
使用存储在CallContext.LogicalSetData中的堆栈来模拟LogicalOperationStack(类似于通过CallContext.SetData存储的log4net的LogicalThreadContext.Stacks)的行为会产生更糟的结果。如果我正在使用这样的堆栈来维护上下文,那么在几乎所有的情况下,它都会被损坏(即没有预期的条目数),因为我在主线程中有一个“逻辑操作”,每次迭代都有一个逻辑操作/执行DoLongRunningWork委托。
我没有什么优惠,但我也对这个问题感兴趣。似乎同样的问题也适用于使用CallContext.LogicalSetData的信息集,因为这是Trace.CorrelationManager用来存储ActivityId和LogicalOperationStack的技术。 – wageoghe 2010-12-14 19:47:08
@wageohe - 我终于可以今天这个测试,已经张贴了我的结果:) – 2010-12-15 00:35:58
我张贴一些更多的细节在我的答案。我还在这里发布了一个关于SO的另一个答案的链接,这是我在这里问到的一个新问题,以及我在Microsoft的Parallel Extensions论坛上提问(但尚未在2011年1月21日回答)的问题。也许你会发现有用的信息,也许不会。 – wageoghe 2011-01-21 15:54:47