2011-12-13 105 views
3

我想了解如何将多个可观察事件流式传输到Rx的一组事件中。但是当我运行下面的代码时,我得到一个异常。那么这是否意味着多个观察者总是因为违反Rx语法而容易出现异常?我的意思是,如果这些多个观察者中的两个偶然产生一个事件(任何两个观察者同时产生一些概率),它应该给出一个例外。观察者可以安全地使用Rx监听多个观察对象吗?

DateTimeOffset start; 
     object sync = new object(); 
     var subject = new Subject<long>(); 
     var observer = Observer.Create<long>(c => 
     { 
      lock (sync) 
      { 
       Console.WriteLine(c); 
      } 
     }) 
      ; 

     var observable1 = Observable.Interval(TimeSpan.FromSeconds(2)); 
     var observable2 = Observable.Interval(TimeSpan.FromSeconds(5)); 
     var observable3 = Observable.Never<long>().Timeout 
      (start = DateTimeOffset.Now.AddSeconds(15), 
      (new long[] { 1 }).ToObservable()); 
     var observable4 = Observable.Never<long>().Timeout(start); 
     observable1.Subscribe(observer); 
     observable2.Subscribe(observer); 
     observable3.Subscribe(observer); 
     observable4.Subscribe(observer); 
     Thread.Sleep(20000); 

感谢Gideon的解释。这是我得到的例外。你是对的,这是一个时间错觉。这是一个编码错误。谢谢。

System.TimeoutException: The operation has timed out. 
    at System.Reactive.Observer.<Create>b__8[T](Exception e) 
    at System.Reactive.AnonymousObserver`1.Error(Exception exception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.Subjects.Subject`1.OnError(Exception error) 
    at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e 
xception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.AnonymousObservable`1.AutoDetachObserver.Error(Exception e 
xception) 
    at System.Reactive.AbstractObserver`1.OnError(Exception error) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<>c__DisplayClass28 
e.<Throw>b__28b() 
    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ImmediateScheduler.Schedule[TState](TState sta 
te, Func`3 action) 
    at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio 
n action) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass28c`1.<Throw>b__28a(IObse 
rver`1 observer) 
    at System.Reactive.AnonymousObservable`1.<>c__DisplayClass1.<Subscribe>b__0() 

    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ScheduledItem`2.InvokeCore() 
    at System.Reactive.Concurrency.ScheduledItem`1.Invoke() 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Trampoline.Run() 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState 
state, TimeSpan dueTime, Func`3 action) 
    at System.Reactive.Concurrency.CurrentThreadScheduler.Schedule[TState](TState 
state, Func`3 action) 
    at System.Reactive.Concurrency.Scheduler.Schedule(IScheduler scheduler, Actio 
n action) 
    at System.Reactive.AnonymousObservable`1.Subscribe(IObserver`1 observer) 
    at System.Reactive.Linq.Observable.<>c__DisplayClass543`1.<>c__DisplayClass54 
5.<Timeout>b__53f() 
    at System.Reactive.Concurrency.Scheduler.Invoke(IScheduler scheduler, Action 
action) 
    at System.Reactive.Concurrency.ThreadPoolScheduler.<>c__DisplayClass8`1.<Sche 
dule>b__6(Object _) 
    at System.Threading._TimerCallback.TimerCallback_Context(Object state) 
    at System.Threading.ExecutionContext.Run(ExecutionContext executionContext, C 
ontextCallback callback, Object state, Boolean ignoreSyncCtx) 
    at System.Threading._TimerCallback.PerformTimerCallback(Object state) 

回答

5

是的,一个观察者可以听到多个观察对象。最好的例子是Merge运营商。内置的操作符都将遵循RX语法,并且通常会在不支持的源上执行。

IObserver你从Observer.Create就是这样的一个情况。一旦调用OnError或OnCompleted,它将忽略以后对OnNext的调用。这确实意味着使用相同的观察者订阅一个observable,然后在第一个observable之后另一个observable将不起作用,因为来自第一个observable的终止消息将导致观察者忽略来自第二个observable的消息。为了解决这个问题,像Merge,ConcatOnErrorResumeNext(等等)这样的运营商在内部使用多个观察者,并且不从最后一个可观察到的外部传递完成消息(取决于操作符的语义的OnError和/或OnCompleted)观察者。

你没有提到你做了什么异常,但我猜想,这是来自于你从observable4得到超时错误。如果你不提供另一个可观测到使用超时,观察者的OnError被调用,并为SubscribeObserver.Create重载不采取错误处理程序的默认OnError是简单地抛出异常。

虽然这很明显是示例/测试代码,但我确实想指出,即使您不再收到传递给您的消息OnNext,但在此异常之后,所有其他可观察对象仍继续运行。您可以使用Merge为您跟踪这些信息,也可以跟踪描述中的所有一次性物品,并在完成消息传出时自行处理它们。 CompositeDisposable(在System.Reactive.Disposables)对此很有帮助。

2

你真的不应该在这里使用锁定的情况下,但如果你真的想这个工作,你可以这样做:

var x = Observable.Create<T>(subj => { /* Fill it in*/ }) 
    .Multicast(new Subject<T>()); 

// Set up your subscriptions Here! 

// When you call the Connect, whatever is in the Observable.Create will be called 
x.Connect(); 

如果你想成为更安全,你可以把它,以便的创建将被“重播”你为未来的订阅,通过使用ReplaySubject而不是主题(而有主题,用户后连接会得到什么)

结果