标签 并行 下的文章

我们过去曾写过两篇如何并行地在多个远程服务器上运行命令的文章:并行 SSH(PSSH)分布式 Shell(DSH)。今天,我们将讨论相同类型的主题,但它允许我们在集群节点上执行相同的操作。你可能会想,我可以编写一个小的 shell 脚本来实现这个目的,而不是安装这些第三方软件包。

当然,你是对的,如果要在十几个远程系统中运行一些命令,那么你不需要使用它。但是,你的脚本需要一些时间来完成此任务,因为它是按顺序运行的。想想你要是在一千多台服务器上运行一些命令会是什么样子?在这种情况下,你的脚本用处不大。此外,完成任务需要很长时间。所以,要克服这种问题和情况,我们需要可以在远程计算机上并行运行命令。

为此,我们需要在一个并行应用程序中使用它。我希望这个解释可以解决你对并行实用程序的疑虑。

ClusterShell

ClusterShell 是一个事件驱动的开源 Python 库,旨在在服务器场或大型 Linux 集群上并行运行本地或远程命令。(clushClusterShell)。

它将处理在 HPC 集群上遇到的常见问题,例如在节点组上操作,使用优化过的执行算法运行分布式命令,以及收集结果和合并相同的输出,或检索返回代码。

ClusterShell 可以利用已安装在系统上的现有远程 shell 设施,如 SSH。

ClusterShell 的主要目标是通过为开发人员提供轻量级、但可扩展的 Python API 来改进高性能集群的管理。它还提供了 clushclubakcluset/nodeset 等方便的命令行工具,可以让传统的 shell 脚本利用这个库的一些功能。

ClusterShell 是用 Python 编写的,它需要 Python(v2.6+ 或 v3.4+)才能在你的系统上运行。

如何在 Linux 上安装 ClusterShell?

ClusterShell 包在大多数发行版的官方包管理器中都可用。因此,使用发行版包管理器工具进行安装。

对于 Fedora 系统,使用 DNF 命令来安装 clustershell。

$ sudo dnf install clustershell

如果系统默认是 Python 2,这会安装 Python 2 模块和工具,可以运行以下命令安装 Python 3 开发包。

$ sudo dnf install python3-clustershell

在执行 clustershell 安装之前,请确保你已在系统上启用 EPEL 存储库

对于 RHEL/CentOS 系统,使用 YUM 命令 来安装 clustershell。

$ sudo yum install clustershell

如果系统默认是 Python 2,这会安装 Python 2 模块和工具,可以运行以下命令安装 Python 3 开发包。

$ sudo yum install python34-clustershell

对于 openSUSE Leap 系统,使用 Zypper 命令 来安装 clustershell。

$ sudo zypper install clustershell

如果系统默认是 Python 2,这会安装 Python 2 模块和工具,可以运行以下命令安装 Python 3 开发包。

$ sudo zypper install python3-clustershell

对于 Debian/Ubuntu 系统,使用 APT-GET 命令APT 命令 来安装 clustershell。

$ sudo apt install clustershell

如何在 Linux 使用 PIP 安装 ClusterShell?

可以使用 PIP 安装 ClusterShell,因为它是用 Python 编写的。

在执行 clustershell 安装之前,请确保你已在系统上启用了 PythonPIP

$ sudo pip install ClusterShell

如何在 Linux 上使用 ClusterShell?

与其他实用程序(如 psshdsh)相比,它是直接了当的优秀工具。它有很多选项可以在远程并行执行。

在开始使用 clustershell 之前,请确保你已启用系统上的无密码登录

以下配置文件定义了系统范围的默认值。你不需要修改这里的任何东西。

$ cat /etc/clustershell/clush.conf

如果你想要创建一个服务器组,那也可以。默认情况下有一些示例,请根据你的要求执行相同操作。

$ cat /etc/clustershell/groups.d/local.cfg

只需按以下列格式运行 clustershell 命令即可从给定节点获取信息:

$ clush -w 192.168.1.4,192.168.1.9 cat /proc/version
192.168.1.9: Linux version 4.15.0-45-generic ([email protected]) (gcc version 7.3.0 (Ubuntu 7.3.0-16ubuntu3)) #48-Ubuntu SMP Tue Jan 29 16:28:13 UTC 2019
192.168.1.4: Linux version 3.10.0-957.el7.x86_64 ([email protected]) (gcc version 4.8.5 20150623 (Red Hat 4.8.5-36) (GCC) ) #1 SMP Thu Nov 8 23:39:32 UTC 2018

选项:

  • -w: 你要运行该命令的节点。

你可以使用正则表达式而不是使用完整主机名和 IP:

$ clush -w 192.168.1.[4,9] uname -r
192.168.1.9: 4.15.0-45-generic
192.168.1.4: 3.10.0-957.el7.x86_64

或者,如果服务器位于同一 IP 系列中,则可以使用以下格式:

$ clush -w 192.168.1.[4-9] date
192.168.1.6: Mon Mar  4 21:08:29 IST 2019
192.168.1.7: Mon Mar  4 21:08:29 IST 2019
192.168.1.8: Mon Mar  4 21:08:29 IST 2019
192.168.1.5: Mon Mar  4 09:16:30 CST 2019
192.168.1.9: Mon Mar  4 21:08:29 IST 2019
192.168.1.4: Mon Mar  4 09:16:30 CST 2019

