2015-06-17 47 views
0

我有这一块的Java方法的我的工作现在:使顺序处理异步而不使用“并行”操作

Observable<Map.Entry<String, ConstituentInfo>> obs = Observable.from(constituents.entrySet()); 

Subscriber<Map.Entry<String, ConstituentInfo>> sub = new Subscriber<Map.Entry<String, ConstituentInfo>>(){ 
    String securityBySymbol, company, symbol, companyName, marketPlace, countryName, tier, tierId; 
    ConstituentInfo constituent; 
    Integer compId; 

    @Override 
    public void onNext(Map.Entry<String, ConstituentInfo> entry) { 
     logger.info("blah blah test"); 
    } 

    @Override 
    public void onCompleted() { 
     logger.info("completed successfully"); 
    } 

    @Override 
    public void onError(Throwable throwable) { 
     logger.error(throwable.getMessage()); 
     throwable.printStackTrace(); 
    } 
}; 

obs.observeOn(Schedulers.io()).subscribe(sub); 

的方法基本上是处理在Map.Entry每个条目,但这似乎按顺序处理它(同一个线程)。如何在不使用“并行”操作符(即并行处理条目)的情况下使此过程异步?我试图运行上面的代码,我错过了一些结果(有些没有正确处理)。

回答

1

您观察到的是主线程上运行,因为它是报道与认购方式将由Schedulers.io()给出一个工人被称为所以这就是为什么它出现在一个线程。你的代码并不表示任何超出订阅者日志记录的真正工作,所以在这里不需要做任何事情。

也许你打算这样做?

 
obs.subscribeOn(Schedulers.io()).subscribe(sub); 

在并行处理方面,如果你的最后一行是这样的:

 
obs.doOnNext(entry -> doWork(entry)).observeOn(Schedulers.io()).subscribe(sub); 

然后,你可以有doWork位异步这样做:

 
int numProcessors = Runtime.getRuntime().availableProcessors(); 
obs.buffer(Math.max(1, constituents.size()/numProcessors)) 
    .flatMap(list -> Observable.from(list) 
        .doOnNext(entry -> doWork(entry) 
        .subscribeOn(Schedulers.computation()) 
    .observeOn(Schedulers.io()) 
    .subscribe(sub); 

您需要缓冲了处理器的工作,否则可能会有很多线程上下文切换正在进行。

我不知道为什么你错过了结果。如果您有可重复的测试用例,请将代码粘贴到此处或将其报告给github上的RxJava作为问题。

相关问题