我有一些传统的非RX代码,通过产生一个新的Thread来完成一些网络工作。 工作完成后,它会在回调中调用一个方法。代码适用于RX Completable not blocked onSubscribe thread
我无法控制此代码运行的线程。这是遗产,它自己产生了一个新的Thread
。
interface Callback {
void onSuccess();
}
static void executeRequest(String name, Callback callback) {
new Thread(() -> {
try {
System.out.println(" Starting... " + name);
Thread.sleep(2000);
System.out.println(" Finishing... " + name);
callback.onSuccess();
} catch (InterruptedException ignored) {}
}).start();
}
我想这个转换为RX Completable
:
这可以在像被简化。为此,我使用Completable#create()
。 CompletableEmitter
的实现调用executeRequest
传递Callback
的实现,该请求表明请求完成时发出信号。
我订阅时还会打印日志跟踪以帮助我进行调试。
static Completable createRequestCompletable(String name) {
return Completable.create(e -> executeRequest(name, e::onComplete))
.doOnSubscribe(d -> System.out.println("Subscribed to " + name));
}
这个按预期工作。 Completable
只有在“请求”结束并且调用回调后才能完成。
问题是,在trampoline
调度程序中订阅这些补充程序时,它不会等待第一个请求完成,然后再订阅第二个请求。
此代码:
final Completable c1 = createRequestCompletable("1");
c1.subscribeOn(Schedulers.trampoline()).subscribe();
final Completable c2 = createRequestCompletable("2");
c2.subscribeOn(Schedulers.trampoline()).subscribe();
输出:
Subscribed to 1
Starting... 1
Subscribed to 2
Starting... 2
Finishing... 1
Finishing... 2
正如你看到的,它赞同第二Completable
第一Completable
之前已经完成,即使我在trampoline
我订阅。
我想排队completables
,因此,对于第一,第二等待完成,输出这样的:
Subscribed to 1
Starting... 1
Finishing... 1
Subscribed to 2
Starting... 2
Finishing... 2
我相信这个问题是关系到工作的工人正在做线。如果Completable
的实现没有产生新的线程,则按预期工作。 但这是遗留代码,我正在尝试将其适配到RX而不进行修改。
注意:请求在程序的不同点执行 - 我不能使用andThen
或concat
来实现序列化执行。