标签 并发 下的文章

这是并发服务器系列的第三节。第一节 介绍了阻塞式编程,第二节:线程 探讨了多线程,将其作为一种可行的方法来实现服务器并发编程。

另一种常见的实现并发的方法叫做 事件驱动编程,也可以叫做 异步 编程 注1 。这种方法变化万千,因此我们会从最基本的开始,使用一些基本的 API 而非从封装好的高级方法开始。本系列以后的文章会讲高层次抽象,还有各种混合的方法。

本系列的所有文章:

阻塞式 vs. 非阻塞式 I/O

作为本篇的介绍,我们先讲讲阻塞和非阻塞 I/O 的区别。阻塞式 I/O 更好理解,因为这是我们使用 I/O 相关 API 时的“标准”方式。从套接字接收数据的时候,调用 recv 函数会发生 阻塞,直到它从端口上接收到了来自另一端套接字的数据。这恰恰是第一部分讲到的顺序服务器的问题。

因此阻塞式 I/O 存在着固有的性能问题。第二节里我们讲过一种解决方法,就是用多线程。哪怕一个线程的 I/O 阻塞了,别的线程仍然可以使用 CPU 资源。实际上,阻塞 I/O 通常在利用资源方面非常高效,因为线程就等待着 —— 操作系统将线程变成休眠状态,只有满足了线程需要的条件才会被唤醒。

非阻塞式 I/O 是另一种思路。把套接字设成非阻塞模式时,调用 recv 时(还有 send,但是我们现在只考虑接收),函数返回的会很快,哪怕没有接收到数据。这时,就会返回一个特殊的错误状态 注2 来通知调用者,此时没有数据传进来。调用者可以去做其他的事情,或者尝试再次调用 recv 函数。

示范阻塞式和非阻塞式的 recv 区别的最好方式就是贴一段示例代码。这里有个监听套接字的小程序,一直在 recv 这里阻塞着;当 recv 返回了数据,程序就报告接收到了多少个字节 注3

int main(int argc, const char** argv) {
  setvbuf(stdout, NULL, _IONBF, 0);

  int portnum = 9988;
  if (argc >= 2) {
    portnum = atoi(argv[1]);
  }
  printf("Listening on port %d\n", portnum);

  int sockfd = listen_inet_socket(portnum);
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd = accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);
  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }
  report_peer_connected(&peer_addr, peer_addr_len);

  while (1) {
    uint8_t buf[1024];
    printf("Calling recv...\n");
    int len = recv(newsockfd, buf, sizeof buf, 0);
    if (len < 0) {
      perror_die("recv");
    } else if (len == 0) {
      printf("Peer disconnected; I'm done.\n");
      break;
    }
    printf("recv returned %d bytes\n", len);
  }

  close(newsockfd);
  close(sockfd);

  return 0;
}

主循环重复调用 recv 并且报告它返回的字节数(记住 recv 返回 0 时,就是客户端断开连接了)。试着运行它,我们会在一个终端里运行这个程序,然后在另一个终端里用 nc 进行连接,发送一些字符,每次发送之间间隔几秒钟:

$ nc localhost 9988
hello                                   # wait for 2 seconds after typing this
socket world
^D                                      # to end the connection>

监听程序会输出以下内容:

$ ./blocking-listener 9988
Listening on port 9988
peer (localhost, 37284) connected
Calling recv...
recv returned 6 bytes
Calling recv...
recv returned 13 bytes
Calling recv...
Peer disconnected; I'm done.

现在试试非阻塞的监听程序的版本。这是代码:

int main(int argc, const char** argv) {
  setvbuf(stdout, NULL, _IONBF, 0);

  int portnum = 9988;
  if (argc >= 2) {
    portnum = atoi(argv[1]);
  }
  printf("Listening on port %d\n", portnum);

  int sockfd = listen_inet_socket(portnum);
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd = accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);
  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }
  report_peer_connected(&peer_addr, peer_addr_len);

  // 把套接字设成非阻塞模式
  int flags = fcntl(newsockfd, F_GETFL, 0);
  if (flags == -1) {
    perror_die("fcntl F_GETFL");
  }

  if (fcntl(newsockfd, F_SETFL, flags | O_NONBLOCK) == -1) {
    perror_die("fcntl F_SETFL O_NONBLOCK");
  }

  while (1) {
    uint8_t buf[1024];
    printf("Calling recv...\n");
    int len = recv(newsockfd, buf, sizeof buf, 0);
    if (len < 0) {
      if (errno == EAGAIN || errno == EWOULDBLOCK) {
        usleep(200 * 1000);
        continue;
      }
      perror_die("recv");
    } else if (len == 0) {
      printf("Peer disconnected; I'm done.\n");
      break;
    }
    printf("recv returned %d bytes\n", len);
  }

  close(newsockfd);
  close(sockfd);

  return 0;
}

这里与阻塞版本有些差异,值得注意:

  1. accept 函数返回的 newsocktfd 套接字因调用了 fcntl, 被设置成非阻塞的模式。
  2. 检查 recv 的返回状态时,我们对 errno 进行了检查,判断它是否被设置成表示没有可供接收的数据的状态。这时,我们仅仅是休眠了 200 毫秒然后进入到下一轮循环。

同样用 nc 进行测试,以下是非阻塞监听器的输出:

$ ./nonblocking-listener 9988
Listening on port 9988
peer (localhost, 37288) connected
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
recv returned 6 bytes
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
Calling recv...
recv returned 13 bytes
Calling recv...
Calling recv...
Calling recv...
Peer disconnected; I'm done.

作为练习,给输出添加一个时间戳,确认调用 recv 得到结果之间花费的时间是比输入到 nc 中所用的多还是少(每一轮是 200 ms)。

这里就实现了使用非阻塞的 recv 让监听者检查套接字变为可能,并且在没有数据的时候重新获得控制权。换句话说,用编程的语言说这就是 轮询 polling —— 主程序周期性的查询套接字以便读取数据。

对于顺序响应的问题,这似乎是个可行的方法。非阻塞的 recv 让同时与多个套接字通信变成可能,轮询这些套接字,仅当有新数据到来时才处理。就是这样,这种方式 可以 用来写并发服务器;但实际上一般不这么做,因为轮询的方式很难扩展。

首先,我在代码中引入的 200ms 延迟对于演示非常好(监听器在我输入 nc 之间只打印几行 “Calling recv...”,但实际上应该有上千行)。但它也增加了多达 200ms 的服务器响应时间,这无意是不必要的。实际的程序中,延迟会低得多,休眠时间越短,进程占用的 CPU 资源就越多。有些时钟周期只是浪费在等待,这并不好,尤其是在移动设备上,这些设备的电量往往有限。

但是当我们实际这样来使用多个套接字的时候,更严重的问题出现了。想像下监听器正在同时处理 1000 个客户端。这意味着每一个循环迭代里面,它都得为 这 1000 个套接字中的每一个 执行一遍非阻塞的 recv,找到其中准备好了数据的那一个。这非常低效,并且极大的限制了服务器能够并发处理的客户端数。这里有个准则:每次轮询之间等待的间隔越久,服务器响应性越差;而等待的时间越少,CPU 在无用的轮询上耗费的资源越多。

讲真,所有的轮询都像是无用功。当然操作系统应该是知道哪个套接字是准备好了数据的,因此没必要逐个扫描。事实上,就是这样,接下来就会讲一些 API,让我们可以更优雅地处理多个客户端。

select

select 的系统调用是可移植的(POSIX),是标准 Unix API 中常有的部分。它是为上一节最后一部分描述的问题而设计的 —— 允许一个线程可以监视许多文件描述符 注4 的变化,而不用在轮询中执行不必要的代码。我并不打算在这里引入一个关于 select 的全面教程,有很多网站和书籍讲这个,但是在涉及到问题的相关内容时,我会介绍一下它的 API,然后再展示一个非常复杂的例子。

select 允许 多路 I/O,监视多个文件描述符,查看其中任何一个的 I/O 是否可用。

int select(int nfds, fd_set *readfds, fd_set *writefds,
           fd_set *exceptfds, struct timeval *timeout);

readfds 指向文件描述符的缓冲区,这个缓冲区被监视是否有读取事件;fd_set 是一个特殊的数据结构,用户使用 FD_* 宏进行操作。writefds 是针对写事件的。nfds 是监视的缓冲中最大的文件描述符数字(文件描述符就是整数)。timeout 可以让用户指定 select 应该阻塞多久,直到某个文件描述符准备好了(timeout == NULL 就是说一直阻塞着)。现在先跳过 exceptfds

select 的调用过程如下:

  1. 在调用之前,用户先要为所有不同种类的要监视的文件描述符创建 fd_set 实例。如果想要同时监视读取和写入事件,readfdswritefds 都要被创建并且引用。
  2. 用户可以使用 FD_SET 来设置集合中想要监视的特殊描述符。例如,如果想要监视描述符 2、7 和 10 的读取事件,在 readfds 这里调用三次 FD_SET,分别设置 2、7 和 10。
  3. select 被调用。
  4. select 返回时(现在先不管超时),就是说集合中有多少个文件描述符已经就绪了。它也修改 readfdswritefds 集合,来标记这些准备好的描述符。其它所有的描述符都会被清空。
  5. 这时用户需要遍历 readfdswritefds,找到哪个描述符就绪了(使用 FD_ISSET)。

作为完整的例子,我在并发的服务器程序上使用 select,重新实现了我们之前的协议。完整的代码在这里;接下来的是代码中的重点部分及注释。警告:示例代码非常复杂,因此第一次看的时候,如果没有足够的时间,快速浏览也没有关系。

使用 select 的并发服务器

使用 I/O 的多发 API 诸如 select 会给我们服务器的设计带来一些限制;这不会马上显现出来,但这值得探讨,因为它们是理解事件驱动编程到底是什么的关键。

