2017-02-16 114 views
0

我有一些可观察到的顺序执行所有观察员后完成操作,例如:可观察到的序列

var period = TimeSpan.FromSeconds(0.5); 
var observable = Observable 
    .Interval(period) 
    .Publish() 
    .RefCount(); 

我想在后台线程上执行一些艰苦的计算该序列的元素,并进行一些最后的动作当所有的计算完成时。所以我想要这样的事情:

observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(Scheduler.Default).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.Subscribe(i => FinalAction(i)); 

我可以在Rx做这个吗?或者,这可能违反了反应式编程的一些原则,我应该在这种情况下采用另一种方法?

回答

2

在反应模式中计算有序的序列是非常危险的。

你可以做的一件事就是在复杂的计算完成后发出一个事件。然后,您可以让一位消费观察员在收到前面步骤完成的消息后执行他的计算。


另一个可能的解决方案是创建一个具体的序列块,定期发射。这降低了解决方案的并行性。

observable.ObserveOn(Scheduler.Default).Subscribe(i => 
{  
    ComplexComputation1(i)); 
    ComplexComputation2(i)); 
    FinalAction(i); 
} 
+0

我想过这种方法。但实际上,我不知道有多少观察者会处理每个元素。这个解决方案能应用于这种情况吗? –

+0

我添加了序列块方法作为可能的解决方案。 –

1

为了验证这一点,我创建了下面的方法来帮助说明事件的顺序:

public void ComplexComputation1(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation1"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation1"); 
} 

public void ComplexComputation2(long i) 
{ 
    Console.WriteLine("Begin ComplexComputation2"); 
    Thread.Sleep(100); 
    Console.WriteLine("End ComplexComputation2"); 
} 

public void FinalAction(long i) 
{ 
    Console.WriteLine("Begin FinalAction"); 
    Thread.Sleep(100); 
    Console.WriteLine("End FinalAction"); 
} 

你原来的代码跑这样的:

 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End FinalAction 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End FinalAction 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
End FinalAction 
... 

这很容易执行代码在单个后台线程上依次运行。只需使用EventLoopScheduler即可。

var els = new EventLoopScheduler(); 

observable.ObserveOn(els).Subscribe(i => ComplexComputation1(i)); 
observable.ObserveOn(els).Subscribe(i => ComplexComputation2(i)); 
// next observer must be called only after ComplexComputation1/2 complete on input i 
observable.ObserveOn(els).Subscribe(i => FinalAction(i)); 

这给:

 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
End ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 

但只要你介绍Scheduler.Default这不起作用。

的更多或更少的简单的选择是要做到这一点:

var cc1s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation1(i); return Unit.Default; }); 
var cc2s = observable.ObserveOn(Scheduler.Default).Select(i => { ComplexComputation2(i); return Unit.Default; }); 

observable.Zip(cc1s.Zip(cc2s, (cc1, cc2) => Unit.Default), (i, cc) => i).Subscribe(i => FinalAction(i)); 

按预期工作。

你得到一个不错的顺序是这样的:

 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation1 
End ComplexComputation2 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation2 
Begin ComplexComputation1 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
Begin ComplexComputation1 
Begin ComplexComputation2 
End ComplexComputation2 
End ComplexComputation1 
Begin FinalAction 
End FinalAction 
0

这似乎像组成一个简单的例子嵌套观察到被夷为平地(的SelectMany /合并/的毗连)和Zip

在这里,我已经采取假定Long Running方法的自由返回Task。 但是,如果他们不,那么慢阻塞同步方法可以用Observable.Start(()=>ComplexComputation1(x))包装。

void Main() 
{ 
    var period = TimeSpan.FromSeconds(0.5); 
    var observable = Observable 
     .Interval(period) 
     .Publish() 
     .RefCount(); 

    var a = observable.Select(i => ComplexComputation1(i).ToObservable()) 
       .Concat(); 
    var b = observable.Select(i => ComplexComputation2(i).ToObservable()) 
       .Concat(); 

    a.Zip(b, Tuple.Create) 
     .Subscribe(pair => FinalAction(pair.Item1, pair.Item2)); 
} 

// Define other methods and classes here 
Random rnd = new Random(); 
private async Task<long> ComplexComputation1(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 
private async Task<long> ComplexComputation2(long i) 
{ 
    await Task.Delay(rnd.Next(50, 1000)); 
    return i; 
} 

private void FinalAction(long a, long b) 
{ 

}