2015-09-05 38 views
1

我必须使用fork(2)来制作与用户输入的孩子数量一样多的孩子。使用多个进程读取文件并通过管道发送号码()

然后,我需要他们分解工作读取坐标点的txt文件比较他们之间的距离输入的距离。

然后他们添加他们在给定距离内的点数。每个孩子必须将他们的计数写入管道,父母必须读取每个计数并将其添加到总计中,然后将其打印出来。这里是我的代码:

int main(int argc, char *argv[]) { 
    int distance = atoi(argv[1]); 
    if (argc != 3 || sscanf(argv[ 1 ], "%d", &distance) != 1) 
     fail("usage: pairs <distance>"); 
    readPoints(); 
    int workers = atoi(argv[2]); 

    // Compute the square of the distance bound, since that's what we'll 
    // need to compare against. 
    int dsq = distance * distance; 
    // Count up the number of nearby pairs of points. 
    int total = 0; 

    int fd[2]; // pipe 
    if (pipe(fd) != 0){ 
     fail("Can't create pipe"); 
    } 
    int pid; // child 
    int chNum; // child's number 
    int c; 
    for(chNum = 0; chNum < workers; chNum++){ 
     c = 0; 
     pid = fork(); 
     if (pid == -1){ //failure 
      fail("Can't create child process"); 
     } 
     if(pid ==0){ // it's a child 
      for (int i =chNum; i < ptCount; i+=workers) 
       for (int j = i + 1; j < ptCount; j++) { 
        // Check the squared distance. 
        int dx = ptList[ i ].x - ptList[ j ].x; 
        int dy = ptList[ i ].y - ptList[ j ].y; 
        if (dx * dx + dy * dy <= dsq) 
         c++; 
       } 
      close(fd[READ]); 
      lockf(fd[WRITE], F_LOCK,0); 
      write(fd[WRITE], &c, sizeof(c)); 
      lockf(fd[WRITE], F_ULOCK,0); 
      close(fd[WRITE]); 
      exit(0); 
     } 
     else if(pid>0){ // this is parent 
      int d; 
      close(fd[WRITE]); 
      read(fd[READ], &d, sizeof(d)); 
      close(fd[READ]); 
      total = total + d; 
     } 
    } 
    if(pid>0){ 
     wait(NULL); 
     printf("Total: %d\n", total); 
    } 
    return 0; 
} 

我用一个for循环与fork(2)使孩子们,然后我让他们计算的计数,并将其发送给管由家长阅读。父母读入d并将其添加到total。我想知道是否正确使用管道将每个孩子的计数发送给父母,并且/或者如果我正确分娩,因此它只来自一位家长。当我使用一个以上的孩子时,我得到的总数错误。

如果我使用1个孩子,总的结果是166428,这是正确的,但是当我使用4例如,它给了我164908。有人可以帮助我吗?

+0

请在这里张贴当 – dietbacon

回答

2

你没有正确地进行管道处理。

首先,您不需要锁定/解锁以写入和读取管道:小于PIPE_BUF字节的写入保证为原子。 POSIX.1-2001要求PIPE_BUF至少512字节;由于您一次只写入sizeof(int)字节,因此您很安全(除非sizeof(int)大于或等于512,这是无稽之谈)。见man limits.h,下路径名变量值

{PIPE_BUF}

写入管时被保证是原子 的最大字节数。最小可接受值:{_POSIX_PIPE_BUF}

它本身简化了代码并减少了不必要的锁定/解锁开销。

但真正的问题是在这里:

else if (pid > 0) { // this is parent 
    int d; 
    close(fd[WRITE]); 
    read(fd[READ], &d, sizeof(d)); 
    close(fd[READ]); 
    total = total + d; 
} 

不能关闭fd[WRITE]内循环:考虑下一次迭代会发生什么,当你用叉子叉下一道工序。下一个循环中的子进程将尝试写入已关闭的文件描述符,因此发生错误(并且write(2)失败,EBADF失败,但不会检查返回值write(2),以便代码愉快地忽略该错误)。另外,您尝试一次又一次关闭fd[WRITE],因此close(2)也会返回一个错误(您再次忽略)。

同样对于read(2):如果关闭fd[READ],则不能在下一次迭代中读取管道外的结果; read(2)也会返回一个错误,并且也会返回close(2)

(所以上课是:不要忽略错误。如果你的错误处理是正确的,那么你会发现错误的线索)

你不需要关闭。子进程向管道写入完整的workers整数;父进程读取管道正好workers整数,所以这是不够的:

for (chNum = 0; chNum < workers; chNum++) { 

    c = 0; 
    pid = fork(); 

    if (pid == -1) 
     fail("Can't create child process"); 

    if (pid == 0) { // it's a child 

     for (int i = chNum; i < ptCount; i += workers) { 
      for (int j = i + 1; j < ptCount; j++) { 
       // Check the squared distance. 
       int dx = ptList[i].x - ptList[j].x; 
       int dy = ptList[i].y - ptList[j].y; 
       if (dx*dx + dy*dy <= dsq) { 
        c++; 
       } 
      } 
     } 

     ssize_t written = write(fd[WRITE], &c, sizeof(c)); 
     if (written == -1) 
      perror("write error"); 
     if (written != sizeof(c)) 
      fail("Write failed on pipe"); 

     exit(0); 
    } 
    else { 
     int d; 
     if (read(fd[READ], &d, sizeof(d)) != sizeof(d)) 
      fail("Read error on pipe"); 
     total += d; 
    } 

} 

