标签 python 下的文章

Instagram 目前部署了世界上最大规模的 Django Web 框架(该框架完全使用 Python 编写)。我们最初选用 Python 是因为它久负盛名的简洁性与实用性,这非常符合我们的哲学思想——“先做简单的事情”。但简洁性也会带来效率方面的折衷。Instagram 的规模在过去两年中已经翻番,并且最近已突破 5 亿用户,所以急需最大程度地提升 web 服务效率以便我们的平台能够继续顺利地扩大。在过去的一年,我们已经将 效率计划 efficiency program 提上日程,并在过去的六个月,我们已经能够做到无需向我们的 Django 层 Django tiers 添加新的容量来维持我们的用户增长。我们将在本文分享一些由我们构建的工具以及如何使用它们来优化我们的日常部署流程。

为何需要提升效率?

Instagram,正如所有的软件,受限于像服务器和数据中心能源这样的物理限制。鉴于这些限制,在我们的效率计划中有两个我们希望实现的主要目标:

  1. Instagram 应当能够利用持续代码发布正常地提供通信服务,防止因为自然灾害、区域性网络问题等造成某一个数据中心区丢失。
  2. Instagram 应当能够自由地滚动发布新产品和新功能,不必因容量而受阻。

想要实现这些目标,我们意识到我们需要持续不断地监控我们的系统并与 回归 regressions 进行战斗。

定义效率

Web services 的瓶颈通常在于每台服务器上可用的 CPU 时间。在这种环境下,效率就意味着利用相同的 CPU 资源完成更多的任务,也就是说, 每秒处理更多的用户请求 requests per second,RPS 。当我们寻找优化方法时,我们面临的第一个最大的挑战就是尝试量化我们当前的效率。到目前为止,我们一直在使用“每次请求的平均 CPU 时间”来评估效率,但使用这种指标也有其固有限制:

  1. 设备多样性。使用 CPU 时间来测量 CPU 资源并非理想方案,因为它同时受到 CPU 型号与 CPU 负载的影响。
  2. 请求影响数据。测量每次请求的 CPU 资源并非理想方案,因为在使用 每次请求测量 per-request measurement 方案时,添加或移除轻量级或重量级的请求也会影响到效率指标。

相对于 CPU 时间来说,CPU 指令是一种更好的指标,因为对于相同的请求,它会报告相同的数字,不管 CPU 型号和 CPU 负载情况如何。我们选择使用了一种叫做” 每个活动用户 per active user “的指标,而不是将我们所有的数据关联到每个用户请求上。我们最终采用“ 每个活动用户在高峰期间的 CPU 指令 CPU instruction per active user during peak minute ”来测量效率。我们建立好新的度量标准后,下一步就是通过对 Django 的分析来更多的了解一下我们的回归。

Django web services 分析

通过分析我们的 Django web services,我们希望回答两个主要问题:

  1. CPU 回归会发生吗?
  2. 是什么导致了 CPU 回归发生以及我们该怎样修复它?

想要回答第一个问题,我们需要追踪“ 每个活动用户的 CPU 指令 CPU-instruction-per-active-user ”指标。如果该指标增加,我们就知道已经发生了一次 CPU 回归。

我们为此构建的工具叫做 Dynostats。Dynostats 利用 Django 中间件以一定的速率采样用户请求,记录关键的效率以及性能指标,例如 CPU 总指令数、端到端请求时延、花费在访问内存缓存(memcache)和数据库服务的时间等。另一方面,每个请求都有很多可用于聚合的 元数据 metadata ,例如端点名称、HTTP 请求返回码、服务该请求的服务器名称以及请求中最新提交的 哈希值 hash 。对于单个请求记录来说,有两个方面非常强大,因为我们可以在不同的维度上进行切割,那将帮助我们减少任何导致 CPU 回归的原因。例如,我们可以根据它们的端点名称聚合所有请求,正如下面的时间序列图所示,从图中可以清晰地看出在特定端点上是否发生了回归。

CPU 指令对测量效率很重要——当然,它们也很难获得。Python 并没有支持直接访问 CPU 硬件计数器(CPU 硬件计数器是指可编程 CPU 寄存器,用于测量性能指标,例如 CPU 指令)的公共库。另一方面,Linux 内核提供了 perf_event_open 系统调用。通过 Python ctypes 桥接技术能够让我们调用标准 C 库的系统调用函数 syscall,它也为我们提供了兼容 C 的数据类型,从而可以编程硬件计数器并从它们读取数据。

使用 Dynostats,我们已经可以找出 CPU 回归,并探究 CPU 回归发生的原因,例如哪个端点受到的影响最多,谁提交了真正会导致 CPU 回归的变更等。然而,当开发者收到他们的变更已经导致一次 CPU 回归发生的通知时,他们通常难以找出问题所在。如果问题很明显,那么回归可能就不会一开始就被提交!

这就是为何我们需要一个 Python 分析器,(一旦 Dynostats 发现了它)从而使开发者能够使用它找出回归发生的根本原因。不同于白手起家,我们决定对一个现成的 Python 分析器 cProfile 做适当的修改。cProfile 模块通常会提供一个统计集合来描述程序不同的部分执行时间和执行频率。我们将 cProfile 的 定时器 timer 替换成了一个从硬件计数器读取的 CPU 指令计数器,以此取代对时间的测量。我们在采样请求后产生数据并把数据发送到数据流水线。我们也会发送一些我们在 Dynostats 所拥有的类似元数据,例如服务器名称、集群、区域、端点名称等。

在数据流水线的另一边,我们创建了一个消费数据的 尾随者 tailer 。尾随者的主要功能是解析 cProfile 的统计数据并创建能够表示 Python 函数级别的 CPU 指令的实体。如此,我们能够通过 Python 函数来聚合 CPU 指令,从而更加方便地找出是什么函数导致了 CPU 回归。

监控与警报机制

在 Instagram,我们每天部署 30-50 次后端服务。这些部署中的任何一个都能发生 CPU 回归的问题。因为每次发生通常都包含至少一个 差异 diff ,所以找出任何回归是很容易的。我们的效率监控机制包括在每次发布前后都会在 Dynostats 中扫描 CPU 指令,并且当变更超出某个阈值时发出警告。对于长期会发生 CPU 回归的情况,我们也有一个探测器为负载最繁重的端点提供日常和每周的变更扫描。

部署新的变更并非触发一次 CPU 回归的唯一情况。在许多情况下,新的功能和新的代码路径都由 全局环境变量 global environment variables,GEV 控制。 在一个计划好的时间表上,给一部分用户发布新功能是很常见事情。我们在 Dynostats 和 cProfile 统计数据中为每个请求添加了这个信息作为额外的元数据字段。按这些字段将请求分组可以找出由全局环境变量(GEV)改变导致的可能的 CPU 回归问题。这让我们能够在它们对性能造成影响前就捕获到 CPU 回归。

接下来是什么?

Dynostats 和我们定制的 cProfile,以及我们建立的支持它们的监控和警报机制能够有效地找出大多数导致 CPU 回归的元凶。这些进展已经帮助我们恢复了超过 50% 的不必要的 CPU 回归,否则我们就根本不会知道。

我们仍然还有一些可以提升的方面,并很容易将它们地加入到 Instagram 的日常部署流程中:

  1. CPU 指令指标应该要比其它指标如 CPU 时间更加稳定,但我们仍然观察了让我们头疼的差异。保持“ 信噪比 signal:noise ratio ”合理地低是非常重要的,这样开发者们就可以集中于真实的回归上。这可以通过引入 置信区间 confidence intervals 的概念来提升,并在信噪比过高时发出警报。针对不同的端点,变化的阈值也可以设置为不同值。
  2. 通过更改 GEV 来探测 CPU 回归的一个限制就是我们要在 Dynostats 中手动启用这些比较的日志输出。当 GEV 的数量逐渐增加,开发了越来越多的功能,这就不便于扩展了。相反,我们能够利用一个自动化框架来调度这些比较的日志输出,并对所有的 GEV 进行遍历,然后当检查到回归时就发出警告。
  3. cProfile 需要一些增强以便更好地处理封装函数以及它们的子函数。

鉴于我们在为 Instagram 的 web service 构建效率框架中所投入的工作,所以我们对于将来使用 Python 继续扩展我们的服务很有信心。我们也开始向 Python 语言本身投入更多,并且开始探索从 Python 2 转移 Python 3 之道。我们将会继续探索并做更多的实验以继续提升基础设施与开发者效率,我们期待着很快能够分享更多的经验。

本文作者 Min Ni 是 Instagram 的软件工程师。

(题图来自:nostarch.com


via: https://engineering.instagram.com/web-service-efficiency-at-instagram-with-python-4976d078e366#.tiakuoi4p

作者:Min Ni 译者:ChrisLeeGit 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

在这个系列中,我们基于多人游戏 贪吃蛇 来制作一个异步的 Python 程序。上一篇文章聚焦于编写游戏循环上,而本系列第 1 部分则涵盖了如何异步化

4、制作一个完整的游戏

4.1 工程概览

在此部分,我们将回顾一个完整在线游戏的设计。这是一个经典的贪吃蛇游戏,增加了多玩家支持。你可以自己在 (http://snakepit-game.com) 亲自试玩。源码在 GitHub 的这个仓库。游戏包括下列文件:

  • server.py - 处理主游戏循环和连接。
  • game.py - 主要的 Game 类。实现游戏的逻辑和游戏的大部分通信协议。
  • player.py - Player 类,包括每一个独立玩家的数据和蛇的展现。这个类负责获取玩家的输入并相应地移动蛇。
  • datatypes.py - 基本数据结构。
  • settings.py - 游戏设置,在注释中有相关的说明。
  • index.html - 客户端所有的 html 和 javascript代码都放在一个文件中。

4.2 游戏循环内窥

多人的贪吃蛇游戏是个用于学习十分好的例子,因为它简单。所有的蛇在每个帧中移动到一个位置,而且帧以非常低的频率进行变化,这样就可以让你就观察到游戏引擎到底是如何工作的。因为速度慢,对于玩家的按键不会立马响应。按键先是记录下来,然后在一个游戏循环迭代的最后计算下一帧时使用。

现代的动作游戏帧频率更高,而且通常服务端和客户端的帧频率是不相等的。客户端的帧频率通常依赖于客户端的硬件性能,而服务端的帧频率则是固定的。一个客户端可能根据一个游戏“嘀嗒”的数据渲染多个帧。这样就可以创建平滑的动画,这个受限于客户端的性能。在这个例子中,服务端不仅传输物体的当前位置,也要传输它们的移动方向、速度和加速度。客户端的帧频率称之为 FPS( 每秒帧数 frames per second ),服务端的帧频率称之为 TPS( 每秒滴答数 ticks per second )。在这个贪吃蛇游戏的例子中,二者的值是相等的,在客户端显示的一帧是在服务端的一个“嘀嗒”内计算出来的。

我们使用类似文本模式的游戏区域,事实上是 html 表格中的一个字符宽的小格。游戏中的所有对象都是通过表格中的不同颜色字符来表示。大部分时候,客户端将按键的码发送至服务端,然后每个“滴答”更新游戏区域。服务端一次更新包括需要更新字符的坐标和颜色。所以我们将所有游戏逻辑放置在服务端,只将需要渲染的数据发送给客户端。此外,我们通过替换通过网络发送的数据来减少游戏被破解的概率。

4.3 它是如何运行的?

这个游戏中的服务端出于简化的目的,它和例子 3.2 类似。但是我们用一个所有服务端都可访问的 Game 对象来代替之前保存了所有已连接 websocket 的全局列表。一个 Game 实例包括一个表示连接到此游戏的玩家的 Player 对象的列表(在 self._players 属性里面),以及他们的个人数据和 websocket 对象。将所有游戏相关的数据存储在一个 Game 对象中,会方便我们增加多个游戏房间这个功能——如果我们要增加这个功能的话。这样,我们维护多个 Game 对象,每个游戏开始时创建一个。

客户端和服务端的所有交互都是通过编码成 json 的消息来完成。来自客户端的消息仅包含玩家所按下键码对应的编号。其它来自客户端消息使用如下格式:

[command, arg1, arg2, ... argN ]

来自服务端的消息以列表的形式发送,因为通常一次要发送多个消息 (大多数情况下是渲染的数据):

[[command, arg1, arg2, ... argN ], ... ]

在每次游戏循环迭代的最后会计算下一帧,并且将数据发送给所有的客户端。当然,每次不是发送完整的帧,而是发送两帧之间的变化列表。

注意玩家连接上服务端后不是立马加入游戏。连接开始时是 观望者 spectator 模式,玩家可以观察其它玩家如何玩游戏。如果游戏已经开始或者上一个游戏会话已经在屏幕上显示 “game over” (游戏结束),用户此时可以按下 “Join”(参与),来加入一个已经存在的游戏,或者如果游戏没有运行(没有其它玩家)则创建一个新的游戏。后一种情况下,游戏区域在开始前会被先清空。

游戏区域存储在 Game._field 这个属性中,它是由嵌套列表组成的二维数组,用于内部存储游戏区域的状态。数组中的每一个元素表示区域中的一个小格,最终小格会被渲染成 html 表格的格子。它有一个 Char 的类型,是一个 namedtuple ,包括一个字符和颜色。在所有连接的客户端之间保证游戏区域的同步很重要,所以所有游戏区域的更新都必须依据发送到客户端的相应的信息。这是通过 Game.apply_render() 来实现的。它接受一个 Draw 对象的列表,其用于内部更新游戏区域和发送渲染消息给客户端。

我们使用 namedtuple 不仅因为它表示简单数据结构很方便,也因为用它生成 json 格式的消息时相对于 dict 更省空间。如果你在一个真实的游戏循环中需要发送复杂的数据结构,建议先将它们序列化成一个简单的、更短的格式,甚至打包成二进制格式(例如 bson,而不是 json),以减少网络传输。

Player 对象包括用 deque 对象表示的蛇。这种数据类型和 list 相似,但是在两端增加和删除元素时效率更高,用它来表示蛇很理想。它的主要方法是 Player.render_move(),它返回移动玩家的蛇至下一个位置的渲染数据。一般来说它在新的位置渲染蛇的头部,移除上一帧中表示蛇的尾巴的元素。如果蛇吃了一个数字变长了,在相应的多个帧中尾巴是不需要移动的。蛇的渲染数据在主类的 Game.next_frame() 中使用,该方法中实现所有的游戏逻辑。这个方法渲染所有蛇的移动,检查每一个蛇前面的障碍物,而且生成数字和“石头”。每一个“嘀嗒”,game_loop() 都会直接调用它来生成下一帧。

如果蛇头前面有障碍物,在 Game.next_frame() 中会调用 Game.game_over()。它后通知所有的客户端那个蛇死掉了 (会调用 player.render_game_over() 方法将其变成石头),然后更新表中的分数排行榜。Player 对象的 alive 标记被置为 False,当渲染下一帧时,这个玩家会被跳过,除非他重新加入游戏。当没有蛇存活时,游戏区域会显示 “game over” (游戏结束)。而且,主游戏循环会停止,设置 game.running 标记为 False。当某个玩家下次按下 “Join” (加入)时,游戏区域会被清空。

在渲染游戏的每个下一帧时也会产生数字和石头,它们是由随机值决定的。产生数字或者石头的概率可以在 settings.py 中修改成其它值。注意数字的产生是针对游戏区域每一个活的蛇的,所以蛇越多,产生的数字就越多,这样它们都有足够的食物来吃掉。

4.4 网络协议

从客户端发送消息的列表:

命令参数描述
new\_player[name]设置玩家的昵称
join 玩家加入游戏

从服务端发送消息的列表:

命令参数描述
handshake[id]给一个玩家指定 ID
world[[(char, color), ...], ...]初始化游戏区域(世界地图)
reset\_world 清除实际地图,替换所有字符为空格
render[x, y, char, color]在某个位置显示字符
p\_joined[id, name, color, score]新玩家加入游戏
p\_gameover[id]某个玩家游戏结束
p\_score[id, score]给某个玩家计分
top\_scores[[name, score, color], ...]更新排行榜

典型的消息交换顺序:

客户端 -> 服务端服务端 -> 客户端服务端 -> 所有客户端备注
new\_player 名字传递给服务端
handshake 指定 ID
world 初始化传递的世界地图
top\_scores 收到传递的排行榜
join 玩家按下“Join”,游戏循环开始
reset\_world命令客户端清除游戏区域
render, render, ...第一个游戏“滴答”,渲染第一帧
(key code) 玩家按下一个键
render, render, ...渲染第二帧
p\_score蛇吃掉了一个数字
render, render, ...渲染第三帧
... 重复若干帧 ...
p\_gameover试着吃掉障碍物时蛇死掉了
top\_scores更新排行榜(如果需要更新的话)

5. 总结

说实话,我十分享受 Python 最新的异步特性。新的语法做了改善,所以异步代码很容易阅读。可以明显看出哪些调用是非阻塞的,什么时候发生 greenthread 的切换。所以现在我可以宣称 Python 是异步编程的好工具。

SnakePit 在 7WebPages 团队中非常受欢迎。如果你在公司想休息一下,不要忘记给我们在 Twitter 或者 Facebook 留下反馈。


via: https://7webpages.com/blog/writing-online-multiplayer-game-with-python-and-asyncio-part-3/

作者:Kyrylo Subbotin 译者:chunyang-wen 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:wallpaperinhd.net-wallpaper.html))

