2012-07-01 21 views
0

我正在创建一个控制台应用程序,它应该接收来自网络的消息以便处理它们。首先,我创建了一个单例类,以确保所有类都可以访问同一队列:该类为ProcessingQueue为什么这些辅助线程不会结束它们的执行?

public class ProcessingQueue 
{ 
    public class ItemToProcess 
    { 
     public string SourceClientId { get; set; } 
     public IMessage ReceivedMessage { get; set; } 
    } 

    private int m_MaxSize = 20; 
    private Queue<ItemToProcess> m_InternalQueue; 

    private static volatile ProcessingQueue m_Instance = null; 
    private static readonly object syncRoot = new object(); 

    private ProcessingQueue() 
    { 
     m_InternalQueue = new Queue<ItemToProcess>(); 
    } 

    public static ProcessingQueue Instance 
    { 
     get 
     { 
      if (m_Instance == null) 
      { 
       lock (syncRoot) 
       { 
        if (m_Instance == null) 
        { 
         m_Instance = new ProcessingQueue(); 
        } 
       } 
      } 
      return m_Instance; 
     } 
    } 

    public int MaxSize 
    { 
     get 
     { 
      lock (syncRoot) 
      { 
       return m_MaxSize; 
      } 
     } 
     set 
     { 
      if (value > 0) 
      { 
       lock (syncRoot) 
       { 
        m_MaxSize = value; 
       } 
      } 
     } 
    } 

    public void Enqueue(String source, IMessage message) 
    { 
     lock (syncRoot) 
     { 
      while (m_InternalQueue.Count >= m_MaxSize) 
      { 
       Monitor.Wait(syncRoot); 
      } 
      m_InternalQueue.Enqueue(new ItemToProcess { SourceClientId = source, ReceivedMessage = message }); 
      if (m_InternalQueue.Count == 1) 
      { 
       Monitor.PulseAll(syncRoot); 
      } 
     } 
    } 

    public ItemToProcess Dequeue() 
    { 
     lock (syncRoot) 
     { 
      while (m_InternalQueue.Count == 0) 
      { 
       Monitor.Wait(syncRoot); 
      } 
      ItemToProcess item = m_InternalQueue.Dequeue(); 
      if (m_InternalQueue.Count == m_MaxSize - 1) 
      { 
       Monitor.PulseAll(syncRoot); 
      } 
      return item; 
     } 
    } 

    public int Count 
    { 
     get 
     { 
      lock (syncRoot) 
      { 
       return m_InternalQueue.Count; 
      } 
     } 
    } 
} 

然后我实现了如下项目的主要类。

  1. 首先,共享队列被实例化。
  2. 然后,我设置了一个计时器来模拟保持活动消息(第一个生产者)的到来。
  3. 然后我创建了消费者线程(processing对象)。
  4. 然后我创建了另一个生产者线程(generating对象)。
  5. 最后,我运行了所有的线程和计时器。

    class程序 静态ProcessingQueue队列= ProcessingQueue.Instance; static System.Timers.Timer keep_alive_timer = new System.Timers.Timer(10000);

    private static volatile bool running = true; 
    
    
    static void Main(string[] args) 
    { 
        queue.MaxSize = 30; 
        keep_alive_timer.Elapsed += new ElapsedEventHandler(delegate(object sender, ElapsedEventArgs e) 
        { 
         KeepAliveMessage msg = new KeepAliveMessage(Guid.NewGuid()); 
         Console.WriteLine("Keep Alive: " + msg.MsgId); 
         queue.Enqueue("", msg); 
        }); 
        keep_alive_timer.Enabled = true; 
        keep_alive_timer.AutoReset = true; 
    
        Thread processing = new Thread(delegate() 
        { 
         while (running) 
         { 
          Console.WriteLine("Number of elements in queue: {0}", queue.Count); 
    
          ProcessingQueue.ItemToProcess msg = queue.Dequeue(); 
          Console.WriteLine("Processed: msgid={0}, source={1};", msg.ReceivedMessage.MsgId, msg.SourceClientId); 
    
          Thread.Sleep(1500); 
         } 
        }); 
    
        Thread generating = new Thread(MessagesFromNetwork); 
    
        processing.Start(); 
        keep_alive_timer.Start(); 
        generating.Start(); 
    
        Console.WriteLine("RUNNING...\n"); 
        Console.ReadLine(); 
    
        running = false; 
        keep_alive_timer.Stop(); 
        Console.WriteLine("CLOSING...\n"); 
    
        //processing.Abort(); 
        //generating.Abort(); 
    
        bool b1 = processing.Join(TimeSpan.FromSeconds(5)); 
        bool b2 = generating.Join(TimeSpan.FromSeconds(5)); 
    
        Console.WriteLine("b1 {0}", b1); 
        Console.WriteLine("b2 {0}", b2); 
        Console.WriteLine("END"); 
        Console.ReadLine(); 
    } 
    
    static void MessagesFromNetwork() 
    { 
        string[] sourceClients = { "1", "2", "3", "4", "5" }; 
        while (running) 
        { 
         IMessage msg; // interface IMessage 
         Random random = new Random(); 
         int type = random.Next(2); 
         switch (type) 
         { 
          case 0: 
           msg = new KeepAliveMessage(Guid.NewGuid()); // implements IMessage 
           break; 
          case 1: 
           msg = new TaskMessage(Guid.NewGuid(), ...); // implements IMessage 
           break; 
          default: 
           throw new Exception("Messaggio non valido!"); 
         } 
         Console.WriteLine("New message received: " + msg.MsgId); 
         queue.Enqueue(sourceClients[random.Next(sourceClients.Length)], msg); 
         Console.WriteLine("... message enqueued: " + msg.MsgId); 
         Thread.Sleep(500); 
        } 
    } 
    

    }

执行过程中输入,在running变量为假并且两个线程应该终止。然而这并不总是会发生,事实上Join这两种方法中的一种没有返回控制权:由于这个原因,我在Join方法中指定了一个超时,但在Console.WriteLine("END");之后,控制台应用程序冻结(第二个Join返回false)。

也许第二个线程没有正确终止......为什么?

+0

用'懒惰'实现单身人士更容易,但我一直认为辛格尔顿是模式中最残酷的。 http://stackoverflow.com/a/1020384/14357 – spender

+0

好的,但我需要一个可以访问WCF服务实例的队列:在我的代码中,我模拟来自网络的数据,但实际上WCF服务的许多实例必须把数据在队列中。 – enzom83

回答

1

看起来像Dequeue或Enqueue可以进入Monitor.Wait(),当运行停止没有人脉冲。

您等待5秒钟,但要注意,最大范围* 1500> 5000

我不能这么直接找出定时器的频率。

+0

我可以改变'Join'方法的等待时间吗?或者我应该使用一个更简单的方法,而不是'Monitor'? – enzom83

+0

你的Enqueue/Dequeue方法根本没有停止逻辑...只需使用一个库类。 –

+0

图书馆课是什么意思?我应该使用“ConcurrentQueue”类吗? – enzom83

相关问题