2013-07-15 54 views
4

我想用 C++ 编写一个小型服务器，它只需通过 InfiniBand 连接把收到的任何内容原样回送。我在 Ubuntu 下使用套接字直连协议（Sockets Direct Protocol，SDP）和 POSIX 套接字，但无法通过 InfiniBand 连接到服务器。

不幸的是，我在互联网上找到的唯一建议，是应该使用 AF_INET_SDP 域来创建套接字，如下所示：

/* Address-family constant for SDP sockets, defined by hand (presumably because
 * no system header supplies it -- confirm against the installed SDP stack). */
#define AF_INET_SDP 27 

/* Create a stream socket in the AF_INET_SDP family. */
int socketfd = socket(AF_INET_SDP, SOCK_STREAM, 0); 

我成功绑定了套接字，并调用了 listen(...) 函数，但服务器在尝试接受连接时挂起，而客户端在尝试连接时只会收到超时。

我也有一个用Java编写的旧的类似的应用程序(客户端和服务器),它通过infiniband进行通信,并且它可以正常工作。

有人能给我一个使用 InfiniBand 的应用程序示例，或者给我指出一些可能有帮助的文档吗？

谢谢。

回答

6

下面是一个演示如何使用 InfiniBand 的简单示例，希望对你有所帮助：

#include <errno.h> 
#include <signal.h> 
#include <stdio.h> 
#include <stdlib.h> 
#include <string.h> 
#include <time.h> 

#include <sys/types.h> 
#include <arpa/inet.h> 
#include <fcntl.h> 
#include <netinet/in.h> 
#include <pthread.h> 
#include <sys/ioctl.h> 
#include <sys/select.h> 
#include <sys/socket.h> 
#include <sys/time.h> 
#include <sys/times.h> 
#include <unistd.h> 

/* Address family passed to socket() and stored in sin_family throughout this
 * file; defined by hand (presumably the SDP family number of the installed
 * InfiniBand stack -- confirm). */
#define AF_INET_SDP 27 


/* Per-thread state of one client sender thread; RunClient allocates an array
 * of these and ClientThreadProc receives a pointer to its own entry. */
typedef struct 
{ 
    /* Attributes object initialised in RunClient and passed to pthread_create. */
    pthread_attr_t threadattr; 
    /* Thread id written by pthread_create; used by RunClient for pthread_join. */
    pthread_t threadid; 
    /* Index of this thread in the array; printed in error messages. */
    int seq; 
    /* Socket created and connected by ClientThreadProc. */
    int fd; 
    /* Running total of the values returned by send(), updated under g_mutex. */
    double sendbytes; 
    /* Duration of the send loop, computed as a difference of time_so_far() values. */
    double runtimes; 
}CLIENTTHREADPARA; 

/* Array of per-thread client state; allocated in RunClient with
 * g_totalthread entries. */
CLIENTTHREADPARA * g_pClientThreadPara; 


/* Signal handler installed by SetSignal; reports the signal and, for some
 * signals, closes g_listensock and sets g_bIsExit. */
void signal_func(int no); 
/* Installs the signal dispositions used by the program; returns 0. */
int SetSignal(); 

/* Listening socket of the server. Starts at -1 ("no socket") so that the
 * close(g_listensock) calls in signal_func cannot close descriptor 0 (stdin)
 * when the program runs as a client and never opens a listening socket. */
int g_listensock = -1; 
/* Stop flag polled by every loop in the program and set from signal_func.
 * volatile sig_atomic_t is the type the C standard guarantees can be written
 * safely from a signal handler. */
volatile sig_atomic_t g_bIsExit = 0; 

/* Size in bytes of each send()/recv() buffer; parsed from argv[1] in main. */
int g_packetsize; 
/* Number of client sender threads; parsed from argv[4] in main. */
int g_totalthread; 
/* Server address in dotted-decimal text form; points at argv[2] in client mode. */
char * g_szIp; 
/* Server port the client threads connect to; parsed from argv[3] in main. */
int g_iPort; 
/* Delay passed to usleep() after each send; parsed from argv[5] in main. */
int g_nDelayUsecs; 

/* Guards the sendbytes counters shared between the client threads and the
 * reporting loop in RunClient; initialised by RunClient. */
pthread_mutex_t g_mutex; 


/* Per-connection state handed from RunServ to a ThreadProc worker; allocated
 * in RunServ with malloc and freed by ThreadProc. */
