2017-02-06 50 views
0

我有一个sourceStream组成的BaseData对象。如何使用highland.js分叉流?

我想分流这个流到n-不同的流的数量,然后过滤和转换每个BaseData对象到他们的喜好。

最后,我想要n只包含特定类型的流,并且分叉的流的长度可能会有所不同,因为可能会在未来删除或添加数据。

我想我可以通过fork设置它这样的:

import * as _ from 'highland'; 

interface BaseData { 
    id: string; 
    data: string; 
} 

const sourceStream = _([ 
    {id: 'foo', data: 'poit'}, 
    {id: 'foo', data: 'fnord'}, 
    {id: 'bar', data: 'narf'}]); 

const partners = [ 
    'foo', 
    'bar', 
]; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream.fork(); 

    partnerStream.filter((baseData: BaseData) => { 
     return baseData.id === partner; 
    }); 

    partnerStream.each(console.log); 
}); 

我预计现在有两个流,以及foo -stream包含两个元素:

{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 

bar包含一个元素的流:

{ id: 'bar', data: 'narf' } 

然而,我却遇到了一个错误:

/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338 
     throw new Error(
     ^

Error: Stream already being consumed, you must either fork() or observe() 
    at Stream._addConsumer (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1338:15) 
    at Stream.consume (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1500:10) 
    at Stream.each (/usr/src/marketing-tasks/node_modules/highland/lib/index.js:1774:18) 
    at partners.forEach (/usr/src/marketing-tasks/dist/bin/example.js:17:19) 
    at Array.forEach (native) 
    at Object.<anonymous> (/usr/src/marketing-tasks/dist/bin/example.js:12:10) 
    at Module._compile (module.js:570:32) 
    at Object.Module._extensions..js (module.js:579:10) 
    at Module.load (module.js:487:32) 
    at tryModuleLoad (module.js:446:12) 

如何将流分成多个流?


我也试图链接的电话,但当时我只拿回一个流的结果:

partners.forEach((partner: string) => { 
    console.log(partner); 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     }); 

    partnerStream.each((item: BaseData) => { 
     console.log(item); 
    }); 
}); 

打印只:而不是预期的

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 

foo 
{ id: 'foo', data: 'poit' } 
{ id: 'foo', data: 'fnord' } 
bar 
{id: 'bar', data: 'narf'} 

也可能是我误解的原因是fork就是这样。按its doc entry

Stream.fork()叉流,使您可以添加额外的消费者 共享背压。一条分流给多个消费者的流将只会从最低消费者处理它们的速度中尽可能快地从其来源获取值。

注意:不要依赖叉子之间的一致执行顺序。 此变换仅保证所有分支都将在任何处理第二个值栏之前处理值foo 。它并不保证叉的处理顺序为 。

提示:小心修改叉子中的流值(或使用这样的库来修改 )。 因为每个分支都会传递相同的值到 ,所以在一个分支中所做的更改将在其后执行的任何分支中可见。再加上不一致的执行顺序,以及 ,最终可能会产生细微的数据损坏错误。如果您需要修改 的任何值,则应该复制并修改副本。

废弃警告:目前可能在使用 后(例如,通过转换)对流进行分叉。在下一个主要版本中,这将不再是可能的 。如果你要分岔一个流,总是 呼吁分叉。

因此,而不是“如何叉流?”我的实际问题可能是:如何在高速流中复制高地流到不同的流中?

回答

0

在创建所有分支之前,必须记住不要使用分叉流。就好像一个使用分叉流,它和它的“父”将被消耗,从而使任何后续分叉从一个空流中分叉。

const partnerStreams: Array<Stream<BaseData>> = []; 

partners.forEach((partner: string) => { 
    const partnerStream = sourceStream 
     .fork() 
     .filter((item: BaseData) => { 
      return item.id === partner; 
     } 
    ); 

    partnerStreams.push(partnerStream); 
}); 

partnerStreams.forEach((stream, index) => { 
    console.log(index, stream); 
    stream.toArray((foo) => { 
     console.log(index, foo); 
    }); 
}); 

它打印:

0 [ { id: 'foo', data: 'poit' }, { id: 'foo', data: 'fnord' } ] 
1 [ { id: 'bar', data: 'narf' } ] 
1

partnerStream.filter()返回一个新的流。然后,您再次使用partnerStream,使用partnerStream.each(),而不呼叫fork()observe()。因此,请将partnerStream.filter().each()调用链或将返回值partnerStream.filter()分配给一个变量并在其上调用.each()

+0

我已经更新了我的问题就像现在我没有看到一个错误,但我只看到一个流的返回值,而不是他们两个。 – k0pernikus