2017-10-08 119 views
0

我有一个整数列表和每个整数,我想从数据库中获取一个字符串值,并将其放入一个Map中,其中整数是来自父列表的整型值,并且字符串是从数据库中获取的字符串值。我也想要获取并行运行的字符串。RxJava:循环中的异步调用

这是我用RxJava实现的。我的代码工作,我得到了预期的结果,但我不认为提取名称并行运行。

public Observable<Map<Integer, String>> getVitalsForConnectedDevices(int accountId) { 
    List<Integer> ids = Lists.newArrayList(2, 3, 5); 

    return Observable<Map<Integer, String>> obs = Observable.from(ids) 
      .flatMap((Func1<Integer, Observable<Map.Entry<Integer, String>>>) integer 
        -> Observable.just(Maps.immutableEntry(integer, deviceDAO.getVitalName(integer))) 
        .subscribeOn(Schedulers.io()), 20) 
      .toMap(entry -> entry.getKey(), entry -> entry.getValue()); 
} 

这里的getVitalName()方法

public String getVitalName(int vitalId) { 
    log.debug("id: " + vitalId); 
    String query = "SELECT name FROM vitals WHERE vital_id=?"; 
    String name = v1JdbcTemplate.queryForObject(query, String.class, vitalId); 
    log.debug("name: " + name); 
    return name; 
} 

其打印从上述功能的调试语句顺序如下:

09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 2 
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Steps 
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 3 
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Floors 
09-10-2017 02:05:37 DEBUG DeviceDAO:118 - id: 5 
09-10-2017 02:05:37 DEBUG DeviceDAO:121 - name: Distance 

如果它运行的同时,应该不会吧打印所有的ID的第一个名字?我在这里做错了什么?我如何让他们平行运行?

+2

我觉得这篇文章会给你提供你需要的答案。即使这个例子也很相似:http://tomstechnicalblog.blogspot.com.by/2015/11/rxjava-achieving-parallelization.html –

回答

1

应该不是被打印所有ID的第一和名称后

答案是否定的,因为你所要求的,每个ID从数据库字符串。结果完成后,将ID和字符串打包到一个条目中,然后将其压入管道。请注意,如果所有flatMap完成或发生错误,toMap将只能成功完成。所以可能会,你会等待最后的结果一段时间。

作为一个例子:你迭代5个ID。所有请求都是并行触发的。服务器由于负载而处于困境中,所以一些请求需要一些时间。可以说所有的请求都完成了,但只有一个。只有最后一个请求完成后,结果地图<>才会被推送给用户。如果这是你想要的,那么我会建议返回一个单一的<>不是可观察的。

这里是输出测试:

@Test 
void name() throws Exception { 
    Observable<Tuple2<Integer, String>> tuple2Observable = Observable.just(1, 2, 3, 4, 5, 6) 
      .flatMap(integer -> 
        Observable.fromCallable(() -> getVitalName(integer)) 
          .subscribeOn(Schedulers.io()) 
          .doOnNext(s -> System.out.println("Value:: " + Thread.currentThread().getName() + "-" + Instant.now())) 
          .map(s -> Tuple.of(integer, s)) 
      ).doOnComplete(() -> System.out.println("Finished:: " + Thread.currentThread().getName() + "-" + Instant.now())); 

    tuple2Observable.test() 
      .await(); 
} 

public String getVitalName(int vitalId) throws Exception { 
    System.out.println("getVitalName method called with vitalId = " + vitalId + "-" + Thread.currentThread().getName() + "-" + Instant.now()); 

    Thread.sleep(500); 

    String name = "le fake value"; 
    return name; 
} 

你看,那确实是卡列斯分别在不同的线程,当所有请求已经完成了观察的同时成品制造。

getVitalName method called with vitalId = 4-RxCachedThreadScheduler-4-2017-10-08T21:20:43.785Z 
getVitalName method called with vitalId = 6-RxCachedThreadScheduler-6-2017-10-08T21:20:43.786Z 
getVitalName method called with vitalId = 5-RxCachedThreadScheduler-5-2017-10-08T21:20:43.785Z 
getVitalName method called with vitalId = 1-RxCachedThreadScheduler-1-2017-10-08T21:20:43.785Z 
getVitalName method called with vitalId = 2-RxCachedThreadScheduler-2-2017-10-08T21:20:43.784Z 
getVitalName method called with vitalId = 3-RxCachedThreadScheduler-3-2017-10-08T21:20:43.787Z 
Value:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.303Z 
Value:: RxCachedThreadScheduler-6-2017-10-08T21:20:44.303Z 
Value:: RxCachedThreadScheduler-1-2017-10-08T21:20:44.304Z 
Value:: RxCachedThreadScheduler-2-2017-10-08T21:20:44.304Z 
Value:: RxCachedThreadScheduler-3-2017-10-08T21:20:44.304Z 
Value:: RxCachedThreadScheduler-5-2017-10-08T21:20:44.304Z 
Finished:: RxCachedThreadScheduler-4-2017-10-08T21:20:44.317Z