Marty Kalin 发布的文章

想要入门密码学的基础知识,尤其是有关 OpenSSL 的入门知识吗?继续阅读。

本文是使用 OpenSSL 的密码学基础知识的两篇文章中的第一篇,OpenSSL 是在 Linux 和其他系统上流行的生产级库和工具包。(要安装 OpenSSL 的最新版本,请参阅这里。)OpenSSL 实用程序可在命令行使用,程序也可以调用 OpenSSL 库中的函数。本文的示例程序使用的是 C 语言,即 OpenSSL 库的源语言。

本系列的两篇文章涵盖了加密哈希、数字签名、加密和解密以及数字证书。你可以从我的网站的 ZIP 文件中找到这些代码和命令行示例。

让我们首先回顾一下 OpenSSL 名称中的 SSL。

OpenSSL 简史

安全套接字层 Secure Socket Layer (SSL)是 Netscape 在 1995 年发布的一种加密协议。该协议层可以位于 HTTP 之上,从而为 HTTPS 提供了 S: 安全 secure 。SSL 协议提供了各种安全服务,其中包括两项在 HTTPS 中至关重要的服务:

  • 对等身份验证 Peer authentication (也称为相互质询):连接的每一边都对另一边的身份进行身份验证。如果 Alice 和 Bob 要通过 SSL 交换消息,则每个人首先验证彼此的身份。
  • 机密性 Confidentiality :发送者在通过通道发送消息之前先对其进行加密。然后,接收者解密每个接收到的消息。此过程可保护网络对话。即使窃听者 Eve 截获了从 Alice 到 Bob 的加密消息(即中间人攻击),Eve 会发现他无法在计算上解密此消息。

反过来,这两个关键 SSL 服务与其他不太受关注的服务相关联。例如,SSL 支持消息完整性,从而确保接收到的消息与发送的消息相同。此功能是通过哈希函数实现的,哈希函数也随 OpenSSL 工具箱一起提供。

SSL 有多个版本(例如 SSLv2 和 SSLv3),并且在 1999 年出现了一个基于 SSLv3 的类似协议 传输层安全性 Transport Layer Security (TLS)。TLSv1 和 SSLv3 相似,但不足以相互配合工作。不过,通常将 SSL/TLS 称为同一协议。例如,即使正在使用的是 TLS(而非 SSL),OpenSSL 函数也经常在名称中包含 SSL。此外,调用 OpenSSL 命令行实用程序以 openssl 开始。

除了 man 页面之外,OpenSSL 的文档是零零散散的,鉴于 OpenSSL 工具包很大,这些页面很难以查找使用。命令行和代码示例可以将主要主题集中起来。让我们从一个熟悉的示例开始(使用 HTTPS 访问网站),然后使用该示例来选出我们感兴趣的加密部分进行讲述。

一个 HTTPS 客户端

此处显示的 client 程序通过 HTTPS 连接到 Google:

/* compilation: gcc -o client client.c -lssl -lcrypto */
#include <stdio.h>
#include <stdlib.h>
#include <openssl/bio.h> /* BasicInput/Output streams */
#include <openssl/err.h> /* errors */
#include <openssl/ssl.h> /* core library */
#define BuffSize 1024

void report_and_exit(const char* msg) {
  perror(msg);
  ERR_print_errors_fp(stderr);
  exit(-1);
}

void init_ssl() {
  SSL_load_error_strings();
  SSL_library_init();
}

void cleanup(SSL_CTX* ctx, BIO* bio) {
  SSL_CTX_free(ctx);
  BIO_free_all(bio);
}

void secure_connect(const char* hostname) {
  char name[BuffSize];
  char request[BuffSize];
  char response[BuffSize];

  const SSL_METHOD* method = TLSv1_2_client_method();
  if (NULL == method) report_and_exit("TLSv1_2_client_method...");

  SSL_CTX* ctx = SSL_CTX_new(method);
  if (NULL == ctx) report_and_exit("SSL_CTX_new...");

  BIO* bio = BIO_new_ssl_connect(ctx);
  if (NULL == bio) report_and_exit("BIO_new_ssl_connect...");

  SSL* ssl = NULL;

  /* 链路 bio 通道,SSL 会话和服务器端点 */

  sprintf(name, "%s:%s", hostname, "https");
  BIO_get_ssl(bio, &ssl); /* 会话 */
  SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY); /* 鲁棒性 */
  BIO_set_conn_hostname(bio, name); /* 准备连接 */

  /* 尝试连接 */
  if (BIO_do_connect(bio) <= 0) {
    cleanup(ctx, bio);
    report_and_exit("BIO_do_connect...");
  }

  /* 验证信任库,检查证书 */
  if (!SSL_CTX_load_verify_locations(ctx,
                                      "/etc/ssl/certs/ca-certificates.crt", /* 信任库 */
                                      "/etc/ssl/certs/")) /* 其它信任库 */
    report_and_exit("SSL_CTX_load_verify_locations...");

  long verify_flag = SSL_get_verify_result(ssl);
  if (verify_flag != X509_V_OK)
    fprintf(stderr,
            "##### Certificate verification error (%i) but continuing...\n",
            (int) verify_flag);

  /* 获取主页作为示例数据 */
  sprintf(request,
          "GET / HTTP/1.1\x0D\x0AHost: %s\x0D\x0A\x43onnection: Close\x0D\x0A\x0D\x0A",
          hostname);
  BIO_puts(bio, request);

  /* 从服务器读取 HTTP 响应并打印到输出 */
  while (1) {
    memset(response, '\0', sizeof(response));
    int n = BIO_read(bio, response, BuffSize);
    if (n <= 0) break; /* 0 代表流结束,< 0 代表有错误 */
  puts(response);
  }

  cleanup(ctx, bio);
}

int main() {
  init_ssl();

  const char* hostname = "www.google.com:443";
  fprintf(stderr, "Trying an HTTPS connection to %s...\n", hostname);
  secure_connect(hostname);

return 0;
}

可以从命令行编译和执行该程序(请注意 -lssl-lcrypto 中的小写字母 L):

gcc -o client client.c -lssl -lcrypto

该程序尝试打开与网站 www.google.com 的安全连接。在与 Google Web 服务器的 TLS 握手过程中,client 程序会收到一个或多个数字证书,该程序会尝试对其进行验证(但在我的系统上失败了)。尽管如此,client 程序仍继续通过安全通道获取 Google 主页。该程序取决于前面提到的安全工件,尽管在上述代码中只着重突出了数字证书。但其它工件仍在幕后发挥作用,稍后将对它们进行详细说明。

通常,打开 HTTP(非安全)通道的 C 或 C++ 的客户端程序将使用诸如文件描述符网络套接字之类的结构,它们是两个进程(例如,这个 client 程序和 Google Web 服务器)之间连接的端点。另一方面,文件描述符是一个非负整数值,用于在程序中标识该程序打开的任何文件类的结构。这样的程序还将使用一种结构来指定有关 Web 服务器地址的详细信息。

这些相对较低级别的结构不会出现在客户端程序中,因为 OpenSSL 库会将套接字基础设施和地址规范等封装在更高层面的安全结构中。其结果是一个简单的 API。下面首先看一下 client 程序示例中的安全性详细信息。

  • 该程序首先加载相关的 OpenSSL 库,我的函数 init_ssl 中对 OpenSSL 进行了两次调用:
SSL_load_error_strings();
SSL_library_init(); 
  • 下一个初始化步骤尝试获取安全上下文,这是建立和维护通往 Web 服务器的安全通道所需的信息框架。如对 OpenSSL 库函数的调用所示,在示例中使用了 TLS 1.2:
const SSL_METHOD* method = TLSv1_2_client_method(); /* TLS 1.2 */

如果调用成功,则将 method 指针被传递给库函数,该函数创建类型为 SSL_CTX 的上下文:

SSL_CTX* ctx = SSL_CTX_new(method);

client 程序会检查每个关键的库调用的错误,如果其中一个调用失败,则程序终止。

  • 现在还有另外两个 OpenSSL 工件也在发挥作用:SSL 类型的安全会话,从头到尾管理安全连接;以及类型为 BIO( 基本输入/输出 Basic Input/Output )的安全流,用于与 Web 服务器进行通信。BIO 流是通过以下调用生成的:
BIO* bio = BIO_new_ssl_connect(ctx);

请注意,这个最重要的上下文是其参数。BIO 类型是 C 语言中 FILE 类型的 OpenSSL 封装器。此封装器可保护 client 程序与 Google 的网络服务器之间的输入和输出流的安全。

  • 有了 SSL_CTXBIO,然后程序在 SSL 会话中将它们组合在一起。三个库调用可以完成工作:
BIO_get_ssl(bio, &ssl); /* 会话 */
SSL_set_mode(ssl, SSL_MODE_AUTO_RETRY); /* 鲁棒性 */
BIO_set_conn_hostname(bio, name); /* 准备连接 */

安全连接本身是通过以下调用建立的:

BIO_do_connect(bio);

如果最后一个调用不成功,则 client 程序终止;否则,该连接已准备就绪,可以支持 client 程序与 Google Web 服务器之间的机密对话。

在与 Web 服务器握手期间,client 程序会接收一个或多个数字证书,以认证服务器的身份。但是,client 程序不会发送自己的证书,这意味着这个身份验证是单向的。(Web 服务器通常配置为需要客户端证书)尽管对 Web 服务器证书的验证失败,但 client 程序仍通过了连接到 Web 服务器的安全通道继续获取 Google 主页。

为什么验证 Google 证书的尝试会失败?典型的 OpenSSL 安装目录为 /etc/ssl/certs,其中包含 ca-certificates.crt 文件。该目录和文件包含着 OpenSSL 自带的数字证书,以此构成 信任库 truststore 。可以根据需要更新信任库,尤其是可以包括新信任的证书,并删除不再受信任的证书。

client 程序从 Google Web 服务器收到了三个证书,但是我的计算机上的 OpenSSL 信任库并不包含完全匹配的证书。如目前所写,client 程序不会通过例如验证 Google 证书上的数字签名(一个用来证明该证书的签名)来解决此问题。如果该签名是受信任的,则包含该签名的证书也应受信任。尽管如此,client 程序仍继续获取页面,然后打印出 Google 的主页。下一节将更详细地介绍这些。

客户端程序中隐藏的安全性

让我们从客户端示例中可见的安全工件(数字证书)开始,然后考虑其他安全工件如何与之相关。数字证书的主要格式标准是 X509,生产级的证书由诸如 Verisign 证书颁发机构 Certificate Authority (CA)颁发。

数字证书中包含各种信息(例如,激活日期和失效日期以及所有者的域名),也包括发行者的身份和数字签名(这是加密过的加密哈希值)。证书还具有未加密的哈希值,用作其标识指纹

哈希值来自将任意数量的二进制位映射到固定长度的摘要。这些位代表什么(会计报告、小说或数字电影)无关紧要。例如, 消息摘要版本 5 Message Digest version 5 (MD5)哈希算法将任意长度的输入位映射到 128 位哈希值,而 SHA1( 安全哈希算法版本 1 Secure Hash Algorithm version 1 )算法将输入位映射到 160 位哈希值。不同的输入位会导致不同的(实际上在统计学上是唯一的)哈希值。下一篇文章将会进行更详细的介绍,并着重介绍什么使哈希函数具有加密功能。

数字证书的类型有所不同(例如根证书、中间证书和最终实体证书),并形成了反映这些证书类型的层次结构。顾名思义,证书位于层次结构的顶部,其下的证书继承了根证书所具有的信任。OpenSSL 库和大多数现代编程语言都具有 X509 数据类型以及处理此类证书的函数。来自 Google 的证书具有 X509 格式,client 程序会检查该证书是否为 X509_V_OK

X509 证书基于 公共密钥基础结构 public-key infrastructure (PKI),其中包括的算法(RSA 是占主导地位的算法)用于生成密钥对:公共密钥及其配对的私有密钥。公钥是一种身份:Amazon 的公钥对其进行标识,而我的公钥对我进行标识。私钥应由其所有者负责保密。

成对出现的密钥具有标准用途。可以使用公钥对消息进行加密,然后可以使用同一个密钥对中的私钥对消息进行解密。私钥也可以用于对文档或其他电子工件(例如程序或电子邮件)进行签名,然后可以使用该对密钥中的公钥来验证签名。以下两个示例补充了一些细节。

在第一个示例中,Alice 将她的公钥分发给全世界,包括 Bob。然后,Bob 用 Alice 的公钥加密邮件,然后将加密的邮件发送给 Alice。用 Alice 的公钥加密的邮件将可以用她的私钥解密(假设是她自己的私钥),如下所示:

             +------------------+ encrypted msg  +-------------------+
Bob's msg--->|Alice's public key|--------------->|Alice's private key|---> Bob's msg
             +------------------+                +-------------------+

理论上可以在没有 Alice 的私钥的情况下解密消息,但在实际情况中,如果使用像 RSA 这样的加密密钥对系统,则在计算上做不到。

现在,第二个示例,请对文档签名以证明其真实性。签名算法使用密钥对中的私钥来处理要签名的文档的加密哈希:

                    +-------------------+
Hash of document--->|Alice's private key|--->Alice's digital signature of the document
                    +-------------------+

假设 Alice 以数字方式签署了发送给 Bob 的合同。然后,Bob 可以使用 Alice 密钥对中的公钥来验证签名:

                                             +------------------+
Alice's digital signature of the document--->|Alice's public key|--->verified or not
                                             +------------------+

假若没有 Alice 的私钥,就无法轻松伪造 Alice 的签名:因此,Alice 有必要保密她的私钥。

client 程序中,除了数字证书以外,这些安全性都没有明确展示。下一篇文章使用使用 OpenSSL 实用程序和库函数的示例填充更多详细的信息。

命令行的 OpenSSL

同时,让我们看一下 OpenSSL 命令行实用程序:特别是在 TLS 握手期间检查来自 Web 服务器的证书的实用程序。调用 OpenSSL 实用程序可以使用 openssl 命令,然后添加参数和标志的组合以指定所需的操作。

