说,SomeOtherService
在一个Veritcle
在不同Verticle中使用UserService,通信发生在Event Bus上。在另一台机器上的另一verticle /微服务Vert.x与Rx-Java实现服务调用的缓存
class SomeOtherService {
final UserService userService = new UserService();
// Mutable state
final Map<String, Single<String>> cache = new HashMap(); // Not Synchronized ?
public Single<String> getUserSessionInfo(String id) {
// Seems it is not save ! :
return cache.computeIfAbsent(id, _id -> {
log("could not find " + id + " in cache. Connecting to userService...");
return userService.getUserSessionInfo(id); // uses generated proxy to send msg to the event bus to call it
}
);
}
}
//某处: 来代表它。
class UserService {
public Single<String> getUserSessionInfo(String id) {
return Single.fromCallable(() -> {
waitForOneSecond();
log("getUserSessionInfo for " + id);
if (id.equals("1"))
return "one";
if (id.equals("2"))
return "two";
else throw new Exception("could not"); // is it legal?
}
);
}
而且客户端代码,我们认购,并决定对调度:
final Observable<String> obs2 = Observable.from(new String[] {"1", "1"});
// Emulating sequential call of 'getUserSessionInfo' to fork in separate scheduler A
obs.flatMap(id -> {
log("flatMap"); // on main thread
return someOtherService.getUserSessionInfo(id)
.subscribeOn(schedulerA) // Forking. will thread starvation happen? (since we have only 10 threads in the pool)
.toObservable();
}
).subscribe(
x -> log("next: " + x)
);
的问题是,如何好是使用HashMap
的缓存(因为它是共享状态的解决方案在这里)通过使用computeIfAbsent方法?
即使我们使用的事件循环&事件总线它不会从共享状态和可能的并发性问题救我们,假设日志操作(如getUserSessionInfo(id)
发生在不同的调度/线程?
我应该使用ReplySubject
而不是实现缓存?什么是vert.x + RX-Java中的最佳做法?
看来,作为龙为cache.computeIfAbsent
在事件循环运行是安全的,因为它是连续的?
对不起..很多的questi ons,我想我可以缩小为:在Vert.x和Rx-Java中实现服务调用的现金的最佳实践是什么?
整个例子是here:
我想我找到我的答案在这里:HTTP://blog.danl ew.net/2015/06/22/loading-data-from-multiple-sources-with-rxjava/ - Observable source = Observable .concat(memory,diskWithCache,networkWithSave) .first();当我通过显式使用map.put(..)而不是使用computeIfAbsent来保存它时 – ses