最重要的是,要记住这种方法本质上是单线程的 注5 。服务器实际上在 同一时刻只能做一件事。因为我们想要同时处理多个客户端请求,我们需要换一种方式重构代码。

首先,让我们谈谈主循环。它看起来是什么样的呢?先让我们想象一下服务器有一堆任务,它应该监视哪些东西呢?两种类型的套接字活动:

  1. 新客户端尝试连接。这些客户端应该被 accept
  2. 已连接的客户端发送数据。这个数据要用 第一节 中所讲到的协议进行传输,有可能会有一些数据要被回送给客户端。

尽管这两种活动在本质上有所区别,我们还是要把它们放在一个循环里,因为只能有一个主循环。循环会包含 select 的调用。这个 select 的调用会监视上述的两种活动。

这里是部分代码,设置了文件描述符集合,并在主循环里转到被调用的 select 部分。

// “master” 集合存活在该循环中,跟踪我们想要监视的读取事件或写入事件的文件描述符(FD)。
fd_set readfds_master;
FD_ZERO(&readfds_master);
fd_set writefds_master;
FD_ZERO(&writefds_master);

// 监听的套接字一直被监视,用于读取数据,并监测到来的新的端点连接。
FD_SET(listener_sockfd, &readfds_master);

// 要想更加高效,fdset_max 追踪当前已知最大的 FD;这使得每次调用时对 FD_SETSIZE 的迭代选择不是那么重要了。
int fdset_max = listener_sockfd;

