0

实现带上限的缓冲作业执行器:我想实现一个带容量上限、带缓冲的作业执行器(capped and buffered job executor)。

这将有一个单独的方法:

// Proposed public surface of the executor: signatures only, no bodies are given in this sketch.
public class CappedBufferedExecutor { 
    // bufferCapping: cap on the number of unique buffered values that triggers execution.
    // fillTimeInMillisec: time in milliseconds after which execution is triggered.
    public CappedBufferedExecutor(int bufferCapping, int fillTimeInMillisec); 
    // Queues a value asynchronously; per the author, the bool indicates whether the job succeeded.
    public Task<bool> EnqueueAsync(string val); 
} 

思路是:值以异步方式入队;一旦经过了 fillTimeInMillisec 毫秒,或者缓冲区中不重复值的数量达到上限,就真正开始执行,并在执行结束后让所有等待中的异步任务完成。执行可能需要很长时间;在执行进行的同时,缓冲区可以被重新填充,并可以启动新的异步执行。

下面是我设想的伪代码:

  • 用 Timer 之类的机制等待 fillTime 过去;时间一到,就创建一个新任务来完成工作(见下文)。
  • 新值到达时,以读模式锁定 rwlock。检查缓冲区是否已满;如果已满,则在 ManualResetEvent / TaskCompletionSource 上等待。
  • 将新值添加到缓冲区(HashSet<string>)。
  • 如果缓冲区已满,则创建一个新的执行任务;该任务以写模式锁定 rwlock,对所有已收集的值完成工作,并通过 TaskCompletionSource 唤醒所有等待中的任务。
  • 在 TaskCompletionSource 上等待该批缓冲值的任务(上一步中提到的)执行完毕。

我的问题:如何让 Timer 与“缓冲区已满”的检查保持同步?缓冲区已满时应如何等待、如何开始执行?以及在执行期间有新值到达时,如何在不同的 TaskCompletionSource 实例之间切换?

+0

为什么需要从 'EnqueueAsync' 返回 'Task'?改为在缓冲区已满时触发一个携带任务列表的事件可以吗? – apocalypse

+0

如果上一批项目仍在处理、尚未完成,而缓冲区又满了,会发生什么情况? –

+0

@apocalypse bool 用来表明作业是否成功。这一点并不重要,可以按你的方式来。 – Mugen

回答

2

这仅仅是概念,所以不要指望太多:-)

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 

namespace ConsoleApp 
{ 
    /// <summary>
    /// Console demo: every non-blank line typed by the user is pushed into a capped,
    /// time-limited buffer, and each flushed batch is processed by an <see cref="Executor{T}"/>.
    /// </summary>
    class Program
    {
        // A single shared generator avoids creating several generators with the same seed.
        // System.Random is not thread-safe and SomeWork runs on several pool threads at once,
        // so every access goes through rndLock.
        static readonly Random rnd = new Random();
        static readonly object rndLock = new object();

        /// <summary>Reads lines from the console and enqueues them until a blank line (or end of input) is read.</summary>
        static void Main (string[] args)
        {
            var buffer = CreateBuffer();

            var executor = new Executor<string> (SomeWork, buffer);
            executor.ProcessingStarted += Executor_ProcessingStarted;

            while (true)
            {
                string userInput = Console.ReadLine();

                // A blank line or end of input only terminates the loop; it is not a work item.
                if (string.IsNullOrWhiteSpace (userInput))
                {
                    break;
                }

                buffer.Enqueue (userInput);
            }

            executor.Dispose();
        }

        /// <summary>
        /// Builds the demo buffer: a unique-items buffer capped at 3 items, wrapped so that it is
        /// also flushed by a 5 second timeout.
        /// </summary>
        private static IBuffer<string> CreateBuffer()
        {
            var buffer = new UniqueItemsBuffer<string> (3);

            buffer.DataAvailable += (items) => Console.WriteLine ("BUFFER :: data available raised.");

            var alert = new Alert();

            return new BufferWithTimeout<string> (buffer, alert, TimeSpan.FromSeconds (5));
        }

