2017-02-04 24 views
1

我的使用案例是对一个流进行分组,并行地开始处理一些组,并且在每个组内延迟处理每个项目的时间间隔。我似乎无法得到组内的延迟,因为它不是定期发射,而是几乎瞬间发射。以下是使用RxJava 2.0.5我的测试代码:如何在一个RxJava组中并行处理延迟?

@Slf4j 
public class GroupsAndDelays { 
    Function<Integer, Flowable<Integer>> remoteClient; 
    int groupMemberDelaySeconds; 
    int remoteCallTimeoutSeconds; 
    int maxRetryCount; 
    int retryDelaySeconds; 
    Map<Long, List<Integer>> threadMap; 
    Map<Long, List<Integer>> resultThreadMap; 

    public ParallelFlowable<Integer> doStuff(Flowable<Integer> src, 
              Function<Integer, Integer> groupByFn, 
              Function<Integer, Flowable<Integer>> responseMapper) { 
     return src 
       .groupBy(groupByFn) 
       .parallel(5).runOn(Schedulers.newThread()) 
       .map(g -> g.distinct().toList()) 
       .flatMap(i -> i.toFlowable()) 
       .flatMap(i -> { 
        log.debug("Processing group: {}.", i); 
        return Flowable.fromIterable(i) 
          .delay(groupMemberDelaySeconds, SECONDS); 
       }) 
       .flatMap(i -> { 
        log.debug("Processing: {}.", i); 
        putInThreadMap(threadMap, i); 
        return remoteCall(i * 2, responseMapper); 
       }); 
    } 

    private Flowable<Integer> remoteCall(int i, Function<Integer, Flowable<Integer>> responseMapper) throws 
      Exception { 
     return remoteClient.apply(i) 
       .timeout(remoteCallTimeoutSeconds, SECONDS) 
       .retryWhen(t -> t.zipWith(Flowable.range(1, maxRetryCount), (ex, retryCount) -> retryCount) 
         .flatMap(retryCount -> Flowable.timer(retryCount * retryDelaySeconds, SECONDS))) 
       .flatMap(result -> { 
        log.debug("Processing result: {}.", result); 
        putInThreadMap(resultThreadMap, result); 
        return responseMapper.apply(result); 
       }) 
       .onErrorReturnItem(-1); 
    } 

    private void putInThreadMap(Map<Long, List<Integer>> map, int i) { 
     map.merge(Thread.currentThread().getId(), singletonList(i), this::merge); 
    } 

    private List<Integer> merge(List<Integer> a, List<Integer> b) { 
     return Stream.concat(a.stream(), b.stream()).collect(Collectors.toList()); 
    } 
} 

这里有一个斯波克测试:

class GroupsAndDelaysSpec extends Specification { 
    final int groupMemberDelaySeconds = 3 
    final int remoteCallTimeoutSeconds = 3 
    final int maxRetryCount = 2 
    final int retryDelaySeconds = 2 
    Function<Integer, Flowable<Integer>> remoteClient 
    Function<Integer, Integer> groupByFn 
    Function<Integer, Flowable<Integer>> responseMapper 

    GroupsAndDelays groupsAndDelays 

    final Flowable<Integer> src = Flowable.fromArray(
      1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 
      11, 12, 13, 14, 15, 11, 12, 13, 14, 15, 
      21, 22, 23, 24, 25, 21, 22, 23, 24, 25, 
      31, 32, 33, 34, 35, 31, 32, 33, 34, 35, 
      41, 42, 43, 44, 45, 41, 42, 43, 44, 45 
    ) 

    def setup() { 
     remoteClient = Mock(Function) 

     groupsAndDelays = new GroupsAndDelays() 
     groupsAndDelays.groupMemberDelaySeconds = groupMemberDelaySeconds 
     groupsAndDelays.remoteCallTimeoutSeconds = remoteCallTimeoutSeconds 
     groupsAndDelays.maxRetryCount = maxRetryCount 
     groupsAndDelays.retryDelaySeconds = retryDelaySeconds 
     groupsAndDelays.remoteClient = remoteClient 
     groupsAndDelays.threadMap = new ConcurrentHashMap<Long, List<Integer>>() 
     groupsAndDelays.resultThreadMap = new ConcurrentHashMap<Long, List<Integer>>() 

     groupByFn = Mock(Function) 
     groupByFn.apply(_) >> { args -> args[0] % 10 } 

     responseMapper = Mock(Function) 
     responseMapper.apply(_) >> { args -> args[0] } 
    } 

    def cleanup() { 
     println("Thread map: ${groupsAndDelays.threadMap}") 
     println("Result thread map: ${groupsAndDelays.resultThreadMap}") 

     assert groupsAndDelays.threadMap.size() == 5 
     assert groupsAndDelays.threadMap.findAll { k, v -> v.size() == 5 }.size() == 5 
    } 

    def "each group executes on a separate thread"() { 
     setup: 
     remoteClient.apply(_) >> { args -> Flowable.just(args[0]) } 

     when: 
     groupsAndDelays.doStuff(src, groupByFn, responseMapper) 
       .sequential() 
       .toList() 
       .blockingGet() 

     then: 
     true 
    } 
} 

采样运行:

