标签 IPC 下的文章

这本免费的电子书使经验丰富的程序员更深入了解 Linux 中进程间通信(IPC)的核心概念和机制。

让一个软件过程与另一个软件过程进行对话是一个微妙的平衡行为。但是,它对于应用程序而言可能是至关重要的功能,因此这是任何从事复杂项目的程序员都必须解决的问题。你的应用程序是否需要启动由其它软件处理的工作;监视外设或网络上正在执行的操作;或者检测来自其它来源的信号,当你的软件需要依赖其自身代码之外的东西来知道下一步做什么或什么时候做时,你就需要考虑 进程间通信 inter-process communication (IPC)。

这在 Unix 操作系统上已经由来已久了,这可能是因为人们早期预期软件会来自各种来源。按照相同的传统,Linux 提供了一些同样的 IPC 接口和一些新接口。Linux 内核具有多种 IPC 方法,util-linux 包包含了 ipcmkipcrmipcslsipc 命令,用于监视和管理 IPC 消息。

显示进程间通信信息

在尝试 IPC 之前,你应该知道系统上已经有哪些 IPC 设施。lsipc 命令提供了该信息。

RESOURCE DESCRIPTION               LIMIT  USED  USE%
MSGMNI   Number of message queues  32000     0 0.00%
MSGMAX   Max size of message (byt.. 8192     -     -
MSGMNB   Default max size of queue 16384     -     -
SHMMNI   Shared memory segments     4096    79 1.93%
SHMALL   Shared memory pages       184[...] 25452 0.00%
SHMMAX   Max size of shared memory 18446744073692774399
SHMMIN   Min size of shared memory     1     -     -
SEMMNI   Number of semaphore ident 32000     0 0.00%
SEMMNS   Total number of semaphore 1024000.. 0 0.00%
SEMMSL   Max semaphores per semap  32000     -     -
SEMOPM   Max number of operations p  500     -     -
SEMVMX   Semaphore max value       32767     -     -

你可能注意到,这个示例清单包含三种不同类型的 IPC 机制,每种机制在 Linux 内核中都是可用的:消息(MSG)、共享内存(SHM)和信号量(SEM)。你可以用 ipcs 命令查看每个子系统的当前活动:

$ ipcs

------ Message Queues Creators/Owners ---
msqid     perms     cuid      cgid  [...]

------ Shared Memory Segment Creators/Owners
shmid     perms    cuid    cgid  [...]
557056    700      seth    users [...]
3571713   700      seth    users [...]
2654210   600      seth    users [...]
2457603   700      seth    users [...]

------ Semaphore Arrays Creators/Owners ---
semid     perms     cuid      cgid  [...]

这表明当前没有消息或信号量阵列,但是使用了一些共享内存段。

你可以在系统上执行一个简单的示例,这样就可以看到正在工作的系统之一。它涉及到一些 C 代码,所以你必须在系统上有构建工具。必须安装这些软件包才能从源代码构建软件,这些软件包的名称取决于发行版,因此请参考文档以获取详细信息。例如,在基于 Debian 的发行版上,你可以在 wiki 的构建教程部分了解构建需求,而在基于 Fedora 的发行版上,你可以参考该文档的从源代码安装软件部分。

创建一个消息队列

你的系统已经有一个默认的消息队列,但是你可以使用 ipcmk 命令创建你自己的消息队列:

$ ipcmk --queue
Message queue id: 32764

编写一个简单的 IPC 消息发送器,为了简单,在队列 ID 中硬编码:

#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>
#include <string.h>

struct msgbuffer {
  char text[24];
} message;

int main() {
    int msqid = 32764;
    strcpy(message.text,"opensource.com");
    msgsnd(msqid, &message, sizeof(message), 0);
    printf("Message: %s\n",message.text);
    printf("Queue: %d\n",msqid);
    return 0;
        }

编译该应用程序并运行:

$ gcc msgsend.c -o msg.bin
$ ./msg.bin
Message: opensource.com
Queue: 32769

你刚刚向你的消息队列发送了一条消息。你可以使用 ipcs 命令验证这一点,可以使用 ——queue 选项将输出限制到该消息队列:

$ ipcs -q

------ Message Queues --------
key        msqid   owner  perms  used-bytes  messages
0x7b341ab9 0       seth   666    0          0
0x72bd8410 32764   seth   644    24         1

你也可以检索这些消息:

#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdio.h>

struct msgbuffer {
    char text[24];
} message;

int main() {
    int msqid = 32764;
    msgrcv(msqid, &message, sizeof(message),0,0);
    printf("\nQueue: %d\n",msqid);
    printf("Got this message: %s\n", message.text);
    msgctl(msqid,IPC_RMID,NULL);
    return 0;

编译并运行:

$ gcc get.c -o get.bin
$ ./get.bin

Queue: 32764
Got this message: opensource.com

下载这本电子书

这只是 Marty Kalin 的《Linux 进程间通信指南》中课程的一个例子,可从 Opensource.com 下载的这本最新免费(且 CC 授权)的电子书。在短短的几节课中,你将从消息队列、共享内存和信号量、套接字、信号等中了解 IPC 的 POSIX 方法。认真阅读 Marty 的书,你将成为一个博识的程序员。而这不仅适用于经验丰富的编码人员,如果你编写的只是 shell 脚本,那么你将拥有有关管道(命名和未命名)和共享文件的大量实践知识,以及使用共享文件或外部消息队列时需要了解的重要概念。

如果你对制作具有动态和具有系统感知的优秀软件感兴趣,那么你需要了解 IPC。让这本书做你的向导。


via: https://opensource.com/article/20/1/inter-process-communication-linux

作者:Seth Kenlon 选题:lujun9972 译者:laingke 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习在 Linux 中进程是如何与其他进程进行同步的。

本篇是 Linux 下进程间通信(IPC)系列的第三篇同时也是最后一篇文章。第一篇文章聚焦在通过共享存储(文件和共享内存段)来进行 IPC,第二篇文章则通过管道(无名的或者命名的)及消息队列来达到相同的目的。这篇文章将目光从高处(套接字)然后到低处(信号)来关注 IPC。代码示例将用力地充实下面的解释细节。

套接字

正如管道有两种类型(命名和无名)一样,套接字也有两种类型。IPC 套接字(即 Unix 套接字)给予进程在相同设备(主机)上基于通道的通信能力;而网络套接字给予进程运行在不同主机的能力,因此也带来了网络通信的能力。网络套接字需要底层协议的支持,例如 TCP(传输控制协议)或 UDP(用户数据报协议)。

与之相反,IPC 套接字依赖于本地系统内核的支持来进行通信;特别的,IPC 通信使用一个本地的文件作为套接字地址。尽管这两种套接字的实现有所不同,但在本质上,IPC 套接字和网络套接字的 API 是一致的。接下来的例子将包含网络套接字的内容,但示例服务器和客户端程序可以在相同的机器上运行,因为服务器使用了 localhost(127.0.0.1)这个网络地址,该地址表示的是本地机器上的本地机器地址。

套接字以流的形式(下面将会讨论到)被配置为双向的,并且其控制遵循 C/S(客户端/服务器端)模式:客户端通过尝试连接一个服务器来初始化对话,而服务器端将尝试接受该连接。假如万事顺利,来自客户端的请求和来自服务器端的响应将通过管道进行传输,直到其中任意一方关闭该通道,从而断开这个连接。

一个迭代服务器(只适用于开发)将一直和连接它的客户端打交道:从最开始服务第一个客户端,然后到这个连接关闭,然后服务第二个客户端,循环往复。这种方式的一个缺点是处理一个特定的客户端可能会挂起,使得其他的客户端一直在后面等待。生产级别的服务器将是并发的,通常使用了多进程或者多线程的混合。例如,我台式机上的 Nginx 网络服务器有一个 4 个 工人 worker 的进程池,它们可以并发地处理客户端的请求。在下面的代码示例中,我们将使用迭代服务器,使得我们将要处理的问题保持在一个很小的规模,只关注基本的 API,而不去关心并发的问题。

最后,随着各种 POSIX 改进的出现,套接字 API 随着时间的推移而发生了显著的变化。当前针对服务器端和客户端的示例代码特意写的比较简单,但是它着重强调了基于流的套接字中连接的双方。下面是关于流控制的一个总结,其中服务器端在一个终端中开启,而客户端在另一个不同的终端中开启:

  • 服务器端等待客户端的连接,对于给定的一个成功连接,它就读取来自客户端的数据。
  • 为了强调是双方的会话,服务器端会对接收自客户端的数据做回应。这些数据都是 ASCII 字符代码,它们组成了一些书的标题。
  • 客户端将书的标题写给服务器端的进程,并从服务器端的回应中读取到相同的标题。然后客户端和服务器端都在屏幕上打印出标题。下面是服务器端的输出,客户端的输出也和它完全一样:
Listening on port 9876 for clients...
War and Peace
Pride and Prejudice
The Sound and the Fury

示例 1. 使用套接字的客户端程序

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "sock.h"

void report(const char* msg, int terminate) {
  perror(msg);
  if (terminate) exit(-1); /* failure */
}

int main() {
  int fd = socket(AF_INET,     /* network versus AF_LOCAL */
          SOCK_STREAM, /* reliable, bidirectional: TCP */
          0);          /* system picks underlying protocol */
  if (fd < 0) report("socket", 1); /* terminate */
    
  /* bind the server's local address in memory */
  struct sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));          /* clear the bytes */
  saddr.sin_family = AF_INET;                /* versus AF_LOCAL */
  saddr.sin_addr.s_addr = htonl(INADDR_ANY); /* host-to-network endian */
  saddr.sin_port = htons(PortNumber);        /* for listening */
  
  if (bind(fd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
    report("bind", 1); /* terminate */
    
  /* listen to the socket */
  if (listen(fd, MaxConnects) < 0) /* listen for clients, up to MaxConnects */
    report("listen", 1); /* terminate */

  fprintf(stderr, "Listening on port %i for clients...\n", PortNumber);
  /* a server traditionally listens indefinitely */
  while (1) {
    struct sockaddr_in caddr; /* client address */
    int len = sizeof(caddr);  /* address length could change */
    
    int client_fd = accept(fd, (struct sockaddr*) &caddr, &len);  /* accept blocks */
    if (client_fd < 0) {
      report("accept", 0); /* don't terminated, though there's a problem */
      continue;
    }

    /* read from client */
    int i;
    for (i = 0; i < ConversationLen; i++) {
      char buffer[BuffSize + 1];
      memset(buffer, '\0', sizeof(buffer)); 
      int count = read(client_fd, buffer, sizeof(buffer));
      if (count > 0) {
    puts(buffer);
    write(client_fd, buffer, sizeof(buffer)); /* echo as confirmation */
      }
    }
    close(client_fd); /* break connection */
  }  /* while(1) */
  return 0;
}

上面的服务器端程序执行典型的 4 个步骤来准备回应客户端的请求,然后接受其他的独立请求。这里每一个步骤都以服务器端程序调用的系统函数来命名。

  1. socket(…):为套接字连接获取一个文件描述符
  2. bind(…):将套接字和服务器主机上的一个地址进行绑定
  3. listen(…):监听客户端请求
  4. accept(…):接受一个特定的客户端请求

上面的 socket 调用的完整形式为:

int sockfd = socket(AF_INET,      /* versus AF_LOCAL */
                    SOCK_STREAM,  /* reliable, bidirectional */
                    0);           /* system picks protocol (TCP) */

第一个参数特别指定了使用的是一个网络套接字,而不是 IPC 套接字。对于第二个参数有多种选项,但 SOCK_STREAMSOCK_DGRAM(数据报)是最为常用的。基于流的套接字支持可信通道,在这种通道中如果发生了信息的丢失或者更改,都将会被报告。这种通道是双向的,并且从一端到另外一端的有效载荷在大小上可以是任意的。相反的,基于数据报的套接字大多是不可信的,没有方向性,并且需要固定大小的载荷。socket 的第三个参数特别指定了协议。对于这里展示的基于流的套接字,只有一种协议选择:TCP,在这里表示的 0。因为对 socket 的一次成功调用将返回相似的文件描述符,套接字可以被读写,对应的语法和读写一个本地文件是类似的。

bind 的调用是最为复杂的,因为它反映出了在套接字 API 方面上的各种改进。我们感兴趣的点是这个调用将一个套接字和服务器端所在机器中的一个内存地址进行绑定。但对 listen 的调用就非常直接了:

if (listen(fd, MaxConnects) < 0)

第一个参数是套接字的文件描述符,第二个参数则指定了在服务器端处理一个拒绝连接错误之前,有多少个客户端连接被允许连接。(在头文件 sock.hMaxConnects 的值被设置为 8。)

accept 调用默认将是一个阻塞等待:服务器端将不做任何事情直到一个客户端尝试连接它,然后进行处理。accept 函数返回的值如果是 -1 则暗示有错误发生。假如这个调用是成功的,则它将返回另一个文件描述符,这个文件描述符被用来指代另一个可读可写的套接字,它与 accept 调用中的第一个参数对应的接收套接字有所不同。服务器端使用这个可读可写的套接字来从客户端读取请求然后写回它的回应。接收套接字只被用于接受客户端的连接。

在设计上,服务器端可以一直运行下去。当然服务器端可以通过在命令行中使用 Ctrl+C 来终止它。

示例 2. 使用套接字的客户端

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include "sock.h"

const char* books[] = {"War and Peace",
               "Pride and Prejudice",
               "The Sound and the Fury"};

void report(const char* msg, int terminate) {
  perror(msg);
  if (terminate) exit(-1); /* failure */
}

int main() {
  /* fd for the socket */
  int sockfd = socket(AF_INET,      /* versus AF_LOCAL */
              SOCK_STREAM,  /* reliable, bidirectional */
              0);           /* system picks protocol (TCP) */
  if (sockfd < 0) report("socket", 1); /* terminate */

  /* get the address of the host */
  struct hostent* hptr = gethostbyname(Host); /* localhost: 127.0.0.1 */ 
  if (!hptr) report("gethostbyname", 1); /* is hptr NULL? */
  if (hptr->h_addrtype != AF_INET)       /* versus AF_LOCAL */
    report("bad address family", 1);
  
  /* connect to the server: configure server's address 1st */
  struct sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));
  saddr.sin_family = AF_INET;
  saddr.sin_addr.s_addr = 
     ((struct in_addr*) hptr->h_addr_list[0])->s_addr;
  saddr.sin_port = htons(PortNumber); /* port number in big-endian */
  
  if (connect(sockfd, (struct sockaddr*) &saddr, sizeof(saddr)) < 0)
    report("connect", 1);
  
  /* Write some stuff and read the echoes. */
  puts("Connect to server, about to write some stuff...");
  int i;
  for (i = 0; i < ConversationLen; i++) {
    if (write(sockfd, books[i], strlen(books[i])) > 0) {
      /* get confirmation echoed from server and print */
      char buffer[BuffSize + 1];
      memset(buffer, '\0', sizeof(buffer));
      if (read(sockfd, buffer, sizeof(buffer)) > 0)
    puts(buffer);
    }
  }
  puts("Client done, about to exit...");
  close(sockfd); /* close the connection */
  return 0;
}

