2011-10-30 70 views
1

我学习无扩展,我一直在试图找出如果它是这样的任务的匹配一批请求。处理与无扩展

我有加工成批请求作为一个工作单元,并调用回调当所有请求已经完成的进程()方法。

这里最重要的是,每个请求将调用回调函数同步或异步取决于它的实现,并批处理器必须能够同时处理。

但没有线程正在从批处理器开始,任何新的螺纹(或其它异步执行)将被从请求处理程序内如果有必要启动。我不知道这是否符合rx的用例。

我当前工作的代码看起来(几乎)是这样的:

public void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted) 
{ 
    IUnitOfWork uow = null; 
    try 
    { 
     uow = unitOfWorkFactory.Create(); 

     var responses = new List<IResponse>(); 
     var outstandingRequests = requests.Count; 
     foreach (var request in requests) 
     { 
      var correlationId = request.CorrelationId; 
      Action<IResponse> requestCallback = response => 
      { 
       response.CorrelationId = correlationId; 
       responses.Add(response); 
       outstandingRequests--; 
       if (outstandingRequests != 0) 
        return; 

       uow.Commit(); 
       onCompleted(responses); 
      }; 

      requestProcessor.Process(request, requestCallback); 
     } 
    } 
    catch(Exception) 
    { 
     if (uow != null) 
      uow.Rollback(); 
    } 

    if (uow != null) 
     uow.Commit(); 
}   

你将如何实现这一点使用RX?这是合理的吗?

注意,这项工作的单位将被同步提交即使有还未返回异步请求。

回答

3

我的办法处理这一是两步。

首先创建一个通用的运营商,轮流Action<T, Action<R>>Func<T, IObservable<R>>

public static class ObservableEx 
{ 
    public static Func<T, IObservable<R>> FromAsyncCallbackPattern<T, R>(
     this Action<T, Action<R>> call) 
    { 
     if (call == null) throw new ArgumentNullException("call"); 
     return t => 
     { 
      var subject = new AsyncSubject<R>(); 
      try 
      { 
       Action<R> callback = r => 
       { 
        subject.OnNext(r); 
        subject.OnCompleted(); 
       }; 
       call(t, callback); 
      } 
      catch (Exception ex) 
      { 
       return Observable.Throw<R>(ex, Scheduler.ThreadPool); 
      } 
      return subject.AsObservable<R>(); 
     }; 
    } 
} 

接下来,并将该呼叫void Process(ICollection<IRequest> requests, Action<List<IResponse>> onCompleted)IObservable<IResponse> Process(IObservable<IRequest> requests)

public IObservable<IResponse> Process(IObservable<IRequest> requests) 
{ 
    Func<IRequest, IObservable<IResponse>> rq2rp = 
     ObservableEx.FromAsyncCallbackPattern 
      <IRequest, IResponse>(requestProcessor.Process); 

    var query = (
     from rq in requests 
     select rq2rp(rq)).Concat(); 

    var uow = unitOfWorkFactory.Create(); 
    var subject = new ReplaySubject<IResponse>(); 

    query.Subscribe(
     r => subject.OnNext(r), 
     ex => 
     { 
      uow.Rollback(); 
      subject.OnError(ex); 
     }, 
     () => 
     { 
      uow.Commit(); 
      subject.OnCompleted(); 
     }); 

    return subject.AsObservable(); 
} 

现在,这不仅运行处理异步,但它也确保了结果的正确顺序。

事实上,因为你开始集合,你甚至可以做到这一点:

var rqs = requests.ToObservable(); 

var rqrps = rqs.Zip(Process(rqs), 
    (rq, rp) => new 
    { 
     Request = rq, 
     Response = rp, 
    }); 

那么你将有一个可观察的是对了,而不需要一个CorrelationId属性每个请求/响应。

我希望这会有所帮助。

+0

这的确非常有帮助!感谢您提供非常详尽的答案。唯一让我无法理解的是,当所有请求都得到回应时,我该如何“做些什么”?在zip之后我该怎么说:现在所有的请求都被响应,用这些请求/响应对调用这些方法......我尝试订阅“rqrps”observable,在列表中收集响应并调用方法完成后,但没有成功(在所有这些之后,收集列表中的响应似乎有点奇怪)。我真的很感激:) – asgerhallas

+0

doh! ......称为错误的超载:) – asgerhallas

+0

@asgerhallas - 你不需要等待所有的对子回来 - 只要在他们回来时开始处理它们。否则看看'.ToArray()'可观察的运算符 - 它将'IObservable '变成'IObservable ' - 将n个值的可观察值转换为具有n个元素的一个数组的可观察值,并且只有在成功时。听起来像是您需要的完美运营商。 – Enigmativity

1

这是Rx的天才的一部分,你可以自由地同步或异步返回结果:

public IObservable<int> AddNumbers(int a, int b) { 
    return Observable.Return(a + b); 
} 

public IObservable<int> AddNumbersAsync(int a, int b) { 
    return Observable.Start(() => a + b, Scheduler.NewThread); 
} 

它们都有的IObservable类型,所以他们同样工作。如果你想要找出何时完成所有IObservables,总结会做到这一点,因为它会变成“否”可观察到的项目成在年底返回项目:

IObservable<int> listOfObservables[]; 

listObservables.ToObservable() 
    .Merge() 
    .Aggregate(0, (acc, x) => acc+1) 
    .Subscribe(x => Console.WriteLine("{0} items were run", x)); 
+0

非常好,谢谢!现在我想知道,最简单的方法是什么?如果我应该采用当前的requestProcessor.Process()来接收回调并将该回调的调用转换为正在观察的内容? – asgerhallas

+0

哦...发现这个http://stackoverflow.com/questions/5827178/using-rx-framework-for-async-calls-using-the-void-asyncmethodactiont-callback。这似乎是正确的方向。 – asgerhallas