2017-01-02 49 views
0

我想并行加载用户对象。RxJava:减少不按预期工作

final User user = new User(); 
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
      .flatMap(field -> getOrchestrator(user, field)) 
      .scan(new User(), (finalUser, event) -> { 
       finalUser = event; 
       return finalUser; 
      }); 

扫描确实发出三个用户对象,因为reduce根本不发射任何项目?我在这里做错了什么。

final User user = new User(); 
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
      .flatMap(field -> getOrchestrator(user, field)) 
      .reduce(new User(), (finalUser, event) -> { 
       finalUser = event; 
       return finalUser; 
      }); 

getOrchestrator返回Observable。任何帮助将不胜感激。

下面是完整的代码片断

public class Orchestrator { 
    private String userId; 

    public Orchestrator(final String userId) { 
     this.userId = userId; 
    } 

    public static void main(final String[] args) throws Exception { 
     final User user = new User(); 
     final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
       .flatMap(field -> getOrchestrator(user, field)) 
       .scan(new User(), (finalUser, event) -> { 
        finalUser = event; 
        return finalUser; 
       }); 

     userObs.subscribeOn(Schedulers.io()).subscribe(result -> { 
      System.out.println(result.toString()); 
     }); 

     TimeUnit.SECONDS.sleep(10); 

    } 

    private static Observable<User> getOrchestrator(final User user, final String fieldName) { 
     switch (fieldName) { 
      case "CURRENT_ADDRESS": 
       return new AddressOrchestrator().getCurrentAddress(user.getUserId()) 
         .map(currentAddress -> { 
          user.setAddress(currentAddress); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 
      case "ADDRESSES": 
       return new AddressOrchestrator().getAddresses(user.getUserId()) 
         .map(addresses -> { 
          user.setAddresses(addresses); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 

      case "NAMES": 
       return new NameOrchestrator().getNames(user.getUserId()) 
         .map(names -> { 
          user.setNames(names); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 
     } 
     return null; 
    } 

    public User getUser() { 
     final Random r = new Random(); 
     if (r.nextInt(3) % 2 == 0) { 
      return new User(); 
     } 
     throw new RuntimeException(); 
    } 
} 

每个配器返回观测。您创建一个使用Observable.create(大红旗,除非你真的知道自己在做什么)不终止

public class AddressOrchestrator { 

    public Observable<List<Address>> getAddresses(final String userId) { 
     return Observable.create(s -> { 
      final Address currentAddress = this.getBaseAddress(userId); 
      final Address anotherAddress = this.getBaseAddress(userId); 
      anotherAddress.setState("NE"); 
      s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
     }); 

    } 

    public Observable<Address> getCurrentAddress(final String userId) { 
     return Observable.create(s -> s.onNext(this.getBaseAddress(userId))); 
    } 

    public Address getBaseAddress(final String userId) { 
     final Address address = new Address(); 
     address.setLine1("540 Caddo Lake Dr"); 
     address.setCity("Georgetown"); 
     address.setCountry("USA"); 
     address.setState("TX"); 

     return address; 
    } 
} 


public class NameOrchestrator { 

    public Observable<List<Name>> getNames(final String userId) { 
     return Observable.create(s -> { 
      final Name name = new Name(); 
      name.setName("Vanchi"); 
      final Name formerName = new Name(); 
      formerName.setName("Vanchinathan"); 
      s.onNext(Arrays.asList(name, formerName)); 
     }); 
    } 
} 
+1

显示完整的示例。对于我们所知道的您的订阅是错误的。 – weston

+0

看来你的流是无限的。 –

+0

使用扫描时,流确实结束。使用减少时,它不会发射一个项目。 @weston添加了完整的代码段 –

回答

2

你的业务流程。这导致无限的流(从某种意义上说,一个完整的事件永远不会被发射)。你需要的东西有点像

public Observable<List<Address>> getAddresses(final String userId) { 
     return Observable.create(s -> { 
      final Address currentAddress = this.getBaseAddress(userId); 
      final Address anotherAddress = this.getBaseAddress(userId); 
      anotherAddress.setState("NE"); 
      s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
      s.onCompleted(); 
     }); 

注意被调用的onCompleted。这将解决你的肤浅问题,但你最好摆脱Observable.create首先,并使用像Observable.defer

+0

您能否解释创建是如何变得糟糕以及延迟如何提供帮助? –

+0

总之,当你使用'Observable'时,有很多方法搞砸了。创造'这就是为什么一般不建议如果你有其他选择。 'Observable.defer'[这里]有一篇很好的博客文章(http://blog.danlew.net/2014/10/08/grokking-rxjava-part-4)。 – JohnWowUs

1

发生这种情况是因为scan发出所有上游项目,而reduce只有在流完成时才会发出。

更换

public Observable<Address> getCurrentAddress(final String userId) { 
    return Observable.create(s -> s.onNext(this.getBaseAddress(userId))); 
} 

public Observable<Address> getCurrentAddress(final String userId) { 
    return Observable.fromCallable(() -> this.getBaseAddress(userId)); 
} 

public Observable<List<Address>> getAddresses(final String userId) { 
    return Observable.create(s -> { 
     final Address currentAddress = this.getBaseAddress(userId); 
     final Address anotherAddress = this.getBaseAddress(userId); 
     anotherAddress.setState("NE"); 
     s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
    }); 
} 

public Observable<List<Address>> getAddresses(final String userId) { 
    return Observable.fromCallable(() -> this.getBaseAddress(userId)) 
      .zipWith(Observable.fromCallable(() -> this.getBaseAddress(userId)) 
        .map(address -> address.setState("NE")), 
        (currentAddress, anotherAddress) -> Arrays.asList(currentAddress, anotherAddress)); 
} 

,改变你的setAddress()方法本

public Address setState(String state) { 
    this.state = state; 
    return this; 
} 

fromCallable()是一种更好的方式来创建一个Observable发射至多有一个元素,因为它处理错误和背压为您服务。