typedef struct 
{ 
    /* Attributes object initialised in RunServ and passed to pthread_create. */
    pthread_attr_t threadattr; 
    /* Thread id written by pthread_create. */
    pthread_t threadid; 
    /* Connected client socket returned by accept(). */
    int fd; 
}THREADDATA; 

/* Returns the current time in seconds as a double (wall clock via
 * gettimeofday unless SysV is defined). */
static double time_so_far(); 
/* Server worker: receives from one accepted connection until it ends, then
 * prints the measured throughput. pPara is a THREADDATA pointer. */
void * ThreadProc(void * pPara); 
/* Server mode: binds, listens on iPort and starts one ThreadProc per accepted
 * connection. */
int RunServ(int iPort); 
/* Client mode: starts g_totalthread sender threads and prints throughput. */
int RunClient(char * Ip,int iPort); 
/* Client worker: connects to g_szIp:g_iPort and sends g_packetsize-byte
 * buffers in a loop. pPara is a CLIENTTHREADPARA pointer. */
void * ClientThreadProc(void * pPara); 

int main(int argc,char * argv[]) 
{ 
    int iPort; 
    int thread_nums; 

    SetSignal(); 

    if(argc != 3 && argc!=6) 
    { 
     goto err_out; 
    } 

    g_packetsize=atoi(argv[1]); 

    if(argc==3) 
    { 
     iPort=atoi(argv[2]); 
     if(iPort==0) 
     { 
      printf("socket port can not zero\n"); 
      return -1; 
     } 
     RunServ(iPort); 
    } 
    else if(argc==6) 
    { 
     g_szIp=argv[2]; 
     g_iPort=atoi(argv[3]); 
     g_totalthread=atoi(argv[4]); 
     g_nDelayUsecs=atoi(argv[5]); 
     RunClient(argv[2],iPort); 
    } 
    return 0; 

err_out: 
    printf("For Server: sdptest <packet size> <port>\n"); 
    printf("For Client: sdptest <packet size> <ip> <port> <thread_nums> <delay microsecond>\n"); 
    return -1; 
} 

int RunClient(char * Ip,int iPort) 
{ 
    int i; 
    pthread_mutex_init(&g_mutex, (pthread_mutexattr_t *)0); 

    g_pClientThreadPara=(CLIENTTHREADPARA *)malloc(sizeof(CLIENTTHREADPARA)*g_totalthread); 
    for(i=0;i<g_totalthread;i++) 
    { 
     g_pClientThreadPara[i].seq=i; 
     pthread_attr_init(&g_pClientThreadPara[i].threadattr); 
     pthread_create(&g_pClientThreadPara[i].threadid, &g_pClientThreadPara[i].threadattr,ClientThreadProc,(void *)&g_pClientThreadPara[i]); 
    } 

    double sendbytes = 0; 
    double pre_sendbytes=0; 
    double pre_time = time_so_far(); 
    double current_time = time_so_far(); 

    while(!g_bIsExit) 
    { 
     sleep(1); 
     pthread_mutex_lock(&g_mutex); 
     sendbytes = g_pClientThreadPara[0].sendbytes; 
     pthread_mutex_unlock(&g_mutex); 
     current_time = time_so_far();   
     printf("speed is %10.2fMbytes/s,\n",(sendbytes - pre_sendbytes)/1024/1024/(current_time - pre_time)); 
     pre_sendbytes = sendbytes; 
     pre_time = current_time; 
    } 

    for(i=0;i<g_totalthread;i++) 
    { 
     pthread_join(g_pClientThreadPara[i].threadid, NULL); 
    } 

    double totalspeed=0; 
    double totaltimes=0; 

    for(i=0;i<g_totalthread;i++) 
    { 

     totalspeed+=(double)g_pClientThreadPara[i].sendbytes/g_pClientThreadPara[i].runtimes; 
    } 
    printf("speed is %10.2fMbytes/s,total times is %.2f seconds\n",totalspeed/1024/1024,totaltimes/g_totalthread); 
} 

/*
 * Client worker thread.
 *
 * Connects an AF_INET_SDP stream socket to g_szIp:g_iPort and sends
 * g_packetsize-byte buffers until g_bIsExit is set or send() fails, sleeping
 * g_nDelayUsecs microseconds between sends when that value is positive.
 * The byte count is accumulated in pThreadPara->sendbytes under g_mutex and
 * the loop duration is stored in pThreadPara->runtimes.
 *
 * pPara: pointer to this thread's CLIENTTHREADPARA entry.
 * Returns (void *)0 after the send loop ran, (void *)-1 if setup failed.
 */
