2011-10-28 31 views
0

我正在开发一个学校项目,我们必须制作一个多线程的Web服务器。我有一个问题,当我在我的信号上调用sem_wait(它应该初始化为0,但已经似乎是sem_post()编辑为1)。我得到一个SIGABRT。POSIX sem_wait()SIGABRT

我在下面附上我的代码,并对导致我的问题的行发表评论。我花了几个小时与调试器运气不大。

#include <iostream> 
#include <sys/types.h> 
#include <sys/socket.h> 
#include <sys/stat.h> 
#include <netinet/in.h> 
#include <netdb.h> 
#include <string> 
#include <string.h> 
#include <iostream> 
#include <fcntl.h> 
#include <errno.h> 
#include <pthread.h> 
#include <vector> 
#include <semaphore.h> 
#include <stdio.h> 
#include <cstdlib> 
#include <strings.h> 

#define PORTNUM 7000 
#define NUM_OF_THREADS 5 
#define oops(msg) { perror(msg); exit(1);} 
#define FCFS 0 
#define SJF 1; 

void bindAndListen(); 
void acceptConnection(int socket_file_descriptor); 
void* dispatchJobs(void*); 
void* replyToClient(void* pos); 

//holds ids of worker threads 
pthread_t threads[NUM_OF_THREADS]; 

//mutex variable for sleep_signal_cond 
pthread_mutex_t sleep_signal_mutex[NUM_OF_THREADS]; 
//holds the condition variables to signal when the thread should be unblocked 
pthread_cond_t sleep_signal_cond[NUM_OF_THREADS]; 

//mutex for accessing sleeping_thread_list 
pthread_mutex_t sleeping_threads_mutex = PTHREAD_MUTEX_INITIALIZER; 
//list of which threads are sleeping so they can be signaled and given a job 
std::vector<bool> *sleeping_threads_list = new std::vector<bool>(); 

//number of threads ready for jobs 
sem_t available_threads; 
sem_t waiting_jobs; 


//holds requests waiting to be given to one of the threads for execution 
//request implemented as int[3] with int[0]== socket_descriptor int[1]== file_size int[2]== file_descriptor of requested file 
//if file_size == 0 then HEAD request 
std::vector<std::vector<int> >* jobs = new std::vector<std::vector<int> >(); 

pthread_mutex_t jobs_mutex = PTHREAD_MUTEX_INITIALIZER; 


int main (int argc, char * const argv[]) { 
    //holds id for thread responsible for removing jobs from ready queue and assigning them to worker thread 
    pthread_t dispatcher_thread; 

    //initializes semaphores 
    if(sem_init(&available_threads, 0, NUM_OF_THREADS) != 0){ 
     oops("Error Initializing Semaphore"); 
    } 

    if(sem_init(&waiting_jobs, 0, 0) !=0){ 
     oops("Error Initializing Semaphore"); 
    } 

    //initializes condition variables and guarding mutexes 
    for(int i=0; i<NUM_OF_THREADS; i++){ 
     pthread_cond_init(&sleep_signal_cond[i], NULL); 
     pthread_mutex_init(&sleep_signal_mutex[i], NULL); 
    } 

    if(pthread_create(&dispatcher_thread, NULL, dispatchJobs, (void*)NULL) !=0){ 
     oops("Error Creating Distributer Thread"); 
    } 

    for (int i=0; i<NUM_OF_THREADS; i++) { 
     pthread_mutex_lock(&sleeping_threads_mutex); 
     printf("before"); 
     sleeping_threads_list->push_back(true); 
     printf("after"); 
     pthread_mutex_unlock(&sleeping_threads_mutex); 
    } 

    printf("here"); 
    for (int i=0; i<NUM_OF_THREADS; i++) { 
     //creates threads and stores ID in threads 
     if(pthread_create(&threads[i], NULL, replyToClient, (void*)i) !=0){ 
      oops("Error Creating Thread"); 
     } 
    } 

    /* 
    if(sem_init(&available_threads, 0, NUM_OF_THREADS) !=0){ 
     oops("Error Initializing Semaphore"); 
    } 

    if(sem_init(&waiting_jobs, 0, 0) !=0){     //this is the semaphore thats used in the sem_wait 
     oops("Error Initializing Semaphore"); 
    }*/ 

    bindAndListen(); 
} 