看看以下命令:

openssl list-cipher-algorithms

该输出是组成 加密算法套件 cipher suite 的相关算法的列表。下面是列表的开头,加了澄清首字母缩写词的注释:

AES-128-CBC ## Advanced Encryption Standard, Cipher Block Chaining
AES-128-CBC-HMAC-SHA1 ## Hash-based Message Authentication Code with SHA1 hashes
AES-128-CBC-HMAC-SHA256 ## ditto, but SHA256 rather than SHA1
...

下一条命令使用参数 s_client 将打开到 www.google.com 的安全连接,并在屏幕上显示有关此连接的所有信息:

openssl s_client -connect www.google.com:443 -showcerts

端口号 443 是 Web 服务器用于接收 HTTPS(而不是 HTTP 连接)的标准端口号。(对于 HTTP,标准端口为 80)Web 地址 www.google.com:443 也出现在 client 程序的代码中。如果尝试连接成功,则将显示来自 Google 的三个数字证书以及有关安全会话、正在使用的加密算法套件以及相关项目的信息。例如,这是开头的部分输出,它声明证书链即将到来。证书的编码为 base64:

Certificate chain
 0 s:/C=US/ST=California/L=Mountain View/O=Google LLC/CN=www.google.com
 i:/C=US/O=Google Trust Services/CN=Google Internet Authority G3
-----BEGIN CERTIFICATE-----
MIIEijCCA3KgAwIBAgIQdCea9tmy/T6rK/dDD1isujANBgkqhkiG9w0BAQsFADBU
MQswCQYDVQQGEwJVUzEeMBwGA1UEChMVR29vZ2xlIFRydXN0IFNlcnZpY2VzMSUw
...

诸如 Google 之类的主要网站通常会发送多个证书进行身份验证。

输出以有关 TLS 会话的摘要信息结尾,包括加密算法套件的详细信息:

SSL-Session:
    Protocol : TLSv1.2
    Cipher : ECDHE-RSA-AES128-GCM-SHA256
    Session-ID: A2BBF0E4991E6BBBC318774EEE37CFCB23095CC7640FFC752448D07C7F438573
...

client 程序中使用了协议 TLS 1.2,Session-ID 唯一地标识了 openssl 实用程序和 Google Web 服务器之间的连接。Cipher 条目可以按以下方式进行解析:

  • ECDHE 椭圆曲线 Diffie-Hellman(临时) Elliptic Curve Diffie Hellman Ephemeral )是一种用于管理 TLS 握手的高效的有效算法。尤其是,ECDHE 通过确保连接双方(例如,client 程序和 Google Web 服务器)使用相同的加密/解密密钥(称为会话密钥)来解决“密钥分发问题”。后续文章会深入探讨该细节。
  • RSA(Rivest Shamir Adleman)是主要的公共密钥密码系统,并以 1970 年代末首次描述了该系统的三位学者的名字命名。这个正在使用的密钥对是使用 RSA 算法生成的。
  • AES128 高级加密标准 Advanced Encryption Standard )是一种 块式加密算法 block cipher ,用于加密和解密 位块 blocks of bits 。(另一种算法是 流式加密算法 stream cipher ,它一次加密和解密一个位。)这个加密算法是对称加密算法,因为使用同一个密钥进行加密和解密,这首先引起了密钥分发问题。AES 支持 128(此处使用)、192 和 256 位的密钥大小:密钥越大,安全性越好。

通常,像 AES 这样的对称加密系统的密钥大小要小于像 RSA 这样的非对称(基于密钥对)系统的密钥大小。例如,1024 位 RSA 密钥相对较小,而 256 位密钥则当前是 AES 最大的密钥。

  • GCM 伽罗瓦计数器模式 Galois Counter Mode )处理在安全对话期间重复应用的加密算法(在这种情况下为 AES128)。AES128 块的大小仅为 128 位,安全对话很可能包含从一侧到另一侧的多个 AES128 块。GCM 非常有效,通常与 AES128 搭配使用。
  • SHA256 256 位安全哈希算法 Secure Hash Algorithm 256 bits )是我们正在使用的加密哈希算法。生成的哈希值的大小为 256 位,尽管使用 SHA 甚至可以更大。

加密算法套件正在不断发展中。例如,不久前,Google 使用 RC4 流加密算法(RSA 的 Ron Rivest 后来开发的 Ron’s Cipher 版本 4)。 RC4 现在有已知的漏洞,这大概部分导致了 Google 转换为 AES128。

总结

我们通过安全的 C Web 客户端和各种命令行示例对 OpenSSL 做了首次了解,使一些需要进一步阐明的主题脱颖而出。下一篇文章会详细介绍,从加密散列开始,到对数字证书如何应对密钥分发挑战为结束的更全面讨论。


via: https://opensource.com/article/19/6/cryptography-basics-openssl-part-1

作者:Marty Kalin 选题:lujun9972 译者:wxy 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

在以不同语言编写并在不同平台上运行的应用程序之间交换数据时,Protobuf 编码可提高效率。

协议缓冲区 Protocol Buffers Protobufs)像 XML 和 JSON 一样,可以让用不同语言编写并在不同平台上运行的应用程序交换数据。例如,用 Go 编写的发送程序可以在 Protobuf 中对以 Go 表示的销售订单数据进行编码,然后用 Java 编写的接收方可以对它进行解码,以获取所接收订单数据的 Java 表示方式。这是在网络连接上的结构示意图:

Go 销售订单 —> Pbuf 编码 —> 网络 —> Pbuf 界面 —> Java 销售订单

与 XML 和 JSON 相比,Protobuf 编码是二进制而不是文本,这会使调试复杂化。但是,正如本文中的代码示例所确认的那样,Protobuf 编码在大小上比 XML 或 JSON 编码要有效得多。

Protobuf 以另一种方式提供了这种有效性。在实现级别,Protobuf 和其他编码系统对结构化数据进行 序列化 serialize 反序列化 deserialize 。序列化将特定语言的数据结构转换为字节流,反序列化是将字节流转换回特定语言的数据结构的逆运算。序列化和反序列化可能成为数据交换的瓶颈,因为这些操作会占用大量 CPU。高效的序列化和反序列化是 Protobuf 的另一个设计目标。

最近的编码技术,例如 Protobuf 和 FlatBuffers,源自 1990 年代初期的 DCE/RPC 分布式计算环境/远程过程调用 Distributed Computing Environment/Remote Procedure Call )计划。与 DCE/RPC 一样,Protobuf 在数据交换中为 IDL(接口定义语言)和编码层做出了贡献。

本文将着眼于这两层,然后提供 Go 和 Java 中的代码示例以充实 Protobuf 的细节,并表明 Protobuf 是易于使用的。

Protobuf 作为一个 IDL 和编码层

像 Protobuf 一样,DCE/RPC 被设计为与语言和平台无关。适当的库和实用程序允许任何语言和平台用于 DCE/RPC 领域。此外,DCE/RPC 体系结构非常优雅。IDL 文档是一侧的远程过程与另一侧的调用者之间的协定。Protobuf 也是以 IDL 文档为中心的。

IDL 文档是文本,在 DCE/RPC 中,使用基本 C 语法以及元数据的语法扩展(方括号)和一些新的关键字,例如 interface。这是一个例子:

[uuid (2d6ead46-05e3-11ca-7dd1-426909beabcd), version(1.0)]
interface echo {
   const long int ECHO_SIZE = 512;
   void echo(
      [in]          handle_t h,
      [in, string]  idl_char from_client[ ],
      [out, string] idl_char from_service[ECHO_SIZE]
   );
}

该 IDL 文档声明了一个名为 echo 的过程,该过程带有三个参数:类型为 handle_t(实现指针)和 idl_char(ASCII 字符数组)的 [in] 参数被传递给远程过程,而 [out] 参数(也是一个字符串)从该过程中传回。在此示例中,echo 过程不会显式返回值(echo 左侧的 void),但也可以返回值。返回值,以及一个或多个 [out] 参数,允许远程过程任意返回许多值。下一节将介绍 Protobuf IDL,它的语法不同,但同样用作数据交换中的协定。

DCE/RPC 和 Protobuf 中的 IDL 文档是创建用于交换数据的基础结构代码的实用程序的输入:

IDL 文档 —> DCE/PRC 或 Protobuf 实用程序 —> 数据交换的支持代码

作为相对简单的文本,IDL 是同样便于人类阅读的关于数据交换细节的文档(特别是交换的数据项的数量和每个项的数据类型)。

Protobuf 可用于现代 RPC 系统,例如 gRPC;但是 Protobuf 本身仅提供 IDL 层和编码层,用于从发送者传递到接收者的消息。与原本的 DCE/RPC 一样,Protobuf 编码是二进制的,但效率更高。

目前,XML 和 JSON 编码仍在通过 Web 服务等技术进行的数据交换中占主导地位,这些技术利用 Web 服务器、传输协议(例如 TCP、HTTP)以及标准库和实用程序等原有的基础设施来处理 XML 和 JSON 文档。 此外,各种类型的数据库系统可以存储 XML 和 JSON 文档,甚至旧式关系型系统也可以轻松生成查询结果的 XML 编码。现在,每种通用编程语言都具有支持 XML 和 JSON 的库。那么,是什么让我们回到 Protobuf 之类的二进制编码系统呢?

让我们看一下负十进制值 -128。以 2 的补码二进制表示形式(在系统和语言中占主导地位)中,此值可以存储在单个 8 位字节中:10000000。此整数值在 XML 或 JSON 中的文本编码需要多个字节。例如,UTF-8 编码需要四个字节的字符串,即 -128,即每个字符一个字节(十六进制,值为 0x2d0x310x320x38)。XML 和 JSON 还添加了标记字符,例如尖括号和大括号。有关 Protobuf 编码的详细信息下面就会介绍,但现在的关注点是一个通用点:文本编码的压缩性明显低于二进制编码。

在 Go 中使用 Protobuf 的示例

我的代码示例着重于 Protobuf 而不是 RPC。以下是第一个示例的概述:

  • 名为 dataitem.proto 的 IDL 文件定义了一个 Protobuf 消息,它具有六个不同类型的字段:具有不同范围的整数值、固定大小的浮点值以及两个不同长度的字符串。
  • Protobuf 编译器使用 IDL 文件生成 Go 版本(以及后面的 Java 版本)的 Protobuf 消息及支持函数。
  • Go 应用程序使用随机生成的值填充原生的 Go 数据结构,然后将结果序列化为本地文件。为了进行比较, XML 和 JSON 编码也被序列化为本地文件。
  • 作为测试,Go 应用程序通过反序列化 Protobuf 文件的内容来重建其原生数据结构的实例。
  • 作为语言中立性测试,Java 应用程序还会对 Protobuf 文件的内容进行反序列化以获取原生数据结构的实例。

我的网站上提供了该 IDL 文件以及两个 Go 和一个 Java 源文件,打包为 ZIP 文件。

最重要的 Protobuf IDL 文档如下所示。该文档存储在文件 dataitem.proto 中,并具有常规的.proto 扩展名。

示例 1、Protobuf IDL 文档

syntax = "proto3";

package main;

message DataItem {
  int64  oddA  = 1;
  int64  evenA = 2;
  int32  oddB  = 3;
  int32  evenB = 4;
  float  small = 5;
  float  big   = 6;
  string short = 7;
  string long  = 8;
}

该 IDL 使用当前的 proto3 而不是较早的 proto2 语法。软件包名称(在本例中为 main)是可选的,但是惯例使用它以避免名称冲突。这个结构化的消息包含八个字段,每个字段都有一个 Protobuf 数据类型(例如,int64string)、名称(例如,oddAshort)和一个等号 = 之后的数字标签(即键)。标签(在此示例中为 1 到 8)是唯一的整数标识符,用于确定字段序列化的顺序。

Protobuf 消息可以嵌套到任意级别,而一个消息可以是另外一个消息的字段类型。这是一个使用 DataItem 消息作为字段类型的示例:

message DataItems {
  repeated DataItem item = 1;
}

单个 DataItems 消息由重复的(零个或多个)DataItem 消息组成。

为了清晰起见,Protobuf 还支持枚举类型:

enum PartnershipStatus {
  reserved "FREE", "CONSTRAINED", "OTHER";
}

reserved 限定符确保用于实现这三个符号名的数值不能重复使用。

为了生成一个或多个声明 Protobuf 消息结构的特定于语言的版本,包含这些结构的 IDL 文件被传递到protoc 编译器(可在 Protobuf GitHub 存储库中找到)。对于 Go 代码,可以以通常的方式安装支持的 Protobuf 库(这里以 作为命令行提示符):

% go get github.com/golang/protobuf/proto

将 Protobuf IDL 文件 dataitem.proto 编译为 Go 源代码的命令是:

% protoc --go_out=. dataitem.proto

标志 --go_out 指示编译器生成 Go 源代码。其他语言也有类似的标志。在这种情况下,结果是一个名为 dataitem.pb.go 的文件,该文件足够小,可以将其基本内容复制到 Go 应用程序中。以下是生成的代码的主要部分:

var _ = proto.Marshal

type DataItem struct {
   OddA  int64   `protobuf:"varint,1,opt,name=oddA" json:"oddA,omitempty"`
   EvenA int64   `protobuf:"varint,2,opt,name=evenA" json:"evenA,omitempty"`
   OddB  int32   `protobuf:"varint,3,opt,name=oddB" json:"oddB,omitempty"`
   EvenB int32   `protobuf:"varint,4,opt,name=evenB" json:"evenB,omitempty"`
   Small float32 `protobuf:"fixed32,5,opt,name=small" json:"small,omitempty"`
   Big   float32 `protobuf:"fixed32,6,opt,name=big" json:"big,omitempty"`
   Short string  `protobuf:"bytes,7,opt,name=short" json:"short,omitempty"`
   Long  string  `protobuf:"bytes,8,opt,name=long" json:"long,omitempty"`
}

