2013-07-08 22 views
1

Linux select()调用中继事件排序有什么方法吗?Linux select()和多个套接字的FIFO排序?

我所看到的描述:

在一台机器,我写了一个简单的程序,发送三个多播包,一个给每个三种不同的组播组。这些数据包是背对背发送的,两者之间没有任何延迟。即SENDTO(mcast_group1); SENDTO(mcast_group2); SENDTO(mcast_group3)。

在另一台机器上,我有一个接收程序。该程序每个多播组使用一个套接字。每个套接字对其侦听的地址执行bind()和IP_ADD_MEMBERSHIP(即加入/订阅)。程序然后在三个套接字上执行select()。

当选择返回时,所有三个套接字都可供读取。但是哪一个最先?准备阅读的套接字列表是一个集合,因此没有顺序。我想要的是,如果select()每个接收到的数据包只返回一次,按顺序(这里增加的开销是可以接受的)。或者,还有其他一些机制可以用来确定数据包接收顺序吗?

其他信息:

  • OS是CentOS 5的(有效的红帽企业版Linux)在x86_64
  • NIC硬件是英特尔82571EB
  • 我已经试过E1000E驱动程序版本1.3.10-k2和2.1.4-NAPI
  • 我试图牵制网卡的中断空载和孤立的CPU核心
  • 我已经禁用硬件IRQ通过设置驱动器选项将InterruptThrottleRate = 0合并,并设置婷RX-微秒(usecs)= 0的ethtool通过
  • 我也尝试过使用epoll的,并且它具有相同的行为

最后一句话:如果我只使用一个插座的分组排序被保留。在这种情况下,我将绑定到INADDR_ANY(0.0.0.0)并在同一个套接字上多次执行IP_ADD_MEMBERSHIP。但是这对我们的应用程序不起作用,因为我们需要通过绑定到实际的多播地址来提供过滤。最终,在同一台机器上将会有多个多播接收程序,并且订阅集可能会相互交叉。所以也许另一种解决方案是找到另一种方法来实现bind()的过滤效果,但不使用bind()。

回答

0

如果select()返回大于1,那么事件必须非常接近才能使排序问题变得毫无意义。

1

您可以使用IP_PKTINFO将数据包被发送到组播组的地址 - 即使套接字认购一堆组播组。完成此操作后,您将按顺序获取数据包并按组地址进行过滤。看下面的例子:

#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <unistd.h> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <netinet/in.h> 
#include <arpa/inet.h> 
#include <sys/stat.h> 
#include <ctype.h> 
#include <errno.h> 

#define PORT 1234 
#define PPANIC(msg) perror(msg); exit(1); 
#define STATS_PATCH 0 

int main(int argc, char **argv) 
{ 
    fd_set master; 
    fd_set read_fds; 
    struct sockaddr_in serveraddr; 
    int sock; 
    int opt = 1; 
    size_t i; 
    int rc; 

    char *mcast_groups[] = { 
     "226.0.0.1", 
     "226.0.0.2", 
     NULL 
    }; 
#if STATS_PATCH 
    struct stat stat_buf; 
#endif 

    struct ip_mreq imreq; 

    FD_ZERO(&master); 
    FD_ZERO(&read_fds); 

    rc = sock = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP); 
    if(rc == -1) 
    { 
     PPANIC("socket() failed"); 
    } 

    rc = setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); 
    if(rc == -1) 
    { 
     PPANIC("setsockopt(reuse) failed"); 
    } 

    memset(&serveraddr, 0, sizeof(serveraddr)); 
    serveraddr.sin_family = AF_INET; 
    serveraddr.sin_port = htons(PORT); 
    serveraddr.sin_addr.s_addr = htonl(INADDR_ANY); 

    rc = bind(sock, (struct sockaddr *)&serveraddr, sizeof(serveraddr)); 
    if(rc == -1) 
    { 
     PPANIC("bind() failed"); 
    } 

    rc = setsockopt(sock, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt)); 
    if(rc == -1) 
    { 
     PPANIC("setsockopt(IP_PKTINFO) failed"); 
    } 

    for (i = 0; mcast_groups[i] != NULL; i++) 
    { 
     imreq.imr_multiaddr.s_addr = inet_addr(mcast_groups[i]); 
     imreq.imr_interface.s_addr = INADDR_ANY; 
     rc = setsockopt(sock, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const void *)&imreq, sizeof(struct ip_mreq)); 
     if (rc != 0) 
     { 
      PPANIC("joing mcast group failed"); 
     } 
    } 

    FD_SET(sock, &master); 

    while(1) 
    { 
     read_fds = master; 
     rc = select(sock + 1, &read_fds, NULL, NULL, NULL); 

     if (rc == 0) 
     { 
      continue; 
     } 

     if(rc == -1) 
     { 
      PPANIC("select() failed"); 
     } 

     if(FD_ISSET(sock, &read_fds)) 
     { 
      char buf[1024]; 
      int inb; 
      char ctrl_msg_buf[1024]; 
      struct iovec iov[1]; 
      iov[0].iov_base = buf; 
      iov[0].iov_len = 1024; 
      struct msghdr msg_hdr = { 
       .msg_iov = iov, 
       .msg_iovlen = 1, 
       .msg_name = NULL, 
       .msg_namelen = 0, 
       .msg_control = ctrl_msg_buf, 
       .msg_controllen = sizeof(ctrl_msg_buf), 
      }; 
      struct cmsghdr *ctrl_msg_hdr; 

      inb = recvmsg(sock, &msg_hdr, 0); 
      if (inb < 0) 
      { 
       PPANIC("recvmsg() failed"); 
      } 

      for (ctrl_msg_hdr = CMSG_FIRSTHDR(&msg_hdr); ctrl_msg_hdr != NULL; ctrl_msg_hdr = CMSG_NXTHDR(&msg_hdr, ctrl_msg_hdr)) 
      { 
       if (ctrl_msg_hdr->cmsg_level == IPPROTO_IP && ctrl_msg_hdr->cmsg_type == IP_PKTINFO) 
       { 
        struct in_pktinfo *pckt_info = (struct in_pktinfo *)CMSG_DATA(ctrl_msg_hdr); 
        printf("got data for mcast group: %s\n", inet_ntoa(pckt_info->ipi_addr)); 
        break; 
       } 
      } 

      printf("|"); 
      for (i = 0; i < inb; i++) 
       printf("%c", isprint(buf[i])?buf[i]:'?'); 
      printf("|\n"); 
#if STATS_PATCH 
      rc = fstat(sock, &stat_buf); 
      if (rc == -1) 
      { 
       perror("fstat() failed"); 
      } else { 
       printf("st_atime: %d\n", stat_buf.st_atime); 
       printf("st_mtime: %d\n", stat_buf.st_mtime); 
       printf("st_ctime: %d\n", stat_buf.st_ctime); 
      } 
#endif 
     } 
    } 

    return 0; 
} 

