我使用RX实体时,有以下方案:如何检查所有需要的订阅者是否完成了他们的工作?
Observerable1
|
------------------
| | |
S1(*) S2(*) S3 (onCompleted() starts another observable)
|
Observable2
|
S4(*)
我需要知道什么时候所有*
用户完成他们的工作(S1,S2,S4
)。因为它们可以在不同的线程中执行,所以我需要一些同步机制,或者有一些来自rx-java
的开箱即用解决方案?
一些示例代码来说明当前的设计:
@Component
public class BatchReader {
@Autowired List<Subscriber<List<Integer>>> subscribers;
public void start() {
ConnectableObservable<List<Integer>> observable1 = createObservable();
subscribers.forEach(observable1::subscribe);
observable1.connect();
}
private ConnectableObservable<List<Integer>> createObservable() {
return Observable.create((Subscriber<? super List<Integer>> subscriber) -> {
try {
subscriber.onStart();
while (someCondition) {
List<Integer> numbers = ...;
subscriber.onNext(numbers);
}
subscriber.onCompleted();
} catch (Exception ex) {
subscriber.onError(ex);
}
}).observeOn(Schedulers.newThread()).publish();
}
}
在S3我具有以下逻辑:
@Component
public class S3 extends Subscriber<List<Integer>> {
@Autowired AnotherBatchReader anotherBatchReader;
....
@Override
public void onCompleted() {
anotherBatchReader.start();
}
...
}
和S4所预订在AnotherBatchReader:
@Component
public class AnotherBatchReader {
@Autowired S4<List<Foo>> subscriber4;
public void start() {
Observable<List<Foo>> observable2 = createObservable();
observable2.subscribe(subscriber4);
}
private Observable<List<Foo>> createObservable() {
return Observable.create(subscriber -> {
try {
subscriber.onStart();
while (someConditionBar) {
List<Foo> foo = ...;
subscriber.onNext(foo);
}
subscriber.onCompleted();
}
} catch (RuntimeException ex) {
subscriber.onError(ex);
}
});
}
}
那么,有一个当我感兴趣的所有用户完成他们的工作时,要如何正确通知? rx是否支持它?或者也许有更好的设计来支持它?
编辑: 我有单独的用户,因为每个人都有不同的行为。最后,用户*
(S1,S2,S3)将把他们的数据写入xml文件。但
- S1接收数据
onNext()
,做一些工作,并直接向文件写入结果 - S2接收数据
onNext()
,做一些工作,聚集在现场的结果,然后用onCompleted
- S3将它写入在
onNext
接收数据,做了一些工作,DB写入结果和onCompleted
被称为后开始的观察到它开始从数据库获取数据,并把它推到S4 - S4在
onNext()
接收数据,做了一些工作,并写入文件
为什么我需要在S3中的数据写入到数据库的原因是由于从接收到的数据在onNext()
生成的结果必须是唯一的,但我是从Observable1
获得批量的数据,我可以”没有保证这个独特性,所以DB照顾它。
当然,在S3
我不能像S2
那样做(将所有结果累积在内存中),因为S3中存在的结果的乘积与S2相比是显着的。
这看起来waaaay太复杂 - 你有什么用呢?为什么你有分开的用户? –
@TassosBassoukos我编辑问题回答你的问题 – marknorkin
感谢您的澄清。 –