2010-08-24 39 views
5

我正在使用订阅者/通知模式来提升和使用C#中.Net中间层的事件。例如,某些事件是在“突发”中引发的,例如,当数据从批量程序导入文件中持久化时。这会执行潜在的长时间运行的任务,并且我希望通过实施“安静期”来避免每秒多次触发事件,事件系统会一直等待,直到事件流减速以处理事件。举办活动时,我应该如何实施“安静期”?

当发布者在通知订阅者时发挥主动作用时,我该如何做到这一点?我不想等到一个事件进来检查是否有其他人在等待安静期......

没有主机进程轮询订阅模型的时刻。我应该放弃发布/订阅模式还是有更好的方法?

回答

1

这里有一个粗略的实现,可能指向你的方向。在我的示例中,涉及通知的任务是保存数据对象。当一个对象被保存时,会引发Saved事件。除了一个简单的Save方法之外,我已经实现了BeginSave和EndSave方法以及Save的重载,这两个方法适用于批量保存。当调用EndSave时,会激发一个BatchSaved事件。

显然,您可以改变它以适合您的需求。在我的示例中,我跟踪了批处理操作期间保存的所有对象的列表,但这可能不是您需要做的事情......您可能只关心有多少对象被保存,或者甚至只是简单地批量保存操作已完成。如果您预计会保存大量对象,则将其存储在列表中(如我的示例中)可能会成为内存问题。

编辑:我在我的例子中添加了一个“阈值”的概念,试图防止大量的对象被保存在内存中。这会导致BatchSaved事件更频繁地触发。我还添加了一些锁定来解决潜在的线程安全问题,尽管我可能错过了某些内容。

class DataConcierge<T> 
{ 
    // ************************* 
    // Simple save functionality 
    // ************************* 

    public void Save(T dataObject) 
    { 
     // perform save logic 

     this.OnSaved(dataObject); 
    } 

    public event DataObjectSaved<T> Saved; 

    protected void OnSaved(T dataObject) 
    { 
     var saved = this.Saved; 
     if (saved != null) 
      saved(this, new DataObjectEventArgs<T>(dataObject)); 
    } 

    // ************************ 
    // Batch save functionality 
    // ************************ 

    Dictionary<BatchToken, List<T>> _BatchSavedDataObjects = new Dictionary<BatchToken, List<T>>(); 
    System.Threading.ReaderWriterLockSlim _BatchSavedDataObjectsLock = new System.Threading.ReaderWriterLockSlim(); 

    int _SavedObjectThreshold = 17; // if the number of objects being stored for a batch reaches this threshold, then those objects are to be cleared from the list. 

    public BatchToken BeginSave() 
    { 
     // create a batch token to represent this batch 
     BatchToken token = new BatchToken(); 

     _BatchSavedDataObjectsLock.EnterWriteLock(); 
     try 
     { 
      _BatchSavedDataObjects.Add(token, new List<T>()); 
     } 
     finally 
     { 
      _BatchSavedDataObjectsLock.ExitWriteLock(); 
     } 
     return token; 
    } 

    public void EndSave(BatchToken token) 
    { 
     List<T> batchSavedDataObjects; 
     _BatchSavedDataObjectsLock.EnterWriteLock(); 
     try 
     { 
      if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects)) 
       throw new ArgumentException("The BatchToken is expired or invalid.", "token"); 

      this.OnBatchSaved(batchSavedDataObjects); // this causes a single BatchSaved event to be fired 

