0
我有下面的代码只调用一次演示问题:春+ RxJava +计划cron作业:为什么豆子上连接的可观察
@Component
public class App {
@Autowired S1 s1;
@Autowired S2 s2;
int jobs = 0;
@Scheduled(cron = "0 * * * * ?")
void foo() {
System.out.println("schedule cron job: " + jobs++);
Observable<String> observable = Observable.just("bar");
ConnectableObservable<String> publishedObservable = observable.publish();
publishedObservable.subscribe(s1);
publishedObservable.subscribe(s2);
publishedObservable.connect();
}
}
Subscriber1的:
@Component
public class S1 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S1:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
subscriber2用户:
@Component
public class S2 extends Subscriber<String> {
private AtomicInteger counter = new AtomicInteger(0);
@Override
public void onCompleted() {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onNext(String s) {
System.out.println("S2:::: Times called: " + counter.getAndIncrement() + ", input: " + s);
}
}
输出将是:
schedule cron job: 0
S1:::: Times called: 0, input: bar
S2:::: Times called: 0, input: bar
schedule cron job: 1
schedule cron job: 2
schedule cron job: 3
schedule cron job: 4
......
为什么每次调用foo方法时都不会调用S1和S2? 如何实现这一目标?
这是因为rx一些订阅逻辑还是因为这些bean是单身?
谢谢你澄清这一点。这会是一个选择吗? 'publishedObservable.subscribe(s1 :: onNext,s1 :: onError,s1 :: onCompleted); publishedObservable.subscribe(s2 :: onNext,s2 :: onError,s2 :: onCompleted);'还是使用原型范围更好? – marknorkin
使用“观察者”。 – akarnokd
'Observers'是无情的,可以重用?在一些用户中,我使用'onStart'方法,这个逻辑可以被移动? 'publishedObservable.subscribe(s1 :: onNext,s1 :: onError,s1 :: onCompleted);'这就是为什么不会为我工作我认为 – marknorkin