2014-02-19 126 views
8

我正在写一个小型服务器,它将接收来自多个来源的数据并处理这些数据。收到的来源和数据很重要,但不超过epoll应该能够很好地处理。但是,所有接收到的数据都必须解析并通过大量测试来运行,这些测试非常耗时,并且尽管epoll复用也会阻塞单个线程。基本上,该模式应该如下所示:IO-loop接收数据并将其捆绑到作业中,发送到池中可用的第一个线程,捆绑由作业处理,结果传递给IO循环写入文件。带C工作线程的epoll IO

我决定去找单个IO线程和N个工作线程。在IO线程接受的TCP连接,并读取数据是很容易使用,在提供的示例来实现: http://linux.die.net/man/7/epoll

线程通常也很容易对付,但我奋斗到epoll的IO循环与线程池结合一个优雅的方式。我无法找到在线工作人员在线使用epoll的“最佳做法”,但关于同一主题的问题很多。

因此,我有一些问题,我希望有人能帮助我回答:

  1. 可以(也应该)eventfd被用作IO线程和所有的工人之间的2路同步的机制?例如,每个工作线程有一个好主意,让它自己的epoll例程在一个共享的eventfd上等待(带有一个结构指针,包含关于该工作的数据/信息),即以某种方式使用eventfd作为工作队列?也可能有另一个eventfd将结果从多个工作线程传递回IO线程?
  2. 在IO线程发出有关套接字上更多数据的信号后,实际的recv是否应该在IO线程上发生,或者工作人员是否应该自己收回数据以便在解析数据帧时不阻塞IO线程等。?在那种情况下,我该如何确保安全,例如如果recv在工作线程中读取1,5帧数据并且另一个工作线程从相同连接接收到最后的0,5帧数据?
  3. 如果工作线程池是通过互斥锁等实现的,如果N + 1线程试图使用相同的锁,将等待锁定锁定IO线程吗?
  4. 是否有任何良好的实践模式,如何建立一个围绕epoll工作线程池与双向通信(即从IO到工人和后面)?

编辑:可以一个可能的解决方案是从IO循环更新环形缓冲区,更新后通过共享管道发送环形缓冲区索引给工作人员(因此放弃了该索引的控制权第一个从管道读取索引的工作者),让工作人员拥有该索引直到处理结束,然后再次通过管道将索引号发送回IO线程,从而给出控制权?

我的应用程序仅限于Linux,所以我可以使用Linux专用功能以尽可能最优雅的方式实现此功能。不需要跨平台支持,但性能和线程安全性是。

+0

我想我可能有一个有用的解决方案,但需要知道,如何尽快拳头你知道单帧/包的长度吗?它们是固定长度的,是包含在数据包标题中还是只在最后才知道?如果你早点知道,在不占用主线程的情况下关闭工作要容易得多,但如果你不知道最后主线程不可避免地要做大量的读取。 – Vality

+0

嗨,我知道recv后和迭代recv缓冲区后的长度。不幸的是,它们不是固定的长度,长度不会出现在数据包中,而是基于换行成帧。 – agnsaft

回答

3

当执行这个模型时,因为我们只知道数据包大小,一旦我们完全接收到数据包,很遗憾我们不能将接收本身卸载到工作线程。相反,我们仍然可以做的最好的事情是接收数据的线程必须将指针传递给完全接收的数据包。

