2016-03-28 62 views
2

我有一个使用案例,我在使用消息,保存消息,然后回复成功或失败。 mongo插入返回一个Observable,所以我可以使用flatmap进行链接。问题是Insert Observable发出插入的结果,但我需要从第一个observable发出的原始消息进行回复。因此,为了做到这一点,我在第一个Observable的订阅中运行插入,并在第二个订阅内部进行回复。Chaining Observable&Emitting/Passing Original Emit订阅呼叫

我一直希望用平面图等某种操作符以更具反应性的方式完成此操作。我搜索了运营商名单,没有找到我要找的东西。

eb.consumer("persister.save.event").toObservable() 
    .subscribe(msg -> { 
     mongo.insertObservable("event", (JsonObject) msg.body()) 
      .subscribe(
       res -> msg.reply(new JsonObject().put("success", true)), 
       error -> msg.fail(500, "failed to save event")); 
      }); 

上述代码是应该完成的方式还是有更好的方法?这两个订阅者感觉不对。

回答

2

这里是什么可以做,以避免两个用户:

eb.consumer("persister.save.event").toObservable() 
    .flatMap(msg -> mongo.insertObservable("event", (JsonObject) msg.body()).map(mongoResponse -> msg)) 
    .subscribe(
      res -> msg.reply(new JsonObject().put("success", true)), 
      error -> msg.fail(500, "failed to save event")); 

的窍门是map您蒙戈结果想msgflatMap内。

+1

啊,完美!非常感谢你。 – zylum