2011-07-16 128 views
2

我想在服务器和客户端之间有非阻塞I/O。这两个连接后,我试图用fork来处理IO,但是在尝试读取“Transport endpoint is not connected”时服务器端发生错误,并且它发生了两次(因为我猜的是fork) 。简单套接字非阻塞I/O

Server代码

//includes taken out 

#define PORT "4950" 
#define STDIN 0 

struct sockaddr name; 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 


// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 


int main(int agrc, char** argv) { 
    int status, sock, adrlen, new_sd; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    //store the connecting address and size 
    struct sockaddr_storage their_addr; 
    socklen_t their_addr_size; 

    //socket infoS 
    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    //allow reuse of port 
    int yes=1; 
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) { 
     perror("setsockopt"); 
     exit(1); 
    } 

    //unlink and bind 
    unlink("127.0.0.1"); 
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nBind error %m", errno); 
     exit(1); 
    } 

    freeaddrinfo(servinfo); 

    //listen 
    if(listen(sock, 5) < 0) { 
     printf("\nListen error %m", errno); 
     exit(1); 
    } 
    their_addr_size = sizeof(their_addr); 
    //accept 
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size); 
    if(new_sd < 0) { 
     printf("\nAccept error %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful Connection!"; 

    //set nonblock 
    set_nonblock(new_sd); 

    char* in = new char[255]; 
    char* out = new char[255]; 
    int numSent; 
    int numRead; 
    pid_t pid; 

    fork(); 
    pid = getpid(); 

    if(pid == 0) { 

     while(!(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't')) { 

      fgets(out, 255, stdin); 
      numSent = send(sock, out, strlen(out), 0); 

      if(numSent < 0) { 
       printf("\nError sending %m", errno); 
       exit(1); 
      } //end error 
     } //end while 
    } //end child 

    else { 
     numRead = recv(sock, in, 255, 0); 
     if(numRead < 0) { 
      printf("\nError reading %m", errno); 
      exit(1); 
     } //end error 
     else { 
      cout<<in; 
      for(int i=0;i<255;i++) 
       in[i] = '\0'; 

     } //end else 
    } //end parent 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

客户端代码

//包括取出

#define PORT "4950" 

struct sockaddr name; 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 


// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 

int main(int agrc, char** argv) { 
    int status, sock, adrlen; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nclient connection failure %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful connection!"; 

    //set nonblock 
    set_nonblock(sock); 

    char* out = new char[255]; 
    char* in = new char[255]; 
    int numRead; 
    int numSent; 
    pid_t pid; 

    fork(); 
    pid = getpid(); 

    if(pid == 0) { 

     while(!(out[0] == 'q' && out[1] == 'u' && out[2] == 'i' && out[3] == 't')) { 

      fgets(out, 255, stdin); 
      numSent = send(sock, out, strlen(out), 0); 

      if(numSent < 0) { 
       printf("\nError sending %m", errno); 
       exit(1); 
      } //end error 
     } //end while 
    } //end child process 

    else { 
     while(!(in[0] == 'q' && in[1] == 'u' && in[2] == 'i' && in[3] == 't')) { 
     numRead = recv(sock, in, 255, 0); 
      cout<<in; 
      for(int i=0;i<255;i++) 
       in[i] = '\0'; 
     } 
    } //end parent process 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

我也使用线程做I/O尝试。有问题,当我运行程序时,它就像线程不会发生。该程序只是运行“成功连接”,然后“正常退出”。我在while(1)循环中添加了一些cout语句,并且他们打印了几次,但是由于某种原因它们停止了。我不确定它是否与我的线程或套接字有关。代码(非常类似于上面),因为这是在这里 -

服务器

//includes taken out 

#define PORT "4950" 
#define STDIN 0 

pthread_t readthread; 
pthread_t sendthread; 

char* in = new char[255]; 
char* out = new char[255]; 
int numSent; 
int numRead; 

struct sockaddr name; 
int sock, new_sd; 

void* readThread(void* threadid) { 

    while(1) { 

     numRead = recv(new_sd, in, 255, 0); 

     if(numRead > 0) { 
      cout<<"\n"<<in; 
      for(int i=0;i<strlen(in);i++) 
       in[i] = '\0'; 
     } //end if 
     else if(numRead < 0) { 
      printf("\nError reading %m", errno); 
      exit(1); 
     } 

    } //end while 
} //END READTHREAD 


void* sendThread(void* threadid) { 

    while(1) { 

     cin.getline(out, 255); 

     numSent = send(new_sd, out, 255, 0); 

     if(numSent > 0) { 
      for(int i=0;i<strlen(out);i++) 
       out[i] = '\0'; 
     } //end if 
     else if(numSent < 0) { 
      printf("\nError sending %m", errno); 
      exit(1); 
     } 
    } //end while 
} //END SENDTHREAD 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 

// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 

int main(int agrc, char** argv) { 
    int status, adrlen; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    //store the connecting address and size 
    struct sockaddr_storage their_addr; 
    socklen_t their_addr_size; 

    //socket infoS 
    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    //allow reuse of port 
    int yes=1; 
    if (setsockopt(sock,SOL_SOCKET,SO_REUSEADDR,&yes,sizeof(int)) == -1) { 
     perror("setsockopt"); 
     exit(1); 
    } 

    //unlink and bind 
    unlink("127.0.0.1"); 
    if(bind (sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nBind error %m", errno); 
     exit(1); 
    } 

    freeaddrinfo(servinfo); 

    //listen 
    if(listen(sock, 5) < 0) { 
     printf("\nListen error %m", errno); 
     exit(1); 
    } 

    their_addr_size = sizeof(their_addr); 
    //accept 
    new_sd = accept(sock, (struct sockaddr*)&their_addr, &their_addr_size); 
    if(new_sd < 0) { 
     printf("\nAccept error %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful Connection!"; 

    //set nonblock 
    set_nonblock(new_sd); 

    pthread_attr_t attr; 
    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 

    pthread_create(&readthread, &attr, readThread, (void*)0); 
    pthread_create(&sendthread, &attr, sendThread, (void*)1); 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

客户

#define PORT "4950" 

pthread_t readthread; 
pthread_t sendthread; 
char* in = new char[255]; 
char* out = new char[255]; 
int numSent; 
int numRead; 
struct sockaddr name; 
int sock; 

void* readThread(void* threadid) { 

    while(1) { 

     numRead = recv(sock, in, 255, 0); 

     if(numRead > 0) { 
      cout<<"\n"<<in; 
      for(int i=0;i<strlen(in);i++) 
       in[i] = '\0'; 
     } //end if 
     else if(numRead < 0) { 
      printf("\nError reading %m", errno); 
      exit(1); 
     } 
    } //end while 
} //END READTHREAD 

void* sendThread(void* threadid) { 

    while(1) { 

     cin.getline(out, 255); 
     numSent = send(sock, out, 255, 0); 

     if(numSent > 0) { 
      for(int i=0;i<strlen(out);i++) 
       out[i] = '\0'; 
     } //end if 
     else if(numSent < 0) { 
      printf("\nError sending %m", errno); 
      exit(1); 
     } 

    } //end while 
} //END SENDTHREAD 

void set_nonblock(int socket) { 
    int flags; 
    flags = fcntl(socket,F_GETFL,0); 
    assert(flags != -1); 
    fcntl(socket, F_SETFL, flags | O_NONBLOCK); 
} 


// get sockaddr, IPv4 or IPv6: 
void *get_in_addr(struct sockaddr *sa) { 
    if (sa->sa_family == AF_INET) 
     return &(((struct sockaddr_in*)sa)->sin_addr); 

    return &(((struct sockaddr_in6*)sa)->sin6_addr); 
} 

int main(int agrc, char** argv) { 
    int status, adrlen; 

    struct addrinfo hints; 
    struct addrinfo *servinfo; //will point to the results 

    memset(&hints, 0, sizeof hints); //make sure the struct is empty 
    hints.ai_family = AF_INET; 
    hints.ai_socktype = SOCK_STREAM; //tcp 
    hints.ai_flags = AI_PASSIVE;  //use local-host address 

    //get server info, put into servinfo 
    if ((status = getaddrinfo("127.0.0.1", PORT, &hints, &servinfo)) != 0) { 
     fprintf(stderr, "getaddrinfo error: %s\n", gai_strerror(status)); 
     exit(1); 
    } 

    //make socket 
    sock = socket(servinfo->ai_family, servinfo->ai_socktype, servinfo->ai_protocol); 
    if (sock < 0) { 
     printf("\nserver socket failure %m", errno); 
     exit(1); 
    } 

    if(connect(sock, servinfo->ai_addr, servinfo->ai_addrlen) < 0) { 
     printf("\nclient connection failure %m", errno); 
     exit(1); 
    } 

    cout<<"\nSuccessful connection!"; 

    //set nonblock 
    set_nonblock(sock); 

    pthread_attr_t attr; 
    pthread_attr_init(&attr); 
    pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_JOINABLE); 

    pthread_create(&readthread, &attr, readThread, (void*)0); 
    pthread_create(&sendthread, &attr, sendThread, (void*)1); 

    cout<<"\n\nExiting normally\n"; 
    return 0; 
} 

我在很长的帖子道歉,但我一直在这几天现在,我不知道如何继续。我尝试过使用select(),但对于只有1个客户端和一个服务器I/O来说,这看起来有点多。如果任何人都可以指出上面可能会出现什么问题或其他提示(或者我只是明确选择错误:)),我将不胜感激。

+0

如果您喜欢阅读源代码,请查看node.js项目。他们做得很好,一切都很好记录。你可以借用一些类似的概念,或者只是采取代码,这是麻省理工学院。 – tjameson

回答

1

我看着前两个代码服务器和客户端。不幸的是,你错误地设计了你的程序。服务器应该做以下(在多进程的情况下):

  1. 创建一个Socket
  2. 绑定到一个端口号
  3. 切换到被动模式(通过听())。
  4. 转至循环以接受传入连接。这里sock只负责监听新的连接,它不负责像你一样发送/接收。
    • 接受一个新的连接。接受()
    • 创建新的进程
    • 新进程负责做送/ recv的。父进程应返回到第4步以侦听新连接。

下面是一个伪代码

while (1) { 
    new_sd = accept(....); 
    if (new_sd < 0) continue; // or quit 
    if (fork() == 0) { 
     // do sendin and receiving USING new_sd 
     exit(0); // child terminates here 
    } 
} 

更新:因为你正在寻找无阻塞的情况下,我会建议你使用select()或poll()系统调用。

我觉得你要创建两个过程,一个过程是用于发送,另一个用于接收。如果是这样,那么您不需要将new_sd设置为非阻塞模式,因为这两个进程同时运行。如果你打算这样做,那么子进程将创建另一个进程,以便第一个孩子发送和第二个孩子接收。如下:

while (1) { 
    new_sd = accept(....); 
    if (new_sd < 0) continue; // or quit 
    if (fork() == 0) { 
     pid = fork(); 
     if (pid == 0) then do_sending(new_sd); 
     else do_receiving(new_sd); 
     exit(0); // child terminates here 
    } 
} 
+0

服务器代码现在看起来像这样。在客户端,我做了同样的事情减去接受呼叫(它只是(1)和叉为IO)。但是,当客户端连接时,当我尝试读取或发送errno为“资源暂时不可用”时,我只是遇到无限错误。你知道这意味着什么吗? – Sterling

+0

这绝对是一个更好的实现。正如我上面所说的,“资源暂时不可用”是由EAGAIN错误引起的,它表明套接字是非阻塞的并且没有数据。这似乎有点奇怪,但不是所有的错误都是平等的。如果您阅读send(2)和recv(2)的手册页,您会看到不同的错误代码意味着不同的事情,其中​​一些错误代码需要以除打印错误消息之外的方式处理。 – Sean

+0

每当我将其更改为阻止时,我都会通过对等错误重置连接,并冻结我的计算机。我试图搜索谷歌,但大多数人只是问如何创建该错误,而不是如何绕过它。 – Sterling

2

这已经有一段时间,因为我已经做任何POSIX编程,但下面的代码似乎有点奇怪:

fork(); 
pid = getpid(); 
if(pid == 0) { 

IIRC,你应该检查叉的返回值:0的子进程,父进程中的子进程的pid。

没有看过你的代码:)

+0

我试过了,没有帮助。我这样做只是因为我之前看到的一些线索是这样做的。 – Sterling

+1

那么,你的getpid()应该永远不会返回0,从fork()检查返回是正确的方式...错误是其他地方:)。我会建议最终寻找一个更高级别的异步框架,但它仍然是一个很好的练习来追踪当前的错误 - 抱歉,我没有时间仔细查看代码。 – snemarch

+0

+1,很好。 – 2011-07-16 00:54:53

4

的其余部分,我认为你的问题是在这里:

numSent = send(sock, out, strlen(out), 0); 

这里:

numRead = recv(sock, in, 255, 0); 

,你正试图sendrecv在您的监听套接字上。您需要使用new_sd这是可接受的套接字。请参阅accept(2)的手册页。

+0

肖恩,很好! – 2011-07-16 00:55:25

+0

谢谢,我将服务器端切换到了这一点。但是,现在我在尝试收回“资源暂时不可用”时收到错误消息。哪些资源不可用? – Sterling

+0

该错误(EAGAIN)表示该套接字是非阻塞的并且没有数据。如果你查看手册页,你会看到它在那里拼写。 – Sean