2015-11-30 46 views
1

我已经多次需要这样的构建,而且我不知道如何处理它。我的问题是:当发生A时,我想创建一个复杂的可观察(通过组合几个运算符)。它将异步完成一些操作,发布结果并完成。同时,我希望允许对这个可观察对象进行新的订阅,但是一旦它完成,就应该创建新的可观察对象,这是第一个观察对象的副本(或者只是做同样的事情)。订阅现有的可观察物体,除非它已完成

(编辑)作为一个例子,让我们来看一个简单的观测值:Observable obs = Observable.just(true).delay(1, TimeUnit.SECONDS)。我的目标是针对以下行为:

[毫秒:行动]

0:obs.subscribe(...) - 谨以此观察到后〜完成1S

500:obs.subscribe(...) - 这应该后完成〜500ms的

950:原始观察到的应该已经完成​​:如上所述,50ms的后

1500应完成。我现在想重新开始一切,之后1秒

2000这里完全有订阅:在这里我想连接到连接到最新鲜的观察到的,并期望它500S后完成(由于新的第二始于1500算起)

我不知道如何以正确和线程安全的方式来做到这一点。我可以用一个可观察的事物做到吗?

+0

一种模糊的描述,但听起来像你需要'分享'。 – akarnokd

+0

@akarnokd我看到“分享”会如何帮助我分享可观察的内容。那么当前一个完成后,我将如何重新创建可观测值?我编辑了我的问题,使问题更加清晰 – wasyl

回答

2

您可以使用defershare来实现此目的。

Observable<Long> o = Observable.defer(() -> 
    Observable.just(System.currentTimeMillis()).delay(1, TimeUnit.SECONDS)) 
.share(); 

o.subscribe(System.out::println); // T = 0 
Thread.sleep(500); 
o.subscribe(System.out::println); // T = 500 
Thread.sleep(450); 
o.subscribe(System.out::println); // T = 950 

Thread.sleep(550); 
o.subscribe(System.out::println); // T = 1500 
Thread.sleep(500); 
o.subscribe(System.out::println); // T == 2000 

Thread.sleep(1000); 

前3将完成在1s和第二后两位相同的时间(具有相同的值)将完成第一批后1.5秒(具有不同的值到所述第一)。

+0

非常感谢!我其实已经写了几乎完全相同的测试用例,但既然你发布了答案,我会很乐意接受它。 – wasyl