2017-09-20 30 views
0

上下文:我有一个应用程序的日志记录库,我想使用Rx Subject将日志写入数据库。 问题:在建立与数据库的连接之前会生成日志,并且连接可能随时变得不可用,但是我有和可观察到的情况是要么包含具有用于使用数据库的方法的对象,要么包含未定义的方法。我想要做的是在数据库持久性未定义时将日志保留在流中,并在可用时恢复。RxJS在外部条件下从流中提取数据

我认为代码应该是这个样子:

logsSubject 
      .takeWhen(/* test for database persistence */) // made up name because I don't know a Rx method that does what I want 
      .subscribe(/* write data to database */); 

我不但是知道这实际上是可能的,我使用RxJS了很短的时间

回答

0

你可以得到做到这一点的其他方式:

Observable.combineLatest(dbConnection, logsSubject, (db, logs) => { 
    // whatever: db.save(...) 
    }) 
    .do(/* or since it's a side effect it should probably go here instead */) 

这样combineLatest将等待两个源可观察EMIT和将举行的dbConnection即使emitte只有一个项目。

+0

这是第一次,但'dbConnection'发出多次,因为连接可以在引导后重置,并且我想只有在'!! db === true'时才从logsSubject提取数据。 –

+0

然后你可以用'dbConnection.filter(db => !! db)'把它链接到'combineLatest' – martin

1

以下代码将模拟您的场景。数据库将每10秒连接一次,并在连接后每隔5秒断开连接。在实际使用情况下,连接和断开应该共享Observable或Subject。我不是使用takeUntil操作符,而是在这里使用合并,因为takeUntil只会完成可观察的序列,您将无法恢复它。 (repeatWhen运营商应该工作,但它似乎Rxjs 5人失踪)

//emit value every 1s 
const logWrite = Rx.Observable.interval(1000) 

const dbConnected = new Rx.Subject() 
// make it fire every 10s 
Rx.Observable.interval(10000).do(()=>dbConnected.next('db 
connected')).subscribe() 

const dbDisconnected = Rx.Observable.timer(3000) 
.flatMap(()=>Rx.Observable.throw('db disconnected')) 


const writeWhenConnected = logWrite.merge(dbDisconnected) 
.retryWhen(function(errors) { 
    return errors.switchMap(()=>dbConnected); 
}); 

writeWhenConnected.subscribe(val => console.log(val),console.warn); 
0

似乎什么,我需要从RxJS4被pausableBuffered但因为它是在RxJS5被删除我结束了以下解决方案:

let dbAvailability = dbSubject 
     .switchMap((db) => db ? logSubject : Rx.Observable.never()); 

//get data only when dbAvailability emits, and that happens only when I have a connection 
let bufferedLogs = logSubject.buffer(dbAvailability); 

bufferedLogs 
    .concatAll() 
    .subscribe(/* save to database */);