while (1) {
  // select() 会修改传递给它的 fd_sets,因此进行拷贝一下再传值。
  fd_set readfds = readfds_master;
  fd_set writefds = writefds_master;

  int nready = select(fdset_max + 1, &readfds, &writefds, NULL, NULL);
  if (nready < 0) {
    perror_die("select");
  }
  ...

这里的一些要点:

  1. 由于每次调用 select 都会重写传递给函数的集合,调用器就得维护一个 “master” 集合,在循环迭代中,保持对所监视的所有活跃的套接字的追踪。
  2. 注意我们所关心的,最开始的唯一那个套接字是怎么变成 listener_sockfd 的,这就是最开始的套接字,服务器借此来接收新客户端的连接。
  3. select 的返回值,是在作为参数传递的集合中,那些已经就绪的描述符的个数。select 修改这个集合,用来标记就绪的描述符。下一步是在这些描述符中进行迭代。
...
for (int fd = 0; fd <= fdset_max && nready > 0; fd++) {
  // 检查 fd 是否变成可读的
  if (FD_ISSET(fd, &readfds)) {
    nready--;

    if (fd == listener_sockfd) {
      // 监听的套接字就绪了;这意味着有个新的客户端连接正在联系
      ...
    } else {
      fd_status_t status = on_peer_ready_recv(fd);
      if (status.want_read) {
        FD_SET(fd, &readfds_master);
      } else {
        FD_CLR(fd, &readfds_master);
      }
      if (status.want_write) {
        FD_SET(fd, &writefds_master);
      } else {
        FD_CLR(fd, &writefds_master);
      }
      if (!status.want_read && !status.want_write) {
        printf("socket %d closing\n", fd);
        close(fd);
      }
    }

这部分循环检查 可读的 描述符。让我们跳过监听器套接字(要浏览所有内容,看这个代码) 然后看看当其中一个客户端准备好了之后会发生什么。出现了这种情况后,我们调用一个叫做 on_peer_ready_recv回调 函数,传入相应的文件描述符。这个调用意味着客户端连接到套接字上,发送某些数据,并且对套接字上 recv 的调用不会被阻塞 注6 。这个回调函数返回结构体 fd_status_t

typedef struct {
  bool want_read;
  bool want_write;
} fd_status_t;

这个结构体告诉主循环,是否应该监视套接字的读取事件、写入事件,或者两者都监视。上述代码展示了 FD_SETFD_CLR 是怎么在合适的描述符集合中被调用的。对于主循环中某个准备好了写入数据的描述符,代码是类似的,除了它所调用的回调函数,这个回调函数叫做 on_peer_ready_send

现在来花点时间看看这个回调:

typedef enum { INITIAL_ACK, WAIT_FOR_MSG, IN_MSG } ProcessingState;

#define SENDBUF_SIZE 1024

typedef struct {
  ProcessingState state;

  // sendbuf 包含了服务器要返回给客户端的数据。on_peer_ready_recv 句柄填充这个缓冲,
  // on_peer_read_send 进行消耗。sendbuf_end 指向缓冲区的最后一个有效字节,
  // sendptr 指向下个字节
  uint8_t sendbuf[SENDBUF_SIZE];
  int sendbuf_end;
  int sendptr;
} peer_state_t;

// 每一端都是通过它连接的文件描述符(fd)进行区分。只要客户端连接上了,fd 就是唯一的。
// 当客户端断开连接,另一个客户端连接上就会获得相同的 fd。on_peer_connected 应该
// 进行初始化,以便移除旧客户端在同一个 fd 上留下的东西。
peer_state_t global_state[MAXFDS];

fd_status_t on_peer_ready_recv(int sockfd) {
  assert(sockfd < MAXFDs);
  peer_state_t* peerstate = &global_state[sockfd];

  if (peerstate->state == INITIAL_ACK ||
      peerstate->sendptr < peerstate->sendbuf_end) {
    // 在初始的 ACK 被送到了客户端,就没有什么要接收的了。
    // 等所有待发送的数据都被发送之后接收更多的数据。
    return fd_status_W;
  }

  uint8_t buf[1024];
  int nbytes = recv(sockfd, buf, sizeof buf, 0);
  if (nbytes == 0) {
    // 客户端断开连接
    return fd_status_NORW;
  } else if (nbytes < 0) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      // 套接字 *实际* 并没有准备好接收,等到它就绪。
      return fd_status_R;
    } else {
      perror_die("recv");
    }
  }
  bool ready_to_send = false;
  for (int i = 0; i < nbytes; ++i) {
    switch (peerstate->state) {
    case INITIAL_ACK:
      assert(0 && "can't reach here");
      break;
    case WAIT_FOR_MSG:
      if (buf[i] == '^') {
        peerstate->state = IN_MSG;
      }
      break;
    case IN_MSG:
      if (buf[i] == '$') {
        peerstate->state = WAIT_FOR_MSG;
      } else {
        assert(peerstate->sendbuf_end < SENDBUF_SIZE);
        peerstate->sendbuf[peerstate->sendbuf_end++] = buf[i] + 1;
        ready_to_send = true;
      }
      break;
    }
  }
  // 如果没有数据要发送给客户端,报告读取状态作为最后接收的结果。
  return (fd_status_t){.want_read = !ready_to_send,
                       .want_write = ready_to_send};
}

peer_state_t 是全状态对象,用来表示在主循环中两次回调函数调用之间的客户端的连接。因为回调函数在客户端发送的某些数据时被调用,不能假设它能够不停地与客户端通信,并且它得运行得很快,不能被阻塞。因为套接字被设置成非阻塞模式,recv 会快速的返回。除了调用 recv, 这个句柄做的是处理状态,没有其它的调用,从而不会发生阻塞。

举个例子,你知道为什么这个代码需要一个额外的状态吗?这个系列中,我们的服务器目前只用到了两个状态,但是这个服务器程序需要三个状态。

来看看 “套接字准备好发送” 的回调函数:

fd_status_t on_peer_ready_send(int sockfd) {
  assert(sockfd < MAXFDs);
  peer_state_t* peerstate = &global_state[sockfd];

  if (peerstate->sendptr >= peerstate->sendbuf_end) {
    // 没有要发送的。
    return fd_status_RW;
  }
  int sendlen = peerstate->sendbuf_end - peerstate->sendptr;
  int nsent = send(sockfd, peerstate->sendbuf, sendlen, 0);
  if (nsent == -1) {
    if (errno == EAGAIN || errno == EWOULDBLOCK) {
      return fd_status_W;
    } else {
      perror_die("send");
    }
  }
  if (nsent < sendlen) {
    peerstate->sendptr += nsent;
    return fd_status_W;
  } else {
    // 所有东西都成功发送;重置发送队列。
    peerstate->sendptr = 0;
    peerstate->sendbuf_end = 0;

    // 如果我们现在是处于特殊的 INITIAL_ACK 状态,就转变到其他状态。
    if (peerstate->state == INITIAL_ACK) {
      peerstate->state = WAIT_FOR_MSG;
    }

    return fd_status_R;
  }
}

这里也一样,回调函数调用了一个非阻塞的 send,演示了状态管理。在异步代码中,回调函数执行的很快是受争议的,任何延迟都会阻塞主循环进行处理,因此也阻塞了整个服务器程序去处理其他的客户端。

用脚步再来运行这个服务器,同时连接 3 个客户端。在一个终端中我们运行下面的命令:

$ ./select-server

在另一个终端中:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-26 05:29:15,864:conn1 connected...
INFO:2017-09-26 05:29:15,864:conn2 connected...
INFO:2017-09-26 05:29:15,864:conn0 connected...
INFO:2017-09-26 05:29:15,865:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-26 05:29:15,865:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-26 05:29:15,865:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-26 05:29:15,865:conn1 received b'bcdbcuf'
INFO:2017-09-26 05:29:15,865:conn2 received b'bcdbcuf'
INFO:2017-09-26 05:29:15,865:conn0 received b'bcdbcuf'
INFO:2017-09-26 05:29:16,866:conn1 sending b'xyz^123'
INFO:2017-09-26 05:29:16,867:conn0 sending b'xyz^123'
INFO:2017-09-26 05:29:16,867:conn2 sending b'xyz^123'
INFO:2017-09-26 05:29:16,867:conn1 received b'234'
INFO:2017-09-26 05:29:16,868:conn0 received b'234'
INFO:2017-09-26 05:29:16,868:conn2 received b'234'
INFO:2017-09-26 05:29:17,868:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-26 05:29:17,869:conn1 received b'36bc1111'
INFO:2017-09-26 05:29:17,869:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-26 05:29:17,870:conn0 received b'36bc1111'
INFO:2017-09-26 05:29:17,870:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-26 05:29:17,870:conn2 received b'36bc1111'
INFO:2017-09-26 05:29:18,069:conn1 disconnecting
INFO:2017-09-26 05:29:18,070:conn0 disconnecting
INFO:2017-09-26 05:29:18,070:conn2 disconnecting

和线程的情况相似,客户端之间没有延迟,它们被同时处理。而且在 select-server 也没有用线程!主循环 多路 处理所有的客户端,通过高效使用 select 轮询多个套接字。回想下 第二节中 顺序的 vs 多线程的客户端处理过程的图片。对于我们的 select-server,三个客户端的处理流程像这样:

多客户端处理流程

所有的客户端在同一个线程中同时被处理,通过乘积,做一点这个客户端的任务,然后切换到另一个,再切换到下一个,最后切换回到最开始的那个客户端。注意,这里没有什么循环调度,客户端在它们发送数据的时候被客户端处理,这实际上是受客户端左右的。

同步、异步、事件驱动、回调

select-server 示例代码为讨论什么是异步编程、它和事件驱动及基于回调的编程有何联系,提供了一个良好的背景。因为这些词汇在并发服务器的(非常矛盾的)讨论中很常见。

让我们从一段 select 的手册页面中引用的一句话开始:

select,pselect,FD\_CLR,FD\_ISSET,FD\_SET,FD\_ZERO - 同步 I/O 处理

因此 select同步 处理。但我刚刚演示了大量代码的例子,使用 select 作为 异步 处理服务器的例子。有哪些东西?

答案是:这取决于你的观察角度。同步常用作阻塞处理,并且对 select 的调用实际上是阻塞的。和第 1、2 节中讲到的顺序的、多线程的服务器中对 sendrecv 是一样的。因此说 select同步的 API 是有道理的。可是,服务器的设计却可以是 异步的,或是 基于回调的,或是 事件驱动的,尽管其中有对 select 的使用。注意这里的 on_peer_* 函数是回调函数;它们永远不会阻塞,并且只有网络事件触发的时候才会被调用。它们可以获得部分数据,并能够在调用过程中保持稳定的状态。

如果你曾经做过一些 GUI 编程,这些东西对你来说应该很亲切。有个 “事件循环”,常常完全隐藏在框架里,应用的 “业务逻辑” 建立在回调上,这些回调会在各种事件触发后被调用,用户点击鼠标、选择菜单、定时器触发、数据到达套接字等等。曾经最常见的编程模型是客户端的 JavaScript,这里面有一堆回调函数,它们在浏览网页时用户的行为被触发。

select 的局限

使用 select 作为第一个异步服务器的例子对于说明这个概念很有用,而且由于 select 是很常见、可移植的 API。但是它也有一些严重的缺陷,在监视的文件描述符非常大的时候就会出现。

  1. 有限的文件描述符的集合大小。
  2. 糟糕的性能。

从文件描述符的大小开始。FD_SETSIZE 是一个编译期常数,在如今的操作系统中,它的值通常是 1024。它被硬编码在 glibc 的头文件里,并且不容易修改。它把 select 能够监视的文件描述符的数量限制在 1024 以内。曾有些人想要写出能够处理上万个并发访问的客户端请求的服务器,所以这个问题很有现实意义。有一些方法,但是不可移植,也很难用。

糟糕的性能问题就好解决的多,但是依然非常严重。注意当 select 返回的时候,它向调用者提供的信息是 “就绪的” 描述符的个数,还有被修改过的描述符集合。描述符集映射着描述符“就绪/未就绪”,但是并没有提供什么有效的方法去遍历所有就绪的描述符。如果只有一个描述符是就绪的,最坏的情况是调用者需要遍历 整个集合 来找到那个描述符。这在监视的描述符数量比较少的时候还行,但是如果数量变的很大的时候,这种方法弊端就凸显出了 注7

由于这些原因,为了写出高性能的并发服务器, select 已经不怎么用了。每一个流行的操作系统有独特的不可移植的 API,允许用户写出非常高效的事件循环;像框架这样的高级结构还有高级语言通常在一个可移植的接口中包含这些 API。

epoll

举个例子,来看看 epoll,Linux 上的关于高容量 I/O 事件通知问题的解决方案。epoll 高效的关键之处在于它与内核更好的协作。不是使用文件描述符,epoll_wait 用当前准备好的事件填满一个缓冲区。只有准备好的事件添加到了缓冲区,因此没有必要遍历客户端中当前 所有 监视的文件描述符。这简化了查找就绪的描述符的过程,把空间复杂度从 select 中的 O(N) 变为了 O(1)。

关于 epoll API 的完整展示不是这里的目的,网上有很多相关资源。虽然你可能猜到了,我还要写一个不同的并发服务器,这次是用 epool 而不是 select。完整的示例代码 在这里。实际上,由于大部分代码和 用 select 的服务器相同,所以我只会讲要点,在主循环里使用 epoll

struct epoll_event accept_event;
accept_event.data.fd = listener_sockfd;
accept_event.events = EPOLLIN;
if (epoll_ctl(epollfd, EPOLL_CTL_ADD, listener_sockfd, &accept_event) < 0) {
  perror_die("epoll_ctl EPOLL_CTL_ADD");
}

struct epoll_event* events = calloc(MAXFDS, sizeof(struct epoll_event));
if (events == NULL) {
  die("Unable to allocate memory for epoll_events");
}

while (1) {
  int nready = epoll_wait(epollfd, events, MAXFDS, -1);
  for (int i = 0; i < nready; i++) {
    if (events[i].events & EPOLLERR) {
      perror_die("epoll_wait returned EPOLLERR");
    }

    if (events[i].data.fd == listener_sockfd) {
      // 监听的套接字就绪了;意味着新客户端正在连接。
      ...
    } else {
      // A peer socket is ready.
      if (events[i].events & EPOLLIN) {
        // 准备好了读取
        ...
      } else if (events[i].events & EPOLLOUT) {
        // 准备好了写入
        ...
      }
    }
  }
}

通过调用 epoll_ctl 来配置 epoll。这时,配置监听的套接字数量,也就是 epoll 监听的描述符的数量。然后分配一个缓冲区,把就绪的事件传给 epoll 以供修改。在主循环里对 epoll_wait 的调用是魅力所在。它阻塞着,直到某个描述符就绪了(或者超时),返回就绪的描述符数量。但这时,不要盲目地迭代所有监视的集合,我们知道 epoll_write 会修改传给它的 events 缓冲区,缓冲区中有就绪的事件,从 0 到 nready-1,因此我们只需迭代必要的次数。

要在 select 里面重新遍历,有明显的差异:如果在监视着 1000 个描述符,只有两个就绪, epoll_waits 返回的是 nready=2,然后修改 events 缓冲区最前面的两个元素,因此我们只需要“遍历”两个描述符。用 select 我们就需要遍历 1000 个描述符,找出哪个是就绪的。因此,在繁忙的服务器上,有许多活跃的套接字时 epollselect 更加容易扩展。

剩下的代码很直观,因为我们已经很熟悉 “select 服务器” 了。实际上,“epoll 服务器” 中的所有“业务逻辑”和 “select 服务器” 是一样的,回调构成相同的代码。

这种相似是通过将事件循环抽象分离到一个库/框架中。我将会详述这些内容,因为很多优秀的程序员曾经也是这样做的。相反,下一篇文章里我们会了解 libuv,一个最近出现的更加受欢迎的时间循环抽象层。像 libuv 这样的库让我们能够写出并发的异步服务器,并且不用考虑系统调用下繁琐的细节。


  • 注1:我试着在做网络浏览和阅读这两件事的实际差别中突显自己,但经常做得头疼。有很多不同的选项,从“它们是一样的东西”到“一个是另一个的子集”,再到“它们是完全不同的东西”。在面临这样主观的观点时,最好是完全放弃这个问题,专注特殊的例子和用例。
  • 注2:POSIX 表示这可以是 EAGAIN,也可以是 EWOULDBLOCK,可移植应用应该对这两个都进行检查。
  • 注3:和这个系列所有的 C 示例类似,代码中用到了某些助手工具来设置监听套接字。这些工具的完整代码在这个 仓库utils 模块里。
  • 注4:select 不是网络/套接字专用的函数,它可以监视任意的文件描述符,有可能是硬盘文件、管道、终端、套接字或者 Unix 系统中用到的任何文件描述符。这篇文章里,我们主要关注它在套接字方面的应用。
  • 注5:有多种方式用多线程来实现事件驱动,我会把它放在稍后的文章中进行讨论。
  • 注6:由于各种非实验因素,它 仍然 可以阻塞,即使是在 select 说它就绪了之后。因此服务器上打开的所有套接字都被设置成非阻塞模式,如果对 recvsend 的调用返回了 EAGAIN 或者 EWOULDBLOCK,回调函数就装作没有事件发生。阅读示例代码的注释可以了解更多细节。
  • 注7:注意这比该文章前面所讲的异步轮询的例子要稍好一点。轮询需要 一直 发生,而 select 实际上会阻塞到有一个或多个套接字准备好读取/写入;select 会比一直询问浪费少得多的 CPU 时间。

via: https://eli.thegreenplace.net/2017/concurrent-servers-part-3-event-driven/

作者:Eli Bendersky 译者:GitFuture 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

这是并发网络服务器系列的第二节。第一节 提出了服务端实现的协议,还有简单的顺序服务器的代码,是这整个系列的基础。

这一节里,我们来看看怎么用多线程来实现并发,用 C 实现一个最简单的多线程服务器,和用 Python 实现的线程池。

该系列的所有文章:

多线程的方法设计并发服务器

说起第一节里的顺序服务器的性能,最显而易见的,是在服务器处理客户端连接时,计算机的很多资源都被浪费掉了。尽管假定客户端快速发送完消息,不做任何等待,仍然需要考虑网络通信的开销;网络要比现在的 CPU 慢上百万倍还不止,因此 CPU 运行服务器时会等待接收套接字的流量,而大量的时间都花在完全不必要的等待中。

这里是一份示意图,表明顺序时客户端的运行过程:

顺序客户端处理流程

这个图片上有 3 个客户端程序。棱形表示客户端的“到达时间”(即客户端尝试连接服务器的时间)。黑色线条表示“等待时间”(客户端等待服务器真正接受连接所用的时间),有色矩形表示“处理时间”(服务器和客户端使用协议进行交互所用的时间)。有色矩形的末端表示客户端断开连接。

上图中,绿色和橘色的客户端尽管紧跟在蓝色客户端之后到达服务器,也要等到服务器处理完蓝色客户端的请求。这时绿色客户端得到响应,橘色的还要等待一段时间。

多线程服务器会开启多个控制线程,让操作系统管理 CPU 的并发(使用多个 CPU 核心)。当客户端连接的时候,创建一个线程与之交互,而在主线程中,服务器能够接受其他的客户端连接。下图是该模式的时间轴:

并行客户端处理流程

每个客户端一个线程,在 C 语言里要用 pthread

这篇文章的 第一个示例代码 是一个简单的 “每个客户端一个线程” 的服务器,用 C 语言编写,使用了 phtreads API 用于实现多线程。这里是主循环代码:

while (1) {
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd =
      accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);

  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }

  report_peer_connected(&peer_addr, peer_addr_len);
  pthread_t the_thread;

  thread_config_t* config = (thread_config_t*)malloc(sizeof(*config));
  if (!config) {
    die("OOM");
  }
  config->sockfd = newsockfd;
  pthread_create(&the_thread, NULL, server_thread, config);

  // 回收线程 —— 在线程结束的时候,它占用的资源会被回收
  // 因为主线程在一直运行,所以它比服务线程存活更久。
  pthread_detach(the_thread);
}

