2012-01-01 85 views
2

是否可以强制多个RX订阅到不同的观察值连续运行(不是同时)?同步RX中的多个订阅

我知道我可以使用EventLoopScheduler,但这会降低性能,因为所有处理都将在单个线程上完成。

回答

1

如果你的意思是运行一个可观察的直到OnCompleted然后开始下一个,你应该看看Concat。如果您的意思是要同时订阅多个不同的observables,则可以使用Merge(如果语义对您的方案有意义)。如果合并不合适,我会建议在观察者方法或您已知的EventLoopScheduler中使用标准线程同步方法之一(锁定,监视等)。

编辑原来的答复保存下面

是的,这是可以强制串行执行观察员。但是,您是否需要取决于观测值。一般来说,热点观测量已经连续运行,而冷态观测量则不会。这是热和冷观察者工作方式的差异的副作用。为了使观察者变得冷热,从而使观察者连续运行,使用Publish。这是一个演示各种行为的例子。

Sub Main() 
    'hot observable, runs serially 
    Dim trigger As New ObsEvent 
    Dim eobs = Observable.FromEventPattern(Of ItemEventArgs(Of String))(
        Sub(h) AddHandler trigger.Triggered, h, 
        Sub(h) RemoveHandler trigger.Triggered, h) 
    Dim sub1 = eobs.Subscribe(Sub(v) 
            Console.WriteLine("Starting event observer 1: {0}", v.EventArgs.Item) 
            Thread.Sleep(2000) 
            Console.WriteLine("Ending event observer 1") 
           End Sub) 
    trigger.Trigger("event trigger 1") 
    Dim sub2 = eobs.Subscribe(Sub(v) 
            Console.WriteLine("Starting event observer 2: {0}", v.EventArgs.Item) 
            Thread.Sleep(2000) 
            Console.WriteLine("Ending event observer 2") 
           End Sub) 
    trigger.Trigger("event trigger 2") 

    Console.WriteLine() 
    Console.WriteLine() 

    'cold observable, runs "simultaneously" 
    Dim tobs = Observable.Timer(TimeSpan.FromSeconds(5)) 
    sub1 = tobs.Subscribe(Sub(v) 
           Console.WriteLine("Starting timer observer 1") 
           Thread.Sleep(2000) 
           Console.WriteLine("Ending timer observer 1") 
          End Sub, 
          Sub(ex) Console.WriteLine("Error"), 
          Sub() Console.WriteLine("Observer 1 completed")) 
    Thread.Sleep(500) 
    sub2 = tobs.Subscribe(Sub(v) 
           Console.WriteLine("Starting timer observer 2") 
           Thread.Sleep(2000) 
           Console.WriteLine("Ending timer observer 2") 
          End Sub, 
          Sub(ex) Console.WriteLine("Error"), 
          Sub() Console.WriteLine("Observer 2 completed")) 

    'cold observable turned hot, runs serially 
    Dim pobs = tobs.Publish() 
    sub1 = pobs.Subscribe(Sub(v) 
           Console.WriteLine("Starting publish observer 1") 
           Thread.Sleep(2000) 
           Console.WriteLine("Ending publish observer 1") 
          End Sub, 
          Sub(ex) Console.WriteLine("Error"), 
          Sub() Console.WriteLine("Observer P1 completed")) 
    Thread.Sleep(500) 
    sub2 = pobs.Subscribe(Sub(v) 
           Console.WriteLine("Starting publish observer 2") 
           Thread.Sleep(2000) 
           Console.WriteLine("Ending publish observer 2") 
          End Sub, 
          Sub(ex) Console.WriteLine("Error"), 
          Sub() Console.WriteLine("Observer P2 completed")) 
    pobs.Connect() 

    Console.ReadKey() 
End Sub 
+1

在上面的示例中订阅的是相同的observable。我的意思是将订阅的执行序列化为不同的观察值。 (编辑我的问题澄清)。 – dux2 2012-01-02 06:28:01

+0

我有这样的印象:这是不正确的(冷观察结果不能连续运行) - 你能给这个信息提供参考吗? – dux2 2012-01-02 07:12:38

+0

@ dux2我没有引用方便,但它遵循冷观察的定义(只有在您每次订阅时才运行)。我最初发布的示例代码演示了这一点。 – 2012-01-04 05:15:41