2017-01-27 146 views
0

我在python中使用UDP进行大量数据包丢失。我知道如果我不想丢包,我应该使用TCP,但是我没有(完全)控制发送者。用python接收UDP数据包,造成数据包丢失

这是一台使用UDP多点传送每秒发送15个图像的相机。

下面你看到我现在写的代码。 它使用多处理来允许生产者和消费者功能并行工作。生产者函数捕获数据包,消费者函数处理它们并将图像写入.bmp文件。

我写了一个类PacketStream从包字节写入.bmp文件。

当照相机发送新的图像时,首先发送一个分组,其中第一字节= 0×01。这包含有关图像的信息。 然后以第一个字节= 0x02发送612个数据包。这些包含图像中的字节(508字节/包)。

由于15幅图像每秒发送,〜9000包每秒被发送。尽管这种情况发生在每个图像突发更快的速率下,大约为22个数据包/毫秒。

我可以得到完全利用的tcpdump或Wireshark的所有数据包。 但使用下面的代码,数据包被遗漏。 当然,我的Windows 7电脑应该能够处理这个?我也在树莓派3上使用它,并且差不多有相同数量的数据包被遗漏。因此我认为这是代码的问题。

我已经尝试了很多不同的东西,比如线程的队列,而不是管,而不是多。

我也试图与

sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 3000000) 

增加套接字缓冲区无济于事。

这是所有可能的蟒蛇?

由于提前,

import time 
from multiprocessing import Process, Queue 
import socket 
import struct 
from PIL import Image 


class PacketStream: 
    def __init__(self, output_path): 
     self.output_path = output_path 
     self.data_buffer = '' 
     self.img_id = -1 # -1 = waiting for start of new image 

    def process(self, data): 
     message_id = data[0] 
     if message_id == '\x01': 
      self.wrap_up_last_image() 
      self.img_id = ord(data[3]) 
      self.data_buffer = '' 
     if message_id == '\x02': 
      self.data_buffer += data[6:] 

    def wrap_up_last_image(self): 
     if self.img_id > 0: 
      n_bytes = len(self.data_buffer) 
      if n_bytes == 307200: 
       global i 
       write_image(self.output_path + str(i).zfill(7) + '_' + str(self.img_id).zfill(3) + '.bmp', 
          self.data_buffer) 
       i += 1 
      else: 
       print 'Image lost: %s bytes missing.' % (307200 - n_bytes) 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 


def producer(q): 
    # setup socket 
    MCAST_GRP = '239.255.83.71' 
    MCAST_PORT = 2271 
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
    sock.bind(('', MCAST_PORT)) 
    mreq = struct.pack('4sl', socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
    sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
    while True: 
     q.put(sock.recv(512)) 


def consumer(q): 
    packet_stream = PacketStream('D:/bmpdump/') 
    while True: 
     data = q.get() 
     packet_stream.process(data) 

i = 0 
if __name__ == '__main__': 
    q = Queue() 

    t1 = Process(target=producer, args=(q,)) 
    t1.daemon = True # so they stop when the main prog stops 
    t1.start() 
    t2 = Process(target=consumer, args=(q,)) 
    t2.daemon = True 
    t2.start() 

    time.sleep(10.0) 

    print 'Program finished.' 

编辑

感谢所有的建议。

1)我已经试过穿+队列,也是“”。加入(),似乎并没有太大的差别。我很确定现在的问题是生产者线程没有得到足够的优先级。我无法找到如何使用Python来增加此功能?这甚至有可能吗?

2)我设法使用下面的代码只损失大约10%。处理器是在〜25%(对树莓PI)的关键是消费数据,当有数据包流,即在暂停时的最后一个数据包已经到达

import time 
import socket 
import struct 
from PIL import Image 


def write_image(path, data): 
    im = Image.frombuffer('L', (640, 480), bytearray(data), 'raw', 'L', 0, 1) 
    im.save(path) 
    print time.time(), path 

def consume(data_buffer): 
    img_id = ord(data_buffer[0][1]) 
    real_data_buffer = [data[6:] for data in data_buffer] 
    data_string = ''.join(real_data_buffer) 

    global i 
    write_image('/media/pi/exthdd_02/bmpdump/' + str(i).zfill(7) + '_' + str(img_id).zfill(3) + '.bmp', data_string) 
    i += 1 

def producer(sock): 
    print 'Producer start' 
    data_buffer = [] 
    while True: 
     data = sock.recvfrom(512)[0] 
     if data[0] == '\x01': 
      data_buffer = [] 
     else: 
      data_buffer.append(data) 
     if len(data_buffer) == 612: 
      consume(data_buffer) 


# image counter 
i = 0 

# setup socket 
MCAST_GRP = '239.255.83.71' 
MCAST_PORT = 2271 
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 
sock.bind((MCAST_GRP, MCAST_PORT)) 
mreq = struct.pack("4sl", socket.inet_aton(MCAST_GRP), socket.INADDR_ANY) 
sock.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq) 
sock.setsockopt(socket.SOL_SOCKET, socket.SO_RCVBUF, 30000000) 

producer(sock) 
+0

我想你在处理UDP套接字时必须使用sock.recvfrom而不是sock.recv。也许这有帮助? –

+0

您可能需要增加UDP数据读取线程/进程的优先级(在Linux上使用sched_setscheduler()或类似的函数),或者在您可能使用的任何其他操作系统上使用类似的API。我不确定Python API是什么与之相对应)。这样一来,您的读取器线程就不太可能被另一个任务从CPU中取出,这可能会导致完整的接收缓冲区和丢弃的数据包。 –

回答

0

一些建议,以改善你的代码,但首先是一个问题:你是否测量过可能会减慢速度的东西?比如你看过你系统的CPU使用情况。如果你达到100%,那很可能是数据包丢失的原因。如果它大部分处于闲置状态,则还有其他事情正在进行,问题与代码的性能无关。

现在,一些建议,以提高代码:

  • 使用socket.recvfrom代替sock.recv与UDP套接字打交道时
  • 不要使用与过程多处理,对于必须要发生的系列化送如果我们正在谈论约9000个呼叫/秒,那么从一个流程到另一个流程的数据可能会成为性能瓶颈。尝试使用线程改为(threading + queue模块)。但是,由于你没有提供任何观察到的数字,所以很难说。
  • 不使用字符串连接来建立接收方的缓冲区,因为它获取数据包。这会创建大量新的临时字符串对象并随时复制数据。相反,将每个数据包追加到一个列表中,并且当您收到所有数据时,它们会一起结尾。