2016-01-31 77 views
0

我正在构建一个从串行端口接收字节的python项目。字节是对发送命令的响应(也是通过串行端口)。答案没有识别标记,即仅来自字节,我不知道这是哪个命令响应。解码器当然需要事先知道这是一个响应的命令。将字节流反序列化为对象

我希望传入的字节序列表示为嵌套对象,指示帧,标头,有效载荷,解码的有效载荷等。我更喜欢每次向解码器推送1个字节并让它调用回调一旦它已经收到足够的字节为一个完整的对象(或errorCallback如果有错误或超时)。

实际的帧有一个开始字节和一个结束字节。它有一个包含几个字节(一些id,命令状态(基本上是ok/fail))的头部,一个是数据长度字节。紧接着是一个校验和(单字节)的数据。数据是对命令的响应。

响应是可预测的,因为前面的字节决定了即将到来的字节的含义。

实施例响应:

AA:00:0C:00:01:00:00:D3:8D:D4:5C:50:01:04:E0:6E:BB

分拆下来:

aa: start frame 
    00: id 
    0c: data length (incl status): 12 bytes 
    00: command status (byte 1) 
     01: 1 data frame (byte 2) 
      00:00: flags of first data frame (byte 3-4) 
      d3:8d:d4:5c:50:01:04:e0: first data (aa and bb could be in it) (byte 5-12) 
    6e: checksum (includes all bytes except start, checksum, end bytes) 
bb: end frame 

这是串行端口通信,字节可能丢失(和额外的生产)和我希望使用超时来处理重置(无响应预期没有被发送的第一命令)。

我真的想要一个面向对象的方法,其中的解码器将产生一个对象,当序列化时,会再次产生相同的字节序列。我使用python 2.7,但真正的任何面向对象的语言都可以(只要我可以将它转换为python)。

我只是不知道如何构造解码器,使它看起来整洁。我正在寻找一个完整的解决方案,只是让我朝着正确的方向前进(正确的方向有点主观)。

+0

如果你发布了一些代码,你更可能对你的问题得到积极回应。不要担心,如果它不整齐,或不使用任何先进的技巧。不仅发布一些代码表明你已经做出了一个诚实的尝试来自己解决你的问题,一块代码是一个非常有用的方式来显示你正在尝试做什么。 –

+0

你是说你只是想将传入的固定长度响应解析为可以转换回字节的对象(带有属性)? – TisteAndii

+0

我确实有一些代码,但它很可怕,实际上并没有正常工作。另外,它并没有满足我的愿望,使它成为模块化的。 TisteAndii下面的答案实际上比我的要好。 – galmok

回答

1

我不完全明白你想要做什么,但如果你想从一些设备接收固定长度的反应,使他们的一些对象的属性,将这样的事情会好起来?:

START_FRAME = 0xAA 
END_FRAME = 0xBB 
TIMEOUT = 2 

class Response: 
def __init__(self, response): 
    if (len(response) - 6) % 11 == 0 and response[0] == START_FRAME and response[-1] == END_FRAME: # verify that its a valid response 
     self.header = {} # build header 
     self.header['id'] = response[1] 
     self.header['length'] = response[2] 
     self.header['status'] = response[3] 
     self.r_checksum = response[-2] # get checksum from response 
     self.checksum = self.header['id']^self.header['length']^self.header['status'] # verify the checksum 
     self.payload = response[4:-2] # get raw payload slice 
     self.decode(self.payload) # parse payload 
     if self.r_checksum == self.checksum: # final check 
      self.parsed = True 
     else: 
      self.parsed = False 
    else: # if response didnt follow the pattern 
     self.parsed = False 
def decode(self, packet): # decode payload 
    self.frm_count = packet[0] # get number of data frames 
    self.checksum ^= self.frm_count 
    self.decoded = [] # hold decoded payload 
    frames = packet[1:] 
    for c in range(self.frm_count): # iterate through data frames 
     flags = frames[(c*10):(c*10 + 2)] 
     for f in flags: 
      self.checksum ^= f 
     data = frames[(c*10 + 2):(c+1)*10] 
     for d in data: 
      self.checksum ^= d 
     self.decoded.append({'frame': c+1, 'flags': flags, 'data':data}) 
def serialize(): # reconstruct response 
    res = bytearray() 
    res.append(START_FRAME) 
    res.extend([self.header['id'], self.header['length'], self.header['status']]) 
    res.extend(self.payload) 
    res.extend([self.checksum, END_FRAME]) 
    return res 

response = bytearray() 
ser = serial.Serial('COM3', 9600) # timeout is 2 seconds 
last_read = time.clock() 
while time.clock() - last_read < TIMEOUT: 
    while ser.inWaiting() > 0: 
     response.append(ser.read()) 
     last_read = time.clock() 
decoded_res = Response(response) 
if decoded_res.parsed: 
    # do stuff 
else: 
    print('Invalid response!') 

该代码假定可能有多个数据帧,数据帧之前是一个指示数据帧数量的字节。 与串行通信(即使在115200波特)相比,解析数据包的速度更快。整个事情大概是O(n),我想。

+0

您的假设对于显示的回复是正确的。根据发送的命令,我得到各种不同的答案,必须按自己的方式解码。通过添加额外的解码方法(适用于每个命令;您可以扩展代码以考虑这一点)。我认为你的建议对我来说是足够好的,即使这个课程需要一个完整的框架才能开始解码。我曾希望采用更多面向流的方法。使用超时将字节流分解为数据包可能会变得不可靠(而且速度慢)。我希望这不会是一个问题。非常感谢! – galmok

+0

也许你可以通过面向流的方式扩展你的意思?在你理解之前,你不需要一个框架是完整的,因为框架可能会变得不完整/无效?你可以减少超时时间(如果你的波特率足够高),甚至可以用time.clock()来代替time.sleep()来测量时间间隔,而不是浪费秒。让我编辑代码 – TisteAndii

+0

我的担心主要是如果响应是背靠背的,即只通过查看超时值(因为没有),数据包不能被分割。但是,我认为每次响应之后我都可以承受一小段延迟,然后向设备发送新的请求。我需要尽可能快地发送尽可能多的请求(并获得尽可能多的回复)。 1-2个字符时间的超时应该是实惠的。通过面向流的方式,我想到了一种将无尽的流字节发送给类的方法,并且一旦它有一个完整的数据包,就会调用一个回调函数。更多的字节会导致更多的回调。你的代码是一个很好的起点。 – galmok

相关问题