clustershell 允许我们以批处理模式运行命令。使用以下格式来实现此目的:

$ clush -w 192.168.1.4,192.168.1.9 -b
Enter 'quit' to leave this interactive mode
Working with nodes: 192.168.1.[4,9]
clush> hostnamectl
---------------
192.168.1.4
---------------
   Static hostname: CentOS7.2daygeek.com
         Icon name: computer-vm
           Chassis: vm
        Machine ID: 002f47b82af248f5be1d67b67e03514c
           Boot ID: f9b37a073c534dec8b236885e754cb56
    Virtualization: kvm
  Operating System: CentOS Linux 7 (Core)
       CPE OS Name: cpe:/o:centos:centos:7
            Kernel: Linux 3.10.0-957.el7.x86_64
      Architecture: x86-64
---------------
192.168.1.9
---------------
   Static hostname: Ubuntu18
         Icon name: computer-vm
           Chassis: vm
        Machine ID: 27f6c2febda84dc881f28fd145077187
           Boot ID: f176f2eb45524d4f906d12e2b5716649
    Virtualization: oracle
  Operating System: Ubuntu 18.04.2 LTS
            Kernel: Linux 4.15.0-45-generic
      Architecture: x86-64
clush> free -m
---------------
192.168.1.4
---------------
              total        used        free      shared  buff/cache   available
Mem:           1838         641         217          19         978         969
Swap:          2047           0        2047
---------------
192.168.1.9
---------------
              total        used        free      shared  buff/cache   available
Mem:           1993         352        1067           1         573        1473
Swap:          1425           0        1425
clush> w
---------------
192.168.1.4
---------------
 09:21:14 up  3:21,  3 users,  load average: 0.00, 0.01, 0.05
USER     TTY      FROM             LOGIN@   IDLE   JCPU   PCPU WHAT
daygeek  :0       :0               06:02   ?xdm?   1:28   0.30s /usr/libexec/gnome-session-binary --session gnome-classic
daygeek  pts/0    :0               06:03    3:17m  0.06s  0.06s bash
daygeek  pts/1    192.168.1.6      06:03   52:26   0.10s  0.10s -bash
---------------
192.168.1.9
---------------
 21:13:12 up  3:12,  1 user,  load average: 0.08, 0.03, 0.00
USER     TTY      FROM             LOGIN@   IDLE   JCPU   PCPU WHAT
daygeek  pts/0    192.168.1.6      20:42   29:41   0.05s  0.05s -bash
clush> quit

如果要在一组节点上运行该命令,请使用以下格式:

$ clush -w @dev uptime
or
$ clush -g dev uptime
or
$ clush --group=dev uptime

192.168.1.9:  21:10:10 up  3:09,  1 user,  load average: 0.09, 0.03, 0.01
192.168.1.4:  09:18:12 up  3:18,  3 users,  load average: 0.01, 0.02, 0.05

如果要在多个节点组上运行该命令,请使用以下格式:

$ clush -w @dev,@uat uptime
or
$ clush -g dev,uat uptime
or
$ clush --group=dev,uat uptime

192.168.1.7: 07:57:19 up 59 min, 1 user, load average: 0.08, 0.03, 0.00
192.168.1.9: 20:27:20 up 1:00, 1 user, load average: 0.00, 0.00, 0.00
192.168.1.5: 08:57:21 up 59 min, 1 user, load average: 0.00, 0.01, 0.05

clustershell 允许我们将文件复制到远程计算机。将本地文件或目录复制到同一个远程节点:

$ clush -w 192.168.1.[4,9] --copy /home/daygeek/passwd-up.sh

我们可以通过运行以下命令来验证它:

$ clush -w 192.168.1.[4,9] ls -lh /home/daygeek/passwd-up.sh
192.168.1.4: -rwxr-xr-x. 1 daygeek daygeek 159 Mar 4 09:00 /home/daygeek/passwd-up.sh
192.168.1.9: -rwxr-xr-x 1 daygeek daygeek 159 Mar 4 20:52 /home/daygeek/passwd-up.sh

将本地文件或目录复制到不同位置的远程节点:

$ clush -g uat --copy /home/daygeek/passwd-up.sh --dest /tmp

我们可以通过运行以下命令来验证它:

$ clush --group=uat ls -lh /tmp/passwd-up.sh
192.168.1.7: -rwxr-xr-x. 1 daygeek daygeek 159 Mar 6 07:44 /tmp/passwd-up.sh

将文件或目录从远程节点复制到本地系统:

$ clush -w 192.168.1.7 --rcopy /home/daygeek/Documents/magi.txt --dest /tmp

我们可以通过运行以下命令来验证它:

$ ls -lh /tmp/magi.txt.192.168.1.7
-rw-r--r-- 1 daygeek daygeek 35 Mar 6 20:24 /tmp/magi.txt.192.168.1.7

via: https://www.2daygeek.com/clustershell-clush-run-commands-on-cluster-nodes-remote-system-in-parallel-linux/

作者:Magesh Maruthamuthu 选题:lujun9972 译者:wxy 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Dask 库可以将 Python 计算扩展到多个核心甚至是多台机器。

关于 Python 性能的一个常见抱怨是全局解释器锁(GIL)。由于 GIL,同一时刻只能有一个线程执行 Python 字节码。因此,即使在现代的多核机器上,使用线程也不会加速计算。

