2017-09-25 29 views
1

我一直在使用PyQt5和围绕asyncio构建的websockets模块的客户端。我认为像下面的代码可以工作,但我发现传入的数据(从服务器)没有在GUI中更新,直到我点击输入行编辑框中。这些传入的消息旨在为GUI的更新设置脉冲,并将携带用于更新的数据。 quamash是更好的方法来处理这个问题吗?顺便说一句,我将使用这个代码的其他方面的进程,所以我不认为它是过度杀伤(在这一点上)。 这是Python 3.6,PyQt5.6(或更高版本)以及当前使用pip安装的任何websockets版本。 https://github.com/aaugustin/websocketswebsockets,asyncio和PyQt5在一起。 Quamash是必要的吗?

客户端:

#!/usr/bin/env python 
# -*- coding: utf-8 -*- 
import asyncio 
import websockets 
import sys 
import time 
from multiprocessing import Process, Pipe, Queue 
from PyQt5 import QtCore, QtGui, QtWidgets 

class ComBox(QtWidgets.QDialog): 
    def __init__(self): 
     QtWidgets.QDialog.__init__(self) 
     self.verticalLayout = QtWidgets.QVBoxLayout(self) 
     self.groupBox = QtWidgets.QGroupBox(self) 
     self.groupBox.setTitle("messages from beyond") 
     self.gridLayout = QtWidgets.QGridLayout(self.groupBox) 
     self.label = QtWidgets.QLabel(self.groupBox) 
     self.gridLayout.addWidget(self.label, 0, 0, 1, 1) 
     self.verticalLayout.addWidget(self.groupBox) 
     self.lineEdit = QtWidgets.QLineEdit(self) 
     self.verticalLayout.addWidget(self.lineEdit) 
     self.lineEdit.editingFinished.connect(self.enterPress) 

    @QtCore.pyqtSlot() 
    def enterPress(self): 
     mytext = str(self.lineEdit.text()) 
     self.inputqueue.put(mytext) 

    @QtCore.pyqtSlot(str) 
    def updategui(self, message): 
     self.label.setText(message) 

class Websocky(QtCore.QThread): 
    updatemaingui = QtCore.pyqtSignal(str) 
    def __init__(self): 
     super(Websocky, self).__init__() 
    def run(self): 
     while True: 
      time.sleep(.1) 
      message = self.outputqueue.get() 
      try: 
       self.updatemaingui[str].emit(message) 
      except Exception as e1: 
       print("updatemaingui problem: {}".format(e1)) 

async def consumer_handler(websocket): 
    while True: 
     try: 
      message = await websocket.recv() 
      outputqueue.put(message) 
     except Exception as e1: 
      print(e1) 

async def producer_handler(websocket): 
    while True: 
     message = inputqueue.get() 
     await websocket.send(message) 
     await asyncio.sleep(.1) 

async def handler(): 
    async with websockets.connect('ws://localhost:8765') as websocket: 
     consumer_task = asyncio.ensure_future(consumer_handler(websocket)) 
     producer_task = asyncio.ensure_future(producer_handler(websocket)) 
     done, pending = await asyncio.wait(
      [consumer_task, producer_task], 
      return_when=asyncio.FIRST_COMPLETED,) 
     for task in pending: 
      task.cancel() 

def start_websockets(): 
    loop = asyncio.get_event_loop() 
    loop.run_until_complete(handler()) 

inputqueue = Queue() 
outputqueue = Queue() 

app = QtWidgets.QApplication(sys.argv) 
comboxDialog = ComBox() 
comboxDialog.inputqueue = inputqueue 
comboxDialog.outputqueue = outputqueue 
comboxDialog.show() 

webster = Websocky() 
webster.outputqueue = outputqueue 
webster.updatemaingui[str].connect(comboxDialog.updategui) 
webster.start() 

p2 = Process(target=start_websockets) 
p2.start() 

sys.exit(app.exec_()) 

服务器:

#!/usr/bin/env python3 
# -*- coding: utf-8 -*- 
import asyncio 
import time 
import websockets 

# here we'll store all active connections to use for sending periodic messages 
connections = [] 


#@asyncio.coroutine 
async def connection_handler(connection, path): 
    connections.append(connection) # add connection to pool 
    while True: 
     msg = await connection.recv() 
     if msg is None: # connection lost 
      connections.remove(connection) # remove connection from pool, when client disconnects 
      break 
     else: 
      print('< {}'.format(msg)) 

#@asyncio.coroutine 
async def send_periodically(): 
    while True: 
     await asyncio.sleep(2) # switch to other code and continue execution in 5 seconds 
     for connection in connections: 
      message = str(round(time.time())) 
      print('> Periodic event happened.') 
      await connection.send(message) # send message to each connected client 

start_server = websockets.serve(connection_handler, 'localhost', 8765) 
asyncio.get_event_loop().run_until_complete(start_server) 
asyncio.ensure_future(send_periodically()) # before blocking call we schedule our coroutine for sending periodic messages 
asyncio.get_event_loop().run_forever() 

回答

0

不久发布了这个问题后,我意识到这个问题。该行

message = inputqueue.get() 

在producer_handler函数中被阻塞。这会导致应该是异步函数来挂起该进程中的所有内容,直到它看到队列中的某些内容为止。我的解决方法是使用提供asyncio兼容队列的aioprocessing模块。因此,它看起来更像这样:

import aioprocessing 

async def producer_handler(websocket): 
    while True: 
     message = await inputqueue.coro_get() 
     await websocket.send(message) 
     await asyncio.sleep(.1) 

inputqueue = aioprocessing.AioQueue() 

aioprocessing模块提供了一些不错的选项和文档。在这种情况下,这个问题是一个相当简单的解决方案。 https://github.com/dano/aioprocessing 所以,回答我的问题:不,你不必为这种事情使用quamash。