客户端程序的设置代码和服务器端类似。两者主要的区别既不是在于监听也不在于接收,而是连接:

if (connect(sockfd, (struct sockaddr*) &saddr, sizeof(saddr)) < 0)

connect 的调用可能因为多种原因而导致失败,例如客户端拥有错误的服务器端地址或者已经有太多的客户端连接上了服务器端。假如 connect 操作成功,客户端将在一个 for 循环中,写入它的请求然后读取返回的响应。在会话后,服务器端和客户端都将调用 close 去关闭这个可读可写套接字,尽管任何一边的关闭操作就足以关闭它们之间的连接。此后客户端可以退出了,但正如前面提到的那样,服务器端可以一直保持开放以处理其他事务。

从上面的套接字示例中,我们看到了请求信息被回显给客户端,这使得客户端和服务器端之间拥有进行丰富对话的可能性。也许这就是套接字的主要魅力。在现代系统中,客户端应用(例如一个数据库客户端)和服务器端通过套接字进行通信非常常见。正如先前提及的那样,本地 IPC 套接字和网络套接字只在某些实现细节上面有所不同,一般来说,IPC 套接字有着更低的消耗和更好的性能。它们的通信 API 基本是一样的。

信号

信号会中断一个正在执行的程序,在这种意义下,就是用信号与这个程序进行通信。大多数的信号要么可以被忽略(阻塞)或者被处理(通过特别设计的代码)。SIGSTOP (暂停)和 SIGKILL(立即停止)是最应该提及的两种信号。这种符号常量有整数类型的值,例如 SIGKILL 对应的值为 9