但当你需要并行化到多核时,你不需要放弃使用 Python:Dask 库可以将计算扩展到多个内核甚至多个机器。某些设置可以在数千台机器上配置 Dask,每台机器都有多个内核。虽然存在扩展规模的限制,但一般达不到。

虽然 Dask 有许多内置的数组操作,但举一个非内置的例子,我们可以计算偏度

import numpy
import dask
from dask import array as darray

arr = dask.from_array(numpy.array(my_data), chunks=(1000,))
mean = darray.mean()
stddev = darray.std(arr)
unnormalized_moment = darry.mean(arr * arr * arr)
## See formula in wikipedia:
skewness = ((unnormalized_moment - (3 * mean * stddev ** 2) - mean ** 3) /
            stddev ** 3)

请注意,每个操作将根据需要使用尽可能多的内核。这将在所有核心上并行化执行,即使在计算数十亿个元素时也是如此。

当然,并不是我们所有的操作都可由这个库并行化,有时我们需要自己实现并行性。

为此,Dask 有一个“延迟”功能:

import dask

def is_palindrome(s):
    return s == s[::-1]

palindromes = [dask.delayed(is_palindrome)(s) for s in string_list]
total = dask.delayed(sum)(palindromes)
result = total.compute()

这将计算字符串是否是回文并返回回文的数量。

虽然 Dask 是为数据科学家创建的,但它绝不仅限于数据科学。每当我们需要在 Python 中并行化任务时,我们可以使用 Dask —— 无论有没有 GIL。


via: https://opensource.com/article/19/4/parallel-computation-python-dask

作者:Moshe Zadka (Community Moderator) 选题:lujun9972 译者:geekpi 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

将您的计算机变成一个多任务的动力室。

你是否有过这种感觉,你的主机运行速度没有预期的那么快?我也曾经有过这种感觉,直到我发现了 GNU Parallel。

GNU Parallel 是一个 shell 工具,可以并行执行任务。它可以解析多种输入,让你可以同时在多份数据上运行脚本或命令。你终于可以使用全部的 CPU 了!

如果你用过 xargs,上手 Parallel 几乎没有难度。如果没有用过,这篇教程会告诉你如何使用,同时给出一些其它的用例。

安装 GNU Parallel

GNU Parallel 很可能没有预装在你的 Linux 或 BSD 主机上,你可以从软件源中安装。以 Fedora 为例:

$ sudo dnf install parallel

对于 NetBSD:

# pkg_add parallel

如果各种方式都不成功,请参考项目主页

从串行到并行

正如其名称所示,Parallel 的强大之处是以并行方式执行任务;而我们中不少人平时仍然以串行方式运行任务。

当你对多个对象执行某个命令时,你实际上创建了一个任务队列。一部分对象可以被命令处理,剩余的对象需要等待,直到命令处理它们。这种方式是低效的。只要数据够多,总会形成任务队列;但与其只使用一个任务队列,为何不使用多个更小规模的任务队列呢?

假设你有一个图片目录,你希望将目录中的图片从 JEEG 格式转换为 PNG 格式。有多种方法可以完成这个任务。可以手动用 GIMP 打开每个图片,输出成新格式,但这基本是最差的选择,费时费力。

上述方法有一个漂亮且简洁的变种,即基于 shell 的方案:

$ convert 001.jpeg 001.png
$ convert 002.jpeg 002.png
$ convert 003.jpeg 003.png
... 略 ...

对于初学者而言,这是一个不小的转变,而且看起来是个不小的改进。不再需要图像界面和不断的鼠标点击,但仍然是费力的。

进一步改进:

$ for i in *jpeg; do convert $i $i.png ; done

至少,这一步设置好任务执行,让你节省时间去做更有价值的事情。但问题来了,这仍然是串行操作;一张图片转换完成后,队列中的下一张进行转换,依此类推直到全部完成。

使用 Parallel:

$ find . -name "*jpeg" | parallel -I% --max-args 1 convert % %.png

这是两条命令的组合:find 命令,用于收集需要操作的对象;parallel 命令,用于对象排序并确保每个对象按需处理。

  • find . -name "*jpeg" 查找当前目录下以 jpeg 结尾的所有文件。
  • parallel 调用 GNU Parallel。
  • -I% 创建了一个占位符 %,代表 find 传递给 Parallel 的内容。如果不使用占位符,你需要对 find 命令的每一个结果手动编写一个命令,而这恰恰是你想要避免的。
  • --max-args 1 给出 Parallel 从队列获取新对象的速率限制。考虑到 Parallel 运行的命令只需要一个文件输入,这里将速率限制设置为 1。假如你需要执行更复杂的命令,需要两个文件输入(例如 cat 001.txt 002.txt > new.txt),你需要将速率限制设置为 2。
  • convert % %.png 是你希望 Parallel 执行的命令。

组合命令的执行效果如下:find 命令收集所有相关的文件信息并传递给 parallel,后者(使用当前参数)启动一个任务,(无需等待任务完成)立即获取参数行中的下一个参数(LCTT 译注:管道输出的每一行对应 parallel 的一个参数,所有参数构成参数行);只要你的主机没有瘫痪,Parallel 会不断做这样的操作。旧任务完成后,Parallel 会为分配新任务,直到所有数据都处理完成。不使用 Parallel 完成任务大约需要 10 分钟,使用后仅需 3 至 5 分钟。

多个输入