func (m *DataItem) Reset()         { *m = DataItem{} }
func (m *DataItem) String() string { return proto.CompactTextString(m) }
func (*DataItem) ProtoMessage()    {}
func init() {}

编译器生成的代码具有 Go 结构 DataItem,该结构导出 Go 字段(名称现已大写开头),该字段与 Protobuf IDL 中声明的名称匹配。该结构字段具有标准的 Go 数据类型:int32int64float32string。在每个字段行的末尾,是描述 Protobuf 类型的字符串,提供 Protobuf IDL 文档中的数字标签及有关 JSON 信息的元数据,这将在后面讨论。

此外也有函数;最重要的是 Proto.Marshal,用于将 DataItem 结构的实例序列化为 Protobuf 格式。辅助函数包括:清除 DataItem 结构的 Reset,生成 DataItem 的单行字符串表示的 String

描述 Protobuf 编码的元数据应在更详细地分析 Go 程序之前进行仔细研究。

Protobuf 编码

Protobuf 消息的结构为键/值对的集合,其中数字标签为键,相应的字段为值。字段名称(例如,oddAsmall)是供人类阅读的,但是 protoc 编译器的确使用了字段名称来生成特定于语言的对应名称。例如,Protobuf IDL 中的 oddAsmall 名称在 Go 结构中分别成为字段 OddASmall

键和它们的值都被编码,但是有一个重要的区别:一些数字值具有固定大小的 32 或 64 位的编码,而其他数字(包括消息标签)则是 varint 编码的,位数取决于整数的绝对值。例如,整数值 1 到 15 需要 8 位 varint 编码,而值 16 到 2047 需要 16 位。varint 编码在本质上与 UTF-8 编码类似(但细节不同),它偏爱较小的整数值而不是较大的整数值。(有关详细分析,请参见 Protobuf 编码指南)结果是,Protobuf 消息应该在字段中具有较小的整数值(如果可能),并且键数应尽可能少,但每个字段至少得有一个键。

下表 1 列出了 Protobuf 编码的要点:

编码示例类型长度
varintint32uint32int64可变长度
fixedfixed32floatdouble固定的 32 位或 64 位长度
字节序列stringbytes序列长度

表 1. Protobuf 数据类型

未明确固定长度的整数类型是 varint 编码的;因此,在 varint 类型中,例如 uint32u 代表无符号),数字 32 描述了整数的范围(在这种情况下为 0 到 2 32 - 1),而不是其位的大小,该位大小取决于值。相比之下,对于固定长度类型(例如 fixed32double),Protobuf 编码分别需要 32 位和 64 位。Protobuf 中的字符串是字节序列;因此,字段编码的大小就是字节序列的长度。

另一个高效的方法值得一提。回想一下前面的示例,其中的 DataItems 消息由重复的 DataItem 实例组成:

message DataItems {
  repeated DataItem item = 1;
}

repeated 表示 DataItem 实例是打包的:集合具有单个标签,在这里是 1。因此,具有重复的 DataItem 实例的 DataItems 消息比具有多个但单独的 DataItem 字段、每个字段都需要自己的标签的消息的效率更高。

了解了这一背景,让我们回到 Go 程序。

dataItem 程序的细节

dataItem 程序创建一个 DataItem 实例,并使用适当类型的随机生成的值填充字段。Go 有一个 rand 包,带有用于生成伪随机整数和浮点值的函数,而我的 randString 函数可以从字符集中生成指定长度的伪随机字符串。设计目标是要有一个具有不同类型和位大小的字段值的 DataItem 实例。例如,OddAEvenA 值分别是 64 位非负整数值的奇数和偶数;但是 OddBEvenB 变体的大小为 32 位,并存放 0 到 2047 之间的小整数值。随机浮点值的大小为 32 位,字符串为 16(Short)和 32(Long)字符的长度。这是用随机值填充 DataItem 结构的代码段:

// 可变长度整数
n1 := rand.Int63()        // 大整数
if (n1 & 1) == 0 { n1++ } // 确保其是奇数
...
n3 := rand.Int31() % UpperBound // 小整数
if (n3 & 1) == 0 { n3++ }       // 确保其是奇数

// 固定长度浮点数
...
t1 := rand.Float32()
t2 := rand.Float32()
...
// 字符串
str1 := randString(StrShort)
str2 := randString(StrLong)

// 消息
dataItem := &DataItem {
   OddA:  n1,
   EvenA: n2,
   OddB:  n3,
   EvenB: n4,
   Big:   f1,
   Small: f2,
   Short: str1,
   Long:  str2,
}

创建并填充值后,DataItem 实例将以 XML、JSON 和 Protobuf 进行编码,每种编码均写入本地文件:

func encodeAndserialize(dataItem *DataItem) {
   bytes, _ := xml.MarshalIndent(dataItem, "", " ")  // Xml to dataitem.xml
   ioutil.WriteFile(XmlFile, bytes, 0644)            // 0644 is file access permissions

   bytes, _ = json.MarshalIndent(dataItem, "", " ")  // Json to dataitem.json
   ioutil.WriteFile(JsonFile, bytes, 0644)

   bytes, _ = proto.Marshal(dataItem)                // Protobuf to dataitem.pbuf
   ioutil.WriteFile(PbufFile, bytes, 0644)
}

这三个序列化函数使用术语 marshal,它与 serialize 意思大致相同。如代码所示,三个 Marshal 函数均返回一个字节数组,然后将其写入文件。(为简单起见,忽略可能的错误处理。)在示例运行中,文件大小为:

dataitem.xml:  262 bytes
dataitem.json: 212 bytes
dataitem.pbuf:  88 bytes

Protobuf 编码明显小于其他两个编码方案。通过消除缩进字符(在这种情况下为空白和换行符),可以稍微减小 XML 和 JSON 序列化的大小。

以下是 dataitem.json 文件,该文件最终是由 json.MarshalIndent 调用产生的,并添加了以 ## 开头的注释:

{
 "oddA":  4744002665212642479,                ## 64-bit >= 0
 "evenA": 2395006495604861128,                ## ditto
 "oddB":  57,                                 ## 32-bit >= 0 but < 2048
 "evenB": 468,                                ## ditto
 "small": 0.7562016,                          ## 32-bit floating-point
 "big":   0.85202795,                         ## ditto
 "short": "ClH1oDaTtoX$HBN5",                 ## 16 random chars
 "long":  "xId0rD3Cri%3Wt%^QjcFLJgyXBu9^DZI"  ## 32 random chars
}

尽管这些序列化的数据写入到本地文件中,但是也可以使用相同的方法将数据写入网络连接的输出流。

测试序列化和反序列化

Go 程序接下来通过将先前写入 dataitem.pbuf 文件的字节反序列化为 DataItem 实例来运行基本测试。这是代码段,其中去除了错误检查部分:

filebytes, err := ioutil.ReadFile(PbufFile) // get the bytes from the file
...
testItem.Reset()                            // clear the DataItem structure
err = proto.Unmarshal(filebytes, testItem)  // deserialize into a DataItem instance

用于 Protbuf 反序列化的 proto.Unmarshal 函数与 proto.Marshal 函数相反。原始的 DataItem 和反序列化的副本将被打印出来以确认完全匹配:

Original:
2041519981506242154 3041486079683013705 1192 1879
0.572123 0.326855
boPb#T0O8Xd&Ps5EnSZqDg4Qztvo7IIs 9vH66AiGSQgCDxk&

Deserialized:
2041519981506242154 3041486079683013705 1192 1879
0.572123 0.326855
boPb#T0O8Xd&Ps5EnSZqDg4Qztvo7IIs 9vH66AiGSQgCDxk&

一个 Java Protobuf 客户端

用 Java 写的示例是为了确认 Protobuf 的语言中立性。原始 IDL 文件可用于生成 Java 支持代码,其中涉及嵌套类。但是,为了抑制警告信息,可以进行一些补充。这是修订版,它指定了一个 DataMsg 作为外部类的名称,内部类在该 Protobuf 消息后面自动命名为 DataItem

syntax = "proto3";

package main;

option java_outer_classname = "DataMsg";

