2014-10-08 97 views
53

我需要一些帮助来实现RxJava中的并行异步调用。我选择了一个简单的用例,其中FIRST调用获取(而不是搜索)要显示的产品列表(Tile)。随后调用出去取(A)审查和(B)产品图片RxJava并行获取观察值

多次尝试我到这个地方之后。

1 Observable<Tile> searchTile = searchServiceClient.getSearchResults(searchTerm); 
2 List<Tile> allTiles = new ArrayList<Tile>(); 
3 ClientResponse response = new ClientResponse(); 

4 searchTile.parallel(oTile -> { 
5  return oTile.flatMap(t -> { 
6  Observable<Reviews> reviews = reviewsServiceClient.getSellerReviews(t.getSellerId()); 
7  Observable<String> imageUrl = reviewsServiceClient.getProductImage(t.getProductId()); 

8  return Observable.zip(reviews, imageUrl, (r, u) -> { 
9   t.setReviews(r); 
10   t.setImageUrl(u); 

11   return t; 
12  }); 

13  }); 
14 }).subscribe(e -> { 
15  allTiles.add((Tile) e); 
16 }); 

1号线:熄灭并获取产品(瓦)来显示

4号线:我们采取的可观察的名单和碎片它获取的评论和imageUrls

谎言6 ,7:取可观察的审查和可观察到的URL

8号线:最后2个可观的压缩,返回一个更新的可观测

线15:FI应受第15行整理,以显示它可以返回给调用层

集合中的所有单个产品虽然可观察到已经分片,并在我们的测试运行在4个不同的主题;获取评论和图像似乎是一个接一个。我怀疑第8行的zip步骤基本上是导致2个observables(评论和url)的连续调用。

enter image description here

请问这组有任何建议,平行取reiews和图像的URL。本质上,上面附带的瀑布图应该看起来更垂直堆叠。对审查和图像的调用应该是并行

感谢 阿南德拉曼

+0

您是如何生成Transfer Timeline图表的?它看起来很酷很有用。想自己使用它。 – 2015-01-18 02:24:04

+0

由于我的系统正在进行外部呼叫,我只是通过提琴手代理呼叫。提琴手可以选择生成网络时间表。你基本上看到了这个观点。在为代理请求设置提琴手之后;只需选择您感兴趣的会话,然后单击右侧窗格中的时间轴选项卡即可。谢谢anand – diduknow 2015-01-19 07:52:11

回答

82

的并行操作被证明是几乎所有的用例问题,没有做什么最期待它,所以它被移除1.0.0.rc.4版本:https://github.com/ReactiveX/RxJava/pull/1716

一个很好的例子是如何做到这种行为并获得并行执行,可以看到here

在您的示例代码,目前还不清楚searchServiceClient是同步或异步的。它会影响如何解决问题,就好像它已经是异步的,不需要额外的调度。如果需要同步的额外调度。

首先这里是显示同步和异步行为的一些简单的例子:

import rx.Observable; 
import rx.Subscriber; 
import rx.schedulers.Schedulers; 

public class ParallelExecution { 

    public static void main(String[] args) { 
     System.out.println("------------ mergingAsync"); 
     mergingAsync(); 
     System.out.println("------------ mergingSync"); 
     mergingSync(); 
     System.out.println("------------ mergingSyncMadeAsync"); 
     mergingSyncMadeAsync(); 
     System.out.println("------------ flatMapExampleSync"); 
     flatMapExampleSync(); 
     System.out.println("------------ flatMapExampleAsync"); 
     flatMapExampleAsync(); 
     System.out.println("------------"); 
    } 

    private static void mergingAsync() { 
     Observable.merge(getDataAsync(1), getDataAsync(2)).toBlocking().forEach(System.out::println); 
    } 

    private static void mergingSync() { 
     // here you'll see the delay as each is executed synchronously 
     Observable.merge(getDataSync(1), getDataSync(2)).toBlocking().forEach(System.out::println); 
    } 

    private static void mergingSyncMadeAsync() { 
     // if you have something synchronous and want to make it async, you can schedule it like this 
     // so here we see both executed concurrently 
     Observable.merge(getDataSync(1).subscribeOn(Schedulers.io()), getDataSync(2).subscribeOn(Schedulers.io())).toBlocking().forEach(System.out::println); 
    } 

    private static void flatMapExampleAsync() { 
     Observable.range(0, 5).flatMap(i -> { 
      return getDataAsync(i); 
     }).toBlocking().forEach(System.out::println); 
    } 

    private static void flatMapExampleSync() { 
     Observable.range(0, 5).flatMap(i -> { 
      return getDataSync(i); 
     }).toBlocking().forEach(System.out::println); 
    } 