信号可以在与用户交互的情况下发生。例如,一个用户从命令行中敲了 Ctrl+C 来终止一个从命令行中启动的程序;Ctrl+C 将产生一个 SIGTERM 信号。SIGTERM 意即终止,它可以被阻塞或者被处理,而不像 SIGKILL 信号那样。一个进程也可以通过信号和另一个进程通信,这样使得信号也可以作为一种 IPC 机制。

考虑一下一个多进程应用,例如 Nginx 网络服务器是如何被另一个进程优雅地关闭的。kill 函数:

int kill(pid_t pid, int signum); /* declaration */

可以被一个进程用来终止另一个进程或者一组进程。假如 kill 函数的第一个参数是大于 0 的,那么这个参数将会被认为是目标进程的 pid(进程 ID),假如这个参数是 0,则这个参数将会被视作信号发送者所属的那组进程。

kill 的第二个参数要么是一个标准的信号数字(例如 SIGTERMSIGKILL),要么是 0 ,这将会对信号做一次询问,确认第一个参数中的 pid 是否是有效的。这样优雅地关闭一个多进程应用就可以通过向组成该应用的一组进程发送一个终止信号来完成,具体来说就是调用一个 kill 函数,使得这个调用的第二个参数是 SIGTERM 。(Nginx 主进程可以通过调用 kill 函数来终止其他工人进程,然后再停止自己。)就像许多库函数一样,kill 函数通过一个简单的可变语法拥有更多的能力和灵活性。

示例 3. 一个多进程系统的优雅停止

#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

void graceful(int signum) {
  printf("\tChild confirming received signal: %i\n", signum);
  puts("\tChild about to terminate gracefully...");
  sleep(1);
  puts("\tChild terminating now...");
  _exit(0); /* fast-track notification of parent */
}

void set_handler() {
  struct sigaction current;
  sigemptyset(&current.sa_mask);         /* clear the signal set */
  current.sa_flags = 0;                  /* enables setting sa_handler, not sa_action */
  current.sa_handler = graceful;         /* specify a handler */
  sigaction(SIGTERM, &current, NULL);    /* register the handler */
}

void child_code() {
  set_handler();

  while (1) {   /` loop until interrupted `/
    sleep(1);
    puts("\tChild just woke up, but going back to sleep.");
  }
}

void parent_code(pid_t cpid) {
  puts("Parent sleeping for a time...");
  sleep(5);

  /* Try to terminate child. */
  if (-1 == kill(cpid, SIGTERM)) {
    perror("kill");
    exit(-1);
  }
  wait(NULL); /` wait for child to terminate `/
  puts("My child terminated, about to exit myself...");
}

int main() {
  pid_t pid = fork();
  if (pid < 0) {
    perror("fork");
    return -1; /* error */
  }
  if (0 == pid)
    child_code();
  else
    parent_code(pid);
  return 0;  /* normal */
}

上面的停止程序模拟了一个多进程系统的优雅退出,在这个例子中,这个系统由一个父进程和一个子进程组成。这次模拟的工作流程如下:

  • 父进程尝试去 fork 一个子进程。假如这个 fork 操作成功了,每个进程就执行它自己的代码:子进程就执行函数 child_code,而父进程就执行函数 parent_code
  • 子进程将会进入一个潜在的无限循环,在这个循环中子进程将睡眠一秒,然后打印一个信息,接着再次进入睡眠状态,以此循环往复。来自父进程的一个 SIGTERM 信号将引起子进程去执行一个信号处理回调函数 graceful。这样这个信号就使得子进程可以跳出循环,然后进行子进程和父进程之间的优雅终止。在终止之前,进程将打印一个信息。
  • fork 一个子进程后,父进程将睡眠 5 秒,使得子进程可以执行一会儿;当然在这个模拟中,子进程大多数时间都在睡眠。然后父进程调用 SIGTERM 作为第二个参数的 kill 函数,等待子进程的终止,然后自己再终止。

下面是一次运行的输出:

% ./shutdown
Parent sleeping for a time...
        Child just woke up, but going back to sleep.
        Child just woke up, but going back to sleep.
        Child just woke up, but going back to sleep.
        Child just woke up, but going back to sleep.
        Child confirming received signal: 15  ## SIGTERM is 15
        Child about to terminate gracefully...
        Child terminating now...
My child terminated, about to exit myself...

对于信号的处理,上面的示例使用了 sigaction 库函数(POSIX 推荐的用法)而不是传统的 signal 函数,signal 函数有移植性问题。下面是我们主要关心的代码片段:

  • 假如对 fork 的调用成功了,父进程将执行 parent_code 函数,而子进程将执行 child_code 函数。在给子进程发送信号之前,父进程将会等待 5 秒:
puts("Parent sleeping for a time...");
sleep(5);
if (-1 == kill(cpid, SIGTERM)) {
...sleepkillcpidSIGTERM...

假如 kill 调用成功了,父进程将在子进程终止时做等待,使得子进程不会变成一个僵尸进程。在等待完成后,父进程再退出。

  • child_code 函数首先调用 set_handler 然后进入它的可能永久睡眠的循环。下面是我们将要查看的 set_handler 函数:
void set_handler() {
  struct sigaction current;            /* current setup */
  sigemptyset(&current.sa_mask);       /* clear the signal set */
  current.sa_flags = 0;                /* for setting sa_handler, not sa_action */
  current.sa_handler = graceful;       /* specify a handler */
  sigaction(SIGTERM, &current, NULL);  /* register the handler */
}

上面代码的前三行在做相关的准备。第四个语句将为 graceful 设定为句柄,它将在调用 _exit 来停止之前打印一些信息。第 5 行和最后一行的语句将通过调用 sigaction 来向系统注册上面的句柄。sigaction 的第一个参数是 SIGTERM ,用作终止;第二个参数是当前的 sigaction 设定,而最后的参数(在这个例子中是 NULL )可被用来保存前面的 sigaction 设定,以备后面的可能使用。

使用信号来作为 IPC 的确是一个很轻量的方法,但确实值得尝试。通过信号来做 IPC 显然可以被归入 IPC 工具箱中。

这个系列的总结

在这个系列中,我们通过三篇有关 IPC 的文章,用示例代码介绍了如下机制:

  • 共享文件
  • 共享内存(通过信号量)
  • 管道(命名和无名)
  • 消息队列
  • 套接字
  • 信号

甚至在今天,在以线程为中心的语言,例如 Java、C# 和 Go 等变得越来越流行的情况下,IPC 仍然很受欢迎,因为相比于使用多线程,通过多进程来实现并发有着一个明显的优势:默认情况下,每个进程都有它自己的地址空间,除非使用了基于共享内存的 IPC 机制(为了达到安全的并发,竞争条件在多线程和多进程的时候必须被加上锁),在多进程中可以排除掉基于内存的竞争条件。对于任何一个写过即使是基本的通过共享变量来通信的多线程程序的人来说,他都会知道想要写一个清晰、高效、线程安全的代码是多么具有挑战性。使用单线程的多进程的确是很有吸引力的,这是一个切实可行的方式,使用它可以利用好今天多处理器的机器,而不需要面临基于内存的竞争条件的风险。

当然,没有一个简单的答案能够回答上述 IPC 机制中的哪一个更好。在编程中每一种 IPC 机制都会涉及到一个取舍问题:是追求简洁,还是追求功能强大。以信号来举例,它是一个相对简单的 IPC 机制,但并不支持多个进程之间的丰富对话。假如确实需要这样的对话,另外的选择可能会更合适一些。带有锁的共享文件则相对直接,但是当要处理大量共享的数据流时,共享文件并不能很高效地工作。管道,甚至是套接字,有着更复杂的 API,可能是更好的选择。让具体的问题去指导我们的选择吧。

尽管所有的示例代码(可以在我的网站上获取到)都是使用 C 写的,其他的编程语言也经常提供这些 IPC 机制的轻量包装。这些代码示例都足够短小简单,希望这样能够鼓励你去进行实验。


via: https://opensource.com/article/19/4/interprocess-communication-linux-networking

作者:Marty Kalin 选题:lujun9972 译者:FSSlc 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习在 Linux 中进程是如何与其他进程进行同步的。

本篇是 Linux 下进程间通信(IPC)系列的第二篇文章。第一篇文章 聚焦于通过共享文件和共享内存段这样的共享存储来进行 IPC。这篇文件的重点将转向管道,它是连接需要通信的进程之间的通道。管道拥有一个写端用于写入字节数据,还有一个读端用于按照先入先出的顺序读入这些字节数据。而这些字节数据可能代表任何东西:数字、员工记录、数字电影等等。

管道有两种类型,命名管道和无名管道,都可以交互式的在命令行或程序中使用它们;相关的例子在下面展示。这篇文章也将介绍内存队列,尽管它们有些过时了,但它们不应该受这样的待遇。

在本系列的第一篇文章中的示例代码承认了在 IPC 中可能受到竞争条件(不管是基于文件的还是基于内存的)的威胁。自然地我们也会考虑基于管道的 IPC 的安全并发问题,这个也将在本文中提及。针对管道和内存队列的例子将会使用 POSIX 推荐使用的 API,POSIX 的一个核心目标就是线程安全。

请查看一些 mq\_open 函数的 man 页,这个函数属于内存队列的 API。这个 man 页中有关 特性 的章节带有一个小表格:

接口特性
mq_open()线程安全MT-Safe

上面的 MT-Safe(MT 指的是 多线程 multi-threaded )意味着 mq_open 函数是线程安全的,进而暗示是进程安全的:一个进程的执行和它的一个线程执行的过程类似,假如竞争条件不会发生在处于相同进程的线程中,那么这样的条件也不会发生在处于不同进程的线程中。MT-Safe 特性保证了调用 mq_open 时不会出现竞争条件。一般来说,基于通道的 IPC 是并发安全的,尽管在下面例子中会出现一个有关警告的注意事项。

无名管道

首先让我们通过一个特意构造的命令行例子来展示无名管道是如何工作的。在所有的现代系统中,符号 | 在命令行中都代表一个无名管道。假设我们的命令行提示符为 %,接下来考虑下面的命令:

## 写入方在 | 左边,读取方在右边
% sleep 5 | echo "Hello, world!" 

sleepecho 程序以不同的进程执行,无名管道允许它们进行通信。但是上面的例子被特意设计为没有通信发生。问候语 “Hello, world!” 出现在屏幕中,然后过了 5 秒后,命令行返回,暗示 sleepecho 进程都已经结束了。这期间发生了什么呢?

在命令行中的竖线 | 的语法中,左边的进程(sleep)是写入方,右边的进程(echo)为读取方。默认情况下,读取方将会阻塞,直到从通道中能够读取到字节数据,而写入方在写完它的字节数据后,将发送 流已终止 end-of-stream 的标志。(即便写入方过早终止了,一个流已终止的标志还是会发给读取方。)无名管道将保持到写入方和读取方都停止的那个时刻。

在上面的例子中,sleep 进程并没有向通道写入任何的字节数据,但在 5 秒后就终止了,这时将向通道发送一个流已终止的标志。与此同时,echo 进程立即向标准输出(屏幕)写入问候语,因为这个进程并不从通道中读入任何字节,所以它并没有等待。一旦 sleepecho 进程都终止了,不会再用作通信的无名管道将会消失然后返回命令行提示符。

下面这个更加实用的示例将使用两个无名管道。我们假定文件 test.dat 的内容如下:

this
is
the
way
the
world
ends

下面的命令:

% cat test.dat | sort | uniq

会将 cat 连接 concatenate 的缩写)进程的输出通过管道传给 sort 进程以生成排序后的输出,然后将排序后的输出通过管道传给 uniq 进程以消除重复的记录(在本例中,会将两次出现的 “the” 缩减为一个):