        /// <summary>Simulated job: blocks the calling thread for a random 1000-7999 ms and reports success.</summary>
        /// <param name="x">The item being "processed" (only used in the log output).</param>
        /// <returns>Always <c>true</c>.</returns>
        public static bool SomeWork (string x)
        {
            int delay;

            lock (rndLock)
            {
                delay = rnd.Next (1000, 8000);
            }

            Console.WriteLine ($" +++ Starting SomeWork for: {x}, delay: {delay} ms");

            Thread.Sleep (delay);

            Console.WriteLine ($" --- SomeWork for: {x} - finished.");

            return true;
        }

        /// <summary>Logs a message once every task of the batch that was just started has finished.</summary>
        private static void Executor_ProcessingStarted (IReadOnlyList<Task<bool>> items)
        {
            // Wait on a separate task so the thread that raised the event is not blocked.
            Task.Run (() =>
            {
                Task.WaitAll (items.ToArray());
                Console.WriteLine ("Finished processing tasks, count = " + items.Count);
            });
        }
    }

    //====== actual code =================================================================================================================== 

    /// <summary>Callback that receives a read-only list of items.</summary>
    public delegate void ItemsAvailable<T> (IReadOnlyList<T> items); // new type to simplify code 

    /// <summary>Processes the given item and returns true if the job completed successfully.</summary>
    public delegate bool ProcessItem<T> (T item); // processes the given item and returns true if job is done with success 

    //====================================================================================================================================== 

    /// <summary>Exposes the event through which a source hands over a batch of items.</summary>
    public interface IDataAvailableEvent<T> 
    { 
     // Occurs when the buffer needs to be processed. Implementations should clear the
     // buffer before raising this event.
     event ItemsAvailable<T> DataAvailable; 
    } 

    //====================================================================================================================================== 

    /// <summary>Exposes the event an executor raises after it has created and started the tasks of a batch.</summary>
    public interface IProcessingStartedEvent<T> 
    { 
     // The executor raises this event when all tasks have been created and started;
     // the argument is the list of those tasks.
     event ItemsAvailable<Task<bool>> ProcessingStarted; 
    } 

    //====================================================================================================================================== 

    /// <summary>A buffer that collects items and announces batches through <c>DataAvailable</c>.</summary>
    public interface IBuffer<T> : IDataAvailableEvent<T> 
    { 
     // Adds a new item to the buffer. An implementation may ignore the item,
     // for example when only unique items are kept.
     // Returns: true = buffer is not empty after the call, false = buffer is empty.
     bool Enqueue (T item); 

     // Should clear the buffer and raise the event (no event if the buffer was already empty).
     void FlushBuffer(); 
    } 

    //====================================================================================================================================== 

    /// <summary>
    /// Buffer that ignores duplicate items and raises <see cref="DataAvailable"/> as soon as the
    /// number of distinct buffered items reaches the configured capacity.
    /// </summary>
    /// <remarks>
    /// This class performs no synchronization: use it from one thread only.
    /// </remarks>
    public class UniqueItemsBuffer<T> : IBuffer<T>
    {
        /// <summary>Raised with a copy of the buffered items after the buffer has been cleared.</summary>
        public event ItemsAvailable<T> DataAvailable;

        readonly int capacity;
        readonly HashSet<T> items = new HashSet<T>();

        /// <param name="capacity">Number of distinct items that triggers an automatic flush; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="capacity"/> is less than 1.</exception>
        public UniqueItemsBuffer (int capacity = 10)
        {
            // With a capacity below 1 the cap could never be reached and the set would grow without bound.
            if (capacity < 1)
            {
                throw new ArgumentOutOfRangeException (nameof (capacity), capacity, "Capacity must be at least 1.");
            }

            this.capacity = capacity;
        }

        /// <summary>Adds <paramref name="item"/> unless it is already buffered; flushes when the cap is reached.</summary>
        /// <returns><c>true</c> if the buffer still holds items after the call, <c>false</c> if it is empty.</returns>
        public bool Enqueue (T item)
        {
            // ">=" rather than "==" so the flush cannot be skipped if the count ever overshoots the cap.
            if (items.Add (item) && items.Count >= capacity)
            {
                FlushBuffer();
            }

            return items.Count > 0;
        }

        /// <summary>Clears the buffer and raises <see cref="DataAvailable"/>; does not raise it when the buffer is empty.</summary>
        public void FlushBuffer()
        {
            Console.WriteLine ("BUFFER :: flush, item count = " + items.Count);

            if (items.Count == 0)
            {
                return;
            }

            // Copy and clear before raising the event, so handlers see an already-empty buffer.
            var itemsCopy = items.ToList();
            items.Clear();

            DataAvailable?.Invoke (itemsCopy);
        }
    }

