2016-09-28 14 views
0

我有下面的代码只调用一次演示问题:春+ RxJava +计划cron作业:为什么豆子上连接的可观察

@Component 
public class App { 
    @Autowired S1 s1; 
    @Autowired S2 s2; 
    int jobs = 0; 
    @Scheduled(cron = "0 * * * * ?") 
    void foo() { 
     System.out.println("schedule cron job: " + jobs++); 
     Observable<String> observable = Observable.just("bar"); 
     ConnectableObservable<String> publishedObservable = observable.publish(); 
     publishedObservable.subscribe(s1); 
     publishedObservable.subscribe(s2); 
     publishedObservable.connect(); 
    } 
} 

Subscriber1的:

@Component 
public class S1 extends Subscriber<String> { 
    private AtomicInteger counter = new AtomicInteger(0); 

    @Override 
    public void onCompleted() { 
    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(String s) { 
     System.out.println("S1:::: Times called: " + counter.getAndIncrement() + ", input: " + s); 

    } 
} 

subscriber2用户:

@Component 
public class S2 extends Subscriber<String> { 
    private AtomicInteger counter = new AtomicInteger(0); 

    @Override 
    public void onCompleted() { 
    } 

    @Override 
    public void onError(Throwable e) { 
    } 

    @Override 
    public void onNext(String s) { 
     System.out.println("S2:::: Times called: " + counter.getAndIncrement() + ", input: " + s); 
    } 
} 

输出将是:

schedule cron job: 0 
S1:::: Times called: 0, input: bar 
S2:::: Times called: 0, input: bar 
schedule cron job: 1 
schedule cron job: 2 
schedule cron job: 3 
schedule cron job: 4 
...... 

为什么每次调用foo方法时都不会调用S1和S2? 如何实现这一目标?

这是因为rx一些订阅逻辑还是因为这些bean是单身?

回答

2

为什么每次调用foo方法时都不会调用S1和S2?

RxJava Subscriber s为有状态,一旦他们所消耗的顺序,他们将不再可用,并报告自己退订。与他们订阅再次没有效果。每次需要订阅源时,您都必须重新创建它们。

+0

谢谢你澄清这一点。这会是一个选择吗? 'publishedObservable.subscribe(s1 :: onNext,s1 :: onError,s1 :: onCompleted); publishedObservable.subscribe(s2 :: onNext,s2 :: onError,s2 :: onCompleted);'还是使用原型范围更好? – marknorkin

+0

使用“观察者”。 – akarnokd

+0

'Observers'是无情的,可以重用?在一些用户中,我使用'onStart'方法,这个逻辑可以被移动? 'publishedObservable.subscribe(s1 :: onNext,s1 :: onError,s1 :: onCompleted);'这就是为什么不会为我工作我认为 – marknorkin