这是 server_thread 函数:

void* server_thread(void* arg) {
  thread_config_t* config = (thread_config_t*)arg;
  int sockfd = config->sockfd;
  free(config);

  // This cast will work for Linux, but in general casting pthread_id to an 这个类型转换在 Linux 中可以正常运行,但是一般来说将 pthread_id 类型转换成整形不便于移植代码
  // integral type isn't portable.
  unsigned long id = (unsigned long)pthread_self();
  printf("Thread %lu created to handle connection with socket %d\n", id,
         sockfd);
  serve_connection(sockfd);
  printf("Thread %lu done\n", id);
  return 0;
}

线程 “configuration” 是作为 thread_config_t 结构体进行传递的:

typedef struct { int sockfd; } thread_config_t;

主循环中调用的 pthread_create 产生一个新线程,然后运行 server_thread 函数。这个线程会在 server_thread 返回的时候结束。而在 serve_connection 返回的时候 server_thread 才会返回。serve_connection 和第一节完全一样。

第一节中我们用脚本生成了多个并发访问的客户端,观察服务器是怎么处理的。现在来看看多线程服务器的处理结果:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-20 06:31:56,632:conn1 connected...
INFO:2017-09-20 06:31:56,632:conn2 connected...
INFO:2017-09-20 06:31:56,632:conn0 connected...
INFO:2017-09-20 06:31:56,632:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-20 06:31:56,632:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-20 06:31:56,632:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-20 06:31:56,633:conn1 received b'b'
INFO:2017-09-20 06:31:56,633:conn2 received b'b'
INFO:2017-09-20 06:31:56,633:conn0 received b'b'
INFO:2017-09-20 06:31:56,670:conn1 received b'cdbcuf'
INFO:2017-09-20 06:31:56,671:conn0 received b'cdbcuf'
INFO:2017-09-20 06:31:56,671:conn2 received b'cdbcuf'
INFO:2017-09-20 06:31:57,634:conn1 sending b'xyz^123'
INFO:2017-09-20 06:31:57,634:conn2 sending b'xyz^123'
INFO:2017-09-20 06:31:57,634:conn1 received b'234'
INFO:2017-09-20 06:31:57,634:conn0 sending b'xyz^123'
INFO:2017-09-20 06:31:57,634:conn2 received b'234'
INFO:2017-09-20 06:31:57,634:conn0 received b'234'
INFO:2017-09-20 06:31:58,635:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-20 06:31:58,635:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-20 06:31:58,636:conn1 received b'36bc1111'
INFO:2017-09-20 06:31:58,636:conn2 received b'36bc1111'
INFO:2017-09-20 06:31:58,637:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-20 06:31:58,637:conn0 received b'36bc1111'
INFO:2017-09-20 06:31:58,836:conn2 disconnecting
INFO:2017-09-20 06:31:58,836:conn1 disconnecting
INFO:2017-09-20 06:31:58,837:conn0 disconnecting

实际上,所有客户端同时连接,它们与服务器的通信是同时发生的。

每个客户端一个线程的难点

尽管在现代操作系统中就资源利用率方面来看,线程相当的高效,但前一节中讲到的方法在高负载时却会出现纰漏。

想象一下这样的情景:很多客户端同时进行连接,某些会话持续的时间长。这意味着某个时刻服务器上有很多活跃的线程。太多的线程会消耗掉大量的内存和 CPU 资源,而仅仅是用于上下文切换 注1 。另外其也可视为安全问题:因为这样的设计容易让服务器成为 DoS 攻击 的目标 —— 上百万个客户端同时连接,并且客户端都处于闲置状态,这样耗尽了所有资源就可能让服务器宕机。

当服务器要与每个客户端通信,CPU 进行大量计算时,就会出现更严重的问题。这种情况下,容易想到的方法是减少服务器的响应能力 —— 只有其中一些客户端能得到服务器的响应。

因此,对多线程服务器所能够处理的并发客户端数做一些 速率限制 就是个明智的选择。有很多方法可以实现。最容易想到的是计数当前已经连接上的客户端,把连接数限制在某个范围内(需要通过仔细的测试后决定)。另一种流行的多线程应用设计是使用 线程池

线程池

线程池 很简单,也很有用。服务器创建几个任务线程,这些线程从某些队列中获取任务。这就是“池”。然后每一个客户端的连接被当成任务分发到池中。只要池中有空闲的线程,它就会去处理任务。如果当前池中所有线程都是繁忙状态,那么服务器就会阻塞,直到线程池可以接受任务(某个繁忙状态的线程处理完当前任务后,变回空闲的状态)。

这里有个 4 线程的线程池处理任务的图。任务(这里就是客户端的连接)要等到线程池中的某个线程可以接受新任务。

非常明显,线程池的定义就是一种按比例限制的机制。我们可以提前设定服务器所能拥有的线程数。那么这就是并发连接的最多的客户端数 —— 其它的客户端就要等到线程空闲。如果我们的池中有 8 个线程,那么 8 就是服务器可以处理的最多的客户端并发连接数,哪怕上千个客户端想要同时连接。

那么怎么确定池中需要有多少个线程呢?通过对问题范畴进行细致的分析、评估、实验以及根据我们拥有的硬件配置。如果是单核的云服务器,答案只有一个;如果是 100 核心的多套接字的服务器,那么答案就有很多种。也可以在运行时根据负载动态选择池的大小 —— 我会在这个系列之后的文章中谈到这个东西。

使用线程池的服务器在高负载情况下表现出 性能退化 —— 客户端能够以稳定的速率进行连接,可能会比其它时刻得到响应的用时稍微久一点;也就是说,无论多少个客户端同时进行连接,服务器总能保持响应,尽最大能力响应等待的客户端。与之相反,每个客户端一个线程的服务器,会接收多个客户端的连接直到过载,这时它更容易崩溃或者因为要处理所有客户端而变得缓慢,因为资源都被耗尽了(比如虚拟内存的占用)。

在服务器上使用线程池

