2017-10-06 65 views
0

我有一些传统的非RX代码,通过产生一个新的Thread来完成一些网络工作。 工作完成后,它会在回调中调用一个方法。代码适用于RX Completable not blocked onSubscribe thread

我无法控制此代码运行的线程。这是遗产,它自己产生了一个新的Thread

interface Callback { 
    void onSuccess(); 
} 

static void executeRequest(String name, Callback callback) { 
    new Thread(() -> { 
     try { 
      System.out.println(" Starting... " + name); 
      Thread.sleep(2000); 
      System.out.println(" Finishing... " + name); 
      callback.onSuccess(); 
     } catch (InterruptedException ignored) {} 
    }).start(); 
} 

我想这个转换为RX Completable

这可以在像被简化。为此,我使用Completable#create()CompletableEmitter的实现调用executeRequest传递Callback的实现,该请求表明请求完成时发出信号。

我订阅时还会打印日志跟踪以帮助我进行调试。

static Completable createRequestCompletable(String name) { 
     return Completable.create(e -> executeRequest(name, e::onComplete)) 
       .doOnSubscribe(d -> System.out.println("Subscribed to " + name)); 
} 

这个按预期工作。 Completable只有在“请求”结束并且调用回调后才能完成。

问题是,在trampoline调度程序中订阅这些补充程序时,它不会等待第一个请求完成,然后再订阅第二个请求。

此代码:

final Completable c1 = createRequestCompletable("1"); 
c1.subscribeOn(Schedulers.trampoline()).subscribe(); 

final Completable c2 = createRequestCompletable("2"); 
c2.subscribeOn(Schedulers.trampoline()).subscribe(); 

输出:

Subscribed to 1 
    Starting... 1 
Subscribed to 2 
    Starting... 2 
    Finishing... 1 
    Finishing... 2 

正如你看到的,它赞同第二Completable第一Completable之前已经完成,即使我在trampoline我订阅。

我想排队completables,因此,对于第一,第二等待完成,输出这样的:

Subscribed to 1 
    Starting... 1 
    Finishing... 1 
Subscribed to 2 
    Starting... 2 
    Finishing... 2 

我相信这个问题是关系到工作的工人正在做线。如果Completable的实现没有产生新的线程,则按预期工作。 但这是遗留代码,我正在尝试将其适配到RX而不进行修改。

注意:请求在程序的不同点执行 - 我不能使用andThenconcat来实现序列化执行。

回答

0

我已成功执行Completables,通过明确阻止订购ThreadLatch。 但我不认为这是在RX中实现它的惯用方式,我仍然不明白为什么我需要这样做,并且线程在Completable完成之前不会被阻止。

static Completable createRequestCompletable(String name) { 
    final CountDownLatch latch = new CountDownLatch(1); 
    return Completable.create(e -> { 
     executeRequest(name,() -> { 
      e.onComplete(); 
      latch.countDown(); 
     }); 
     latch.await(); 
    }) 
    .doOnSubscribe(disposable -> System.out.println("Subscribed to " + name)); 
} 
相关问题