      if (!_BatchSavedDataObjects.Remove(token)) 
       throw new ArgumentException("The BatchToken is expired or invalid.", "token"); 
     } 
     finally 
     { 
      _BatchSavedDataObjectsLock.ExitWriteLock(); 
     } 
    } 

    public void Save(BatchToken token, T dataObject) 
    { 
     List<T> batchSavedDataObjects; 
     // the read lock prevents EndSave from executing before this Save method has a chance to finish executing 
     _BatchSavedDataObjectsLock.EnterReadLock(); 
     try 
     { 
      if (!_BatchSavedDataObjects.TryGetValue(token, out batchSavedDataObjects)) 
       throw new ArgumentException("The BatchToken is expired or invalid.", "token"); 

      // perform save logic 

      this.OnBatchSaved(batchSavedDataObjects, dataObject); 
     } 
     finally 
     { 
      _BatchSavedDataObjectsLock.ExitReadLock(); 
     } 
    } 

    public event BatchDataObjectSaved<T> BatchSaved; 

    protected void OnBatchSaved(List<T> batchSavedDataObjects) 
    { 
     lock (batchSavedDataObjects) 
     { 
      var batchSaved = this.BatchSaved; 
      if (batchSaved != null) 
       batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects)); 
     } 
    } 

    protected void OnBatchSaved(List<T> batchSavedDataObjects, T savedDataObject) 
    { 
     // add the data object to the list storing the data objects that have been saved for this batch 
     lock (batchSavedDataObjects) 
     { 
      batchSavedDataObjects.Add(savedDataObject); 

      // if the threshold has been reached 
      if (_SavedObjectThreshold > 0 && batchSavedDataObjects.Count >= _SavedObjectThreshold) 
      { 
       // then raise the BatchSaved event with the data objects that we currently have 
       var batchSaved = this.BatchSaved; 
       if (batchSaved != null) 
        batchSaved(this, new BatchDataObjectEventArgs<T>(batchSavedDataObjects.ToArray())); 

       // and clear the list to ensure that we are not holding on to the data objects unnecessarily 
       batchSavedDataObjects.Clear(); 
      } 
     } 
    } 
} 

class BatchToken 
{ 
    static int _LastId = 0; 
    static object _IdLock = new object(); 

    static int GetNextId() 
    { 
     lock (_IdLock) 
     { 
      return ++_LastId; 
     } 
    } 

    public BatchToken() 
    { 
     this.Id = GetNextId(); 
    } 

    public int Id { get; private set; } 
} 

class DataObjectEventArgs<T> : EventArgs 
{ 
    public T DataObject { get; private set; } 

    public DataObjectEventArgs(T dataObject) 
    { 
     this.DataObject = dataObject; 
    } 
} 

delegate void DataObjectSaved<T>(object sender, DataObjectEventArgs<T> e); 

class BatchDataObjectEventArgs<T> : EventArgs 
{ 
    public IEnumerable<T> DataObjects { get; private set; } 

    public BatchDataObjectEventArgs(IEnumerable<T> dataObjects) 
    { 
     this.DataObjects = dataObjects; 
    } 
} 

delegate void BatchDataObjectSaved<T>(object sender, BatchDataObjectEventArgs<T> e); 

在我的示例中,我选择使用标记概念来创建单独的批处理。这允许在单独线程上运行的较小批处理操作完成并引发事件,而无需等待较大的批处理操作完成。

我做了separete事件:Saved和BatchSaved。但是,这些可以很容易地合并成一个事件。

编辑:由Steven Sudit在访问活动代表时指出的固定竞赛条件。

编辑:在我的例子修订的锁代码使用ReaderWriterLockSlim而不是监视器(即“锁”语句)。我认为存在一些竞争条件,例如在Save和EndSave方法之间。 EndSave可能执行,导致数据对象列表从字典中删除。如果Save方法在另一个线程上同时执行,则可能会将数据对象添加到该列表中,即使它已从字典中删除。

在我的修改示例中,这种情况不会发生,并且Save方法在EndSave之后执行时会引发异常。这些竞争条件主要是由我试图避免我认为是不必要的锁定造成的。我意识到更多的代码需要在锁内,但决定使用ReaderWriterLockSlim而不是Monitor,因为我只想防止Save和EndSave同时执行;没有必要同时阻止多个线程执行Save。请注意,Monitor仍然用于同步对字典中检索的特定数据对象列表的访问。

编辑:加入使用例

下面是上述样本代码的使用示例。

static void DataConcierge_Saved(object sender, DataObjectEventArgs<Program.Customer> e) 
    { 
     Console.WriteLine("DataConcierge<Customer>.Saved"); 
    } 

    static void DataConcierge_BatchSaved(object sender, BatchDataObjectEventArgs<Program.Customer> e) 
    { 
     Console.WriteLine("DataConcierge<Customer>.BatchSaved: {0}", e.DataObjects.Count()); 
    } 

    static void Main(string[] args) 
    { 
     DataConcierge<Customer> dc = new DataConcierge<Customer>(); 
     dc.Saved += new DataObjectSaved<Customer>(DataConcierge_Saved); 
     dc.BatchSaved += new BatchDataObjectSaved<Customer>(DataConcierge_BatchSaved); 

     var token = dc.BeginSave(); 
     try 
     { 
      for (int i = 0; i < 100; i++) 
      { 
       var c = new Customer(); 
       // ... 
       dc.Save(token, c); 
      } 
     } 
     finally 
     { 
      dc.EndSave(token); 
     } 
    } 