message DataItem {
...

进行此更改后,protoc 编译与以前相同,只是所期望的输出现在是 Java 而不是 Go:

% protoc --java_out=. dataitem.proto

生成的源文件(在名为 main 的子目录中)为 DataMsg.java,长度约为 1,120 行:Java 并不简洁。编译然后运行 Java 代码需要具有 Protobuf 库支持的 JAR 文件。该文件位于 Maven 存储库中。

放置好这些片段后,我的测试代码相对较短(并且在 ZIP 文件中以 Main.java 形式提供):

package main;
import java.io.FileInputStream;

public class Main {
   public static void main(String[] args) {
      String path = "dataitem.pbuf";  // from the Go program's serialization
      try {
         DataMsg.DataItem deserial =
           DataMsg.DataItem.newBuilder().mergeFrom(new FileInputStream(path)).build();

         System.out.println(deserial.getOddA()); // 64-bit odd
         System.out.println(deserial.getLong()); // 32-character string
      }
      catch(Exception e) { System.err.println(e); }
    }
}

当然,生产级的测试将更加彻底,但是即使是该初步测试也可以证明 Protobuf 的语言中立性:dataitem.pbuf 文件是 Go 程序对 Go 语言版的 DataItem 进行序列化的结果,并且该文件中的字节被反序列化以产生一个 Java 语言的 DataItem 实例。Java 测试的输出与 Go 测试的输出相同。

用 numPairs 程序来结束

让我们以一个示例作为结尾,来突出 Protobuf 效率,但又强调在任何编码技术中都会涉及到的成本。考虑以下 Protobuf IDL 文件:

syntax = "proto3";
package main;

message NumPairs {
  repeated NumPair pair = 1;
}

message NumPair {
  int32 odd = 1;
  int32 even = 2;
}

NumPair 消息由两个 int32 值以及每个字段的整数标签组成。NumPairs 消息是嵌入的 NumPair 消息的序列。

Go 语言的 numPairs 程序(如下)创建了 200 万个 NumPair 实例,每个实例都附加到 NumPairs 消息中。该消息可以按常规方式进行序列化和反序列化。

示例 2、numPairs 程序

package main

import (
   "math/rand"
   "time"
   "encoding/xml"
   "encoding/json"
   "io/ioutil"
   "github.com/golang/protobuf/proto"
)

// protoc-generated code: start
var _ = proto.Marshal
type NumPairs struct {
   Pair []*NumPair `protobuf:"bytes,1,rep,name=pair" json:"pair,omitempty"`
}

func (m *NumPairs) Reset()         { *m = NumPairs{} }
func (m *NumPairs) String() string { return proto.CompactTextString(m) }
func (*NumPairs) ProtoMessage()    {}
func (m *NumPairs) GetPair() []*NumPair {
   if m != nil { return m.Pair }
   return nil
}

type NumPair struct {
   Odd  int32 `protobuf:"varint,1,opt,name=odd" json:"odd,omitempty"`
   Even int32 `protobuf:"varint,2,opt,name=even" json:"even,omitempty"`
}

func (m *NumPair) Reset()         { *m = NumPair{} }
func (m *NumPair) String() string { return proto.CompactTextString(m) }
func (*NumPair) ProtoMessage()    {}
func init() {}
// protoc-generated code: finish

var numPairsStruct NumPairs
var numPairs = &numPairsStruct

func encodeAndserialize() {
   // XML encoding
   filename := "./pairs.xml"
   bytes, _ := xml.MarshalIndent(numPairs, "", " ")
   ioutil.WriteFile(filename, bytes, 0644)

   // JSON encoding
   filename = "./pairs.json"
   bytes, _ = json.MarshalIndent(numPairs, "", " ")
   ioutil.WriteFile(filename, bytes, 0644)

   // ProtoBuf encoding
   filename = "./pairs.pbuf"
   bytes, _ = proto.Marshal(numPairs)
   ioutil.WriteFile(filename, bytes, 0644)
}

const HowMany = 200 * 100  * 100 // two million

func main() {
   rand.Seed(time.Now().UnixNano())

   // uncomment the modulus operations to get the more efficient version
   for i := 0; i < HowMany; i++ {
      n1 := rand.Int31() // % 2047
      if (n1 & 1) == 0 { n1++ } // ensure it's odd
      n2 := rand.Int31() // % 2047
      if (n2 & 1) == 1 { n2++ } // ensure it's even

      next := &NumPair {
                 Odd:  n1,
                 Even: n2,
              }
      numPairs.Pair = append(numPairs.Pair, next)
   }
   encodeAndserialize()
}

每个 NumPair 中随机生成的奇数和偶数值的范围在 0 到 20 亿之间变化。就原始数据(而非编码数据)而言,Go 程序中生成的整数总共为 16MB:每个 NumPair 为两个整数,总计为 400 万个整数,每个值的大小为四个字节。

为了进行比较,下表列出了 XML、JSON 和 Protobuf 编码的示例 NumsPairs 消息的 200 万个 NumPair 实例。原始数据也包括在内。由于 numPairs 程序生成随机值,因此样本运行的输出有所不同,但接近表中显示的大小。

编码文件字节大小Pbuf/其它 比例
pairs.raw16MB169%
Protobufpairs.pbuf27MB
JSONpairs.json100MB27%
XMLpairs.xml126MB21%

表 2. 16MB 整数的编码开销

不出所料,Protobuf 和之后的 XML 和 JSON 差别明显。Protobuf 编码大约是 JSON 的四分之一,是 XML 的五分之一。但是原始数据清楚地表明 Protobuf 也会产生编码开销:序列化的 Protobuf 消息比原始数据大 11MB。包括 Protobuf 在内的任何编码都涉及结构化数据,这不可避免地会增加字节。

序列化的 200 万个 NumPair 实例中的每个实例都包含个整数值:Go 结构中的 EvenOdd 字段分别一个,而 Protobuf 编码中的每个字段、每个标签一个。对于原始数据(而不是编码数据),每个实例将达到 16 个字节,样本 NumPairs 消息中有 200 万个实例。但是 Protobuf 标记(如 NumPair 字段中的 int32 值)使用 varint 编码,因此字节长度有所不同。特别是,小的整数值(在这种情况下,包括标签在内)需要不到四个字节进行编码。

如果对 numPairs 程序进行了修改,以使两个 NumPair 字段的值小于 2048,且其编码为一或两个字节,则 Protobuf 编码将从 27MB 下降到 16MB,这正是原始数据的大小。下表总结了样本运行中的新编码大小。

编码文件字节大小Pbuf/其它 比例
Nonepairs.raw16MB100%
Protobufpairs.pbuf16MB
JSONpairs.json77MB21%
XMLpairs.xml103MB15%

表 3. 编码 16MB 的小于 2048 的整数

总之,修改后的 numPairs 程序的字段值小于 2048,可减少原始数据中每个四字节整数值的大小。但是 Protobuf 编码仍然需要标签,这些标签会在 Protobuf 消息中添加字节。Protobuf 编码确实会增加消息大小,但是如果要编码相对较小的整数值(无论是字段还是键),则可以通过 varint 因子来减少此开销。

对于包含混合类型的结构化数据(且整数值相对较小)的中等大小的消息,Protobuf 明显优于 XML 和 JSON 等选项。在其他情况下,数据可能不适合 Protobuf 编码。例如,如果两个应用程序需要共享大量文本记录或大整数值,则可以采用压缩而不是编码技术。


via: https://opensource.com/article/19/10/protobuf-data-interchange

作者:Marty Kalin 选题:lujun9972 译者:wxy 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习在 Linux 中进程是如何与其他进程进行同步的。

本篇是 Linux 下进程间通信(IPC)系列的第三篇同时也是最后一篇文章。第一篇文章聚焦在通过共享存储(文件和共享内存段)来进行 IPC,第二篇文章则通过管道(无名的或者命名的)及消息队列来达到相同的目的。这篇文章将目光从高处(套接字)然后到低处(信号)来关注 IPC。代码示例将用力地充实下面的解释细节。

套接字

正如管道有两种类型(命名和无名)一样,套接字也有两种类型。IPC 套接字(即 Unix 套接字)给予进程在相同设备(主机)上基于通道的通信能力;而网络套接字给予进程运行在不同主机的能力,因此也带来了网络通信的能力。网络套接字需要底层协议的支持,例如 TCP(传输控制协议)或 UDP(用户数据报协议)。

与之相反,IPC 套接字依赖于本地系统内核的支持来进行通信;特别的,IPC 通信使用一个本地的文件作为套接字地址。尽管这两种套接字的实现有所不同,但在本质上,IPC 套接字和网络套接字的 API 是一致的。接下来的例子将包含网络套接字的内容,但示例服务器和客户端程序可以在相同的机器上运行,因为服务器使用了 localhost(127.0.0.1)这个网络地址,该地址表示的是本地机器上的本地机器地址。

套接字以流的形式(下面将会讨论到)被配置为双向的,并且其控制遵循 C/S(客户端/服务器端)模式:客户端通过尝试连接一个服务器来初始化对话,而服务器端将尝试接受该连接。假如万事顺利,来自客户端的请求和来自服务器端的响应将通过管道进行传输,直到其中任意一方关闭该通道,从而断开这个连接。

一个迭代服务器(只适用于开发)将一直和连接它的客户端打交道:从最开始服务第一个客户端,然后到这个连接关闭,然后服务第二个客户端,循环往复。这种方式的一个缺点是处理一个特定的客户端可能会挂起,使得其他的客户端一直在后面等待。生产级别的服务器将是并发的,通常使用了多进程或者多线程的混合。例如,我台式机上的 Nginx 网络服务器有一个 4 个 工人 worker 的进程池,它们可以并发地处理客户端的请求。在下面的代码示例中,我们将使用迭代服务器,使得我们将要处理的问题保持在一个很小的规模,只关注基本的 API,而不去关心并发的问题。

最后,随着各种 POSIX 改进的出现,套接字 API 随着时间的推移而发生了显著的变化。当前针对服务器端和客户端的示例代码特意写的比较简单,但是它着重强调了基于流的套接字中连接的双方。下面是关于流控制的一个总结,其中服务器端在一个终端中开启,而客户端在另一个不同的终端中开启:

  • 服务器端等待客户端的连接,对于给定的一个成功连接,它就读取来自客户端的数据。
  • 为了强调是双方的会话,服务器端会对接收自客户端的数据做回应。这些数据都是 ASCII 字符代码,它们组成了一些书的标题。
  • 客户端将书的标题写给服务器端的进程,并从服务器端的回应中读取到相同的标题。然后客户端和服务器端都在屏幕上打印出标题。下面是服务器端的输出,客户端的输出也和它完全一样:
Listening on port 9876 for clients...
War and Peace
Pride and Prejudice
The Sound and the Fury

示例 1. 使用套接字的客户端程序

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/tcp.h>
#include <arpa/inet.h>
#include "sock.h"

void report(const char* msg, int terminate) {
  perror(msg);
  if (terminate) exit(-1); /* failure */
}

int main() {
  int fd = socket(AF_INET,     /* network versus AF_LOCAL */
          SOCK_STREAM, /* reliable, bidirectional: TCP */
          0);          /* system picks underlying protocol */
  if (fd < 0) report("socket", 1); /* terminate */
    
  /* bind the server's local address in memory */
  struct sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));          /* clear the bytes */
  saddr.sin_family = AF_INET;                /* versus AF_LOCAL */
  saddr.sin_addr.s_addr = htonl(INADDR_ANY); /* host-to-network endian */
  saddr.sin_port = htons(PortNumber);        /* for listening */
  
  if (bind(fd, (struct sockaddr *) &saddr, sizeof(saddr)) < 0)
    report("bind", 1); /* terminate */
    
  /* listen to the socket */
  if (listen(fd, MaxConnects) < 0) /* listen for clients, up to MaxConnects */
    report("listen", 1); /* terminate */

  fprintf(stderr, "Listening on port %i for clients...\n", PortNumber);
  /* a server traditionally listens indefinitely */
  while (1) {
    struct sockaddr_in caddr; /* client address */
    int len = sizeof(caddr);  /* address length could change */
    
    int client_fd = accept(fd, (struct sockaddr*) &caddr, &len);  /* accept blocks */
    if (client_fd < 0) {
      report("accept", 0); /* don't terminated, though there's a problem */
      continue;
    }

    /* read from client */
    int i;
    for (i = 0; i < ConversationLen; i++) {
      char buffer[BuffSize + 1];
      memset(buffer, '\0', sizeof(buffer)); 
      int count = read(client_fd, buffer, sizeof(buffer));
      if (count > 0) {
    puts(buffer);
    write(client_fd, buffer, sizeof(buffer)); /* echo as confirmation */
      }
    }
    close(client_fd); /* break connection */
  }  /* while(1) */
  return 0;
}

上面的服务器端程序执行典型的 4 个步骤来准备回应客户端的请求,然后接受其他的独立请求。这里每一个步骤都以服务器端程序调用的系统函数来命名。

  1. socket(…):为套接字连接获取一个文件描述符
  2. bind(…):将套接字和服务器主机上的一个地址进行绑定
  3. listen(…):监听客户端请求
  4. accept(…):接受一个特定的客户端请求

上面的 socket 调用的完整形式为:

int sockfd = socket(AF_INET,      /* versus AF_LOCAL */
                    SOCK_STREAM,  /* reliable, bidirectional */
                    0);           /* system picks protocol (TCP) */

第一个参数特别指定了使用的是一个网络套接字,而不是 IPC 套接字。对于第二个参数有多种选项,但 SOCK_STREAMSOCK_DGRAM(数据报)是最为常用的。基于流的套接字支持可信通道,在这种通道中如果发生了信息的丢失或者更改,都将会被报告。这种通道是双向的,并且从一端到另外一端的有效载荷在大小上可以是任意的。相反的,基于数据报的套接字大多是不可信的,没有方向性,并且需要固定大小的载荷。socket 的第三个参数特别指定了协议。对于这里展示的基于流的套接字,只有一种协议选择:TCP,在这里表示的 0。因为对 socket 的一次成功调用将返回相似的文件描述符,套接字可以被读写,对应的语法和读写一个本地文件是类似的。

bind 的调用是最为复杂的,因为它反映出了在套接字 API 方面上的各种改进。我们感兴趣的点是这个调用将一个套接字和服务器端所在机器中的一个内存地址进行绑定。但对 listen 的调用就非常直接了:

if (listen(fd, MaxConnects) < 0)

第一个参数是套接字的文件描述符,第二个参数则指定了在服务器端处理一个拒绝连接错误之前,有多少个客户端连接被允许连接。(在头文件 sock.hMaxConnects 的值被设置为 8。)

accept 调用默认将是一个阻塞等待:服务器端将不做任何事情直到一个客户端尝试连接它,然后进行处理。accept 函数返回的值如果是 -1 则暗示有错误发生。假如这个调用是成功的,则它将返回另一个文件描述符,这个文件描述符被用来指代另一个可读可写的套接字,它与 accept 调用中的第一个参数对应的接收套接字有所不同。服务器端使用这个可读可写的套接字来从客户端读取请求然后写回它的回应。接收套接字只被用于接受客户端的连接。

在设计上,服务器端可以一直运行下去。当然服务器端可以通过在命令行中使用 Ctrl+C 来终止它。

示例 2. 使用套接字的客户端

#include <string.h>
#include <stdio.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <netinet/tcp.h>
#include <netdb.h>
#include "sock.h"

const char* books[] = {"War and Peace",
               "Pride and Prejudice",
               "The Sound and the Fury"};

void report(const char* msg, int terminate) {
  perror(msg);
  if (terminate) exit(-1); /* failure */
}

int main() {
  /* fd for the socket */
  int sockfd = socket(AF_INET,      /* versus AF_LOCAL */
              SOCK_STREAM,  /* reliable, bidirectional */
              0);           /* system picks protocol (TCP) */
  if (sockfd < 0) report("socket", 1); /* terminate */

  /* get the address of the host */
  struct hostent* hptr = gethostbyname(Host); /* localhost: 127.0.0.1 */ 
  if (!hptr) report("gethostbyname", 1); /* is hptr NULL? */
  if (hptr->h_addrtype != AF_INET)       /* versus AF_LOCAL */
    report("bad address family", 1);
  
  /* connect to the server: configure server's address 1st */
  struct sockaddr_in saddr;
  memset(&saddr, 0, sizeof(saddr));
  saddr.sin_family = AF_INET;
  saddr.sin_addr.s_addr = 
     ((struct in_addr*) hptr->h_addr_list[0])->s_addr;
  saddr.sin_port = htons(PortNumber); /* port number in big-endian */
  
  if (connect(sockfd, (struct sockaddr*) &saddr, sizeof(saddr)) < 0)
    report("connect", 1);
  
  /* Write some stuff and read the echoes. */
  puts("Connect to server, about to write some stuff...");
  int i;
  for (i = 0; i < ConversationLen; i++) {
    if (write(sockfd, books[i], strlen(books[i])) > 0) {
      /* get confirmation echoed from server and print */
      char buffer[BuffSize + 1];
      memset(buffer, '\0', sizeof(buffer));
      if (read(sockfd, buffer, sizeof(buffer)) > 0)
    puts(buffer);
    }
  }
  puts("Client done, about to exit...");
  close(sockfd); /* close the connection */
  return 0;
}

客户端程序的设置代码和服务器端类似。两者主要的区别既不是在于监听也不在于接收,而是连接:

if (connect(sockfd, (struct sockaddr*) &saddr, sizeof(saddr)) < 0)

connect 的调用可能因为多种原因而导致失败,例如客户端拥有错误的服务器端地址或者已经有太多的客户端连接上了服务器端。假如 connect 操作成功,客户端将在一个 for 循环中,写入它的请求然后读取返回的响应。在会话后,服务器端和客户端都将调用 close 去关闭这个可读可写套接字,尽管任何一边的关闭操作就足以关闭它们之间的连接。此后客户端可以退出了,但正如前面提到的那样,服务器端可以一直保持开放以处理其他事务。

从上面的套接字示例中,我们看到了请求信息被回显给客户端,这使得客户端和服务器端之间拥有进行丰富对话的可能性。也许这就是套接字的主要魅力。在现代系统中,客户端应用(例如一个数据库客户端)和服务器端通过套接字进行通信非常常见。正如先前提及的那样,本地 IPC 套接字和网络套接字只在某些实现细节上面有所不同,一般来说,IPC 套接字有着更低的消耗和更好的性能。它们的通信 API 基本是一样的。

信号

信号会中断一个正在执行的程序,在这种意义下,就是用信号与这个程序进行通信。大多数的信号要么可以被忽略(阻塞)或者被处理(通过特别设计的代码)。SIGSTOP (暂停)和 SIGKILL(立即停止)是最应该提及的两种信号。这种符号常量有整数类型的值,例如 SIGKILL 对应的值为 9

信号可以在与用户交互的情况下发生。例如,一个用户从命令行中敲了 Ctrl+C 来终止一个从命令行中启动的程序;Ctrl+C 将产生一个 SIGTERM 信号。SIGTERM 意即终止,它可以被阻塞或者被处理,而不像 SIGKILL 信号那样。一个进程也可以通过信号和另一个进程通信,这样使得信号也可以作为一种 IPC 机制。

考虑一下一个多进程应用,例如 Nginx 网络服务器是如何被另一个进程优雅地关闭的。kill 函数:

int kill(pid_t pid, int signum); /* declaration */

可以被一个进程用来终止另一个进程或者一组进程。假如 kill 函数的第一个参数是大于 0 的,那么这个参数将会被认为是目标进程的 pid(进程 ID),假如这个参数是 0,则这个参数将会被视作信号发送者所属的那组进程。

