2016-09-13 21 views
0

我使用RX实体时,有以下方案:如何检查所有需要的订阅者是否完成了他们的工作?

Observerable1 
     | 
------------------ 
|  |   | 
S1(*) S2(*)  S3 (onCompleted() starts another observable) 
        | 
       Observable2 
        | 
       S4(*) 

我需要知道什么时候所有*用户完成他们的工作(S1,S2,S4)。因为它们可以在不同的线程中执行,所以我需要一些同步机制,或者有一些来自rx-java的开箱即用解决方案?

一些示例代码来说明当前的设计:

@Component 
public class BatchReader { 
    @Autowired List<Subscriber<List<Integer>>> subscribers; 

    public void start() { 
     ConnectableObservable<List<Integer>> observable1 = createObservable(); 
     subscribers.forEach(observable1::subscribe); 
     observable1.connect(); 
    } 

    private ConnectableObservable<List<Integer>> createObservable() { 
     return Observable.create((Subscriber<? super List<Integer>> subscriber) -> { 
      try {  
       subscriber.onStart(); 

       while (someCondition) { 
        List<Integer> numbers = ...; 
        subscriber.onNext(numbers); 
       } 

       subscriber.onCompleted(); 
      } catch (Exception ex) { 
       subscriber.onError(ex); 
      } 
     }).observeOn(Schedulers.newThread()).publish(); 
    } 
} 

在S3我具有以下逻辑:

@Component 
public class S3 extends Subscriber<List<Integer>> { 
    @Autowired AnotherBatchReader anotherBatchReader; 
    .... 
    @Override 
    public void onCompleted() { 
     anotherBatchReader.start(); 
    } 
    ... 
} 

和S4所预订在AnotherBatchReader:

@Component 
public class AnotherBatchReader { 
    @Autowired S4<List<Foo>> subscriber4; 

    public void start() { 
     Observable<List<Foo>> observable2 = createObservable(); 
     observable2.subscribe(subscriber4); 
    } 

    private Observable<List<Foo>> createObservable() { 
     return Observable.create(subscriber -> { 
      try { 
        subscriber.onStart(); 
        while (someConditionBar) { 
         List<Foo> foo = ...; 
         subscriber.onNext(foo); 
        } 
        subscriber.onCompleted(); 
       } 
      } catch (RuntimeException ex) { 
       subscriber.onError(ex); 
      } 
     }); 
    } 
} 

那么,有一个当我感兴趣的所有用户完成他们的工作时,要如何正确通知? rx是否支持它?或者也许有更好的设计来支持它?

编辑: 我有单独的用户,因为每个人都有不同的行为。最后,用户*(S1,S2,S3)将把他们的数据写入xml文件。但

  • S1接收数据onNext(),做一些工作,并直接向文件写入结果
  • S2接收数据onNext(),做一些工作,聚集在现场的结果,然后用onCompleted
  • S3将它写入在onNext接收数据,做了一些工作,DB写入结果和onCompleted被称为后开始的观察到它开始从数据库获取数据,并把它推到S4
  • S4在onNext()接收数据,做了一些工作,并写入文件

为什么我需要在S3中的数据写入到数据库的原因是由于从接收到的数据在onNext()生成的结果必须是唯一的,但我是从Observable1获得批量的数据,我可以”没有保证这个独特性,所以DB照顾它。

当然,在S3我不能像S2那样做(将所有结果累积在内存中),因为S3中存在的结果的乘积与S2相比是显着的。

+0

这看起来waaaay太复杂 - 你有什么用呢?为什么你有分开的用户? –

+0

@TassosBassoukos我编辑问题回答你的问题 – marknorkin

+0

感谢您的澄清。 –

回答

1

感谢您的澄清。在我看来,现有运营商的审慎应用将使您的代码最小化。现在,我不知道所有的细节,但你在做什么感觉了很多这样的:

Observable<T> observable1 = ... 
     .share(); 

Observable<?> S1 = S1(observable1); 
Observable<?> S2 = S2(observable1); 
Observable<?> S3 = S3(observable1); 
Observable<?> S4 = defer(() -> readFromDatabase()).compose(S4::generate); 

Observable.merge(S1,S2,S3.ignoreElements().switchIfEmpty(S4)) 
    .ignoreElements() 
    .onComplete(...) 
    .subscribe(); 

当然,细节将取决于是否原装观察到的是冷是热是不同的, S[1-4]的详细信息。

此外,不要试图自己驾驶所有Subscribers,让框架为您做,并且您将会得到更多 - f.e.:

S4 = Observable.create(SyncOnSubscribe.generateStateless(
     observer -> observer.onNext(<get List<Foo>>) 
    )) 
    .takeWhile(list -> someConditionBar); 

编辑:这是XY问题的情况下 - 我们都穿过了这...

相关问题