2009-04-09 47 views
12

如何在C#中使用事件和代表实现生产者/消费者模式?在使用这些设计模式时,我需要注意哪些问题?我需要注意哪些边缘情况?在C#中实现生产者/消费者模式

+5

这是一个有效的问题国际海事组织。我不认为把一个问题当作家庭作业是不礼貌的。 – Sung 2009-04-09 12:52:29

+5

@Sung Meister:我不能不同意你的意见! – 2009-04-09 21:07:41

+5

作业与否,这是一个常见问题,值得回答。 – mpen 2010-09-25 05:15:04

回答

1

我知道这个线程是很有点老了,但因为我有时会在我的搜索碰到它,我决定分享这个生产者 - 消费者对人们想知道如何实现一个简单通用的生产者 - 消费者代码作业队列。

作业类用于以代理的形式'存储'对象的方法调用。然后在处理作业时调用该委托。任何相关参数也存储在此Job类中。

有了这个简单的模式,就有可能在入队和出队过程中实现多线程。其实这只是最简单的部分:多线程给你的代码带来了新的挑战,你会注意到它们以后;-)

我原来在thread上发布了这段代码。

using System; 
using System.Collections.Concurrent; 
using System.Diagnostics; 
using System.Threading; 

// Compiled and tested in: Visual Studio 2017, DotNET 4.6.1 

namespace MyNamespace 
{ 
    public class Program 
    { 
     public static void Main(string[] args) 
     { 
      MyApplication app = new MyApplication(); 
      app.Run(); 
     } 
    } 

    public class MyApplication 
    { 
     private BlockingCollection<Job> JobQueue = new BlockingCollection<Job>(); 
     private CancellationTokenSource JobCancellationTokenSource = new CancellationTokenSource(); 
     private CancellationToken JobCancellationToken; 
     private Timer Timer; 
     private Thread UserInputThread; 



     public void Run() 
     { 
      // Give a name to the main thread: 
      Thread.CurrentThread.Name = "Main"; 

      // Fires a Timer thread: 
      Timer = new Timer(new TimerCallback(TimerCallback), null, 1000, 2000); 

      // Fires a thread to read user inputs: 
      UserInputThread = new Thread(new ThreadStart(ReadUserInputs)) 
      { 
       Name = "UserInputs", 
       IsBackground = true 
      }; 
      UserInputThread.Start(); 

      // Prepares a token to cancel the job queue: 
      JobCancellationToken = JobCancellationTokenSource.Token; 

      // Start processing jobs: 
      ProcessJobs(); 

      // Clean up: 
      JobQueue.Dispose(); 
      Timer.Dispose(); 
      UserInputThread.Abort(); 

      Console.WriteLine("Done."); 
     } 



     private void ProcessJobs() 
     { 
      try 
      { 
       // Checks if the blocking collection is still up for dequeueing: 
       while (!JobQueue.IsCompleted) 
       { 
        // The following line blocks the thread until a job is available or throws an exception in case the token is cancelled: 
        JobQueue.Take(JobCancellationToken).Run(); 
       } 
      } 
      catch { } 
     } 



     private void ReadUserInputs() 
     { 
      // User input thread is running here. 
      ConsoleKey key = ConsoleKey.Enter; 

      // Reads user inputs and queue them for processing until the escape key is pressed: 
      while ((key = Console.ReadKey(true).Key) != ConsoleKey.Escape) 
      { 
       Job userInputJob = new Job("UserInput", this, new Action<ConsoleKey>(ProcessUserInputs), key); 
       JobQueue.Add(userInputJob); 
      } 
      // Stops processing the JobQueue: 
      JobCancellationTokenSource.Cancel(); 
     } 

     private void ProcessUserInputs(ConsoleKey key) 
     { 
      // Main thread is running here. 
      Console.WriteLine($"You just typed '{key}'. (Thread: {Thread.CurrentThread.Name})"); 
     } 



     private void TimerCallback(object param) 
     { 
      // Timer thread is running here. 
      Job job = new Job("TimerJob", this, new Action<string>(ProcessTimer), "A job from timer callback was processed."); 
      JobQueue.TryAdd(job); // Just enqueues the job for later processing 
     } 

     private void ProcessTimer(string message) 
     { 
      // Main thread is running here. 
      Console.WriteLine($"{message} (Thread: {Thread.CurrentThread.Name})"); 
     } 
    } 



    /// <summary> 
    /// The Job class wraps an object's method call, with or without arguments. This method is called later, during the Job execution. 
    /// </summary> 
    public class Job 
    { 
     public string Name { get; } 
     private object TargetObject; 
     private Delegate TargetMethod; 
     private object[] Arguments; 

     public Job(string name, object obj, Delegate method, params object[] args) 
     { 
      Name = name; 
      TargetObject = obj; 
      TargetMethod = method; 
      Arguments = args; 
     } 

     public void Run() 
     { 
      try 
      { 
       TargetMethod.Method.Invoke(TargetObject, Arguments); 
      } 
      catch(Exception ex) 
      { 
       Debug.WriteLine($"Unexpected error running job '{Name}': {ex}"); 
      } 
     } 

    } 
}