kill 的第二个参数要么是一个标准的信号数字(例如 SIGTERMSIGKILL),要么是 0 ,这将会对信号做一次询问,确认第一个参数中的 pid 是否是有效的。这样优雅地关闭一个多进程应用就可以通过向组成该应用的一组进程发送一个终止信号来完成,具体来说就是调用一个 kill 函数,使得这个调用的第二个参数是 SIGTERM 。(Nginx 主进程可以通过调用 kill 函数来终止其他工人进程,然后再停止自己。)就像许多库函数一样,kill 函数通过一个简单的可变语法拥有更多的能力和灵活性。

示例 3. 一个多进程系统的优雅停止

#include <stdio.h>
#include <signal.h>
#include <stdlib.h>
#include <unistd.h>
#include <sys/wait.h>

void graceful(int signum) {
  printf("\tChild confirming received signal: %i\n", signum);
  puts("\tChild about to terminate gracefully...");
  sleep(1);
  puts("\tChild terminating now...");
  _exit(0); /* fast-track notification of parent */
}

void set_handler() {
  struct sigaction current;
  sigemptyset(&current.sa_mask);         /* clear the signal set */
  current.sa_flags = 0;                  /* enables setting sa_handler, not sa_action */
  current.sa_handler = graceful;         /* specify a handler */
  sigaction(SIGTERM, &current, NULL);    /* register the handler */
}

void child_code() {
  set_handler();

  while (1) {   /` loop until interrupted `/
    sleep(1);
    puts("\tChild just woke up, but going back to sleep.");
  }
}

void parent_code(pid_t cpid) {
  puts("Parent sleeping for a time...");
  sleep(5);

  /* Try to terminate child. */
  if (-1 == kill(cpid, SIGTERM)) {
    perror("kill");
    exit(-1);
  }
  wait(NULL); /` wait for child to terminate `/
  puts("My child terminated, about to exit myself...");
}

int main() {
  pid_t pid = fork();
  if (pid < 0) {
    perror("fork");
    return -1; /* error */
  }
  if (0 == pid)
    child_code();
  else
    parent_code(pid);
  return 0;  /* normal */
}

上面的停止程序模拟了一个多进程系统的优雅退出,在这个例子中,这个系统由一个父进程和一个子进程组成。这次模拟的工作流程如下:

  • 父进程尝试去 fork 一个子进程。假如这个 fork 操作成功了,每个进程就执行它自己的代码:子进程就执行函数 child_code,而父进程就执行函数 parent_code
  • 子进程将会进入一个潜在的无限循环,在这个循环中子进程将睡眠一秒,然后打印一个信息,接着再次进入睡眠状态,以此循环往复。来自父进程的一个 SIGTERM 信号将引起子进程去执行一个信号处理回调函数 graceful。这样这个信号就使得子进程可以跳出循环,然后进行子进程和父进程之间的优雅终止。在终止之前,进程将打印一个信息。
  • fork 一个子进程后,父进程将睡眠 5 秒,使得子进程可以执行一会儿;当然在这个模拟中,子进程大多数时间都在睡眠。然后父进程调用 SIGTERM 作为第二个参数的 kill 函数,等待子进程的终止,然后自己再终止。

下面是一次运行的输出:

% ./shutdown
Parent sleeping for a time...
        Child just woke up, but going back to sleep.
        Child just woke up, but going back to sleep.
        Child just woke up, but going back to sleep.
        Child just woke up, but going back to sleep.
        Child confirming received signal: 15  ## SIGTERM is 15
        Child about to terminate gracefully...
        Child terminating now...
My child terminated, about to exit myself...

对于信号的处理,上面的示例使用了 sigaction 库函数(POSIX 推荐的用法)而不是传统的 signal 函数,signal 函数有移植性问题。下面是我们主要关心的代码片段:

  • 假如对 fork 的调用成功了,父进程将执行 parent_code 函数,而子进程将执行 child_code 函数。在给子进程发送信号之前,父进程将会等待 5 秒:
puts("Parent sleeping for a time...");
sleep(5);
if (-1 == kill(cpid, SIGTERM)) {
...sleepkillcpidSIGTERM...

假如 kill 调用成功了,父进程将在子进程终止时做等待,使得子进程不会变成一个僵尸进程。在等待完成后,父进程再退出。

  • child_code 函数首先调用 set_handler 然后进入它的可能永久睡眠的循环。下面是我们将要查看的 set_handler 函数:
void set_handler() {
  struct sigaction current;            /* current setup */
  sigemptyset(&current.sa_mask);       /* clear the signal set */
  current.sa_flags = 0;                /* for setting sa_handler, not sa_action */
  current.sa_handler = graceful;       /* specify a handler */
  sigaction(SIGTERM, &current, NULL);  /* register the handler */
}

上面代码的前三行在做相关的准备。第四个语句将为 graceful 设定为句柄,它将在调用 _exit 来停止之前打印一些信息。第 5 行和最后一行的语句将通过调用 sigaction 来向系统注册上面的句柄。sigaction 的第一个参数是 SIGTERM ,用作终止;第二个参数是当前的 sigaction 设定,而最后的参数(在这个例子中是 NULL )可被用来保存前面的 sigaction 设定,以备后面的可能使用。

使用信号来作为 IPC 的确是一个很轻量的方法,但确实值得尝试。通过信号来做 IPC 显然可以被归入 IPC 工具箱中。

这个系列的总结

在这个系列中,我们通过三篇有关 IPC 的文章,用示例代码介绍了如下机制:

  • 共享文件
  • 共享内存(通过信号量)
  • 管道(命名和无名)
  • 消息队列
  • 套接字
  • 信号

甚至在今天,在以线程为中心的语言,例如 Java、C# 和 Go 等变得越来越流行的情况下,IPC 仍然很受欢迎,因为相比于使用多线程,通过多进程来实现并发有着一个明显的优势:默认情况下,每个进程都有它自己的地址空间,除非使用了基于共享内存的 IPC 机制(为了达到安全的并发,竞争条件在多线程和多进程的时候必须被加上锁),在多进程中可以排除掉基于内存的竞争条件。对于任何一个写过即使是基本的通过共享变量来通信的多线程程序的人来说,他都会知道想要写一个清晰、高效、线程安全的代码是多么具有挑战性。使用单线程的多进程的确是很有吸引力的,这是一个切实可行的方式,使用它可以利用好今天多处理器的机器,而不需要面临基于内存的竞争条件的风险。

当然,没有一个简单的答案能够回答上述 IPC 机制中的哪一个更好。在编程中每一种 IPC 机制都会涉及到一个取舍问题:是追求简洁,还是追求功能强大。以信号来举例,它是一个相对简单的 IPC 机制,但并不支持多个进程之间的丰富对话。假如确实需要这样的对话,另外的选择可能会更合适一些。带有锁的共享文件则相对直接,但是当要处理大量共享的数据流时,共享文件并不能很高效地工作。管道,甚至是套接字,有着更复杂的 API,可能是更好的选择。让具体的问题去指导我们的选择吧。

尽管所有的示例代码(可以在我的网站上获取到)都是使用 C 写的,其他的编程语言也经常提供这些 IPC 机制的轻量包装。这些代码示例都足够短小简单,希望这样能够鼓励你去进行实验。


via: https://opensource.com/article/19/4/interprocess-communication-linux-networking

作者:Marty Kalin 选题:lujun9972 译者:FSSlc 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习在 Linux 中进程是如何与其他进程进行同步的。

本篇是 Linux 下进程间通信(IPC)系列的第二篇文章。第一篇文章 聚焦于通过共享文件和共享内存段这样的共享存储来进行 IPC。这篇文件的重点将转向管道,它是连接需要通信的进程之间的通道。管道拥有一个写端用于写入字节数据,还有一个读端用于按照先入先出的顺序读入这些字节数据。而这些字节数据可能代表任何东西:数字、员工记录、数字电影等等。

管道有两种类型,命名管道和无名管道,都可以交互式的在命令行或程序中使用它们;相关的例子在下面展示。这篇文章也将介绍内存队列,尽管它们有些过时了,但它们不应该受这样的待遇。

在本系列的第一篇文章中的示例代码承认了在 IPC 中可能受到竞争条件(不管是基于文件的还是基于内存的)的威胁。自然地我们也会考虑基于管道的 IPC 的安全并发问题,这个也将在本文中提及。针对管道和内存队列的例子将会使用 POSIX 推荐使用的 API,POSIX 的一个核心目标就是线程安全。

请查看一些 mq\_open 函数的 man 页,这个函数属于内存队列的 API。这个 man 页中有关 特性 的章节带有一个小表格:

接口特性
mq_open()线程安全MT-Safe

上面的 MT-Safe(MT 指的是 多线程 multi-threaded )意味着 mq_open 函数是线程安全的,进而暗示是进程安全的:一个进程的执行和它的一个线程执行的过程类似,假如竞争条件不会发生在处于相同进程的线程中,那么这样的条件也不会发生在处于不同进程的线程中。MT-Safe 特性保证了调用 mq_open 时不会出现竞争条件。一般来说,基于通道的 IPC 是并发安全的,尽管在下面例子中会出现一个有关警告的注意事项。

无名管道

首先让我们通过一个特意构造的命令行例子来展示无名管道是如何工作的。在所有的现代系统中,符号 | 在命令行中都代表一个无名管道。假设我们的命令行提示符为 %,接下来考虑下面的命令:

## 写入方在 | 左边,读取方在右边
% sleep 5 | echo "Hello, world!" 

sleepecho 程序以不同的进程执行,无名管道允许它们进行通信。但是上面的例子被特意设计为没有通信发生。问候语 “Hello, world!” 出现在屏幕中,然后过了 5 秒后,命令行返回,暗示 sleepecho 进程都已经结束了。这期间发生了什么呢?

在命令行中的竖线 | 的语法中,左边的进程(sleep)是写入方,右边的进程(echo)为读取方。默认情况下,读取方将会阻塞,直到从通道中能够读取到字节数据,而写入方在写完它的字节数据后,将发送 流已终止 end-of-stream 的标志。(即便写入方过早终止了,一个流已终止的标志还是会发给读取方。)无名管道将保持到写入方和读取方都停止的那个时刻。

在上面的例子中,sleep 进程并没有向通道写入任何的字节数据,但在 5 秒后就终止了,这时将向通道发送一个流已终止的标志。与此同时,echo 进程立即向标准输出(屏幕)写入问候语,因为这个进程并不从通道中读入任何字节,所以它并没有等待。一旦 sleepecho 进程都终止了,不会再用作通信的无名管道将会消失然后返回命令行提示符。

下面这个更加实用的示例将使用两个无名管道。我们假定文件 test.dat 的内容如下:

this
is
the
way
the
world
ends

下面的命令:

% cat test.dat | sort | uniq

会将 cat 连接 concatenate 的缩写)进程的输出通过管道传给 sort 进程以生成排序后的输出,然后将排序后的输出通过管道传给 uniq 进程以消除重复的记录(在本例中,会将两次出现的 “the” 缩减为一个):

ends
is
the
this
way
world

下面展示的情景展示的是一个带有两个进程的程序通过一个无名管道通信来进行通信。

示例 1. 两个进程通过一个无名管道来进行通信

#include <sys/wait.h> /* wait */
#include <stdio.h>
#include <stdlib.h>   /* exit functions */
#include <unistd.h>   /* read, write, pipe, _exit */
#include <string.h>

#define ReadEnd  0
#define WriteEnd 1

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);    /** failure **/
}

int main() {
  int pipeFDs[2]; /* two file descriptors */
  char buf;       /* 1-byte buffer */
  const char* msg = "Nature's first green is gold\n"; /* bytes to write */

  if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");
  pid_t cpid = fork();                                /* fork a child process */
  if (cpid < 0) report_and_exit("fork");              /* check for failure */

  if (0 == cpid) {    /*** child ***/                 /* child process */
    close(pipeFDs[WriteEnd]);                         /* child reads, doesn't write */

    while (read(pipeFDs[ReadEnd], &buf, 1) > 0)       /* read until end of byte stream */
      write(STDOUT_FILENO, &buf, sizeof(buf));        /* echo to the standard output */

    close(pipeFDs[ReadEnd]);                          /* close the ReadEnd: all done */
    _exit(0);                                         /* exit and notify parent at once  */
  }
  else {              /*** parent ***/
    close(pipeFDs[ReadEnd]);                          /* parent writes, doesn't read */

    write(pipeFDs[WriteEnd], msg, strlen(msg));       /* write the bytes to the pipe */
    close(pipeFDs[WriteEnd]);                         /* done writing: generate eof */

    wait(NULL);                                       /* wait for child to exit */
    exit(0);                                          /* exit normally */
  }
  return 0;
}

上面名为 pipeUN 的程序使用系统函数 fork 来创建一个进程。尽管这个程序只有一个单一的源文件,在它正确执行的情况下将会发生多进程的情况。

下面的内容是对库函数 fork 如何工作的一个简要回顾:

  • fork 函数由进程调用,在失败时返回 -1 给父进程。在 pipeUN 这个例子中,相应的调用是:
pid_t cpid = fork(); /* called in parent */

函数调用后的返回值也被保存下来了。在这个例子中,保存在整数类型 pid_t 的变量 cpid 中。(每个进程有它自己的进程 ID,这是一个非负的整数,用来标记进程)。复刻一个新的进程可能会因为多种原因而失败,包括进程表满了的原因,这个结构由系统维持,以此来追踪进程状态。明确地说,僵尸进程假如没有被处理掉,将可能引起进程表被填满的错误。

  • 假如 fork 调用成功,则它将创建一个新的子进程,向父进程返回一个值,向子进程返回另外的一个值。在调用 fork 后父进程和子进程都将执行相同的代码。(子进程继承了到此为止父进程中声明的所有变量的拷贝),特别地,一次成功的 fork 调用将返回如下的东西:
