2011-01-06 15 views
4

我正在移植构建在ACE Proactor框架之上的应用程序。该应用程序对于VxWorks和Windows都运行良好,但在内核2.6.X.X的Linux(CentOS 5.5,WindRiver Linux 1.4 & 3.0)上使用librt无法实现。Linux中的同步套接字读/写(“全双工”)(特定于aio)

我已经收窄的问题降到一个非常基本的问题: 应用程序开始异步(通过的aio_read)读取套接字上操作,并随后开始异步(通过aio_write)非常相同的插座上书写。由于该协议是从应用程序结束时初始化的,因此读取操作无法实现。 - 当套接字处于阻塞模式时,永远不会写入,协议“挂起”。 - 使用O_NONBLOCK套接字时,写入成功,但读取无限期地返回“EWOULDBLOCK/EAGAIN”错误,永远不会恢复(即使重新启动AIO操作)。

我经历了多个论坛,无法找到一个明确的答案,这是否应该工作(而且我做错了什么事),或者不可能使用Linux AIO。如果我放弃AIO并寻求不同的实现(通过epoll/poll/select等),是否有可能?

附件是一个示例代码来迅速重新产生在非阻塞套接字的问题:

#include <aio.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <netdb.h> 
#include <string.h> 
#include <netinet/in.h> 
#include <sys/socket.h> 
#include <sys/types.h> 
#include <assert.h> 
#include <errno.h> 

#define BUFSIZE (100) 

// Global variables 
struct aiocb *cblist[2]; 
int theSocket; 

void InitializeAiocbData(struct aiocb* pAiocb, char* pBuffer) 
{ 
    bzero((char *)pAiocb, sizeof(struct aiocb)); 

    pAiocb->aio_fildes = theSocket; 
    pAiocb->aio_nbytes = BUFSIZE; 
    pAiocb->aio_offset = 0; 
    pAiocb->aio_buf = pBuffer; 
} 

void IssueReadOperation(struct aiocb* pAiocb, char* pBuffer) 
{ 
    InitializeAiocbData(pAiocb, pBuffer); 

    int ret = aio_read(pAiocb); 
    assert (ret >= 0); 
} 

void IssueWriteOperation(struct aiocb* pAiocb, char* pBuffer) 
{ 
    InitializeAiocbData(pAiocb, pBuffer); 

    int ret = aio_write(pAiocb); 
    assert (ret >= 0); 
} 

int main() 
{ 
    int ret; 
    int nPort = 11111; 
    char* szServer = "10.10.9.123"; 

    // Connect to the remote server 
    theSocket = socket(AF_INET, SOCK_STREAM, 0); 
    assert (theSocket >= 0); 

    struct hostent *pServer; 
    struct sockaddr_in serv_addr; 
    pServer = gethostbyname(szServer); 

    bzero((char *) &serv_addr, sizeof(serv_addr)); 
    serv_addr.sin_family = AF_INET; 
    serv_addr.sin_port = htons(nPort); 
    bcopy((char *)pServer->h_addr, (char *)&serv_addr.sin_addr.s_addr, pServer->h_length); 

    assert (connect(theSocket, (const sockaddr*)(&serv_addr), sizeof(serv_addr)) >= 0); 

    // Set the socket to be non-blocking 
    int oldFlags = fcntl(theSocket, F_GETFL) ; 
    int newFlags = oldFlags | O_NONBLOCK; 

    fcntl(theSocket, F_SETFL, newFlags); 
    printf("Socket flags: before=%o, after=%o\n", oldFlags, newFlags); 

    // Construct the AIO callbacks array 
    struct aiocb my_aiocb1, my_aiocb2; 
    char* pBuffer = new char[BUFSIZE+1]; 

    bzero((char *)cblist, sizeof(cblist)); 
    cblist[0] = &my_aiocb1; 
    cblist[1] = &my_aiocb2; 

    // Start the read and write operations on the same socket 
    IssueReadOperation(&my_aiocb1, pBuffer); 
    IssueWriteOperation(&my_aiocb2, pBuffer); 

    // Wait for I/O completion on both operations 
    int nRound = 1; 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations 
    ret = aio_error(&my_aiocb1); 
    assert (ret == EWOULDBLOCK); 

    // Get the return code for the read 
    { 
     ssize_t retcode = aio_return(&my_aiocb1); 
     printf("First read operation results: aio_error=%d, aio_return=%d - That's the first EWOULDBLOCK\n", ret, retcode); 
    } 

    ret = aio_error(&my_aiocb2); 
    assert (ret == EINPROGRESS); 
    printf("Write operation is still \"in progress\"\n"); 

    // Re-issue the read operation 
    IssueReadOperation(&my_aiocb1, pBuffer); 

    // Wait for I/O completion on both operations 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations for the second time 
    ret = aio_error(&my_aiocb1); 
    assert (ret == EINPROGRESS); 
    printf("Second read operation request is suddenly marked as \"in progress\"\n"); 

    ret = aio_error(&my_aiocb2); 
    assert (ret == 0); 

    // Get the return code for the write 
    { 
     ssize_t retcode = aio_return(&my_aiocb2); 
     printf("Write operation has completed with results: aio_error=%d, aio_return=%d\n", ret, retcode); 
    } 

    // Now try waiting for the read operation to complete - it'll just busy-wait, receiving "EWOULDBLOCK" indefinitely 
    do 
    { 
     printf("\naio_suspend round #%d:\n", nRound++); 
     ret = aio_suspend(cblist, 1, NULL); 
     assert (ret == 0); 

     // Check the error of the read operation and re-issue if needed 
     ret = aio_error(&my_aiocb1); 
     if (ret == EWOULDBLOCK) 
     { 
      IssueReadOperation(&my_aiocb1, pBuffer); 
      printf("EWOULDBLOCK again on the read operation!\n"); 
     } 
    } 
    while (ret == EWOULDBLOCK); 
} 