//binds to socket and listens for connections 
//being done by main thead 
void bindAndListen(){ 
    struct sockaddr_in saddr; 
    struct hostent *hp; 
    char hostname[256]; 
    int sock_id, sock_fd; 

    gethostname(hostname, 256); 
    hp = gethostbyname(hostname); 
    bzero(&saddr, sizeof(saddr)); 

    //errno = 0; 

    bcopy(hp->h_addr, &saddr.sin_addr, hp->h_length); 

    saddr.sin_family = AF_INET; 
    saddr.sin_port = htons(PORTNUM); 
    saddr.sin_addr.s_addr = INADDR_ANY; 

    sock_id = socket(AF_INET, SOCK_STREAM, 0); 

    if(sock_id == -1){ 
     oops("socket"); 
     printf("socket"); 
    } 

    if(bind(sock_id, (const sockaddr*)&saddr, sizeof(saddr)) ==0){ 

     if(listen(sock_id, 5) ==-1){ 
      oops("listen"); 
     } 

     //each time a new connection is accepted, get file info and push to ready queue 
     while(1){ 
      int addrlen = sizeof(saddr); 
      sock_fd = accept(sock_id, (sockaddr*)&saddr, (socklen_t*)&addrlen); 
      if (sock_fd > 0) { 
       acceptConnection(sock_fd); 
      }else { 
       oops("Error Accepting Connection"); 
      } 
     } 
    }else{ 
     oops("there was an error binding to socket"); 
    } 
}// end of bindAndListen() 


//accepts connection and gets file info of requested file 
//being done by main thread 
void acceptConnection(int sock_fd){ 
    printf("**Server: A new client connected!"); 

    //only using loop so on error we can break out on error 
    while(true){ 
     //used to hold input from client 
     char* inputBuff = new char[BUFSIZ]; 
     int slen = read(sock_fd, inputBuff, BUFSIZ); 

     //will sit on space between HEAD/GET and path 
     int pos1 = 0; 
     //will sit on space between path and HTTP version 
     int pos2 = 0; 

     //need duplicate ptr so we can manipulate one in the loop 
     char* buffPtr = inputBuff; 
     //parses client input breaks up query by spaces 
     for(int i=0; i<slen; i++){ 
      if(*buffPtr == ' '){ 
       if (pos1 == 0) { 
        pos1 = i; 
       }else { 
        pos2 = i; 
        break; 
       } 
      } 
      buffPtr++; 
     } 

     if((pos1 - pos2) >=0){ 
      std::string str = "Invalid Query"; 
      write(sock_fd, str.c_str(), strlen(str.c_str())); 
      break; 
     } 

     printf("slen length %d\n", slen); 

     std::string* method = new std::string(inputBuff, pos1); 

     printf("method length %lu\n",method->length()); 

     //increment the ptr for buff to the starting pos of the path 
     inputBuff += ++pos1; 

     printf("pos2 - pos1 %d\n", (pos2 - pos1)); 

     printf("pos1 = %d pos2 = %d\n", pos1, pos2); 

     std::string* path = new std::string(inputBuff, (pos2 - pos1)); 

     printf("path length %lu\n", path->length()); 

     printf("part1 %s\n", method->c_str()); 

     printf("part2 %s\n", path->c_str()); 

     //opens file requested by client 
     int fd = open(path->c_str(), O_RDONLY); 
     if(fd < 0){ 
      std::string* error = new std::string("Error Opening File"); 
      *error += *path + std::string(strerror(errno), strlen(strerror(errno))); 
      write(sock_fd, error->c_str(), strlen(error->c_str())); 
      break; 
     } 

     int file_size; 
     if(method->compare("GET") == 0){ 
      //gets file info and puts the resulting struct in file_info 
      struct stat file_info; 
      if(fstat(fd, &file_info) !=0){ 
       oops("Error getting file info"); 
      } 
      file_size = file_info.st_size; 
     }else if(method->compare("HEAD")){ 
      file_size = 0; 
     }else{ 
      write(sock_fd, "Invalid Query", strlen("Invalid Query")); 
      break; 
     } 

     //job to be pushed to ready queue 
     std::vector<int> job; 
     job.push_back(sock_fd); 
     job.push_back(file_size); 
     job.push_back(fd); 

     //check mutex guarding the ready queue 
     pthread_mutex_lock(&jobs_mutex); 
     //push job to back of ready queue 
     jobs->push_back(job); 
     //unlock mutex guarding the ready queue 
     pthread_mutex_unlock(&jobs_mutex); 

     //increment number of jobs in ready queue 
     sem_post(&waiting_jobs); 

    } //end of while(true) 
     // we only end up here if there was an error 
    fflush(stdout); 
    close(sock_fd); 
}// end of acceptConnection() 