+ 向子进程返回 `0`
+ 向父进程返回子进程的进程 ID
  • 在一次成功的 fork 调用后,一个 if/else 或等价的结构将会被用来隔离针对父进程和子进程的代码。在这个例子中,相应的声明为:
if (0 == cpid) { /*** child ***/
...
}
else { /*** parent ***/
...
} 

假如成功地复刻出了一个子进程,pipeUN 程序将像下面这样去执行。在一个整数的数列里:

int pipeFDs[2]; /* two file descriptors */

来保存两个文件描述符,一个用来向管道中写入,另一个从管道中写入。(数组元素 pipeFDs[0] 是读端的文件描述符,元素 pipeFDs[1] 是写端的文件描述符。)在调用 fork 之前,对系统 pipe 函数的成功调用,将立刻使得这个数组获得两个文件描述符:

if (pipe(pipeFDs) < 0) report_and_exit("pipeFD");

父进程和子进程现在都有了文件描述符的副本。但分离关注点模式意味着每个进程恰好只需要一个描述符。在这个例子中,父进程负责写入,而子进程负责读取,尽管这样的角色分配可以反过来。在 if 子句中的第一个语句将用于关闭管道的读端:

close(pipeFDs[WriteEnd]); /* called in child code */

在父进程中的 else 子句将会关闭管道的读端:

close(pipeFDs[ReadEnd]); /* called in parent code */

然后父进程将向无名管道中写入某些字节数据(ASCII 代码),子进程读取这些数据,然后向标准输出中回放它们。

在这个程序中还需要澄清的一点是在父进程代码中的 wait 函数。一旦被创建后,子进程很大程度上独立于它的父进程,正如简短的 pipeUN 程序所展示的那样。子进程可以执行任意的代码,而它们可能与父进程完全没有关系。但是,假如当子进程终止时,系统将会通过一个信号来通知父进程。

要是父进程在子进程之前终止又该如何呢?在这种情形下,除非采取了预防措施,子进程将会变成在进程表中的一个僵尸进程。预防措施有两大类型:第一种是让父进程去通知系统,告诉系统它对子进程的终止没有任何兴趣:

signal(SIGCHLD, SIG_IGN); /* in parent: ignore notification */

第二种方法是在子进程终止时,让父进程执行一个 wait。这样就确保了父进程可以独立于子进程而存在。在 pipeUN 程序中使用了第二种方法,其中父进程的代码使用的是下面的调用:

wait(NULL); /* called in parent */

这个对 wait 的调用意味着一直等待直到任意一个子进程的终止发生,因此在 pipeUN 程序中,只有一个子进程。(其中的 NULL 参数可以被替换为一个保存有子程序退出状态的整数变量的地址。)对于更细粒度的控制,还可以使用更灵活的 waitpid 函数,例如特别指定多个子进程中的某一个。

pipeUN 将会采取另一个预防措施。当父进程结束了等待,父进程将会调用常规的 exit 函数去退出。对应的,子进程将会调用 _exit 变种来退出,这类变种将快速跟踪终止相关的通知。在效果上,子进程会告诉系统立刻去通知父进程它的这个子进程已经终止了。

假如两个进程向相同的无名管道中写入内容,字节数据会交错吗?例如,假如进程 P1 向管道写入内容:

foo bar

同时进程 P2 并发地写入:

baz baz

到相同的管道,最后的结果似乎是管道中的内容将会是任意错乱的,例如像这样:

baz foo baz bar

只要没有写入超过 PIPE_BUF 字节,POSIX 标准就能确保写入不会交错。在 Linux 系统中, PIPE_BUF 的大小是 4096 字节。对于管道我更喜欢只有一个写入方和一个读取方,从而绕过这个问题。

命名管道

无名管道没有备份文件:系统将维持一个内存缓存来将字节数据从写方传给读方。一旦写方和读方终止,这个缓存将会被回收,进而无名管道消失。相反的,命名管道有备份文件和一个不同的 API。

下面让我们通过另一个命令行示例来了解命名管道的要点。下面是具体的步骤:

  • 开启两个终端。这两个终端的工作目录应该相同。
  • 在其中一个终端中,键入下面的两个命令(命令行提示符仍然是 %,我的注释以 ## 打头。):
% mkfifo tester ## 创建一个备份文件,名为 tester
% cat tester    ## 将管道的内容输出到 stdout 

在最开始,没有任何东西会出现在终端中,因为到现在为止没有在命名管道中写入任何东西。

  • 在第二个终端中输入下面的命令:
% cat > tester ## redirect keyboard input to the pipe
hello, world!  ## then hit Return key
bye, bye       ## ditto
<Control-C>    ## terminate session with a Control-C

无论在这个终端中输入什么,它都会在另一个终端中显示出来。一旦键入 Ctrl+C,就会回到正常的命令行提示符,因为管道已经被关闭了。

  • 通过移除实现命名管道的文件来进行清理:
% unlink tester

正如 mkfifo 程序的名字所暗示的那样,命名管道也被叫做 FIFO,因为第一个进入的字节,就会第一个出,其他的类似。有一个名为 mkfifo 的库函数,用它可以在程序中创建一个命名管道,它将在下一个示例中被用到,该示例由两个进程组成:一个向命名管道写入,而另一个从该管道读取。

示例 2. fifoWriter 程序

#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <time.h>
#include <stdlib.h>
#include <stdio.h>

#define MaxLoops         12000   /* outer loop */
#define ChunkSize           16   /* how many written at a time */
#define IntsPerChunk         4   /* four 4-byte ints per chunk */
#define MaxZs              250   /* max microseconds to sleep */

int main() {
  const char* pipeName = "./fifoChannel";
  mkfifo(pipeName, 0666);                      /* read/write for user/group/others */
  int fd = open(pipeName, O_CREAT | O_WRONLY); /* open as write-only */
  if (fd < 0) return -1;                       /* can't go on */

  int i;
  for (i = 0; i < MaxLoops; i++) {          /* write MaxWrites times */
    int j;
    for (j = 0; j < ChunkSize; j++) {       /* each time, write ChunkSize bytes */
      int k;
      int chunk[IntsPerChunk];
      for (k = 0; k < IntsPerChunk; k++)
        chunk[k] = rand();
      write(fd, chunk, sizeof(chunk));
    }
    usleep((rand() % MaxZs) + 1);           /* pause a bit for realism */
  }

  close(fd);           /* close pipe: generates an end-of-stream marker */
  unlink(pipeName);    /* unlink from the implementing file */
  printf("%i ints sent to the pipe.\n", MaxLoops * ChunkSize * IntsPerChunk);

  return 0;
}

上面的 fifoWriter 程序可以被总结为如下:

  • 首先程序创建了一个命名管道用来写入数据:
mkfifo(pipeName, 0666); /* read/write perms for user/group/others */
int fd = open(pipeName, O_CREAT | O_WRONLY);

其中的 pipeName 是备份文件的名字,传递给 mkfifo 作为它的第一个参数。接着命名管道通过我们熟悉的 open 函数调用被打开,而这个函数将会返回一个文件描述符。

  • 在实现层面上,fifoWriter 不会一次性将所有的数据都写入,而是写入一个块,然后休息随机数目的微秒时间,接着再循环往复。总的来说,有 768000 个 4 字节整数值被写入到命名管道中。
  • 在关闭命名管道后,fifoWriter 也将使用 unlink 取消对该文件的连接。
close(fd); /* close pipe: generates end-of-stream marker */
unlink(pipeName); /* unlink from the implementing file */

一旦连接到管道的每个进程都执行了 unlink 操作后,系统将回收这些备份文件。在这个例子中,只有两个这样的进程 fifoWriterfifoReader,它们都做了 unlink 操作。

这个两个程序应该在不同终端的相同工作目录中执行。但是 fifoWriter 应该在 fifoReader 之前被启动,因为需要 fifoWriter 去创建管道。然后 fifoReader 才能够获取到刚被创建的命名管道。

示例 3. fifoReader 程序

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <unistd.h>

unsigned is_prime(unsigned n) { /* not pretty, but efficient */
  if (n <= 3) return n > 1;
  if (0 == (n % 2) || 0 == (n % 3)) return 0;

  unsigned i;
  for (i = 5; (i * i) <= n; i += 6)
    if (0 == (n % i) || 0 == (n % (i + 2))) return 0;

  return 1; /* found a prime! */
}

int main() {
  const char* file = "./fifoChannel";
  int fd = open(file, O_RDONLY);
  if (fd < 0) return -1; /* no point in continuing */
  unsigned count = 0, total = 0, primes_count = 0;

  while (1) {
    int next;
    int i;

    ssize_t count = read(fd, &next, sizeof(int));
    if (0 == count) break;                  /* end of stream */
    else if (count == sizeof(int)) {        /* read a 4-byte int value */
      total++;
      if (is_prime(next)) primes_count++;
    }
  }

  close(fd);       /* close pipe from read end */
  unlink(file);    /* unlink from the underlying file */
  printf("Received ints: %u, primes: %u\n", total, primes_count);

  return 0;
}

上面的 fifoReader 的内容可以总结为如下:

  • 因为 fifoWriter 已经创建了命名管道,所以 fifoReader 只需要利用标准的 open 调用来通过备份文件来获取到管道中的内容:
const char* file = "./fifoChannel";
int fd = open(file, O_RDONLY);

这个文件的是以只读打开的。

  • 然后这个程序进入一个潜在的无限循环,在每次循环时,尝试读取 4 字节的块。read 调用:
ssize_t count = read(fd, &next, sizeof(int));

返回 0 来暗示该流的结束。在这种情况下,fifoReader 跳出循环,关闭命名管道,并在终止前 unlink 备份文件。

  • 在读入 4 字节整数后,fifoReader 检查这个数是否为质数。这个操作代表了一个生产级别的读取器可能在接收到的字节数据上执行的逻辑操作。在示例运行中,在接收到的 768000 个整数中有 37682 个质数。

重复运行示例, fifoReader 将成功地读取 fifoWriter 写入的所有字节。这不是很让人惊讶的。这两个进程在相同的机器上执行,从而可以不用考虑网络相关的问题。命名管道是一个可信且高效的 IPC 机制,因而被广泛使用。

下面是这两个程序的输出,它们在不同的终端中启动,但处于相同的工作目录:

% ./fifoWriter
768000 ints sent to the pipe.
###
% ./fifoReader
Received ints: 768000, primes: 37682

消息队列

管道有着严格的先入先出行为:第一个被写入的字节将会第一个被读,第二个写入的字节将第二个被读,以此类推。消息队列可以做出相同的表现,但它又足够灵活,可以使得字节块可以不以先入先出的次序来接收。

正如它的名字所提示的那样,消息队列是一系列的消息,每个消息包含两部分:

  • 荷载,一个字节序列(在 C 中是 char)
  • 类型,以一个正整数值的形式给定,类型用来分类消息,为了更灵活的回收

看一下下面对一个消息队列的描述,每个消息由一个整数类型标记:

          +-+    +-+    +-+    +-+
sender--->|3|--->|2|--->|2|--->|1|--->receiver
          +-+    +-+    +-+    +-+

在上面展示的 4 个消息中,标记为 1 的是开头,即最接近接收端,然后另个标记为 2 的消息,最后接着一个标记为 3 的消息。假如按照严格的 FIFO 行为执行,消息将会以 1-2-2-3 这样的次序被接收。但是消息队列允许其他收取次序。例如,消息可以被接收方以 3-2-1-2 的次序接收。

mqueue 示例包含两个程序,sender 将向消息队列中写入数据,而 receiver 将从这个队列中读取数据。这两个程序都包含的头文件 queue.h 如下所示:

示例 4. 头文件 queue.h

#define ProjectId 123
#define PathName  "queue.h" /* any existing, accessible file would do */
#define MsgLen    4
#define MsgCount  6

typedef struct { 
  long type;                 /* must be of type long */ 
  char payload[MsgLen + 1];  /* bytes in the message */  
} queuedMessage;

上面的头文件定义了一个名为 queuedMessage 的结构类型,它带有 payload(字节数组)和 type(整数)这两个域。该文件也定义了一些符号常数(使用 #define 语句),前两个常数被用来生成一个 key,而这个 key 反过来被用来获取一个消息队列的 ID。ProjectId 可以是任何正整数值,而 PathName 必须是一个存在的、可访问的文件,在这个示例中,指的是文件 queue.h。在 senderreceiver 中,它们都有的设定语句为:

key_t key = ftok(PathName, ProjectId); /* generate key */
int qid = msgget(key, 0666 | IPC_CREAT); /* use key to get queue id */

ID qid 在效果上是消息队列文件描述符的对应物。

示例 5. sender 程序

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include <string.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key = ftok(PathName, ProjectId);
  if (key < 0) report_and_exit("couldn't get key...");

  int qid = msgget(key, 0666 | IPC_CREAT);
  if (qid < 0) report_and_exit("couldn't get queue id...");

  char* payloads[] = {"msg1", "msg2", "msg3", "msg4", "msg5", "msg6"};
  int types[] = {1, 1, 2, 2, 3, 3}; /* each must be > 0 */
  int i;
  for (i = 0; i < MsgCount; i++) {
    /* build the message */
    queuedMessage msg;
    msg.type = types[i];
    strcpy(msg.payload, payloads[i]);

    /* send the message */
    msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT); /* don't block */
    printf("%s sent as type %i\n", msg.payload, (int) msg.type);
  }
  return 0;
}

上面的 sender 程序将发送出 6 个消息,每两个为一个类型:前两个是类型 1,接着的连个是类型 2,最后的两个为类型 3。发送的语句:

msgsnd(qid, &msg, sizeof(msg), IPC_NOWAIT);

被配置为非阻塞的(IPC_NOWAIT 标志),是因为这里的消息体量上都很小。唯一的危险在于一个完整的序列将可能导致发送失败,而这个例子不会。下面的 receiver 程序也将使用 IPC_NOWAIT 标志来接收消息。

示例 6. receiver 程序

