2016-12-28 81 views
2

我试图promisify流,但它似乎比我预期的更难。这里是我的尝试:Promisify streams

'use strict' 

const Promise = require('bluebird') 
const Twitter = require('twitter') 

const TwitterStream = module.exports = function TwitterStream (config) { 
    // init Twitter Streaming API for OAuth 
    this.stream = new Twitter({ 
    consumer_key: config.get('/twitter/consumerKey'), 
    consumer_secret: config.get('/twitter/consumerSecret'), 
    access_token_key: config.get('/twitter/accessTokenKey'), 
    access_token_secret: config.get('/twitter/accessTokenSecret') 
    }) 
    .stream('statuses/filter', { 
    track: config.get('/twitter/track') 
    }) 
} 

TwitterStream.prototype.receive = function() { 
    return new Promise((resolve, reject) => { 
    this.stream.on('data', resolve).on('error', reject) 
    }) 
} 

TwitterStream.prototype.destroy = function() { 
    this.stream.destroy() 
} 

的主要问题是,当我创建对象

const stream = new TwitterStream(config) 

stream.receive().then((data) => console.log(data)) 

当我执行只有一个对象被读取。没有其他数据流式传输。

TwitterStream.prototype.receive = function() { 
    return new Promise((resolve, reject) => { 
     this.stream 
     .on('data', (data) => resolve(data) 
     .on('error', (error) => reject(error)) 
     }) 
    } 
+0

我不知道TwitterStream api。这个流是无限期打开的,只要有新数据(tweets)可用,就会发出'data'事件。而'end'只在连接中断时才被调用? –

+0

@ t.niese完全是 – Mazzy

+1

您无法直接将Promises映射到这种类型的流。我会尝试创建一个例子,你将如何解决这个问题。 –

回答

0

您需要在stream.on函数的回调中返回承诺。此时,被调用时的方法receive仅返回一次解析的返回值或错误。

+0

你的意思是? (编辑该问题)我试过了,但它不起作用 – Mazzy

+0

仍然不起作用 – Mazzy

+0

是的,因为该函数在调用时会返回一个Promise。如果它不是被调用一次以上,则该流不会被读取多次。注册一个事件处理程序是解决这个问题的更好方法。我的答案本身是错误的,因为我没有正确理解这个问题。我的错。 – Bhargav

0

使用的Rx扩展,这是非常简单的:

TwitterStream.prototype.receive = function() { 
    return Rx.Observable.create((observer) => { 
     this.stream 
      .on('data', (data) => observer.onNext(data)) 
      .on('error', (err) => observer.onError(err)); 
    }); 
} 

然后

const stream = new TwitterStream(config) 

stream.receive().subscribe((data) => console.log(data)); 
+0

尽管它看起来多余(因为订阅时你仍然需要传递一个回调),但它与使用承诺时具有相同的好处:您可以应用诸如map,compose等操作,这些操作相当有用。 – olivarra1

0

下面是一个没有经过测试,最有可能仍bug的代码来说明你能如何与承诺做到这一点:

function defer() { 
    var resolve, reject; 
    var promise = new Promise(function() { 
    resolve = arguments[0]; 
    reject = arguments[1]; 
    }); 
    return { 
    resolve: resolve, 
    reject: reject, 
    promise: promise 
    }; 
} 


TwitterStream.prototype.receive = function() { 

    this.stream 
    .on('data', data => { 
     this.dataCache = this.dataCache || []; 
     this.dataCache.push(data); 
     this.tryToSendData() 
    }) 
    .on('end',() => { 
     this.finished = true; 
     this.tryToSendData() 
    }) 
    .on('error', err => { 
     this.lastError = err; 
     // error handling still missing 
    }) 

    return this; 
} 

TwitterStream.prototype.tryToSendData = function() { 
    if (this.defered) { 
    let defered = this.defered; 
    this.defered = null; 

    // if data is available or finished then pass the first element of buffer (or undefined) 
    defered.resolve(this.dataCache.shift()) 
    } 
} 

TwitterStream.prototype.getNextData = function() { 
    if (this.dataCache.length > 0 || this.finished) { 
    // if data is available or finished then pass the first element of buffer (or undefined) 
    return Promise.resolve(this.dataCache.shift()); 
    } else { 
    // otherwise we need a defered object 
    this.defered = defer(); 
    } 
} 

然后用法如下:

stream.receive().getNextData() 
    .then(function processData(data) { 
    if (data) { 
     console.dir(data); 

     // if data is available then continue requestin the data 
     return stream.getNextData().then(processData); 
    } 
    }) 

这是一种罕见的情况,您可以使用Deferreds。

0

我想你可能想看看我在scramjet已经promisified流。

对于你的Twitter例如该代码应该很好地工作:

const stream = new Twitter({ 
    consumer_key: config.get('/twitter/consumerKey'), 
    consumer_secret: config.get('/twitter/consumerSecret'), 
    access_token_key: config.get('/twitter/accessTokenKey'), 
    access_token_secret: config.get('/twitter/accessTokenSecret') 
}) 
.stream('statuses/filter', { 
    track: config.get('/twitter/track') 
}) 
.pipe(new scramjet.DataStream) 

然后执行任何你喜欢的转变......例如某种程度上映射流和累积流到一个数组时,即可大功告成。

stream.map(
    function (a) { return modifyTheTweetSomehow(a); } // a Promise can be returned here 
).accumulate(
    function(a, i) { a.push(i); }, 
    [] 
) // this returns a Promise that will be resolved on stream end. 

我希望你喜欢它。 :)