2017-08-18 27 views
0

说,SomeOtherService在一个Veritcle在不同Verticle中使用UserService,通信发生在Event Bus上。在另一台机器上的另一verticle /微服务Vert.x与Rx-Java实现服务调用的缓存

class SomeOtherService { 

    final UserService userService = new UserService(); 

    // Mutable state 
    final Map<String, Single<String>> cache = new HashMap(); // Not Synchronized ? 

    public Single<String> getUserSessionInfo(String id) { 
     // Seems it is not save ! : 
     return cache.computeIfAbsent(id, _id -> { 
       log("could not find " + id + " in cache. Connecting to userService..."); 
       return userService.getUserSessionInfo(id); // uses generated proxy to send msg to the event bus to call it 
      } 
     ); 
    } 
} 

//某处: 来代表它。

class UserService { 

    public Single<String> getUserSessionInfo(String id) { 
     return Single.fromCallable(() -> { 

      waitForOneSecond(); 

      log("getUserSessionInfo for " + id); 

      if (id.equals("1")) 
       return "one"; 
      if (id.equals("2")) 
       return "two"; 

      else throw new Exception("could not"); // is it legal? 

      } 
     ); 
    } 

而且客户端代码,我们认购,并决定对调度:

final Observable<String> obs2 = Observable.from(new String[] {"1", "1"}); 


     // Emulating sequential call of 'getUserSessionInfo' to fork in separate scheduler A 
     obs.flatMap(id -> { 
        log("flatMap"); // on main thread 
        return someOtherService.getUserSessionInfo(id) 
              .subscribeOn(schedulerA) // Forking. will thread starvation happen? (since we have only 10 threads in the pool) 
              .toObservable(); 
       } 
     ).subscribe(
       x -> log("next: " + x) 
     ); 

的问题是,如何好是使用HashMap的缓存(因为它是共享状态的解决方案在这里)通过使用computeIfAbsent方法?

即使我们使用的事件循环&事件总线它不会从共享状态和可能的并发性问题救我们,假设日志操作(如getUserSessionInfo(id)发生在不同的调度/线程?

我应该使用ReplySubject而不是实现缓存?什么是vert.x + RX-Java中的最佳做法?

看来,作为龙为cache.computeIfAbsent在事件循环运行是安全的,因为它是连续的?

对不起..很多的questi ons,我想我可以缩小为:在Vert.x和Rx-Java中实现服务调用的现金的最佳实践是什么?

整个例子是here

+0

我想我找到我的答案在这里:HTTP://blog.danl ew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ - Observable source = Observable .concat(memory,diskWithCache,networkWithSave) .first();当我通过显式使用map.put(..)而不是使用computeIfAbsent来保存它时 – ses

回答