2016-08-07 42 views
2

在AnuglarJS 2应用程序中,我想使链式promise中的Observable。由于承诺提供一次性结果,所以第一步是使用Observable.fromPromise(),然后使用mapAll()上的元可观察来处理每个fromPromisecomplete。我在这里找到有用的这太问题RxJS: How to have one Observer process multiple Observables?RxJS的执行顺序可以从链接的promise中查看

从以上问题接受的答案解决简单的事件,我已经准备很容易代替Observable.fromEvent(someEvent)使用Observable.fromPromise(someComposedPromise)自己的解决方案。不幸的是,虽然所有的工作对于简单的单一承诺都很好,但问题出现在承诺由两个承诺组成,因为承诺的解决顺序。

为了简化和病例隔离的缘故让我们假设我们有一些现有的外部DumbCache(和什么我想用的是Ionic 2 LocalStorage其中最简单的变体看起来与此类似):

class DumbCache { 
    cache = {}; 

    get(key) { 
    return new Promise((resolve, reject) => { 
     var value = this.cache[key]; 
     resolve(value); 
    }); 
    } 

    set(key, value) { 
    return new Promise((resolve, reject) => { 
     this.cache[key] = value; 
     resolve(); 
    }); 
    } 
} 

然后上述的处理方法是:

class CacheValueObservable { 
    private cache: DumbCache; 

    constructor(private key: string) { 
    this.cache = new DumbCache(); 
    } 

    /* 
    * meta observer to handle promises from observables with all results and errors 
    * thanks to ReplaySubject(1) current value is available immediately after subscribing 
    */ 
    private _valueSource$$ = new Rx.ReplaySubject(1); 
    private _value$ = this._valueSource$$.mergeAll(); 

    public value$() { return this._value$; } 

    public updateValue(value) { 
    this._valueSource$$.next(
     Rx.Observable.fromPromise(
     this.cache.set(this.key, value) 
     .then(() => this.cache.get(this.key)) 
    ) 
    ); 
    } 
} 

现在为以下代码:

let cacheValueObservable = new CacheValueObservable("TEST_KEY"); 
cacheValueObservable.updateValue('VALUE 0'); 

cacheValueObservable.value$().subscribe(
    val => { 
     console.log('val:' + val); 
    }, 
    val => console.log('err', val.stack), 
    () => (console.log('complete')) 
); 

cacheValueObservable.updateValue('VALUE 1'); 
cacheValueObservable.updateValue('VALUE 2'); 

console.log('end'); 

结果是:

starting... 
end 
val:VALUE 2 
val:VALUE 2 
val:VALUE 2 

而很明显,我想实现

starting... 
end 
val:VALUE 0 
val:VALUE 1 
val:VALUE 2 

完整这里例如:http://jsbin.com/wiheki/edit?js,console

回答

1

虽然试图表达的问题很好地描述我仍然在更好和更好地调查和理解这个问题。重点是第一个承诺this.cache.set(this.key, value)可能实际上立即解决,而Observable.fromPromise不能保证以下顺序的所有链接承诺正在解决。

这个问题是由于每个链的最后一个承诺只有在最后一个链的第一个承诺(因此VALUE 2)被更改后才执行的事实造成的。

后所有的解决方案看起来从视图代码点很简单,但它是不是很明显是由两个主要变化:

  • 推迟最初的承诺执行,直到mergeAll阶段将使用Observable.defer代替Observable.fromPromise
  • 限制(或实际禁用)使用mergeAll(1)

因而工作液合并的承诺的并发看起来是这样的:

class CacheValueObservable { 
    private cache: DumbCache; 

    constructor(private key: string) { 
    this.cache = new DumbCache(); 
    } 

    /* 
    * meta observer to handle promises from observables with all results and errors 
    * thanks to ReplaySubject(1) current value is available immediately after subscribing 
    */ 
    private _valueSource$$ = new Rx.ReplaySubject(1); 
    // disable merge concurrency 
    private _value$ = this._valueSource$$.mergeAll(1); 

    public value$() { return this._value$; } 

    public updateValue(value) { 
    this._valueSource$$.next(
     // defer promise resolution to ensure they will be fully resolved 
     // one by one thanks to no concurrency in mergeAll 
     Rx.Observable.defer(() => 
     this.cache.set(this.key, value) 
     .then(() => this.cache.get(this.key)) 
    ) 
    ); 
    } 
} 

这里就是活生生的例子: http://jsbin.com/bikawo/edit?html,js,console