ends
is
the
this
way
world

下面展示的情景展示的是一个带有两个进程的程序通过一个无名管道通信来进行通信。

示例 1. 两个进程通过一个无名管道来进行通信

#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h>   /* exit functions */
#include <unistd.h>   /* read, write, pipe, _exit */
#include <string.h>

#define ReadEnd  0
#define WriteEnd 1

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);    /** failure **/
}

int main() {
  int pipeFDs[2]; /* two file descriptors */
  char buf;       /* 1-byte buffer */
  const char* msg = "Nature's first green is gold\n"; /* bytes to write */

  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  pid_t cpid = fork();                                /* fork a child process */
  if (cpid < 0) report_and_exit("fork");              /* check for failure */

  if (0 == cpid) {    /*** child ***/                 /* child process */
    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */

    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */
      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */

    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */
    _exit(0);                                         /* exit and notify parent at once  */
  }
  else {              /*** parent ***/
    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */

    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */
    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */

    wait(NULL);                                       /* wait for child to exit */
    exit(0);                                          /* exit normally */
  }
  return 0;
}

上面名为 pipeUN 的程序使用系统函数 fork 来创建一个进程。尽管这个程序只有一个单一的源文件,在它正确执行的情况下将会发生多进程的情况。

下面的内容是对库函数 fork 如何工作的一个简要回顾:

  • fork 函数由进程调用,在失败时返回 -1 给父进程。在 pipeUN 这个例子中,相应的调用是:
pid_t cpid = fork(); /* called in parent */

函数调用后的返回值也被保存下来了。在这个例子中,保存在整数类型 pid_t 的变量 cpid 中。(每个进程有它自己的进程 ID,这是一个非负的整数,用来标记进程)。复刻一个新的进程可能会因为多种原因而失败,包括进程表满了的原因,这个结构由系统维持,以此来追踪进程状态。明确地说,僵尸进程假如没有被处理掉,将可能引起进程表被填满的错误。

  • 假如 fork 调用成功,则它将创建一个新的子进程,向父进程返回一个值,向子进程返回另外的一个值。在调用 fork 后父进程和子进程都将执行相同的代码。(子进程继承了到此为止父进程中声明的所有变量的拷贝),特别地,一次成功的 fork 调用将返回如下的东西:
+ 向子进程返回 `0`
+ 向父进程返回子进程的进程 ID
  • 在一次成功的 fork 调用后,一个 if/else 或等价的结构将会被用来隔离针对父进程和子进程的代码。在这个例子中,相应的声明为:
if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
} 

假如成功地复刻出了一个子进程,pipeUN 程序将像下面这样去执行。在一个整数的数列里:

int pipeFDs[2]; /* two file descriptors */

来保存两个文件描述符,一个用来向管道中写入,另一个从管道中写入。(数组元素 pipeFDs[0] 是读端的文件描述符,元素 pipeFDs[1] 是写端的文件描述符。)在调用 fork 之前,对系统 pipe 函数的成功调用,将立刻使得这个数组获得两个文件描述符:

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

父进程和子进程现在都有了文件描述符的副本。但分离关注点模式意味着每个进程恰好只需要一个描述符。在这个例子中,父进程负责写入,而子进程负责读取,尽管这样的角色分配可以反过来。在 if 子句中的第一个语句将用于关闭管道的读端:

close(pipeFDs[WriteEnd]); /* called in child code */

在父进程中的 else 子句将会关闭管道的读端:

close(pipeFDs[ReadEnd]); /* called in parent code */

然后父进程将向无名管道中写入某些字节数据(ASCII 代码),子进程读取这些数据,然后向标准输出中回放它们。

在这个程序中还需要澄清的一点是在父进程代码中的 wait 函数。一旦被创建后,子进程很大程度上独立于它的父进程,正如简短的 pipeUN 程序所展示的那样。子进程可以执行任意的代码,而它们可能与父进程完全没有关系。但是,假如当子进程终止时,系统将会通过一个信号来通知父进程。

要是父进程在子进程之前终止又该如何呢?在这种情形下,除非采取了预防措施,子进程将会变成在进程表中的一个僵尸进程。预防措施有两大类型:第一种是让父进程去通知系统,告诉系统它对子进程的终止没有任何兴趣:

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二种方法是在子进程终止时,让父进程执行一个 wait。这样就确保了父进程可以独立于子进程而存在。在 pipeUN 程序中使用了第二种方法,其中父进程的代码使用的是下面的调用:

wait(NULL); /* called in parent */

这个对 wait 的调用意味着一直等待直到任意一个子进程的终止发生,因此在 pipeUN 程序中,只有一个子进程。(其中的 NULL 参数可以被替换为一个保存有子程序退出状态的整数变量的地址。)对于更细粒度的控制,还可以使用更灵活的 waitpid 函数,例如特别指定多个子进程中的某一个。