由于提前, Yotam。

+0

请尝试使用`ace-users @ list.isis.vanderbilt.edu`邮件列表:http://www.cs.wustl.edu/~schmidt/ACE-mail.html – 2011-01-06 16:27:21

回答

3

首先,O_NONBLOCK和AIO不混合。 AIO会报告时,相应的readwrite不会阻止异步操作完成 - 与O_NONBLOCK,他们会从未块,所以aio请求将总是立即完成(与aio_return()EWOULDBLOCK)。其次,不要对两个同时出现的aio请求使用相同的缓冲区。在发出aio请求的时间与aio_error()告诉您它已完成时间之间,缓冲区应被视为完全限制。

第三,对同一文件描述符的AIO请求排队,以便给出明智的结果。这意味着在读取完成之前您的写入不会发生 - 如果您需要先写入数据,则需要按相反顺序发出AIO。下面将做工精细,没有设置O_NONBLOCK

struct aiocb my_aiocb1, my_aiocb2; 
char pBuffer1[BUFSIZE+1], pBuffer2[BUFSIZE+1] = "Some test message"; 

const struct aiocb *cblist[2] = { &my_aiocb1, &my_aiocb2 }; 

// Start the read and write operations on the same socket 
IssueWriteOperation(&my_aiocb2, pBuffer2); 
IssueReadOperation(&my_aiocb1, pBuffer1); 

// Wait for I/O completion on both operations 
int nRound = 1; 
int aio_status1, aio_status2; 
do { 
    printf("\naio_suspend round #%d:\n", nRound++); 
    ret = aio_suspend(cblist, 2, NULL); 
    assert (ret == 0); 

    // Check the error status for the read and write operations 
    aio_status1 = aio_error(&my_aiocb1); 
    if (aio_status1 == EINPROGRESS) 
     puts("aio1 still in progress."); 
    else 
     puts("aio1 completed."); 

    aio_status2 = aio_error(&my_aiocb2); 

    if (aio_status2 == EINPROGRESS) 
     puts("aio2 still in progress."); 
    else 
     puts("aio2 completed."); 
} while (aio_status1 == EINPROGRESS || aio_status2 == EINPROGRESS); 

// Get the return code for the read 
ssize_t retcode; 
retcode = aio_return(&my_aiocb1); 
printf("First operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

retcode = aio_return(&my_aiocb1); 
printf("Second operation results: aio_error=%d, aio_return=%d\n", aio_status1, retcode); 

另外,如果你不关心读取和写入是相对于责令对方,你可以使用dup()创建两个文件描述符套接字,并使用一个读取和另一个写入 - 每个将其AIO操作分开排队。

+1

您是否参考了“ AIO对同一个fd的请求按顺序排列“?如果是这样的话,那么更好的解决方案就是复制句柄。 – 2011-01-07 01:38:50