2011-04-13 75 views
3

我已经使用观察者模式为我的应用程序。带计时器的观察者模式

我有有一个System.Timers.Timer对象在它命名为“TMR”的主题。此计时器的滴答事件在每60秒后触发。在此蜱活动中,我将通知所有附属于我的主题的观察员。我已经使用for循环遍历我的观察者列表&,然后触发观察者更新方法。

假设我有10个观察员附加到我的主题。

每个观察者需要10秒来完成其处理。

现在在for循环中完成通知会导致最后一个观察者的更新方法在90秒后被调用。即仅在前一个完成其处理之后调用Next Observer Update方法。

但这并不是我想要的应用程序。我需要所有观察者的Update方法在发生计时器滴答时立即触发。所以没有观察者必须等待。我希望这可以通过线程来完成。

所以,我修改了代码,

// Fires the updates instantly 
    public void Notify() 
    { 
     foreach (Observer o in _observers) 
     { 
     Threading.Thread oThread = new Threading.Thread(o.Update); 
     oThread.Name = o.GetType().Name; 
     oThread.Start(); 
     } 
    } 

但我有我的两个疑惑,

  1. 如果有10个观察员 我的计时器的时间间隔为60秒 然后声明新线程()将会触发600次。

    它是否有效,并建议在每个计时器刻度上创建新线程?

  2. 如果我的观察者花费太多时间来完成更新逻辑,即超过60秒,该怎么办?意味着在观察者更新之前发生计时器滴答声。我怎样才能控制这个?

我可以张贴的示例代码。如果需要......

我使用的代码..

using System; 
using System.Collections.Generic; 
using System.Timers; 
using System.Text; 
using Threading = System.Threading; 
using System.ComponentModel; 

namespace singletimers 
{ 
    class Program 
    { 


    static void Main(string[] args) 
    { 
     DataPullerSubject.Instance.Attach(Observer1.Instance); 
     DataPullerSubject.Instance.Attach(Observer2.Instance); 
     Console.ReadKey(); 
    } 
    } 

    public sealed class DataPullerSubject 
    { 
    private static volatile DataPullerSubject instance; 
    private static object syncRoot = new Object(); 
    public static DataPullerSubject Instance 
    { 
     get 
     { 
     if (instance == null) 
     { 
      lock (syncRoot) 
      { 
      if (instance == null) 
       instance = new DataPullerSubject(); 
      } 
     } 

     return instance; 
     } 
    } 

    int interval = 10 * 1000; 
    Timer tmr; 
    private List<Observer> _observers = new List<Observer>(); 

    DataPullerSubject() 
    { 
     tmr = new Timer(); 
     tmr.Interval = 1; // first time to call instantly 
     tmr.Elapsed += new ElapsedEventHandler(tmr_Elapsed); 
     tmr.Start(); 
    } 

    public void Attach(Observer observer) 
    { 
     _observers.Add(observer); 
    } 

    public void Detach(Observer observer) 
    { 
     _observers.Remove(observer); 
    } 

    // Fires the updates instantly 
    public void Notify() 
    { 
     foreach (Observer o in _observers) 
     { 
     Threading.Thread oThread = new Threading.Thread(o.Update); 
     oThread.Name = o.GetType().Name; 
     oThread.Start(); 
     } 
    } 

    private void tmr_Elapsed(object source, ElapsedEventArgs e) 
    { 
     tmr.Interval = interval; 
     tmr.Stop(); // stop the timer until all notification triggered 
     this.Notify(); 
     tmr.Start();//start again 
    } 
    } 


    public abstract class Observer 
    { 
    string data; 
    public abstract void Update(); 
    public virtual void GetDataFromDBAndSetToDataSet(string param) 
    { 
     Console.WriteLine("Processing for: " + param); 
     data = param + new Random().Next(1, 2000); 
     Threading.Thread.Sleep(10 * 1000);//long work 
     Console.WriteLine("Data set for: " + param); 
    } 
    } 


    public sealed class Observer1 : Observer 
    { 
    private static volatile Observer1 instance; 
    private static object syncRoot = new Object(); 
    public static Observer1 Instance 
    { 
     get 
     { 
     if (instance == null) 
     { 
      lock (syncRoot) 
      { 
      if (instance == null) 
       instance = new Observer1(); 
      } 
     } 

     return instance; 
     } 
    } 
    Observer1() 
    { 
    } 
    public override void Update() 
    { 
     base.GetDataFromDBAndSetToDataSet("Observer1"); 
    } 

    } 

    public sealed class Observer2 : Observer 
    { 
    private static volatile Observer2 instance; 
    private static object syncRoot = new Object(); 
    public static Observer2 Instance 
    { 
     get 
     { 
     if (instance == null) 
     { 
      lock (syncRoot) 
      { 
      if (instance == null) 
       instance = new Observer2(); 
      } 
     } 

     return instance; 
     } 
    } 
    Observer2() 
    { 
    } 
    public override void Update() 
    { 
     base.GetDataFromDBAndSetToDataSet("Observer2"); 
    } 

    } 
} 

感谢&亲切的问候。

+0

有更好的方法来在c#中实现单例模式 - 请参阅http://www.yoda.arachsys.com/csharp/singleton.html – GarethOwen 2011-04-19 11:31:01

回答

1
  • 使用new Thread是不鼓励的。使用TaskTask<T>
  • 您最好的创建Observable模式框架的尝试可能只会接近Rx。使用那个解决你提到的问题(即如果处理花费太多时间)。 Rx将为您定义可观察的场景提供巨大的灵活性。
0

或者,观察者可以以非阻塞的方式实现更新。 也就是说,更新总是立即返回。然后,如果需要,Observer对象有责任在新线程中执行其工作。

我不确定这是否有助于您的情况 - 我不知道您的'观察员'是什么,但是也可能您不知道?

+0

@Garen:我不明白你用“非阻塞方式” 。在我的观察员中,我从数据库中获取数据。所以在这里它可能会花费几分钟的时间。这样每个观察员都可以更新他们的公共领域。 – thinkmmk 2011-04-13 15:52:57

+0

@thinkmmk - 我的意思是对Update的调用立即返回,观察者负责在单独的线程中启动Update操作。当更新正在进行时,观察员也负责排队或忽略更新呼叫。但是这取决于你的观察者是否这是一个好的决定 - 你想避免在不同的'观察者'类中重复相同的逻辑。 – GarethOwen 2011-04-14 06:54:25

+0

我已经用代码编辑了我原来的帖子..现在我正在努力争取同步,即不应再次调用obesever'x',除非它已完成其最后的工作。 – thinkmmk 2011-04-19 09:52:24