pipeUN 将会采取另一个预防措施。当父进程结束了等待,父进程将会调用常规的 exit 函数去退出。对应的,子进程将会调用 _exit 变种来退出,这类变种将快速跟踪终止相关的通知。在效果上,子进程会告诉系统立刻去通知父进程它的这个子进程已经终止了。

假如两个进程向相同的无名管道中写入内容,字节数据会交错吗?例如,假如进程 P1 向管道写入内容:

foo bar

同时进程 P2 并发地写入:

baz baz

到相同的管道,最后的结果似乎是管道中的内容将会是任意错乱的,例如像这样:

baz foo baz bar

只要没有写入超过 PIPE_BUF 字节,POSIX 标准就能确保写入不会交错。在 Linux 系统中, PIPE_BUF 的大小是 4096 字节。对于管道我更喜欢只有一个写入方和一个读取方,从而绕过这个问题。

命名管道

无名管道没有备份文件:系统将维持一个内存缓存来将字节数据从写方传给读方。一旦写方和读方终止,这个缓存将会被回收,进而无名管道消失。相反的,命名管道有备份文件和一个不同的 API。

下面让我们通过另一个命令行示例来了解命名管道的要点。下面是具体的步骤:

  • 开启两个终端。这两个终端的工作目录应该相同。
  • 在其中一个终端中,键入下面的两个命令(命令行提示符仍然是 %,我的注释以 ## 打头。):
% mkfifo tester ## 创建一个备份文件,名为 tester
% cat tester    ## 将管道的内容输出到 stdout 

在最开始,没有任何东西会出现在终端中,因为到现在为止没有在命名管道中写入任何东西。

  • 在第二个终端中输入下面的命令:
% cat > tester ## redirect keyboard input to the pipe
hello, world!  ## then hit Return key
bye, bye       ## ditto
<Control-C>    ## terminate session with a Control-C

无论在这个终端中输入什么,它都会在另一个终端中显示出来。一旦键入 Ctrl+C,就会回到正常的命令行提示符,因为管道已经被关闭了。

  • 通过移除实现命名管道的文件来进行清理:
% unlink tester

正如 mkfifo 程序的名字所暗示的那样,命名管道也被叫做 FIFO,因为第一个进入的字节,就会第一个出,其他的类似。有一个名为 mkfifo 的库函数,用它可以在程序中创建一个命名管道,它将在下一个示例中被用到,该示例由两个进程组成:一个向命名管道写入,而另一个从该管道读取。

示例 2. fifoWriter 程序

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>

#define MaxLoops         12000   /* outer loop */
#define ChunkSize           16   /* how many written at a time */
#define IntsPerChunk         4   /* four 4-byte ints per chunk */
#define MaxZs              250   /* max microseconds to sleep */

int main() {
  const char* pipeName = "./fifoChannel";
  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */
  int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
  if (fd < 0) return -1;                       /* can't go on */

  int i;
  for (i = 0; i < MaxLoops; i++) {          /* write MaxWrites times */
    int j;
    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize bytes */
      int k;
      int chunk[IntsPerChunk];
      for (k = 0; k < IntsPerChunk; k++)
        chunk[k] = rand();
      write(fd, chunk, sizeof(chunk));
    }
    usleep((rand() % MaxZs) + 1);           /* pause a bit for realism */
  }

  close(fd);           /* close pipe: generates an end-of-stream marker */
  unlink(pipeName);    /* unlink from the implementing file */
  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

  return 0;
}

上面的 fifoWriter 程序可以被总结为如下:

  • 首先程序创建了一个命名管道用来写入数据:
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);

其中的 pipeName 是备份文件的名字,传递给 mkfifo 作为它的第一个参数。接着命名管道通过我们熟悉的 open 函数调用被打开,而这个函数将会返回一个文件描述符。

  • 在实现层面上,fifoWriter 不会一次性将所有的数据都写入,而是写入一个块,然后休息随机数目的微秒时间,接着再循环往复。总的来说,有 768000 个 4 字节整数值被写入到命名管道中。
  • 在关闭命名管道后,fifoWriter 也将使用 unlink 取消对该文件的连接。
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */

一旦连接到管道的每个进程都执行了 unlink 操作后,系统将回收这些备份文件。在这个例子中,只有两个这样的进程 fifoWriterfifoReader,它们都做了 unlink 操作。

这个两个程序应该在不同终端的相同工作目录中执行。但是 fifoWriter 应该在 fifoReader 之前被启动,因为需要 fifoWriter 去创建管道。然后 fifoReader 才能够获取到刚被创建的命名管道。

示例 3. fifoReader 程序

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

unsigned is_prime(unsigned n) { /* not pretty, but efficient */
  if (n <= 3) return n > 1;
  if (0 == (n % 2) || 0 == (n % 3)) return 0;

  unsigned i;
  for (i = 5; (i * i) <= n; i += 6)
    if (0 == (n % i) || 0 == (n % (i + 2))) return 0;

  return 1; /* found a prime! */
}

int main() {
  const char* file = "./fifoChannel";
  int fd = open(file, O_RDONLY);
  if (fd < 0) return -1; /* no point in continuing */
  unsigned count = 0, total = 0, primes_count = 0;

  while (1) {
    int next;
    int i;

    ssize_t count = read(fd, &next, sizeof(int));
    if (0 == count) break;                  /* end of stream */
    else if (count == sizeof(int)) {        /* read a 4-byte int value */
      total++;
      if (is_prime(next)) primes_count++;
    }
  }

  close(fd);       /* close pipe from read end */
  unlink(file);    /* unlink from the underlying file */
  printf("Received ints: %u, primes: %u\n", total, primes_count);

  return 0;
}

上面的 fifoReader 的内容可以总结为如下:

  • 因为 fifoWriter 已经创建了命名管道,所以 fifoReader 只需要利用标准的 open 调用来通过备份文件来获取到管道中的内容:
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);

这个文件的是以只读打开的。

  • 然后这个程序进入一个潜在的无限循环,在每次循环时,尝试读取 4 字节的块。read 调用:
ssize_t count = read(fd, &next, sizeof(int));

返回 0 来暗示该流的结束。在这种情况下,fifoReader 跳出循环,关闭命名管道,并在终止前 unlink 备份文件。

  • 在读入 4 字节整数后,fifoReader 检查这个数是否为质数。这个操作代表了一个生产级别的读取器可能在接收到的字节数据上执行的逻辑操作。在示例运行中,在接收到的 768000 个整数中有 37682 个质数。

重复运行示例, fifoReader 将成功地读取 fifoWriter 写入的所有字节。这不是很让人惊讶的。这两个进程在相同的机器上执行,从而可以不用考虑网络相关的问题。命名管道是一个可信且高效的 IPC 机制,因而被广泛使用。

下面是这两个程序的输出,它们在不同的终端中启动,但处于相同的工作目录:

% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682

消息队列

管道有着严格的先入先出行为:第一个被写入的字节将会第一个被读,第二个写入的字节将第二个被读,以此类推。消息队列可以做出相同的表现,但它又足够灵活,可以使得字节块可以不以先入先出的次序来接收。

正如它的名字所提示的那样,消息队列是一系列的消息,每个消息包含两部分:

  • 荷载,一个字节序列(在 C 中是 char)
  • 类型,以一个正整数值的形式给定,类型用来分类消息,为了更灵活的回收

看一下下面对一个消息队列的描述,每个消息由一个整数类型标记:

          +-+    +-+    +-+    +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
          +-+    +-+    +-+    +-+

在上面展示的 4 个消息中,标记为 1 的是开头,即最接近接收端,然后另个标记为 2 的消息,最后接着一个标记为 3 的消息。假如按照严格的 FIFO 行为执行,消息将会以 1-2-2-3 这样的次序被接收。但是消息队列允许其他收取次序。例如,消息可以被接收方以 3-2-1-2 的次序接收。

mqueue 示例包含两个程序,sender 将向消息队列中写入数据,而 receiver 将从这个队列中读取数据。这两个程序都包含的头文件 queue.h 如下所示:

示例 4. 头文件 queue.h

#define ProjectId 123
#define PathName  "queue.h" /* any existing, accessible file would do */
#define MsgLen    4
#define MsgCount  6

typedef struct { 
  long type;                 /* must be of type long */ 
  char payload[MsgLen + 1];  /* bytes in the message */  
} queuedMessage;