    //====================================================================================================================================== 

    /// <summary>
    /// Listens to a data source and, for every batch it announces, starts one task per item that
    /// runs the supplied work delegate; afterwards raises <see cref="ProcessingStarted"/> with those tasks.
    /// </summary>
    public class Executor<T> : IProcessingStartedEvent<T>, IDisposable
    {
        readonly IDataAvailableEvent<T> dataEvent;
        readonly ProcessItem<T> work;

        /// <summary>Raised once all tasks of a batch have been created and started.</summary>
        public event ItemsAvailable<Task<bool>> ProcessingStarted;

        /// <param name="work">Delegate executed for each item on its own task.</param>
        /// <param name="dataEvent">Source whose <c>DataAvailable</c> event delivers the batches.</param>
        public Executor (ProcessItem<T> work, IDataAvailableEvent<T> dataEvent)
        {
            this.work = work;
            this.dataEvent = dataEvent;

            this.dataEvent.DataAvailable += DataEvent_DataAvailable;
        }

        /// <summary>Stops listening to the data source.</summary>
        public void Dispose()
        {
            dataEvent.DataAvailable -= DataEvent_DataAvailable;
        }

        // Starts one task per received item and reports the started tasks to subscribers.
        private void DataEvent_DataAvailable (IReadOnlyList<T> items)
        {
            Console.WriteLine ("EXECUTOR :: new items to process available, count = " + items.Count);

            // ToList() forces the projection immediately, so every task is started before the event is raised.
            List<Task<bool>> started = items
                .Select (entry => Task.Run (() => work (entry)))
                .ToList();

            Console.WriteLine ("EXECUTOR :: raising processing started event (this msg can appear later than messages from SomeWork)");

            var handler = ProcessingStarted;
            if (handler != null)
            {
                handler (started);
            }
        }
    }

    //====================================================================================================================================== 

    /// <summary>
    /// Decorator for filling a buffer from many threads: every call to the wrapped buffer is made
    /// while holding a private lock, and the wrapped buffer's <c>DataAvailable</c> event is re-raised.
    /// </summary>
    public sealed class ThreadSafeBuffer<T> : IBuffer<T>
    {
        readonly object sync = new object();
        readonly IBuffer<T> target;

        /// <summary>Re-raised whenever the wrapped buffer raises its own <c>DataAvailable</c>.</summary>
        public event ItemsAvailable<T> DataAvailable;

        // Private: instances are obtained through MakeThreadSafe.
        private ThreadSafeBuffer (IBuffer<T> target)
        {
            this.target = target;
            this.target.DataAvailable += ForwardDataAvailable; // TODO: unpin event :P
        }

        /// <summary>Returns <paramref name="target"/> itself if it is already a thread-safe wrapper, otherwise a new wrapper around it.</summary>
        public static IBuffer<T> MakeThreadSafe (IBuffer<T> target)
        {
            return target is ThreadSafeBuffer<T>
                ? target
                : new ThreadSafeBuffer<T> (target);
        }

        /// <summary>Forwards to the wrapped buffer's <c>Enqueue</c> under the lock.</summary>
        public bool Enqueue (T item)
        {
            bool hasItems;

            lock (sync)
            {
                hasItems = target.Enqueue (item);
            }

            return hasItems;
        }

        /// <summary>Forwards to the wrapped buffer's <c>FlushBuffer</c> under the lock.</summary>
        public void FlushBuffer()
        {
            lock (sync)
            {
                target.FlushBuffer();
            }
        }

        // Passes the wrapped buffer's notification on to this wrapper's subscribers.
        private void ForwardDataAvailable (IReadOnlyList<T> items)
        {
            DataAvailable?.Invoke (items);
        }
    }

    //====================================================================================================================================== 

    // and now if you want to process buffer after elapsed time 

    /// <summary>Schedules a one-off delayed action.</summary>
    public interface IAlert 
    { 
     // Will execute 'action' after the given delay, without blocking the caller.
     // Returns a CancellationTokenSource for the scheduled alert.
     CancellationTokenSource CreateAlert (TimeSpan delay, Action action); 
    } 