    // artificial representations of IO work 
    static Observable<Integer> getDataAsync(int i) { 
     return getDataSync(i).subscribeOn(Schedulers.io()); 
    } 

    static Observable<Integer> getDataSync(int i) { 
     return Observable.create((Subscriber<? super Integer> s) -> { 
      // simulate latency 
       try { 
        Thread.sleep(1000); 
       } catch (Exception e) { 
        e.printStackTrace(); 
       } 
       s.onNext(i); 
       s.onCompleted(); 
      }); 
    } 
} 

以下是在提供更符合您的代码示例尝试:

import java.util.List; 

import rx.Observable; 
import rx.Subscriber; 
import rx.schedulers.Schedulers; 

public class ParallelExecutionExample { 

    public static void main(String[] args) { 
     final long startTime = System.currentTimeMillis(); 

     Observable<Tile> searchTile = getSearchResults("search term") 
       .doOnSubscribe(() -> logTime("Search started ", startTime)) 
       .doOnCompleted(() -> logTime("Search completed ", startTime)); 

     Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { 
      Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) 
        .doOnCompleted(() -> logTime("getSellerReviews[" + t.id + "] completed ", startTime)); 
      Observable<String> imageUrl = getProductImage(t.getProductId()) 
        .doOnCompleted(() -> logTime("getProductImage[" + t.id + "] completed ", startTime)); 

      return Observable.zip(reviews, imageUrl, (r, u) -> { 
       return new TileResponse(t, r, u); 
      }).doOnCompleted(() -> logTime("zip[" + t.id + "] completed ", startTime)); 
     }); 

     List<TileResponse> allTiles = populatedTiles.toList() 
       .doOnCompleted(() -> logTime("All Tiles Completed ", startTime)) 
       .toBlocking().single(); 
    } 

    private static Observable<Tile> getSearchResults(String string) { 
     return mockClient(new Tile(1), new Tile(2), new Tile(3)); 
    } 

    private static Observable<Reviews> getSellerReviews(int id) { 
     return mockClient(new Reviews()); 
    } 

    private static Observable<String> getProductImage(int id) { 
     return mockClient("image_" + id); 
    } 

    private static void logTime(String message, long startTime) { 
     System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); 
    } 

    private static <T> Observable<T> mockClient(T... ts) { 
     return Observable.create((Subscriber<? super T> s) -> { 
      // simulate latency 
       try { 
        Thread.sleep(1000); 
       } catch (Exception e) { 
       } 
       for (T t : ts) { 
        s.onNext(t); 
       } 
       s.onCompleted(); 
      }).subscribeOn(Schedulers.io()); 
     // note the use of subscribeOn to make an otherwise synchronous Observable async 
    } 

    public static class TileResponse { 

     public TileResponse(Tile t, Reviews r, String u) { 
      // store the values 
     } 

    } 

    public static class Tile { 

     private final int id; 

     public Tile(int i) { 
      this.id = i; 
     } 

     public int getSellerId() { 
      return id; 
     } 

     public int getProductId() { 
      return id; 
     } 

    } 

    public static class Reviews { 

    } 
} 

此输出:

Search started => 65ms 
Search completed => 1094ms 
getProductImage[1] completed => 2095ms 
getSellerReviews[2] completed => 2095ms 
getProductImage[3] completed => 2095ms 
zip[1] completed => 2096ms 
zip[2] completed => 2096ms 
getProductImage[2] completed => 2096ms 
getSellerReviews[1] completed => 2096ms 
zip[3] completed => 2096ms 
All Tiles Completed => 2097ms 
getSellerReviews[3] completed => 2097ms 

我做了每个IO调用模拟采取1000毫秒,所以我t在延迟和并行发生时很明显。它打印出进度是在经过的毫秒中进行的。

这里的技巧是flatMap合并异步调用,所以只要合并的Observables是异步的,它们将全部同时执行。

如果像getProductImage(t.getProductId())这样的调用是同步的,则可以使其成为异步,如下所示:getProductImage(t.getProductId())。subscribeOn(Schedulers.io)。

这里是上面的例子中没有所有的记录和样板类型的重要组成部分:

Observable<Tile> searchTile = getSearchResults("search term");; 

    Observable<TileResponse> populatedTiles = searchTile.flatMap(t -> { 
     Observable<Reviews> reviews = getSellerReviews(t.getSellerId()); 
     Observable<String> imageUrl = getProductImage(t.getProductId()); 

     return Observable.zip(reviews, imageUrl, (r, u) -> { 
      return new TileResponse(t, r, u); 
     }); 
    }); 

    List<TileResponse> allTiles = populatedTiles.toList() 
      .toBlocking().single(); 

