2017-09-10 20 views
0

我有需要的输入,并返回一个输出服务:调用链inifnitely

Item callService(Item input); 

输入始终是由可观测和服务调用的输出应该是下一个发射元件发射的前一个元素由可观察的。这个顺序应该重复,直到一个项目达到一定的条件。

其实,我有一棵树的叶子,我把叶子发送给一个服务,告诉我它的父类是什么。我继续这样做直到我到达根部。

我的第一个解决方案:

我递归调用一个函数flatMap。我不喜欢这样的解决方案,因为递归:

Observable.just(Item.of("start")) 
      .map(Result::of) 
      .flatMap(App::fallbackUntilIsRoot) 
      .blockingSingle(); 

private static Observable<Result> fallbackUntilIsRoot(Result previousResult) 
{ 
    if (previousResult.isRoot()) 
    { 
     return Observable.just(previousResult); 
    } else 
    { 
     return Service.call(previousResult.getItem()) 
         .flatMap(App::fallbackUntilIsRoot); // recursion 
    } 
} 

我的第二个解决方案:

我保持外部队列,基本上我喂观察到这个队列。仍然看起来不太好。

Queue<Item> todoList = new LinkedList<>(Collections.singleton(Item.of("start"))); 

Result result = Observable.range(1, Integer.MAX_VALUE) //max tries 
          .concatMap(attempt -> Service.call(todoList)) 
          .takeUntil(Result::isEmpty) 
          .takeLast(2) 
          .firstElement() 
          .blockingGet(); 

public class Service 
{ 
    private static final Queue<Item> itemsProvidedByService = new LinkedList<>(
      Arrays.asList(Item.of("first"), Item.of("second"), Item.of("third"), Item.of("fourth"), Item.of("root"))); 

    public static Observable<Result> call(Queue<Item> todoList) 
    { 
     Item previousItem = todoList.poll(); 

     Item newItem = itemsProvidedByService.poll(); // we could pass previousItem to a real service here 

     if (newItem == null) 
     { 
      return Observable.just(Result.empty()); 
     } 

     todoList.add(newItem); 

     return Observable.just(Result.of(newItem)); 
    } 
} 

是否有Rx做这种链接的方式?

回答

1

同时,我发现名字我的问题。它被称为corecursion。 RxJava 2为这个问题提供了一个干净的(无副作用)解决方案。

private static void solutionWithGenerate() 
{ 
    Result startElement = Result.of(Item.of("start")); 

    Result result = Observable.generate(() -> startElement, App::generate) 
           .startWith(startElement) 
           .lastElement() 
           .blockingGet(); 

    System.out.println(result); 
} 

private static Result generate(Result previousResult, Emitter<Result> emitter) 
{ 
    Result newResult = Service.callDirectly(previousResult.getItem()); 

    if (newResult.isEmpty()) 
    { 
     emitter.onComplete(); 
    } else 
    { 
     emitter.onNext(newResult); 
    } 

    return newResult; 
} 

能在RxJava 1完成了。有点更详细: Observable.Generate in RxJava?

0

您可以通过保持当前迭代项目的单一副作用字段来完成。
结合repeat()takeUntil()将while循环连同fromCallable()制作的循环迭代可变的(即使用just()将重复永远的初始值):

it = Result.of(Item.of("start")); 
Observable.fromCallable(() -> it) 
     .map(result -> { 
      it = service.call(result); 
      return result; 
     }) 
     .repeat() 
     .takeUntil(Item -> it.isRoot()) 
     .subscribe(item -> .....); 
+0

在java中它不起作用,因为lambda中的变量必须是final或有效的final,所以我仍然需要使用某种数据结构来跟踪当前元素。虽然,重复似乎工作,对我的用例更具表现力,比我的解决方案,但副作用仍困扰着我。 – Yossarian

+0

是的,正如我所说的,它需要单一的副作用领域或数据结构,如你所说。 – yosriz