2013-11-09 79 views
0

我想用pubm/sub模式使用python zmq。我在客户端面临一个奇怪的问题。在客户端,我使用pyzmq,gevent-websocket和bottle作为wsgi服务器。尽管它适用于一个客户端,但其他客户端正在等待第一个客户端断开连接。在与一个客户端合作时,如果我断开并重新连接,则每次重新连接都会收到两次或更多的消息。PyZMQ Gevent Websocket连接错误

#!/usr/bin/python 
from gevent import monkey; monkey.patch_all() 
import zmq 
import gevent 
from bottle import route, run, request, abort, Bottle ,static_file 
from gevent import sleep 
from gevent.pywsgi import WSGIServer 
import geventwebsocket 
from geventwebsocket import WebSocketHandler, WebSocketError 

host = "127.0.0.1" 
port = 8000 
mqport = "8082" 

context = zmq.Context() 
socket = context.socket(zmq.SUB) 

app = Bottle() 

@app.route('/v1/streams/device/<id>') 
def handle_websocket(id): 
    socket.setsockopt(zmq.UNSUBSCRIBE, '') 
    socket.connect ("tcp://localhost:%s" % mqport) 
    socket.setsockopt(zmq.SUBSCRIBE, id) 
    wsock = request.environ.get('wsgi.websocket') 
    if wsock is None: 
     logger.info("Error creating websocket") 
    try : 
     while True: 
      string = socket.recv() 
      logger.info("%s" % string) 
      id, data = string.split(" ") 
      wsock.send("%s" % data) 
      sleep(0.1) 
    except geventwebsocket.WebSocketError, ex: 
     wsock.close() 
     sock.close() 
server = WSGIServer((host, port), app, 
       handler_class=WebSocketHandler) 
server.serve_forever() 

我看到的所有例子都使用while循环来接收消息。我不喜欢这个while循环并寻找一些像socket.on_message这样的回调函数。为了实验目的,我写了一个node.js版本,没有像这样的while循环,但是在我的项目节点中排除了。节点版本在这里:

var WebSocketServer = require('websocket').server; 

var http = require('http'); 

global.client = 0; 

var server = http.createServer(function(request, response) { 

}); 

server.listen(8000, function() { }); 

wsServer = new WebSocketServer({ 

    httpServer: server 

}); 



// WebSocket server 

wsServer.on('request', function(request) { 

    console.log("connected client :", global.client++) 

    var connection = request.accept(null, request.origin); 

    var url = request.resourceURL.href 

    var device_id = url.substr(url.lastIndexOf('/') + 1) 

    var zeromq = require('zmq'); 

    var sock = zeromq.socket('sub'); 

    sock.connect('tcp://127.0.0.1:8082'); 

    sock.subscribe(device_id); 

    sock.on('message', function(data) { 
     console.log(data.toString()); 

     var msg = data.toString(); 

     var rjson = msg.split(" ") 

     connection.send(rjson.pop()); 

    }); 

    connection.on('close', function(connection) { 
    }); 
}); 

这里我不使用while循环。

我在做我的Python代码错误?

回答

0

几天后我们碰壁,发现一个名为gevent-zeromq的非阻塞的zeromq客户端。

本课是任何使用gem-websocket和zmq客户端的人不要使用pyzmq,而是使用gevent-zeromq。

现在gevent-zeromq与pyzmq合并。互动做import zmqfrom zmq.green import zmq