我希望这有助于。

+0

谢谢@benjchristensen的辉煌回应。它提供了清晰度并解决了我的问题。感谢您也在[https://github.com/benjchristensen/ReactiveLab]中指出了例子的宝藏。将在周末深入挖掘它。 – diduknow 2014-10-08 18:26:02

+0

doOnXXX()方法的目的是什么? – 2015-02-09 07:39:10

+0

@Pangea,我认为这些事件发生时它们会打印它的目的,所以你可以看到它并行工作。 – ivant 2015-03-24 15:10:28

4

仍然@ JDK 7的人,IDE尚未自动检测到JDK 8源代码,以及由@benjchristensen尝试上述杰出响应(和解释)的方法可以使用这种无耻折射的JDK 7代码。感谢@benjchristensen的一个惊人的解释和例子!

import java.util.List; 

import rx.Observable; 
import rx.Subscriber; 
import rx.functions.Action0; 
import rx.functions.Func1; 
import rx.functions.Func2; 
import rx.schedulers.Schedulers; 

public class ParallelExecutionExample 
{ 

    public static void main(String[] args) 
    { 
     final long startTime = System.currentTimeMillis(); 

     Observable<Tile> searchTile = getSearchResults("search term") 
       .doOnSubscribe(new Action0() 
         { 

          @Override 
          public void call() 
          { 
           logTime("Search started ", startTime); 
          } 
       }) 
       .doOnCompleted(new Action0() 
         { 

          @Override 
          public void call() 
          { 
           logTime("Search completed ", startTime); 
          } 
       }); 
     Observable<TileResponse> populatedTiles = searchTile.flatMap(new Func1<Tile, Observable<TileResponse>>() 
     { 

      @Override 
      public Observable<TileResponse> call(final Tile t) 
      { 
       Observable<Reviews> reviews = getSellerReviews(t.getSellerId()) 
         .doOnCompleted(new Action0() 
           { 

            @Override 
            public void call() 
            { 
             logTime("getSellerReviews[" + t.id + "] completed ", startTime); 
            } 
         }); 
       Observable<String> imageUrl = getProductImage(t.getProductId()) 
         .doOnCompleted(new Action0() 
           { 

            @Override 
            public void call() 
            { 
             logTime("getProductImage[" + t.id + "] completed ", startTime); 
            } 
         }); 
       return Observable.zip(reviews, imageUrl, new Func2<Reviews, String, TileResponse>() 
       { 

        @Override 
        public TileResponse call(Reviews r, String u) 
        { 
         return new TileResponse(t, r, u); 
        } 
       }) 
         .doOnCompleted(new Action0() 
           { 

            @Override 
            public void call() 
            { 
             logTime("zip[" + t.id + "] completed ", startTime); 
            } 
         }); 
      } 
     }); 

     List<TileResponse> allTiles = populatedTiles 
       .toList() 
       .doOnCompleted(new Action0() 
         { 

          @Override 
          public void call() 
          { 
           logTime("All Tiles Completed ", startTime); 
          } 
       }) 
       .toBlocking() 
       .single(); 
    } 

    private static Observable<Tile> getSearchResults(String string) 
    { 
     return mockClient(new Tile(1), new Tile(2), new Tile(3)); 
    } 

    private static Observable<Reviews> getSellerReviews(int id) 
    { 
     return mockClient(new Reviews()); 
    } 

    private static Observable<String> getProductImage(int id) 
    { 
     return mockClient("image_" + id); 
    } 

    private static void logTime(String message, long startTime) 
    { 
     System.out.println(message + " => " + (System.currentTimeMillis() - startTime) + "ms"); 
    } 

    private static <T> Observable<T> mockClient(final T... ts) 
    { 
     return Observable.create(new Observable.OnSubscribe<T>() 
     { 

      @Override 
      public void call(Subscriber<? super T> s) 
      { 
       try 
       { 
        Thread.sleep(1000); 
       } 
       catch (Exception e) 
       { 
       } 
       for (T t : ts) 
       { 
        s.onNext(t); 
       } 
       s.onCompleted(); 
      } 
     }) 
       .subscribeOn(Schedulers.io()); 
     // note the use of subscribeOn to make an otherwise synchronous Observable async 
    } 

    public static class TileResponse 
    { 

     public TileResponse(Tile t, Reviews r, String u) 
     { 
      // store the values 
     } 

    } 

    public static class Tile 
    { 

     private final int id; 

     public Tile(int i) 
     { 
      this.id = i; 
     } 

     public int getSellerId() 
     { 
      return id; 
     } 

     public int getProductId() 
     { 
      return id; 
     } 

    } 

    public static class Reviews 
    { 

    } 
}