2014-12-23 14 views
0

已经编写了一个简单的程序,用Arduino Mega记录以太网屏蔽数据,每个48byte数据包的传输速率通常为0.0008-0.001秒。我创建了一个线程,它打开一个套接字并从arduino获取一个数据包并将其放入队列中,并在控制台中打印出一条调试消息,根据控制台,我还没有看到它跳过节拍从套接字接收数据包。还有另一个线程从队列中获取一个等待数据包并将其打印到控制台。然而,第二个线程有时会跳过或复制数据包,因为它将数据包从队列中拉出,因此我丢失了有价值的数据。林不知道我在做什么错在这里,因为我已经尝试了几次不同的调用put和get的迭代。在我将其重写为多进程之前,也许有人可以解释这个问题?Python线程/队列快速数据I/O丢失或重复Get()数据包

import sys, os, time, datetime 
import socket from threading 
import Thread from PyQt4 
import QtCore, QtGui 
import queue 
import binascii 

q = queue.Queue(maxsize=0) 
Running = False 

class Window(QtGui.QWidget): 
    def __init__(self): 
     QtGui.QWidget.__init__(self) 
     self.button = QtGui.QPushButton('Start', self) 
     self.button.clicked.connect(self.handleButton) 
     layout = QtGui.QVBoxLayout(self) 
     layout.addWidget(self.button) 

    def handleButton(self): 
     global Running, thread, thread2, thrAdruino, thrPackets 
     if Running == True: 
      Running = False 
      self.button.setText('Start') 
      thrAdruino.isRunning = False 
      thrPackets.isRunning = False 
      print ('stop') 
     else: 
      Running = True 
      self.button.setText('Stop') 

      thrAdruino = talkArduino() 
      thread = Thread(target=thrAdruino.run, daemon=True) 
      thread.start() 

      thrPackets = parsePackets() 
      thread2 = Thread(target=thrPackets.run, daemon=True) 
      thread2.start() 

      print ("Start") 



class talkArduino(Thread): 
    def __init__(self): 
     self.isRunning = True 
    def run(self): 
     print('Arduino: Thread Started') 
     TCP_RECEIVE_BUFFER = bytearray(48) 
     TCP_IP, TCP_PORT, BUFFER_SIZE = '192.168.1.177', 502, 48 
     s=socket.socket(socket.AF_INET, socket.SOCK_STREAM) 
     print('Arduino: Socket Opening') 
     s.connect((TCP_IP, TCP_PORT)) 
     print('Arduino: Socket Opened, sending start command') 
     s.send(bytes(1)) 
     while self.isRunning == True: 
      s.recv_into(TCP_RECEIVE_BUFFER) 
      if TCP_RECEIVE_BUFFER[0] == 48: 
       q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) 
       print ('Arduino: ', binascii.hexlify(TCP_RECEIVE_BUFFER)) 
     s.close() 
     print('Arduino: Closed Socket') 



class parsePackets(Thread): 
    def __init__(self): 
     self.isRunning = True 
    def run(self): 
     print ('packet parser thread started') 
     print (q.qsize()) 
     buffer1 = bytearray() 
     while self.isRunning == True: 
      try: 
       print (q.qsize()) 
       buffer1 = q.get(block=True, timeout=1) 
       print ("Packets: ", binascii.hexlify(buffer1)) 
      except q.Empty: 
       pass 



if __name__ == '__main__': 

    import sys 
    app = QtGui.QApplication(sys.argv) 
    window = Window() 
    window.show() 
    sys.exit(app.exec_()) 

我还包括了Arduino Mega Sketch的完整性。

#include <SPI.h> 
#include <Ethernet.h> 

// Enter a MAC address and IP address for your controller below. 
// The IP address will be dependent on your local network: 
byte mac[] = {0xDE, 0xAD, 0xBE, 0xEF, 0xFE, 0xED }; 
IPAddress ip(192,168,1,177); 
byte Buffer[] = { 0x30, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c, 0x0d, 0x0e, 0x0f, 
        0x10, 0x11, 0x12, 0x13, 0x14, 0x15, 0x16, 0x17, 0x18, 0x19, 0x1a, 0x1b, 0x1c, 0x1d, 0x1e, 0x1f, 
        0x20, 0x21, 0x22, 0x23, 0x24, 0x25, 0x26, 0x27, 0x28, 0x29, 0x2a, 0x1b, 0x2c, 0x2d, 0x2e, 0x2f}; 
// Initialize the Ethernet server library 
// with the IP address and port you want to use 
// (port 80 is default for HTTP): 
EthernetServer server(502); 