上面的头文件定义了一个名为 queuedMessage 的结构类型,它带有 payload(字节数组)和 type(整数)这两个域。该文件也定义了一些符号常数(使用 #define 语句),前两个常数被用来生成一个 key,而这个 key 反过来被用来获取一个消息队列的 ID。ProjectId 可以是任何正整数值,而 PathName 必须是一个存在的、可访问的文件,在这个示例中,指的是文件 queue.h。在 senderreceiver 中,它们都有的设定语句为:

key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid 在效果上是消息队列文件描述符的对应物。

示例 5. sender 程序

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId);
  if (key < 0) report_and_exit("couldn't get key...");

  int qid = msgget(key, 0666 | IPC_CREAT);
  if (qid < 0) report_and_exit("couldn't get queue id...");

  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  int i;
  for (i = 0; i < MsgCount; i++) {
    /* build the message */
    queuedMessage msg;
    msg.type = types[i];
    strcpy(msg.payload, payloads[i]);

    /* send the message */
    msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
    printf("%s sent as type %i\n", msg.payload, (int) msg.type);
  }
  return 0;
}

上面的 sender 程序将发送出 6 个消息,每两个为一个类型:前两个是类型 1,接着的连个是类型 2,最后的两个为类型 3。发送的语句:

msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

被配置为非阻塞的(IPC_NOWAIT 标志),是因为这里的消息体量上都很小。唯一的危险在于一个完整的序列将可能导致发送失败,而这个例子不会。下面的 receiver 程序也将使用 IPC_NOWAIT 标志来接收消息。

示例 6. receiver 程序

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
  if (key < 0) report_and_exit("key not gotten...");

  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  if (qid < 0) report_and_exit("no access to queue...");

  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  int i;
  for (i = 0; i < MsgCount; i++) {
    queuedMessage msg; /* defined in queue.h */
    if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
      puts("msgrcv trouble...");
    printf("%s received as type %i\n", msg.payload, (int) msg.type);
  }

  /** remove the queue **/
  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */
    report_and_exit("trouble removing queue...");

  return 0;
}

这个 receiver 程序不会创建消息队列,尽管 API 尽管建议那样。在 receiver 中,对

int qid = msgget(key, 0666 | IPC_CREAT);

的调用可能因为带有 IPC_CREAT 标志而具有误导性,但是这个标志的真实意义是如果需要就创建,否则直接获取sender 程序调用 msgsnd 来发送消息,而 receiver 调用 msgrcv 来接收它们。在这个例子中,sender 以 1-1-2-2-3-3 的次序发送消息,但 receiver 接收它们的次序为 3-1-2-1-3-2,这显示消息队列没有被严格的 FIFO 行为所拘泥:

% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3

% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2

上面的输出显示 senderreceiver 可以在同一个终端中启动。输出也显示消息队列是持久的,即便 sender 进程在完成创建队列、向队列写数据、然后退出的整个过程后,该队列仍然存在。只有在 receiver 进程显式地调用 msgctl 来移除该队列,这个队列才会消失:

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

总结

管道和消息队列的 API 在根本上来说都是单向的:一个进程写,然后另一个进程读。当然还存在双向命名管道的实现,但我认为这个 IPC 机制在它最为简单的时候反而是最佳的。正如前面提到的那样,消息队列已经不大受欢迎了,尽管没有找到什么特别好的原因来解释这个现象;而队列仍然是 IPC 工具箱中的一个工具。这个快速的 IPC 工具箱之旅将以第 3 部分(通过套接字和信号来示例 IPC)来终结。


via: https://opensource.com/article/19/4/interprocess-communication-linux-channels

作者:Marty Kalin 选题:lujun9972 译者:FSSlc 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习在 Linux 中进程是如何与其他进程进行同步的。

本篇是 Linux 下进程间通信(IPC)系列的第一篇文章。这个系列将使用 C 语言代码示例来阐明以下 IPC 机制:

  • 共享文件
  • 共享内存(使用信号量)
  • 管道(命名的或非命名的管道)
  • 消息队列
  • 套接字
  • 信号

在聚焦上面提到的共享文件和共享内存这两个机制之前,这篇文章将带你回顾一些核心的概念。

核心概念

进程是运行着的程序,每个进程都有着它自己的地址空间,这些空间由进程被允许访问的内存地址组成。进程有一个或多个执行线程,而线程是一系列执行指令的集合:单线程进程就只有一个线程,而多线程的进程则有多个线程。一个进程中的线程共享各种资源,特别是地址空间。另外,一个进程中的线程可以直接通过共享内存来进行通信,尽管某些现代语言(例如 Go)鼓励一种更有序的方式,例如使用线程安全的通道。当然对于不同的进程,默认情况下,它们能共享内存。

有多种方法启动之后要进行通信的进程,下面所举的例子中主要使用了下面的两种方法:

  • 一个终端被用来启动一个进程,另外一个不同的终端被用来启动另一个。
  • 在一个进程(父进程)中调用系统函数 fork,以此生发另一个进程(子进程)。

第一个例子采用了上面使用终端的方法。这些代码示例的 ZIP 压缩包可以从我的网站下载到。

共享文件

程序员对文件访问应该都已经很熟识了,包括许多坑(不存在的文件、文件权限损坏等等),这些问题困扰着程序对文件的使用。尽管如此,共享文件可能是最为基础的 IPC 机制了。考虑一下下面这样一个相对简单的例子,其中一个进程(生产者 producer)创建和写入一个文件,然后另一个进程(消费者 consumer)从这个相同的文件中进行读取:

          writes +-----------+ reads
producer-------->| disk file |<-------consumer
                 +-----------+

在使用这个 IPC 机制时最明显的挑战是竞争条件可能会发生:生产者和消费者可能恰好在同一时间访问该文件,从而使得输出结果不确定。为了避免竞争条件的发生,该文件在处于状态时必须以某种方式处于被锁状态,从而阻止在操作执行时和其他操作的冲突。在标准系统库中与锁相关的 API 可以被总结如下:

  • 生产者应该在写入文件时获得一个文件的排斥锁。一个排斥锁最多被一个进程所拥有。这样就可以排除掉竞争条件的发生,因为在锁被释放之前没有其他的进程可以访问这个文件。
  • 消费者应该在从文件中读取内容时得到至少一个共享锁。多个读取者可以同时保有一个共享锁,但是没有写入者可以获取到文件内容,甚至在当只有一个读取者保有一个共享锁时。

共享锁可以提升效率。假如一个进程只是读入一个文件的内容,而不去改变它的内容,就没有什么原因阻止其他进程来做同样的事。但如果需要写入内容,则很显然需要文件有排斥锁。

标准的 I/O 库中包含一个名为 fcntl 的实用函数,它可以被用来检查或者操作一个文件上的排斥锁和共享锁。该函数通过一个文件描述符(一个在进程中的非负整数值)来标记一个文件(在不同的进程中不同的文件描述符可能标记同一个物理文件)。对于文件的锁定, Linux 提供了名为 flock 的库函数,它是 fcntl 的一个精简包装。第一个例子中使用 fcntl 函数来暴露这些 API 细节。

示例 1. 生产者程序

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

#define FileName "data.dat"
#define DataString "Now is the winter of our discontent\nMade glorious summer by this sun of York\n"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  struct flock lock;
  lock.l_type = F_WRLCK;    /* read/write (exclusive versus shared) lock */
  lock.l_whence = SEEK_SET; /* base for seek offsets */
  lock.l_start = 0;         /* 1st byte in file */
  lock.l_len = 0;           /* 0 here means 'until EOF' */
  lock.l_pid = getpid();    /* process id */

  int fd; /* file descriptor to identify a file within a process */
  if ((fd = open(FileName, O_RDWR | O_CREAT, 0666)) < 0)  /* -1 signals an error */
    report_and_exit("open failed...");

  if (fcntl(fd, F_SETLK, &lock) < 0) /** F_SETLK doesn't block, F_SETLKW does **/
    report_and_exit("fcntl failed to get lock...");
  else {
    write(fd, DataString, strlen(DataString)); /* populate data file */
    fprintf(stderr, "Process %d has written to data file...\n", lock.l_pid);
  }

  /* Now release the lock explicitly. */
  lock.l_type = F_UNLCK;
  if (fcntl(fd, F_SETLK, &lock) < 0)
    report_and_exit("explicit unlocking failed...");

  close(fd); /* close the file: would unlock if needed */
  return 0;  /* terminating the process would unlock as well */
}