只要你熟悉 findxargs (整体被称为 GNU 查找工具,或 findutils),find 命令是一个完美的 Parallel 数据提供者。它提供了灵活的接口,大多数 Linux 用户已经很习惯使用,即使对于初学者也很容易学习。

find 命令十分直截了当:你向 find 提供搜索路径和待查找文件的一部分信息。可以使用通配符完成模糊搜索;在下面的例子中,星号匹配任何字符,故 find 定位(文件名)以字符 searchterm 结尾的全部文件:

$ find /path/to/directory -name "*searchterm"

默认情况下,find 逐行返回搜索结果,每个结果对应 1 行:

$ find ~/graphics -name "*jpg"
/home/seth/graphics/001.jpg
/home/seth/graphics/cat.jpg
/home/seth/graphics/penguin.jpg
/home/seth/graphics/IMG_0135.jpg

当使用管道将 find 的结果传递给 parallel 时,每一行中的文件路径被视为 parallel 命令的一个参数。另一方面,如果你需要使用命令处理多个参数,你可以改变队列数据传递给 parallel 的方式。

下面先给出一个不那么实际的例子,后续会做一些修改使其更加有意义。如果你安装了 GNU Parallel,你可以跟着这个例子操作。

假设你有 4 个文件,按照每行一个文件的方式列出,具体如下:

$ echo ada > ada ; echo lovelace > lovelace
$ echo richard > richard ; echo stallman > stallman
$ ls -1
ada
lovelace
richard
stallman

你需要将两个文件合并成第三个文件,后者同时包含前两个文件的内容。这种情况下,Parallel 需要访问两个文件,使用 -I% 变量的方式不符合本例的预期。

Parallel 默认情况下读取 1 个队列对象:

$ ls -1 | parallel echo
ada
lovelace
richard
stallman

现在让 Parallel 每个任务使用 2 个队列对象:

$ ls -1 | parallel --max-args=2 echo
ada lovelace
richard stallman

现在,我们看到行已经并合并;具体而言,ls -1 的两个查询结果会被同时传送给 Parallel。传送给 Parallel 的参数涉及了任务所需的 2 个文件,但目前还只是 1 个有效参数:(对于两个任务分别为)“ada lovelace” 和 “richard stallman”。你真正需要的是每个任务对应 2 个独立的参数。

值得庆幸的是,Parallel 本身提供了上述所需的解析功能。如果你将 --max-args 设置为 2,那么 {1}{2} 这两个变量分别代表传入参数的第一和第二部分:

$ ls -1 | parallel --max-args=2 cat {1} {2} ">" {1}_{2}.person

在上面的命令中,变量 {1} 值为 adarichard (取决于你选取的任务),变量 {2} 值为 lovelacestallman。通过使用重定向符号(放到引号中,防止被 Bash 识别,以便 Parallel 使用),(两个)文件的内容被分别重定向至新文件 ada_lovelace.personrichard_stallman.person

$ ls -1
ada
ada_lovelace.person
lovelace
richard
richard_stallman.person
stallman

$ cat ada_*person
ada lovelace
$ cat ri*person
richard stallman

如果你整天处理大量几百 MB 大小的日志文件,那么(上述)并行处理文本的方法对你帮忙很大;否则,上述例子只是个用于上手的示例。

然而,这种处理方法对于很多文本处理之外的操作也有很大帮助。下面是来自电影产业的真实案例,其中需要将一个目录中的视频文件和(对应的)音频文件进行合并。

$ ls -1
12_LS_establishing-manor.avi
12_wildsound.flac
14_butler-dialogue-mixed.flac
14_MS_butler.avi
...略...

使用同样的方法,使用下面这个简单命令即可并行地合并文件:

$ ls -1 | parallel --max-args=2 ffmpeg -i {1} -i {2} -vcodec copy -acodec copy {1}.mkv

简单粗暴的方式

上述花哨的输入输出处理不一定对所有人的口味。如果你希望更直接一些,可以将一堆命令甩给 Parallel,然后去干些其它事情。

首先,需要创建一个文本文件,每行包含一个命令:

$ cat jobs2run
bzip2 oldstuff.tar
oggenc music.flac
opusenc ambiance.wav
convert bigfile.tiff small.jpeg
ffmepg -i foo.avi -v:b 12000k foo.mp4
xsltproc --output build/tmp.fo style/dm.xsl src/tmp.xml
bzip2 archive.tar

接着,将文件传递给 Parallel:

$ parallel --jobs 6 < jobs2run

现在文件中对应的全部任务都在被 Parallel 执行。如果任务数量超过允许的数目(LCTT 译注:应该是 --jobs 指定的数目或默认值),Parallel 会创建并维护一个队列,直到任务全部完成。

更多内容

GNU Parallel 是个强大而灵活的工具,还有很多很多用例无法在本文中讲述。工具的 man 页面提供很多非常酷的例子可供你参考,包括通过 SSH 远程执行和在 Parallel 命令中使用 Bash 函数等。YouTube 上甚至有一个系列,包含大量操作演示,让你可以直接从 GNU Parallel 团队学习。GNU Paralle 的主要维护者还发布了官方使用指导手册,可以从 Lulu.com 获取。

GNU Parallel 有可能改变你完成计算的方式;即使没有,也会至少改变你主机花在计算上的时间。马上上手试试吧!


via: https://opensource.com/article/18/5/gnu-parallel

