2013-02-26 48 views
2

该场景如下: 有一些低优先级线程可以被高优先级线程中断。每当高优先级线程要求低优先级线程暂停时,它们将进入Wait状态(如果它们尚未处于等待状态)。然而,当高优先级的线程表示低优先级线程可以通知Resume时,低优先级线程不应该继续,直到要求低优先级线程暂停的所有高优先级线程已经同意为止。管理.net中的高/低优先级线程

为了解决这个问题,我保持跟踪Pause()从高优先级线程调用计数器变量中的低优先级线程。每当高优先级线程向低优先级线程询问Pause()时,计数器的值增加1.如果在增量后计数器的值为1,则表示该线程不在Wait中,因此请求它进入Wait状态。否则,只需增加counter值。相反,当一个高优先级的线程调用Resume()时,我们递减counter的值,并且如果在递减之后的值是0,这意味着低优先级的线程现在可以Resume

这是我的问题的简化实现。比较操作内如果与Interlocked.XXX语句是不正确的,即

如果(Interlocked.Increment(参照_remain)== 1)

,作为读/修改和比较操作不是原子。

我在这里错过了什么?我不想使用线程优先级。

using System; 
using System.Collections.Generic; 
using System.Threading; 

namespace TestConcurrency 
{ 

// I borrowed this class from Joe Duffy's blog and modified it 
public class LatchCounter 
{ 
private long _remain; 
private EventWaitHandle m_event; 
private readonly object _lockObject; 

public LatchCounter() 
{ 
    _remain = 0; 
    m_event = new ManualResetEvent(true); 
    _lockObject = new object(); 
} 

public void Check() 
{ 
    if (Interlocked.Read(ref _remain) > 0) 
    { 
     m_event.WaitOne(); 
    } 
} 

public void Increment() 
{ 
    lock(_lockObject) 
    { 
     if (Interlocked.Increment(ref _remain) == 1) 
      m_event.Reset(); 
    } 
} 

public void Decrement() 
{ 
    lock(_lockObject) 
    { 
     // The last thread to signal also sets the event. 
     if (Interlocked.Decrement(ref _remain) == 0) 
      m_event.Set(); 
    } 
} 
} 



public class LowPriorityThreads 
{ 
private List<Thread> _threads; 
private LatchCounter _latch; 
private int _threadCount = 1; 

internal LowPriorityThreads(int threadCount) 
{ 
    _threadCount = threadCount; 
    _threads = new List<Thread>(); 
    for (int i = 0; i < _threadCount; i++) 
    { 
     _threads.Add(new Thread(ThreadProc)); 
    } 

    _latch = new CountdownLatch(); 
} 


public void Start() 
{ 
    foreach (Thread t in _threads) 
    { 
     t.Start(); 
    } 
} 

void ThreadProc() 
{ 
    while (true) 
    { 
     //Do something 
     Thread.Sleep(Rand.Next()); 
     _latch.Check(); 
    } 
} 

internal void Pause() 
{ 
    _latch.Increment(); 
} 

internal void Resume() 
{ 
    _latch.Decrement(); 
} 
} 


public class HighPriorityThreads 
{ 
private Thread _thread; 
private LowPriorityThreads _lowPriorityThreads; 

internal HighPriorityThreads(LowPriorityThreads lowPriorityThreads) 
{ 
    _lowPriorityThreads = lowPriorityThreads; 
    _thread = new Thread(RandomlyInterruptLowPriortyThreads); 
} 

public void Start() 
{ 
    _thread.Start(); 
} 

void RandomlyInterruptLowPriortyThreads() 
{ 
    while (true) 
    { 
     Thread.Sleep(Rand.Next()); 

     _lowPriorityThreads.Pause(); 

     Thread.Sleep(Rand.Next()); 
     _lowPriorityThreads.Resume(); 
    } 
} 
} 

class Program 
{ 
    static void Main(string[] args) 
    { 
    LowPriorityThreads lowPriorityThreads = new LowPriorityThreads(3); 
    HighPriorityThreads highPriorityThreadOne = new HighPriorityThreads(lowPriorityThreads); 
    HighPriorityThreads highPriorityThreadTwo = new HighPriorityThreads(lowPriorityThreads); 

    lowPriorityThreads.Start(); 
    highPriorityThreadOne.Start(); 
    highPriorityThreadTwo.Start(); 
} 
} 


class Rand 
{ 
internal static int Next() 
{ 
    // Guid idea has been borrowed from somewhere on StackOverFlow coz I like it 
    return new System.Random(Guid.NewGuid().GetHashCode()).Next() % 30000; 
} 
} 
+0

为什么你就不能修改检查做'm_event.WaitOne()'没有别的什么吗? – usr 2013-02-26 12:14:10

+2

不使用Thread.Priority是一个严重的错误。你将调试僵局,直到母牛回家。 – 2013-02-26 12:19:48

+0

有些东西对这个要求闻起来有些“异味”,但我现在还不能完全掌握它。 – 2013-02-26 12:40:32

回答

0

我不知道您的要求,因此我不会在这里讨论它们。 就实现过程而言,我会引入一个“调度程序”类,它将处理线程间交互,并且还作为“可运行”对象的工厂。

这个实现当然是非常粗糙和开放的批评。

class Program 
{ 
    static void Main(string[] args) 
    { 
     ThreadDispatcher td=new ThreadDispatcher(); 
     Runner r1 = td.CreateHpThread(d=>OnHpThreadRun(d,1)); 
     Runner r2 = td.CreateHpThread(d => OnHpThreadRun(d, 2)); 

     Runner l1 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 1")); 
     Runner l2 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 2")); 
     Runner l3 = td.CreateLpThread(d => Console.WriteLine("Running low priority thread 3")); 


     l1.Start(); 
     l2.Start(); 
     l3.Start(); 

     r1.Start(); 
     r2.Start(); 

     Console.ReadLine(); 

     l1.Stop(); 
     l2.Stop(); 
     l3.Stop(); 

     r1.Stop(); 
     r2.Stop(); 
    } 