上面生产者程序的主要步骤可以总结如下:

  • 这个程序首先声明了一个类型为 struct flock 的变量,它代表一个锁,并对它的 5 个域做了初始化。第一个初始化
lock.l_type = F_WRLCK; /* exclusive lock */

使得这个锁为排斥锁(read-write)而不是一个共享锁(read-only)。假如生产者获得了这个锁,则其他的进程将不能够对文件做读或者写操作,直到生产者释放了这个锁,或者显式地调用 fcntl,又或者隐式地关闭这个文件。(当进程终止时,所有被它打开的文件都会被自动关闭,从而释放了锁)

  • 上面的程序接着初始化其他的域。主要的效果是整个文件都将被锁上。但是,有关锁的 API 允许特别指定的字节被上锁。例如,假如文件包含多个文本记录,则单个记录(或者甚至一个记录的一部分)可以被锁,而其余部分不被锁。
  • 第一次调用 fcntl
if (fcntl(fd, F_SETLK, &lock) < 0)

尝试排斥性地将文件锁住,并检查调用是否成功。一般来说, fcntl 函数返回 -1 (因此小于 0)意味着失败。第二个参数 F_SETLK 意味着 fcntl 的调用不是堵塞的;函数立即做返回,要么获得锁,要么显示失败了。假如替换地使用 F_SETLKW(末尾的 W 代指等待),那么对 fcntl 的调用将是阻塞的,直到有可能获得锁的时候。在调用 fcntl 函数时,它的第一个参数 fd 指的是文件描述符,第二个参数指定了将要采取的动作(在这个例子中,F_SETLK 指代设置锁),第三个参数为锁结构的地址(在本例中,指的是 &lock)。

  • 假如生产者获得了锁,这个程序将向文件写入两个文本记录。
  • 在向文件写入内容后,生产者改变锁结构中的 l_type 域为 unlock 值:
lock.l_type = F_UNLCK;

并调用 fcntl 来执行解锁操作。最后程序关闭了文件并退出。

示例 2. 消费者程序

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>

#define FileName "data.dat"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  struct flock lock;
  lock.l_type = F_WRLCK;    /* read/write (exclusive) lock */
  lock.l_whence = SEEK_SET; /* base for seek offsets */
  lock.l_start = 0;         /* 1st byte in file */
  lock.l_len = 0;           /* 0 here means 'until EOF' */
  lock.l_pid = getpid();    /* process id */

  int fd; /* file descriptor to identify a file within a process */
  if ((fd = open(FileName, O_RDONLY)) < 0)  /* -1 signals an error */
    report_and_exit("open to read failed...");

  /* If the file is write-locked, we can't continue. */
  fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */
  if (lock.l_type != F_UNLCK)
    report_and_exit("file is still write locked...");

  lock.l_type = F_RDLCK; /* prevents any writing during the reading */
  if (fcntl(fd, F_SETLK, &lock) < 0)
    report_and_exit("can't get a read-only lock...");

  /* Read the bytes (they happen to be ASCII codes) one at a time. */
  int c; /* buffer for read bytes */
  while (read(fd, &c, 1) > 0)    /* 0 signals EOF */
    write(STDOUT_FILENO, &c, 1); /* write one byte to the standard output */

  /* Release the lock explicitly. */
  lock.l_type = F_UNLCK;
  if (fcntl(fd, F_SETLK, &lock) < 0)
    report_and_exit("explicit unlocking failed...");

  close(fd);
  return 0;
}

相比于锁的 API,消费者程序会相对复杂一点儿。特别的,消费者程序首先检查文件是否被排斥性的被锁,然后才尝试去获得一个共享锁。相关的代码为:

lock.l_type = F_WRLCK;
...
fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */
if (lock.l_type != F_UNLCK)
  report_and_exit("file is still write locked...");

fcntl 调用中的 F_GETLK 操作指定检查一个锁,在本例中,上面代码的声明中给了一个 F_WRLCK 的排斥锁。假如特指的锁不存在,那么 fcntl 调用将会自动地改变锁类型域为 F_UNLCK 以此来显示当前的状态。假如文件是排斥性地被锁,那么消费者将会终止。(一个更健壮的程序版本或许应该让消费者会儿,然后再尝试几次。)

假如当前文件没有被锁,那么消费者将尝试获取一个共享(read-only)锁(F_RDLCK)。为了缩短程序,fcntl 中的 F_GETLK 调用可以丢弃,因为假如其他进程已经保有一个读写锁,F_RDLCK 的调用就可能会失败。重新调用一个只读锁能够阻止其他进程向文件进行写的操作,但可以允许其他进程对文件进行读取。简而言之,共享锁可以被多个进程所保有。在获取了一个共享锁后,消费者程序将立即从文件中读取字节数据,然后在标准输出中打印这些字节的内容,接着释放锁,关闭文件并终止。

下面的 % 为命令行提示符,下面展示的是从相同终端开启这两个程序的输出:

% ./producer
Process 29255 has written to data file...

% ./consumer
Now is the winter of our discontent
Made glorious summer by this sun of York

在本次的代码示例中,通过 IPC 传输的数据是文本:它们来自莎士比亚的戏剧《理查三世》中的两行台词。然而,共享文件的内容还可以是纷繁复杂的,任意的字节数据(例如一个电影)都可以,这使得文件共享变成了一个非常灵活的 IPC 机制。但它的缺点是文件获取速度较慢,因为文件的获取涉及到读或者写。同往常一样,编程总是伴随着折中。下面的例子将通过共享内存来做 IPC,而不是通过共享文件,在性能上相应的有极大的提升。

共享内存

对于共享内存,Linux 系统提供了两类不同的 API:传统的 System V API 和更新一点的 POSIX API。在单个应用中,这些 API 不能混用。但是,POSIX 方式的一个坏处是它的特性仍在发展中,并且依赖于安装的内核版本,这非常影响代码的可移植性。例如,默认情况下,POSIX API 用内存映射文件来实现共享内存:对于一个共享的内存段,系统为相应的内容维护一个备份文件。在 POSIX 规范下共享内存可以被配置为不需要备份文件,但这可能会影响可移植性。我的例子中使用的是带有备份文件的 POSIX API,这既结合了内存获取的速度优势,又获得了文件存储的持久性。

下面的共享内存例子中包含两个程序,分别名为 memwritermemreader,并使用信号量来调整它们对共享内存的获取。在任何时候当共享内存进入一个写入者场景时,无论是多进程还是多线程,都有遇到基于内存的竞争条件的风险,所以,需要引入信号量来协调(同步)对共享内存的获取。

memwriter 程序应当在它自己所处的终端首先启动,然后 memreader 程序才可以在它自己所处的终端启动(在接着的十几秒内)。memreader 的输出如下:

This is the way the world ends...

在每个源程序的最上方注释部分都解释了在编译它们时需要添加的链接参数。

首先让我们复习一下信号量是如何作为一个同步机制工作的。一般的信号量也被叫做一个计数信号量,因为带有一个可以增加的值(通常初始化为 0)。考虑一家租用自行车的商店,在它的库存中有 100 辆自行车,还有一个供职员用于租赁的程序。每当一辆自行车被租出去,信号量就增加 1;当一辆自行车被还回来,信号量就减 1。在信号量的值为 100 之前都还可以进行租赁业务,但如果等于 100 时,就必须停止业务,直到至少有一辆自行车被还回来,从而信号量减为 99。

二元信号量是一个特例,它只有两个值:0 和 1。在这种情况下,信号量的表现为互斥量(一个互斥的构造)。下面的共享内存示例将把信号量用作互斥量。当信号量的值为 0 时,只有 memwriter 可以获取共享内存,在写操作完成后,这个进程将增加信号量的值,从而允许 memreader 来读取共享内存。

示例 3. memwriter 进程的源程序

/** Compilation: gcc -o memwriter memwriter.c -lrt -lpthread **/
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <semaphore.h>
#include <string.h>
#include "shmem.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);
}