数据本身可能最好保存在一个循环缓冲区中,但是我们会为每个输入源需要一个单独的缓冲区(如果我们得到一个部分数据包,我们可以继续从其他数据源接收而不分割数据。是如何通知工作人员新的数据包何时准备好以及给他们一个指向数据包中的数据的指针,因为这里的数据很少,只是一些指针,最优雅的方法是使用posix消息队列。这些提供了多个发送者和多个接收者写入和读取消息的能力,总是确保每个消息都被接收,并且恰好由1个线程来接收。

你会想要一个类似于下面的结构的每个数据源的结构,我现在要通过字段的目的。

struct DataSource 
{ 
    int SourceFD; 
    char DataBuffer[MAX_PACKET_SIZE * (THREAD_COUNT + 1)]; 
    char *LatestPacket; 
    char *CurrentLocation 
    int SizeLeft; 
}; 

的SourceFD显然文件描述符到所讨论的数据流,所述的DataBuffer是其中同时被处理的分组的内容被保持,这是一个循环缓冲器。 LatestPacket指针用于临时保存一个指向最重新发送的数据包的指针,以防我们收到部分数据包并在关闭数据包之前移动到另一个数据包。 CurrentLocation存储最新数据包的结束位置,以便我们知道下一个数据包的放置位置,或部分接收数据的位置。剩下的大小是留在缓冲区中的空间,这将用于判断我们是否可以适应数据包或需要绕回到开始。

接收功能将因此有效

  • 复制分组的内容到缓冲
  • 移动CurrentLocation以指向分组
  • 更新SizeLeft的端部以考虑现在减少缓冲
  • 如果我们无法将数据包放入缓冲区末尾,我们会循环约
  • 如果没有空间,那么稍后再尝试一次,转到另一个源同时
  • 如果我们有一个部分接收存储LatestPacket指针指向包的开始和去到另一个流,直到我们得到了休息
  • 发送使用posix message queue的工作线程,因此它可以处理的消息数据,该消息将包含一个指向数据源的结构,因此它可以在它的工作,它也需要一个指向该数据包是工作,它的大小,这些可以计算出,当我们收到的数据包

工作者线程将使用收到的指针进行处理,然后增加SizeLeft,以便接收者线程知道它可以继续填充缓冲区。将需要原子函数来处理结构中的size值,所以我们不会获得size属性的竞争条件(因为它可能是由工作者和IO线程同时写入的,导致丢失的写入,请参阅我的评论如下),他们列出了here,并且简单而且非常有用。现在

,我已经给了一些一般性的背景,但将解决具体给出几点:

  1. 使用EventFD作为一个同步机制在很大程度上是一个坏主意,你会用不必要的CPU相当数量的发现自己时间,并且很难执行任何同步。特别是如果你有多个线程拿起相同的文件描述符,你可能会遇到大问题。这实际上是一种有时会起作用的令人讨厌的黑客攻击,但它不是真正的同步替代品。
  2. 如上所述尝试卸载接收也是一个坏主意,你可以用复杂的IPC解决这个问题,但坦率地说,接收IO不太可能需要足够的时间来拖延你的应用程序,你的IO也可能很多比CPU慢,所以用多线程接收将获得很少的收益。 (假设你没有说,有几个10千兆网卡)。
  3. 使用互斥锁或锁定在这里是一个愚蠢的想法,考虑到(同时)共享数据量较低,它更适合无锁编码,您实际上只是交出工作和数据。这也将提高接收线程的性能,并使您的应用程序更具可扩展性。使用这里提到的功能http://gcc.gnu.org/onlinedocs/gcc-4.1.2/gcc/Atomic-Builtins.html你可以做到这一点很好,很容易。如果你这样做的话,你需要的是一个信号量,每次接收到一个数据包并锁定时,这个信号就会被解锁,每一个线程都会启动一个工作,以便在更多的数据包准备就绪时允许动态地增加线程数量,那么带有互斥体的自制解决方案的开销就会小得多。
  4. 对于任何线程池,这里没有太多的区别,你产生了很多线程,然后让它们在数据消息队列中的mq_receive中全部阻塞以等待消息。完成后,他们将结果发回主线程,将主结果消息队列添加到它的epoll列表中。然后它可以以这种方式接收结果,它对于像指针这样的小数据有效载荷来说简单而且非常有效。这也将使用很少的CPU,并且不会迫使主线浪费时间管理工作人员。

最后,你的编辑是相当明智的,除了我建议的事实,消息队列远远胜于管道,因为它们非常有效地发送事件信号,保证完整的消息读取并提供自动组帧。

我希望这会有所帮助,但是如果我错过了任何事情,或者您有任何问题,请随时澄清或更多解释发表评论。

+0

感谢您长时间的回答。只有几个问题:我是否可以假设多个线程可能会阻塞在同一队列中以等待新任务?多个线程是否可以写入另一个队列以传递完成的工作?在这样的设计中,我是否真的需要如上所述的builtins? – agnsaft

+0

@invictus消息队列确实是多对多的关系,它们非常强大,因为任意数量的发送者和任何数量的接收者都可以使用队列,并且消息总是会传递给正好一个监听线程。绝大多数代码并不需要上面的内置代码,唯一的用途是确保SizeLeft以原子方式更新,以确保接收器线程和工作线程不会同时更新并导致其损坏,例如:线程1负载值,thread2加载值,thread1写入它,thread2写入它,thread1的写入丢失。 – Vality

+0

这样的声音是我一直在寻找的解决方案。你知道mq_ *如何执行(即性能开销)吗? – agnsaft

4

在我的测试中,每个线程的一个epoll实例远远超过了复杂的线程模型。如果将监听套接字添加到所有epoll实例中,工作人员将只需accept(2),并且获胜者将被授予连接并在其一生中处理它。

你的工人可能是这个样子:

for (;;) { 
    nfds = epoll_wait(worker->efd, &evs, 1024, -1); 

    for (i = 0; i < nfds; i++) 
     ((struct socket_context*)evs[i].data.ptr)->handler(
      evs[i].data.ptr, 
      evs[i].events); 
} 

并添加到epoll的实例的每个文件描述符可能有与之相关的struct socket_context

void listener_handler(struct socket_context* ctx, int ev) 
{ 
    struct socket_context* conn; 

    conn->fd = accept(ctx->fd, NULL, NULL); 
    conn->handler = conn_handler; 

    /* add to calling worker's epoll instance or implement some form 
    * of load balancing */ 
} 

void conn_handler(struct socket_context* ctx, int ev) 
{ 
    /* read all available data and process. if incomplete, stash 
    * data in ctx and continue next time handler is called */ 
} 

void dummy_handler(struct socket_context* ctx, int ev) 
{ 
    /* handle exit condition async by adding a pipe with its 
    * own handler */ 
} 

我喜欢这样的策略,因为:

  • 非常简单的设计;
  • 所有线程是相同的;
  • 工人和连接被隔离 - 没有踩在对方的脚趾上或在错误的工作人员中呼叫read(2);
  • 不需要锁(内核在accept(2)上担心同步);
  • 有点自然负载平衡,因为没有繁忙的工作人员会积极抗争accept(2)

而且对epoll的一些注意事项:

  • 使用边沿触发模式,非阻塞套接字,读,直到EAGAIN; (epoll寄存器文件描述符,但实际上手表文件描述);或者,
  • 你可以安全地epoll_ctl(2)其他线程的epoll实例;
  • epoll_wait(2)使用大的struct epoll_event缓冲区来避免饥饿。

其他一些注意事项:

  • 使用accept4(2)节省了系统调用;
  • 每个内核使用一个线程(如果CPU受限,则每个物理为1;如果受I/O限制,则每个逻辑为1);
  • poll(2)/select(2)如果连接数很少,可能会更快。

我希望这有助于。

+0

我喜欢这个想法,但是,我担心每次recv后我的繁重工作量都会阻止其他连接。另外,如果线程“幸运”足以先选择下一个接受,那么这是否会导致每个线程的工作负载不均衡?此外,如果我只有4-5个连接,我可能仍然需要30个工作线程来处理它们生成的内容。 – agnsaft

+0

@invictus是的,除非您将连接均匀分布到侦听器处理程序中,否则工作负载将不会完美平衡,这可能会增加一些复杂性。你的工作是CPU还是I/O绑定?如果处理器内核处于CPU限制状态,那么比处理器内核多的线程将引入更多的上下文切换。 – haste

+0

我的工作负载是CPU-bound – agnsaft

0

我张贴在其他职位相同的答案:I want to wait on both a file descriptor and a mutex, what's the recommended way to do this?

================================ ==========================

这是一个非常常见的问题,尤其是当您开发网络服务器端程序时。大多数Linux服务器端程序的主看就会循环是这样的:

epoll_add(serv_sock); 
while(1){ 
    ret = epoll_wait(); 
    foreach(ret as fd){ 
     req = fd.read(); 
     resp = proc(req); 
     fd.send(resp); 
    } 
} 

它是单线程(主线程),epoll的基于服务器架构。问题是,它是单线程的,而不是多线程的。它要求proc()永远不要阻塞或运行一段很长的时间(比如对于一般情况来说,就是10毫秒)。

如果proc()会运行很长时间,我们需要多线程,并在分离线程(工作线程)中执行proc()。

我们可以在不阻塞主线程的情况下使用基于互斥锁的消息队列将任务提交给工作线程,但速度足够快。

然后我们需要一种方法来从工作者线程获取任务结果。怎么样?但是,如果我们直接在epoll_wait()之前或之后检查消息队列,则检查操作将在epoll_wait()结束后执行,而epoll_wait()通常会阻塞10微秒(常见情况),如果它等待的所有文件描述符不活跃。

对于服务器来说,10毫秒是相当长的时间!当产生任务结果时,我们可以发信号通知epoll_wait()立即结束吗?

是的!我将在我的一个开源项目中描述它是如何完成的。

为所有工作线程创建管道,并且epoll也在该管道上等待。一旦生成任务结果,工作线程将一个字节写入管道,然后epoll_wait()几乎在同一时间结束! - Linux管道有5 us到20 us的延迟。

在我的项目SSDB(Redis协议兼容的磁盘NoSQL数据库)中,我创建了一个SelectableQueue用于在主线程和工作线程之间传递消息。就像它的名字一样,SelectableQueue有一个文件描述符,可以通过epoll等待。

SelectableQueue:在主线程https://github.com/ideawu/ssdb/blob/master/src/util/thread.h#L94

用法:

epoll_add(serv_sock); 
epoll_add(queue->fd()); 
while(1){ 
    ret = epoll_wait(); 
    foreach(ret as fd){ 
     if(fd is worker_thread){ 
      sock, resp = worker->pop_result(); 
      sock.send(resp); 
     } 
     if(fd is client_socket){ 
      req = fd.read(); 
      worker->add_task(fd, req); 
     } 
    } 
} 

工作线程用法:

fd, req = queue->pop_task(); 
resp = proc(req); 
queue->add_result(fd, resp);