    /// <summary>
    /// <see cref="IAlert"/> implementation backed by one-shot <see cref="System.Timers.Timer"/> instances.
    /// </summary>
    public class Alert : IAlert
    {
        // Timers that have been started and have not elapsed yet.
        // Guarded by timersLock: CreateAlert and the Elapsed handler can run on different threads,
        // and List<T> must not be modified concurrently.
        readonly List<System.Timers.Timer> timers = new List<System.Timers.Timer>();
        readonly object timersLock = new object();

        /// <summary>Schedules <paramref name="action"/> to run once after <paramref name="delay"/>.</summary>
        /// <param name="delay">Time to wait before the action runs.</param>
        /// <param name="action">Action to execute when the delay has elapsed.</param>
        /// <returns>
        /// A source whose cancellation, if requested before the timer elapses, prevents
        /// <paramref name="action"/> from running. The timer itself still elapses and is then disposed.
        /// </returns>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public CancellationTokenSource CreateAlert (TimeSpan delay, Action action)
        {
            // Fail at the call site instead of later on the timer thread.
            if (action == null)
            {
                throw new ArgumentNullException (nameof (action));
            }

            var cts = new CancellationTokenSource();

            var timer = new System.Timers.Timer (delay.TotalMilliseconds);
            timer.AutoReset = false; // just one tick

            lock (timersLock)
            {
                timers.Add (timer);
            }

            timer.Elapsed += (sender, e) =>
            {
                lock (timersLock)
                {
                    timers.Remove (timer);
                }

                timer.Dispose();

                if (cts.Token.IsCancellationRequested) return;

                action.Invoke();
            };

            timer.Enabled = true;

            return cts;
        }
    }

    /// <summary>
    /// Decorator that flushes the wrapped buffer when a timeout elapses after the first item of a
    /// batch was enqueued, in addition to whatever flushing the wrapped buffer does on its own.
    /// </summary>
    /// <remarks>
    /// All state changes happen under a private lock. The wrapped buffer's events are raised
    /// while that lock is held by the calling thread.
    /// </remarks>
    public class BufferWithTimeout<T> : IBuffer<T>
    {
        /// <summary>Re-raised whenever the wrapped buffer announces a batch.</summary>
        public event ItemsAvailable<T> DataAvailable;

        readonly IBuffer<T> target;
        readonly IAlert alert;
        readonly TimeSpan timeout;
        readonly object sync = new object();

        // Non-null while a timeout alert is pending for the current batch.
        CancellationTokenSource cts;

        // Incremented every time the timer is disabled. An alert callback remembers the value that
        // was current when it was scheduled, so a callback which slipped past its cancellation
        // check can be recognised as stale and ignored.
        int generation;

        /// <param name="target">Buffer to decorate; it is wrapped in a thread-safe decorator if necessary.</param>
        /// <param name="alert">Factory for the delayed flush callback.</param>
        /// <param name="timeout">Maximum time items may stay buffered before they are flushed.</param>
        /// <exception cref="ArgumentNullException"><paramref name="target"/> or <paramref name="alert"/> is null.</exception>
        public BufferWithTimeout (IBuffer<T> target, IAlert alert, TimeSpan timeout)
        {
            if (target == null) throw new ArgumentNullException (nameof (target));
            if (alert == null) throw new ArgumentNullException (nameof (alert));

            this.target = ThreadSafeBuffer<T>.MakeThreadSafe (target); // alert can be raised from different thread
            this.alert = alert;
            this.timeout = timeout;

            target.DataAvailable += Target_DataAvailable; // TODO: unpin event
        }

        // The wrapped buffer flushed (cap reached, timeout or explicit flush): stop the pending
        // alert and pass the batch on.
        private void Target_DataAvailable (IReadOnlyList<T> items)
        {
            lock (sync)
            {
                DisableTimer();
            }

            DataAvailable?.Invoke (items);
        }