你在 Python 中用过异步编程吗?本文中我会告诉你怎样做,而且用一个能工作的例子来展示它:这是一个流行的贪吃蛇游戏,而且是为多人游戏而设计的。

介绍和理论部分参见“第一部分 异步化”。

3、编写游戏循环主体

游戏循环是每一个游戏的核心。它持续地运行以读取玩家的输入、更新游戏的状态,并且在屏幕上渲染游戏结果。在在线游戏中,游戏循环分为客户端和服务端两部分,所以一般有两个循环通过网络通信。通常客户端的角色是获取玩家输入,比如按键或者鼠标移动,将数据传输给服务端,然后接收需要渲染的数据。服务端处理来自玩家的所有数据,更新游戏的状态,执行渲染下一帧的必要计算,然后将结果传回客户端,例如游戏中对象的新位置。如果没有可靠的理由,不混淆客户端和服务端的角色是一件很重要的事。如果你在客户端执行游戏逻辑的计算,很容易就会和其它客户端失去同步,其实你的游戏也可以通过简单地传递客户端的数据来创建。

游戏循环的一次迭代称为一个 嘀嗒 tick 。嘀嗒是一个事件,表示当前游戏循环的迭代已经结束,下一帧(或者多帧)的数据已经就绪。

在后面的例子中,我们使用相同的客户端,它使用 WebSocket 从一个网页上连接到服务端。它执行一个简单的循环,将按键码发送给服务端,并显示来自服务端的所有信息。客户端代码戳这里

例子 3.1:基本游戏循环

我们使用 aiohttp 库来创建游戏服务器。它可以通过 asyncio 创建网页服务器和客户端。这个库的一个优势是它同时支持普通 http 请求和 websocket。所以我们不用其他网页服务器来渲染游戏的 html 页面。

下面是启动服务器的方法:

app = web.Application()
app["sockets"] = []

asyncio.ensure_future(game_loop(app))

app.router.add_route('GET', '/connect', wshandler)
app.router.add_route('GET', '/', handle)

web.run_app(app)

web.run_app 是创建服务主任务的快捷方法,通过它的 run_forever() 方法来执行 asyncio 事件循环。建议你查看这个方法的源码,弄清楚服务器到底是如何创建和结束的。

app 变量就是一个类似于字典的对象,它用于在所连接的客户端之间共享数据。我们使用它来存储连接的套接字的列表。随后会用这个列表来给所有连接的客户端发送消息。asyncio.ensure_future() 调用会启动主游戏循环的任务,每隔2 秒向客户端发送嘀嗒消息。这个任务会在同样的 asyncio 事件循环中和网页服务器并行执行。

有两个网页请求处理器:handle 是提供 html 页面的处理器;wshandler 是主要的 websocket 服务器任务,处理和客户端之间的交互。在事件循环中,每一个连接的客户端都会创建一个新的 wshandler 任务。这个任务会添加客户端的套接字到列表中,以便 game_loop 任务可以给所有的客户端发送消息。然后它将随同消息回显客户端的每个击键。

在启动的任务中,我们在 asyncio 的主事件循环中启动 worker 循环。任务之间的切换发生在它们之间任何一个使用 await语句来等待某个协程结束时。例如 asyncio.sleep 仅仅是将程序执行权交给调度器一段指定的时间;ws.receive 等待 websocket 的消息,此时调度器可能切换到其它任务。

在浏览器中打开主页,连接上服务器后,试试随便按下键。它们的键值会从服务端返回,每隔 2 秒这个数字会被游戏循环中发给所有客户端的嘀嗒消息所覆盖。

我们刚刚创建了一个处理客户端按键的服务器,主游戏循环在后台做一些处理,周期性地同时更新所有的客户端。

例子 3.2: 根据请求启动游戏

在前一个例子中,在服务器的生命周期内,游戏循环一直运行着。但是现实中,如果没有一个人连接服务器,空运行游戏循环通常是不合理的。而且,同一个服务器上可能有不同的“游戏房间”。在这种假设下,每一个玩家“创建”一个游戏会话(比如说,多人游戏中的一个比赛或者大型多人游戏中的副本),这样其他用户可以加入其中。当游戏会话开始时,游戏循环才开始执行。

在这个例子中,我们使用一个全局标记来检测游戏循环是否在执行。当第一个用户发起连接时,启动它。最开始,游戏循环没有执行,标记设置为 False。游戏循环是通过客户端的处理方法启动的。

  if app["game_is_running"] == False:
        asyncio.ensure_future(game_loop(app))

game_loop() 运行时,这个标记设置为 True;当所有客户端都断开连接时,其又被设置为 False

例子 3.3:管理任务

这个例子用来解释如何和任务对象协同工作。我们把游戏循环的任务直接存储在游戏循环的全局字典中,代替标记的使用。在像这样的一个简单例子中并不一定是最优的,但是有时候你可能需要控制所有已经启动的任务。

    if app["game_loop"] is None or \
       app["game_loop"].cancelled():
        app["game_loop"] = asyncio.ensure_future(game_loop(app))

这里 ensure_future() 返回我们存放在全局字典中的任务对象,当所有用户都断开连接时,我们使用下面方式取消任务:

    app["game_loop"].cancel()

这个 cancel() 调用将通知调度器不要向这个协程传递执行权,而且将它的状态设置为已取消:cancelled,之后可以通过 cancelled() 方法来检查是否已取消。这里有一个值得一提的小注意点:当你持有一个任务对象的外部引用时,而这个任务执行中发生了异常,这个异常不会抛出。取而代之的是为这个任务设置一个异常状态,可以通过 exception() 方法来检查是否出现了异常。这种悄无声息地失败在调试时不是很有用。所以,你可能想用抛出所有异常来取代这种做法。你可以对所有未完成的任务显式地调用 result() 来实现。可以通过如下的回调来实现:

    app["game_loop"].add_done_callback(lambda t: t.result())

如果我们打算在我们代码中取消这个任务,但是又不想产生 CancelError 异常,有一个检查 cancelled 状态的点:

    app["game_loop"].add_done_callback(lambda t: t.result()
                                       if not t.cancelled() else None)

注意仅当你持有任务对象的引用时才需要这么做。在前一个例子,所有的异常都是没有额外的回调,直接抛出所有异常。

例子 3.4:等待多个事件

在许多场景下,在客户端的处理方法中你需要等待多个事件的发生。除了来自客户端的消息,你可能需要等待不同类型事件的发生。比如,如果你的游戏时间有限制,那么你可能需要等一个来自定时器的信号。或者你需要使用管道来等待来自其它进程的消息。亦或者是使用分布式消息系统的网络中其它服务器的信息。

为了简单起见,这个例子是基于例子 3.1。但是这个例子中我们使用 Condition 对象来与已连接的客户端保持游戏循环的同步。我们不保存套接字的全局列表,因为只在该处理方法中使用套接字。当游戏循环停止迭代时,我们使用 Condition.notify_all() 方法来通知所有的客户端。这个方法允许在 asyncio 的事件循环中使用发布/订阅的模式。

为了等待这两个事件,首先我们使用 ensure_future() 来封装任务中这个可等待对象。

    if not recv_task:
        recv_task = asyncio.ensure_future(ws.receive())
    if not tick_task:
        await tick.acquire()
        tick_task = asyncio.ensure_future(tick.wait())

在我们调用 Condition.wait() 之前,我们需要在它后面获取一把锁。这就是我们为什么先调用 tick.acquire() 的原因。在调用 tick.wait() 之后,锁会被释放,这样其他的协程也可以使用它。但是当我们收到通知时,会重新获取锁,所以在收到通知后需要调用 tick.release() 来释放它。

我们使用 asyncio.wait() 协程来等待两个任务。

    done, pending = await asyncio.wait(
        [recv_task,
         tick_task],
        return_when=asyncio.FIRST_COMPLETED)

程序会阻塞,直到列表中的任意一个任务完成。然后它返回两个列表:执行完成的任务列表和仍然在执行的任务列表。如果任务执行完成了,其对应变量赋值为 None,所以在下一个迭代时,它可能会被再次创建。

例子 3.5: 结合多个线程

在这个例子中,我们结合 asyncio 循环和线程,在一个单独的线程中执行主游戏循环。我之前提到过,由于 GIL 的存在,Python 代码的真正并行执行是不可能的。所以使用其它线程来执行复杂计算并不是一个好主意。然而,在使用 asyncio 时结合线程有原因的:当我们使用的其它库不支持 asyncio 时就需要。在主线程中调用这些库会阻塞循环的执行,所以异步使用他们的唯一方法是在不同的线程中使用他们。

我们使用 asyncio 循环的run_in_executor() 方法和 ThreadPoolExecutor 来执行游戏循环。注意 game_loop() 已经不再是一个协程了。它是一个由其它线程执行的函数。然而我们需要和主线程交互,在游戏事件到来时通知客户端。asyncio 本身不是线程安全的,它提供了可以在其它线程中执行你的代码的方法。普通函数有 call_soon_threadsafe(),协程有 run_coroutine_threadsafe()。我们在 notify() 协程中增加了通知客户端游戏的嘀嗒的代码,然后通过另外一个线程执行主事件循环。

