2016-04-07 20 views
0

我希望更好地理解与弹性运算符一起使用时受试者的预期行为,即retryretryWhen科目和弹性逻辑工作流程

下面的代码示例将从它们对应JSBin略有不同(例如在链接找到)中,我使用的箭头函数和类型更容易消耗,这是使用4.0.0版 - 4.0.7

我预期弹性的行为可以用下面的example表示:

Rx.Observable 
    .interval(1000) 
    .flatMap((count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count); 
    }) 
    .retry() 
    .take(5); 

Output 
// 0 
// 1 
// 2 
// 3 
// 0 <-- Retry means we start again from scratch (expected) 

直到此时一切都是一致的,那就是在第四通知已发生错误后从无到有的整个流重新启动(赢得无国籍建筑师URE)。

现在到了抓我的头的一部分,如果我们增加一个组播操作,这样做增加一个潜在的主题(在我的情况下,ReplaySubject为1的缓冲),example

const consumer : Rx.Observable<number> = Rx.Observable 
    .interval(1000) 
    .flatMap((count:number) => { 
    return count === 4 ? Rx.Observable.throw('Break') : Rx.Observable.return(count); 
    }) 
    .shareReplay(1) /* multicast(new Rx.ReplaySubject(1)).refCount() */ 
    .retry() 
    .take(5); 

const firstSubscriber : Rx.Disposable = consumer.subscribe((next:number) => { 
    console.log('first subscriber: ' + next); 
}); 

setTimeout(() => { 
    firstSubscriber.dispose(); /* Lets start fresh in that refCount === 0 */ 
    const secondSubscriber : Rx.Disposable = consumer.subscribe((next) => { 
     console.log('second subscriber: ' + next); 
    }); 
}, 5000); 

Output (before error is thrown) 
// "first subscriber: 0" 
// "first subscriber: 1" 
// "first subscriber: 2" 
// "first subscriber: 3" 
Output (after error is thrown) 
// "first subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 
// "second subscriber: 3" 

就让我们来看看到Subject标识何时发生错误主体被标记为inError,并且每个未来的订户将获得最后的通知(行46),并在调用onError(行50)后直接获得。

那么,这使我们在哪里?在我看来,我认为当它跟随包含Subject(shareReplay,publish等)的任何其他操作符时,您都不会使用弹性运算符。

在这一点上,我想通过这种设计获得成功的唯一方法是确保何时发生错误并且已经处理节点,无论何时使用了一个主题,都需要创建一个新主题(并向下兔子洞我们开始走了)?

multicast可以采取工厂/ subjectSelector:

.multicast(() => new Rx.ReplaySubject(1), (source:Rx.ConnectableObservable) => source); 

纵观source如果使用subjectSelector,而不是仅仅在主题通过直接为每个新认购subjectSelector将被调用和新ConnectableObservable会被创建(第11行)。

在这一点上,我不确定是否共享(通过一些缓存)和处理(当检测到错误时)将实际上给组播用户?

为了达到这个目的,我还写了一个RecoverableReplaySubject,在处理时我已经取消了错误状态,这更多的是用于测试,并且期望RxJS团队把这个工作流程放在一个很好的理由。

任何有关此主题的指导和经验将不胜感激。

感谢

回答

0

shareReplay科目持有不同的语义,当涉及到错误和完成。例如,即使相关的观测值已经完成(refCount == 0),shareReplay也不会完成,因此进一步调用它将产生(重放)过去的值。参看jsbin(shareReplay)与jsbin(share)。

var source = Rx.Observable 
     .interval(100) 
     .take(5) 
     .shareReplay() 

var first = source.subscribe(function(next) { 
    console.log('first subscriber: ' + next); 
}); 

setTimeout(function() { 
// first.dispose(); 
    var second = source.subscribe(function(next) { 
    console.log('second subscriber: ' + next); 
}); 

}, 1000); 

否则,你会发现有关于shareReplay行为(与你的问题的讨论)的解释与其他运营商:

提出的解决方案恰恰是使用工厂的乐趣用于多播运营商。无论如何,尝试新设计并查看它是否有效并不难。