void * ClientThreadProc(void * pPara) 
{
    CLIENTTHREADPARA * pThreadPara = (CLIENTTHREADPARA *)pPara;
    struct sockaddr_in sin;
    double starttime;
    char * buf;
    int saved_errno;

    /* Defined values for the summary even if this thread bails out early. */
    pthread_mutex_lock(&g_mutex);
    pThreadPara->sendbytes = 0;
    pthread_mutex_unlock(&g_mutex);
    pThreadPara->runtimes = 0;

    memset(&sin, 0, sizeof(sin));
    sin.sin_family = AF_INET_SDP;
    sin.sin_port = htons((unsigned short)g_iPort);

    if ((sin.sin_addr.s_addr = inet_addr(g_szIp)) == INADDR_NONE)
    {
        printf("Ip address %s is invalid!\n",g_szIp);
        return (void *)-1;
    }

    pThreadPara->fd = socket(AF_INET_SDP, SOCK_STREAM, 0);
    if (pThreadPara->fd < 0)
    {
        perror("Create socket error");
        return (void *)-1;
    }

    if (connect(pThreadPara->fd, (struct sockaddr *)&sin, sizeof(sin)) < 0)
    {
        /* printf may overwrite errno, so keep the connect() error for perror. */
        saved_errno = errno;
        printf("Can't connect to %s:%d\n",g_szIp,g_iPort);
        errno = saved_errno;
        perror("sock error:");
        close(pThreadPara->fd);
        return (void *)-1;
    }

    buf = malloc((size_t)g_packetsize);
    if (buf == NULL)
    {
        printf("Canot not allocate memory!\n");
        close(pThreadPara->fd);
        return (void *)-1;
    }
    /* Do not put uninitialized heap contents on the wire. */
    memset(buf, 0, (size_t)g_packetsize);

    starttime = time_so_far();
    while (!g_bIsExit)
    {
        ssize_t sent = send(pThreadPara->fd, buf, (size_t)g_packetsize, 0);

        if (sent > 0)
        {
            pthread_mutex_lock(&g_mutex);
            pThreadPara->sendbytes += sent;
            pthread_mutex_unlock(&g_mutex);
        }
        else if (sent < 0 && errno == EINTR)
        {
            /* Interrupted by a signal: re-check g_bIsExit and retry. */
            continue;
        }
        else
        {
            saved_errno = errno;
            printf("thread %d ",pThreadPara->seq);
            if (sent < 0)
            {
                errno = saved_errno;
                perror("sock error:");
            }
            else
            {
                /* errno is not meaningful when send() returns 0. */
                printf("sock error: send returned 0\n");
            }
            break;
        }

        if (g_nDelayUsecs > 0)
        {
            usleep(g_nDelayUsecs);
        }
    }
    pThreadPara->runtimes = time_so_far() - starttime;

    close(pThreadPara->fd);
    free(buf);
    return (void *)0;
}

/*
 * Server mode: create an AF_INET_SDP stream socket, bind it to iPort on any
 * local address, listen, and start one detached ThreadProc worker per
 * accepted connection until g_bIsExit is set or accept() fails.
 *
 * Ownership: each THREADDATA is allocated here and handed to the worker,
 * which frees it; this function does not touch it after pthread_create
 * succeeds.
 *
 * Returns 0 after the accept loop ends, -1 if the socket could not be
 * created, bound or put into listening state.
 */
