2010-03-29 59 views
3

我有两个线程,一个线程处理队列,另一个线程将东西添加到队列中。同步异常

  1. 我希望把队列处理线程睡眠时,其处理完队列
  2. 我希望有第二个线程告诉它醒来时,它已经将产品添加到队列

然而,这些函数调用System.Threading.SynchronizationLockException: Object synchronization method was called from an unsynchronized block of codeMonitor.PulseAll(waiting);呼叫,因为我还没有同步的功能与所述等待对象。 [我不想这样做,我希望能够在向队列中添加项目时进行处理]。我怎样才能做到这一点?

Queue<object> items = new Queue<object>(); 
object waiting = new object(); 

1日螺纹

public void ProcessQueue() 
{ 
while (true) 
{ 
    if (items.Count == 0) 
    Monitor.Wait(waiting); 

    object real = null; 
    lock(items) { 
    object item = items.Dequeue(); 
    real = item; 
    } 
    if(real == null) 
     continue; 
    .. bla bla bla 
} 
} 

第二个主题涉及

public void AddItem(object o) 
{ 
... bla bla bla 
lock(items) 
{ 
items.Enqueue(o); 
} 
Monitor.PulseAll(waiting); 
} 
+0

我正在使用锁,只是没有他们在示例代码 – Kurru 2010-03-29 14:48:52

回答

2

如果你有机会获得.NET 4.0,你想做的事可以通过BlockingCollection<T>来取得的成就。
如果你想通过Monitor类的手段来自己做,并与Pulse()信号,你实际上是在正确的轨道上。
你得到的异常,因为调用Wait()Pulse()PulseAll()you have to own the lock on the specified object。你碰巧错过了这个waiting

了一个样本基本线程安全的,可以使用队列:

  • foreach对消费者,
  • while或制片方自己喜欢的条件结构,
  • 处理多个生产者/消费者和
  • 使用lock()Monitor.Pulse()Monitor.PulseAll()Monitor.Wait()

public class SignaledQueue<T> 
{ 
    Queue<T> queue = new Queue<T>(); 
    volatile bool shutDown = false; 

    public bool Enqueue(T item) 
    { 
     if (!shutDown) 
     { 
      lock (queue) 
      { 
       queue.Enqueue(item); 
       //Pulse only if there can be waiters. 
       if (queue.Count == 1) 
       { 
        Monitor.PulseAll(queue); 
       } 
      } 
      return true; 
     } 
     //Indicate that processing should stop. 
     return false; 
    } 

    public IEnumerable<T> DequeueAll() 
    { 
     while (!shutDown) 
     { 
      do 
      { 
       T item; 
       lock (queue) 
       { 
        //If the queue is empty, wait. 
        if (queue.Count == 0) 
        { 
         if (shutDown) break; 
         Monitor.Wait(queue); 
         if (queue.Count == 0) break; 
        } 
        item = queue.Dequeue(); 
       } 
       yield return item; 
      } while (!shutDown); 
     } 
    } 

    public void SignalShutDown() 
    { 
     shutDown = true; 
     lock (queue) 
     { 
      //Signal all waiting consumers with PulseAll(). 
      Monitor.PulseAll(queue); 
     } 
    } 

} 

示例用法:

class Program 
{ 
    static void Main(string[] args) 
    { 
     int numProducers = 4, numConsumers = 2; 
     SignaledQueue<int> queue = new SignaledQueue<int>(); 

     ParameterizedThreadStart produce = delegate(object obj) 
     { 
      Random rng = new Random((int)obj); 
      int num = 0; 
      while (queue.Enqueue(++num)) 
      { 
       Thread.Sleep(rng.Next(100)); 
      } 
     }; 

     ThreadStart consume = delegate 
     { 
      foreach (int num in queue.DequeueAll()) 
      { 
       Console.Write(" {0}", num); 
      } 
     }; 

     Random seedRng = new Random(); 
     for (int i = 0; i < numProducers; i++) 
     { 
      new Thread(produce).Start(seedRng.Next()); 
     } 

     for (int i = 0; i < numConsumers; i++) 
     { 
      new Thread(consume).Start(); 
     } 

     Console.ReadKey(true); 
     queue.SignalShutDown(); 

    } 
} 
2
+0

哦对。好。谢谢:D我在并发课中学习了监视器,并建议监视器在哪里使用prodomentientl。对Java有所了解,其监视器已实现,无需实际“监视”对象。 – Kurru 2010-03-29 18:35:16

+0

信号灯会做的伎俩,但他们是重量级的任务。它们需要进入内核模式,这会花费你的成本。 – 2010-03-30 22:06:34

0

我更喜欢使用,会启动处理线程回调一直持续到它被追上,锁造成同步neous读者和作家在排队等候:

public delegate void CallbackDelegate(); 

class Program 
{ 
    static void Main(string[] args) 
    { 
     Queue<object> items = new Queue<object>(); 

     Processor processor = new Processor(items); 
     Adder adder = new Adder(items, new CallbackDelegate(processor.CallBack)); 

     Thread addThread = new Thread(new ParameterizedThreadStart(adder.AddItem)); 
     object objectToAdd = new object(); 
     addThread.Start(objectToAdd); 
    } 
} 

class Processor 
{ 
    Queue<object> items; 

    public Processor(Queue<object> itemsArg) 
    { 
     items = itemsArg; 
    } 

    public void ProcessQueue() 
    { 
     lock (items) 
     { 
      while (items.Count > 0) 
      { 
       object real = items.Dequeue(); 
       // process real 
      } 
     } 
    } 

    public void CallBack() 
    { 
     Thread processThread = new Thread(ProcessQueue); 
     processThread.IsBackground = true; 
     processThread.Start(); 
    } 
} 

class Adder 
{ 
    Queue<object> items; 
    CallbackDelegate callback; 

    public Adder(Queue<object> itemsArg, CallbackDelegate callbackArg) 
    { 
     items = itemsArg; 
     callback = callbackArg; 
    } 

    public void AddItem(object o) 
    { 
     lock (items) { items.Enqueue(o); } 
     callback(); 
    } 
} 
2

答案是在你发布错误消息: “对象同步方法是从代码上Monitor.PulseAll(等待)不同步的块称为”;

你必须从锁(等待)块中调用Monitor.PulseAll(等待)。

另外...你必须从锁块内调用Monitor.Wait。