        /// <summary>Enqueues the item and starts the timeout if this item opened a new batch.</summary>
        /// <returns><c>true</c> if the wrapped buffer holds items after the call.</returns>
        public bool Enqueue (T item)
        {
            lock (sync)
            {
                // Can raise an underlying flush -> DataAvailable event, which disables the timer.
                bool hasItems = target.Enqueue (item);

                // An empty buffer needs no timer; a timer that is already pending is kept.
                if (hasItems && cts == null)
                {
                    Console.WriteLine ("TIMER :: created alert");

                    int alertGeneration = generation;
                    cts = alert.CreateAlert (timeout, () => HandleAlert (alertGeneration));
                }

                return hasItems;
            }
        }

        /// <summary>Cancels the pending timeout and flushes the wrapped buffer.</summary>
        public void FlushBuffer()
        {
            lock (sync)
            {
                DisableTimer();
                target.FlushBuffer();
            }
        }

        // Timeout callback. alertGeneration identifies the batch the alert was created for.
        private void HandleAlert (int alertGeneration)
        {
            lock (sync)
            {
                // The batch this alert belonged to has already been flushed; flushing now would
                // cut a newer batch short and cancel that batch's own timer.
                if (alertGeneration != generation) return;

                Console.WriteLine ("TIMER :: handler, will call buffer flush");
                target.FlushBuffer();
            }
        }

        // Must be called while holding 'sync'.
        private void DisableTimer()
        {
            cts?.Cancel();
            cts = null;
            generation++;

            Console.WriteLine ("TIMER :: disable");
        }
    }
} 
+0

谢谢,我会试着让它在我的代码中工作!小注:'System.Timers.Timer' 不需要你自己维护引用——框架会维护它 :) – Mugen

+0

@Mugen:啊……确实如此。感谢提示。看起来他们在 .NET 2.0 中修复了这个问题。 – apocalypse

1

使用 Reactive Extensions(Rx)可以轻松实现这类功能。

// Script-style entry point: feeds three items into a Processor whose buffer closes after
// 2 items or 1000 ms, then waits for Enter before disposing the processor.
void Main()
{
    var processor = new Processor();
    processor.SetupBufferedProcessor(2, TimeSpan.FromMilliseconds(1000));

    foreach (var letter in new[] { "A", "B", "C" })
    {
        processor.Enqueue(letter);
    }

    Console.ReadLine();

    // The application is ending: dispose the processor
    processor.Dispose();
}


/// <summary>
/// Collects enqueued strings into batches using the Rx <c>Buffer</c> operator and processes each
/// non-empty batch on a separate task.
/// </summary>
public sealed class Processor : IDisposable
{
    private readonly Subject<string> subject = new Subject<string>();
    private IDisposable subscription;

    /// <summary>Pushes an item into the pipeline.</summary>
    public void Enqueue(string item)
    {
        subject.OnNext(item);
    }

    /// <summary>
    /// Creates the subscription that produces a batch when <paramref name="bufferCloseTimespan"/>
    /// has elapsed or <paramref name="bufferSize"/> items were collected, whichever comes first.
    /// </summary>
    public void SetupBufferedProcessor(int bufferSize, TimeSpan bufferCloseTimespan)
    {
        // Release an earlier subscription: otherwise it would be leaked and would keep
        // processing the same items alongside the new one.
        subscription?.Dispose();

        subscription = subject.AsObservable()
            .Buffer(bufferCloseTimespan, bufferSize)
            .Where(list => list.Any()) // suppress empty list (no items enqueued during the time window)
            // NOTE(review): the async lambda is presumably bound to an Action-based Subscribe
            // overload (async void), so an exception thrown inside it is not observed by the pipeline.
            .Subscribe(async list =>
            {
                await Task.Run(() =>
                {
                    Console.WriteLine(string.Join(",", list));
                    Thread.Sleep(2000); // For demo purposes, to demonstrate processing takes place parallel with other batches.
                });
            });
    }

    /// <summary>Flushes the batch collected so far and releases the subscription.</summary>
    public void Dispose()
    {
        // Completing the source makes Buffer emit its partially filled batch; merely disposing
        // the subscription would silently drop those items.
        subject.OnCompleted();
        subscription?.Dispose();
    }
}

这将输出

A,B 

然后,在一秒之后,输出

C 

Rx 的源代码托管在 GitHub 上。关于 Rx 的更多内容(包括使用 Buffer 方法的基本示例)参见:http://www.introtorx.com/

这个例子还可以改进:保存对所创建的 Task 对象的引用,以便在应用程序结束前正确地等待它们完成;不过上面的代码已经能体现总体思路。