作者:Seth Kenlon 选题:lujun9972 译者:pinewall 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

在这个系列中,我们基于多人游戏 贪吃蛇 来制作一个异步的 Python 程序。上一篇文章聚焦于编写游戏循环上,而本系列第 1 部分则涵盖了如何异步化

4、制作一个完整的游戏

4.1 工程概览

在此部分,我们将回顾一个完整在线游戏的设计。这是一个经典的贪吃蛇游戏,增加了多玩家支持。你可以自己在 (http://snakepit-game.com) 亲自试玩。源码在 GitHub 的这个仓库。游戏包括下列文件:

  • server.py - 处理主游戏循环和连接。
  • game.py - 主要的 Game 类。实现游戏的逻辑和游戏的大部分通信协议。
  • player.py - Player 类,包括每一个独立玩家的数据和蛇的展现。这个类负责获取玩家的输入并相应地移动蛇。
  • datatypes.py - 基本数据结构。
  • settings.py - 游戏设置,在注释中有相关的说明。
  • index.html - 客户端所有的 html 和 javascript代码都放在一个文件中。

4.2 游戏循环内窥

多人的贪吃蛇游戏是个用于学习十分好的例子,因为它简单。所有的蛇在每个帧中移动到一个位置,而且帧以非常低的频率进行变化,这样就可以让你就观察到游戏引擎到底是如何工作的。因为速度慢,对于玩家的按键不会立马响应。按键先是记录下来,然后在一个游戏循环迭代的最后计算下一帧时使用。

现代的动作游戏帧频率更高,而且通常服务端和客户端的帧频率是不相等的。客户端的帧频率通常依赖于客户端的硬件性能,而服务端的帧频率则是固定的。一个客户端可能根据一个游戏“嘀嗒”的数据渲染多个帧。这样就可以创建平滑的动画,这个受限于客户端的性能。在这个例子中,服务端不仅传输物体的当前位置,也要传输它们的移动方向、速度和加速度。客户端的帧频率称之为 FPS( 每秒帧数 frames per second ),服务端的帧频率称之为 TPS( 每秒滴答数 ticks per second )。在这个贪吃蛇游戏的例子中,二者的值是相等的,在客户端显示的一帧是在服务端的一个“嘀嗒”内计算出来的。

我们使用类似文本模式的游戏区域,事实上是 html 表格中的一个字符宽的小格。游戏中的所有对象都是通过表格中的不同颜色字符来表示。大部分时候,客户端将按键的码发送至服务端,然后每个“滴答”更新游戏区域。服务端一次更新包括需要更新字符的坐标和颜色。所以我们将所有游戏逻辑放置在服务端,只将需要渲染的数据发送给客户端。此外,我们通过替换通过网络发送的数据来减少游戏被破解的概率。

4.3 它是如何运行的?

这个游戏中的服务端出于简化的目的,它和例子 3.2 类似。但是我们用一个所有服务端都可访问的 Game 对象来代替之前保存了所有已连接 websocket 的全局列表。一个 Game 实例包括一个表示连接到此游戏的玩家的 Player 对象的列表(在 self._players 属性里面),以及他们的个人数据和 websocket 对象。将所有游戏相关的数据存储在一个 Game 对象中,会方便我们增加多个游戏房间这个功能——如果我们要增加这个功能的话。这样,我们维护多个 Game 对象,每个游戏开始时创建一个。

客户端和服务端的所有交互都是通过编码成 json 的消息来完成。来自客户端的消息仅包含玩家所按下键码对应的编号。其它来自客户端消息使用如下格式:

[command, arg1, arg2, ... argN ]

来自服务端的消息以列表的形式发送,因为通常一次要发送多个消息 (大多数情况下是渲染的数据):

[[command, arg1, arg2, ... argN ], ... ]

在每次游戏循环迭代的最后会计算下一帧,并且将数据发送给所有的客户端。当然,每次不是发送完整的帧,而是发送两帧之间的变化列表。

注意玩家连接上服务端后不是立马加入游戏。连接开始时是 观望者 spectator 模式,玩家可以观察其它玩家如何玩游戏。如果游戏已经开始或者上一个游戏会话已经在屏幕上显示 “game over” (游戏结束),用户此时可以按下 “Join”(参与),来加入一个已经存在的游戏,或者如果游戏没有运行(没有其它玩家)则创建一个新的游戏。后一种情况下,游戏区域在开始前会被先清空。

游戏区域存储在 Game._field 这个属性中,它是由嵌套列表组成的二维数组,用于内部存储游戏区域的状态。数组中的每一个元素表示区域中的一个小格,最终小格会被渲染成 html 表格的格子。它有一个 Char 的类型,是一个 namedtuple ,包括一个字符和颜色。在所有连接的客户端之间保证游戏区域的同步很重要,所以所有游戏区域的更新都必须依据发送到客户端的相应的信息。这是通过 Game.apply_render() 来实现的。它接受一个 Draw 对象的列表,其用于内部更新游戏区域和发送渲染消息给客户端。

我们使用 namedtuple 不仅因为它表示简单数据结构很方便,也因为用它生成 json 格式的消息时相对于 dict 更省空间。如果你在一个真实的游戏循环中需要发送复杂的数据结构,建议先将它们序列化成一个简单的、更短的格式,甚至打包成二进制格式(例如 bson,而不是 json),以减少网络传输。

Player 对象包括用 deque 对象表示的蛇。这种数据类型和 list 相似,但是在两端增加和删除元素时效率更高,用它来表示蛇很理想。它的主要方法是 Player.render_move(),它返回移动玩家的蛇至下一个位置的渲染数据。一般来说它在新的位置渲染蛇的头部,移除上一帧中表示蛇的尾巴的元素。如果蛇吃了一个数字变长了,在相应的多个帧中尾巴是不需要移动的。蛇的渲染数据在主类的 Game.next_frame() 中使用,该方法中实现所有的游戏逻辑。这个方法渲染所有蛇的移动,检查每一个蛇前面的障碍物,而且生成数字和“石头”。每一个“嘀嗒”,game_loop() 都会直接调用它来生成下一帧。

如果蛇头前面有障碍物,在 Game.next_frame() 中会调用 Game.game_over()。它后通知所有的客户端那个蛇死掉了 (会调用 player.render_game_over() 方法将其变成石头),然后更新表中的分数排行榜。Player 对象的 alive 标记被置为 False,当渲染下一帧时,这个玩家会被跳过,除非他重新加入游戏。当没有蛇存活时,游戏区域会显示 “game over” (游戏结束)。而且,主游戏循环会停止,设置 game.running 标记为 False。当某个玩家下次按下 “Join” (加入)时,游戏区域会被清空。

在渲染游戏的每个下一帧时也会产生数字和石头,它们是由随机值决定的。产生数字或者石头的概率可以在 settings.py 中修改成其它值。注意数字的产生是针对游戏区域每一个活的蛇的,所以蛇越多,产生的数字就越多,这样它们都有足够的食物来吃掉。

4.4 网络协议

从客户端发送消息的列表:

命令参数描述
new\_player[name]设置玩家的昵称
join 玩家加入游戏

从服务端发送消息的列表:

命令参数描述
handshake[id]给一个玩家指定 ID
world[[(char, color), ...], ...]初始化游戏区域(世界地图)
reset\_world 清除实际地图,替换所有字符为空格
render[x, y, char, color]在某个位置显示字符
p\_joined[id, name, color, score]新玩家加入游戏
p\_gameover[id]某个玩家游戏结束
p\_score[id, score]给某个玩家计分
top\_scores[[name, score, color], ...]更新排行榜

典型的消息交换顺序:

客户端 -> 服务端服务端 -> 客户端服务端 -> 所有客户端备注
new\_player 名字传递给服务端
handshake 指定 ID
world 初始化传递的世界地图
top\_scores 收到传递的排行榜
join 玩家按下“Join”,游戏循环开始
reset\_world命令客户端清除游戏区域
render, render, ...第一个游戏“滴答”,渲染第一帧
(key code) 玩家按下一个键
render, render, ...渲染第二帧
p\_score蛇吃掉了一个数字
render, render, ...渲染第三帧
... 重复若干帧 ...
p\_gameover试着吃掉障碍物时蛇死掉了
top\_scores更新排行榜(如果需要更新的话)

5. 总结

说实话,我十分享受 Python 最新的异步特性。新的语法做了改善,所以异步代码很容易阅读。可以明显看出哪些调用是非阻塞的,什么时候发生 greenthread 的切换。所以现在我可以宣称 Python 是异步编程的好工具。

SnakePit 在 7WebPages 团队中非常受欢迎。如果你在公司想休息一下,不要忘记给我们在 Twitter 或者 Facebook 留下反馈。


via: https://7webpages.com/blog/writing-online-multiplayer-game-with-python-and-asyncio-part-3/

作者:Kyrylo Subbotin 译者:chunyang-wen 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:wallpaperinhd.net-wallpaper.html))

