2011-11-16 49 views
3

在.net 2.0上工作我需要实现一些线程,我正在查看一个虚拟示例,但找不到实现事件通知的任何内容。 需要知道什么时候完成,还有某种进度条,如果你喜欢。生产者消费者模式你如何通知完成?

我一直在玩下面的代码通过看不到正确的事件通知。 我如何检测到我已完成处理并可能使用我一直在做的工作来更新ui?

实施例代码

class Program 
{ 
    static void Main(string[] args) 
    { 
     using (PCQueue q = new PCQueue(2)) 
     { 
      q.TaskCompleted += new EventHandler(OnTaskCompleted); 
      q.PercentageCompleted += new EventHandler(OnPercentageCompleted); 


      for (int i = 1; i < 100; i++) 
      { 

       string itemNumber = i.ToString(); // To avoid the captured variable trap 
       q.EnqueueItem(itemNumber); 
      } 
      Console.WriteLine("Waiting for items to complete..."); 
      Console.Read(); 
     } 
    } 

    private static void OnPercentageCompleted(object sender, EventArgs e) 
    { 

    } 

    static void OnTaskCompleted(object sender, EventArgs e) 
    { 

    } 
} 


public class PCQueue : IDisposable 
{ 
    readonly object locker = new object(); 
    Thread[] _workers; 
    Queue<string> _itemQ = new Queue<string>(); 
    public PCQueue(int workerCount) 
    { 
     _workers = new Thread[workerCount]; 
     // Create and start a separate thread for each worker 
     for (int i = 0; i < workerCount; i++) 
     { 
      (_workers[i] = new Thread(Consume)).Start(); 
     } 
    } 

    public void EnqueueItem(string item) 
    { 
     lock (locker) 
     { 
      _itemQ.Enqueue(item); // We must pulse because we're 
      Monitor.Pulse(locker); // changing a blocking condition. 
     } 
    } 
    void Consume() 
    { 
     while (true) // Keep consuming until 
     { // told otherwise. 
      string item; 
      lock (locker) 
      { 
       while (_itemQ.Count == 0) Monitor.Wait(locker); 
       item = _itemQ.Dequeue(); 
      } 
      if (item == null) return; // This signals our exit. 

      DoSomething(item); // Execute item. 
     } 
    } 
    private void DoSomething(string item) 
    { 
     Console.WriteLine(item); 
    } 
    public void Dispose() 
    { 
     // Enqueue one null item per worker to make each exit. 
     foreach (Thread worker in _workers) 
     { 
      EnqueueItem(null); 
     } 
    } 

    //where/how can I fire this event??? 
    public event EventHandler TaskCompleted; 
    protected void OnCompleted(EventArgs e) 
    { 
     if (this.TaskCompleted != null) 
     { 
      this.TaskCompleted(this, e); 
     } 
    } 
    //where/how can I fire this event??? 
    public event EventHandler PercentageCompleted; 
    protected void OnPercentageCompleted(EventArgs e) 
    { 
     if (this.PercentageCompleted != null) 
     { 
      this.PercentageCompleted(this, e); 
     } 
    } 
    } 

任何建议?

+0

这应该给你一些想法:http://msdn.microsoft.com/en-us/library/system.threading.monitor.pulse(v=VS.80).aspx –

+0

感谢您的链接。对不起,但看不到任何事件发生。 – user9969

+0

啊,你特别想要事件。我没有看到任何会阻止你处理q.WhateverEvent假设被解雇的事情。你可以发布你的PCQueue的代码吗? –

回答

2

您不能在队列中引发进度事件,原因很简单,因为队列不知道应该处理的总数项目。所以它不能计算一个百分比。你只需要插入一些东西,然后进行处理。

你能做什么就是引发一个ItemProcessed事件并订阅它。然后在您的主程序中,您可以执行逻辑,计算到目前为止处理了多少项目,以及应处理的项目数量。

您可以在从Consume函数返回之前提高完整事件。然而,当Brian在回答中说,你需要跟踪有多少线程仍然活动。我修改了代码以反映这一点。

因此就沿着这些线路:

... 
private int _ActiveThreads; 
public PCQueue(int workerCount) 
{ 
    _ActiveThreads = workerCount; 
    _workers = new Thread[workerCount]; 
    // Create and start a separate thread for each worker 
    for (int i = 0; i < workerCount; i++) 
    { 
     (_workers[i] = new Thread(Consume)).Start(); 
    } 
} 

void Consume() 
{ 
    while (true) // Keep consuming until 
    { // told otherwise. 
     string item; 
     lock (locker) 
     { 
      while (_itemQ.Count == 0) Monitor.Wait(locker); 
      item = _itemQ.Dequeue(); 
     } 
     if (item == null) // This signals our exit. 
     { 
      if (Interlocked.Decrement(ref _ActiveThreads) == 0) 
      { 
       OnCompleted(EventArgs.Empty); 
      } 
      return; 
     } 

     DoSomething(item); // Execute item. 
     OnItemProcessed(); 
    } 
} 