#include <stdio.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <stdlib.h>
#include "queue.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  key_t key= ftok(PathName, ProjectId); /* key to identify the queue */
  if (key < 0) report_and_exit("key not gotten...");

  int qid = msgget(key, 0666 | IPC_CREAT); /* access if created already */
  if (qid < 0) report_and_exit("no access to queue...");

  int types[] = {3, 1, 2, 1, 3, 2}; /* different than in sender */
  int i;
  for (i = 0; i < MsgCount; i++) {
    queuedMessage msg; /* defined in queue.h */
    if (msgrcv(qid, &msg, sizeof(msg), types[i], MSG_NOERROR | IPC_NOWAIT) < 0)
      puts("msgrcv trouble...");
    printf("%s received as type %i\n", msg.payload, (int) msg.type);
  }

  /** remove the queue **/
  if (msgctl(qid, IPC_RMID, NULL) < 0)  /* NULL = 'no flags' */
    report_and_exit("trouble removing queue...");

  return 0;
}

这个 receiver 程序不会创建消息队列,尽管 API 尽管建议那样。在 receiver 中,对

int qid = msgget(key, 0666 | IPC_CREAT);

的调用可能因为带有 IPC_CREAT 标志而具有误导性,但是这个标志的真实意义是如果需要就创建,否则直接获取sender 程序调用 msgsnd 来发送消息,而 receiver 调用 msgrcv 来接收它们。在这个例子中,sender 以 1-1-2-2-3-3 的次序发送消息,但 receiver 接收它们的次序为 3-1-2-1-3-2,这显示消息队列没有被严格的 FIFO 行为所拘泥:

% ./sender
msg1 sent as type 1
msg2 sent as type 1
msg3 sent as type 2
msg4 sent as type 2
msg5 sent as type 3
msg6 sent as type 3

% ./receiver
msg5 received as type 3
msg1 received as type 1
msg3 received as type 2
msg2 received as type 1
msg6 received as type 3
msg4 received as type 2

上面的输出显示 senderreceiver 可以在同一个终端中启动。输出也显示消息队列是持久的,即便 sender 进程在完成创建队列、向队列写数据、然后退出的整个过程后,该队列仍然存在。只有在 receiver 进程显式地调用 msgctl 来移除该队列,这个队列才会消失:

if (msgctl(qid, IPC_RMID, NULL) < 0) /* remove queue */

总结

管道和消息队列的 API 在根本上来说都是单向的:一个进程写,然后另一个进程读。当然还存在双向命名管道的实现,但我认为这个 IPC 机制在它最为简单的时候反而是最佳的。正如前面提到的那样,消息队列已经不大受欢迎了,尽管没有找到什么特别好的原因来解释这个现象;而队列仍然是 IPC 工具箱中的一个工具。这个快速的 IPC 工具箱之旅将以第 3 部分(通过套接字和信号来示例 IPC)来终结。


via: https://opensource.com/article/19/4/interprocess-communication-linux-channels

作者:Marty Kalin 选题:lujun9972 译者:FSSlc 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习在 Linux 中进程是如何与其他进程进行同步的。

本篇是 Linux 下进程间通信(IPC)系列的第一篇文章。这个系列将使用 C 语言代码示例来阐明以下 IPC 机制:

  • 共享文件
  • 共享内存(使用信号量)
  • 管道(命名的或非命名的管道)
  • 消息队列
  • 套接字
  • 信号

在聚焦上面提到的共享文件和共享内存这两个机制之前,这篇文章将带你回顾一些核心的概念。

核心概念

进程是运行着的程序,每个进程都有着它自己的地址空间,这些空间由进程被允许访问的内存地址组成。进程有一个或多个执行线程,而线程是一系列执行指令的集合:单线程进程就只有一个线程,而多线程的进程则有多个线程。一个进程中的线程共享各种资源,特别是地址空间。另外,一个进程中的线程可以直接通过共享内存来进行通信,尽管某些现代语言(例如 Go)鼓励一种更有序的方式,例如使用线程安全的通道。当然对于不同的进程,默认情况下,它们能共享内存。

有多种方法启动之后要进行通信的进程,下面所举的例子中主要使用了下面的两种方法:

  • 一个终端被用来启动一个进程,另外一个不同的终端被用来启动另一个。
  • 在一个进程(父进程)中调用系统函数 fork,以此生发另一个进程(子进程)。

第一个例子采用了上面使用终端的方法。这些代码示例的 ZIP 压缩包可以从我的网站下载到。

共享文件

程序员对文件访问应该都已经很熟识了,包括许多坑(不存在的文件、文件权限损坏等等),这些问题困扰着程序对文件的使用。尽管如此,共享文件可能是最为基础的 IPC 机制了。考虑一下下面这样一个相对简单的例子,其中一个进程(生产者 producer)创建和写入一个文件,然后另一个进程(消费者 consumer)从这个相同的文件中进行读取:

          writes +-----------+ reads
producer-------->| disk file |<-------consumer
                 +-----------+

在使用这个 IPC 机制时最明显的挑战是竞争条件可能会发生:生产者和消费者可能恰好在同一时间访问该文件,从而使得输出结果不确定。为了避免竞争条件的发生,该文件在处于状态时必须以某种方式处于被锁状态,从而阻止在操作执行时和其他操作的冲突。在标准系统库中与锁相关的 API 可以被总结如下:

  • 生产者应该在写入文件时获得一个文件的排斥锁。一个排斥锁最多被一个进程所拥有。这样就可以排除掉竞争条件的发生,因为在锁被释放之前没有其他的进程可以访问这个文件。
  • 消费者应该在从文件中读取内容时得到至少一个共享锁。多个读取者可以同时保有一个共享锁,但是没有写入者可以获取到文件内容,甚至在当只有一个读取者保有一个共享锁时。

共享锁可以提升效率。假如一个进程只是读入一个文件的内容,而不去改变它的内容,就没有什么原因阻止其他进程来做同样的事。但如果需要写入内容,则很显然需要文件有排斥锁。

标准的 I/O 库中包含一个名为 fcntl 的实用函数,它可以被用来检查或者操作一个文件上的排斥锁和共享锁。该函数通过一个文件描述符(一个在进程中的非负整数值)来标记一个文件(在不同的进程中不同的文件描述符可能标记同一个物理文件)。对于文件的锁定, Linux 提供了名为 flock 的库函数,它是 fcntl 的一个精简包装。第一个例子中使用 fcntl 函数来暴露这些 API 细节。

示例 1. 生产者程序

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>
#include <string.h>

#define FileName "data.dat"
#define DataString "Now is the winter of our discontent\nMade glorious summer by this sun of York\n"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  struct flock lock;
  lock.l_type = F_WRLCK;    /* read/write (exclusive versus shared) lock */
  lock.l_whence = SEEK_SET; /* base for seek offsets */
  lock.l_start = 0;         /* 1st byte in file */
  lock.l_len = 0;           /* 0 here means 'until EOF' */
  lock.l_pid = getpid();    /* process id */

  int fd; /* file descriptor to identify a file within a process */
  if ((fd = open(FileName, O_RDWR | O_CREAT, 0666)) < 0)  /* -1 signals an error */
    report_and_exit("open failed...");

  if (fcntl(fd, F_SETLK, &lock) < 0) /** F_SETLK doesn't block, F_SETLKW does **/
    report_and_exit("fcntl failed to get lock...");
  else {
    write(fd, DataString, strlen(DataString)); /* populate data file */
    fprintf(stderr, "Process %d has written to data file...\n", lock.l_pid);
  }

  /* Now release the lock explicitly. */
  lock.l_type = F_UNLCK;
  if (fcntl(fd, F_SETLK, &lock) < 0)
    report_and_exit("explicit unlocking failed...");

  close(fd); /* close the file: would unlock if needed */
  return 0;  /* terminating the process would unlock as well */
}

上面生产者程序的主要步骤可以总结如下:

  • 这个程序首先声明了一个类型为 struct flock 的变量,它代表一个锁,并对它的 5 个域做了初始化。第一个初始化
lock.l_type = F_WRLCK; /* exclusive lock */

使得这个锁为排斥锁(read-write)而不是一个共享锁(read-only)。假如生产者获得了这个锁,则其他的进程将不能够对文件做读或者写操作,直到生产者释放了这个锁,或者显式地调用 fcntl,又或者隐式地关闭这个文件。(当进程终止时,所有被它打开的文件都会被自动关闭,从而释放了锁)

  • 上面的程序接着初始化其他的域。主要的效果是整个文件都将被锁上。但是,有关锁的 API 允许特别指定的字节被上锁。例如,假如文件包含多个文本记录,则单个记录(或者甚至一个记录的一部分)可以被锁,而其余部分不被锁。
  • 第一次调用 fcntl
if (fcntl(fd, F_SETLK, &lock) < 0)

尝试排斥性地将文件锁住,并检查调用是否成功。一般来说, fcntl 函数返回 -1 (因此小于 0)意味着失败。第二个参数 F_SETLK 意味着 fcntl 的调用不是堵塞的;函数立即做返回,要么获得锁,要么显示失败了。假如替换地使用 F_SETLKW(末尾的 W 代指等待),那么对 fcntl 的调用将是阻塞的,直到有可能获得锁的时候。在调用 fcntl 函数时,它的第一个参数 fd 指的是文件描述符,第二个参数指定了将要采取的动作(在这个例子中,F_SETLK 指代设置锁),第三个参数为锁结构的地址(在本例中,指的是 &lock)。

  • 假如生产者获得了锁,这个程序将向文件写入两个文本记录。
  • 在向文件写入内容后,生产者改变锁结构中的 l_type 域为 unlock 值:
lock.l_type = F_UNLCK;

并调用 fcntl 来执行解锁操作。最后程序关闭了文件并退出。

示例 2. 消费者程序

#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <unistd.h>

#define FileName "data.dat"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1); /* EXIT_FAILURE */
}

int main() {
  struct flock lock;
  lock.l_type = F_WRLCK;    /* read/write (exclusive) lock */
  lock.l_whence = SEEK_SET; /* base for seek offsets */
  lock.l_start = 0;         /* 1st byte in file */
  lock.l_len = 0;           /* 0 here means 'until EOF' */
  lock.l_pid = getpid();    /* process id */

  int fd; /* file descriptor to identify a file within a process */
  if ((fd = open(FileName, O_RDONLY)) < 0)  /* -1 signals an error */
    report_and_exit("open to read failed...");

  /* If the file is write-locked, we can't continue. */
  fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */
  if (lock.l_type != F_UNLCK)
    report_and_exit("file is still write locked...");

  lock.l_type = F_RDLCK; /* prevents any writing during the reading */
  if (fcntl(fd, F_SETLK, &lock) < 0)
    report_and_exit("can't get a read-only lock...");

  /* Read the bytes (they happen to be ASCII codes) one at a time. */
  int c; /* buffer for read bytes */
  while (read(fd, &c, 1) > 0)    /* 0 signals EOF */
    write(STDOUT_FILENO, &c, 1); /* write one byte to the standard output */

  /* Release the lock explicitly. */
  lock.l_type = F_UNLCK;
  if (fcntl(fd, F_SETLK, &lock) < 0)
    report_and_exit("explicit unlocking failed...");

  close(fd);
  return 0;
}

相比于锁的 API,消费者程序会相对复杂一点儿。特别的,消费者程序首先检查文件是否被排斥性的被锁,然后才尝试去获得一个共享锁。相关的代码为:

lock.l_type = F_WRLCK;
...
fcntl(fd, F_GETLK, &lock); /* sets lock.l_type to F_UNLCK if no write lock */
if (lock.l_type != F_UNLCK)
  report_and_exit("file is still write locked...");

fcntl 调用中的 F_GETLK 操作指定检查一个锁,在本例中,上面代码的声明中给了一个 F_WRLCK 的排斥锁。假如特指的锁不存在,那么 fcntl 调用将会自动地改变锁类型域为 F_UNLCK 以此来显示当前的状态。假如文件是排斥性地被锁,那么消费者将会终止。(一个更健壮的程序版本或许应该让消费者会儿,然后再尝试几次。)

假如当前文件没有被锁,那么消费者将尝试获取一个共享(read-only)锁(F_RDLCK)。为了缩短程序,fcntl 中的 F_GETLK 调用可以丢弃,因为假如其他进程已经保有一个读写锁,F_RDLCK 的调用就可能会失败。重新调用一个只读锁能够阻止其他进程向文件进行写的操作,但可以允许其他进程对文件进行读取。简而言之,共享锁可以被多个进程所保有。在获取了一个共享锁后,消费者程序将立即从文件中读取字节数据,然后在标准输出中打印这些字节的内容,接着释放锁,关闭文件并终止。

下面的 % 为命令行提示符,下面展示的是从相同终端开启这两个程序的输出:

% ./producer
Process 29255 has written to data file...

% ./consumer
Now is the winter of our discontent
Made glorious summer by this sun of York

在本次的代码示例中,通过 IPC 传输的数据是文本:它们来自莎士比亚的戏剧《理查三世》中的两行台词。然而,共享文件的内容还可以是纷繁复杂的,任意的字节数据(例如一个电影)都可以,这使得文件共享变成了一个非常灵活的 IPC 机制。但它的缺点是文件获取速度较慢,因为文件的获取涉及到读或者写。同往常一样,编程总是伴随着折中。下面的例子将通过共享内存来做 IPC,而不是通过共享文件,在性能上相应的有极大的提升。

共享内存

对于共享内存,Linux 系统提供了两类不同的 API:传统的 System V API 和更新一点的 POSIX API。在单个应用中,这些 API 不能混用。但是,POSIX 方式的一个坏处是它的特性仍在发展中,并且依赖于安装的内核版本,这非常影响代码的可移植性。例如,默认情况下,POSIX API 用内存映射文件来实现共享内存:对于一个共享的内存段,系统为相应的内容维护一个备份文件。在 POSIX 规范下共享内存可以被配置为不需要备份文件,但这可能会影响可移植性。我的例子中使用的是带有备份文件的 POSIX API,这既结合了内存获取的速度优势,又获得了文件存储的持久性。