你在 Python 中用过异步编程吗?本文中我会告诉你怎样做,而且用一个能工作的例子来展示它:这是一个流行的贪吃蛇游戏,而且是为多人游戏而设计的。

介绍和理论部分参见“第一部分 异步化”。

3、编写游戏循环主体

游戏循环是每一个游戏的核心。它持续地运行以读取玩家的输入、更新游戏的状态,并且在屏幕上渲染游戏结果。在在线游戏中,游戏循环分为客户端和服务端两部分,所以一般有两个循环通过网络通信。通常客户端的角色是获取玩家输入,比如按键或者鼠标移动,将数据传输给服务端,然后接收需要渲染的数据。服务端处理来自玩家的所有数据,更新游戏的状态,执行渲染下一帧的必要计算,然后将结果传回客户端,例如游戏中对象的新位置。如果没有可靠的理由,不混淆客户端和服务端的角色是一件很重要的事。如果你在客户端执行游戏逻辑的计算,很容易就会和其它客户端失去同步,其实你的游戏也可以通过简单地传递客户端的数据来创建。

游戏循环的一次迭代称为一个 嘀嗒 tick 。嘀嗒是一个事件,表示当前游戏循环的迭代已经结束,下一帧(或者多帧)的数据已经就绪。

在后面的例子中,我们使用相同的客户端,它使用 WebSocket 从一个网页上连接到服务端。它执行一个简单的循环,将按键码发送给服务端,并显示来自服务端的所有信息。客户端代码戳这里

例子 3.1:基本游戏循环

我们使用 aiohttp 库来创建游戏服务器。它可以通过 asyncio 创建网页服务器和客户端。这个库的一个优势是它同时支持普通 http 请求和 websocket。所以我们不用其他网页服务器来渲染游戏的 html 页面。