def game_loop(asyncio_loop):
    print("Game loop thread id {}".format(threading.get_ident()))
    async def notify():
        print("Notify thread id {}".format(threading.get_ident()))
        await tick.acquire()
        tick.notify_all()
        tick.release()

    while 1:
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        # blocking the thread
        sleep(1)
        # make sure the task has finished
        task.result()

当你执行这个例子时,你会看到 “Notify thread id” 和 “Main thread id” 相等,因为 notify() 协程在主线程中执行。与此同时 sleep(1) 在另外一个线程中执行,因此它不会阻塞主事件循环。

例子 3.6:多进程和扩展

单线程的服务器可能运行得很好,但是它只能使用一个 CPU 核。为了将服务扩展到多核,我们需要执行多个进程,每个进程执行各自的事件循环。这样我们需要在进程间交互信息或者共享游戏的数据。而且在一个游戏中经常需要进行复杂的计算,例如路径查找之类。这些任务有时候在一个游戏嘀嗒中没法快速完成。在协程中不推荐进行费时的计算,因为它会阻塞事件的处理。在这种情况下,将这个复杂任务交给其它并行执行的进程可能更合理。

最简单的使用多个核的方法是启动多个使用单核的服务器,就像之前的例子中一样,每个服务器占用不同的端口。你可以使用 supervisord 或者其它进程控制的系统。这个时候你需要一个像 HAProxy 这样的负载均衡器,使得连接的客户端分布在多个进程间。已经有一些可以连接 asyncio 和一些流行的消息及存储系统的适配系统。例如:

你可以在 github 或者 pypi 上找到其它的软件包,大部分以 aio 开头。

使用网络服务在存储持久状态和交换某些信息时可能比较有效。但是如果你需要进行进程间通信的实时处理,它的性能可能不足。此时,使用标准的 unix 管道可能更合适。asyncio 支持管道,在aiohttp仓库有个 使用管道的服务器的非常底层的例子

在当前的例子中,我们使用 Python 的高层类库 multiprocessing 来在不同的核上启动复杂的计算,使用 multiprocessing.Queue 来进行进程间的消息交互。不幸的是,当前的 multiprocessing 实现与 asyncio 不兼容。所以每一个阻塞方法的调用都会阻塞事件循环。但是此时线程正好可以起到帮助作用,因为如果在不同线程里面执行 multiprocessing 的代码,它就不会阻塞主线程。所有我们需要做的就是把所有进程间的通信放到另外一个线程中去。这个例子会解释如何使用这个方法。和上面的多线程例子非常类似,但是我们从线程中创建的是一个新的进程。

def game_loop(asyncio_loop):
    # coroutine to run in main thread
    async def notify():
        await tick.acquire()
        tick.notify_all()
        tick.release()

    queue = Queue()

    # function to run in a different process
    def worker():
        while 1:
            print("doing heavy calculation in process {}".format(os.getpid()))
            sleep(1)
            queue.put("calculation result")

    Process(target=worker).start()

    while 1:
        # blocks this thread but not main thread with event loop
        result = queue.get()
        print("getting {} in process {}".format(result, os.getpid()))
        task = asyncio.run_coroutine_threadsafe(notify(), asyncio_loop)
        task.result()

这里我们在另外一个进程中运行 worker() 函数。它包括一个执行复杂计算并把计算结果放到 queue 中的循环,这个 queuemultiprocessing.Queue 的实例。然后我们就可以在另外一个线程的主事件循环中获取结果并通知客户端,就和例子 3.5 一样。这个例子已经非常简化了,它没有合理的结束进程。而且在真实的游戏中,我们可能需要另外一个队列来将数据传递给 worker

有一个项目叫 aioprocessing,它封装了 multiprocessing,使得它可以和 asyncio 兼容。但是实际上它只是和上面例子使用了完全一样的方法:从线程中创建进程。它并没有给你带来任何方便,除了它使用了简单的接口隐藏了后面的这些技巧。希望在 Python 的下一个版本中,我们能有一个基于协程且支持 asynciomultiprocessing 库。

注意!如果你从主线程或者主进程中创建了一个不同的线程或者子进程来运行另外一个 asyncio 事件循环,你需要显式地使用 asyncio.new_event_loop() 来创建循环,不然的话可能程序不会正常工作。

via: https://7webpages.com/blog/writing-online-multiplayer-game-with-python-and-asyncio-writing-game-loop/

作者:Kyrylo Subbotin 译者:chunyang-wen 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:deviantart.com

你在 Python 中用过异步编程吗?本文中我会告诉你怎样做,而且用一个能工作的例子来展示它:这是一个流行的贪吃蛇游戏,而且是为多人游戏而设计的。

1、简介

在技术和文化领域,大规模多人在线游戏(MMO)毋庸置疑是我们当今世界的潮流之一。很长时间以来,为一个 MMO 游戏写一个服务器这件事总是会涉及到大量的预算与复杂的底层编程技术,不过在最近这几年,事情迅速发生了变化。基于动态语言的现代框架允许在中档的硬件上面处理大量并发的用户连接。同时,HTML5 和 WebSockets 标准使得创建基于实时图形的游戏的直接运行至浏览器上的客户端成为可能,而不需要任何的扩展。

对于创建可扩展的非堵塞性的服务器来说,Python 可能不是最受欢迎的工具,尤其是和在这个领域里最受欢迎的 Node.js 相比而言。但是最近版本的 Python 正在改变这种现状。asyncio 的引入和一个特别的 async/await 语法使得异步代码看起来像常规的阻塞代码一样,这使得 Python 成为了一个值得信赖的异步编程语言,所以我将尝试利用这些新特点来创建一个多人在线游戏。

2、异步

一个游戏服务器应该可以接受尽可能多的用户并发连接,并实时处理这些连接。一个典型的解决方案是创建线程,然而在这种情况下并不能解决这个问题。运行上千的线程需要 CPU 在它们之间不停的切换(这叫做上下文切换),这将导致开销非常大,效率很低下。更糟糕的是使用进程来实现,因为,不但如此,它们还会占用大量的内存。在 Python 中,甚至还有一个问题,Python 的解释器(CPython)并不是针对多线程设计的,相反它主要针对于单线程应用实现最大的性能。这就是为什么它使用 GIL(global interpreter lock),这是一个不允许同时运行多线程 Python 代码的架构,以防止同一个共享对象出现使用不可控。正常情况下,在当前线程正在等待的时候,解释器会转换到另一个线程,通常是等待一个 I/O 的响应(举例说,比如等待 Web 服务器的响应)。这就允许在你的应用中实现非阻塞 I/O 操作,因为每一个操作仅仅阻塞一个线程而不是阻塞整个服务器。然而,这也使得通常的多线程方案变得几近无用,因为它不允许你并发执行 Python 代码,即使是在多核心的 CPU 上也是这样。而与此同时,在一个单一线程中拥有非阻塞 I/O 是完全有可能的,因而消除了经常切换上下文的需要。

实际上,你可以用纯 Python 代码来实现一个单线程的非阻塞 I/O。你所需要的只是标准的 select 模块,这个模块可以让你写一个事件循环来等待未阻塞的 socket 的 I/O。然而,这个方法需要你在一个地方定义所有 app 的逻辑,用不了多久,你的 app 就会变成非常复杂的状态机。有一些框架可以简化这个任务,比较流行的是 tornadetwisted。它们被用来使用回调方法实现复杂的协议(这和 Node.js 比较相似)。这种框架运行在它自己的事件循环中,按照定义的事件调用你的回调函数。并且,这或许是一些情况的解决方案,但是它仍然需要使用回调的方式编程,这使你的代码变得碎片化。与写同步代码并且并发地执行多个副本相比,这就像我们在普通的线程上做的一样。在单个线程上这为什么是不可能的呢?

这就是为什么出现微线程(microthread)概念的原因。这个想法是为了在一个线程上并发执行任务。当你在一个任务中调用阻塞的方法时,有一个叫做“manager” (或者“scheduler”)的东西在执行事件循环。当有一些事件准备处理的时候,一个 manager 会转移执行权给一个任务,并等着它执行完毕。任务将一直执行,直到它遇到一个阻塞调用,然后它就会将执行权返还给 manager。

微线程也称为轻量级线程(lightweight threads)或绿色线程(green threads)(来自于 Java 中的一个术语)。在伪线程中并发执行的任务叫做 tasklets、greenlets 或者协程(coroutines)。

Python 中的微线程最早的实现之一是 Stackless Python。它之所以这么知名是因为它被用在了一个叫 EVE online 的非常有名的在线游戏中。这个 MMO 游戏自称说在一个持久的“宇宙”中,有上千个玩家在做不同的活动,这些都是实时发生的。Stackless 是一个独立的 Python 解释器,它代替了标准的函数栈调用,并且直接控制程序运行流程来减少上下文切换的开销。尽管这非常有效,这个解决方案不如在标准解释器中使用“软”库更流行,像 eventletgevent 的软件包配备了修补过的标准 I/O 库,I/O 函数会将执行权传递到内部事件循环。这使得将正常的阻塞代码转变成非阻塞的代码变得简单。这种方法的一个缺点是从代码上看这并不分明,它的调用是非阻塞的。新版本的 Python 引入了本地协程作为生成器的高级形式。在 Python 的 3.4 版本之后,引入了 asyncio 库,这个库依赖于本地协程来提供单线程并发。但是仅仅到了 Python 3.5 ,协程就变成了 Python 语言的一部分,使用新的关键字 async 和 await 来描述。这是一个简单的例子,演示了使用 asyncio 来运行并发任务。

import asyncio

async def my_task(seconds):
    print("start sleeping for {} seconds".format(seconds))
    await asyncio.sleep(seconds)
    print("end sleeping for {} seconds".format(seconds))

all_tasks = asyncio.gather(my_task(1), my_task(2))
loop = asyncio.get_event_loop()
loop.run_until_complete(all_tasks)
loop.close()    

我们启动了两个任务,一个睡眠 1 秒钟,另一个睡眠 2 秒钟,输出如下:

start sleeping for 1 seconds
start sleeping for 2 seconds
end sleeping for 1 seconds
end sleeping for 2 seconds

正如你所看到的,协程不会阻塞彼此——第二个任务在第一个结束之前启动。这发生的原因是 asyncio.sleep 是协程,它会返回执行权给调度器,直到时间到了。

在下一节中,我们将会使用基于协程的任务来创建一个游戏循环。


via: https://7webpages.com/blog/writing-online-multiplayer-game-with-python-asyncio-getting-asynchronous/

作者:Kyrylo Subbotin 译者:xinglianfly 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:deviantart.net

为什么你想要自己构建一个 web 框架呢?我想,原因有以下几点:

  • 你有一个新奇的想法,觉得将会取代其他的框架
  • 你想要获得一些名气
  • 你遇到的问题很独特,以至于现有的框架不太合适
  • 你对 web 框架是如何工作的很感兴趣,因为你想要成为一位更好的 web 开发者。

接下来的笔墨将着重于最后一点。这篇文章旨在通过对设计和实现过程一步一步的阐述告诉读者,我在完成一个小型的服务器和框架之后学到了什么。你可以在这个代码仓库中找到这个项目的完整代码。

我希望这篇文章可以鼓励更多的人来尝试,因为这确实很有趣。它让我知道了 web 应用是如何工作的,而且这比我想的要容易的多!

范围

框架可以处理请求-响应周期、身份认证、数据库访问、模板生成等部分工作。Web 开发者使用框架是因为,大多数的 web 应用拥有大量相同的功能,而对每个项目都重新实现同样的功能意义不大。

比较大的的框架如 Rails 和 Django 实现了高层次的抽象,或者说“自备电池”(“batteries-included”,这是 Python 的口号之一,意即所有功能都自足。)。而实现所有的这些功能可能要花费数千小时,因此在这个项目上,我们重点完成其中的一小部分。在开始写代码前,我先列举一下所需的功能以及限制。

功能:

  • 处理 HTTP 的 GET 和 POST 请求。你可以在这篇 wiki 中对 HTTP 有个大致的了解。
  • 实现异步操作(我喜欢 Python 3 的 asyncio 模块)。
  • 简单的路由逻辑以及参数撷取。
  • 像其他微型框架一样,提供一个简单的用户级 API 。
  • 支持身份认证,因为学会这个很酷啊(微笑)。

限制:

  • 将只支持 HTTP 1.1 的一个小子集,不支持 传输编码 transfer-encoding HTTP 认证 http-auth 内容编码 content-encoding (如 gzip)以及持久化连接等功能。
  • 不支持对响应内容的 MIME 判断 - 用户需要手动指定。
  • 不支持 WSGI - 仅能处理简单的 TCP 连接。
  • 不支持数据库。

我觉得一个小的用例可以让上述内容更加具体,也可以用来演示这个框架的 API:

from diy_framework import App, Router
from diy_framework.http_utils import Response


# GET simple route
async def home(r):
    rsp = Response()
    rsp.set_header('Content-Type', 'text/html')
    rsp.body = '<html><body><b>test</b></body></html>'
    return rsp


# GET route + params
async def welcome(r, name):
    return "Welcome {}".format(name)

# POST route + body param
async def parse_form(r):
    if r.method == 'GET':
        return 'form'
    else:
        name = r.body.get('name', '')[0]
        password = r.body.get('password', '')[0]

       return "{0}:{1}".format(name, password)

# application = router + http server
router = Router()
router.add_routes({
    r'/welcome/{name}': welcome,
    r'/': home,
    r'/login': parse_form,})