2017-02-04 00:49:19.430 [RxNewThreadScheduler-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [3, 13, 23, 33, 43]. 
2017-02-04 00:49:19.430 [RxNewThreadScheduler-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [1, 11, 21, 31, 41]. 
2017-02-04 00:49:19.430 [RxNewThreadScheduler-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [5, 15, 25, 35, 45]. 
2017-02-04 00:49:19.430 [RxNewThreadScheduler-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [2, 12, 22, 32, 42]. 
2017-02-04 00:49:19.430 [RxNewThreadScheduler-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$1 - Processing group: [4, 14, 24, 34, 44]. 
2017-02-04 00:49:22.443 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 2. 
2017-02-04 00:49:22.443 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 1. 
2017-02-04 00:49:22.443 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 5. 
2017-02-04 00:49:22.443 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 4. 
2017-02-04 00:49:22.443 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 3. 
2017-02-04 00:49:22.456 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 10. 
2017-02-04 00:49:22.456 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 2. 
2017-02-04 00:49:22.456 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 8. 
2017-02-04 00:49:22.456 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 6. 
2017-02-04 00:49:22.456 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 4. 
2017-02-04 00:49:22.459 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 13. 
2017-02-04 00:49:22.459 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 14. 
2017-02-04 00:49:22.459 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 11. 
2017-02-04 00:49:22.459 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 15. 
2017-02-04 00:49:22.459 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 12. 
2017-02-04 00:49:22.466 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 26. 
2017-02-04 00:49:22.466 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 30. 
2017-02-04 00:49:22.466 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 24. 
2017-02-04 00:49:22.466 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 22. 
2017-02-04 00:49:22.466 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 28. 
2017-02-04 00:49:22.466 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 23. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 25. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 22. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 21. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 24. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 46. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 50. 
2017-02-04 00:49:22.467 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 44. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 42. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 48. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 33. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 35. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 32. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 31. 
2017-02-04 00:49:22.468 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 34. 
2017-02-04 00:49:22.469 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 66. 
2017-02-04 00:49:22.469 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 62. 
2017-02-04 00:49:22.469 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 68. 
2017-02-04 00:49:22.469 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 64. 
2017-02-04 00:49:22.469 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 70. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 43. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 44. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 41. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 42. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$6 - Processing: 45. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-3] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 86. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-4] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 88. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-1] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 82. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-2] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 84. 
2017-02-04 00:49:22.470 [RxComputationThreadPool-5] [DEBUG] n.a.j.r.GroupsAndDelays.lambda$null$5 - Processing result: 90. 
Thread map: [20:[3, 13, 23, 33, 43], 21:[2, 12, 22, 32, 42], 22:[5, 15, 25, 35, 45], 23:[4, 14, 24, 34, 44], 24:[1, 11, 21, 31, 41]] 
Result thread map: [20:[6, 26, 46, 66, 86], 21:[4, 24, 44, 64, 84], 22:[10, 30, 50, 70, 90], 23:[8, 28, 48, 68, 88], 24:[2, 22, 42, 62, 82]] 

Process finished with exit code 0 

编辑

奖励积分如果您还可以在project Reactor中显示如何执行此操作。

编辑2: 运用项目反应器的解决方案是here

回答

2

我假设你想从迭代排放这是在这个flatMap返回之间插入延迟:

.flatMap(i -> { 
    log.debug("Processing group: {}.", i); 
     return Flowable.fromIterable(i) 
      .delay(groupMemberDelaySeconds, SECONDS); 
}) 

在这种情况下,你误解delay运营商。它只是将排放量转移指定的时间。要插入每个发射之间的延迟,你可以用interval观察到

.flatMap(i -> { 
    log.debug("Processing group: {}.", i); 
     return Flowable.fromIterable(i) 
      .zipWith(Flowable.interval(groupMemberDelaySeconds, SECONDS), (item, time) -> item) 
}) 

然而压缩它,你要明白,这种方法只适用时,你可以肯定的是,你观察到的是总是产生的频率比指定的时间间隔更频繁,否则最终可能会缓冲来自间隔的发射,并且这意味着一旦它们进入接下来的几个项目就立即从期望的可观察到的发射,这取决于缓冲事件的数量从区间可观察到的。当然,有些方法可以解决这个问题,但是这个更简单,当你使用Iterable时,你可以确定(在原因之内)它不会发生。

+0

你说得对,我误解了'delay'运营商。 –

+0

你可能是对的,我修改了我的评论。我已经迁移到使用项目反应堆而不是RxJava2,并且无法将您的评论与代码放在一起。我会在尝试完成后回复。 我已经为我的问题添加了一个编辑,询问如何在项目Reactor中执行此操作。 –

1

您可以试试下面的代码。关键是使用zipWith结合interval,并确保所有项目的时间特定排放。

public static void main(String[] args) { 
    Observable<Integer> o1 = Observable.range(1, 3); 
    System.out.println("Without delay"); 
    o1.subscribe(v -> System.out.println(v)); 

    System.out.println("With delay"); 
    o1.zipWith(Observable.interval(1, TimeUnit.SECONDS), (a, b)->a).subscribe(a->System.out.println(a)); 
    Observable.timer(5, TimeUnit.SECONDS).toBlocking().subscribe(); 
} 
+0

群组在哪里并组处理? –

+0

好吧,这不是一个完整的解决方案,而是一个示例实现来介绍所需的延迟。基于此,应针对该问题制定完整的解决方案。 –

3

的RxJava 2扩展库包含spanoutoperator

更换delay()

compose(FlowableTransformers.spanout(
    groupMemberDelaySeconds, groupMemberDelaySeconds, SECONDS)) 
+0

看起来不错,但有一个贡献者?不用了,谢谢。 –

+2

大声笑。看起来你正在用RxJava 2承担很多风险...... – akarnokd

+0

你为什么这么说大卫?我也在尝试项目Reactor(请参阅编辑)。 –