2012-07-30 53 views
1

如何设置worker角色以使用Windows Azure sdk侦听服务器总线队列消息?Azure Node.js在工作角色中运行服务总线队列消息

目前我有这在我的server.js工作者的角色来监听队列中的消息

var http = require('http') 
    , config = require('./config') 
    , azure = require('azure') 
    , uuid = require('node-uuid'); 

http.createServer(function (req, res) { 
    processServices(function() { 
     res.writeHead(200, { 'Content-Type': 'text/plain' }); 
     res.end('hello world'); 
    }); 
}).listen(process.env.port); 

function processServices(callback) { 
    var sb1 = azure.createServiceBusService(config.serviceBusNamespace, config.serviceBusAccessKey); 
    sb1.receiveQueueMessage('startup', function (error, m) { 
     if (!error) { 
      writeMessage(JSON.stringify(m), function() { 
       callback(); 
      }); 
     } 
     else { 
      writeMessage(JSON.stringify(error), function() { 
       callback(); 
      }); 
     } 
    }); 
} 

function writeMessage(message, callback) { 
    var serviceClient = azure.ServiceClient; 
    var ts1 = azure.createTableService(serviceClient.DEVSTORE_STORAGE_ACCOUNT, serviceClient.DEVSTORE_STORAGE_ACCESS_KEY, serviceClient.DEVSTORE_TABLE_HOST); 

    ts1.getTable('Messages', function (error) { 
     if (error === null) { 
      var messageEntity = { 
       PartitionKey: '0', 
       RowKey: uuid(), 
       Message: message 
      }; 

      ts1.insertEntity('Messages', messageEntity, function (error, newMessage) { 
       callback(); 
      }); 
     } 
     else callback(); 
    }); 
} 

这在我的server.js Web角色设置队列

var sb1 = azure.createServiceBusService(config.serviceBusNamespace, config.serviceBusAccessKey); 
sb1.getQueue('startup', function (error, queue) { 
    if (error) { 
     sb1.createQueueIfNotExists('startup', function (error, queue) { 
      if (!error) 
       console.log("created startup queue 1: " + JSON.stringify(queue) + "\n"); 
      else 
       console.log("don't got startup queue 1: " + JSON.stringify(error) + "\n"); 
     }); 
    } 
    else console.log("created startup queue 2: " + JSON.stringify(queue) + "\n"); 
}); 
sb1.getQueue('serialnumbers', function (error, queue) { 
    if (error) { 
     sb1.createQueueIfNotExists('serialnumbers', function (error, queue) { 
      if (!error) 
       console.log("created serialnumbers queue 1: " + JSON.stringify(queue) + "\n"); 
      else 
       console.log("don't got serialnumbers queue 1: " + JSON.stringify(error) + "\n"); 
     }); 
    } 
    else console.log("created serialnumbers queue 2: " + JSON.stringify(queue) + "\n"); 
}); 

而这我的web角色index.js文件发送消息给队列

var azure = require('azure') 
    , config = require('../utils/config'); 

exports.index = function (req, res) { 
    var sb1 = azure.createServiceBusService(config.serviceBusNamespace, config.serviceBusAccessKey); 
    var startupMessage = { 
     body: '' 
    }; 

    sb1.getQueue('startup', function (error, queue) { 
     if (!error) { 
      sb1.sendQueueMessage('startup', startupMessage, function (error) { 
       if (!error) { 
        console.log("sent startup message 1\n"); 

        res.render('index', { 
         locals: { 
          pageTitle: 'Home' 
         } 
        }); 
       } 
       else { 
        console.log("didn't send startup message 1: " + JSON.stringify(error) + "\n"); 

        res.render('index', { 
         locals: { 
          pageTitle: 'Home' 
         } 
        }); 
       } 
      }); 
     } 
     else { 
      res.render('index', { 
       locals: { 
        pageTitle: 'Home' 
       } 
      }); 
     } 
    }); 
}; 

我该如何得到它,当web角色运行index.js文件时,e worker角色是否侦听并执行?

目前它没有这样做,消息是坐在队列中,但不被工作人员角色读取?

如何获得工作者角色从队列中读取消息?

我应该运行一个cron作业还是使用socket.io?我有点困惑。

回答

0

您只需使用.receiveQueueMessage方法连续轮询队列,直到您从队列中收到消息。民意调查之间的时间间隔完全由您决定。

+0

嗨bbthethemoon,根据我写的代码,你能给我一个你的意思吗?欢呼 – 2012-07-30 23:35:32

+1

好,事情是,服务总线队列客户端,不会等待收到的消息,当您使用receiveQueueMessage()轮询时,它只是检查是否有消息给你,如果是的话,它接收它,而不是 - 它什么也收不到,离开。所以你需要一次又一次地调用这个方法,直到收到消息。在异步环境中制作无尽的循环是一个单独的话题,我通常通过事件来做到这一点。我检查自定义事件的队列,并在准备好再次轮询时触发事件。 – bbbonthemoon 2012-07-31 09:59:26

+0

好的bbthethemoon谢谢,我认为,与node.js适合的最好的东西是cron模块,所以我会使用它。欢呼 – 2012-08-01 09:58:50

相关问题