public event EventHandler ItemProcessed; 
protected void OnItemProcessed() 
{ 
    var handler = ItemProcessed; 
    if (handler != null) 
    { 
     handler(this, EventArgs.Empty); 
    } 
} 
... 

当然,你可能需要创建一些meaningfull事件参数实际上通过其处理该事件的项目。

然后在主:

... 
static void Main(string[] args) 
{ 
    using (PCQueue q = new PCQueue(2)) 
    { 
     q.ItemProcessed += ItemProcessed; 
     q.TaskCompleted += OnTaskCompleted; 

     for (int i = 1; i <= totalNumberOfItems; i++) 
     { 
      string itemNumber = i.ToString(); // To avoid the captured variable trap 
      q.EnqueueItem(itemNumber); 
     } 
     Console.WriteLine("Waiting for items to complete..."); 
     Console.Read(); 
    } 
} 

private static int currentProcessCount = 0; 
private static int totalNumberOfItems = 100; 

private static void ItemProcessed(object sender, EventArgs e) 
{ 
    currentProcessCount++; 
    Console.WriteLine("Progress: {0}%", ((double)currentProcessCount/(double)totalNumberOfItems) * 100.0); 
} 

static void OnTaskCompleted(object sender, EventArgs e) 
{ 
    Console.WriteLine("Done"); 
} 
... 

不用说,所有的静态的东西应该去。这只是基于你的例子。

还有一个备注: 你PCQueue目前需要你有工作线程,否则只有一个线程将退出,而其他人将等到进程退出,你排队尽可能多的null值。您可以通过查看第一个项目来更改该项目,并且只有在不是null时才将其删除 - 从而将标记留在所有线程中。所以Consume就改成这样:

void Consume() 
{ 
    while (true) // Keep consuming until 
    { // told otherwise. 
     string item; 
     lock (locker) 
     { 
      while (_itemQ.Count == 0) Monitor.Wait(locker); 
      item = _itemQ.Peek(); 
      if (item != null) _itemQ.Dequeue(); 
      else Monitor.PulseAll(); // if the head of the queue is null then make sure all other threads are also woken up so they can quit 
     } 
     if (item == null) // This signals our exit. 
     { 
      if (Interlocked.Decrement(ref _ActiveThreads) == 0) 
      { 
       OnCompleted(EventArgs.Empty); 
      } 
      return; 
     } 

     DoSomething(item); // Execute item. 
     OnItemProcessed(); 
    } 
} 
+0

任何示例?你说的听起来不错 – user9969

+0

@ user231465:更新了答案。 – ChrisWue

+0

非常感谢您的时间和答复。我得到我的控制台我得到所有“0”和所有“100”。可能是我错过了point.My想要的结果是“1/10”“2/10”3/10等等......然后10/10如果我们正在处理10个项目,则全部完成 – user9969

2

在你PCQueue类,你将需要保持的许多工作线程如何仍然活跃的轨道,提高TaskCompleted只有在所有的线程已经指示结束。

void Consume() 
{ 
    while (true) 
    { 
     string item; 
     lock (locker) 
     { 
      while (_itemQ.Count == 0) Monitor.Wait(locker); 
      item = _itemQ.Dequeue(); 
     } 

     if (item == null) 
     { 
      // activeThreads is set to the number of workers in the constructor. 
      if (Interlocked.Decrement(ref activeThreads) == 0) 
      { 
       // Take a snapshot of the event so that a null check + invocation is safe. 
       // This works because delegates are immutable. 
       var copy = TaskCompleted; 

       if (copy != null) 
       { 
       copy(this, new EventArgs()); 
       } 
      } 
      return; 
     } 

     DoSomething(item); // Execute item. 
    } 
} 

几个其他点:

  • 荣誉的获得正确实施阻塞队列。大多数人都错了。
  • 请记住在触摸任何UI控件之前将TaskCompleted事件处理程序重新编排回UI线程。
  • 您可以从DoSomething提高PercentCompleted,但是没有清楚地表明队列中有多少项应该保留,这个值是没有意义的。我在这一点上第二Chris' recommendation
+0

试图用activeThreads实现你的“消费”版本,但没有运气。使用Chris示例。我在哪里声明ref activeThreads? – user9969

+0

@ user231465:在'PCQueue'中声明'activeThreads'作为私有变量,然后在构造函数中执行'activeThreads = workerCount'。这个想法是'Interlocked.Decrement'与'if'检查将导致'TaskCompleted'在最后一个工作线程完成后被引发。 –

+0

提及UI问题。关于保持活动线程计数的好主意 - 错过了。 – ChrisWue