2017-06-08 114 views
1

在项目中,我有一个循环遍历URL列表。它从每个网址下载文件,并通过下载的文件进行一些后期处理。完成forEach后运行回调函数

完成所有过程(包括下载过程和后期过程)之后,我想要执行回调函数。由于后期处理包含一些流式处理任务,因此它具有关闭事件如果可以识别最后一项,我可以将回调函数传递给关闭事件。但是,由于循环是异步的,因此我无法跟踪哪个项目最后完成。

现在,我使用5秒的超时时间来确保在整个过程之后执行回调。显然,这是不可持续的。处理这个问题的好方法是什么?

循环代码:

exports.processArray = (items, process, callback) => { 
    var todo = items.concat(); 
    setTimeout(function() { 
     process(todo.shift()); 
     if(todo.length > 0) { 
      // execute download and post process each second 
      // however it doesn't guarantee one start after previous one done 
      setTimeout(arguments.callee, 1000); 
     } else { 
      setTimeout(() => {callback();}, 5000); 
     } 
    }, 1000); 
}; 

processArray(
    // First param, the array 
    urlList, 
    // Second param, download and post process 
    (url) => { 
    if(url.startsWith('http')) { 
     getDataReg(url, uid); 
    } 
    else if(url.startsWith('ftp')) { 
     getDataFtp(url, uid); 
    } 
    else { 
     console.log('not a valid resource'); 
    } 
    }, 
    // Third param, callback to be executed after all done 
() => { 
    Request.get(`${config.demouri}bound=${request.query.boundary};uid=${uid}`, { 
     method: 'GET', 
     auth: auth 
    }) 
    .on('response', (response) => { 
     console.log('response event emmits'); 
     zipFiles(uid) 
     .then((path) => { 
     reply.file(path, { confine: false, filename: uid + '.zip', mode: 'inline'}).header('Content-Disposition'); 
     }); 
    }); 
    } 
); 

下载和后处理:

exports.getDataFtp = (url, uid) => { 
    console.log('get into ftp'); 
    var usefulUrl = url.split('//')[1]; 
    var spliter = usefulUrl.indexOf('/'); 
    var host = usefulUrl.substring(0, spliter); 
    var dir = usefulUrl.substring(spliter+1, usefulUrl.length); 
    var client = new ftp(); 
    var connection = { 
    host: host 
    }; 
    var fileNameStart = dir.lastIndexOf('/') + 1; 
    var fileNameEnd = dir.length; 
    var fileName = dir.substring(fileNameStart, fileNameEnd); 
    console.log('filename: ', fileName); 

    client.on('ready',() => { 
    console.log('get into ftp ready'); 
    client.get(dir, (err, stream) => { 
     if (err) { 
     console.log('get file err:', err); 
     return; 
     } else{ 
     console.log('get into ftp get'); 
     stream.pipe(fs.createWriteStream(datadir + `download/${uid}/${fileName}`)); 
     stream.on('end',() => { 
      console.log('get into ftp close'); 
      unzipData(datadir + `download/${uid}/`, fileName, uid); 
      client.end(); 
     }); 
     } 
    }); 
    }); 
    client.connect(connection); 
}; 

exports.getDataReg = (url, uid) => { 
    console.log('get into http'); 
    var fileNameStart = url.lastIndexOf('/') + 1; 
    var fileNameEnd = url.length; 
    var fileName = url.substring(fileNameStart, fileNameEnd); 
    var file = fs.createWriteStream(datadir + `download/${uid}/${fileName}`); 
    if (url.startsWith('https')) { 
    https.get(url, (response) => { 
     console.log('start piping file'); 
     response.pipe(file); 
     file.on('finish',() => { 
     console.log('get into http finish'); 
     unzipData(datadir + `download/${uid}/`, fileName, uid); 
     }); 
    }).on('error', (err) => { // Handle errors 
     fs.unlink(datadir + `download/${uid}/${fileName}`); 
     console.log('download file err: ', err); 
    }); 
    } else { 
    http.get(url, (response) => { 
     console.log('start piping file'); 
     response.pipe(file); 
     file.on('finish',() => { 
     unzipData(datadir + `download/${uid}/`, fileName, uid); 
     }); 
    }).on('error', (err) => { 
     fs.unlink(datadir + `download/${uid}/${fileName}`); 
     console.log('download file err: ', err); 
    }); 
    } 
}; 