为了改变服务器的实现,我用了 Python,在 Python 的标准库中带有一个已经实现好的稳定的线程池。(concurrent.futures 模块里的 ThreadPoolExecutor 注2

服务器创建一个线程池,然后进入循环,监听套接字接收客户端的连接。用 submit 把每一个连接的客户端分配到池中:

pool = ThreadPoolExecutor(args.n)
sockobj = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sockobj.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sockobj.bind(('localhost', args.port))
sockobj.listen(15)

try:
    while True:
        client_socket, client_address = sockobj.accept()
        pool.submit(serve_connection, client_socket, client_address)
except KeyboardInterrupt as e:
    print(e)
    sockobj.close()

serve_connection 函数和 C 的那部分很像,与一个客户端交互,直到其断开连接,并且遵循我们的协议:

ProcessingState = Enum('ProcessingState', 'WAIT_FOR_MSG IN_MSG')

def serve_connection(sockobj, client_address):
    print('{0} connected'.format(client_address))
    sockobj.sendall(b'*')
    state = ProcessingState.WAIT_FOR_MSG

    while True:
        try:
            buf = sockobj.recv(1024)
            if not buf:
                break
        except IOError as e:
            break
        for b in buf:
            if state == ProcessingState.WAIT_FOR_MSG:
                if b == ord(b'^'):
                    state = ProcessingState.IN_MSG
            elif state == ProcessingState.IN_MSG:
                if b == ord(b'$'):
                    state = ProcessingState.WAIT_FOR_MSG
                else:
                    sockobj.send(bytes([b + 1]))
            else:
                assert False

    print('{0} done'.format(client_address))
    sys.stdout.flush()
    sockobj.close()

来看看线程池的大小对并行访问的客户端的阻塞行为有什么样的影响。为了演示,我会运行一个池大小为 2 的线程池服务器(只生成两个线程用于响应客户端)。

$ python3.6 threadpool-server.py -n 2

在另外一个终端里,运行客户端模拟器,产生 3 个并发访问的客户端:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-22 05:58:52,815:conn1 connected...
INFO:2017-09-22 05:58:52,827:conn0 connected...
INFO:2017-09-22 05:58:52,828:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-22 05:58:52,828:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-22 05:58:52,828:conn1 received b'b'
INFO:2017-09-22 05:58:52,828:conn0 received b'b'
INFO:2017-09-22 05:58:52,867:conn1 received b'cdbcuf'
INFO:2017-09-22 05:58:52,867:conn0 received b'cdbcuf'
INFO:2017-09-22 05:58:53,829:conn1 sending b'xyz^123'
INFO:2017-09-22 05:58:53,829:conn0 sending b'xyz^123'
INFO:2017-09-22 05:58:53,830:conn1 received b'234'
INFO:2017-09-22 05:58:53,831:conn0 received b'2'
INFO:2017-09-22 05:58:53,831:conn0 received b'34'
INFO:2017-09-22 05:58:54,831:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-22 05:58:54,832:conn1 received b'36bc1111'
INFO:2017-09-22 05:58:54,832:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-22 05:58:54,833:conn0 received b'36bc1111'
INFO:2017-09-22 05:58:55,032:conn1 disconnecting
INFO:2017-09-22 05:58:55,032:conn2 connected...
INFO:2017-09-22 05:58:55,033:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-22 05:58:55,033:conn0 disconnecting
INFO:2017-09-22 05:58:55,034:conn2 received b'b'
INFO:2017-09-22 05:58:55,071:conn2 received b'cdbcuf'
INFO:2017-09-22 05:58:56,036:conn2 sending b'xyz^123'
INFO:2017-09-22 05:58:56,036:conn2 received b'234'
INFO:2017-09-22 05:58:57,037:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-22 05:58:57,038:conn2 received b'36bc1111'
INFO:2017-09-22 05:58:57,238:conn2 disconnecting

回顾之前讨论的服务器行为:

  1. 在顺序服务器中,所有的连接都是串行的。一个连接结束后,下一个连接才能开始。
  2. 前面讲到的每个客户端一个线程的服务器中,所有连接都被同时接受并得到服务。

这里可以看到一种可能的情况:两个连接同时得到服务,只有其中一个结束连接后第三个才能连接上。这就是把线程池大小设置成 2 的结果。真实用例中我们会把线程池设置的更大些,取决于机器和实际的协议。线程池的缓冲机制就能很好理解了 —— 我 几个月前 更详细的介绍过这种机制,关于 Clojure 的 core.async 模块。

总结与展望

这篇文章讨论了在服务器中,用多线程作并发的方法。每个客户端一个线程的方法最早提出来,但是实际上却不常用,因为它并不安全。

线程池就常见多了,最受欢迎的几个编程语言有良好的实现(某些编程语言,像 Python,就是在标准库中实现)。这里说的使用线程池的服务器,不会受到每个客户端一个线程的弊端。

然而,线程不是处理多个客户端并行访问的唯一方法。下一节中我们会看看其它的解决方案,可以使用异步处理,或者事件驱动的编程。


  • 注1:老实说,现代 Linux 内核可以承受足够多的并发线程 —— 只要这些线程主要在 I/O 上被阻塞。这里有个示例程序,它产生可配置数量的线程,线程在循环体中是休眠的,每 50 ms 唤醒一次。我在 4 核的 Linux 机器上可以轻松的产生 10000 个线程;哪怕这些线程大多数时间都在睡眠,它们仍然消耗一到两个核心,以便实现上下文切换。而且,它们占用了 80 GB 的虚拟内存(Linux 上每个线程的栈大小默认是 8MB)。实际使用中,线程会使用内存并且不会在循环体中休眠,因此它可以非常快的占用完一个机器的内存。
  • 注2:自己动手实现一个线程池是个有意思的练习,但我现在还不想做。我曾写过用来练手的 针对特殊任务的线程池。是用 Python 写的;用 C 重写的话有些难度,但对于经验丰富的程序员,几个小时就够了。

via: https://eli.thegreenplace.net/2017/concurrent-servers-part-2-threads/

作者:Eli Bendersky 译者:GitFuture 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

这是关于并发网络服务器编程的第一篇教程。我计划测试几个主流的、可以同时处理多个客户端请求的服务器并发模型,基于可扩展性和易实现性对这些模型进行评判。所有的服务器都会监听套接字连接,并且实现一些简单的协议用于与客户端进行通讯。

该系列的所有文章:

协议

该系列教程所用的协议都非常简单,但足以展示并发服务器设计的许多有趣层面。而且这个协议是 有状态的 —— 服务器根据客户端发送的数据改变内部状态,然后根据内部状态产生相应的行为。并非所有的协议都是有状态的 —— 实际上,基于 HTTP 的许多协议是无状态的,但是有状态的协议也是很常见,值得认真讨论。

在服务器端看来,这个协议的视图是这样的:

总之:服务器等待新客户端的连接;当一个客户端连接的时候,服务器会向该客户端发送一个 * 字符,进入“等待消息”的状态。在该状态下,服务器会忽略客户端发送的所有字符,除非它看到了一个 ^ 字符,这表示一个新消息的开始。这个时候服务器就会转变为“正在通信”的状态,这时它会向客户端回送数据,把收到的所有字符的每个字节加 1 回送给客户端 注1 。当客户端发送了 $ 字符,服务器就会退回到等待新消息的状态。^$ 字符仅仅用于分隔消息 —— 它们不会被服务器回送。

每个状态之后都有个隐藏的箭头指向 “等待客户端” 状态,用于客户端断开连接。因此,客户端要表示“我已经结束”的方法很简单,关掉它那一端的连接就好。

显然,这个协议是真实协议的简化版,真实使用的协议一般包含复杂的报文头、转义字符序列(例如让消息体中可以出现 $ 符号),额外的状态变化。但是我们这个协议足以完成期望。

另一点:这个系列是介绍性的,并假设客户端都工作的很好(虽然可能运行很慢);因此没有设置超时,也没有设置特殊的规则来确保服务器不会因为客户端的恶意行为(或是故障)而出现阻塞,导致不能正常结束。

顺序服务器

这个系列中我们的第一个服务端程序是一个简单的“顺序”服务器,用 C 进行编写,除了标准的 POSIX 中用于套接字的内容以外没有使用其它库。服务器程序是顺序,因为它一次只能处理一个客户端的请求;当有客户端连接时,像之前所说的那样,服务器会进入到状态机中,并且不再监听套接字接受新的客户端连接,直到当前的客户端结束连接。显然这不是并发的,而且即便在很少的负载下也不能服务多个客户端,但它对于我们的讨论很有用,因为我们需要的是一个易于理解的基础。

这个服务器的完整代码在这里;接下来,我会着重于一些重点的部分。main 函数里面的外层循环用于监听套接字,以便接受新客户端的连接。一旦有客户端进行连接,就会调用 serve_connection,这个函数中的代码会一直运行,直到客户端断开连接。

顺序服务器在循环里调用 accept 用来监听套接字,并接受新连接:

while (1) {
  struct sockaddr_in peer_addr;
  socklen_t peer_addr_len = sizeof(peer_addr);

  int newsockfd =
      accept(sockfd, (struct sockaddr*)&peer_addr, &peer_addr_len);

  if (newsockfd < 0) {
    perror_die("ERROR on accept");
  }

  report_peer_connected(&peer_addr, peer_addr_len);
  serve_connection(newsockfd);
  printf("peer done\n");
}

accept 函数每次都会返回一个新的已连接的套接字,然后服务器调用 serve_connection;注意这是一个 阻塞式 的调用 —— 在 serve_connection 返回前,accept 函数都不会再被调用了;服务器会被阻塞,直到客户端结束连接才能接受新的连接。换句话说,客户端按 顺序 得到响应。

这是 serve_connection 函数:

typedef enum { WAIT_FOR_MSG, IN_MSG } ProcessingState;

void serve_connection(int sockfd) {
  if (send(sockfd, "*", 1, 0) < 1) {
    perror_die("send");
  }

  ProcessingState state = WAIT_FOR_MSG;

  while (1) {
    uint8_t buf[1024];
    int len = recv(sockfd, buf, sizeof buf, 0);
    if (len < 0) {
      perror_die("recv");
    } else if (len == 0) {
      break;
    }

    for (int i = 0; i < len; ++i) {
      switch (state) {
      case WAIT_FOR_MSG:
        if (buf[i] == '^') {
          state = IN_MSG;
        }
        break;
      case IN_MSG:
        if (buf[i] == '$') {
          state = WAIT_FOR_MSG;
        } else {
          buf[i] += 1;
          if (send(sockfd, &buf[i], 1, 0) < 1) {
            perror("send error");
            close(sockfd);
            return;
          }
        }
        break;
      }
    }
  }

  close(sockfd);
}

它完全是按照状态机协议进行编写的。每次循环的时候,服务器尝试接收客户端的数据。收到 0 字节意味着客户端断开连接,然后循环就会退出。否则,会逐字节检查接收缓存,每一个字节都可能会触发一个状态。

recv 函数返回接收到的字节数与客户端发送消息的数量完全无关(^...$ 闭合序列的字节)。因此,在保持状态的循环中遍历整个缓冲区很重要。而且,每一个接收到的缓冲中可能包含多条信息,但也有可能开始了一个新消息,却没有显式的结束字符;而这个结束字符可能在下一个缓冲中才能收到,这就是处理状态在循环迭代中进行维护的原因。

例如,试想主循环中的 recv 函数在某次连接中返回了三个非空的缓冲:

  1. ^abc$de^abte$f
  2. xyz^123
  3. 25$^ab$abab

服务端返回的是哪些数据?追踪代码对于理解状态转变很有用。(答案见 2

多个并发客户端

如果多个客户端在同一时刻向顺序服务器发起连接会发生什么事情?

服务器端的代码(以及它的名字 “顺序服务器”)已经说的很清楚了,一次只能处理 一个 客户端的请求。只要服务器在 serve_connection 函数中忙于处理客户端的请求,就不会接受别的客户端的连接。只有当前的客户端断开了连接,serve_connection 才会返回,然后最外层的循环才能继续执行接受其他客户端的连接。

为了演示这个行为,该系列教程的示例代码 包含了一个 Python 脚本,用于模拟几个想要同时连接服务器的客户端。每一个客户端发送类似之前那样的三个数据缓冲 注3 ,不过每次发送数据之间会有一定延迟。

客户端脚本在不同的线程中并发地模拟客户端行为。这是我们的序列化服务器与客户端交互的信息记录:

$ python3.6 simple-client.py  -n 3 localhost 9090
INFO:2017-09-16 14:14:17,763:conn1 connected...
INFO:2017-09-16 14:14:17,763:conn1 sending b'^abc$de^abte$f'
INFO:2017-09-16 14:14:17,763:conn1 received b'b'
INFO:2017-09-16 14:14:17,802:conn1 received b'cdbcuf'
INFO:2017-09-16 14:14:18,764:conn1 sending b'xyz^123'
INFO:2017-09-16 14:14:18,764:conn1 received b'234'
INFO:2017-09-16 14:14:19,764:conn1 sending b'25$^ab0000$abab'
INFO:2017-09-16 14:14:19,765:conn1 received b'36bc1111'
INFO:2017-09-16 14:14:19,965:conn1 disconnecting
INFO:2017-09-16 14:14:19,966:conn2 connected...
INFO:2017-09-16 14:14:19,967:conn2 sending b'^abc$de^abte$f'
INFO:2017-09-16 14:14:19,967:conn2 received b'b'
INFO:2017-09-16 14:14:20,006:conn2 received b'cdbcuf'
INFO:2017-09-16 14:14:20,968:conn2 sending b'xyz^123'
INFO:2017-09-16 14:14:20,969:conn2 received b'234'
INFO:2017-09-16 14:14:21,970:conn2 sending b'25$^ab0000$abab'
INFO:2017-09-16 14:14:21,970:conn2 received b'36bc1111'
INFO:2017-09-16 14:14:22,171:conn2 disconnecting
INFO:2017-09-16 14:14:22,171:conn0 connected...
INFO:2017-09-16 14:14:22,172:conn0 sending b'^abc$de^abte$f'
INFO:2017-09-16 14:14:22,172:conn0 received b'b'
INFO:2017-09-16 14:14:22,210:conn0 received b'cdbcuf'
INFO:2017-09-16 14:14:23,173:conn0 sending b'xyz^123'
INFO:2017-09-16 14:14:23,174:conn0 received b'234'
INFO:2017-09-16 14:14:24,175:conn0 sending b'25$^ab0000$abab'
INFO:2017-09-16 14:14:24,176:conn0 received b'36bc1111'
INFO:2017-09-16 14:14:24,376:conn0 disconnecting

这里要注意连接名:conn1 是第一个连接到服务器的,先跟服务器交互了一段时间。接下来的连接 conn2 —— 在第一个断开连接后,连接到了服务器,然后第三个连接也是一样。就像日志显示的那样,每一个连接让服务器变得繁忙,持续了大约 2.2 秒的时间(这实际上是人为地在客户端代码中加入的延迟),在这段时间里别的客户端都不能连接。

显然,这不是一个可扩展的策略。这个例子中,客户端中加入了延迟,让服务器不能处理别的交互动作。一个智能服务器应该能处理一堆客户端的请求,而这个原始的服务器在结束连接之前一直繁忙(我们将会在之后的章节中看到如何实现智能的服务器)。尽管服务端有延迟,但这不会过度占用 CPU;例如,从数据库中查找信息(时间基本上是花在连接到数据库服务器上,或者是花在硬盘中的本地数据库)。

总结及期望

这个示例服务器达成了两个预期目标:

  1. 首先是介绍了问题范畴和贯彻该系列文章的套接字编程基础。
  2. 对于并发服务器编程的抛砖引玉 —— 就像之前的部分所说,顺序服务器还不能在非常轻微的负载下进行扩展,而且没有高效的利用资源。

在看下一篇文章前,确保你已经理解了这里所讲的服务器/客户端协议,还有顺序服务器的代码。我之前介绍过了这个简单的协议;例如 串行通信分帧用协程来替代状态机。要学习套接字网络编程的基础,Beej 的教程 用来入门很不错,但是要深入理解我推荐你还是看本书。

如果有什么不清楚的,请在评论区下进行评论或者向我发送邮件。深入理解并发服务器!


  • 注1:状态转变中的 In/Out 记号是指 Mealy machine
  • 注2:回应的是 bcdbcuf23436bc
  • 注3:这里在结尾处有一点小区别,加了字符串 0000 —— 服务器回应这个序列,告诉客户端让其断开连接;这是一个简单的握手协议,确保客户端有足够的时间接收到服务器发送的所有回复。

via: https://eli.thegreenplace.net/2017/concurrent-servers-part-1-introduction/

作者:Eli Bendersky 译者:GitFuture 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

最近我开始发力钻研 Python 的新 asyncio 模块。原因是我需要做一些事情,使用事件 IO 会使这些事情工作得更好,炙手可热的 asynio 正好可以用来牛刀小试。从试用的经历来看,该模块比我预想的复杂许多,我现在可以非常肯定地说,我不知道该如何恰当地使用 asyncio。

从 Twisted 框架借鉴一些经验来理解 asynio 并非难事,但是,asyncio 包含众多的元素,我开始动摇,不知道如何将这些孤立的零碎拼图组合成一副完整的图画。我已没有足够的智力提出任何更好的建议,在这里,只想分享我的困惑,求大神指点。

原语

asyncio 通过 协程 coroutines 的帮助来实现异步 IO。最初它是通过 yieldyield from 表达式实现的一个库,因为 Python 语言本身演进的缘故,现在它已经变成一个更复杂的怪兽。所以,为了在同一个频道讨论下去,你需要了解如下一些术语:

  • 事件循环
  • 事件循环策略
  • awaitable
  • 协程函数
  • 老式协程函数
  • 协程
  • 协程封装
  • 生成器 generator
  • future
  • 并发的future
  • 任务 task
  • 句柄
  • 执行器 executor
  • 传输 transport
  • 协议

此外,Python 还新增了一些新的特殊方法:

  • __aenter____aenter__,用于异步块操作
  • __aiter____anext__,用于异步迭代器(异步循环和异步推导)。为了更强大些,协议已经改变过一次了。 在 Python 3.5 它返回一个 awaitable(这是个协程);在 3.6它返回一个新的异步生成器。
  • __await__,用于自定义的 awaitable

你还需要了解相当多的内容,文档涵盖了那些部分。尽管如此,我做了一些额外说明以便对其有更好的理解:

事件循环

asyncio 事件循环和你第一眼看上去的略有不同。表面看,每个线程都有一个事件循环,然而事实并非如此。我认为它们应该按照如下的方式工作:

  • 如果是主线程,当调用 asyncio.get_event_loop() 时创建一个事件循环。
  • 如果是其它线程,当调用 asyncio.get_event_loop() 时返回运行时错误。
  • 当前线程可以使用 asyncio.set_event_loop() 在任何时间节点绑定事件循环。该事件循环可由 asyncio.new_evet_loop() 函数创建。
  • 事件循环可以在不绑定到当前线程的情况下使用。
  • asyncio.get_event_loop() 返回绑定线程的事件循环,而非当前运行的事件循环。

这些行为的组合是超混淆的,主要有以下几个原因。 首先,你需要知道这些函数被委托到全局设置的底层事件循环策略。 默认是将事件循环绑定到线程。 或者,如果需要的话,可以在理论上将事件循环绑定到一个 greenlet 或类似的。 然而,重要的是要知道库代码不控制策略,因此不能推断 asyncio 将适用于线程。

其次,asyncio 不需要通过策略将事件循环绑定到上下文。 事件循环可以单独工作。 但是这正是库代码的第一个问题,因为协同程序或类似的东西并不知道哪个事件循环负责调度它。 这意味着,如果从协程中调用 asyncio.get_event_loop(),你可能没有机会取得事件循环。 这也是所有 API 均采用可选的显式事件循环参数的原因。 举例来说,要弄清楚当前哪个协程正在运行,不能使用如下调用:

def get_task():
    loop = asyncio.get_event_loop()
    try:
        return asyncio.Task.get_current(loop)
    except RuntimeError:
        return None

相反,必须显式地传递事件循环。 这进一步要求你在库代码中显式地遍历事件循环,否则可能发生很奇怪的事情。 我不知道这种设计的思想是什么,但如果不解决这个问题(例如 get_event_loop() 返回实际运行的事件循环),那么唯一有意义的其它方案是明确禁止显式事件循环传递,并要求它绑定到当前上下文(线程等)。

由于事件循环策略不提供当前上下文的标识符,因此库也不可能以任何方式“索引”到当前上下文。 也没有回调函数用来监视这样的上下文的拆除,这进一步限制了实际可以开展的操作。

awaitable 与 协程 coroutine

以我的愚见,Python 最大的设计错误是过度重载迭代器。它们现在不仅用于迭代,而且用于各种类型的协程。 Python 中迭代器最大的设计错误之一是如果 StopIteration 没有被捕获形成的空泡。 这可能导致非常令人沮丧的问题,其中某处的异常可能导致其它地方的生成器或协同程序中止。 这是一个长期存在的问题,基于 Python 的模板引擎如 Jinja 经常面临这种问题。 该模板引擎在内部渲染为生成器,并且当由于某种原因的模板引起 StopIteration 时,渲染就停止在那里。

Python 慢慢认识到了过度重载的教训。 首先在 3.x 版本加入 asyncio 模块,并没有语言级支持。 所以自始至终它不过仅仅是装饰器和生成器而已。 为了实现 yield from 以及其它东西,StopIteration 再次重载。 这导致了令人困惑的行为,像这样:

>>> def foo(n):
...  if n in (0, 1):
...   return [1]
...  for item in range(n):
...   yield item * 2
...
>>> list(foo(0))
[]
>>> list(foo(1))
[]
>>> list(foo(2))
[0, 2]

没有错误,没有警告。只是不是你所期望的行为。 这是因为从一个作为生成器的函数中 return 的值实际上引发了一个带有单个参数的 StopIteration,它不是由迭代器协议捕获的,而只是在协程代码中处理。

在 3.5 和 3.6 有很多改变,因为现在除了生成器我们还有协程对象。除了通过封装生成器来生成协程,没有其它可以直接生成协程的单独对象。它是通过用给函数加 async 前缀来实现。 例如 async def x() 会产生这样的协程。 现在在 3.6,将有单独的异步生成器,它通过触发 AsyncStopIteration 保持其独立性。 此外,对于Python 3.5 和更高版本,导入新的 future 对象(generator_stop),如果代码在迭代步骤中触发 StopIteration,它将引发 RuntimeError

为什么我提到这一切? 因为老的实现方式并未真的消失。 生成器仍然具有 sendthrow 方法以及协程仍然在很大程度上表现为生成器。你需要知道这些东西,它们将在未来伴随你相当长的时间。

为了统一很多这样的重复,现在我们在 Python 中有更多的概念了:

  • awaitable:具有__await__方法的对象。 由本地协同程序和旧式协同程序以及一些其它程序实现。
  • 协程函数 coroutinefunction :返回原生协程的函数。 不要与返回协程的函数混淆。
  • 协程 coroutine : 原生的协程程序。 注意,目前为止,当前文档不认为老式 asyncio 协程是协程程序。 至少 inspect.iscoroutine 不认为它是协程。 尽管它被 future/awaitable 分支接纳。

特别令人困惑的是 asyncio.iscoroutinefunctioninspect.iscoroutinefunction 正在做不同的事情,这与 inspect.iscoroutineinspect.iscoroutinefunction 情况相同。 值得注意的是,尽管 inspect 在类型检查中不知道有关 asycnio 旧式协程函数的任何信息,但是当您检查 awaitable 状态时它显然知道它们,即使它与 **await** 不一致。

协程封装器 coroutine wrapper

每当你运行 async def ,Python 就会调用一个线程局部的协程封装器。它由 sys.set_coroutine_wrapper 设置,并且它是可以包装这些东西的一个函数。 看起来有点像如下代码:

>>> import sys
>>> sys.set_coroutine_wrapper(lambda x: 42)
>>> async def foo():
...  pass
...
>>> foo()
__main__:1: RuntimeWarning: coroutine 'foo' was never awaited
42

在这种情况下,我从来没有实际调用原始的函数,只是给你一个提示,说明这个函数可以做什么。 目前我只能说它总是线程局部有效,所以,如果替换事件循环策略,你需要搞清楚如何让协程封装器在相同的上下文同步更新。创建的新线程不会从父线程继承那些标识。

这不要与 asyncio 协程封装代码混淆。

awaitable 和 future

有些东西是 awaitable 的。 据我所见,以下概念被认为是 awaitable:

  • 原生的协程
  • 配置了假的 CO_ITERABLE_COROUTINE 标识的生成器(文中有涉及)
  • 具有 __await__ 方法的对象

除了生成器由于历史遗留的原因不使用之外,其它的对象都使用 __await__ 方法。 CO_ITERABLE_COROUTINE 标志来自哪里?它来自一个协程封装器(现在与 sys.set_coroutine_wrapper 有些混淆),即 @asyncio.coroutine。 通过一些间接方法,它使用 types.coroutine(现在与 types.CoroutineTypeasyncio.coroutine 有些混淆)封装生成器,并通过另外一个标志 CO_ITERABLE_COROUTINE 重新创建内部代码对象。

所以既然我们知道这些东西是什么,那么什么是 future? 首先,我们需要澄清一件事情:在 Python 3 中,实际上有两种(完全不兼容)的 future 类型:asyncio.futures.Futureconcurrent.futures.Future。 其中一个出现在另一个之前,但它们都仍然在 asyncio 中使用。 例如,asyncio.run_coroutine_threadsafe() 将调度一个协程到在另一个线程中运行的事件循环,但它返回一个 concurrent.futures.Future 对象,而不是 asyncio.futures.Future 对象。 这是有道理的,因为只有 concurrent.futures.Future 对象是线程安全的。

所以现在我们知道有两个不兼容的 future,我们应该澄清哪个 future 在 asyncio 中。 老实说,我不完全确定差异在哪里,但我打算暂时称之为“最终”。它是一个最终将持有一个值的对象,当还在计算时你可以对最终结果做一些处理。 future 对象的一些变种称为 deferred,还有一些叫做 promise。 我实在难以理解它们真正的区别。

你能用一个 future 对象做什么? 你可以关联一个准备就绪时将被调用的回调函数,或者你可以关联一个 future 失败时将被触发的回调函数。 此外,你可以 await 它(它实现__await__,因此可等待),此外,future 也可以取消。

那么你怎样才能得到这样的 future 对象? 通过在 awaitable 对象上调用 asyncio.ensure_future。它会把一个旧版的生成器转变为 future 对象。 然而,如果你阅读文档,你会读到 asyncio.ensure_future 实际上返回一个task(任务)。 那么问题来了,什么是任务?

任务

任务 task 某种意义上是一个封装了协程的 futur 对象。它的工作方式和 future 类似,但它也有一些额外的方法来提取所包含的协程的当前堆栈。 我们已经见过了在前面提到过的任务,因为它是通过 Task.get_current 确定事件循环当前正在做什么的主要方式。

在如何取消工作方面,任务和 future 也有区别,但这超出了本文的范围。“取消”是它们自己最大的问题。 如果你处于一个协程中,并且知道自己正在运行,你可以通过前面提到的 Task.get_current 获取自己的任务,但这需要你知道自己被派遣在哪个事件循环,该事件循环可能是、也可能不是已绑定的那个线程。

协程不可能知道它与哪个循环一起使用。task 也没有提供该信息的公共 API。 然而,如果你确实可以获得一个任务,你可以访问 task._loop,通过它反指到事件循环。

句柄

除了上面提到的所有一切还有句柄。 句柄是等待执行的不透明对象,不可等待,但可以被取消。 特别是如果你使用 call_soon 或者 call_soon_threadsafe(还有其它一些)调度执行一个调用,你可以获得句柄,然后使用它尽力尝试取消执行,但不能等待实际调用生效。

执行器 Executor

因为你可以有多个事件循环,但这并不意味着每个线程理所当然地应用多个事件循环,最常见的情形还是一个线程一个事件循环。 那么你如何通知另一个事件循环做一些工作? 你不能到另一个线程的事件循环中执行回调函数并获取结果。 这种情况下,你需要使用执行器。

执行器 Executor 来自 concurrent.futures,它允许你将工作安排到本身未发生事件的线程中。 例如,如果在事件循环中使用 run_in_executor 来调度将在另一个线程中调用的函数。 其返回结果是 asyncio 协程,而不是像 run_coroutine_threadsafe 这样的并发协程。 我还没有足够的心智来弄清楚为什么设计这样的 API,应该如何使用,以及什么时候使用。 文档中建议执行器可以用于构建多进程。

传输和协议

我总是认为传输与协议也凌乱不堪,实际这部分内容基本上是对 Twisted 的逐字拷贝。详情毋庸赘述,请直接阅读相关文档。

如何使用 asyncio

现在我们已经大致了解 asyncio,我发现了一些模式,人们似乎在写 asyncio 代码时使用:

  • 将事件循环传递给所有协程。 这似乎是社区中一部分人的做法。 把事件循环信息提供给协程为协程获取自己运行的任务提供了可能性。
  • 或者你要求事件循环绑定到线程,这也能达到同样的目的。 理想情况下两者都支持。 可悲的是,社区已经分化。
  • 如果想使用上下文数据(如线程本地数据),你可谓是运气不佳。 最流行的变通方法显然是 atlassian 的 aiolocals,它基本上需要你手动传递上下文信息到协程,因为解释器不为此提供支持。 这意味着如果你用一个工具类库生成协程,你将失去上下文。
  • 忽略 Python 中的旧式协程。 只使用 3.5 版本中 async def 关键字和协程。 你总可能要用到它们,因为在老版本中,没有异步上下文管理器,这是非常必要的资源管理。
  • 学习重新启动事件循环进行善后清理。 这部分功能和我预想的不同,我花了比较长的时间来厘清它的实现。清理操作的最好方式是不断重启事件循环直到没有等待事件。 遗憾的是没有什么通用的模式来处理清理操作,你只能用一些丑陋的临时方案糊口度日。 例如 aiohttp 的 web 支持也做这个模式,所以如果你想要结合两个清理逻辑,你可能需要重新实现它提供的工具助手,因为该助手功能实现后,它彻底破坏了事件循环的设计。 当然,它不是我见过的第一个干这种坏事的库 :(。
  • 使用子进程是不明显的。 你需要一个事件循环在主线程中运行,我想它是在监听信号事件,然后分派到其它事件循环。 这需要通过 asyncio.get_child_watcher().attach_loop(...) 通知循环。
  • 编写同时支持异步和同步的代码在某种程度上注定要失败。 尝试在同一个对象上支持 withasync with 是危险的事情。
  • 如果你想给一个协程起个更好的名字,弄清楚为什么它没有被等待,设置 __name__没有帮助。 你需要设置 __qualname__ 而不是打印出错误消息来。
  • 有时内部类型交换会使你麻痹。 特别是 asyncio.wait() 函数将确保所有的事情都是 future,这意味着如果你传递协程,你将很难发现你的协程是否已经完成或者正在等待,因为输入对象不再匹配输出对象。 在这种情况下,唯一真正理智的做法是确保前期一切都是 future。

上下文数据

除了疯狂的复杂性和对如何更好地编写 API 缺乏理解,我最大的问题是完全缺乏对上下文本地数据的考虑。这是 Node 社区现在学习的东西。continuation-local-storage 存在,但该实现被接受的太晚。持续本地存储和类似的概念常用于在并发环境中实施安全策略,并且该信息的损坏可能导致严重的安全问题。

事实上,Python 甚至没有任何存储,这令人失望至极。我正在研究这个内容,因为我正在调查如何最好地支持 Sentry's breadcrumbs 的 asyncio,然而我并没有看到一个合理的方式做到这一点。在 asyncio 中没有上下文的概念,没有办法从通用代码中找出您正在使用的事件循环,并且如果没有 monkeypatching(运行环境下的补丁),也无法获取这些信息。

Node 当前正在经历如何找到这个问题的长期解决方案的过程。这个问题不容忽视,因为它在所有生态系统中反复出现过,如 JavaScript、Python 和 .NET 环境。该问题被命名为异步上下文传播,其解决方案有许多名称。在 Go 中,需要使用上下文包,并明确地传递给所有 goroutine(不是一个完美的解决方案,但至少有一个)。.NET 具有本地调用上下文形式的最佳解决方案。它可以是线程上下文,Web 请求上下文或类似的东西,除非被抑制,否则它会自动传播。微软的解决方案是我们的黄金标准。我现在相信,微软在 15 年前已经解决了该问题。

我不知道该生态系统是否还够年轻,还可以添加逻辑调用上下文,可能现在仍然为时未晚。

个人感想

复杂的东西变得越来越复杂。 我没有随意使用 asyncio 的心智。它需要不断地更新所有 Python 语言的变化的知识,这很大程度上使语言本身变得复杂。 令人鼓舞的是,围绕着它的生态系统正在不断发展,只是不知道还需要几年的时间,才能带给开发者愉快和稳定的开发体验。

3.5 版本引入的东西(新的协程对象)非常棒。 特别是这些变化包括引入了一个合理的基础,这些都是我在早期的版本中一直期盼的。在我心中, 通过重载生成器实现协程是一个错误。 关于什么是 asyncio,我难以置喙。 这是一个非常复杂的事情,内部令人眼花缭乱。 我很难理解它工作的所有细节。你什么时候可以传递一个生成器,什么时候它必须是一个真正的协程,future 是什么,任务是什么,事件循环如何工作,这甚至还没有触碰到真正的 IO 部分。

最糟糕的是,asyncio 甚至不是特别快。 David Beazley 演示的它设计的 asyncio 的替代品是原生版本速度的两倍。 asyncio 巨复杂,很难理解,也无法兑现自己在主要特性上的承诺,对于它,我只想说我想静静。我知道,至少我对 asyncio 理解的不够透彻,没有足够的信心对人们如何用它构建代码给出建议。


作者:

Armin Ronacher

软件开发者和开源骨灰, Flask 框架的创造者。


via: http://lucumr.pocoo.org/2016/10/30/i-dont-understand-asyncio/

作者:Armin Ronacher 译者:firstadream 校对:jasminepeng

本文由 LCTT 原创编译,Linux中国 荣誉推出

你在 Python 中用过异步编程吗?本文中我会告诉你怎样做,而且用一个能工作的例子来展示它:这是一个流行的贪吃蛇游戏,而且是为多人游戏而设计的。

1、简介

在技术和文化领域,大规模多人在线游戏(MMO)毋庸置疑是我们当今世界的潮流之一。很长时间以来,为一个 MMO 游戏写一个服务器这件事总是会涉及到大量的预算与复杂的底层编程技术,不过在最近这几年,事情迅速发生了变化。基于动态语言的现代框架允许在中档的硬件上面处理大量并发的用户连接。同时,HTML5 和 WebSockets 标准使得创建基于实时图形的游戏的直接运行至浏览器上的客户端成为可能,而不需要任何的扩展。

对于创建可扩展的非堵塞性的服务器来说,Python 可能不是最受欢迎的工具,尤其是和在这个领域里最受欢迎的 Node.js 相比而言。但是最近版本的 Python 正在改变这种现状。asyncio 的引入和一个特别的 async/await 语法使得异步代码看起来像常规的阻塞代码一样,这使得 Python 成为了一个值得信赖的异步编程语言,所以我将尝试利用这些新特点来创建一个多人在线游戏。

2、异步

一个游戏服务器应该可以接受尽可能多的用户并发连接,并实时处理这些连接。一个典型的解决方案是创建线程,然而在这种情况下并不能解决这个问题。运行上千的线程需要 CPU 在它们之间不停的切换(这叫做上下文切换),这将导致开销非常大,效率很低下。更糟糕的是使用进程来实现,因为,不但如此,它们还会占用大量的内存。在 Python 中,甚至还有一个问题,Python 的解释器(CPython)并不是针对多线程设计的,相反它主要针对于单线程应用实现最大的性能。这就是为什么它使用 GIL(global interpreter lock),这是一个不允许同时运行多线程 Python 代码的架构,以防止同一个共享对象出现使用不可控。正常情况下,在当前线程正在等待的时候,解释器会转换到另一个线程,通常是等待一个 I/O 的响应(举例说,比如等待 Web 服务器的响应)。这就允许在你的应用中实现非阻塞 I/O 操作,因为每一个操作仅仅阻塞一个线程而不是阻塞整个服务器。然而,这也使得通常的多线程方案变得几近无用,因为它不允许你并发执行 Python 代码,即使是在多核心的 CPU 上也是这样。而与此同时,在一个单一线程中拥有非阻塞 I/O 是完全有可能的,因而消除了经常切换上下文的需要。

实际上,你可以用纯 Python 代码来实现一个单线程的非阻塞 I/O。你所需要的只是标准的 select 模块,这个模块可以让你写一个事件循环来等待未阻塞的 socket 的 I/O。然而,这个方法需要你在一个地方定义所有 app 的逻辑,用不了多久,你的 app 就会变成非常复杂的状态机。有一些框架可以简化这个任务,比较流行的是 tornadetwisted。它们被用来使用回调方法实现复杂的协议(这和 Node.js 比较相似)。这种框架运行在它自己的事件循环中,按照定义的事件调用你的回调函数。并且,这或许是一些情况的解决方案,但是它仍然需要使用回调的方式编程,这使你的代码变得碎片化。与写同步代码并且并发地执行多个副本相比,这就像我们在普通的线程上做的一样。在单个线程上这为什么是不可能的呢?

这就是为什么出现微线程(microthread)概念的原因。这个想法是为了在一个线程上并发执行任务。当你在一个任务中调用阻塞的方法时,有一个叫做“manager” (或者“scheduler”)的东西在执行事件循环。当有一些事件准备处理的时候,一个 manager 会转移执行权给一个任务,并等着它执行完毕。任务将一直执行,直到它遇到一个阻塞调用,然后它就会将执行权返还给 manager。

微线程也称为轻量级线程(lightweight threads)或绿色线程(green threads)(来自于 Java 中的一个术语)。在伪线程中并发执行的任务叫做 tasklets、greenlets 或者协程(coroutines)。

Python 中的微线程最早的实现之一是 Stackless Python。它之所以这么知名是因为它被用在了一个叫 EVE online 的非常有名的在线游戏中。这个 MMO 游戏自称说在一个持久的“宇宙”中,有上千个玩家在做不同的活动,这些都是实时发生的。Stackless 是一个独立的 Python 解释器,它代替了标准的函数栈调用,并且直接控制程序运行流程来减少上下文切换的开销。尽管这非常有效,这个解决方案不如在标准解释器中使用“软”库更流行,像 eventletgevent 的软件包配备了修补过的标准 I/O 库,I/O 函数会将执行权传递到内部事件循环。这使得将正常的阻塞代码转变成非阻塞的代码变得简单。这种方法的一个缺点是从代码上看这并不分明,它的调用是非阻塞的。新版本的 Python 引入了本地协程作为生成器的高级形式。在 Python 的 3.4 版本之后,引入了 asyncio 库,这个库依赖于本地协程来提供单线程并发。但是仅仅到了 Python 3.5 ,协程就变成了 Python 语言的一部分,使用新的关键字 async 和 await 来描述。这是一个简单的例子,演示了使用 asyncio 来运行并发任务。

import asyncio

async def my_task(seconds):
    print("start sleeping for {} seconds".format(seconds))
    await asyncio.sleep(seconds)
    print("end sleeping for {} seconds".format(seconds))

all_tasks = asyncio.gather(my_task(1), my_task(2))
loop = asyncio.get_event_loop()
loop.run_until_complete(all_tasks)
loop.close()    

我们启动了两个任务,一个睡眠 1 秒钟,另一个睡眠 2 秒钟,输出如下:

start sleeping for 1 seconds
start sleeping for 2 seconds
end sleeping for 1 seconds
end sleeping for 2 seconds

正如你所看到的,协程不会阻塞彼此——第二个任务在第一个结束之前启动。这发生的原因是 asyncio.sleep 是协程,它会返回执行权给调度器,直到时间到了。

在下一节中,我们将会使用基于协程的任务来创建一个游戏循环。


via: https://7webpages.com/blog/writing-online-multiplayer-game-with-python-asyncio-getting-asynchronous/

作者:Kyrylo Subbotin 译者:xinglianfly 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:deviantart.net