int main() {
  int fd = shm_open(BackingFile,      /* name from smem.h */
                    O_RDWR | O_CREAT, /* read/write, create if needed */
                    AccessPerms);     /* access permissions (0644) */
  if (fd < 0) report_and_exit("Can't open shared mem segment...");

  ftruncate(fd, ByteSize); /* get the bytes */

  caddr_t memptr = mmap(NULL,       /* let system pick where to put segment */
                        ByteSize,   /* how many bytes */
                        PROT_READ | PROT_WRITE, /* access protections */
                        MAP_SHARED, /* mapping visible to other processes */
                        fd,         /* file descriptor */
                        0);         /* offset: start at 1st byte */
  if ((caddr_t) -1  == memptr) report_and_exit("Can't get segment...");

  fprintf(stderr, "shared mem address: %p [0..%d]\n", memptr, ByteSize - 1);
  fprintf(stderr, "backing file:       /dev/shm%s\n", BackingFile );

  /* semaphore code to lock the shared mem */
  sem_t* semptr = sem_open(SemaphoreName, /* name */
                           O_CREAT,       /* create the semaphore */
                           AccessPerms,   /* protection perms */
                           0);            /* initial value */
  if (semptr == (void*) -1) report_and_exit("sem_open");

  strcpy(memptr, MemContents); /* copy some ASCII bytes to the segment */

  /* increment the semaphore so that memreader can read */
  if (sem_post(semptr) < 0) report_and_exit("sem_post");

  sleep(12); /* give reader a chance */

  /* clean up */
  munmap(memptr, ByteSize); /* unmap the storage */
  close(fd);
  sem_close(semptr);
  shm_unlink(BackingFile); /* unlink from the backing file */
  return 0;
}

下面是 memwritermemreader 程序如何通过共享内存来通信的一个总结:

  • 上面展示的 memwriter 程序调用 shm_open 函数来得到作为系统协调共享内存的备份文件的文件描述符。此时,并没有内存被分配。接下来调用的是令人误解的名为 ftruncate 的函数
ftruncate(fd, ByteSize); /* get the bytes */

它将分配 ByteSize 字节的内存,在该情况下,一般为大小适中的 512 字节。memwritermemreader 程序都只从共享内存中获取数据,而不是从备份文件。系统将负责共享内存和备份文件之间数据的同步。

  • 接着 memwriter 调用 mmap 函数:
caddr_t memptr = mmap(NULL, /* let system pick where to put segment */
                  ByteSize, /* how many bytes */
                  PROT_READ | PROT_WRITE, /* access protections */
                  MAP_SHARED, /* mapping visible to other processes */
                  fd, /* file descriptor */
                  0); /* offset: start at 1st byte */

来获得共享内存的指针。(memreader 也做一次类似的调用。) 指针类型 caddr_tc 开头,它代表 calloc,而这是动态初始化分配的内存为 0 的一个系统函数。memwriter 通过库函数 strcpy(字符串复制)来获取后续操作的 memptr

  • 到现在为止,memwriter 已经准备好进行写操作了,但首先它要创建一个信号量来确保共享内存的排斥性。假如 memwriter 正在执行写操作而同时 memreader 在执行读操作,则有可能出现竞争条件。假如调用 sem_open 成功了:
sem_t* semptr = sem_open(SemaphoreName, /* name */
                     O_CREAT, /* create the semaphore */
                     AccessPerms, /* protection perms */
                     0); /* initial value */

那么,接着写操作便可以执行。上面的 SemaphoreName(任意一个唯一的非空名称)用来在 memwritermemreader 识别信号量。初始值 0 将会传递给信号量的创建者,在这个例子中指的是 memwriter 赋予它执行操作的权利。

  • 在写操作完成后,memwriter* 通过调用sem\_post` 函数将信号量的值增加到 1:
if (sem_post(semptr) < 0) ..

增加信号了将释放互斥锁,使得 memreader 可以执行它的操作。为了更好地测量,memwriter 也将从它自己的地址空间中取消映射,

munmap(memptr, ByteSize); /* unmap the storage *

这将使得 memwriter 不能进一步地访问共享内存。

示例 4. memreader 进程的源代码

/** Compilation: gcc -o memreader memreader.c -lrt -lpthread **/
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <semaphore.h>
#include <string.h>
#include "shmem.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);
}

int main() {
  int fd = shm_open(BackingFile, O_RDWR, AccessPerms);  /* empty to begin */
  if (fd < 0) report_and_exit("Can't get file descriptor...");

  /* get a pointer to memory */
  caddr_t memptr = mmap(NULL,       /* let system pick where to put segment */
                        ByteSize,   /* how many bytes */
                        PROT_READ | PROT_WRITE, /* access protections */
                        MAP_SHARED, /* mapping visible to other processes */
                        fd,         /* file descriptor */
                        0);         /* offset: start at 1st byte */
  if ((caddr_t) -1 == memptr) report_and_exit("Can't access segment...");

  /* create a semaphore for mutual exclusion */
  sem_t* semptr = sem_open(SemaphoreName, /* name */
                           O_CREAT,       /* create the semaphore */
                           AccessPerms,   /* protection perms */
                           0);            /* initial value */
  if (semptr == (void*) -1) report_and_exit("sem_open");

  /* use semaphore as a mutex (lock) by waiting for writer to increment it */
  if (!sem_wait(semptr)) { /* wait until semaphore != 0 */
    int i;
    for (i = 0; i < strlen(MemContents); i++)
      write(STDOUT_FILENO, memptr + i, 1); /* one byte at a time */
    sem_post(semptr);
  }

  /* cleanup */
  munmap(memptr, ByteSize);
  close(fd);
  sem_close(semptr);
  unlink(BackingFile);
  return 0;
}

memwritermemreader 程序中,共享内存的主要着重点都在 shm_openmmap 函数上:在成功时,第一个调用返回一个备份文件的文件描述符,而第二个调用则使用这个文件描述符从共享内存段中获取一个指针。它们对 shm_open 的调用都很相似,除了 memwriter 程序创建共享内存,而 `memreader 只获取这个已经创建的内存:

int fd = shm_open(BackingFile, O_RDWR | O_CREAT, AccessPerms); /* memwriter */
int fd = shm_open(BackingFile, O_RDWR, AccessPerms); /* memreader */

有了文件描述符,接着对 mmap 的调用就是类似的了:

caddr_t memptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

mmap 的第一个参数为 NULL,这意味着让系统自己决定在虚拟内存地址的哪个地方分配内存,当然也可以指定一个地址(但很有技巧性)。MAP_SHARED 标志着被分配的内存在进程中是共享的,最后一个参数(在这个例子中为 0 ) 意味着共享内存的偏移量应该为第一个字节。size 参数特别指定了将要分配的字节数目(在这个例子中是 512);另外的保护参数(AccessPerms)暗示着共享内存是可读可写的。

memwriter 程序执行成功后,系统将创建并维护备份文件,在我的系统中,该文件为 /dev/shm/shMemEx,其中的 shMemEx 是我为共享存储命名的(在头文件 shmem.h 中给定)。在当前版本的 memwritermemreader 程序中,下面的语句

shm_unlink(BackingFile); /* removes backing file */

将会移除备份文件。假如没有 unlink 这个语句,则备份文件在程序终止后仍然持久地保存着。

memreadermemwriter 一样,在调用 sem_open 函数时,通过信号量的名字来获取信号量。但 memreader 随后将进入等待状态,直到 memwriter 将初始值为 0 的信号量的值增加。

if (!sem_wait(semptr)) { /* wait until semaphore != 0 */

一旦等待结束,memreader 将从共享内存中读取 ASCII 数据,然后做些清理工作并终止。

共享内存 API 包括显式地同步共享内存段和备份文件。在这次的示例中,这些操作都被省略了,以免文章显得杂乱,好让我们专注于内存共享和信号量的代码。

即便在信号量代码被移除的情况下,memwritermemreader 程序很大几率也能够正常执行而不会引入竞争条件:memwriter 创建了共享内存段,然后立即向它写入;memreader 不能访问共享内存,直到共享内存段被创建好。然而,当一个写操作处于混合状态时,最佳实践需要共享内存被同步。信号量 API 足够重要,值得在代码示例中着重强调。

总结

上面共享文件和共享内存的例子展示了进程是怎样通过共享存储来进行通信的,前者通过文件而后者通过内存块。这两种方法的 API 相对来说都很直接。这两种方法有什么共同的缺点吗?现代的应用经常需要处理流数据,而且是非常大规模的数据流。共享文件或者共享内存的方法都不能很好地处理大规模的流数据。按照类型使用管道会更加合适一些。所以这个系列的第二部分将会介绍管道和消息队列,同样的,我们将使用 C 语言写的代码示例来辅助讲解。


via: https://opensource.com/article/19/4/interprocess-communication-linux-storage

作者:Marty Kalin 选题:lujun9972 译者:FSSlc 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出