    private static void OnHpThreadRun(ThreadDispatcher d,int number) 
    { 
     Random r=new Random(); 
     Thread.Sleep(r.Next(100,2000)); 
     d.CheckedIn(); 
     Console.WriteLine(string.Format("*** Starting High Priority Thread {0} ***",number)); 
     Thread.Sleep(r.Next(100, 2000)); 
     Console.WriteLine(string.Format("+++ Finishing High Priority Thread {0} +++", number)); 
     Thread.Sleep(300); 
     d.CheckedOut();   
    } 
} 

public abstract class Runner 
{ 
    private Thread _thread; 
    protected readonly Action<ThreadDispatcher> _action; 
    private readonly ThreadDispatcher _dispathcer; 
    private long _running; 
    readonly ManualResetEvent _stopEvent=new ManualResetEvent(false); 
    protected Runner(Action<ThreadDispatcher> action,ThreadDispatcher dispathcer) 
    { 
     _action = action; 
     _dispathcer = dispathcer; 
    } 

    public void Start() 
    { 
     _thread = new Thread(OnThreadStart); 
     _running = 1; 
     _thread.Start(); 
    } 

    public void Stop() 
    { 
     _stopEvent.Reset(); 
     Interlocked.Exchange(ref _running, 0); 
     _stopEvent.WaitOne(2000); 
     _thread = null; 
     Console.WriteLine("The thread has been stopped."); 

    } 
    protected virtual void OnThreadStart() 
    { 
     while (Interlocked.Read(ref _running)!=0) 
     { 
      OnStartWork(); 
      _action.Invoke(_dispathcer); 
      OnFinishWork(); 
     } 
     OnFinishWork(); 
     _stopEvent.Set(); 
    } 

    protected abstract void OnStartWork(); 
    protected abstract void OnFinishWork(); 
} 

public class ThreadDispatcher 
{ 
    private readonly ManualResetEvent _signal=new ManualResetEvent(true); 
    private int _hpCheckedInThreads; 
    private readonly object _lockObject = new object(); 

    public void CheckedIn() 
    { 
     lock(_lockObject) 
     { 
      _hpCheckedInThreads++; 
      _signal.Reset(); 
     } 
    } 
    public void CheckedOut() 
    { 
     lock(_lockObject) 
     { 
      if(_hpCheckedInThreads>0) 
       _hpCheckedInThreads--; 
      if (_hpCheckedInThreads == 0) 
       _signal.Set(); 
     } 
    } 

    private class HighPriorityThread:Runner 
    { 
     public HighPriorityThread(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher) : base(action,dispatcher) 
     { 
     } 

     protected override void OnStartWork() 
     { 
     } 

     protected override void OnFinishWork() 
     { 
     } 
    } 
    private class LowPriorityRunner:Runner 
    { 
     private readonly ThreadDispatcher _dispatcher; 
     public LowPriorityRunner(Action<ThreadDispatcher> action, ThreadDispatcher dispatcher) 
      : base(action, dispatcher) 
     { 
      _dispatcher = dispatcher; 
     } 

     protected override void OnStartWork() 
     { 
      Console.WriteLine("LP Thread is waiting for a signal."); 
      _dispatcher._signal.WaitOne(); 
      Console.WriteLine("LP Thread got the signal."); 
     } 

     protected override void OnFinishWork() 
     { 

     } 
    } 

    public Runner CreateLpThread(Action<ThreadDispatcher> action) 
    { 
     return new LowPriorityRunner(action, this); 
    } 

    public Runner CreateHpThread(Action<ThreadDispatcher> action) 
    { 
     return new HighPriorityThread(action, this); 
    } 
} 

}

相关问题