下面的共享内存例子中包含两个程序,分别名为 memwritermemreader,并使用信号量来调整它们对共享内存的获取。在任何时候当共享内存进入一个写入者场景时,无论是多进程还是多线程,都有遇到基于内存的竞争条件的风险,所以,需要引入信号量来协调(同步)对共享内存的获取。

memwriter 程序应当在它自己所处的终端首先启动,然后 memreader 程序才可以在它自己所处的终端启动(在接着的十几秒内)。memreader 的输出如下:

This is the way the world ends...

在每个源程序的最上方注释部分都解释了在编译它们时需要添加的链接参数。

首先让我们复习一下信号量是如何作为一个同步机制工作的。一般的信号量也被叫做一个计数信号量,因为带有一个可以增加的值(通常初始化为 0)。考虑一家租用自行车的商店,在它的库存中有 100 辆自行车,还有一个供职员用于租赁的程序。每当一辆自行车被租出去,信号量就增加 1;当一辆自行车被还回来,信号量就减 1。在信号量的值为 100 之前都还可以进行租赁业务,但如果等于 100 时,就必须停止业务,直到至少有一辆自行车被还回来,从而信号量减为 99。

二元信号量是一个特例,它只有两个值:0 和 1。在这种情况下,信号量的表现为互斥量(一个互斥的构造)。下面的共享内存示例将把信号量用作互斥量。当信号量的值为 0 时,只有 memwriter 可以获取共享内存,在写操作完成后,这个进程将增加信号量的值,从而允许 memreader 来读取共享内存。

示例 3. memwriter 进程的源程序

/** Compilation: gcc -o memwriter memwriter.c -lrt -lpthread **/
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <semaphore.h>
#include <string.h>
#include "shmem.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);
}

int main() {
  int fd = shm_open(BackingFile,      /* name from smem.h */
                    O_RDWR | O_CREAT, /* read/write, create if needed */
                    AccessPerms);     /* access permissions (0644) */
  if (fd < 0) report_and_exit("Can't open shared mem segment...");

  ftruncate(fd, ByteSize); /* get the bytes */

  caddr_t memptr = mmap(NULL,       /* let system pick where to put segment */
                        ByteSize,   /* how many bytes */
                        PROT_READ | PROT_WRITE, /* access protections */
                        MAP_SHARED, /* mapping visible to other processes */
                        fd,         /* file descriptor */
                        0);         /* offset: start at 1st byte */
  if ((caddr_t) -1  == memptr) report_and_exit("Can't get segment...");

  fprintf(stderr, "shared mem address: %p [0..%d]\n", memptr, ByteSize - 1);
  fprintf(stderr, "backing file:       /dev/shm%s\n", BackingFile );

  /* semaphore code to lock the shared mem */
  sem_t* semptr = sem_open(SemaphoreName, /* name */
                           O_CREAT,       /* create the semaphore */
                           AccessPerms,   /* protection perms */
                           0);            /* initial value */
  if (semptr == (void*) -1) report_and_exit("sem_open");

  strcpy(memptr, MemContents); /* copy some ASCII bytes to the segment */

  /* increment the semaphore so that memreader can read */
  if (sem_post(semptr) < 0) report_and_exit("sem_post");

  sleep(12); /* give reader a chance */

  /* clean up */
  munmap(memptr, ByteSize); /* unmap the storage */
  close(fd);
  sem_close(semptr);
  shm_unlink(BackingFile); /* unlink from the backing file */
  return 0;
}

下面是 memwritermemreader 程序如何通过共享内存来通信的一个总结:

  • 上面展示的 memwriter 程序调用 shm_open 函数来得到作为系统协调共享内存的备份文件的文件描述符。此时,并没有内存被分配。接下来调用的是令人误解的名为 ftruncate 的函数
ftruncate(fd, ByteSize); /* get the bytes */

它将分配 ByteSize 字节的内存,在该情况下,一般为大小适中的 512 字节。memwritermemreader 程序都只从共享内存中获取数据,而不是从备份文件。系统将负责共享内存和备份文件之间数据的同步。

  • 接着 memwriter 调用 mmap 函数:
caddr_t memptr = mmap(NULL, /* let system pick where to put segment */
                  ByteSize, /* how many bytes */
                  PROT_READ | PROT_WRITE, /* access protections */
                  MAP_SHARED, /* mapping visible to other processes */
                  fd, /* file descriptor */
                  0); /* offset: start at 1st byte */

来获得共享内存的指针。(memreader 也做一次类似的调用。) 指针类型 caddr_tc 开头,它代表 calloc,而这是动态初始化分配的内存为 0 的一个系统函数。memwriter 通过库函数 strcpy(字符串复制)来获取后续操作的 memptr

  • 到现在为止,memwriter 已经准备好进行写操作了,但首先它要创建一个信号量来确保共享内存的排斥性。假如 memwriter 正在执行写操作而同时 memreader 在执行读操作,则有可能出现竞争条件。假如调用 sem_open 成功了:
sem_t* semptr = sem_open(SemaphoreName, /* name */
                     O_CREAT, /* create the semaphore */
                     AccessPerms, /* protection perms */
                     0); /* initial value */

那么,接着写操作便可以执行。上面的 SemaphoreName(任意一个唯一的非空名称)用来在 memwritermemreader 识别信号量。初始值 0 将会传递给信号量的创建者,在这个例子中指的是 memwriter 赋予它执行操作的权利。

  • 在写操作完成后,memwriter* 通过调用sem\_post` 函数将信号量的值增加到 1:
if (sem_post(semptr) < 0) ..

增加信号了将释放互斥锁,使得 memreader 可以执行它的操作。为了更好地测量,memwriter 也将从它自己的地址空间中取消映射,

munmap(memptr, ByteSize); /* unmap the storage *

这将使得 memwriter 不能进一步地访问共享内存。

示例 4. memreader 进程的源代码

/** Compilation: gcc -o memreader memreader.c -lrt -lpthread **/
#include <stdio.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include <semaphore.h>
#include <string.h>
#include "shmem.h"

void report_and_exit(const char* msg) {
  perror(msg);
  exit(-1);
}

int main() {
  int fd = shm_open(BackingFile, O_RDWR, AccessPerms);  /* empty to begin */
  if (fd < 0) report_and_exit("Can't get file descriptor...");

  /* get a pointer to memory */
  caddr_t memptr = mmap(NULL,       /* let system pick where to put segment */
                        ByteSize,   /* how many bytes */
                        PROT_READ | PROT_WRITE, /* access protections */
                        MAP_SHARED, /* mapping visible to other processes */
                        fd,         /* file descriptor */
                        0);         /* offset: start at 1st byte */
  if ((caddr_t) -1 == memptr) report_and_exit("Can't access segment...");

  /* create a semaphore for mutual exclusion */
  sem_t* semptr = sem_open(SemaphoreName, /* name */
                           O_CREAT,       /* create the semaphore */
                           AccessPerms,   /* protection perms */
                           0);            /* initial value */
  if (semptr == (void*) -1) report_and_exit("sem_open");

  /* use semaphore as a mutex (lock) by waiting for writer to increment it */
  if (!sem_wait(semptr)) { /* wait until semaphore != 0 */
    int i;
    for (i = 0; i < strlen(MemContents); i++)
      write(STDOUT_FILENO, memptr + i, 1); /* one byte at a time */
    sem_post(semptr);
  }

  /* cleanup */
  munmap(memptr, ByteSize);
  close(fd);
  sem_close(semptr);
  unlink(BackingFile);
  return 0;
}

memwritermemreader 程序中,共享内存的主要着重点都在 shm_openmmap 函数上:在成功时,第一个调用返回一个备份文件的文件描述符,而第二个调用则使用这个文件描述符从共享内存段中获取一个指针。它们对 shm_open 的调用都很相似,除了 memwriter 程序创建共享内存,而 `memreader 只获取这个已经创建的内存:

int fd = shm_open(BackingFile, O_RDWR | O_CREAT, AccessPerms); /* memwriter */
int fd = shm_open(BackingFile, O_RDWR, AccessPerms); /* memreader */

有了文件描述符,接着对 mmap 的调用就是类似的了:

caddr_t memptr = mmap(NULL, size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);

mmap 的第一个参数为 NULL,这意味着让系统自己决定在虚拟内存地址的哪个地方分配内存,当然也可以指定一个地址(但很有技巧性)。MAP_SHARED 标志着被分配的内存在进程中是共享的,最后一个参数(在这个例子中为 0 ) 意味着共享内存的偏移量应该为第一个字节。size 参数特别指定了将要分配的字节数目(在这个例子中是 512);另外的保护参数(AccessPerms)暗示着共享内存是可读可写的。

memwriter 程序执行成功后,系统将创建并维护备份文件,在我的系统中,该文件为 /dev/shm/shMemEx,其中的 shMemEx 是我为共享存储命名的(在头文件 shmem.h 中给定)。在当前版本的 memwritermemreader 程序中,下面的语句

shm_unlink(BackingFile); /* removes backing file */

将会移除备份文件。假如没有 unlink 这个语句,则备份文件在程序终止后仍然持久地保存着。

memreadermemwriter 一样,在调用 sem_open 函数时,通过信号量的名字来获取信号量。但 memreader 随后将进入等待状态,直到 memwriter 将初始值为 0 的信号量的值增加。

if (!sem_wait(semptr)) { /* wait until semaphore != 0 */

一旦等待结束,memreader 将从共享内存中读取 ASCII 数据,然后做些清理工作并终止。

共享内存 API 包括显式地同步共享内存段和备份文件。在这次的示例中,这些操作都被省略了,以免文章显得杂乱,好让我们专注于内存共享和信号量的代码。

即便在信号量代码被移除的情况下,memwritermemreader 程序很大几率也能够正常执行而不会引入竞争条件:memwriter 创建了共享内存段,然后立即向它写入;memreader 不能访问共享内存,直到共享内存段被创建好。然而,当一个写操作处于混合状态时,最佳实践需要共享内存被同步。信号量 API 足够重要,值得在代码示例中着重强调。

总结

上面共享文件和共享内存的例子展示了进程是怎样通过共享存储来进行通信的,前者通过文件而后者通过内存块。这两种方法的 API 相对来说都很直接。这两种方法有什么共同的缺点吗?现代的应用经常需要处理流数据,而且是非常大规模的数据流。共享文件或者共享内存的方法都不能很好地处理大规模的流数据。按照类型使用管道会更加合适一些。所以这个系列的第二部分将会介绍管道和消息队列,同样的,我们将使用 C 语言写的代码示例来辅助讲解。


via: https://opensource.com/article/19/4/interprocess-communication-linux-storage

作者:Marty Kalin 选题:lujun9972 译者:FSSlc 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

学习一门新的编程语言是在你的职业生涯中继续前进的好方法,但是应该学习哪一门呢?

如果你想要开始你的编程生涯或继续前进,那么学习一门新语言是一个聪明的主意。但是,大量活跃使用的语言引发了一个问题:哪种编程语言是最好的?要回答这个问题,让我们从一个简单的问题开始:你想做什么样的程序?

如果你想在客户端进行网络编程,那么特定语言 HTML、CSS 和 JavaScript(看似无穷无尽的方言之一)是必须要学习的。

如果你想在服务器端进行 Web 编程,那么选择包括常见的通用语言:C++、Golang、Java、C#、 Node.js、Perl、Python、Ruby 等等。当然,服务器程序与数据存储(例如关系数据库和其他数据库)打交道,这意味着 SQL 等查询语言可能会发挥作用。

如果你正在为移动设备编写原生应用程序,那么了解目标平台非常重要。对于 Apple 设备,Swift 已经取代 Objective C 成为首选语言。对于 Android 设备,Java(带有专用库和工具集)仍然是主要语言。有一些特殊语言,如与 C# 一起使用的 Xamarin,可以为 Apple、Android 和 Windows 设备生成特定于平台的代码。

那么通用语言呢?通常有各种各样的选择。在动态脚本语言(如 Perl、Python 和 Ruby)中,有一些新东西,如 Node.js。而 Java 和 C# 的相似之处比它们的粉丝愿意承认的还要多,仍然是针对虚拟机(分别是 JVM 和 CLR)的主要静态编译语言。在可以编译为原生可执行文件的语言中,C++ 仍在使用,还有后来出现的 Golang 和 Rust 等。通用的函数式语言比比皆是(如 Clojure、Haskell、Erlang、F#、Lisp 和 Scala),它们通常都有热情投入的社区。值得注意的是,面向对象语言(如 Java 和 C#)已经添加了函数式构造(特别是 lambdas),而动态语言从一开始就有函数式构造。

让我以 C 语言结尾,它是一种小巧、优雅、可扩展的语言,不要与 C++ 混淆。现代操作系统主要用 C 语言编写,其余部分用汇编语言编写。任何平台上的标准库大多数都是用 C 语言编写的。例如,任何打印 Hello, world! 这种问候都是通过调用名为 write 的 C 库函数来实现的。

C 作为一种可移植的汇编语言,公开了其他高级语言有意隐藏的底层系统的详细信息。因此,理解 C 可以更好地掌握程序如何竞争执行所需的共享系统资源(如处理器、内存和 I/O 设备)。C 语言既高级又接近硬件,因此在性能方面无与伦比,当然,汇编语言除外。最后,C 是编程语言中的通用语言,几乎所有通用语言都支持某种形式的 C 调用。

有关现代 C 语言的介绍,参考我的书籍《C 语言编程:可移植的汇编器介绍》。无论你怎么做,学习 C 语言,你会学到比另一种编程语言多得多的东西。

你认为学习哪些编程语言很重要?你是否同意这些建议?在评论告知我们!


via: https://opensource.com/article/19/2/which-programming-languages-should-you-learn

作者:Marty Kalin 选题:lujun9972 译者:MjSeven 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出