//routine run by dispather thread 
void *dispatchJobs(void*){ 
    while(true){ 
     //wait for a thread to be available to execute a job 
     sem_wait(&available_threads); 
     //wait for a job to be waiting in the ready queue 
     sem_wait(&waiting_jobs);     //this is the line thats crashing 
     //aquire lock to check which threads are waiting 
     pthread_mutex_lock(&sleeping_threads_mutex); 
     //go through list of threads to see which is waiting 
     for(int i=0; i<sleeping_threads_list->size(); i++){ 
      if(sleeping_threads_list->at(i)){ 
       //unlocks lock for access to list of waiting threads 
       pthread_mutex_unlock(&sleeping_threads_mutex); 
       //allows us access to the list of condition variables to signal the thread to resume execution 
       pthread_mutex_lock(&sleep_signal_mutex[i]); 
       pthread_cond_signal(&sleep_signal_cond[i]); 
       pthread_mutex_unlock(&sleep_signal_mutex[i]); 
      } 
     } 

    }//end of while(true) 
}//end of dispatchJobs() 


//sends file or metadata to client 
//run by worker thread 
//pos is position of condition variable that it waits to be signaled in the sleep_signal_cond[] array 
void* replyToClient(void* pos){ 
    int position = (long)pos; 
    while(true){ 
     //waits for dispather thread to signal it 
     pthread_mutex_lock(&sleep_signal_mutex[position]); 
     pthread_cond_wait(&sleep_signal_cond[position], &sleep_signal_mutex[position]); 
     pthread_mutex_unlock(&sleep_signal_mutex[position]); 


     //lock mutex to get job to be executed 
     pthread_mutex_lock(&jobs_mutex); 
     std::vector<int> job = jobs->front(); 
     //removes job from front of vector 
     jobs->erase(jobs->begin()); 
     //releases mutex 
     pthread_mutex_unlock(&jobs_mutex); 

     //socket file descriptor, used for writing to socket 
     int sock_fd =job[0]; 
     int file_size = job[1]; 
     //file descriptor for requested job 
     int fd = job[2]; 

     //holds output to be written to socket 
     char* outputBuffer = new char[BUFSIZ]; 

     //GET request, send file 
     if(file_size !=0){ 
      int readResult = 0; 
      while ((readResult = read(fd, outputBuffer, BUFSIZ)) > 0) { 
       if(write(sock_fd, outputBuffer, readResult) != readResult){ 
        printf("We may have a write error"); 
       } 
      } 
      if(readResult < 0){ 
       oops("Error Reading File"); 
      } 
      if(readResult == 0){ 
       printf("finished sending file"); 
      } 
     }else{ // HEAD request 

     } 
     //increment number of available threads 
     sem_post(&available_threads); 
    } 
}// end of replyToClient() 
+4

是否有可能缩短了你的榜样? (http://sscce.org/) –

回答

1

我还没有使用POSIX信号量,但我相信这就是发生了什么。我只熟悉Linux内核信号量,并且不提你的系统。 init函数的第三个参数可能会设置count变量。您将其设置为0(=忙,但没有其他进程在等待)。 wait函数可能会调用down(),该函数的开始是将count变量减1:为-1,这意味着您要使用的信号量现在被锁定。我相信你的程序中没有任何东西可以解锁它(从浏览你的代码 - 这很长),所以你有麻烦了。尝试在init中将其设置为1。这可能是所有需要的。

+0

好吧,你解开它,但在代码现在没有达到的领域。 – gnometorule

2

再次检查代码的整个逻辑 - 这是可能到达这里:

pthread_mutex_lock(&jobs_mutex); 
std::vector<int> job = jobs->front(); 
//removes job from front of vector 
jobs->erase(jobs->begin()); 
//releases mutex 
pthread_mutex_unlock(&jobs_mutex); 

jobs->size() == 0,在这种情况下front()erase()调用未定义的行为,这很可能导致你观察到的影响。

程序检查是否还有以下变化后崩溃:

//lock mutex to get job to be executed 
pthread_mutex_lock(&jobs_mutex); 
if (jobs->size() == 0) 
    { 
    pthread_mutex_unlock (&jobs_mutex); 
    continue; 
    } 
std::vector<int> job = jobs->front(); 
//removes job from front of vector 
jobs->erase(jobs->begin()); 
//releases mutex 
pthread_mutex_unlock(&jobs_mutex);