2017-10-18 29 views
0

我有越来越与我执行后续XHR请求,像这样的阵列的XHR请求:如何成对缓冲观察值并通过对执行它们?

const Rx = require('rxjs/Rx'); 
const fetch = require('node-fetch'); 

const url = `url`; 

// Get array of tables 
const tables$ = Rx.Observable 
    .from(fetch(url).then((r) => r.json())); 

// Get array of columns 
const columns$ = (table) => { 
    return Rx.Observable 
    .from(fetch(`${url}/${table.TableName}/columns`).then(r => r.json())); 
}; 

tables$ 
    .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$)))  
    .subscribe(val => console.log(val)); 

我想执行在chuncks列请求,以便请求没有被发送到服务器立刻。

这太问题是有些在同一个方向,但不是完全:Rxjs: Chunk and delay stream?

现在,我想是这样的:

tables$ 
    .mergeMap(tables => Rx.Observable.forkJoin(...tables.map(columns$))) 
    .flatMap(e => e) 
    .bufferCount(4) 
    .executeTheChunksSerial(magic) 
    .flatMap(e => e) 
    .subscribe(val => console.log(val)); 

但我不能换我围​​绕着如何执行该块头在系列...

+0

看起来你有一个以上的高阶可观察这使得这个很难理解。也许你可以使用'.concatMap(buffered => Observable.forkJoin(buffered))'而不是'.executeTheChunksSerial(magic)',但我不确定。 – martin

回答

2

可以利用的mergeMapconcurrency参数来获得最大的X同时请求到服务器:

const getTables = Promise.resolve([{ tableName: 'foo' },{ tableName: 'bar' },{ tableName: 'baz' }]); 
 
const getColumns = (table) => Rx.Observable.of('a,b,c') 
 
    .do(_ => console.log('getting columns for table: ' + table)) 
 
    .delay(250); 
 
     
 
Rx.Observable.from(getTables) 
 
    .mergeAll() 
 
    .mergeMap(
 
    table => getColumns(table.tableName), 
 
    (table, columns) => ({ table, columns }), 
 
    2) 
 
    .subscribe(console.log)
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.3/Rx.js"></script>

相关问题