2016-04-22 29 views
0

我正在尝试使用Highland.js设计工作流程。我无法弄清楚Highland.js如何使用它。如何使用Highland.js编写过滤器(使用数据库)

我有一个基于流的工作流程如下(伪代码),

read      //fs.createReadStream(...) 
    .pipe(parse)   //JSONStream.parse(...) 
    .pipe(filterDuplicate) //mongoClient.db.collection.count({}) > 0 
    .pipe(transform)  //fn(item) { return tranform(item); } 
    .pipe(write);   //mongoClient.db.collection.insert(doc) 

的filterDuplicate查找数据库,以检查是否存在(使用条件)读取记录,并返回一个布尔结果。为了使过滤器工作,它需要一个活动的数据库连接,我想要重复使用,直到数据流完成。一种方法是在读取和关闭“完成”写入事件之前打开一个连接;这意味着我需要将连接作为参数传递给过滤器和写入,如果两个方法都使用相同的数据库,则这会起作用。

在上面的工作流程中,filterDuplicate和write也可能使用不同的数据库。所以我希望连接能够在每个函数中包含和管理,这使得它成为一个独立的可重用单元。

我在寻找关于如何使用高地设计的任何输入。

感谢。

回答

0

它不会像使用pipe一堆一样简单。你必须为任务使用最合适的API方法。

这里是什么,你可能会最终接近一个粗略的例子:

read 
    .through(JSONStream.parse([true])) 
    .through((x) => { 
    h((next, push) => { // use a generator for async operations 
     h.wrapCallback(mongoCountQuery)(params) // you don't have to do it this way 
     .collect() 
     .toCallback((err, result) => { 
      if (result > 0) push(err, x); // if it met the criteria, hold onto it 
      return push(null, h.nil); // tell highland this stream is done 
     }); 
    }); 
    }) 
    .merge() // because you've got a stream of streams after that `through` 
    .map(transform) // just your standard map through a transform 
    .through((x) => { 
    h((next, push) => { // another generator for async operations 
     h.wrapCallback(mongoUpdateQuery)(params) 
     .toCallback((err, results) => { 
      push(err, results); 
      return push(null, h.nil); 
     }); 
    }); 
    }) 
    .merge() // another stream-of-streams situation 
    .toCallback(cb); // call home to say we're done 
+0

我为高原机制,专门找(如果有的话)保持状态。在这种情况下,一个打开的数据库连接,为mongoCountQuery和mongoUpdateQuery并在其适当关闭时触发。经过一番阅读后,我认为这个状态应该在外面维护,并且明确地作为上下文传递给流处理函数。这样,流处理功能只是使用上下文来完成他们的工作,而Highland则专注于协调流功能。 – Krishnan

+0

四个月后,我倾向于低估自己的答案。 – amsross