这导致下面的输出:

DataConcierge <顾客> .BatchSaved:17

DataConcierge <顾客> .BatchSaved:17

DataConcierge <顾客> .BatchSaved :17

Dat aConcierge <顾客> .BatchSaved:17

DataConcierge <顾客> .BatchSaved:17

DataConcierge <顾客> .BatchSaved:15

在我的示例中的阈值设置为17,所以一批100个项目导致BatchSaved事件触发6次。

+0

标准投诉:不要检查委托为空,然后调用它,因为这表现出竞争条件。相反,做一个本地副本,检查它为空,然后通过它调用。 – 2010-08-24 18:06:52

+0

啊,是的。感谢您指出了这一点。我已根据您的建议进行了更新。 – 2010-08-24 18:42:17

1

我不知道我是否正确理解你的问题,但我会尝试从源头解决问题 - 确保事件不是在“突发”中引发的。您可以考虑实施批处理操作,这可以从文件导入程序中使用。这将被视为您中间人的单一事件并提出单个事件。

我认为如果你不能做出上面所述的改变,实现一些合理的解决方案将会非常棘手 - 你可以尝试将你的发布者包装在一个“缓存”发布者中,这将实现一些启发式来缓存事件如果他们爆发。最简单的方法是缓存一个事件,如果另一个相同类型正在被处理(所以你的批处理至少会导致2个事件 - 一个在最开始,一个在结尾)。您可以等待很短的时间,并且只在下一个事件在那段时间内没有到来时才发起事件,但即使管道中存在单个事件,您也会遇到时间延迟。即使有不断的事件排队,您也需要确保您不时提出事件 - 否则发布商可能会挨饿。

第二个选择是棘手的实施,将包含启发式,这可能会去非常错误的...

+0

我希望通知过程能够经常引发事件,但让用户决定是否关心事件本身。一个用户可能希望每秒钟发生20个事件,另一个用户可能希望得到较少的通知... – 2010-08-24 14:44:15

+0

好吧,我明白了。如果你想使用发布/订阅模式,那么我认为订阅者需要实现事件的缓存。您可以考虑创建不同类型的事件,例如一个会为每一个元素开火,另一个会在整个批次中触发一次。 否则,我认为我会在每个订阅者中实现缓存事件。 – Grzenio 2010-08-24 14:59:09

+0

可以说,'频率'可以是订阅的属性,迫使发布者以不同的订阅者的不同时间间隔发送通知。 – 2010-08-24 18:05:50

0

这是一个想法,只是从我的脑海里掉了。我不知道它有多可行,也看不出一个明显的方法来使它更通用,但它可能是一个开始。它所做的只是为按钮点击事件提供一个缓冲区(必要时用您的事件替代)。

class ButtonClickBuffer 
{ 
    public event EventHandler BufferedClick; 

    public ButtonClickBuffer(Button button, int queueSize) 
    { 
     this.queueSize= queueSize; 
     button.Click += this.button_Click; 
    } 

    private int queueSize; 
    private List<EventArgs> queuedEvents = new List<EventArgs>(); 

    private void button_Click(object sender, EventArgs e) 
    { 
     queuedEvents.Add(e); 
     if (queuedEvents.Count >= queueSize) 
     { 
      if (this.BufferedClick!= null) 
      { 
       foreach (var args in this.queuedEvents) 
       { 
        this.BufferedClick(sender, args); 
       } 
       queuedEvents.Clear(); 
      } 
     } 
    } 
} 

所以你的用户,而不是订阅为:

this.button1.Click += this.button1_Click; 

会使用一个缓冲,指定多少事件等待:

ButtonClickBuffer buffer = new ButtonClickBuffer(this.button1, 5); 
buffer.BufferedClick += this.button1_Click; 

它可以在一个简单的测试形式我打了起来,但它远非生产准备!

你说你不想等待一个事件,看看是否有一个队列正在等待,这正是这样做的。您可以用缓冲区内的逻辑来产生一个监视队列的新线程,并根据需要分派事件。上帝知道可能由此产生线程和锁定问题!