app = App(router)
app.start_server()

' 用户需要定义一些能够返回字符串或 Response 对象的异步函数,然后将这些函数与表示路由的字符串配对,最后通过一个函数调用(start_server)开始处理请求。

完成设计之后,我将它抽象为几个我需要编码的部分:

  • 接受 TCP 连接以及调度一个异步函数来处理这些连接的部分
  • 将原始文本解析成某种抽象容器的部分
  • 对于每个请求,用来决定调用哪个函数的部分
  • 将上述部分集中到一起,并为开发者提供一个简单接口的部分

我先编写一些测试,这些测试被用来描述每个部分的功能。几次重构后,整个设计被分成若干部分,每个部分之间是相对解耦的。这样就非常好,因为每个部分可以被独立地研究学习。以下是我上文列出的抽象的具体体现:

  • 一个 HTTPServer 对象,需要一个 Router 对象和一个 http\_parser 模块,并使用它们来初始化。
  • HTTPConnection 对象,每一个对象表示一个单独的客户端 HTTP 连接,并且处理其请求-响应周期:使用 http\_parser 模块将收到的字节流解析为一个 Request 对象;使用一个 Router 实例寻找并调用正确的函数来生成一个响应;最后将这个响应发送回客户端。
  • 一对 Request 和 Response 对象为用户提供了一种友好的方式,来处理实质上是字节流的字符串。用户不需要知道正确的消息格式和分隔符是怎样的。
  • 一个包含“路由:函数”对应关系的 Router 对象。它提供一个添加配对的方法,可以根据 URL 路径查找到相应的函数。
  • 最后,一个 App 对象。它包含配置信息,并使用它们实例化一个 HTTPServer 实例。

让我们从 HTTPConnection 开始来讲解各个部分。

模拟异步连接

为了满足上述约束条件,每一个 HTTP 请求都是一个单独的 TCP 连接。这使得处理请求的速度变慢了,因为建立多个 TCP 连接需要相对高的花销(DNS 查询,TCP 三次握手,慢启动等等的花销),不过这样更加容易模拟。对于这一任务,我选择相对高级的 asyncio-stream 模块,它建立在 asyncio 的传输和协议的基础之上。我强烈推荐你读一读标准库中的相应代码,很有意思!

一个 HTTPConnection 的实例能够处理多个任务。首先,它使用 asyncio.StreamReader 对象以增量的方式从 TCP 连接中读取数据,并存储在缓存中。每一个读取操作完成后,它会尝试解析缓存中的数据,并生成一个 Request 对象。一旦收到了这个完整的请求,它就生成一个回复,并通过 asyncio.StreamWriter 对象发送回客户端。当然,它还有两个任务:超时连接以及错误处理。

你可以在这里浏览这个类的完整代码。我将分别介绍代码的每一部分。为了简单起见,我移除了代码文档。

class HTTPConnection(object):
    def init(self, http_server, reader, writer):
        self.router = http_server.router
        self.http_parser = http_server.http_parser
        self.loop = http_server.loop

        self._reader = reader
        self._writer = writer
        self._buffer = bytearray()
        self._conn_timeout = None
        self.request = Request()

这个 init 方法没啥意思,它仅仅是收集了一些对象以供后面使用。它存储了一个 router 对象、一个 http_parser 对象以及 loop 对象,分别用来生成响应、解析请求以及在事件循环中调度任务。

然后,它存储了代表一个 TCP 连接的读写对,和一个充当原始字节缓冲区的空字节数组_conn_timeout 存储了一个 asyncio.Handle 的实例,用来管理超时逻辑。最后,它还存储了 Request 对象的一个单一实例。

下面的代码是用来接受和发送数据的核心功能:

async def handle_request(self):
    try:
        while not self.request.finished and not self._reader.at_eof():
            data = await self._reader.read(1024)
            if data:
                self._reset_conn_timeout()
                await self.process_data(data)
        if self.request.finished:
            await self.reply()
        elif self._reader.at_eof():
            raise BadRequestException()
    except (NotFoundException,
            BadRequestException) as e:
        self.error_reply(e.code, body=Response.reason_phrases[e.code])
    except Exception as e:
        self.error_reply(500, body=Response.reason_phrases[500])

    self.close_connection()

所有内容被包含在 try-except 代码块中,这样在解析请求或响应期间抛出的异常可以被捕获到,然后一个错误响应会发送回客户端。

while 循环中不断读取请求,直到解析器将 self.request.finished 设置为 True ,或者客户端关闭连接所触发的信号使得 self._reader_at_eof() 函数返回值为 True 为止。这段代码尝试在每次循环迭代中从 StreamReader 中读取数据,并通过调用 self.process_data(data) 函数以增量方式生成 self.request。每次循环读取数据时,连接超时计数器被重置。

这儿有个错误,你发现了吗?稍后我们会再讨论这个。需要注意的是,这个循环可能会耗尽 CPU 资源,因为如果没有读取到东西 self._reader.read() 函数将会返回一个空的字节对象 b''。这就意味着循环将会不断运行,却什么也不做。一个可能的解决方法是,用非阻塞的方式等待一小段时间:await asyncio.sleep(0.1)。我们暂且不对它做优化。

还记得上一段我提到的那个错误吗?只有从 StreamReader 读取数据时,self._reset_conn_timeout() 函数才会被调用。这就意味着,直到第一个字节到达时timeout 才被初始化。如果有一个客户端建立了与服务器的连接却不发送任何数据,那就永远不会超时。这可能被用来消耗系统资源,从而导致拒绝服务式攻击(DoS)。修复方法就是在 init 函数中调用 self._reset_conn_timeout() 函数。

当请求接受完成或连接中断时,程序将运行到 if-else 代码块。这部分代码会判断解析器收到完整的数据后是否完成了解析。如果是,好,生成一个回复并发送回客户端。如果不是,那么请求信息可能有错误,抛出一个异常!最后,我们调用 self.close_connection 执行清理工作。

解析请求的部分在 self.process_data 方法中。这个方法非常简短,也易于测试:

async def process_data(self, data):
    self._buffer.extend(data)

    self._buffer = self.http_parser.parse_into(
        self.request, self._buffer)

每一次调用都将数据累积到 self._buffer 中,然后试着用 self.http_parser 来解析已经收集的数据。这里需要指出的是,这段代码展示了一种称为依赖注入(Dependency Injection)的模式。如果你还记得 init 函数的话,应该知道我们传入了一个包含 http_parser 对象的 http_server 对象。在这个例子里,http_parser 对象是 diy_framework 包中的一个模块。不过它也可以是任何含有 parse_into 函数的类,这个 parse_into 函数接受一个 Request 对象以及字节数组作为参数。这很有用,原因有二:一是,这意味着这段代码更易扩展。如果有人想通过一个不同的解析器来使用 HTTPConnection,没问题,只需将它作为参数传入即可。二是,这使得测试更加容易,因为 http_parser 不是硬编码的,所以使用虚假数据或者 mock 对象来替代是很容易的。

下一段有趣的部分就是 reply 方法了:

async def reply(self):
    request = self.request
    handler = self.router.get_handler(request.path)

    response = await handler.handle(request)

    if not isinstance(response, Response):
        response = Response(code=200, body=response)

    self._writer.write(response.to_bytes())
    await self._writer.drain()

这里,一个 HTTPConnection 的实例使用了 HTTPServer 中的 router 对象来得到一个生成响应的对象。一个路由可以是任何一个拥有 get_handler 方法的对象,这个方法接收一个字符串作为参数,返回一个可调用的对象或者抛出 NotFoundException 异常。而这个可调用的对象被用来处理请求以及生成响应。处理程序由框架的使用者编写,如上文所说的那样,应该返回字符串或者 Response 对象。Response 对象提供了一个友好的接口,因此这个简单的 if 语句保证了无论处理程序返回什么,代码最终都得到一个统一的 Response 对象。

接下来,被赋值给 self._writerStreamWriter 实例被调用,将字节字符串发送回客户端。函数返回前,程序在 await self._writer.drain() 处等待,以确保所有的数据被发送给客户端。只要缓存中还有未发送的数据,self._writer.close() 方法就不会执行。

HTTPConnection 类还有两个更加有趣的部分:一个用于关闭连接的方法,以及一组用来处理超时机制的方法。首先,关闭一条连接由下面这个小函数完成:

def close_connection(self):
    self._cancel_conn_timeout()
    self._writer.close()

每当一条连接将被关闭时,这段代码首先取消超时,然后把连接从事件循环中清除。

超时机制由三个相关的函数组成:第一个函数在超时后给客户端发送错误消息并关闭连接;第二个函数用于取消当前的超时;第三个函数调度超时功能。前两个函数比较简单,我将详细解释第三个函数 _reset_cpmm_timeout()

def _conn_timeout_close(self):
    self.error_reply(500, 'timeout')
    self.close_connection()

def _cancel_conn_timeout(self):
    if self._conn_timeout:
        self._conn_timeout.cancel()

def _reset_conn_timeout(self, timeout=TIMEOUT):
    self._cancel_conn_timeout()
    self._conn_timeout = self.loop.call_later(
        timeout, self._conn_timeout_close)

每当 _reset_conn_timeout 函数被调用时,它会先取消之前所有赋值给 self._conn_timeoutasyncio.Handle 对象。然后,使用 BaseEventLoop.call\_later 函数让 _conn_timeout_close 函数在超时数秒(timeout)后执行。如果你还记得 handle_request 函数的内容,就知道每当接收到数据时,这个函数就会被调用。这就取消了当前的超时并且重新安排 _conn_timeout_close 函数在超时数秒(timeout)后执行。只要接收到数据,这个循环就会不断地重置超时回调。如果在超时时间内没有接收到数据,最后函数 _conn_timeout_close 就会被调用。

创建连接

我们需要创建 HTTPConnection 对象,并且正确地使用它们。这一任务由 HTTPServer 类完成。HTTPServer 类是一个简单的容器,可以存储着一些配置信息(解析器,路由和事件循环实例),并使用这些配置来创建 HTTPConnection 实例:

class HTTPServer(object):
    def init(self, router, http_parser, loop):
        self.router = router
        self.http_parser = http_parser
        self.loop = loop

    async def handle_connection(self, reader, writer):
        connection = HTTPConnection(self, reader, writer)
        asyncio.ensure_future(connection.handle_request(), loop=self.loop)

HTTPServer 的每一个实例能够监听一个端口。它有一个 handle_connection 的异步方法来创建 HTTPConnection 的实例,并安排它们在事件循环中运行。这个方法被传递给 asyncio.start\_server 作为一个回调函数。也就是说,每当一个 TCP 连接初始化时(以 StreamReaderStreamWriter 为参数),它就会被调用。

   self._server = HTTPServer(self.router, self.http_parser, self.loop)
   self._connection_handler = asyncio.start_server(
        self._server.handle_connection,
        host=self.host,
        port=self.port,
        reuse_address=True,
        reuse_port=True,
        loop=self.loop)

这就是构成整个应用程序工作原理的核心:asyncio.start_server 接受 TCP 连接,然后在一个预配置的 HTTPServer 对象上调用一个方法。这个方法将处理一条 TCP 连接的所有逻辑:读取、解析、生成响应并发送回客户端、以及关闭连接。它的重点是 IO 逻辑、解析和生成响应。

讲解了核心的 IO 部分,让我们继续。

解析请求

这个微型框架的使用者被宠坏了,不愿意和字节打交道。它们想要一个更高层次的抽象 —— 一种更加简单的方法来处理请求。这个微型框架就包含了一个简单的 HTTP 解析器,能够将字节流转化为 Request 对象。

这些 Request 对象是像这样的容器:

class Request(object):
    def init(self):
        self.method = None
        self.path = None
        self.query_params = {}
        self.path_params = {}
        self.headers = {}
        self.body = None
        self.body_raw = None
        self.finished = False

它包含了所有需要的数据,可以用一种容易理解的方法从客户端接受数据。哦,不包括 cookie ,它对身份认证是非常重要的,我会将它留在第二部分。

每一个 HTTP 请求都包含了一些必需的内容,如请求路径和请求方法。它们也包含了一些可选的内容,如请求体、请求头,或是 URL 参数。随着 REST 的流行,除了 URL 参数,URL 本身会包含一些信息。比如,"/user/1/edit" 包含了用户的 id 。

一个请求的每个部分都必须被识别、解析,并正确地赋值给 Request 对象的对应属性。HTTP/1.1 是一个文本协议,事实上这简化了很多东西。(HTTP/2 是一个二进制协议,这又是另一种乐趣了)

解析器不需要跟踪状态,因此 http_parser 模块其实就是一组函数。调用函数需要用到 Request 对象,并将它连同一个包含原始请求信息的字节数组传递给 parse_into 函数。然后解析器会修改 Request 对象以及充当缓存的字节数组。字节数组的信息被逐渐地解析到 request 对象中。

http_parser 模块的核心功能就是下面这个 parse_into 函数:

def parse_into(request, buffer):
    _buffer = buffer[:]
    if not request.method and can_parse_request_line(_buffer):
        (request.method, request.path,
         request.query_params) = parse_request_line(_buffer)
        remove_request_line(_buffer)

    if not request.headers and can_parse_headers(_buffer):
        request.headers = parse_headers(_buffer)
        if not has_body(request.headers):
            request.finished = True

        remove_intro(_buffer)

    if not request.finished and can_parse_body(request.headers, _buffer):
        request.body_raw, request.body = parse_body(request.headers, _buffer)
        clear_buffer(_buffer)
        request.finished = True
    return _buffer

