0
我想下面的对象追加到每一个对象在流流的NodeJS改造elasticsearch散装
{"index":{"_index":"tvseries","_type":"internindex"}}
我流看起来像这样
[
{"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"04","content":"Before what?","season":"1"},
{"showname":"The X Files","episode":"01","content":"What?","season":"1"}
]
我流应该是什么样子!
> -> POST http://localhost:9200/_bulk {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X Files","episode":"04","content":"Before
> what?","season":"1"}
> {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X
> Files","episode":"04","content":"Great.","season":"1"}
> {"index":{"_index":"tvseries","_type":"internindex"}}
> {"showname":"The X
> Files","episode":"01","content":"What?","season":"1"}
我怎么能在我现有的代码库以下
var stream = new ElasticsearchWritableStream(client, {
highWaterMark: 256,
flushTimeout: 500
});
pg.connect(connectionString,function(err, client, done) {
if(err) throw err;
var query = new QueryStream('SELECT * FROM srt limit 2')
var streams = client.query(query)
//release the client when the stream is finished
streams.on('end', done)
streams.pipe(JSONStream.stringify()).pipe(stream)
})
故宫包目前我使用
对于批量插入elasticsearch实现这一使用jsonstream!
用于获取来自Postgres的数据转换成流!
缺少的部分是转换的Postgres流成弹性可写流! 任何建议,指针,如何实现这个建议!