2012-06-27 53 views
0

我有以下异步队列处理路由。无功扩展等待队列空

 var commandQueue = new BlockingCollection<MyCommand>(); 
     commandQueue 
      .GetConsumingEnumerable() 
      .ToObservable(new LimitedConcurrencyLevelTaskPoolScheduler(5)) 
      .Subscribe(c => 
          { 
           try 
           { 
            ProcessCommand(c); 
           } 
           catch (Exception ex) 
           { 
            Trace.TraceError(ex.ToString()); 
           } 
          } 
      ); 

在一个特定的情况下(当我即将得到一些数据),我需要确保我commandQueue是前走出去,让数据为空。预计此操作将同步发生。基本上,我想这样做

public void GetData() 
    { 
    commandQueue.WaitForEmpty(); 

    // could potentially be expressed: 
    // while (commandQueue.Count > 0) Thread.Sleep(10); 

    return GoGetTheData() 
    } 

我意识到,在理想情况下,所有的来电者“的GetData”异步......但有时它的必要,它发生在一个同步的方式...等我需要等待命令队列为空以确保数据的一致性和最新性。

我知道我如何使用ManualResetEvent轻松完成此操作...但我想知道System.Reactive/TPL是否有简单的方法。

谢谢。

回答

1

这是一个比起初看起来更难的问题。你需要BlockingCollection(和底层的ConcurrentQueue)生产者 - 消费者的工作语义。但是你也希望能够观察这些收藏品的情况,包括等待'空'的信号。

最好的办法就是从这里看一看JobQueueParallelJobQueue

http://social.msdn.microsoft.com/Forums/en-US/rx/thread/2817c6e5-e5a4-4aac-91c1-97ba7de88ff7

其中包括一个可观察的WhenQueueEmpty并可以控制同时运行的作业数量和排队的作业(工种是同义这种情况下你的命令的概念)。

0

你能用这个吗?

var dataObservable = Observable.Start(() => 
    { 
     commandQueue.WaitForEmpty(); 
     return GoGetTheData(); 
    }); 
+0

问题是,没有这样的方法WaitForEmpty – Jeff

+0

@ JeffN825 - 什么是BlockCollection呢?这是你定义的东西吗? – Enigmativity

+0

对不起,我不确定在我的例子中如何被截断。它是BlockingCollection http://msdn.microsoft.com/en-us/library/dd267312.aspx – Jeff

0

在我看来,你的要求是

  • 异步此数据获取数据
  • 并行处理
  • 重复此过程

如果(5度最大并行)这些都是你的要求,你不需要使用BlockingCollection,即它不是现有的API,那么我认为你可以很容易地用Rx解决这个问题。

var dataRequestScheduler = new EventLoopScheduler(); 
var subscription = GetTheData() 
    .Repeat() 
    .SubscribeOn(dataRequestScheduler) 
    .ObserveOn(Scheduler.TaskPool)//new LimitedConcurrencyLevelTaskPoolScheduler(5) 
    .Subscribe(c => 
      { 
       try 
       { 
        ProcessCommand(c); 
       } 
       catch (Exception ex) 
       { 
        Trace.TraceError(ex.ToString()); 
       } 
      } 
     ); 

凡GetTheData方法返回的IObservable

你可能会利用Observable.Start和合并(5),以获得你的最大5个线程,而无需自定义调度。