从上面的代码中可以看到,我把解析的过程分为三个部分:解析请求行(这行像这样:GET /resource HTTP/1.1),解析请求头以及解析请求体。

请求行包含了 HTTP 请求方法以及 URL 地址。而 URL 地址则包含了更多的信息:路径、url 参数和开发者自定义的 url 参数。解析请求方法和 URL 还是很容易的 - 合适地分割字符串就好了。函数 urlparse.parse 可以用来解析 URL 参数。开发者自定义的 URL 参数可以通过正则表达式来解析。

接下来是 HTTP 头部。它们是一行行由键值对组成的简单文本。问题在于,可能有多个 HTTP 头有相同的名字,却有不同的值。一个值得关注的 HTTP 头部是 Content-Length,它描述了请求体的字节长度(不是整个请求,仅仅是请求体)。这对于决定是否解析请求体有很重要的作用。

最后,解析器根据 HTTP 方法和头部来决定是否解析请求体。

路由!

在某种意义上,路由就像是连接框架和用户的桥梁,用户用合适的方法创建 Router 对象并为其设置路径/函数对,然后将它赋值给 App 对象。而 App 对象依次调用 get_handler 函数生成相应的回调函数。简单来说,路由就负责两件事,一是存储路径/函数对,二是返回需要的路径/函数对

Router 类中有两个允许最终开发者添加路由的方法,分别是 add_routesadd_route。因为 add_routes 就是 add_route 函数的一层封装,我们将主要讲解 add_route 函数:

def add_route(self, path, handler):
    compiled_route = self.class.build_route_regexp(path)
    if compiled_route not in self.routes:
        self.routes[compiled_route] = handler
    else:
        raise DuplicateRoute

首先,这个函数使用 Router.build_router_regexp 的类方法,将一条路由规则(如 '/cars/{id}' 这样的字符串),“编译”到一个已编译的正则表达式对象。这些已编译的正则表达式用来匹配请求路径,以及解析开发者自定义的 URL 参数。如果已经存在一个相同的路由,程序就会抛出一个异常。最后,这个路由/处理程序对被添加到一个简单的字典self.routes中。

下面展示 Router 是如何“编译”路由的:

@classmethod
def build_route_regexp(cls, regexp_str):
    """
    Turns a string into a compiled regular expression. Parses '{}' into
    named groups ie. '/path/{variable}' is turned into
    '/path/(?P<variable>[a-zA-Z0-9_-]+)'.

    :param regexp_str: a string representing a URL path.
    :return: a compiled regular expression.
    """
    def named_groups(matchobj):
        return '(?P<{0}>[a-zA-Z0-9_-]+)'.format(matchobj.group(1))

    re_str = re.sub(r'{([a-zA-Z0-9_-]+)}', named_groups, regexp_str)
    re_str = ''.join(('^', re_str, '$',))
    return re.compile(re_str)

这个方法使用正则表达式将所有出现的 {variable} 替换为 (?P<variable>)。然后在字符串头尾分别添加 ^$ 标记,最后编译正则表达式对象。

完成了路由存储仅成功了一半,下面是如何得到路由对应的函数:

def get_handler(self, path):
    logger.debug('Getting handler for: {0}'.format(path))
    for route, handler in self.routes.items():
        path_params = self.class.match_path(route, path)
        if path_params is not None:
            logger.debug('Got handler for: {0}'.format(path))
            wrapped_handler = HandlerWrapper(handler, path_params)
            return wrapped_handler

    raise NotFoundException()

一旦 App 对象获得一个 Request 对象,也就获得了 URL 的路径部分(如 /users/15/edit)。然后,我们需要匹配函数来生成一个响应或者 404 错误。get_handler 函数将路径作为参数,循环遍历路由,对每条路由调用 Router.match_path 类方法检查是否有已编译的正则对象与这个请求路径匹配。如果存在,我们就调用 HandleWrapper 来包装路由对应的函数。path_params 字典包含了路径变量(如 '/users/15/edit' 中的 '15'),若路由没有指定变量,字典就为空。最后,我们将包装好的函数返回给 App 对象。

如果遍历了所有的路由都找不到与路径匹配的,函数就会抛出 NotFoundException 异常。

这个 Route.match 类方法挺简单:

def match_path(cls, route, path):
    match = route.match(path)
    try:
        return match.groupdict()
    except AttributeError:
        return None

它使用正则对象的 match 方法来检查路由是否与路径匹配。若果不匹配,则返回 None 。

最后,我们有 HandleWraapper 类。它的唯一任务就是封装一个异步函数,存储 path_params 字典,并通过 handle 方法对外提供一个统一的接口。

class HandlerWrapper(object):
    def init(self, handler, path_params):
        self.handler = handler
        self.path_params = path_params
        self.request = None

    async def handle(self, request):
        return await self.handler(request, **self.path_params)

组合到一起

框架的最后部分就是用 App 类把所有的部分联系起来。

App 类用于集中所有的配置细节。一个 App 对象通过其 start_server 方法,使用一些配置数据创建一个 HTTPServer 的实例,然后将它传递给 asyncio.start\_server 函数asyncio.start_server 函数会对每一个 TCP 连接调用 HTTPServer 对象的 handle_connection 方法。

def start_server(self):
    if not self._server:
        self.loop = asyncio.get_event_loop()
        self._server = HTTPServer(self.router, self.http_parser, self.loop)
        self._connection_handler = asyncio.start_server(
            self._server.handle_connection,
            host=self.host,
            port=self.port,
            reuse_address=True,
            reuse_port=True,
            loop=self.loop)

        logger.info('Starting server on {0}:{1}'.format(
            self.host, self.port))
        self.loop.run_until_complete(self._connection_handler)

        try:
            self.loop.run_forever()
        except KeyboardInterrupt:
            logger.info('Got signal, killing server')
        except DiyFrameworkException as e:
            logger.error('Critical framework failure:')
            logger.error(e.traceback)
        finally:
            self.loop.close()
    else:
        logger.info('Server already started - {0}'.format(self))

总结

如果你查看源码,就会发现所有的代码仅 320 余行(包括测试代码的话共 540 余行)。这么少的代码实现了这么多的功能,让我有点惊讶。这个框架没有提供模板、身份认证以及数据库访问等功能(这些内容也很有趣哦)。这也让我知道,像 Django 和 Tornado 这样的框架是如何工作的,而且我能够快速地调试它们了。

这也是我按照测试驱动开发完成的第一个项目,整个过程有趣而有意义。先编写测试用例迫使我思考设计和架构,而不仅仅是把代码放到一起,让它们可以运行。不要误解我的意思,有很多时候,后者的方式更好。不过如果你想给确保这些不怎么维护的代码在之后的几周甚至几个月依然工作,那么测试驱动开发正是你需要的。

我研究了下整洁架构以及依赖注入模式,这些充分体现在 Router 类是如何作为一个更高层次的抽象的(实体?)。Router 类是比较接近核心的,像 http_parserApp 的内容比较边缘化,因为它们只是完成了极小的字符串和字节流、或是中层 IO 的工作。测试驱动开发(TDD)迫使我独立思考每个小部分,这使我问自己这样的问题:方法调用的组合是否易于理解?类名是否准确地反映了我正在解决的问题?我的代码中是否很容易区分出不同的抽象层?

来吧,写个小框架,真的很有趣:)


via: http://mattscodecave.com/posts/simple-python-framework-from-scratch.html

作者:Matt 译者:Cathon 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

