在AnuglarJS 2应用程序中,我想使链式promise中的Observable。由于承诺提供一次性结果,所以第一步是使用Observable.fromPromise()
,然后使用mapAll()
上的元可观察来处理每个fromPromise
的complete
。我在这里找到有用的这太问题RxJS: How to have one Observer process multiple Observables?RxJS的执行顺序可以从链接的promise中查看
从以上问题接受的答案解决简单的事件,我已经准备很容易代替Observable.fromEvent(someEvent)
使用Observable.fromPromise(someComposedPromise)
自己的解决方案。不幸的是,虽然所有的工作对于简单的单一承诺都很好,但问题出现在承诺由两个承诺组成,因为承诺的解决顺序。
为了简化和病例隔离的缘故让我们假设我们有一些现有的外部DumbCache
(和什么我想用的是Ionic 2 LocalStorage
其中最简单的变体看起来与此类似):
class DumbCache {
cache = {};
get(key) {
return new Promise((resolve, reject) => {
var value = this.cache[key];
resolve(value);
});
}
set(key, value) {
return new Promise((resolve, reject) => {
this.cache[key] = value;
resolve();
});
}
}
然后上述的处理方法是:
class CacheValueObservable {
private cache: DumbCache;
constructor(private key: string) {
this.cache = new DumbCache();
}
/*
* meta observer to handle promises from observables with all results and errors
* thanks to ReplaySubject(1) current value is available immediately after subscribing
*/
private _valueSource$$ = new Rx.ReplaySubject(1);
private _value$ = this._valueSource$$.mergeAll();
public value$() { return this._value$; }
public updateValue(value) {
this._valueSource$$.next(
Rx.Observable.fromPromise(
this.cache.set(this.key, value)
.then(() => this.cache.get(this.key))
)
);
}
}
现在为以下代码:
let cacheValueObservable = new CacheValueObservable("TEST_KEY");
cacheValueObservable.updateValue('VALUE 0');
cacheValueObservable.value$().subscribe(
val => {
console.log('val:' + val);
},
val => console.log('err', val.stack),
() => (console.log('complete'))
);
cacheValueObservable.updateValue('VALUE 1');
cacheValueObservable.updateValue('VALUE 2');
console.log('end');
结果是:
starting...
end
val:VALUE 2
val:VALUE 2
val:VALUE 2
而很明显,我想实现
starting...
end
val:VALUE 0
val:VALUE 1
val:VALUE 2
完整这里例如:http://jsbin.com/wiheki/edit?js,console