2017-05-14 54 views
1

我知道这是之前在这里提到的一个话题,但是我正在使用rethinkdb的async.waterfall,并且我得到了Error: Callback was already called。奇怪的是,即使它抛出错误并崩溃应用程序,它仍然会创建我需要的数据库和表。我已经阅读了一些其他帖子,如NodeJS Async: Callback already called?Using async.waterfall,但我似乎无法获得任何地方。我的控制台也告诉我错误是db.js:40:9,但我对Node很陌生,只是不确定它想要什么回调。我究竟做错了什么?我需要在这里嵌套回调吗?我使用的代码如下所示。任何帮助,我可以在这里非常感激,如果需要,我可以发布其他相关的代码。多谢你们。Async.waterfall:回调已被调用?

db.js:

exports.setDatabaseAndTables = function() { 
     async.waterfall([ 

     function connect(callback) { 
      r.connect(config.rethinkdb, callback); 
     }, 

     function createDatabase(connection, callback) { 
      // Create the database if needed. 
      r.dbList().contains(config.rethinkdb.db).do(function(containsDb) { 
      return r.branch(
       containsDb, 
       {created: 0}, 
       r.dbCreate(config.rethinkdb.db) 
      ); 
      }).run(connection, function(err) { 
      callback(err, connection); 
      }); 
     }, 

     function createTable(connection, callback) { 
      // Create the tables if needed. 
      r.tableList().contains('todos').do(function(containsTable) { 
      return r.branch(
       containsTable, 
       {created: 0}, 
       r.tableCreate('todos') 
      ); 
      }).run(connection, function(err) { 
      callback(err, connection); 
      }); 
      r.tableList().contains('users').do(function(containsTable) { 
      return r.branch(
       containsTable, 
       {created: 0}, 
       r.tableCreate('users') 
      ); 
      }).run(connection, function(err) { 
      callback(err, connection); 
      }); 
     }, 

     function createIndex(connection, callback) { 
      // Create the indexes if needed. 
      r.table('todos').indexList().contains('createdAt').do(function(hasIndex) { 
      return r.branch(
       hasIndex, 
       {created: 0}, 
       r.table('todos').indexCreate('createdAt') 
      ); 
      }).run(connection, function(err) { 
      callback(err, connection); 
      }); 
      r.table('users').indexList().contains('createdAt').do(function(hasIndex) { 
      return r.branch(
       hasIndex, 
       {created: 0}, 
       r.table('users').indexCreate('createdAt') 
      ); 
      }).run(connection, function(err) { 
      callback(err, connection); 
      }); 
     }, 

     function waitForIndex(connection, callback) { 
      // Wait for the index to be ready. 
      r.table('todos').indexWait('createdAt').run(connection, function(err, result) { 
      callback(err, connection); 
      }); 
      r.table('users').indexWait('createdAt').run(connection, function(err, result) { 
      callback(err, connection); 
      }); 
     } 
     ], 

     function(err, connection) { 
     if(err) { 
      console.error(err); 
      process.exit(1); 
      return; 
     } 
     }); 
    }; 
+2

'createTable','createIndex'和'waitForIndex'每个调用'callback'两次(例如,当'createdAt'上的'todos'索引被创建时,后者会调用'callback',而且''users'上的索引被创建时',所以你需要将它们分成不同的步骤。根据“瀑布步骤”,您只能调用一次“回调”。 – robertklep

+1

@robertklep谢谢罗伯特!你完全正确。我最终不得不重新配置一些代码,但设法让它工作。 –

回答

1

这对于那些试图去 “async.waterfall” 的通病。

这里是通过劈裂 “CREATETABLE”, “上的createIndex” 和 “waitForIndex” 到每2个函数的溶液:

exports.setDatabaseAndTables = function() { 
    async.waterfall([ 

     function connect(callback) { 
      r.connect(config.rethinkdb, callback); 
     }, 

     function createDatabase(connection, callback) { 
      // Create the database if needed. 
      r.dbList().contains(config.rethinkdb.db).do(function(containsDb) { 
       return r.branch(
         containsDb, 
         {created: 0}, 
         r.dbCreate(config.rethinkdb.db) 
         ); 
      }).run(connection, function(err) { 
       callback(err, connection); 
      }); 
     }, 

     function createTableTodos(connection, callback) { 
      // Create the tables if needed. 
      r.tableList().contains('todos').do(function(containsTable) { 
       return r.branch(
         containsTable, 
         {created: 0}, 
         r.tableCreate('todos') 
         ); 
      }).run(connection, function(err) { 
       callback(err, connection); 
      }); 
     }, 

     function createTableUsers(connection, callback) { 
      r.tableList().contains('users').do(function(containsTable) { 
       return r.branch(
         containsTable, 
         {created: 0}, 
         r.tableCreate('users') 
         ); 
      }).run(connection, function(err) { 
       callback(err, connection); 
      }); 
     }, 

     function createIndexTodos(connection, callback) { 
      // Create the indexes if needed. 
      r.table('todos').indexList().contains('createdAt').do(function(hasIndex) { 
       return r.branch(
         hasIndex, 
         {created: 0}, 
         r.table('todos').indexCreate('createdAt') 
         ); 
      }).run(connection, function(err) { 
       callback(err, connection); 
      }); 
     }, 

     function createIndexUsers(connection, callback) { 
      r.table('users').indexList().contains('createdAt').do(function(hasIndex) { 
       return r.branch(
         hasIndex, 
         {created: 0}, 
         r.table('users').indexCreate('createdAt') 
         ); 
      }).run(connection, function(err) { 
       callback(err, connection); 
      }); 
     }, 

     function waitForIndexTodos(connection, callback) { 
      // Wait for the index to be ready. 
      r.table('todos').indexWait('createdAt').run(connection, function(err, result) { 
       callback(err, connection); 
      }); 
     }, 

     function waitForIndexUsers(connection, callback) { 
      r.table('users').indexWait('createdAt').run(connection, function(err, result) { 
       callback(err, connection); 
      }); 
     } 
    ], 
      function(err, connection) { 
       if(err) { 
        console.error(err); 
        process.exit(1); 
        return; 
       } 
      }); 
};