(题图来自:es-static.us

Allison 是 Dropbox 的工程师,在那里她维护着这个世界上最大的 Python 客户端网络之一。在去 Dropbox 之前,她是 Recurse Center 的协调人, 是这个位于纽约的程序员深造机构的作者。她在北美的 PyCon 做过关于 Python 内部机制的演讲,并且她喜欢研究奇怪的 bug。她的博客地址是 akaptur.com

介绍

Byterun 是一个用 Python 实现的 Python 解释器。随着我对 Byterun 的开发,我惊喜地的发现,这个 Python 解释器的基础结构用 500 行代码就能实现。在这一章我们会搞清楚这个解释器的结构,给你足够探索下去的背景知识。我们的目标不是向你展示解释器的每个细节---像编程和计算机科学其他有趣的领域一样,你可能会投入几年的时间去深入了解这个主题。

Byterun 是 Ned Batchelder 和我完成的,建立在 Paul Swartz 的工作之上。它的结构和主要的 Python 实现(CPython)差不多,所以理解 Byterun 会帮助你理解大多数解释器,特别是 CPython 解释器。(如果你不知道你用的是什么 Python,那么很可能它就是 CPython)。尽管 Byterun 很小,但它能执行大多数简单的 Python 程序(这一章是基于 Python 3.5 及其之前版本生成的字节码的,在 Python 3.6 中生成的字节码有一些改变)。

Python 解释器

在开始之前,让我们限定一下“Pyhton 解释器”的意思。在讨论 Python 的时候,“解释器”这个词可以用在很多不同的地方。有的时候解释器指的是 Python REPL,即当你在命令行下敲下 python 时所得到的交互式环境。有时候人们会或多或少的互换使用 “Python 解释器”和“Python”来说明从头到尾执行 Python 代码的这一过程。在本章中,“解释器”有一个更精确的意思:Python 程序的执行过程中的最后一步。

在解释器接手之前,Python 会执行其他 3 个步骤:词法分析,语法解析和编译。这三步合起来把源代码转换成 代码对象 code object ,它包含着解释器可以理解的指令。而解释器的工作就是解释代码对象中的指令。

你可能很奇怪执行 Python 代码会有编译这一步。Python 通常被称为解释型语言,就像 Ruby,Perl 一样,它们和像 C,Rust 这样的编译型语言相对。然而,这个术语并不是它看起来的那样精确。大多数解释型语言包括 Python 在内,确实会有编译这一步。而 Python 被称为解释型的原因是相对于编译型语言,它在编译这一步的工作相对较少(解释器做相对多的工作)。在这章后面你会看到,Python 的编译器比 C 语言编译器需要更少的关于程序行为的信息。

Python 的 Python 解释器

Byterun 是一个用 Python 写的 Python 解释器,这点可能让你感到奇怪,但没有比用 C 语言写 C 语言编译器更奇怪的了。(事实上,广泛使用的 gcc 编译器就是用 C 语言本身写的)你可以用几乎任何语言写一个 Python 解释器。

用 Python 写 Python 既有优点又有缺点。最大的缺点就是速度:用 Byterun 执行代码要比用 CPython 执行慢的多,CPython 解释器是用 C 语言实现的,并做了认真优化。然而 Byterun 是为了学习而设计的,所以速度对我们不重要。使用 Python 最大优势是我们可以仅仅实现解释器,而不用担心 Python 运行时部分,特别是对象系统。比如当 Byterun 需要创建一个类时,它就会回退到“真正”的 Python。另外一个优势是 Byterun 很容易理解,部分原因是它是用人们很容易理解的高级语言写的(Python !)(另外我们不会对解释器做优化 —— 再一次,清晰和简单比速度更重要)

构建一个解释器

在我们考察 Byterun 代码之前,我们需要从高层次对解释器结构有一些了解。Python 解释器是如何工作的?

Python 解释器是一个 虚拟机 virtual machine ,是一个模拟真实计算机的软件。我们这个虚拟机是 栈机器 stack machine ,它用几个栈来完成操作(与之相对的是 寄存器机器 register machine ,它从特定的内存地址读写数据)。

Python 解释器是一个 字节码解释器 bytecode interpreter :它的输入是一些称作 字节码 bytecode 的指令集。当你写 Python 代码时,词法分析器、语法解析器和编译器会生成 代码对象 code object 让解释器去操作。每个代码对象都包含一个要被执行的指令集 —— 它就是字节码 —— 以及还有一些解释器需要的信息。字节码是 Python 代码的一个 中间层表示 intermediate representation :它以一种解释器可以理解的方式来表示源代码。这和汇编语言作为 C 语言和机器语言的中间表示很类似。

微型解释器

为了让说明更具体,让我们从一个非常小的解释器开始。它只能计算两个数的和,只能理解三个指令。它执行的所有代码只是这三个指令的不同组合。下面就是这三个指令:

  • LOAD_VALUE
  • ADD_TWO_VALUES
  • PRINT_ANSWER

我们不关心词法、语法和编译,所以我们也不在乎这些指令集是如何产生的。你可以想象,当你写下 7 + 5,然后一个编译器为你生成那三个指令的组合。如果你有一个合适的编译器,你甚至可以用 Lisp 的语法来写,只要它能生成相同的指令。

假设

7 + 5

生成这样的指令集:

what_to_execute = {
    "instructions": [("LOAD_VALUE", 0),  # the first number
                     ("LOAD_VALUE", 1),  # the second number
                     ("ADD_TWO_VALUES", None),
                     ("PRINT_ANSWER", None)],
    "numbers": [7, 5] }

Python 解释器是一个 栈机器 stack machine ,所以它必须通过操作栈来完成这个加法(见下图)。解释器先执行第一条指令,LOAD_VALUE,把第一个数压到栈中。接着它把第二个数也压到栈中。然后,第三条指令,ADD_TWO_VALUES,先把两个数从栈中弹出,加起来,再把结果压入栈中。最后一步,把结果弹出并输出。

栈机器

LOAD_VALUE这条指令告诉解释器把一个数压入栈中,但指令本身并没有指明这个数是多少。指令需要一个额外的信息告诉解释器去哪里找到这个数。所以我们的指令集有两个部分:指令本身和一个常量列表。(在 Python 中,字节码就是我们所称的“指令”,而解释器“执行”的是代码对象。)

为什么不把数字直接嵌入指令之中?想象一下,如果我们加的不是数字,而是字符串。我们可不想把字符串这样的东西加到指令中,因为它可以有任意的长度。另外,我们这种设计也意味着我们只需要对象的一份拷贝,比如这个加法 7 + 7, 现在常量表 "numbers"只需包含一个[7]

你可能会想为什么会需要除了ADD_TWO_VALUES之外的指令。的确,对于我们两个数加法,这个例子是有点人为制作的意思。然而,这个指令却是建造更复杂程序的轮子。比如,就我们目前定义的三个指令,只要给出正确的指令组合,我们可以做三个数的加法,或者任意个数的加法。同时,栈提供了一个清晰的方法去跟踪解释器的状态,这为我们增长的复杂性提供了支持。

现在让我们来完成我们的解释器。解释器对象需要一个栈,它可以用一个列表来表示。它还需要一个方法来描述怎样执行每条指令。比如,LOAD_VALUE会把一个值压入栈中。

class Interpreter:
    def __init__(self):
        self.stack = []

    def LOAD_VALUE(self, number):
        self.stack.append(number)

    def PRINT_ANSWER(self):
        answer = self.stack.pop()
        print(answer)

    def ADD_TWO_VALUES(self):
        first_num = self.stack.pop()
        second_num = self.stack.pop()
        total = first_num + second_num
        self.stack.append(total)

这三个方法完成了解释器所理解的三条指令。但解释器还需要一样东西:一个能把所有东西结合在一起并执行的方法。这个方法就叫做 run_code,它把我们前面定义的字典结构 what-to-execute 作为参数,循环执行里面的每条指令,如果指令有参数就处理参数,然后调用解释器对象中相应的方法。

    def run_code(self, what_to_execute):
        instructions = what_to_execute["instructions"]
        numbers = what_to_execute["numbers"]
        for each_step in instructions:
            instruction, argument = each_step
            if instruction == "LOAD_VALUE":
                number = numbers[argument]
                self.LOAD_VALUE(number)
            elif instruction == "ADD_TWO_VALUES":
                self.ADD_TWO_VALUES()
            elif instruction == "PRINT_ANSWER":
                self.PRINT_ANSWER()

为了测试,我们创建一个解释器对象,然后用前面定义的 7 + 5 的指令集来调用 run_code

    interpreter = Interpreter()
    interpreter.run_code(what_to_execute)

显然,它会输出 12。

尽管我们的解释器功能十分受限,但这个过程几乎和真正的 Python 解释器处理加法是一样的。这里,我们还有几点要注意。

首先,一些指令需要参数。在真正的 Python 字节码当中,大概有一半的指令有参数。像我们的例子一样,参数和指令打包在一起。注意指令的参数和传递给对应方法的参数是不同的。

第二,指令ADD_TWO_VALUES不需要任何参数,它从解释器栈中弹出所需的值。这正是以基于栈的解释器的特点。

记得我们说过只要给出合适的指令集,不需要对解释器做任何改变,我们就能做多个数的加法。考虑下面的指令集,你觉得会发生什么?如果你有一个合适的编译器,什么代码才能编译出下面的指令集?

    what_to_execute = {
        "instructions": [("LOAD_VALUE", 0),
                         ("LOAD_VALUE", 1),
                         ("ADD_TWO_VALUES", None),
                         ("LOAD_VALUE", 2),
                         ("ADD_TWO_VALUES", None),
                         ("PRINT_ANSWER", None)],
        "numbers": [7, 5, 8] }

从这点出发,我们开始看到这种结构的可扩展性:我们可以通过向解释器对象增加方法来描述更多的操作(只要有一个编译器能为我们生成组织良好的指令集就行)。

变量

接下来给我们的解释器增加变量的支持。我们需要一个保存变量值的指令 STORE_NAME;一个取变量值的指令LOAD_NAME;和一个变量到值的映射关系。目前,我们会忽略命名空间和作用域,所以我们可以把变量和值的映射直接存储在解释器对象中。最后,我们要保证what_to_execute除了一个常量列表,还要有个变量名字的列表。

>>> def s():
...     a = 1
...     b = 2
...     print(a + b)
# a friendly compiler transforms `s` into:
    what_to_execute = {
        "instructions": [("LOAD_VALUE", 0),
                         ("STORE_NAME", 0),
                         ("LOAD_VALUE", 1),
                         ("STORE_NAME", 1),
                         ("LOAD_NAME", 0),
                         ("LOAD_NAME", 1),
                         ("ADD_TWO_VALUES", None),
                         ("PRINT_ANSWER", None)],
        "numbers": [1, 2],
        "names":   ["a", "b"] }

我们的新的实现在下面。为了跟踪哪个名字绑定到哪个值,我们在__init__方法中增加一个environment字典。我们也增加了STORE_NAMELOAD_NAME方法,它们获得变量名,然后从environment字典中设置或取出这个变量值。

现在指令的参数就有两个不同的意思,它可能是numbers列表的索引,也可能是names列表的索引。解释器通过检查所执行的指令就能知道是那种参数。而我们打破这种逻辑 ,把指令和它所用何种参数的映射关系放在另一个单独的方法中。

class Interpreter:
    def __init__(self):
        self.stack = []
        self.environment = {}

    def STORE_NAME(self, name):
        val = self.stack.pop()
        self.environment[name] = val

    def LOAD_NAME(self, name):
        val = self.environment[name]
        self.stack.append(val)

    def parse_argument(self, instruction, argument, what_to_execute):
        """ Understand what the argument to each instruction means."""
        numbers = ["LOAD_VALUE"]
        names = ["LOAD_NAME", "STORE_NAME"]

        if instruction in numbers:
            argument = what_to_execute["numbers"][argument]
        elif instruction in names:
            argument = what_to_execute["names"][argument]

        return argument

    def run_code(self, what_to_execute):
        instructions = what_to_execute["instructions"]
        for each_step in instructions:
            instruction, argument = each_step
            argument = self.parse_argument(instruction, argument, what_to_execute)

            if instruction == "LOAD_VALUE":
                self.LOAD_VALUE(argument)
            elif instruction == "ADD_TWO_VALUES":
                self.ADD_TWO_VALUES()
            elif instruction == "PRINT_ANSWER":
                self.PRINT_ANSWER()
            elif instruction == "STORE_NAME":
                self.STORE_NAME(argument)
            elif instruction == "LOAD_NAME":
                self.LOAD_NAME(argument)

仅仅五个指令,run_code这个方法已经开始变得冗长了。如果保持这种结构,那么每条指令都需要一个if分支。这里,我们要利用 Python 的动态方法查找。我们总会给一个称为FOO的指令定义一个名为FOO的方法,这样我们就可用 Python 的getattr函数在运行时动态查找方法,而不用这个大大的分支结构。run_code方法现在是这样:

    def execute(self, what_to_execute):
        instructions = what_to_execute["instructions"]
        for each_step in instructions:
            instruction, argument = each_step
            argument = self.parse_argument(instruction, argument, what_to_execute)
            bytecode_method = getattr(self, instruction)
            if argument is None:
                bytecode_method()
            else:
                bytecode_method(argument)

真实的 Python 字节码

现在,放弃我们的小指令集,去看看真正的 Python 字节码。字节码的结构和我们的小解释器的指令集差不多,除了字节码用一个字节而不是一个名字来代表这条指令。为了理解它的结构,我们将考察一个函数的字节码。考虑下面这个例子:

>>> def cond():
...     x = 3
...     if x < 5:
...         return 'yes'
...     else:
...         return 'no'
...

Python 在运行时会暴露一大批内部信息,并且我们可以通过 REPL 直接访问这些信息。对于函数对象condcond.__code__是与其关联的代码对象,而cond.__code__.co_code就是它的字节码。当你写 Python 代码时,你永远也不会想直接使用这些属性,但是这可以让我们做出各种恶作剧,同时也可以看看内部机制。

>>> cond.__code__.co_code  # the bytecode as raw bytes
b'd\x01\x00}\x00\x00|\x00\x00d\x02\x00k\x00\x00r\x16\x00d\x03\x00Sd\x04\x00Sd\x00
   \x00S'
>>> list(cond.__code__.co_code)  # the bytecode as numbers
[100, 1, 0, 125, 0, 0, 124, 0, 0, 100, 2, 0, 107, 0, 0, 114, 22, 0, 100, 3, 0, 83, 
 100, 4, 0, 83, 100, 0, 0, 83]

当我们直接输出这个字节码,它看起来完全无法理解 —— 唯一我们了解的是它是一串字节。很幸运,我们有一个很强大的工具可以用:Python 标准库中的dis模块。

dis是一个字节码反汇编器。反汇编器以为机器而写的底层代码作为输入,比如汇编代码和字节码,然后以人类可读的方式输出。当我们运行dis.dis,它输出每个字节码的解释。

>>> dis.dis(cond)
  2           0 LOAD_CONST               1 (3)
              3 STORE_FAST               0 (x)

  3           6 LOAD_FAST                0 (x)
              9 LOAD_CONST               2 (5)
             12 COMPARE_OP               0 (<)
             15 POP_JUMP_IF_FALSE       22

  4          18 LOAD_CONST               3 ('yes')
             21 RETURN_VALUE

  6     >>   22 LOAD_CONST               4 ('no')
             25 RETURN_VALUE
             26 LOAD_CONST               0 (None)
             29 RETURN_VALUE

这些都是什么意思?让我们以第一条指令LOAD_CONST为例子。第一列的数字(2)表示对应源代码的行数。第二列的数字是字节码的索引,告诉我们指令LOAD_CONST在位置 0 。第三列是指令本身对应的人类可读的名字。如果第四列存在,它表示指令的参数。如果第五列存在,它是一个关于参数是什么的提示。

考虑这个字节码的前几个字节:[100, 1, 0, 125, 0, 0]。这 6 个字节表示两条带参数的指令。我们可以使用dis.opname,一个字节到可读字符串的映射,来找到指令 100 和指令 125 代表的是什么:

>>> dis.opname[100]
'LOAD_CONST'
>>> dis.opname[125]
'STORE_FAST'

第二和第三个字节 —— 1 、0 ——是LOAD_CONST的参数,第五和第六个字节 —— 0、0 —— 是STORE_FAST的参数。就像我们前面的小例子,LOAD_CONST需要知道的到哪去找常量,STORE_FAST需要知道要存储的名字。(Python 的LOAD_CONST和我们小例子中的LOAD_VALUE一样,LOAD_FASTLOAD_NAME一样)。所以这六个字节代表第一行源代码x = 3 (为什么用两个字节表示指令的参数?如果 Python 使用一个字节,每个代码对象你只能有 256 个常量/名字,而用两个字节,就增加到了 256 的平方,65536个)。

条件语句与循环语句

到目前为止,我们的解释器只能一条接着一条的执行指令。这有个问题,我们经常会想多次执行某个指令,或者在特定的条件下跳过它们。为了可以写循环和分支结构,解释器必须能够在指令中跳转。在某种程度上,Python 在字节码中使用GOTO语句来处理循环和分支!让我们再看一个cond函数的反汇编结果:

>>> dis.dis(cond)
  2           0 LOAD_CONST               1 (3)
              3 STORE_FAST               0 (x)

  3           6 LOAD_FAST                0 (x)
              9 LOAD_CONST               2 (5)
             12 COMPARE_OP               0 (<)
             15 POP_JUMP_IF_FALSE       22

  4          18 LOAD_CONST               3 ('yes')
             21 RETURN_VALUE

  6     >>   22 LOAD_CONST               4 ('no')
             25 RETURN_VALUE
             26 LOAD_CONST               0 (None)
             29 RETURN_VALUE

第三行的条件表达式if x < 5被编译成四条指令:LOAD_FASTLOAD_CONSTCOMPARE_OPPOP_JUMP_IF_FALSEx < 5对应加载x、加载 5、比较这两个值。指令POP_JUMP_IF_FALSE完成这个if语句。这条指令把栈顶的值弹出,如果值为真,什么都不发生。如果值为假,解释器会跳转到另一条指令。

这条将被加载的指令称为跳转目标,它作为指令POP_JUMP的参数。这里,跳转目标是 22,索引为 22 的指令是LOAD_CONST,对应源码的第 6 行。(dis>>标记跳转目标。)如果X < 5为假,解释器会忽略第四行(return yes),直接跳转到第6行(return "no")。因此解释器通过跳转指令选择性的执行指令。

Python 的循环也依赖于跳转。在下面的字节码中,while x < 5这一行产生了和if x < 10几乎一样的字节码。在这两种情况下,解释器都是先执行比较,然后执行POP_JUMP_IF_FALSE来控制下一条执行哪个指令。第四行的最后一条字节码JUMP_ABSOLUT(循环体结束的地方),让解释器返回到循环开始的第 9 条指令处。当 x < 10变为假,POP_JUMP_IF_FALSE会让解释器跳到循环的终止处,第 34 条指令。

>>> def loop():
...      x = 1
...      while x < 5:
...          x = x + 1
...      return x
...
>>> dis.dis(loop)
  2           0 LOAD_CONST               1 (1)
              3 STORE_FAST               0 (x)

  3           6 SETUP_LOOP              26 (to 35)
        >>    9 LOAD_FAST                0 (x)
             12 LOAD_CONST               2 (5)
             15 COMPARE_OP               0 (<)
             18 POP_JUMP_IF_FALSE       34

  4          21 LOAD_FAST                0 (x)
             24 LOAD_CONST               1 (1)
             27 BINARY_ADD
             28 STORE_FAST               0 (x)
             31 JUMP_ABSOLUTE            9
        >>   34 POP_BLOCK

  5     >>   35 LOAD_FAST                0 (x)
             38 RETURN_VALUE

探索字节码

我希望你用dis.dis来试试你自己写的函数。一些有趣的问题值得探索:

  • 对解释器而言 for 循环和 while 循环有什么不同?
  • 能不能写出两个不同函数,却能产生相同的字节码?
  • elif是怎么工作的?列表推导呢?

到目前为止,我们已经知道了 Python 虚拟机是一个栈机器。它能顺序执行指令,在指令间跳转,压入或弹出栈值。但是这和我们期望的解释器还有一定距离。在前面的那个例子中,最后一条指令是RETURN_VALUE,它和return语句相对应。但是它返回到哪里去呢?

为了回答这个问题,我们必须再增加一层复杂性: frame 。一个帧是一些信息的集合和代码的执行上下文。帧在 Python 代码执行时动态地创建和销毁。每个帧对应函数的一次调用 —— 所以每个帧只有一个代码对象与之关联,而一个代码对象可以有多个帧。比如你有一个函数递归的调用自己 10 次,这会产生 11 个帧,每次调用对应一个,再加上启动模块对应的一个帧。总的来说,Python 程序的每个作用域都有一个帧,比如,模块、函数、类定义。

帧存在于 调用栈 call stack 中,一个和我们之前讨论的完全不同的栈。(你最熟悉的栈就是调用栈,就是你经常看到的异常回溯,每个以"File 'program.py'"开始的回溯对应一个帧。)解释器在执行字节码时操作的栈,我们叫它 数据栈 data stack 。其实还有第三个栈,叫做 块栈 block stack ,用于特定的控制流块,比如循环和异常处理。调用栈中的每个帧都有它自己的数据栈和块栈。

让我们用一个具体的例子来说明一下。假设 Python 解释器执行到下面标记为 3 的地方。解释器正处于foo函数的调用中,它接着调用bar。下面是帧调用栈、块栈和数据栈的示意图。我们感兴趣的是解释器先从最底下的foo()开始,接着执行foo的函数体,然后到达bar

>>> def bar(y):
...     z = y + 3     # <--- (3) ... and the interpreter is here.
...     return z
...
>>> def foo():
...     a = 1
...     b = 2
...     return a + bar(b) # <--- (2) ... which is returning a call to bar ...
...
>>> foo()             # <--- (1) We're in the middle of a call to foo ...
3

调用栈

现在,解释器处于bar函数的调用中。调用栈中有 3 个帧:一个对应于模块层,一个对应函数foo,另一个对应函数bar。(见上图)一旦bar返回,与它对应的帧就会从调用栈中弹出并丢弃。

字节码指令RETURN_VALUE告诉解释器在帧之间传递一个值。首先,它把位于调用栈栈顶的帧中的数据栈的栈顶值弹出。然后把整个帧弹出丢弃。最后把这个值压到下一个帧的数据栈中。

当 Ned Batchelder 和我在写 Byterun 时,很长一段时间我们的实现中一直有个重大的错误。我们整个虚拟机中只有一个数据栈,而不是每个帧都有一个。我们写了很多测试代码,同时在 Byterun 和真正的 Python 上运行,希望得到一致结果。我们几乎通过了所有测试,只有一样东西不能通过,那就是 生成器 generators 。最后,通过仔细的阅读 CPython 的源码,我们发现了错误所在(感谢 Michael Arntzenius 对这个 bug 的洞悉)。把数据栈移到每个帧就解决了这个问题。

回头在看看这个 bug,我惊讶的发现 Python 真的很少依赖于每个帧有一个数据栈这个特性。在 Python 中几乎所有的操作都会清空数据栈,所以所有的帧公用一个数据栈是没问题的。在上面的例子中,当bar执行完后,它的数据栈为空。即使foo公用这一个栈,它的值也不会受影响。然而,对应生成器,它的一个关键的特点是它能暂停一个帧的执行,返回到其他的帧,一段时间后它能返回到原来的帧,并以它离开时的相同状态继续执行。

Byterun

现在我们有足够的 Python 解释器的知识背景去考察 Byterun。

Byterun 中有四种对象。

  • VirtualMachine类,它管理高层结构,尤其是帧调用栈,并包含了指令到操作的映射。这是一个比前面Inteprter对象更复杂的版本。
  • Frame类,每个Frame类都有一个代码对象,并且管理着其他一些必要的状态位,尤其是全局和局部命名空间、指向调用它的整的指针和最后执行的字节码指令。
  • Function类,它被用来代替真正的 Python 函数。回想一下,调用函数时会创建一个新的帧。我们自己实现了Function,以便我们控制新的Frame的创建。
  • Block类,它只是包装了块的 3 个属性。(块的细节不是解释器的核心,我们不会花时间在它身上,把它列在这里,是因为 Byterun 需要它。)

VirtualMachine

每次程序运行时只会创建一个VirtualMachine实例,因为我们只有一个 Python 解释器。VirtualMachine 保存调用栈、异常状态、在帧之间传递的返回值。它的入口点是run_code方法,它以编译后的代码对象为参数,以创建一个帧为开始,然后运行这个帧。这个帧可能再创建出新的帧;调用栈随着程序的运行而增长和缩短。当第一个帧返回时,执行结束。

class VirtualMachineError(Exception):
    pass

class VirtualMachine(object):
    def __init__(self):
        self.frames = []   # The call stack of frames.
        self.frame = None  # The current frame.
        self.return_value = None
        self.last_exception = None

    def run_code(self, code, global_names=None, local_names=None):
        """ An entry point to execute code using the virtual machine."""
        frame = self.make_frame(code, global_names=global_names, 
                                local_names=local_names)
        self.run_frame(frame)

Frame

接下来,我们来写Frame对象。帧是一个属性的集合,它没有任何方法。前面提到过,这些属性包括由编译器生成的代码对象;局部、全局和内置命名空间;前一个帧的引用;一个数据栈;一个块栈;最后执行的指令指针。(对于内置命名空间我们需要多做一点工作,Python 在不同模块中对这个命名空间有不同的处理;但这个细节对我们的虚拟机不重要。)

class Frame(object):
    def __init__(self, code_obj, global_names, local_names, prev_frame):
        self.code_obj = code_obj
        self.global_names = global_names
        self.local_names = local_names
        self.prev_frame = prev_frame
        self.stack = []
        if prev_frame:
            self.builtin_names = prev_frame.builtin_names
        else:
            self.builtin_names = local_names['__builtins__']
            if hasattr(self.builtin_names, '__dict__'):
                self.builtin_names = self.builtin_names.__dict__

        self.last_instruction = 0
        self.block_stack = []

接着,我们在虚拟机中增加对帧的操作。这有 3 个帮助函数:一个创建新的帧的方法(它负责为新的帧找到名字空间),和压栈和出栈的方法。第四个函数,run_frame,完成执行帧的主要工作,待会我们再讨论这个方法。

class VirtualMachine(object):
    [... 删节 ...]

    # Frame manipulation
    def make_frame(self, code, callargs={}, global_names=None, local_names=None):
        if global_names is not None and local_names is not None:
            local_names = global_names
        elif self.frames:
            global_names = self.frame.global_names
            local_names = {}
        else:
            global_names = local_names = {
                '__builtins__': __builtins__,
                '__name__': '__main__',
                '__doc__': None,
                '__package__': None,
            }
        local_names.update(callargs)
        frame = Frame(code, global_names, local_names, self.frame)
        return frame

    def push_frame(self, frame):
        self.frames.append(frame)
        self.frame = frame

    def pop_frame(self):
        self.frames.pop()
        if self.frames:
            self.frame = self.frames[-1]
        else:
            self.frame = None

    def run_frame(self):
        pass
        # we'll come back to this shortly

Function

Function的实现有点曲折,但是大部分的细节对理解解释器不重要。重要的是当调用函数时 —— 即调用 __call__方法 —— 它创建一个新的Frame并运行它。

class Function(object):
    """
    Create a realistic function object, defining the things the interpreter expects.
    """
    __slots__ = [
        'func_code', 'func_name', 'func_defaults', 'func_globals',
        'func_locals', 'func_dict', 'func_closure',
        '__name__', '__dict__', '__doc__',
        '_vm', '_func',
    ]

    def __init__(self, name, code, globs, defaults, closure, vm):
        """You don't need to follow this closely to understand the interpreter."""
        self._vm = vm
        self.func_code = code
        self.func_name = self.__name__ = name or code.co_name
        self.func_defaults = tuple(defaults)
        self.func_globals = globs
        self.func_locals = self._vm.frame.f_locals
        self.__dict__ = {}
        self.func_closure = closure
        self.__doc__ = code.co_consts[0] if code.co_consts else None

        # Sometimes, we need a real Python function.  This is for that.
        kw = {
            'argdefs': self.func_defaults,
        }
        if closure:
            kw['closure'] = tuple(make_cell(0) for _ in closure)
        self._func = types.FunctionType(code, globs, **kw)

    def __call__(self, *args, **kwargs):
        """When calling a Function, make a new frame and run it."""
        callargs = inspect.getcallargs(self._func, *args, **kwargs)
        # Use callargs to provide a mapping of arguments: values to pass into the new 
        # frame.
        frame = self._vm.make_frame(
            self.func_code, callargs, self.func_globals, {}
        )
        return self._vm.run_frame(frame)

def make_cell(value):
    """Create a real Python closure and grab a cell."""
    # Thanks to Alex Gaynor for help with this bit of twistiness.
    fn = (lambda x: lambda: x)(value)
    return fn.__closure__[0]

接着,回到VirtualMachine对象,我们对数据栈的操作也增加一些帮助方法。字节码操作的栈总是在当前帧的数据栈。这些帮助函数让我们的POP_TOPLOAD_FAST以及其他操作栈的指令的实现可读性更高。

class VirtualMachine(object):
    [... 删节 ...]

    # Data stack manipulation
    def top(self):
        return self.frame.stack[-1]

    def pop(self):
        return self.frame.stack.pop()

    def push(self, *vals):
        self.frame.stack.extend(vals)

    def popn(self, n):
        """Pop a number of values from the value stack.
        A list of `n` values is returned, the deepest value first.
        """
        if n:
            ret = self.frame.stack[-n:]
            self.frame.stack[-n:] = []
            return ret
        else:
            return []

在我们运行帧之前,我们还需两个方法。

第一个方法,parse_byte_and_args 以一个字节码为输入,先检查它是否有参数,如果有,就解析它的参数。这个方法同时也更新帧的last_instruction属性,它指向最后执行的指令。一条没有参数的指令只有一个字节长度,而有参数的字节有3个字节长。参数的意义依赖于指令是什么。比如,前面说过,指令POP_JUMP_IF_FALSE,它的参数指的是跳转目标。BUILD_LIST,它的参数是列表的个数。LOAD_CONST,它的参数是常量的索引。

一些指令用简单的数字作为参数。对于另一些,虚拟机需要一点努力去发现它含意。标准库中的dis模块中有一个备忘单,它解释什么参数有什么意思,这让我们的代码更加简洁。比如,列表dis.hasname告诉我们LOAD_NAMEIMPORT_NAMELOAD_GLOBAL,以及另外的 9 个指令的参数都有同样的意义:对于这些指令,它们的参数代表了代码对象中的名字列表的索引。

class VirtualMachine(object):
    [... 删节 ...]

    def parse_byte_and_args(self):
        f = self.frame
        opoffset = f.last_instruction
        byteCode = f.code_obj.co_code[opoffset]
        f.last_instruction += 1
        byte_name = dis.opname[byteCode]
        if byteCode >= dis.HAVE_ARGUMENT:
            # index into the bytecode
            arg = f.code_obj.co_code[f.last_instruction:f.last_instruction+2]  
            f.last_instruction += 2   # advance the instruction pointer
            arg_val = arg[0] + (arg[1] * 256)
            if byteCode in dis.hasconst:   # Look up a constant
                arg = f.code_obj.co_consts[arg_val]
            elif byteCode in dis.hasname:  # Look up a name
                arg = f.code_obj.co_names[arg_val]
            elif byteCode in dis.haslocal: # Look up a local name
                arg = f.code_obj.co_varnames[arg_val]
            elif byteCode in dis.hasjrel:  # Calculate a relative jump
                arg = f.last_instruction + arg_val
            else:
                arg = arg_val
            argument = [arg]
        else:
            argument = []

        return byte_name, argument

下一个方法是dispatch,它查找给定的指令并执行相应的操作。在 CPython 中,这个分派函数用一个巨大的 switch 语句实现,有超过 1500 行的代码。幸运的是,我们用的是 Python,我们的代码会简洁的多。我们会为每一个字节码名字定义一个方法,然后用getattr来查找。就像我们前面的小解释器一样,如果一条指令叫做FOO_BAR,那么它对应的方法就是byte_FOO_BAR。现在,我们先把这些方法当做一个黑盒子。每个指令方法都会返回None或者一个字符串why,有些情况下虚拟机需要这个额外why信息。这些指令方法的返回值,仅作为解释器状态的内部指示,千万不要和执行帧的返回值相混淆。

class VirtualMachine(object):
    [... 删节 ...]

    def dispatch(self, byte_name, argument):
        """ Dispatch by bytename to the corresponding methods.
        Exceptions are caught and set on the virtual machine."""

        # When later unwinding the block stack,
        # we need to keep track of why we are doing it.
        why = None
        try:
            bytecode_fn = getattr(self, 'byte_%s' % byte_name, None)
            if bytecode_fn is None:
                if byte_name.startswith('UNARY_'):
                    self.unaryOperator(byte_name[6:])
                elif byte_name.startswith('BINARY_'):
                    self.binaryOperator(byte_name[7:])
                else:
                    raise VirtualMachineError(
                        "unsupported bytecode type: %s" % byte_name
                    )
            else:
                why = bytecode_fn(*argument)
        except:
            # deal with exceptions encountered while executing the op.
            self.last_exception = sys.exc_info()[:2] + (None,)
            why = 'exception'

        return why

    def run_frame(self, frame):
        """Run a frame until it returns (somehow).
        Exceptions are raised, the return value is returned.
        """
        self.push_frame(frame)
        while True:
            byte_name, arguments = self.parse_byte_and_args()

            why = self.dispatch(byte_name, arguments)

            # Deal with any block management we need to do
            while why and frame.block_stack:
                why = self.manage_block_stack(why)

            if why:
                break

        self.pop_frame()

        if why == 'exception':
            exc, val, tb = self.last_exception
            e = exc(val)
            e.__traceback__ = tb
            raise e

        return self.return_value

Block

在我们完成每个字节码方法前,我们简单的讨论一下块。一个块被用于某种控制流,特别是异常处理和循环。它负责保证当操作完成后数据栈处于正确的状态。比如,在一个循环中,一个特殊的迭代器会存在栈中,当循环完成时它从栈中弹出。解释器需要检查循环仍在继续还是已经停止。

为了跟踪这些额外的信息,解释器设置了一个标志来指示它的状态。我们用一个变量why实现这个标志,它可以是None或者是下面几个字符串之一:"continue""break""excption"return。它们指示对块栈和数据栈进行什么操作。回到我们迭代器的例子,如果块栈的栈顶是一个loop块,why的代码是continue,迭代器就应该保存在数据栈上,而如果whybreak,迭代器就会被弹出。

块操作的细节比这个还要繁琐,我们不会花时间在这上面,但是有兴趣的读者值得仔细的看看。

Block = collections.namedtuple("Block", "type, handler, stack_height")

class VirtualMachine(object):
    [... 删节 ...]

    # Block stack manipulation
    def push_block(self, b_type, handler=None):
        level = len(self.frame.stack)
        self.frame.block_stack.append(Block(b_type, handler, stack_height))

    def pop_block(self):
        return self.frame.block_stack.pop()

    def unwind_block(self, block):
        """Unwind the values on the data stack corresponding to a given block."""
        if block.type == 'except-handler':
            # The exception itself is on the stack as type, value, and traceback.
            offset = 3  
        else:
            offset = 0

        while len(self.frame.stack) > block.level + offset:
            self.pop()

        if block.type == 'except-handler':
            traceback, value, exctype = self.popn(3)
            self.last_exception = exctype, value, traceback

    def manage_block_stack(self, why):
        """ """
        frame = self.frame
        block = frame.block_stack[-1]
        if block.type == 'loop' and why == 'continue':
            self.jump(self.return_value)
            why = None
            return why

        self.pop_block()
        self.unwind_block(block)

        if block.type == 'loop' and why == 'break':
            why = None
            self.jump(block.handler)
            return why

        if (block.type in ['setup-except', 'finally'] and why == 'exception'):
            self.push_block('except-handler')
            exctype, value, tb = self.last_exception
            self.push(tb, value, exctype)
            self.push(tb, value, exctype) # yes, twice
            why = None
            self.jump(block.handler)
            return why

        elif block.type == 'finally':
            if why in ('return', 'continue'):
                self.push(self.return_value)

            self.push(why)

            why = None
            self.jump(block.handler)
            return why
        return why

指令

剩下了的就是完成那些指令方法了:byte_LOAD_FASTbyte_BINARY_MODULO等等。而这些指令的实现并不是很有趣,这里我们只展示了一小部分,完整的实现在 GitHub 上。(这里包括的指令足够执行我们前面所述的所有代码了。)

class VirtualMachine(object):
    [... 删节 ...]

    ## Stack manipulation

    def byte_LOAD_CONST(self, const):
        self.push(const)

    def byte_POP_TOP(self):
        self.pop()

    ## Names
    def byte_LOAD_NAME(self, name):
        frame = self.frame
        if name in frame.f_locals:
            val = frame.f_locals[name]
        elif name in frame.f_globals:
            val = frame.f_globals[name]
        elif name in frame.f_builtins:
            val = frame.f_builtins[name]
        else:
            raise NameError("name '%s' is not defined" % name)
        self.push(val)

    def byte_STORE_NAME(self, name):
        self.frame.f_locals[name] = self.pop()

    def byte_LOAD_FAST(self, name):
        if name in self.frame.f_locals:
            val = self.frame.f_locals[name]
        else:
            raise UnboundLocalError(
                "local variable '%s' referenced before assignment" % name
            )
        self.push(val)

    def byte_STORE_FAST(self, name):
        self.frame.f_locals[name] = self.pop()

    def byte_LOAD_GLOBAL(self, name):
        f = self.frame
        if name in f.f_globals:
            val = f.f_globals[name]
        elif name in f.f_builtins:
            val = f.f_builtins[name]
        else:
            raise NameError("global name '%s' is not defined" % name)
        self.push(val)

    ## Operators

    BINARY_OPERATORS = {
        'POWER':    pow,
        'MULTIPLY': operator.mul,
        'FLOOR_DIVIDE': operator.floordiv,
        'TRUE_DIVIDE':  operator.truediv,
        'MODULO':   operator.mod,
        'ADD':      operator.add,
        'SUBTRACT': operator.sub,
        'SUBSCR':   operator.getitem,
        'LSHIFT':   operator.lshift,
        'RSHIFT':   operator.rshift,
        'AND':      operator.and_,
        'XOR':      operator.xor,
        'OR':       operator.or_,
    }

    def binaryOperator(self, op):
        x, y = self.popn(2)
        self.push(self.BINARY_OPERATORS[op](x, y))

    COMPARE_OPERATORS = [
        operator.lt,
        operator.le,
        operator.eq,
        operator.ne,
        operator.gt,
        operator.ge,
        lambda x, y: x in y,
        lambda x, y: x not in y,
        lambda x, y: x is y,
        lambda x, y: x is not y,
        lambda x, y: issubclass(x, Exception) and issubclass(x, y),
    ]

    def byte_COMPARE_OP(self, opnum):
        x, y = self.popn(2)
        self.push(self.COMPARE_OPERATORS[opnum](x, y))

    ## Attributes and indexing

    def byte_LOAD_ATTR(self, attr):
        obj = self.pop()
        val = getattr(obj, attr)
        self.push(val)

    def byte_STORE_ATTR(self, name):
        val, obj = self.popn(2)
        setattr(obj, name, val)

    ## Building

    def byte_BUILD_LIST(self, count):
        elts = self.popn(count)
        self.push(elts)

    def byte_BUILD_MAP(self, size):
        self.push({})

    def byte_STORE_MAP(self):
        the_map, val, key = self.popn(3)
        the_map[key] = val
        self.push(the_map)

    def byte_LIST_APPEND(self, count):
        val = self.pop()
        the_list = self.frame.stack[-count] # peek
        the_list.append(val)

    ## Jumps

    def byte_JUMP_FORWARD(self, jump):
        self.jump(jump)

    def byte_JUMP_ABSOLUTE(self, jump):
        self.jump(jump)

    def byte_POP_JUMP_IF_TRUE(self, jump):
        val = self.pop()
        if val:
            self.jump(jump)

    def byte_POP_JUMP_IF_FALSE(self, jump):
        val = self.pop()
        if not val:
            self.jump(jump)

    ## Blocks

    def byte_SETUP_LOOP(self, dest):
        self.push_block('loop', dest)

    def byte_GET_ITER(self):
        self.push(iter(self.pop()))

    def byte_FOR_ITER(self, jump):
        iterobj = self.top()
        try:
            v = next(iterobj)
            self.push(v)
        except StopIteration:
            self.pop()
            self.jump(jump)

    def byte_BREAK_LOOP(self):
        return 'break'

    def byte_POP_BLOCK(self):
        self.pop_block()

    ## Functions

    def byte_MAKE_FUNCTION(self, argc):
        name = self.pop()
        code = self.pop()
        defaults = self.popn(argc)
        globs = self.frame.f_globals
        fn = Function(name, code, globs, defaults, None, self)
        self.push(fn)

    def byte_CALL_FUNCTION(self, arg):
        lenKw, lenPos = divmod(arg, 256) # KWargs not supported here
        posargs = self.popn(lenPos)

        func = self.pop()
        frame = self.frame
        retval = func(*posargs)
        self.push(retval)

    def byte_RETURN_VALUE(self):
        self.return_value = self.pop()
        return "return"

动态类型:编译器不知道它是什么

你可能听过 Python 是一种动态语言 —— 它是动态类型的。在我们建造解释器的过程中,已经透露出这样的信息。

动态的一个意思是很多工作是在运行时完成的。前面我们看到 Python 的编译器没有很多关于代码真正做什么的信息。举个例子,考虑下面这个简单的函数mod。它取两个参数,返回它们的模运算值。从它的字节码中,我们看到变量ab首先被加载,然后字节码BINAY_MODULO完成这个模运算。

>>> def mod(a, b):
...    return a % b
>>> dis.dis(mod)
  2           0 LOAD_FAST                0 (a)
              3 LOAD_FAST                1 (b)
              6 BINARY_MODULO
              7 RETURN_VALUE
>>> mod(19, 5)
4

计算 19 % 5 得4,—— 一点也不奇怪。如果我们用不同类的参数呢?

>>> mod("by%sde", "teco")
'bytecode'

刚才发生了什么?你可能在其它地方见过这样的语法,格式化字符串。

>>> print("by%sde" % "teco")
bytecode

用符号%去格式化字符串会调用字节码BUNARY_MODULO。它取栈顶的两个值求模,不管这两个值是字符串、数字或是你自己定义的类的实例。字节码在函数编译时生成(或者说,函数定义时)相同的字节码会用于不同类的参数。

Python 的编译器关于字节码的功能知道的很少,而取决于解释器来决定BINAYR_MODULO应用于什么类型的对象并完成正确的操作。这就是为什么 Python 被描述为 动态类型 dynamically typed :直到运行前你不必知道这个函数参数的类型。相反,在一个静态类型语言中,程序员需要告诉编译器参数的类型是什么(或者编译器自己推断出参数的类型。)

编译器的无知是优化 Python 的一个挑战 —— 只看字节码,而不真正运行它,你就不知道每条字节码在干什么!你可以定义一个类,实现__mod__方法,当你对这个类的实例使用%时,Python 就会自动调用这个方法。所以,BINARY_MODULO其实可以运行任何代码。

看看下面的代码,第一个a % b看起来没有用。

def mod(a,b):
    a % b
    return a %b

不幸的是,对这段代码进行静态分析 —— 不运行它 —— 不能确定第一个a % b没有做任何事。用 %调用__mod__可能会写一个文件,或是和程序的其他部分交互,或者其他任何可以在 Python 中完成的事。很难优化一个你不知道它会做什么的函数。在 Russell Power 和 Alex Rubinsteyn 的优秀论文中写道,“我们可以用多快的速度解释 Python?”,他们说,“在普遍缺乏类型信息下,每条指令必须被看作一个INVOKE_ARBITRARY_METHOD。”

总结

Byterun 是一个比 CPython 容易理解的简洁的 Python 解释器。Byterun 复制了 CPython 的主要结构:一个基于栈的解释器对称之为字节码的指令集进行操作,它们顺序执行或在指令间跳转,向栈中压入和从中弹出数据。解释器随着函数和生成器的调用和返回,动态的创建、销毁帧,并在帧之间跳转。Byterun 也有着和真正解释器一样的限制:因为 Python 使用动态类型,解释器必须在运行时决定指令的正确行为。

我鼓励你去反汇编你的程序,然后用 Byterun 来运行。你很快会发现这个缩短版的 Byterun 所没有实现的指令。完整的实现在 https://github.com/nedbat/byterun,或者你可以仔细阅读真正的 CPython 解释器ceval.c,你也可以实现自己的解释器!

致谢

感谢 Ned Batchelder 发起这个项目并引导我的贡献,感谢 Michael Arntzenius 帮助调试代码和这篇文章的修订,感谢 Leta Montopoli 的修订,以及感谢整个 Recurse Center 社区的支持和鼓励。所有的不足全是我自己没搞好。


via: http://aosabook.org/en/500L/a-python-interpreter-written-in-python.html

作者: Allison Kaptur 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出