2013-05-31 31 views
-2

我有一个方案来启动订阅,一旦另一个订阅完成。两者都共享相同的连接流。现在任务。继续等效于RX

IConnectableObservable<List<int>> stream = GetMyHotStream(); 
var firstSubscription = stream.Subscribe(s=>{}); 
var secondSubscription = stream.Subscribe(s=> {}); 
stream.Connect(); 

我想等待secondSubscription直到firstSubscription完成。

为了简洁,我省略了其他细节。

任何答案将是非常有益的..谢谢

+1

”现在我想等待第二次订阅,直到第一次订阅完成。“ - 这句话没有任何意义。你想从更高级别的角度做什么 –

+0

我在看什么是var secondSubscription = stream.WaitUntil(firstSubscriptionCompleted).Subscribe(s => {}); – user730702

+0

看到您的实际使用案例会很有趣。可能有(可能是)解决问题的更好方法? –

回答

0

据我了解,你想机制保障,其中两个订阅的执行顺序。问题是,建立起来,它们是独立的。我认为最好的解决办法就是把它们放在一个操作中。

stream.Subscribe(s => 
{ 
    FirstSubscriptionStuff(s); 
    SecondSubscriptionStuff(s): 
}); 
+0

正确。我试图保证排除顺序。我无法将单个订阅中的逻辑组合在一起,因为第一次订阅(其是值列表)中的结果首先在UI上更新,并且结果的丰富发生在第二次订阅中的每个结果中,因为它连接很长时间到另一项服务。 – user730702

0

下面是一个例子,其中每个有更新到热可观察的时间所述第一预订打印出“子1项”。我使用Take(5)来确保第一次订阅完成。 然后,第二次订阅将继续从原始来源获取项目。

var s = Observable.Interval(TimeSpan.FromSeconds(1)).Publish(); 

var sub1 = s.Take(5).Dump("Sub 1 item"); 
var sub2 = sub1.IgnoreElements().Concat(s).Dump("Sub 2 item"); 

s.Connect(); 

输出:

Sub 1 item →0 
Sub 1 item →1 
Sub 1 item →2 
Sub 1 item →3 
Sub 1 item →4 
Sub 2 item →5 
Sub 2 item →6 
Sub 2 item →7 

这是诸如此类的事情,你是后? 问候,

霍华德

-1

我写的东西像

public static IObservable<TSource> WaitUntil<TSource>(this IObservable<TSource> source, IObservable<Unit> completionSource) 
    { 
     return new WaitUntil<TSource>(source, completionSource); 
    } 

,并通过订阅的completionSource的OnCompletion源实现的最好推迟。

的这种用法是

var asyncSubject = new AsyncSubject();

 var secondStream = stream.WaitUntil(asyncSubject).Subscribe(s => 
      { 
      }); 

     var firstStream = stream.SubscribeOn(Scheduler.NewThread).Subscribe(s => 
      { }, 
      () => 
      { 
       asyncSubject.OnCompleted(); 
      }); 

     stream.Connect(); 

不知道如何优雅,这是因为我刚开始在我的项目中使用的Rx,但这个工作正常。 “