下面是启动服务器的方法:

app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

web.run_app 是创建服务主任务的快捷方法,通过它的 run_forever() 方法来执行 asyncio 事件循环。建议你查看这个方法的源码,弄清楚服务器到底是如何创建和结束的。

app 变量就是一个类似于字典的对象,它用于在所连接的客户端之间共享数据。我们使用它来存储连接的套接字的列表。随后会用这个列表来给所有连接的客户端发送消息。asyncio.ensure_future() 调用会启动主游戏循环的任务,每隔2 秒向客户端发送嘀嗒消息。这个任务会在同样的 asyncio 事件循环中和网页服务器并行执行。

有两个网页请求处理器:handle 是提供 html 页面的处理器;wshandler 是主要的 websocket 服务器任务,处理和客户端之间的交互。在事件循环中,每一个连接的客户端都会创建一个新的 wshandler 任务。这个任务会添加客户端的套接字到列表中,以便 game_loop 任务可以给所有的客户端发送消息。然后它将随同消息回显客户端的每个击键。

在启动的任务中,我们在 asyncio 的主事件循环中启动 worker 循环。任务之间的切换发生在它们之间任何一个使用 await语句来等待某个协程结束时。例如 asyncio.sleep 仅仅是将程序执行权交给调度器一段指定的时间;ws.receive 等待 websocket 的消息,此时调度器可能切换到其它任务。

在浏览器中打开主页,连接上服务器后,试试随便按下键。它们的键值会从服务端返回,每隔 2 秒这个数字会被游戏循环中发给所有客户端的嘀嗒消息所覆盖。

我们刚刚创建了一个处理客户端按键的服务器,主游戏循环在后台做一些处理,周期性地同时更新所有的客户端。

例子 3.2: 根据请求启动游戏

在前一个例子中,在服务器的生命周期内,游戏循环一直运行着。但是现实中,如果没有一个人连接服务器,空运行游戏循环通常是不合理的。而且,同一个服务器上可能有不同的“游戏房间”。在这种假设下,每一个玩家“创建”一个游戏会话(比如说,多人游戏中的一个比赛或者大型多人游戏中的副本),这样其他用户可以加入其中。当游戏会话开始时,游戏循环才开始执行。

在这个例子中,我们使用一个全局标记来检测游戏循环是否在执行。当第一个用户发起连接时,启动它。最开始,游戏循环没有执行,标记设置为 False。游戏循环是通过客户端的处理方法启动的。

  if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))

game_loop() 运行时,这个标记设置为 True;当所有客户端都断开连接时,其又被设置为 False

例子 3.3:管理任务

这个例子用来解释如何和任务对象协同工作。我们把游戏循环的任务直接存储在游戏循环的全局字典中,代替标记的使用。在像这样的一个简单例子中并不一定是最优的,但是有时候你可能需要控制所有已经启动的任务。

    if app["game_loop"] is None or \
       app["game_loop"].cancelled():
        app["game_loop"] = asyncio.ensure_future(game_loop(app))

这里 ensure_future() 返回我们存放在全局字典中的任务对象,当所有用户都断开连接时,我们使用下面方式取消任务:

    app["game_loop"].cancel()

这个 cancel() 调用将通知调度器不要向这个协程传递执行权,而且将它的状态设置为已取消:cancelled,之后可以通过 cancelled() 方法来检查是否已取消。这里有一个值得一提的小注意点:当你持有一个任务对象的外部引用时,而这个任务执行中发生了异常,这个异常不会抛出。取而代之的是为这个任务设置一个异常状态,可以通过 exception() 方法来检查是否出现了异常。这种悄无声息地失败在调试时不是很有用。所以,你可能想用抛出所有异常来取代这种做法。你可以对所有未完成的任务显式地调用 result() 来实现。可以通过如下的回调来实现:

    app["game_loop"].add_done_callback(lambda t: t.result())

如果我们打算在我们代码中取消这个任务,但是又不想产生 CancelError 异常,有一个检查 cancelled 状态的点:

    app["game_loop"].add_done_callback(lambda t: t.result()
                                       if not t.cancelled() else None)

注意仅当你持有任务对象的引用时才需要这么做。在前一个例子,所有的异常都是没有额外的回调,直接抛出所有异常。

例子 3.4:等待多个事件

在许多场景下,在客户端的处理方法中你需要等待多个事件的发生。除了来自客户端的消息,你可能需要等待不同类型事件的发生。比如,如果你的游戏时间有限制,那么你可能需要等一个来自定时器的信号。或者你需要使用管道来等待来自其它进程的消息。亦或者是使用分布式消息系统的网络中其它服务器的信息。

为了简单起见,这个例子是基于例子 3.1。但是这个例子中我们使用 Condition 对象来与已连接的客户端保持游戏循环的同步。我们不保存套接字的全局列表,因为只在该处理方法中使用套接字。当游戏循环停止迭代时,我们使用 Condition.notify_all() 方法来通知所有的客户端。这个方法允许在 asyncio 的事件循环中使用发布/订阅的模式。

为了等待这两个事件,首先我们使用 ensure_future() 来封装任务中这个可等待对象。

    if not recv_task:
        recv_task = asyncio.ensure_future(ws.receive())
    if not tick_task:
        await tick.acquire()
        tick_task = asyncio.ensure_future(tick.wait())