int RunServ(int iPort) 
{
    struct sockaddr_in my_addr;
    struct linger li;
    pthread_attr_t worker_attr;
    int option = 1;

    //0.init
    memset(&my_addr, 0, sizeof(my_addr));
    my_addr.sin_family = AF_INET_SDP;
    my_addr.sin_port = htons((unsigned short)iPort);
    my_addr.sin_addr.s_addr = htonl(INADDR_ANY);

    //1.create
    g_listensock = socket(AF_INET_SDP, SOCK_STREAM, 0);
    if (g_listensock == -1)
    {
        perror("Create socket error");
        return -1;
    }

    li.l_onoff = 1;
    li.l_linger = 0;
    setsockopt(g_listensock, SOL_SOCKET, SO_REUSEADDR, (char *)&option, sizeof(option));
    setsockopt(g_listensock, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));

    //2.bind
    if (bind(g_listensock, (struct sockaddr *)&my_addr, sizeof(my_addr)) == -1)
    {
        perror("bind");
        printf("bind %d port error\n",iPort);
        goto fail;
    }

    //3.listen
    if (listen(g_listensock, 10) == -1)
    {
        perror("listen");
        printf("listen in port %d error!\n",iPort);
        goto fail;
    }

    /* Workers are never joined, so start them detached to avoid leaking
     * their thread resources. */
    pthread_attr_init(&worker_attr);
    pthread_attr_setdetachstate(&worker_attr, PTHREAD_CREATE_DETACHED);

    /* Linger setting applied to every accepted connection. */
    li.l_onoff = 1;
    li.l_linger = 3;

    while (!g_bIsExit)
    {
        struct sockaddr_in from;
        socklen_t fromlen = sizeof(from);   /* accept() requires socklen_t * */
        pthread_t tid;
        THREADDATA * pThreadData;
        int fromclientfd;

        fromclientfd = accept(g_listensock, (struct sockaddr *)&from, &fromlen);
        if (fromclientfd == -1)
        {
            if (errno == EINTR || errno == ECONNABORTED)
            {
                /* Transient: re-check g_bIsExit and try again. */
                continue;
            }
            break;
        }

        setsockopt(fromclientfd, SOL_SOCKET, SO_LINGER, (char *)&li, sizeof(li));

        pThreadData = calloc(1, sizeof(*pThreadData));
        if (pThreadData == NULL)
        {
            printf("Canot not allocate memory!\n");
            close(fromclientfd);
            continue;
        }
        pThreadData->fd = fromclientfd;

        /* The thread id goes into a local: the worker may free pThreadData
         * before pthread_create returns. */
        if (pthread_create(&tid, &worker_attr, ThreadProc, (void *)pThreadData) != 0)
        {
            printf("Can't create thread for a new connection\n");
            close(fromclientfd);
            free(pThreadData);
        }
    }

    pthread_attr_destroy(&worker_attr);
    close(g_listensock);
    g_listensock = -1;
    printf("server is stoped.\n");
    return 0;

fail:
    close(g_listensock);
    g_listensock = -1;
    return -1;
}

/*
 * Server worker thread: drain one accepted connection and report throughput.
 *
 * Receives g_packetsize-byte chunks from pThreadPara->fd until the peer
 * closes the connection, recv() fails, or g_bIsExit is set; then prints the
 * average receive rate.
 *
 * pPara: THREADDATA pointer allocated by the caller. This function takes
 *        ownership: it closes the socket and frees the structure on every
 *        path.
 * Always returns 0.
 */
void * ThreadProc(void * pPara) 
{
    THREADDATA * pThreadPara = (THREADDATA *)pPara;
    char * buf;
    double starttime;
    double elapsed;
    double totalbytes = 0;

    buf = malloc((size_t)g_packetsize);
    if (buf == NULL)
    {
        close(pThreadPara->fd);
        free(pThreadPara);
        printf("Canot not allocate memory!\n");
        return 0;
    }

    starttime = time_so_far();
    while (!g_bIsExit)
    {
        ssize_t got = recv(pThreadPara->fd, buf, (size_t)g_packetsize, 0);

        if (got > 0)
        {
            totalbytes += got;
            continue;
        }
        if (got < 0 && errno == EINTR)
        {
            /* Interrupted by a signal, not end of stream: retry. */
            continue;
        }
        break;
    }
    elapsed = time_so_far() - starttime;

    /* The original never closed the connection on this path. */
    close(pThreadPara->fd);
    free(buf);
    free(pThreadPara);

    printf("speed is %10.2fM/s\n", elapsed > 0 ? totalbytes/1024/1024/elapsed : 0.0);
    return 0;
}

/*
 * Current time in seconds, as a double.
 *
 * With SysV defined the value comes from times() divided by the clock tick
 * rate; otherwise it is the wall-clock time from gettimeofday() with
 * microsecond resolution. Callers only use differences of two results.
 *
 * Returns 0.0 (after printing a message) if the underlying call fails, rather
 * than computing a result from an uninitialized structure.
 */