function unzipData(path, fileName, uid) { 
    console.log('get into unzip'); 
    console.log('creating: ', path + fileName); 
    fs.createReadStream(path + fileName) 
    .pipe(unzip.Extract({path: path})) 
    .on('close',() => { 
    console.log('get into unzip close'); 
    var filelist = listFile(path); 
    filelist.forEach((filePath) => { 
     if (!filePath.endsWith('.zip')) { 
     var components = filePath.split('/'); 
     var component = components[components.length-1]; 
     mv(filePath, datadir + `processing/${uid}/${component}`, (err) => { 
      if(err) { 
      console.log('move file err: '); 
      } else { 
      console.log('move file done'); 
      } 
     }); 
     } 
    }); 
    fs.unlink(path + fileName, (err) => {}); 
    }); 
} 
+0

处理它通过处理异步代码“正确” - 使用回调或承诺 –

+0

可以'processArray()'调用中使用'Promise'构造。不知道为什么'nodejs'没有实现流标准来使用'Promise'而不是回调。这个问题在过去一周左右出现了好几次。 – guest271314

+0

你能用代码示例回答这个问题吗? – zhangjinzhou

回答

1

你想要做的就是让你的所有异步过程收敛到一个单一的承诺,你可以用在正确的时刻来执行回调。

让我们开始在点每个过程完成后,我以为是在传递给mv()功能unzipData()回调。您希望将这些异步操作中的每一个都包装在承诺中解决的Promise中,并且您还希望稍后使用这些承诺,并且您使用.map()方法来收集数组中的承诺(而不是.forEach())。
下面的代码:

var promises = filelist.map((filePath) => { 
    if (!filePath.endsWith('.zip')) { 
    var components = filePath.split('/'); 
    var component = components[components.length-1]; 
    return new Promise((resolve, reject) => 
     mv(filePath, datadir + `processing/${uid}/${component}`, (err) => { 
     if(err) { 
      console.log('move file err: '); 
      reject(); // Or resolve() if you want to ignore the error and not cause it to prevent the callback from executing later 
     } else { 
      console.log('move file done'); 
      resolve(); 
     } 
     })); 
    } 
    return Promise.resolve(); 
}); 

(如果不执行异步操作,而是返回立即解决了无极)

现在,我们可以将承诺的这份名单成一个单一的承诺当列表中的所有承诺都已解决时,可以解决此问题:

var allPromise = Promise.all(promises); 

接下来,我们需要进一步了解代码。我们可以看到,我们刚刚查看的代码本身就是异步操作的事件处理程序的一部分,即fs.createReadStream()。你需要用在当内承诺解决得到解决一个承诺,这就是unzipData()功能的,应当交回承诺:

function unzipData(path, fileName, uid) { 
    console.log('get into unzip'); 
    console.log('creating: ', path + fileName); 
    return new Promise((outerResolve) => 
    fs.createReadStream(path + fileName) 
    .pipe(unzip.Extract({path: path})) 
    .on('close',() => { 
     console.log('get into unzip close'); 
     var filelist = listFile(path); 

     // Code from previous examples 

     allPromise.then(outerResolve); 
    })); 
} 

接下来,我们来看看使用unzipData()功能:getDataReg()getDataFtp()。他们只执行一个异步操作,所以您只需让他们返回一个承诺,解决unzipData()返回的承诺何时解决。
简单的例子:

exports.getDataReg = (url, uid) => { 
    return new Promise((resolve, reject) => { 

    // ... 

    https.get(url, (response) => { 
     response.pipe(file); 
     file.on('finish',() => { 
     unzipData(datadir + `download/${uid}/`, fileName, uid) 
      .then(resolve); 
     }); 
    }).on('error', (err) => { // Handle errors 
     fs.unlink(datadir + `download/${uid}/${fileName}`); 
     reject(); // Or resolve() if you want to ignore the error and not cause it to prevent the callback from executing later 
    }); 

    // ... 

    }); 
} 

最后,我们得到的processArray()功能,在这里你需要做的我们做了,开始用同样的事:流程映射到承诺的列表。首先,process函数传递需要通过getDataReg()getDataFtp()返回返回的承诺:

// Second param, download and post process 
(url) => { 
    if(url.startsWith('http')) { 
    return getDataReg(url, uid); 
    } 
    else if(url.startsWith('ftp')) { 
    return getDataFtp(url, uid); 
    } 
    else { 
    console.log('not a valid resource'); 
    } 
    return Promise.reject(); // or Promise.resolve() if you want invalid resources to be ignored and not prevent the callback from executing later 
} 

现在,您processArray()功能可以是这样的:

exports.processArray = (items, process, callback) => 
    Promise.all(items.map(process)) 
    .then(callback) 
    .catch(() => console.log('Something went wrong somewhere')); 

您的回调将被调用时,所有的异步操作已经完成了,无论他们以何种顺序进行。如果任何一个承诺被拒绝,回调将永远不会执行,因此应相应地管理您的承诺拒绝。

这里有一个用的jsfiddle的完整代码:https://jsfiddle.net/upn4yqsw/

+0

完美答案!你理解并明确解释我的问题,并提供一个很好的解决方案。谢谢! – zhangjinzhou

0

一般来说,由于nodejs不会出现已经实现Streams StandardPromise为主,从什么可以收集最少;而是使用基于事件或回调机制,你可以使用函数调用内Promise构造,以return一个实现Promise对象当特定事件已派出

const doStuff = (...args) => new Promise((resolve, reject)) => { 
    /* define and do stream stuff */ 
    doStreamStuff.on(/* "close", "end" */, => { 
    // do stuff 
    resolve(/* value */) 
    }) 
}); 

doStuff(/* args */) 
.then(data => {}) 
.catch(err => {}) 
+0

的问题是,我不能决定什么时候执行回调。如你所知,它是一个循环。它不会告诉最后一个项目何时完成(我相信这是执行回调的要点)。 – zhangjinzhou

+0

使用'Promise.all()'和'Array.prototype.map()''而不是Array.prototype.forEach()',参见[什么是JavaScript中承诺的地步?](https://开头计算器。 COM/q/43982774 /) – guest271314

+0

谢谢。我认为Mikael Lennholm使用代码来解释它。我想这就是你想说的。 – zhangjinzhou

2

全部完成处理后(包括下载过程和后期处理),我想执行一个回调函数。

有关一系列异步过程的有趣之处在于,您永远无法知道何时完全执行所有过程。所以设置回调超时是很快的做法,但它确实不可靠。

您可以改为使用counter来解决此问题。 假设您有10个操作要执行。在开始时,您将计数器设置为10 counter = 10并且在每个过程完成后,无论(它可以成功或失败)如何,您都可以将计数器递减1,例如counter -= 1,之后您可以检查计数器是否为0,如果是这样意味着所有的流程都已经完成并且我们已经达到了目的您现在可以安全地运行你的回调函数,像if(counter === 0) callback();


如果我是你,我会做这样的事情:

*请注意,这个过程被称为应返回一个承诺,这样我可以知道当它完成(无论再怎么)

*如果您需要帮助有关承诺,这个有用的文章可以帮助你:https://howtonode.org/promises

*哦,还有一件事情,你应该避免使用arguments.callee,是导致它被弃用。这是为什么Why was the arguments.callee.caller property deprecated in JavaScript?

exports.processArray = (items, process, callback) => { 
    var todo = [].concat(items); 
    var counter = todo.length; 

    runProcess(); 

    function runProcess() { 
     // Check if the counter already reached 0 
     if(checkCounter() === false) { 
     // Nope. Counter is still > 0, which means we got work to do. 
     var processPromise = process(todo.shift()); 

     processPromise 
      .then(function() { 
      // success 
      }) 
      .catch(function() { 
      // failure 
      }) 
      .finally(function() { 
      // The previous process is done. 
      // Now we can go with the next one. 
      --counter; 
      runProcess(); 
      }) 
     } 
    }; 

    function checkCounter() { 
     if(counter === 0) { 
     callback(); 
     return true; 
     } else { 
     return false; 
     } 
    } 
}; 
+0

在我的情况下,processArray在一个文件中。所有其他功能都在另一个文件中。从我的角度来看,计数器将被设置在processArray中。 Counter - = 1将在unzipData中执行。我该如何处理? – zhangjinzhou

+1

@zhangjinzhou其实我不喜欢很长的答案,无论是我喜欢长的问题,但我伸出我的回答给你看,你如何解决这个问题。 :-) –

+0

我从来没有使用promise.finally。你有任何指示吗? – zhangjinzhou