下面的代码不会解决有机磷农药的问题,但可以指导人们处理类似要求

(EDIT)一个人不应该在深夜做这样的事情。 ..即使采用这种解决方案,您只会得到fd由select处理的订单 - 这将不会提示帧到达的时间。

正如here所述,由于没有为socket inode设置所需的回调,因此目前无法检索套接字的顺序或更改的时间戳。但是如果你能修补你的内核,你可以通过在选择系统调用中设置时间来解决这个问题。

下面的补丁可以给你一个想法:

diff --git a/fs/select.c b/fs/select.c 
index 467bb1c..3f2927e 100644 
--- a/fs/select.c 
+++ b/fs/select.c 
@@ -435,6 +435,9 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time) 
     for (i = 0; i < n; ++rinp, ++routp, ++rexp) { 
      unsigned long in, out, ex, all_bits, bit = 1, mask, j; 
      unsigned long res_in = 0, res_out = 0, res_ex = 0; 
+   struct timeval tv; 
+   
+   do_gettimeofday(&tv); 

      in = *inp++; out = *outp++; ex = *exp++; 
      all_bits = in | out | ex; 
@@ -452,6 +455,16 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time) 
       f = fdget(i); 
       if (f.file) { 
        const struct file_operations *f_op; 
+     struct kstat stat; 
+     
+     int ret; 
+     u8 is_sock = 0; 
+ 
+     ret = vfs_getattr(&f.file->f_path, &stat); 
+     if(ret == 0 && S_ISSOCK(stat.mode)) { 
+      is_sock = 1; 
+     } 
+     
        f_op = f.file->f_op; 
        mask = DEFAULT_POLLMASK; 
        if (f_op->poll) { 
@@ -464,16 +477,22 @@ int do_select(int n, fd_set_bits *fds, struct timespec *end_time) 
         res_in |= bit; 
         retval++; 
         wait->_qproc = NULL; 
+      if(is_sock && f.file->f_inode) 
+       f.file->f_inode->i_ctime.tv_sec = tv.tv_sec; 
        } 
        if ((mask & POLLOUT_SET) && (out & bit)) { 
         res_out |= bit; 
         retval++; 
         wait->_qproc = NULL; 
+      if(is_sock && f.file->f_inode) 
+       f.file->f_inode->i_ctime.tv_sec = tv.tv_sec; 
        } 
        if ((mask & POLLEX_SET) && (ex & bit)) { 
         res_ex |= bit; 
         retval++; 
         wait->_qproc = NULL; 
+      if(is_sock && f.file->f_inode) 
+       f.file->f_inode->i_ctime.tv_sec = tv.tv_sec; 
        } 
        /* got something, stop busy polling */ 
        if (retval) { 

注:

  1. 这是......只为你:) - 不要指望它在主线

  2. do_gettimeofday()被称为之前每个相关的fd被测试。 以获得更高的粒度,这应该在每次迭代中完成(并且仅在需要时)。由于stat接口只提供了一秒的粒度,所以你可以(!!很好!)使用剩余的时间属性将秒的小数部分映射到这些字段。

  3. 这是使用内核3.16.0完成的,没有很好的测试。不要在太空船或医疗设备中使用它。如果您想尝试一下,得到一个文件系统映像(如https://people.debian.org/~aurel32/qemu/amd64/debian_wheezy_amd64_standard.qcow2)和QEMU使用来测试它:

    须藤QEMU系统-x86_64的-kernel弓/ 86 /启动/ bzImage的-hda debian_wheezy_amd64_standard.qcow2 - 追加“root =/dev/sda1”