static double time_so_far(void)
{
#if defined(SysV)
    struct tms tms;
    clock_t ticks = times(&tms);   /* clock_t: do not truncate to int */

    if (ticks == (clock_t)-1)
    {
        printf("Call times() error\n");
        return 0.0;
    }
    return ((double) ticks)/((double) sysconf(_SC_CLK_TCK));

#else

    struct timeval tp;

    if (gettimeofday(&tp, NULL) == -1)
    {
        printf("Call gettimeofday error\n");
        return 0.0;
    }
    return ((double) tp.tv_sec) + (((double) tp.tv_usec)/1000000.0);
#endif
}


/*
 * Install the signal dispositions used by the program.
 *
 * signal_func handles SIGHUP, SIGQUIT, SIGURG, SIGILL, SIGTERM and SIGINT;
 * SIGBUS keeps its default action; SIGPIPE, SIGABRT and SIGTRAP are ignored.
 * SIGSEGV and SIGCHLD are not touched. Always returns 0.
 */
int SetSignal() 
{
    /* Same signals, in the same order, as the individual signal() calls. */
    const struct
    {
        int signo;
        void (*action)(int);
    } dispositions[] = {
        { SIGHUP,  signal_func },
        { SIGQUIT, signal_func },
        { SIGBUS,  SIG_DFL },
        { SIGURG,  signal_func },
        { SIGPIPE, SIG_IGN },
        { SIGABRT, SIG_IGN },
        { SIGTRAP, SIG_IGN },
        { SIGILL,  signal_func },
        { SIGTERM, signal_func },
        { SIGINT,  signal_func },   /* Ctrl+C */
    };
    unsigned int idx;

    for (idx = 0; idx < sizeof(dispositions) / sizeof(dispositions[0]); idx++)
    {
        signal(dispositions[idx].signo, dispositions[idx].action);
    }

    return 0;
}


/*
 * Signal handler shared by all signals registered in SetSignal.
 *
 * Reports the received signal on standard output; SIGINT, SIGTERM and SIGSEGV
 * additionally close g_listensock and set g_bIsExit so the worker and accept
 * loops stop (SIGINT does so silently). SIGABRT closes g_listensock only.
 *
 * Only async-signal-safe calls (write, close) are used: printf must not be
 * called from a handler because it may interrupt another stdio call. errno is
 * saved and restored so interrupted code that inspects errno (for example an
 * EINTR check after send) is not confused by this handler.
 *
 * no: number of the delivered signal.
 */
void signal_func(int no) 
{
    const int saved_errno = errno;

/* Emit a string literal with write(); sizeof gives the length at compile time. */
#define SIG_SAY(text) ((void)!write(STDOUT_FILENO, (text), sizeof(text) - 1))

    switch (no)
    {
    case SIGHUP:
        SIG_SAY("Receive signal SIGHUP.\n");
        break;

    case SIGINT:
        close(g_listensock);
        g_bIsExit = 1;
        break;

    case SIGTERM:
        close(g_listensock);
        SIG_SAY("Receive kill signal,server is stoping...\n");
        g_bIsExit = 1;
        break;

    case SIGQUIT:
        SIG_SAY("Receive SIGQUIT signal.\n");
        break;

    case SIGABRT:
        close(g_listensock);
        SIG_SAY("Receive SIGABRT signal.\n");
        break;

    case SIGILL:
        SIG_SAY("Receive SIGILL signal.\n");
        break;

    case SIGSEGV:
        close(g_listensock);
        SIG_SAY("Receive SIGSEGV signal.\n");
        g_bIsExit = 1;
        break;

    case SIGPIPE:
        SIG_SAY("Receive SIGPIPE signal.\n");
        break;

    default:
    {
        /* Format the signal number by hand; the output text matches the
         * original "Receive %d sigial!\n" message. */
        char digits[16];
        size_t pos = sizeof(digits);
        unsigned int value = (no < 0) ? 0u - (unsigned int)no : (unsigned int)no;

        do
        {
            digits[--pos] = (char)('0' + (value % 10u));
            value /= 10u;
        } while (value != 0u);
        if (no < 0)
        {
            digits[--pos] = '-';
        }

        SIG_SAY("Receive ");
        (void)!write(STDOUT_FILENO, digits + pos, sizeof(digits) - pos);
        SIG_SAY(" sigial!\n");
        break;
    }
    }

#undef SIG_SAY

    errno = saved_errno;
}
+0

谢谢。我会研究它。 –

相关问题