// Define various ADC prescaler 
const unsigned char PS_16 = (1 << ADPS2); 
const unsigned char PS_32 = (1 << ADPS2) | (1 << ADPS0); 
const unsigned char PS_64 = (1 << ADPS2) | (1 << ADPS1); 
const unsigned char PS_128 = (1 << ADPS2) | (1 << ADPS1) | (1 << ADPS0); 


void setup() { 
// Open serial communications and wait for port to open: 
    Serial.begin(9600); 
    while (!Serial) { 
    ; // wait for serial port to connect. Needed for Leonardo only 
    } 

    // set up the ADC 
    ADCSRA &= ~PS_128; // remove bits set by Arduino library 
    // you can choose a prescaler from above. 
    // PS_16, PS_32, PS_64 or PS_128 
    ADCSRA |= PS_32; // set our own prescaler to 64 

    // start the Ethernet connection and the server: 
    SPI.setClockDivider(SPI_CLOCK_DIV2); 
    Ethernet.begin(mac, ip); 
    server.begin(); 
    Serial.print("server is at "); 
    Serial.println(Ethernet.localIP()); 
} 


void loop() { 
    unsigned int i; 
    unsigned int AI; 
    // listen for incoming clients 
    EthernetClient client = server.available(); 
    if (client) { 
    Serial.println("new client"); 

    // an http request ends with a blank line 
    while (client.connected()) { 
      for(i=0;i<16;i++) { 
      AI =analogRead(i); 
      Buffer[i*2+4] = highByte(AI); 
      Buffer[i*2+5] = lowByte(AI); 
      } 
      Buffer[1] +=1; 
      client.write(Buffer, 48); 
    } 
    // give the web browser time to receive the data 
    delay(1); 
    // close the connection: 
    client.stop(); 
    Serial.println("client disonnected"); 
    } 
} 

回答

1

当你的put()的get(),你是把和获得的对象的引用 - 而不是对象的副本。这条线把一个参考原始ByteArray()

q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) 

当你:现在

buffer1 = q.get(block=True, timeout=1)  

的缓冲器1是相同的字节组为TCP_RECEIVE_BUFFER参考。如果第一个线程更改缓冲区的内容,则第二个线程将看到相同的数据。

发生什么取决于两个线程是如何交错的。您描述的症状可以通过编写两个线程的可能排序来看出。

正常的情况下,则get()打印正确:

# case 1 - what you want to happen 
s.recv_into(TCP_RECEIVE_BUFFER) # receive = "packet1" 
q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) # push reference to buffer - 1 item on Q 
buffer1 = q.get(block=True, timeout=1) # pop reference to buffer - 0 item on Q 
# buffer1 is correct at this point = "packet1" 

怎样的get()丢弃的数据包:

# case 2 - get is delayed a bit - 2 recv's before first get() 
s.recv_into(TCP_RECEIVE_BUFFER) # write into buffer "packet1" 
q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) # push reference to buffer - 1 item on Q 
s.recv_into(TCP_RECEIVE_BUFFER) # write into buffer "packet2" - overwrites buffer 
buffer1 = q.get(block=True, timeout=1) # pop reference to buffer - 0 item onQ 
# buffer1 has missed one - the buffer now contains "packet2" - never will see "packet1" 

的get()如何看待同一个包两次:

# case 3 - get is delayed a bit longer - 2 puts() before first get() 
s.recv_into(TCP_RECEIVE_BUFFER) # write into buffer "packet1" 
q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) # push reference to buffer - 1 item on Q 
s.recv_into(TCP_RECEIVE_BUFFER) # write into buffer "packet2" - overwrites buffer 
q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) # push reference to buffer - 2 item on Q 
buffer1 = q.get(block=True, timeout=1) # pop reference to buffer - 1 item on Q  
# buffer1 has missed one - the buffer now contains "packet2" 
buffer1 = q.get(block=True, timeout=1) # pop reference to buffer - 0 item on Q  
# buffer1 has missed one - the buffer still contains "packet2" 

为了解决这个问题,第一个线程必须推动DA的独特副本 ta到Q并摧毁其参考。您将数据提供给Q,然后持有该数据直到接收方接收它。简单地创建一个新的缓冲区,每次都应该修复:

while self.isRunning == True: 
     s.recv_into(TCP_RECEIVE_BUFFER) 
     if TCP_RECEIVE_BUFFER[0] == 48: 
      print ('Arduino: ', binascii.hexlify(TCP_RECEIVE_BUFFER)) 
      # you have to print before pushing onto Q 
      # once it is on the Q, it is no longer safe for this thread to use 
      q.put(TCP_RECEIVE_BUFFER, block=True, timeout=1) 
      # do not use the object any more - you have given it away 
      # create a new object for the next receive 
      TCP_RECEIVE_BUFFER = bytearray(48) 
+0

谢谢,这工作正常 – JMD