你没有正确地进行管道处理。
首先,您不需要锁定/解锁以写入和读取管道:小于PIPE_BUF
字节的写入保证为原子。 POSIX.1-2001要求PIPE_BUF
至少512字节;由于您一次只写入sizeof(int)
字节,因此您很安全(除非sizeof(int)
大于或等于512,这是无稽之谈)。见man limits.h
,下路径名变量值:
{PIPE_BUF}
写入管时被保证是原子 的最大字节数。最小可接受值:{_POSIX_PIPE_BUF}
它本身简化了代码并减少了不必要的锁定/解锁开销。
但真正的问题是在这里:
else if (pid > 0) { // this is parent
int d;
close(fd[WRITE]);
read(fd[READ], &d, sizeof(d));
close(fd[READ]);
total = total + d;
}
不能关闭fd[WRITE]
内循环:考虑下一次迭代会发生什么,当你用叉子叉下一道工序。下一个循环中的子进程将尝试写入已关闭的文件描述符,因此发生错误(并且write(2)
失败,EBADF
失败,但不会检查返回值write(2)
,以便代码愉快地忽略该错误)。另外,您尝试一次又一次关闭fd[WRITE]
,因此close(2)
也会返回一个错误(您再次忽略)。
同样对于read(2)
:如果关闭fd[READ]
,则不能在下一次迭代中读取管道外的结果; read(2)
也会返回一个错误,并且也会返回close(2)
。
(所以上课是:不要忽略错误。如果你的错误处理是正确的,那么你会发现错误的线索)
你不需要关闭。子进程向管道写入完整的workers
整数;父进程读取管道正好workers
整数,所以这是不够的:
for (chNum = 0; chNum < workers; chNum++) {
c = 0;
pid = fork();
if (pid == -1)
fail("Can't create child process");
if (pid == 0) { // it's a child
for (int i = chNum; i < ptCount; i += workers) {
for (int j = i + 1; j < ptCount; j++) {
// Check the squared distance.
int dx = ptList[i].x - ptList[j].x;
int dy = ptList[i].y - ptList[j].y;
if (dx*dx + dy*dy <= dsq) {
c++;
}
}
}
ssize_t written = write(fd[WRITE], &c, sizeof(c));
if (written == -1)
perror("write error");
if (written != sizeof(c))
fail("Write failed on pipe");
exit(0);
}
else {
int d;
if (read(fd[READ], &d, sizeof(d)) != sizeof(d))
fail("Read error on pipe");
total += d;
}
}
关键的一点是要明白,你需要保持fd[READ]
和fd[WRITE]
只要你打算到餐桌,将使用新的进程打开管道。
现在,这可以解决问题,但是会产生虚假的并行性:如果没有数据可用,管道中的读取将默认阻塞。这意味着在每次迭代中,父对象在相应的子写入管道之前都不会进展。所以你并没有真正将任何东西并行化;效果与具有父叉的效果相同,等待子终止,读取结果并将其添加到总数,然后分叉下一个子节点(并重复循环)。
如果您想要真正的并行性,您必须分叉每个进程,然后才能从管道读取数据。例如:
for (chNum = 0; chNum < workers; chNum++) {
c = 0;
pid = fork();
if (pid == -1)
fail("Can't create child process");
if (pid == 0) { // it's a child
for (int i = chNum; i < ptCount; i += workers) {
for (int j = i + 1; j < ptCount; j++) {
// Check the squared distance.
int dx = ptList[i].x - ptList[j].x;
int dy = ptList[i].y - ptList[j].y;
if (dx*dx + dy*dy <= dsq) {
c++;
}
}
}
ssize_t written = write(fd[WRITE], &c, sizeof(c));
if (written == -1)
perror("write error");
if (written != sizeof(c))
fail("Write failed on pipe");
exit(0);
}
}
if (close(fd[WRITE]) < 0)
fail("Error closing pipe's write channel");
int d;
ssize_t r;
while ((r = read(fd[READ], &d, sizeof(d))) > 0) {
if (r != sizeof(d))
fail("read error");
total += d;
}
请注意,在开始读取之前,我们必须显式关闭管道的写通道;这是为了避免当没有更多的子进程正在主动写入管道时让父进程挂起。请记住,只要管道的写通道至少有一个进程打开,就会阻塞读取。如果父进程保持写通道打开,read(2)
将永远不会返回,因为有可能父本身可以写入管道(即使我们知道它不会)。所以我们必须关闭fd[WRITE]
。
另外,因为我们知道,恰好有workers
数从管道读,我们可能只是这样做的循环之后,而不是关闭写入通道的:
int d;
int i;
for (i = 0; i < workers; i++) {
if (read(fd[READ], &d, sizeof(d)) != sizeof(d))
fail("Failed to read from pipe");
total += d;
}
一对夫妇的其他(无关)备注:
- 给出错误参数时的错误消息与代码不一致。代码显示
distance
在argv[1]
和workers
在argv[2]
,但传递到fail()
的错误消息似乎说distance
在argv[2]
。
argv[1]
被作为整数解析两次:atoi(3)
和sscanf(3)
。我会坚持sscanf(3)
,因为您可以检查返回值以确保解析成功。
workers
未经验证,转换为atoi(3)
。错误被忽略。我建议使用sscanf(3)
解析它,就像你使用distance
一样,并确保它成功。
- 存储
pid
的正确类型是pid_t
,而不是int
。请使用正确的类型(除了unistd.h
之外,您可能还需要包含sys/types.h
)。
下面是最终版本,所有这一切都整理出来:
int main(int argc, char *argv[]) {
int distance;
int workers;
if (argc != 3 || sscanf(argv[1], "%d", &distance) != 1 || sscanf(argv[2], "%d", &workers) != 1)
fail("usage: <distance> <workers>");
readPoints();
// Compute the square of the distance bound, since that's what we'll
// need to compare against.
int dsq = distance * distance;
// Count up the number of nearby pairs of points.
int total = 0;
int fd[2]; // pipe
if (pipe(fd) != 0)
fail("Can't create pipe");
pid_t pid;
int chNum; // child's number
int c;
for (chNum = 0; chNum < workers; chNum++) {
c = 0;
pid = fork();
if (pid == -1)
fail("Can't create child process");
if (pid == 0) { // it's a child
for (int i = chNum; i < ptCount; i += workers) {
for (int j = i + 1; j < ptCount; j++) {
// Check the squared distance.
int dx = ptList[i].x - ptList[j].x;
int dy = ptList[i].y - ptList[j].y;
if (dx*dx + dy*dy <= dsq) {
c++;
}
}
}
ssize_t written = write(fd[WRITE], &c, sizeof(c));
if (written == -1)
perror("write error");
if (written != sizeof(c))
fail("Write failed on pipe");
exit(0);
}
}
if (close(fd[WRITE]) < 0)
fail("Error closing pipe's write channel");
int d;
ssize_t r;
while ((r = read(fd[READ], &d, sizeof(d))) > 0) {
if (r != sizeof(d))
fail("read error");
total += d;
}
printf("Total: %d\n", total);
return 0;
}
请在这里张贴当 – dietbacon