在我们调用 Condition.wait() 之前,我们需要在它后面获取一把锁。这就是我们为什么先调用 tick.acquire() 的原因。在调用 tick.wait() 之后,锁会被释放,这样其他的协程也可以使用它。但是当我们收到通知时,会重新获取锁,所以在收到通知后需要调用 tick.release() 来释放它。

我们使用 asyncio.wait() 协程来等待两个任务。

    done, pending = await asyncio.wait(
        [recv_task,
         tick_task],
        return_when=asyncio.FIRST_COMPLETED)

程序会阻塞,直到列表中的任意一个任务完成。然后它返回两个列表:执行完成的任务列表和仍然在执行的任务列表。如果任务执行完成了,其对应变量赋值为 None,所以在下一个迭代时,它可能会被再次创建。

例子 3.5: 结合多个线程

在这个例子中,我们结合 asyncio 循环和线程,在一个单独的线程中执行主游戏循环。我之前提到过,由于 GIL 的存在,Python 代码的真正并行执行是不可能的。所以使用其它线程来执行复杂计算并不是一个好主意。然而,在使用 asyncio 时结合线程有原因的:当我们使用的其它库不支持 asyncio 时就需要。在主线程中调用这些库会阻塞循环的执行,所以异步使用他们的唯一方法是在不同的线程中使用他们。

我们使用 asyncio 循环的run_in_executor() 方法和 ThreadPoolExecutor 来执行游戏循环。注意 game_loop() 已经不再是一个协程了。它是一个由其它线程执行的函数。然而我们需要和主线程交互,在游戏事件到来时通知客户端。asyncio 本身不是线程安全的,它提供了可以在其它线程中执行你的代码的方法。普通函数有 call_soon_threadsafe(),协程有 run_coroutine_threadsafe()。我们在 notify() 协程中增加了通知客户端游戏的嘀嗒的代码,然后通过另外一个线程执行主事件循环。

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

当你执行这个例子时,你会看到 “Notify thread id” 和 “Main thread id” 相等,因为 notify() 协程在主线程中执行。与此同时 sleep(1) 在另外一个线程中执行,因此它不会阻塞主事件循环。

例子 3.6:多进程和扩展

单线程的服务器可能运行得很好,但是它只能使用一个 CPU 核。为了将服务扩展到多核,我们需要执行多个进程,每个进程执行各自的事件循环。这样我们需要在进程间交互信息或者共享游戏的数据。而且在一个游戏中经常需要进行复杂的计算,例如路径查找之类。这些任务有时候在一个游戏嘀嗒中没法快速完成。在协程中不推荐进行费时的计算,因为它会阻塞事件的处理。在这种情况下,将这个复杂任务交给其它并行执行的进程可能更合理。

最简单的使用多个核的方法是启动多个使用单核的服务器,就像之前的例子中一样,每个服务器占用不同的端口。你可以使用 supervisord 或者其它进程控制的系统。这个时候你需要一个像 HAProxy 这样的负载均衡器,使得连接的客户端分布在多个进程间。已经有一些可以连接 asyncio 和一些流行的消息及存储系统的适配系统。例如:

你可以在 github 或者 pypi 上找到其它的软件包,大部分以 aio 开头。

使用网络服务在存储持久状态和交换某些信息时可能比较有效。但是如果你需要进行进程间通信的实时处理,它的性能可能不足。此时,使用标准的 unix 管道可能更合适。asyncio 支持管道,在aiohttp仓库有个 使用管道的服务器的非常底层的例子

在当前的例子中,我们使用 Python 的高层类库 multiprocessing 来在不同的核上启动复杂的计算,使用 multiprocessing.Queue 来进行进程间的消息交互。不幸的是,当前的 multiprocessing 实现与 asyncio 不兼容。所以每一个阻塞方法的调用都会阻塞事件循环。但是此时线程正好可以起到帮助作用,因为如果在不同线程里面执行 multiprocessing 的代码,它就不会阻塞主线程。所有我们需要做的就是把所有进程间的通信放到另外一个线程中去。这个例子会解释如何使用这个方法。和上面的多线程例子非常类似,但是我们从线程中创建的是一个新的进程。

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

这里我们在另外一个进程中运行 worker() 函数。它包括一个执行复杂计算并把计算结果放到 queue 中的循环,这个 queuemultiprocessing.Queue 的实例。然后我们就可以在另外一个线程的主事件循环中获取结果并通知客户端,就和例子 3.5 一样。这个例子已经非常简化了,它没有合理的结束进程。而且在真实的游戏中,我们可能需要另外一个队列来将数据传递给 worker

有一个项目叫 aioprocessing,它封装了 multiprocessing,使得它可以和 asyncio 兼容。但是实际上它只是和上面例子使用了完全一样的方法:从线程中创建进程。它并没有给你带来任何方便,除了它使用了简单的接口隐藏了后面的这些技巧。希望在 Python 的下一个版本中,我们能有一个基于协程且支持 asynciomultiprocessing 库。

注意!如果你从主线程或者主进程中创建了一个不同的线程或者子进程来运行另外一个 asyncio 事件循环,你需要显式地使用 asyncio.new_event_loop() 来创建循环,不然的话可能程序不会正常工作。

via: https://7webpages.com/blog/writing-online-multiplayer-game-with-python-and-asyncio-writing-game-loop/

作者:Kyrylo Subbotin 译者:chunyang-wen 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:deviantart.com