2016-11-17 37 views
2

我对Rx非常陌生,并试图围绕它来包裹我的头。没有阅读很多,但试图通过实验室的第一手。Rx.net - 同步与异步观察者 - 取决于源?

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values with delays 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i, i => TimeSpan.FromMilliseconds(100)); 
     IObserver<int> handler = null; 

     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

这产生如预期异步交叉执行。另一方面,如果我将源更改为同步,即使观察者变为阻塞和同步(同一个线程ID,如果没有完全使用sub1,也不会进入sub2)。 有人可以帮我理解这一点吗?这里的同步版本

class Program 
{ 
    static void Main(string[] args) 
    { 
     // one source, produces values 
     IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i*i); 
     IObserver<int> handler = null; 

     // two observers that consume - first with a delay and the second immediately. 
     // in this case, the behavior of the observers becomes synchronous? 
     IDisposable subscription = source.Subscribe(
      i => 
      { 
       Console.WriteLine("Sub 1 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId,i); 
       Thread.Sleep(500); 
      }, 
      exception => Console.WriteLine("Sub 1 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 1 Completed observation")); 

     IDisposable subscription2 = source.Subscribe(
      i => Console.WriteLine("Sub 2 [tid:{0}] received {1} from source", Thread.CurrentThread.ManagedThreadId, i), 
      exception => Console.WriteLine("Sub 2 Something went wrong {0}", exception), 
      () => Console.WriteLine("Sub 2 Completed observation")); 

     Console.WriteLine("press to cancel"); 
     Console.ReadLine(); 
     subscription.Dispose(); 
     subscription2.Dispose(); 

    } 
} 

回答

2

我相信原因是运营商所选择的默认IScheduler。看看接受的答案here

对于Generate它取决于过载。根据答案,这些是使用的默认调度程序。您可以验证他们的来源,如果你喜欢

  • 的时间运算默认ISchedulerDefaultScheduler.Instance
  • 默认IScheduler后者算CurrentThreadScheduler.Instance

可以通过提供一个“非确认此阻止“您的同步版本中的调度程序

IObservable<int> source = Observable.Generate(0, i => i < 2, i => ++i, i => i * i, DefaultScheduler.Instance);

+0

看起来像是如果你使用时间跨度的重载选择'Scheduler.Default'调度器,而没有时间跨度的调度器则使用'Scheduler.Immediate' – Raghu