关键的一点是要明白,你需要保持fd[READ]fd[WRITE]只要你打算到餐桌,将使用新的进程打开管道。

现在,这可以解决问题,但是会产生虚假的并行性:如果没有数据可用,管道中的读取将默认阻塞。这意味着在每次迭代中,父对象在相应的子写入管道之前都不会进展。所以你并没有真正将任何东西并行化;效果与具有父叉的效果相同,等待子终止,读取结果并将其添加到总数,然后分叉下一个子节点(并重复循环)。

如果您想要真正的并行性,您必须分叉每个进程,然后才能从管道读取数据。例如:

for (chNum = 0; chNum < workers; chNum++) { 

    c = 0; 
    pid = fork(); 

    if (pid == -1) 
     fail("Can't create child process"); 

    if (pid == 0) { // it's a child 

     for (int i = chNum; i < ptCount; i += workers) { 
      for (int j = i + 1; j < ptCount; j++) { 
       // Check the squared distance. 
       int dx = ptList[i].x - ptList[j].x; 
       int dy = ptList[i].y - ptList[j].y; 
       if (dx*dx + dy*dy <= dsq) { 
        c++; 
       } 
      } 
     } 

     ssize_t written = write(fd[WRITE], &c, sizeof(c)); 
     if (written == -1) 
      perror("write error"); 
     if (written != sizeof(c)) 
      fail("Write failed on pipe"); 

     exit(0); 
    } 
} 



if (close(fd[WRITE]) < 0) 
    fail("Error closing pipe's write channel"); 

int d; 
ssize_t r; 
while ((r = read(fd[READ], &d, sizeof(d))) > 0) { 
    if (r != sizeof(d)) 
     fail("read error"); 
    total += d; 
} 

请注意,在开始读取之前,我们必须显式关闭管道的写通道;这是为了避免当没有更多的子进程正在主动写入管道时让父进程挂起。请记住,只要管道的写通道至少有一个进程打开,就会阻塞读取。如果父进程保持写通道打开,read(2)将永远不会返回,因为有可能父本身可以写入管道(即使我们知道它不会)。所以我们必须关闭fd[WRITE]

另外,因为我们知道,恰好有workers数从管道读,我们可能只是这样做的循环之后,而不是关闭写入通道的:

int d; 
int i; 
for (i = 0; i < workers; i++) { 
    if (read(fd[READ], &d, sizeof(d)) != sizeof(d)) 
     fail("Failed to read from pipe"); 
    total += d; 
} 

一对夫妇的其他(无关)备注:

  • 给出错误参数时的错误消息与代码不一致。代码显示distanceargv[1]workersargv[2],但传递到fail()的错误消息似乎说distanceargv[2]
  • argv[1]被作为整数解析两次:atoi(3)sscanf(3)。我会坚持sscanf(3),因为您可以检查返回值以确保解析成功。
  • workers未经验证,转换为atoi(3)。错误被忽略。我建议使用sscanf(3)解析它,就像你使用distance一样,并确保它成功。
  • 存储pid的正确类型是pid_t,而不是int。请使用正确的类型(除了unistd.h之外,您可能还需要包含sys/types.h)。

下面是最终版本,所有这一切都整理出来:

int main(int argc, char *argv[]) { 
    int distance; 
    int workers; 

    if (argc != 3 || sscanf(argv[1], "%d", &distance) != 1 || sscanf(argv[2], "%d", &workers) != 1) 
     fail("usage: <distance> <workers>"); 

    readPoints(); 

    // Compute the square of the distance bound, since that's what we'll 
    // need to compare against. 
    int dsq = distance * distance; 
    // Count up the number of nearby pairs of points. 
    int total = 0; 

    int fd[2]; // pipe 
    if (pipe(fd) != 0) 
     fail("Can't create pipe"); 

    pid_t pid; 
    int chNum; // child's number 
    int c; 

    for (chNum = 0; chNum < workers; chNum++) { 

     c = 0; 
     pid = fork(); 

     if (pid == -1) 
      fail("Can't create child process"); 

     if (pid == 0) { // it's a child 

      for (int i = chNum; i < ptCount; i += workers) { 
       for (int j = i + 1; j < ptCount; j++) { 
        // Check the squared distance. 
        int dx = ptList[i].x - ptList[j].x; 
        int dy = ptList[i].y - ptList[j].y; 
        if (dx*dx + dy*dy <= dsq) { 
         c++; 
        } 
       } 
      } 

      ssize_t written = write(fd[WRITE], &c, sizeof(c)); 
      if (written == -1) 
       perror("write error"); 
      if (written != sizeof(c)) 
       fail("Write failed on pipe"); 

      exit(0); 
     } 
    } 

    if (close(fd[WRITE]) < 0) 
     fail("Error closing pipe's write channel"); 

    int d; 
    ssize_t r; 
    while ((r = read(fd[READ], &d, sizeof(d))) > 0) { 
     if (r != sizeof(d)) 
      fail("read error"); 
     total += d; 
    } 


    printf("Total: %d\n", total); 

    return 0; 
} 
+0

非常感谢您正确缩进代码。你的解释确实帮助我更好地理解了一切。我刚开始学习这个,所以非常有帮助。 – mukhtar23

相关问题