标签 协程 下的文章

在比较 Python 框架的系列文章的第三部分中,我们来了解 Tornado,它是为处理异步进程而构建的。

在这个由四部分组成的系列文章的前两篇中,我们介绍了 PyramidFlask Web 框架。我们已经构建了两次相同的应用程序,看到了一个完整的 DIY 框架和包含了更多功能的框架之间的异同。

现在让我们来看看另一个稍微不同的选择:Tornado 框架。Tornado 在很大程度上与 Flask 一样简单,但有一个主要区别:Tornado 是专门为处理异步进程而构建的。在我们本系列所构建的应用程序中,这种特殊的酱料(LCTT 译注:这里意思是 Tornado 的异步功能)在我们构建的 app 中并不是非常有用,但我们将看到在哪里可以使用它,以及它在更一般的情况下是如何工作的。

让我们继续前两篇文章中模式,首先从处理设置和配置开始。

Tornado 启动和配置

如果你一直关注这个系列,那么第一步应该对你来说习以为常。

$ mkdir tornado_todo
$ cd tornado_todo
$ pipenv install --python 3.6
$ pipenv shell
(tornado-someHash) $ pipenv install tornado

创建一个 setup.py 文件来安装我们的应用程序相关的东西:

(tornado-someHash) $ touch setup.py
# setup.py
from setuptools import setup, find_packages

requires = [
    'tornado',
    'tornado-sqlalchemy',
    'psycopg2',
]

setup(
    name='tornado_todo',
    version='0.0',
    description='A To-Do List built with Tornado',
    author='<Your name>',
    author_email='<Your email>',
    keywords='web tornado',
    packages=find_packages(),
    install_requires=requires,
    entry_points={
        'console_scripts': [
            'serve_app = todo:main',
        ],
    },
)

因为 Tornado 不需要任何外部配置,所以我们可以直接编写 Python 代码来让程序运行。让我们创建 todo 目录,并用需要的前几个文件填充它。

todo/
    __init__.py
    models.py
    views.py

就像 Flask 和 Pyramid 一样,Tornado 也有一些基本配置,放在 __init__.py 中。从 tornado.web 中,我们将导入 Application 对象,它将处理路由和视图的连接,包括数据库(当我们谈到那里时再说)以及运行 Tornado 应用程序所需的其它额外设置。

# __init__.py
from tornado.web import Application

def main():
    """Construct and serve the tornado application."""
    app = Application()

像 Flask 一样,Tornado 主要是一个 DIY 框架。当构建我们的 app 时,我们必须设置该应用实例。因为 Tornado 用它自己的 HTTP 服务器来提供该应用,我们必须设置如何提供该应用。首先,在 tornado.options.define 中定义要监听的端口。然后我们实例化 Tornado 的 HTTPServer,将该 Application 对象的实例作为参数传递给它。

# __init__.py
from tornado.httpserver import HTTPServer
from tornado.options import define, options
from tornado.web import Application

define('port', default=8888, help='port to listen on')

def main():
    """Construct and serve the tornado application."""
    app = Application()
    http_server = HTTPServer(app)
    http_server.listen(options.port)

当我们使用 define 函数时,我们最终会在 options 对象上创建属性。第一个参数位置的任何内容都将是属性的名称,分配给 default 关键字参数的内容将是该属性的值。

例如,如果我们将属性命名为 potato 而不是 port,我们可以通过 options.potato 访问它的值。

HTTPServer 上调用 listen 并不会启动服务器。我们必须再做一步,找一个可以监听请求并返回响应的工作应用程序,我们需要一个输入输出循环。幸运的是,Tornado 以 tornado.ioloop.IOLoop 的形式提供了开箱即用的功能。

# __init__.py
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.options import define, options
from tornado.web import Application

define('port', default=8888, help='port to listen on')

def main():
    """Construct and serve the tornado application."""
    app = Application()
    http_server = HTTPServer(app)
    http_server.listen(options.port)
    print('Listening on http://localhost:%i' % options.port)
    IOLoop.current().start()

我喜欢某种形式的 print 语句,来告诉我什么时候应用程序正在提供服务,这是我的习惯。如果你愿意,可以不使用 print

我们以 IOLoop.current().start() 开始我们的 I/O 循环。让我们进一步讨论输入,输出和异步性。

Python 中的异步和 I/O 循环的基础知识

请允许我提前说明,我绝对,肯定,一定并且放心地说不是异步编程方面的专家。就像我写的所有内容一样,接下来的内容源于我对这个概念的理解的局限性。因为我是人,可能有很深很深的缺陷。

异步程序的主要问题是:

* 数据如何进来?
* 数据如何出去?
* 什么时候可以在不占用我全部注意力情况下运行某个过程?

由于全局解释器锁(GIL),Python 被设计为一种单线程)语言。对于 Python 程序必须执行的每个任务,其线程执行的全部注意力都集中在该任务的持续时间内。我们的 HTTP 服务器是用 Python 编写的,因此,当接收到数据(如 HTTP 请求)时,服务器的唯一关心的是传入的数据。这意味着,在大多数情况下,无论是程序需要运行还是处理数据,程序都将完全消耗服务器的执行线程,阻止接收其它可能的数据,直到服务器完成它需要做的事情。

在许多情况下,这不是太成问题。典型的 Web 请求,响应周期只需要几分之一秒。除此之外,构建 HTTP 服务器的套接字可以维护待处理的传入请求的积压。因此,如果请求在该套接字处理其它内容时进入,则它很可能只是在处理之前稍微排队等待一会。对于低到中等流量的站点,几分之一秒的时间并不是什么大问题,你可以使用多个部署的实例以及 NGINX 等负载均衡器来为更大的请求负载分配流量。

但是,如果你的平均响应时间超过一秒钟,该怎么办?如果你使用来自传入请求的数据来启动一些长时间的过程(如机器学习算法或某些海量数据库查询),该怎么办?现在,你的单线程 Web 服务器开始累积一个无法寻址的积压请求,其中一些请求会因为超时而被丢弃。这不是一种选择,特别是如果你希望你的服务在一段时间内是可靠的。

异步 Python 程序登场。重要的是要记住因为它是用 Python 编写的,所以程序仍然是一个单线程进程。除非特别标记,否则在异步程序中仍然会阻塞执行。

但是,当异步程序结构正确时,只要你指定某个函数应该具有这样的能力,你的异步 Python 程序就可以“搁置”长时间运行的任务。然后,当搁置的任务完成并准备好恢复时,异步控制器会收到报告,只要在需要时管理它们的执行,而不会完全阻塞对新输入的处理。

这有点夸张,所以让我们用一个人类的例子来证明。

带回家吧

我经常发现自己在家里试图完成很多家务,但没有多少时间来做它们。在某一天,积压的家务可能看起来像:

* 做饭(20 分钟准备,40 分钟烹饪)
* 洗碗(60 分钟)
* 洗涤并擦干衣物(30 分钟洗涤,每次干燥 90 分钟)
* 真空清洗地板(30 分钟)

如果我是一个传统的同步程序,我会亲自完成每项任务。在我考虑处理任何其他事情之前,每项任务都需要我全神贯注地完成。因为如果没有我的全力关注,什么事情都完成不了。所以我的执行顺序可能如下:

1. 完全专注于准备和烹饪食物,包括等待食物烹饪(60 分钟)
2. 将脏盘子移到水槽中(65 分钟过去了)
3. 清洗所有盘子(125 分钟过去了)
4. 开始完全专注于洗衣服,包括等待洗衣机洗完,然后将衣物转移到烘干机,再等烘干机完成( 250 分钟过去了)
5. 对地板进行真空吸尘(280 分钟了)

从头到尾完成所有事情花费了 4 小时 40 分钟。

我应该像异步程序一样聪明地工作,而不是努力工作。我的家里到处都是可以为我工作的机器,而不用我一直努力工作。同时,现在我可以将注意力转移真正需要的东西上。

我的执行顺序可能看起来像:

1. 将衣物放入洗衣机并启动它(5 分钟)
2. 在洗衣机运行时,准备食物(25 分钟过去了)
3. 准备好食物后,开始烹饪食物(30 分钟过去了)
4. 在烹饪食物时,将衣物从洗衣机移到烘干机机中开始烘干(35 分钟过去了)
5. 当烘干机运行中,且食物仍在烹饪时,对地板进行真空吸尘(65 分钟过去了)
6. 吸尘后,将食物从炉子中取出并装盘子入洗碗机(70 分钟过去了)
7. 运行洗碗机(130 分钟完成)

现在花费的时间下降到 2 小时 10 分钟。即使我允许在作业之间切换花费更多时间(总共 10-20 分钟)。如果我等待着按顺序执行每项任务,我花费的时间仍然只有一半左右。这就是将程序构造为异步的强大功能。

那么 I/O 循环在哪里?

一个异步 Python 程序的工作方式是从某个外部源(输入)获取数据,如果某个进程需要,则将该数据转移到某个外部工作者(输出)进行处理。当外部进程完成时,Python 主程序会收到提醒,然后程序获取外部处理(输入)的结果,并继续这样其乐融融的方式。

当数据不在 Python 主程序手中时,主程序就会被释放来处理其它任何事情。包括等待全新的输入(如 HTTP 请求)和处理长时间运行的进程的结果(如机器学习算法的结果,长时间运行的数据库查询)。主程序虽仍然是单线程的,但成了事件驱动的,它对程序处理的特定事件会触发动作。监听这些事件并指示应如何处理它们的主要是 I/O 循环在工作。

我知道,我们走了很长的路才得到这个重要的解释,但我希望在这里传达的是,它不是魔术,也不是某种复杂的并行处理或多线程工作。全局解释器锁仍然存在,主程序中任何长时间运行的进程仍然会阻塞其它任何事情的进行,该程序仍然是单线程的。然而,通过将繁琐的工作外部化,我们可以将线程的注意力集中在它需要注意的地方。

这有点像我上面的异步任务。当我的注意力完全集中在准备食物上时,它就是我所能做的一切。然而,当我能让炉子帮我做饭,洗碗机帮我洗碗,洗衣机和烘干机帮我洗衣服时,我的注意力就会被释放出来,去做其它事情。当我被提醒,我的一个长时间运行的任务已经完成并准备再次处理时,如果我的注意力是空闲的,我可以获取该任务的结果,并对其做下一步需要做的任何事情。

Tornado 路由和视图

尽管经历了在 Python 中讨论异步的所有麻烦,我们还是决定暂不使用它。先来编写一个基本的 Tornado 视图。

与我们在 Flask 和 Pyramid 实现中看到的基于函数的视图不同,Tornado 的视图都是基于类的。这意味着我们将不在使用单独的、独立的函数来规定如何处理请求。相反,传入的 HTTP 请求将被捕获并将其分配为我们定义的类的一个属性。然后,它的方法将处理相应的请求类型。

让我们从一个基本的视图开始,即在屏幕上打印 “Hello, World”。我们为 Tornado 应用程序构造的每个基于类的视图都必须继承 tornado.web 中的 RequestHandler 对象。这将设置我们需要(但不想写)的所有底层逻辑来接收请求,同时构造正确格式的 HTTP 响应。

from tornado.web import RequestHandler

class HelloWorld(RequestHandler):
    """Print 'Hello, world!' as the response body."""

    def get(self):
        """Handle a GET request for saying Hello World!."""
        self.write("Hello, world!")

因为我们要处理 GET 请求,所以我们声明(实际上是重写)了 get 方法。我们提供文本或 JSON 可序列化对象,用 self.write 写入响应体。之后,我们让 RequestHandler 来做在发送响应之前必须完成的其它工作。

就目前而言,此视图与 Tornado 应用程序本身并没有实际连接。我们必须回到 __init__.py,并稍微更新 main 函数。以下是新的内容:

# __init__.py
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.options import define, options
from tornado.web import Application
from todo.views import HelloWorld

define('port', default=8888, help='port to listen on')

def main():
    """Construct and serve the tornado application."""
    app = Application([
        ('/', HelloWorld)
    ])
    http_server = HTTPServer(app)
    http_server.listen(options.port)
    print('Listening on http://localhost:%i' % options.port)
    IOLoop.current().start()

我们做了什么

我们将 views.py 文件中的 HelloWorld 视图导入到脚本 __init__.py 的顶部。然后我们添加了一个路由-视图对应的列表,作为 Application 实例化的第一个参数。每当我们想要在应用程序中声明一个路由时,它必须绑定到一个视图。如果需要,可以对多个路由使用相同的视图,但每个路由必须有一个视图。

我们可以通过在 setup.py 中启用的 serve_app 命令来运行应用程序,从而确保这一切都能正常工作。查看 http://localhost:8888/ 并看到它显示 “Hello, world!”。

当然,在这个领域中我们还能做更多,也将做更多,但现在让我们来讨论模型吧。

连接数据库

如果我们想要保留数据,就需要连接数据库。与 Flask 一样,我们将使用一个特定于框架的 SQLAchemy 变体,名为 tornado-sqlalchemy

为什么要使用它而不是 SQLAlchemy 呢?好吧,其实 tornado-sqlalchemy 具有简单 SQLAlchemy 的所有优点,因此我们仍然可以使用通用的 Base 声明模型,并使用我们习以为常的所有列数据类型和关系。除了我们已经惯常了解到的,tornado-sqlalchemy 还为其数据库查询功能提供了一种可访问的异步模式,专门用于与 Tornado 现有的 I/O 循环一起工作。

我们通过将 tornado-sqlalchemypsycopg2 添加到 setup.py 到所需包的列表并重新安装包来创建环境。在 models.py 中,我们声明了模型。这一步看起来与我们在 Flask 和 Pyramid 中已经看到的完全一样,所以我将跳过全部声明,只列出了 Task 模型的必要部分。

# 这不是完整的 models.py, 但是足够看到不同点
from tornado_sqlalchemy import declarative_base

Base = declarative_base

class Task(Base):
    # 等等,因为剩下的几乎所有的东西都一样 ...

我们仍然需要将 tornado-sqlalchemy 连接到实际应用程序。在 __init__.py 中,我们将定义数据库并将其集成到应用程序中。

# __init__.py
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.options import define, options
from tornado.web import Application
from todo.views import HelloWorld

# add these
import os
from tornado_sqlalchemy import make_session_factory

define('port', default=8888, help='port to listen on')
factory = make_session_factory(os.environ.get('DATABASE_URL', ''))

def main():
    """Construct and serve the tornado application."""
    app = Application([
        ('/', HelloWorld)
    ],
        session_factory=factory
    )
    http_server = HTTPServer(app)
    http_server.listen(options.port)
    print('Listening on http://localhost:%i' % options.port)
    IOLoop.current().start()

就像我们在 Pyramid 中传递的会话工厂一样,我们可以使用 make_session_factory 来接收数据库 URL 并生成一个对象,这个对象的唯一目的是为视图提供到数据库的连接。然后我们将新创建的 factory 传递给 Application 对象,并使用 session_factory 关键字参数将它绑定到应用程序中。

最后,初始化和管理数据库与 Flask 和 Pyramid 相同(即,单独的 DB 管理脚本,与 Base 对象一起工作等)。它看起来很相似,所以在这里我就不介绍了。

回顾视图

Hello,World 总是适合学习基础知识,但我们需要一些真实的,特定应用程序的视图。

让我们从 info 视图开始。

# views.py
import json
from tornado.web import RequestHandler

class InfoView(RequestHandler):
    """只允许 GET 请求"""
    SUPPORTED_METHODS = ["GET"]

    def set_default_headers(self):
        """设置默认响应头为 json 格式的"""
        self.set_header("Content-Type", 'application/json; charset="utf-8"')

    def get(self):
        """列出这个 API 的路由"""
        routes = {
            'info': 'GET /api/v1',
            'register': 'POST /api/v1/accounts',
            'single profile detail': 'GET /api/v1/accounts/<username>',
            'edit profile': 'PUT /api/v1/accounts/<username>',
            'delete profile': 'DELETE /api/v1/accounts/<username>',
            'login': 'POST /api/v1/accounts/login',
            'logout': 'GET /api/v1/accounts/logout',
            "user's tasks": 'GET /api/v1/accounts/<username>/tasks',
            "create task": 'POST /api/v1/accounts/<username>/tasks',
            "task detail": 'GET /api/v1/accounts/<username>/tasks/<id>',
            "task update": 'PUT /api/v1/accounts/<username>/tasks/<id>',
            "delete task": 'DELETE /api/v1/accounts/<username>/tasks/<id>'
        }
        self.write(json.dumps(routes))

有什么改变吗?让我们从上往下看。

我们添加了 SUPPORTED_METHODS 类属性,它是一个可迭代对象,代表这个视图所接受的请求方法,其他任何方法都将返回一个 405 状态码。当我们创建 HelloWorld 视图时,我们没有指定它,主要是当时有点懒。如果没有这个类属性,此视图将响应任何试图绑定到该视图的路由的请求。

我们声明了 set_default_headers 方法,它设置 HTTP 响应的默认头。我们在这里声明它,以确保我们返回的任何响应都有一个 "Content-Type""application/json" 类型。

我们将 json.dumps(some_object) 添加到 self.write 的参数中,因为它可以很容易地构建响应主体的内容。

现在已经完成了,我们可以继续将它连接到 __init__.py 中的主路由。

# __init__.py
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
from tornado.options import define, options
from tornado.web import Application
from todo.views import InfoView

# 添加这些
import os
from tornado_sqlalchemy import make_session_factory

define('port', default=8888, help='port to listen on')
factory = make_session_factory(os.environ.get('DATABASE_URL', ''))

def main():
    """Construct and serve the tornado application."""
    app = Application([
        ('/', InfoView)
    ],
        session_factory=factory
    )
    http_server = HTTPServer(app)
    http_server.listen(options.port)
    print('Listening on http://localhost:%i' % options.port)
    IOLoop.current().start()

我们知道,还需要编写更多的视图和路由。每个都会根据需要放入 Application 路由列表中,每个视图还需要一个 set_default_headers 方法。在此基础上,我们还将创建 send_response 方法,它的作用是将响应与我们想要给响应设置的任何自定义状态码打包在一起。由于每个视图都需要这两个方法,因此我们可以创建一个包含它们的基类,这样每个视图都可以继承基类。这样,我们只需要编写一次。

# views.py
import json
from tornado.web import RequestHandler

class BaseView(RequestHandler):
    """Base view for this application."""

    def set_default_headers(self):
        """Set the default response header to be JSON."""
        self.set_header("Content-Type", 'application/json; charset="utf-8"')

    def send_response(self, data, status=200):
        """Construct and send a JSON response with appropriate status code."""
        self.set_status(status)
        self.write(json.dumps(data))

对于我们即将编写的 TaskListView 这样的视图,我们还需要一个到数据库的连接。我们需要 tornado_sqlalchemy 中的 SessionMixin 在每个视图类中添加一个数据库会话。我们可以将它放在 BaseView 中,这样,默认情况下,从它继承的每个视图都可以访问数据库会话。

# views.py
import json
from tornado_sqlalchemy import SessionMixin
from tornado.web import RequestHandler

class BaseView(RequestHandler, SessionMixin):
    """Base view for this application."""

    def set_default_headers(self):
        """Set the default response header to be JSON."""
        self.set_header("Content-Type", 'application/json; charset="utf-8"')

    def send_response(self, data, status=200):
        """Construct and send a JSON response with appropriate status code."""
        self.set_status(status)
        self.write(json.dumps(data))

只要我们修改 BaseView 对象,在将数据发布到这个 API 时,我们就应该定位到这里。

当 Tornado(从 v.4.5 开始)使用来自客户端的数据并将其组织起来到应用程序中使用时,它会将所有传入数据视为字节串。但是,这里的所有代码都假设使用 Python 3,因此我们希望使用的唯一字符串是 Unicode 字符串。我们可以为这个 BaseView 类添加另一个方法,它的工作是将输入数据转换为 Unicode,然后再在视图的其他地方使用。

如果我们想要在正确的视图方法中使用它之前转换这些数据,我们可以重写视图类的原生 prepare 方法。它的工作是在视图方法运行前运行。如果我们重写 prepare 方法,我们可以设置一些逻辑来运行,每当收到请求时,这些逻辑就会执行字节串到 Unicode 的转换。

# views.py
import json
from tornado_sqlalchemy import SessionMixin
from tornado.web import RequestHandler

class BaseView(RequestHandler, SessionMixin):
    """Base view for this application."""

    def prepare(self):
        self.form_data = {
            key: [val.decode('utf8') for val in val_list]
            for key, val_list in self.request.arguments.items()
        }

    def set_default_headers(self):
        """Set the default response header to be JSON."""
        self.set_header("Content-Type", 'application/json; charset="utf-8"')

    def send_response(self, data, status=200):
        """Construct and send a JSON response with appropriate status code."""
        self.set_status(status)
        self.write(json.dumps(data))

如果有任何数据进入,它将在 self.request.arguments 字典中找到。我们可以通过键访问该数据库,并将其内容(始终是列表)转换为 Unicode。因为这是基于类的视图而不是基于函数的,所以我们可以将修改后的数据存储为一个实例属性,以便以后使用。我在这里称它为 form_data,但它也可以被称为 potato。关键是我们可以存储提交给应用程序的数据。

异步视图方法

现在我们已经构建了 BaseaView,我们可以构建 TaskListView 了,它会继承 BaseaView

正如你可以从章节标题中看到的那样,以下是所有关于异步性的讨论。TaskListView 将处理返回任务列表的 GET 请求和用户给定一些表单数据来创建新任务的 POST 请求。让我们首先来看看处理 GET 请求的代码。

# all the previous imports
import datetime
from tornado.gen import coroutine
from tornado_sqlalchemy import as_future
from todo.models import Profile, Task

# the BaseView is above here
class TaskListView(BaseView):
    """View for reading and adding new tasks."""
    SUPPORTED_METHODS = ("GET", "POST",)

    @coroutine
    def get(self, username):
        """Get all tasks for an existing user."""
        with self.make_session() as session:
            profile = yield as_future(session.query(Profile).filter(Profile.username == username).first)
            if profile:
                tasks = [task.to_dict() for task in profile.tasks]
                self.send_response({
                    'username': profile.username,
                    'tasks': tasks
                })

这里的第一个主要部分是 @coroutine 装饰器,它从 tornado.gen 导入。任何具有与调用堆栈的正常流程不同步的 Python 可调用部分实际上是“协程”,即一个可以与其它协程一起运行的协程。在我的家务劳动的例子中,几乎所有的家务活都是一个共同的例行协程。有些阻止了例行协程(例如,给地板吸尘),但这种例行协程只会阻碍我开始或关心其它任何事情的能力。它没有阻止已经启动的任何其他协程继续进行。

Tornado 提供了许多方法来构建一个利用协程的应用程序,包括允许我们设置函数调用锁,同步异步协程的条件,以及手动修改控制 I/O 循环的事件系统。

这里使用 @coroutine 装饰器的唯一条件是允许 get 方法将 SQL 查询作为后台进程,并在查询完成后恢复,同时不阻止 Tornado I/O 循环去处理其他传入的数据源。这就是关于此实现的所有“异步”:带外数据库查询。显然,如果我们想要展示异步 Web 应用程序的魔力和神奇,那么一个任务列表就不是好的展示方式。

但是,这就是我们正在构建的,所以让我们来看看方法如何利用 @coroutine 装饰器。SessionMixin 混合到 BaseView 声明中,为我们的视图类添加了两个方便的,支持数据库的属性:sessionmake_session。它们的名字相似,实现的目标也相当相似。

self.session 属性是一个关注数据库的会话。在请求-响应周期结束时,在视图将响应发送回客户端之前,任何对数据库的更改都被提交,并关闭会话。

self.make_session 是一个上下文管理器和生成器,可以动态构建和返回一个全新的会话对象。第一个 self.session 对象仍然存在。无论如何,反正 make_session 会创建一个新的。make_session 生成器还为其自身提供了一个功能,用于在其上下文(即缩进级别)结束时提交和关闭它创建的会话。

如果你查看源代码,则赋值给 self.session 的对象类型与 self.make_session 生成的对象类型之间没有区别,不同之处在于它们是如何被管理的。

使用 make_session 上下文管理器,生成的会话仅属于上下文,在该上下文中开始和结束。你可以使用 make_session 上下文管理器在同一个视图中打开,修改,提交以及关闭多个数据库会话。

self.session 要简单得多,当你进入视图方法时会话已经打开,在响应被发送回客户端之前会话就已提交。

虽然读取文档片段PyPI 示例都说明了上下文管理器的使用,但是没有说明 self.session 对象或由 self.make_session 生成的 session 本质上是不是异步的。当我们启动查询时,我们开始考虑内置于 tornado-sqlalchemy 中的异步行为。

tornado-sqlalchemy 包为我们提供了 as_future 函数。它的工作是装饰 tornado-sqlalchemy 会话构造的查询并 yield 其返回值。如果视图方法用 @coroutine 装饰,那么使用 yield as_future(query) 模式将使封装的查询成为一个异步后台进程。I/O 循环会接管等待查询的返回值和 as_future 创建的 future 对象的解析。

要访问 as_future(query) 的结果,你必须从它 yield。否则,你只能获得一个未解析的生成器对象,并且无法对查询执行任何操作。

这个视图方法中的其他所有内容都与之前课堂上的类似,与我们在 Flask 和 Pyramid 中看到的内容类似。

post 方法看起来非常相似。为了保持一致性,让我们看一下 post 方法以及它如何处理用 BaseView 构造的 self.form_data

@coroutine
def post(self, username):
    """Create a new task."""
    with self.make_session() as session:
        profile = yield as_future(session.query(Profile).filter(Profile.username == username).first)
        if profile:
            due_date = self.form_data['due_date'][0]
            task = Task(
                name=self.form_data['name'][0],
                note=self.form_data['note'][0],
                creation_date=datetime.now(),
                due_date=datetime.strptime(due_date, '%d/%m/%Y %H:%M:%S') if due_date else None,
                completed=self.form_data['completed'][0],
                profile_id=profile.id,
                profile=profile
            )
            session.add(task)
            self.send_response({'msg': 'posted'}, status=201)

正如我所说,这是我们所期望的:

* 与我们在 get 方法中看到的查询模式相同 * 构造一个新的 Task 对象的实例,用 form_data 的数据填充 * 添加新的 Task 对象(但不提交,因为它由上下文管理器处理!)到数据库会话 * 将响应发送给客户端

这样我们就有了 Tornado web 应用程序的基础。其他内容(例如,数据库管理和更多完整应用程序的视图)实际上与我们在 Flask 和 Pyramid 应用程序中看到的相同。

关于使用合适的工具完成合适的工作的一点想法

在我们继续浏览这些 Web 框架时,我们开始看到它们都可以有效地处理相同的问题。对于像这样的待办事项列表,任何框架都可以完成这项任务。但是,有些 Web 框架比其它框架更适合某些工作,这具体取决于对你来说什么“更合适”和你的需求。

虽然 Tornado 显然和 Pyramid 或 Flask 一样可以处理相同工作,但将它用于这样的应用程序实际上是一种浪费,这就像开车从家走一个街区(LCTT 译注:这里意思应该是从家开始走一个街区只需步行即可)。是的,它可以完成“旅行”的工作,但短途旅行不是你选择汽车而不是自行车或者使用双脚的原因。

根据文档,Tornado 被称为 “Python Web 框架和异步网络库”。在 Python Web 框架生态系统中很少有人喜欢它。如果你尝试完成的工作需要(或将从中获益)以任何方式、形状或形式的异步性,使用 Tornado。如果你的应用程序需要处理多个长期连接,同时又不想牺牲太多性能,选择 Tornado。如果你的应用程序是多个应用程序,并且需要线程感知以准确处理数据,使用 Tornado。这是它最有效的地方。

用你的汽车做“汽车的事情”,使用其他交通工具做其他事情。

向前看,进行一些深度检查

谈到使用合适的工具来完成合适的工作,在选择框架时,请记住应用程序的范围和规模,包括现在和未来。到目前为止,我们只研究了适用于中小型 Web 应用程序的框架。本系列的下一篇也是最后一篇将介绍最受欢迎的 Python 框架之一 Django,它适用于可能会变得更大的大型应用程序。同样,尽管它在技术上能够并且将会处理待办事项列表问题,但请记住,这不是它的真正用途。我们仍然会通过它来展示如何使用它来构建应用程序,但我们必须牢记框架的意图以及它是如何反映在架构中的:

  • Flask: 适用于小型,简单的项目。它可以使我们轻松地构建视图并将它们快速连接到路由,它可以简单地封装在一个文件中。
  • Pyramid: 适用于可能增长的项目。它包含一些配置来启动和运行。应用程序组件的独立领域可以很容易地划分并构建到任意深度,而不会忽略中央应用程序。
  • Tornado: 适用于受益于精确和有意识的 I/O 控制的项目。它允许协程,并轻松公开可以控制如何接收请求或发送响应以及何时发生这些操作的方法。
  • Django:(我们将会看到)意味着可能会变得更大的东西。它有着非常庞大的生态系统,包括大量插件和模块。它非常有主见的配置和管理,以保持所有不同部分在同一条线上。

无论你是从本系列的第一篇文章开始阅读,还是稍后才加入的,都要感谢阅读!请随意留下问题或意见。下次再见时,我手里会拿着 Django。

感谢 Python BDFL

我必须把功劳归于它应得的地方,非常感谢 Guido van Rossum,不仅仅是因为他创造了我最喜欢的编程语言。

PyCascades 2018 期间,我很幸运的不仅做了基于这个文章系列的演讲,而且还被邀请参加了演讲者的晚宴。整个晚上我都坐在 Guido 旁边,不停地问他问题。其中一个问题是,在 Python 中异步到底是如何工作的,但他没有一点大惊小怪,而是花时间向我解释,让我开始理解这个概念。他后来推特给我发了一条消息:是用于学习异步 Python 的广阔资源。我随后在三个月内阅读了三次,然后写了这篇文章。你真是一个非常棒的人,Guido!


via: https://opensource.com/article/18/6/tornado-framework

作者:Nicholas Hunt-Walker 选题:lujun9972 译者:MjSeven 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Go 是一个内置支持并发编程的语言。借助使用 go 关键字去创建 协程 goroutine (轻量级线程)和在 Go 中提供的 使用 信道其它的并发 同步方法,使得并发编程变得很容易、很灵活和很有趣。

另一方面,Go 并不会阻止一些因 Go 程序员粗心大意或者缺乏经验而造成的并发编程错误。在本文的下面部分将展示一些在 Go 编程中常见的并发编程错误,以帮助 Go 程序员们避免再犯类似的错误。

需要同步的时候没有同步

代码行或许 不是按出现的顺序运行的

在下面的程序中有两个错误。

  • 第一,在 main 协程中读取 b 和在新的 协程 中写入 b 可能导致数据争用。
  • 第二,条件 b == true 并不能保证在 main 协程 中的 a != nil。在新的协程中编译器和 CPU 可能会通过 重排序指令 进行优化,因此,在运行时 b 赋值可能发生在 a 赋值之前,在 main 协程 中当 a 被修改后,它将会让部分 a 一直保持为 nil
package main

import (
    "time"
    "runtime"
)

func main() {
    var a []int // nil
    var b bool  // false

    // a new goroutine
    go func () {
        a = make([]int, 3)
        b = true // write b
    }()

    for !b { // read b
        time.Sleep(time.Second)
        runtime.Gosched()
    }
    a[0], a[1], a[2] = 0, 1, 2 // might panic
}

上面的程序或者在一台计算机上运行的很好,但是在另一台上可能会引发异常。或者它可能运行了 N 次都很好,但是可能在第 (N+1) 次引发了异常。

我们将使用 sync 标准包中提供的信道或者同步方法去确保内存中的顺序。例如,

package main

func main() {
    var a []int = nil
    c := make(chan struct{})

    // a new goroutine
    go func () {
        a = make([]int, 3)
        c <- struct{}{}
    }()

    <-c
    a[0], a[1], a[2] = 0, 1, 2
}

使用 time.Sleep 调用去做同步

我们先来看一个简单的例子。

package main

import (
    "fmt"
    "time"
)

func main() {
    var x = 123

    go func() {
        x = 789 // write x
    }()

    time.Sleep(time.Second)
    fmt.Println(x) // read x
}

我们预期程序将打印出 789。如果我们运行它,通常情况下,它确定打印的是 789。但是,这个程序使用的同步方式好吗?No!原因是 Go 运行时并不保证 x 的写入一定会发生在 x 的读取之前。在某些条件下,比如在同一个操作系统上,大部分 CPU 资源被其它运行的程序所占用的情况下,写入 x 可能就会发生在读取 x 之后。这就是为什么我们在正式的项目中,从来不使用 time.Sleep 调用去实现同步的原因。

我们来看一下另外一个示例。

package main

import (
    "fmt"
    "time"
)

var x = 0

func main() {
    var num = 123
    var p = &num

    c := make(chan int)

    go func() {
        c <- *p + x
    }()

    time.Sleep(time.Second)
    num = 789
    fmt.Println(<-c)
}

你认为程序的预期输出是什么?123 还是 789?事实上它的输出与编译器有关。对于标准的 Go 编译器 1.10 来说,这个程序很有可能输出是 123。但是在理论上,它可能输出的是 789,或者其它的随机数。

现在,我们来改变 c <- *p + xc <- *p,然后再次运行这个程序。你将会发现输出变成了 789 (使用标准的 Go 编译器 1.10)。这再次说明它的输出是与编译器相关的。

是的,在上面的程序中存在数据争用。表达式 *p 可能会被先计算、后计算、或者在处理赋值语句 num = 789 时计算。time.Sleep 调用并不能保证 *p 发生在赋值语句处理之前进行。

对于这个特定的示例,我们将在新的协程创建之前,将值保存到一个临时值中,然后在新的协程中使用临时值去消除数据争用。

...
    tmp := *p + x
    go func() {
        c <- tmp
    }()
...

使协程挂起

挂起协程是指让协程一直处于阻塞状态。导致协程被挂起的原因很多。比如,

  • 一个协程尝试从一个 nil 信道中或者从一个没有其它协程给它发送值的信道中检索数据。
  • 一个协程尝试去发送一个值到 nil 信道,或者发送到一个没有其它的协程接收值的信道中。
  • 一个协程被它自己死锁。
  • 一组协程彼此死锁。
  • 当运行一个没有 default 分支的 select 代码块时,一个协程被阻塞,以及在 select 代码块中 case 关键字后的所有信道操作保持阻塞状态。

除了有时我们为了避免程序退出,特意让一个程序中的 main 协程保持挂起之外,大多数其它的协程挂起都是意外情况。Go 运行时很难判断一个协程到底是处于挂起状态还是临时阻塞。因此,Go 运行时并不会去释放一个挂起的协程所占用的资源。

谁先响应谁获胜 的信道使用案例中,如果使用的 future 信道容量不够大,当尝试向 Future 信道发送结果时,一些响应较慢的信道将被挂起。比如,如果调用下面的函数,将有 4 个协程处于永远阻塞状态。

func request() int {
    c := make(chan int)
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            c <- i // 4 goroutines will hang here.
        }()
    }
    return <-c
}

为避免这 4 个协程一直处于挂起状态, c 信道的容量必须至少是 4

实现谁先响应谁获胜的第二种方法 的信道使用案例中,如果将 future 信道用做非缓冲信道,那么有可能这个信息将永远也不会有响应而挂起。例如,如果在一个协程中调用下面的函数,协程可能会挂起。原因是,如果接收操作 <-c 准备就绪之前,五个发送操作全部尝试发送,那么所有的尝试发送的操作将全部失败,因此那个调用者协程将永远也不会接收到值。

func request() int {
    c := make(chan int)
    for i := 0; i < 5; i++ {
        i := i
        go func() {
            select {
            case c <- i:
            default:
            }
        }()
    }
    return <-c
}

将信道 c 变成缓冲信道将保证五个发送操作中的至少一个操作会发送成功,这样,上面函数中的那个调用者协程将不会被挂起。

sync 标准包中拷贝类型值

在实践中,sync 标准包中的类型值不会被拷贝。我们应该只拷贝这个值的指针。

下面是一个错误的并发编程示例。在这个示例中,当调用 Counter.Value 方法时,将拷贝一个 Counter 接收值。作为接收值的一个字段,Counter 接收值的各个 Mutex 字段也会被拷贝。拷贝不是同步发生的,因此,拷贝的 Mutex 值可能会出错。即便是没有错误,拷贝的 Counter 接收值的访问保护也是没有意义的。

import "sync"

type Counter struct {
    sync.Mutex
    n int64
}

// This method is okay.
func (c *Counter) Increase(d int64) (r int64) {
    c.Lock()
    c.n += d
    r = c.n
    c.Unlock()
    return
}

// The method is bad. When it is called, a Counter
// receiver value will be copied.
func (c Counter) Value() (r int64) {
    c.Lock()
    r = c.n
    c.Unlock()
    return
}

我们只需要改变 Value 接收类型方法为指针类型 *Counter,就可以避免拷贝 Mutex 值。

在官方的 Go SDK 中提供的 go vet 命令将会报告潜在的错误值拷贝。

在错误的地方调用 sync.WaitGroup 的方法

每个 sync.WaitGroup 值维护一个内部计数器,这个计数器的初始值为 0。如果一个 WaitGroup 计数器的值是 0,调用 WaitGroup 值的 Wait 方法就不会被阻塞,否则,在计数器值为 0 之前,这个调用会一直被阻塞。

为了让 WaitGroup 值的使用有意义,当一个 WaitGroup 计数器值为 0 时,必须在相应的 WaitGroup 值的 Wait 方法调用之前,去调用 WaitGroup 值的 Add 方法。

例如,下面的程序中,在不正确位置调用了 Add 方法,这将使最后打印出的数字不总是 100。事实上,这个程序最后打印的数字可能是在 [0, 100) 范围内的一个随意数字。原因就是 Add 方法的调用并不保证一定会发生在 Wait 方法调用之前。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
)

func main() {
    var wg sync.WaitGroup
    var x int32 = 0
    for i := 0; i < 100; i++ {
        go func() {
            wg.Add(1)
            atomic.AddInt32(&x, 1)
            wg.Done()
        }()
    }

    fmt.Println("To wait ...")
    wg.Wait()
    fmt.Println(atomic.LoadInt32(&x))
}

为让程序的表现符合预期,在 for 循环中,我们将把 Add 方法的调用移动到创建的新协程的范围之外,修改后的代码如下。

...
    for i := 0; i < 100; i++ {
        wg.Add(1)
        go func() {
            atomic.AddInt32(&x, 1)
            wg.Done()
        }()
    }
...

不正确使用 futures 信道

信道使用案例 的文章中,我们知道一些函数将返回 futures 信道。假设 fafb 就是这样的两个函数,那么下面的调用就使用了不正确的 future 参数。

doSomethingWithFutureArguments(<-fa(), <-fb())

在上面的代码行中,两个信道接收操作是顺序进行的,而不是并发的。我们做如下修改使它变成并发操作。

ca, cb := fa(), fb()
doSomethingWithFutureArguments(<-c1, <-c2)

没有等协程的最后的活动的发送结束就关闭信道

Go 程序员经常犯的一个错误是,还有一些其它的协程可能会发送值到以前的信道时,这个信道就已经被关闭了。当这样的发送(发送到一个已经关闭的信道)真实发生时,将引发一个异常。

这种错误在一些以往的著名 Go 项目中也有发生,比如在 Kubernetes 项目中的 这个 bug这个 bug

如何安全和优雅地关闭信道,请阅读 这篇文章

在值上做 64 位原子操作时没有保证值地址 64 位对齐

到目前为止(Go 1.10),在标准的 Go 编译器中,在一个 64 位原子操作中涉及到的值的地址要求必须是 64 位对齐的。如果没有对齐则导致当前的协程异常。对于标准的 Go 编译器来说,这种失败仅发生在 32 位的架构上。请阅读 内存布局 去了解如何在一个 32 位操作系统上保证 64 位对齐。

没有注意到大量的资源被 time.After 函数调用占用

time 标准包中的 After 函数返回 一个延迟通知的信道。这个函数在某些情况下用起来很便捷,但是,每次调用它将创建一个 time.Timer 类型的新值。这个新创建的 Timer 值在通过传递参数到 After 函数指定期间保持激活状态,如果在这个期间过多的调用了该函数,可能会有太多的 Timer 值保持激活,这将占用大量的内存和计算资源。

例如,如果调用了下列的 longRunning 函数,将在一分钟内产生大量的消息,然后在某些周期内将有大量的 Timer 值保持激活,即便是大量的这些 Timer 值已经没用了也是如此。

import (
    "fmt"
    "time"
)

// The function will return if a message arrival interval
// is larger than one minute.
func longRunning(messages <-chan string) {
    for {
        select {
        case <-time.After(time.Minute):
            return
        case msg := <-messages:
            fmt.Println(msg)
        }
    }
}

为避免在上述代码中创建过多的 Timer 值,我们将使用一个单一的 Timer 值去完成同样的任务。

func longRunning(messages <-chan string) {
    timer := time.NewTimer(time.Minute)
    defer timer.Stop()

    for {
        select {
        case <-timer.C:
            return
        case msg := <-messages:
            fmt.Println(msg)
            if !timer.Stop() {
                <-timer.C
            }
        }

        // The above "if" block can also be put here.

        timer.Reset(time.Minute)
    }
}

不正确地使用 time.Timer

在最后,我们将展示一个符合语言使用习惯的 time.Timer 值的使用示例。需要注意的一个细节是,那个 Reset 方法总是在停止或者 time.Timer 值释放时被使用。

select 块的第一个 case 分支的结束部分,time.Timer 值被释放,因此,我们不需要去停止它。但是必须在第二个分支中停止定时器。如果在第二个分支中 if 代码块缺失,它可能至少在 Reset 方法调用时,会(通过 Go 运行时)发送到 timer.C 信道,并且那个 longRunning 函数可能会早于预期返回,对于 Reset 方法来说,它可能仅仅是重置内部定时器为 0,它将不会清理(耗尽)那个发送到 timer.C 信道的值。

例如,下面的程序很有可能在一秒内而不是十秒时退出。并且更重要的是,这个程序并不是 DRF 的(LCTT 译注:data race free,多线程程序的一种同步程度)。

package main

import (
    "fmt"
    "time"
)

func main() {
    start := time.Now()
    timer := time.NewTimer(time.Second/2)
    select {
    case <-timer.C:
    default:
        time.Sleep(time.Second) // go here
    }
    timer.Reset(time.Second * 10)
    <-timer.C
    fmt.Println(time.Since(start)) // 1.000188181s
}

time.Timer 的值不再被其它任何一个东西使用时,它的值可能被停留在一种非停止状态,但是,建议在结束时停止它。

在多个协程中如果不按建议使用 time.Timer 值并发,可能会有 bug 隐患。

我们不应该依赖一个 Reset 方法调用的返回值。Reset 方法返回值的存在仅仅是为了兼容性目的。


via: https://go101.org/article/concurrent-common-mistakes.html

作者:<go101.org> 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL (仁慈的终生大独裁者 (Benevolent Dictator For Life))——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/

使用协程

我们将从描述爬虫如何工作开始。现在是时候用 asynio 去实现它了。

我们的爬虫从获取第一个网页开始,解析出链接并把它们加到队列中。此后它开始傲游整个网站,并发地获取网页。但是由于客户端和服务端的负载限制,我们希望有一个最大数目的运行的 worker,不能再多。任何时候一个 worker 完成一个网页的获取,它应该立即从队列中取出下一个链接。我们会遇到没有那么多事干的时候,所以一些 worker 必须能够暂停。一旦又有 worker 获取一个有很多链接的网页,队列会突增,暂停的 worker 立马被唤醒干活。最后,当任务完成后我们的程序必须马上退出。

假如你的 worker 是线程,怎样去描述你的爬虫算法?我们可以使用 Python 标准库中的同步队列。每次有新的一项加入,队列增加它的 “tasks” 计数器。线程 worker 完成一个任务后调用 task_done。主线程阻塞在 Queue.join,直到“tasks”计数器与 task_done 调用次数相匹配,然后退出。

协程通过 asyncio 队列,使用和线程一样的模式来实现!首先我们导入它

try:
    from asyncio import JoinableQueue as Queue
except ImportError:
    # In Python 3.5, asyncio.JoinableQueue is
    # merged into Queue.
    from asyncio import Queue

我们把 worker 的共享状态收集在一个 crawler 类中,主要的逻辑写在 crawl 方法中。我们在一个协程中启动 crawl,运行 asyncio 的事件循环直到 crawl 完成:

loop = asyncio.get_event_loop()

crawler = crawling.Crawler('http://xkcd.com',
                           max_redirect=10)

loop.run_until_complete(crawler.crawl())

crawler 用一个根 URL 和最大重定向数 max_redirect 来初始化,它把 (URL, max_redirect) 序对放入队列中。(为什么要这样做,请看下文)

class Crawler:
    def __init__(self, root_url, max_redirect):
        self.max_tasks = 10
        self.max_redirect = max_redirect
        self.q = Queue()
        self.seen_urls = set()

        # aiohttp's ClientSession does connection pooling and
        # HTTP keep-alives for us.
        self.session = aiohttp.ClientSession(loop=loop)

        # Put (URL, max_redirect) in the queue.
        self.q.put((root_url, self.max_redirect))

现在队列中未完成的任务数是 1。回到我们的主程序,启动事件循环和 crawl 方法:

loop.run_until_complete(crawler.crawl())

crawl 协程把 worker 们赶起来干活。它像一个主线程:阻塞在 join 上直到所有任务完成,同时 worker 们在后台运行。

    @asyncio.coroutine
    def crawl(self):
        """Run the crawler until all work is done."""
        workers = [asyncio.Task(self.work())
                   for _ in range(self.max_tasks)]

        # When all work is done, exit.
        yield from self.q.join()
        for w in workers:
            w.cancel()

如果 worker 是线程,可能我们不会一次把它们全部创建出来。为了避免创建线程的昂贵代价,通常一个线程池会按需增长。但是协程很廉价,我们可以直接把他们全部创建出来。

怎么关闭这个 crawler 很有趣。当 join 完成,worker 存活但是被暂停:他们等待更多的 URL,所以主协程要在退出之前清除它们。否则 Python 解释器关闭并调用所有对象的析构函数时,活着的 worker 会哭喊到:

ERROR:asyncio:Task was destroyed but it is pending!

cancel 又是如何工作的呢?生成器还有一个我们还没介绍的特点。你可以从外部抛一个异常给它:

>>> gen = gen_fn()
>>> gen.send(None)  # Start the generator as usual.
1
>>> gen.throw(Exception('error'))
Traceback (most recent call last):
  File "<input>", line 3, in <module>
  File "<input>", line 2, in gen_fn
Exception: error

生成器被 throw 恢复,但是它现在抛出一个异常。如过生成器的调用堆栈中没有捕获异常的代码,这个异常被传递到顶层。所以注销一个协程:

    # Method of Task class.
    def cancel(self):
        self.coro.throw(CancelledError)

任何时候生成器暂停,在某些 yield from 语句它恢复并且抛出一个异常。我们在 task 的 step 方法中处理注销。

    # Method of Task class.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

现在 task 知道它被注销了,所以当它被销毁时,它不再抱怨。

一旦 crawl 注销了 worker,它就退出。同时事件循环看见这个协程结束了(我们后面会见到的),也就退出。

loop.run_until_complete(crawler.crawl())

crawl 方法包含了所有主协程需要做的事。而 worker 则完成从队列中获取 URL、获取网页、解析它们得到新的链接。每个 worker 独立地运行 work 协程:

    @asyncio.coroutine
    def work(self):
        while True:
            url, max_redirect = yield from self.q.get()

            # Download page and add new links to self.q.
            yield from self.fetch(url, max_redirect)
            self.q.task_done()

Python 看见这段代码包含 yield from 语句,就把它编译成生成器函数。所以在 crawl 方法中,我们调用了 10 次 self.work,但并没有真正执行,它仅仅创建了 10 个指向这段代码的生成器对象并把它们包装成 Task 对象。task 接收每个生成器所 yield 的 future,通过调用 send 方法,当 future 解决时,用 future 的结果做为 send 的参数,来驱动它。由于生成器有自己的栈帧,它们可以独立运行,带有独立的局部变量和指令指针。

worker 使用队列来协调其小伙伴。它这样等待新的 URL:

    url, max_redirect = yield from self.q.get()

队列的 get 方法自身也是一个协程,它一直暂停到有新的 URL 进入队列,然后恢复并返回该条目。

碰巧,这也是当主协程注销 worker 时,最后 crawl 停止,worker 协程暂停的地方。从协程的角度,yield from 抛出CancelledError 结束了它在循环中的最后旅程。

worker 获取一个网页,解析链接,把新的链接放入队列中,接着调用task_done减小计数器。最终一个worker遇到一个没有新链接的网页,并且队列里也没有任务,这次task_done的调用使计数器减为0,而crawl正阻塞在join方法上,现在它就可以结束了。

我们承诺过要解释为什么队列中要使用序对,像这样:

# URL to fetch, and the number of redirects left.
('http://xkcd.com/353', 10)

新的 URL 的重定向次数是10。获取一个特别的 URL 会重定向一个新的位置。我们减小重定向次数,并把新的 URL 放入队列中。

# URL with a trailing slash. Nine redirects left.
('http://xkcd.com/353/', 9)

我们使用的 aiohttp 默认会跟踪重定向并返回最终结果。但是,我们告诉它不要这样做,爬虫自己来处理重定向,以便它可以合并那些目的相同的重定向路径:如果我们已经在 self.seen_urls 看到一个 URL,说明它已经从其他的地方走过这条路了。

Figure 5.4 - Redirects

crawler 获取“foo”并发现它重定向到了“baz”,所以它会加“baz”到队列和 seen_urls 中。如果它获取的下一个页面“bar” 也重定向到“baz”,fetcher 不会再次将 “baz”加入到队列中。如果该响应是一个页面,而不是一个重定向,fetch 会解析它的链接,并把新链接放到队列中。

    @asyncio.coroutine
    def fetch(self, url, max_redirect):
        # Handle redirects ourselves.
        response = yield from self.session.get(
            url, allow_redirects=False)

        try:
            if is_redirect(response):
                if max_redirect > 0:
                    next_url = response.headers['location']
                    if next_url in self.seen_urls:
                        # We have been down this path before.
                        return

                    # Remember we have seen this URL.
                    self.seen_urls.add(next_url)

                    # Follow the redirect. One less redirect remains.
                    self.q.put_nowait((next_url, max_redirect - 1))
             else:
                 links = yield from self.parse_links(response)
                 # Python set-logic:
                 for link in links.difference(self.seen_urls):
                    self.q.put_nowait((link, self.max_redirect))
                self.seen_urls.update(links)
        finally:
            # Return connection to pool.
            yield from response.release()

如果这是多进程代码,就有可能遇到讨厌的竞争条件。比如,一个 worker 检查一个链接是否在 seen_urls 中,如果没有它就把这个链接加到队列中并把它放到 seen_urls 中。如果它在这两步操作之间被中断,而另一个 worker 解析到相同的链接,发现它并没有出现在 seen_urls 中就把它加入队列中。这(至少)导致同样的链接在队列中出现两次,做了重复的工作和错误的统计。

然而,一个协程只在 yield from 时才会被中断。这是协程比多线程少遇到竞争条件的关键。多线程必须获得锁来明确的进入一个临界区,否则它就是可中断的。而 Python 的协程默认是不会被中断的,只有它明确 yield 时才主动放弃控制权。

我们不再需要在用回调方式时用的 fetcher 类了。这个类只是不高效回调的一个变通方法:在等待 I/O 时,它需要一个存储状态的地方,因为局部变量并不能在函数调用间保留。倒是 fetch 协程可以像普通函数一样用局部变量保存它的状态,所以我们不再需要一个类。

fetch 完成对服务器响应的处理,它返回到它的调用者 workwork 方法对队列调用 task_done,接着从队列中取出一个要获取的 URL。

fetch 把新的链接放入队列中,它增加未完成的任务计数器,并停留在主协程,主协程在等待 q.join,处于暂停状态。而当没有新的链接并且这是队列中最后一个 URL 时,当 work 调用task\_done,任务计数器变为 0,主协程从join` 中退出。

与 worker 和主协程一起工作的队列代码像这样(实际的 asyncio.Queue 实现在 Future 所展示的地方使用 asyncio.Event 。不同之处在于 Event 是可以重置的,而 Future 不能从已解决返回变成待决。)

class Queue:
    def __init__(self):
        self._join_future = Future()
        self._unfinished_tasks = 0
        # ... other initialization ...

    def put_nowait(self, item):
        self._unfinished_tasks += 1
        # ... store the item ...

    def task_done(self):
        self._unfinished_tasks -= 1
        if self._unfinished_tasks == 0:
            self._join_future.set_result(None)

    @asyncio.coroutine
    def join(self):
        if self._unfinished_tasks > 0:
            yield from self._join_future

主协程 crawl yield from join。所以当最后一个 worker 把计数器减为 0,它告诉 crawl 恢复运行并结束。

旅程快要结束了。我们的程序从 crawl 调用开始:

loop.run_until_complete(self.crawler.crawl())

程序如何结束?因为 crawl 是一个生成器函数,调用它返回一个生成器。为了驱动它,asyncio 把它包装成一个 task:

class EventLoop:
    def run_until_complete(self, coro):
        """Run until the coroutine is done."""
        task = Task(coro)
        task.add_done_callback(stop_callback)
        try:
            self.run_forever()
        except StopError:
            pass

class StopError(BaseException):
    """Raised to stop the event loop."""

def stop_callback(future):
    raise StopError

当这个任务完成,它抛出 StopError,事件循环把这个异常当作正常退出的信号。

但是,task 的 add_done_callbockresult 方法又是什么呢?你可能认为 task 就像一个 future,不错,你的直觉是对的。我们必须承认一个向你隐藏的细节,task 是 future。

class Task(Future):
    """A coroutine wrapped in a Future."""

通常,一个 future 被别人调用 set_result 解决。但是 task,当协程结束时,它自己解决自己。记得我们解释过当 Python 生成器返回时,它抛出一个特殊的 StopIteration 异常:

    # Method of class Task.
    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except CancelledError:
            self.cancelled = True
            return
        except StopIteration as exc:

            # Task resolves itself with coro's return
            # value.
            self.set_result(exc.value)
            return

        next_future.add_done_callback(self.step)

所以当事件循环调用 task.add_done_callback(stop_callback),它就准备被这个 task 停止。在看一次run_until_complete

    # Method of event loop.
    def run_until_complete(self, coro):
        task = Task(coro)
        task.add_done_callback(stop_callback)
        try:
            self.run_forever()
        except StopError:
            pass

当 task 捕获 StopIteration 并解决自己,这个回调从循环中抛出 StopError。循环结束,调用栈回到run_until_complete。我们的程序结束。

总结

现代的程序越来越多是 I/O 密集型而不是 CPU 密集型。对于这样的程序,Python 的线程在两个方面不合适:全局解释器锁阻止真正的并行计算,并且抢占切换也导致他们更容易出现竞争。异步通常是正确的选择。但是随着基于回调的异步代码增加,它会变得非常混乱。协程是一个更整洁的替代者。它们自然地重构成子过程,有健全的异常处理和栈追溯。

如果我们换个角度看 yield from 语句,一个协程看起来像一个传统的做阻塞 I/O 的线程。甚至我们可以采用经典的多线程模式编程,不需要重新发明。因此,与回调相比,协程更适合有经验的多线程的编码者。

但是当我们睁开眼睛关注 yield from 语句,我们能看到协程放弃控制权、允许其它人运行的标志点。不像多线程,协程展示出我们的代码哪里可以被中断哪里不能。在 Glyph Lefkowitz 富有启发性的文章“Unyielding”:“线程让局部推理变得困难,然而局部推理可能是软件开发中最重要的事”。然而,明确的 yield,让“通过过程本身而不是整个系统理解它的行为(和因此、正确性)”成为可能。

这章写于 Python 和异步的复兴时期。你刚学到的基于生成器的的协程,在 2014 年发布在 Python 3.4 的 asyncio 模块中。2015 年 9 月,Python 3.5 发布,协程成为语言的一部分。这个原生的协程通过“async def”来声明, 使用“await”而不是“yield from”委托一个协程或者等待 Future。

除了这些优点,核心的思想不变。Python 新的原生协程与生成器只是在语法上不同,工作原理非常相似。事实上,在 Python 解释器中它们共用同一个实现方法。Task、Future 和事件循环在 asynico 中扮演着同样的角色。

你已经知道 asyncio 协程是如何工作的了,现在你可以忘记大部分的细节。这些机制隐藏在一个整洁的接口下。但是你对这基本原理的理解能让你在现代异步环境下正确而高效的编写代码。

(题图素材来自:ruth-tay.deviantart.com


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL (仁慈的终生大独裁者 (Benevolent Dictator For Life))——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/

协程

还记得我们对你许下的承诺么?我们可以写出这样的异步代码,它既有回调方式的高效,也有多线程代码的简洁。这个结合是同过一种称为 协程 coroutine 的模式来实现的。使用 Python3.4 标准库 asyncio 和一个叫“aiohttp”的包,在协程中获取一个网页是非常直接的( @asyncio.coroutine 修饰符并非魔法。事实上,如果它修饰的是一个生成器函数,并且没有设置 PYTHONASYNCIODEBUG 环境变量的话,这个修饰符基本上没啥用。它只是为了框架的其它部分方便,设置了一个属性 _is_coroutine 而已。也可以直接使用 asyncio 和裸生成器,而没有 @asyncio.coroutine 修饰符):

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

它也是可扩展的。在作者 Jesse 的系统上,与每个线程 50k 内存相比,一个 Python 协程只需要 3k 内存。Python 很容易就可以启动上千个协程。

协程的概念可以追溯到计算机科学的远古时代,它很简单,一个可以暂停和恢复的子过程。线程是被操作系统控制的抢占式多任务,而协程的多任务是可合作的,它们自己选择什么时候暂停去执行下一个协程。

有很多协程的实现。甚至在 Python 中也有几种。Python 3.4 标准库 asyncio 中的协程是建立在生成器之上的,这是一个 Future 类和“yield from”语句。从 Python 3.5 开始,协程变成了语言本身的特性(“PEP 492 Coroutines with async and await syntax” 中描述了 Python 3.5 内置的协程)。然而,理解 Python 3.4 中这个通过语言原有功能实现的协程,是我们处理 Python 3.5 中原生协程的基础。

要解释 Python 3.4 中基于生成器的协程,我们需要深入生成器的方方面面,以及它们是如何在 asyncio 中用作协程的。我很高兴就此写点东西,想必你也希望继续读下去。我们解释了基于生成器的协程之后,就会在我们的异步网络爬虫中使用它们。

生成器如何工作

在你理解生成器之前,你需要知道普通的 Python 函数是怎么工作的。正常情况下,当一个函数调用一个子过程,这个被调用函数获得控制权,直到它返回或者有异常发生,才把控制权交给调用者:

>>> def foo():
...     bar()
...
>>> def bar():
...     pass

标准的 Python 解释器是用 C 语言写的。一个 Python 函数被调用所对应的 C 函数是 PyEval_EvalFrameEx。它获得一个 Python 栈帧结构并在这个栈帧的上下文中执行 Python 字节码。这里是 foo 函数的字节码:

>>> import dis
>>> dis.dis(foo)
  2           0 LOAD_GLOBAL              0 (bar)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 POP_TOP
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

foo 函数在它栈中加载 bar 函数并调用它,然后把 bar 的返回值从栈中弹出,加载 None 值到堆栈并返回。

PyEval_EvalFrameEx 遇到 CALL_FUNCTION 字节码时,它会创建一个新的栈帧,并用这个栈帧递归的调用 PyEval_EvalFrameEx 来执行 bar 函数。

非常重要的一点是,Python 的栈帧在堆中分配!Python 解释器是一个标准的 C 程序,所以它的栈帧是正常的栈帧。但是 Python 的栈帧是在堆中处理。这意味着 Python 栈帧在函数调用结束后依然可以存在。我们在 bar 函数中保存当前的栈帧,交互式的看看这种现象:

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'

Figure 5.1 - Function Calls

现在该说 Python 生成器了,它使用同样构件——代码对象和栈帧——去完成一个不可思议的任务。

这是一个生成器函数:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

在 Python 把 gen_fn 编译成字节码的过程中,一旦它看到 yield 语句就知道这是一个生成器函数而不是普通的函数。它就会设置一个标志来记住这个事实:

>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True

当你调用一个生成器函数,Python 看到这个标志,就不会实际运行它而是创建一个生成器:

>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>

Python 生成器封装了一个栈帧和函数体代码的引用:

>>> gen.gi_code.co_name
'gen_fn'

所有通过调用 gen_fn 的生成器指向同一段代码,但都有各自的栈帧。这些栈帧不再任何一个C函数栈中,而是在堆空间中等待被使用:

Figure 5.2 - Generators

栈帧中有一个指向“最后执行指令”的指针。初始化为 -1,意味着它没开始运行:

>>> gen.gi_frame.f_lasti
-1

当我们调用 send 时,生成器一直运行到第一个 yield 语句处停止,并且 send 返回 1,因为这是 gen 传递给 yield 表达式的值。

>>> gen.send(None)
1

现在,生成器的指令指针是 3,所编译的Python 字节码一共有 56 个字节:

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

这个生成器可以在任何时候、任何函数中恢复运行,因为它的栈帧并不在真正的栈中,而是堆中。在调用链中它的位置也是不固定的,它不必遵循普通函数先进后出的顺序。它像云一样自由。

我们可以传递一个值 hello 给生成器,它会成为 yield 语句的结果,并且生成器会继续运行到第二个 yield 语句处。

>>> gen.send('hello')
result of yield: hello
2

现在栈帧中包含局部变量 result

>>> gen.gi_frame.f_locals
{'result': 'hello'}

其它从 gen_fn 创建的生成器有着它自己的栈帧和局部变量。

当我们再一次调用 send,生成器继续从第二个 yield 开始运行,以抛出一个特殊的 StopIteration 异常为结束。

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration: done

这个异常有一个值 "done",它就是生成器的返回值。

使用生成器构建协程

所以生成器可以暂停,可以给它一个值让它恢复,并且它还有一个返回值。这些特性看起来很适合去建立一个不使用那种乱糟糟的意面似的回调异步编程模型。我们想创造一个这样的“协程”:一个在程序中可以和其他过程合作调度的过程。我们的协程将会是标准库 asyncio 中协程的一个简化版本,我们将使用生成器,futures 和 yield from 语句。

首先,我们需要一种方法去代表协程所需要等待的 future 事件。一个简化的版本是:

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

一个 future 初始化为“未解决的”,它通过调用 set_result 来“解决”。(这个 future 缺少很多东西,比如说,当这个 future 解决后, 生成 yield 的协程应该马上恢复而不是暂停,但是在我们的代码中却不没有这样做。参见 asyncio 的 Future 类以了解其完整实现。)

让我们用 future 和协程来改写我们的 fetcher。我们之前用回调写的 fetch 如下:

class Fetcher:
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!')
        # And so on....

fetch 方法开始连接一个套接字,然后注册 connected 回调函数,它会在套接字建立连接后调用。现在我们使用协程把这两步合并:

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')

现在,fetch 是一个生成器,因为它有一个 yield 语句。我们创建一个未决的 future,然后 yield 它,暂停 fetch 直到套接字连接建立。内联函数 on_connected 解决这个 future。

但是当 future 被解决,谁来恢复这个生成器?我们需要一个协程驱动器。让我们叫它 “task”:

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())

loop()

task 通过传递一个 None 值给 fetch 来启动它。fetch 运行到它 yeild 出一个 future,这个 future 被作为 next_future 而捕获。当套接字连接建立,事件循环运行回调函数 on_connected,这里 future 被解决,step 被调用,fetch 恢复运行。

用 yield from 重构协程

一旦套接字连接建立,我们就可以发送 HTTP GET 请求,然后读取服务器响应。不再需要哪些分散在各处的回调函数,我们把它们放在同一个生成器函数中:

    def fetch(self):
        # ... connection logic from above, then:
        sock.send(request.encode('ascii'))

        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # Done reading.
                break

从套接字中读取所有信息的代码看起来很通用。我们能不把它从 fetch 中提取成一个子过程?现在该 Python 3 热捧的 yield from 登场了。它能让一个生成器委派另一个生成器。

让我们先回到原来那个简单的生成器例子:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

为了从其他生成器调用这个生成器,我们使用 yield from 委派它:

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()

这个 caller 生成器的行为的和它委派的生成器 gen 表现的完全一致:

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration

callergen 生成(yield),caller 就不再前进。注意到 caller 的指令指针保持15不变,就是 yield from 的地方,即使内部的生成器 gen 从一个 yield 语句运行到下一个 yield,它始终不变。(事实上,这就是“yield from”在 CPython 中工作的具体方式。函数会在执行每个语句之前提升其指令指针。但是在外部生成器执行“yield from”后,它会将其指令指针减一,以保持其固定在“yield form”语句上。然后其生成其 caller。这个循环不断重复,直到内部生成器抛出 StopIteration,这里指向外部生成器最终允许它自己进行到下一条指令的地方。)从 caller 外部来看,我们无法分辨 yield 出的值是来自 caller 还是它委派的生成器。而从 gen 内部来看,我们也不能分辨传给它的值是来自 caller 还是 caller 的外面。yield from 语句是一个光滑的管道,值通过它进出 gen,一直到 gen 结束。

协程可以用 yield from 把工作委派给子协程,并接收子协程的返回值。注意到上面的 caller 打印出“return value of yield-from: done”。当 gen 完成后,它的返回值成为 calleryield from 语句的值。

    rv = yield from gen

前面我们批评过基于回调的异步编程模式,其中最大的不满是关于 “ 堆栈撕裂 stack ripping ”:当一个回调抛出异常,它的堆栈回溯通常是毫无用处的。它只显示出事件循环运行了它,而没有说为什么。那么协程怎么样?

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "<input>", line 3, in caller_fn
  File "<input>", line 2, in gen_fn
Exception: my error

这还是非常有用的,当异常抛出时,堆栈回溯显示出 caller_fn 委派了 gen_fn。令人更欣慰的是,你可以在一次异常处理器中封装这个调用到一个子过程中,像正常函数一样:

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh

所以我们可以像提取子过程一样提取子协程。让我们从 fetcher 中提取一些有用的子协程。我们先写一个可以读一块数据的协程 read

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

read 的基础上,read_all 协程读取整个信息:

def read_all(sock):
    response = []
    # Read whole response.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)

如果你换个角度看,抛开 yield form 语句的话,它们就像在做阻塞 I/O 的普通函数一样。但是事实上,readread_all 都是协程。yield from read 暂停 read_all 直到 I/O 操作完成。当 read_all 暂停时,asyncio 的事件循环正在做其它的工作并等待其他的 I/O 操作。read 在下次循环中当事件就绪,完成 I/O 操作时,read_all 恢复运行。

最终,fetch 调用了 read_all

class Fetcher:
    def fetch(self):
         # ... connection logic from above, then:
        sock.send(request.encode('ascii'))
        self.response = yield from read_all(sock)

神奇的是,Task 类不需要做任何改变,它像以前一样驱动外部的 fetch 协程:

Task(fetcher.fetch())
loop()

read yield 一个 future 时,task 从 yield from 管道中接收它,就像这个 future 直接从 fetch yield 一样。当循环解决一个 future 时,task 把它的结果送给 fetch,通过管道,read 接受到这个值,这完全就像 task 直接驱动 read 一样:

Figure 5.3 - Yield From

为了完善我们的协程实现,我们再做点打磨:当等待一个 future 时,我们的代码使用 yield;而当委派一个子协程时,使用 yield from。不管是不是协程,我们总是使用 yield form 会更精炼一些。协程并不需要在意它在等待的东西是什么类型。

在 Python 中,我们从生成器和迭代器的高度相似中获得了好处,将生成器进化成 caller,迭代器也可以同样获得好处。所以,我们可以通过特殊的实现方式来迭代我们的 Future 类:

    # Method on Future class.
    def __iter__(self):
        # Tell Task to resume me here.
        yield self
        return self.result

future 的 __iter__ 方法是一个 yield 它自身的一个协程。当我们将代码替换如下时:

# f is a Future.
yield f

以及……:

# f is a Future.
yield from f

……结果是一样的!驱动 Task 从它的调用 send 中接收 future,并当 future 解决后,它发回新的结果给该协程。

在每个地方都使用 yield from 的好处是什么?为什么比用 field 等待 future 并用 yield from 委派子协程更好?之所以更好的原因是,一个方法可以自由地改变其实行而不影响到其调用者:它可以是一个当 future 解决后返回一个值的普通方法,也可以是一个包含 yield from 语句并返回一个值的协程。无论是哪种情况,调用者仅需要 yield from 该方法以等待结果就行。

亲爱的读者,我们已经完成了对 asyncio 协程探索。我们深入观察了生成器的机制,实现了简单的 future 和 task。我们指出协程是如何利用两个世界的优点:比线程高效、比回调清晰的并发 I/O。当然真正的 asyncio 比我们这个简化版本要复杂的多。真正的框架需要处理zero-copy I/0、公平调度、异常处理和其他大量特性。

使用 asyncio 编写协程代码比你现在看到的要简单的多。在前面的代码中,我们从基本原理去实现协程,所以你看到了回调,task 和 future,甚至非阻塞套接字和 select 调用。但是当用 asyncio 编写应用,这些都不会出现在你的代码中。我们承诺过,你可以像这样下载一个网页:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

对我们的探索还满意么?回到我们原始的任务:使用 asyncio 写一个网络爬虫。

(题图素材来自:ruth-tay.deviantart.com


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL ( 仁慈的终生大独裁者 Benevolent Dictator For Life )——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/

介绍

经典的计算机科学强调高效的算法,尽可能快地完成计算。但是很多网络程序的时间并不是消耗在计算上,而是在等待许多慢速的连接或者低频事件的发生。这些程序暴露出一个新的挑战:如何高效的等待大量网络事件。一个现代的解决方案是异步 I/O。

这一章我们将实现一个简单的网络爬虫。这个爬虫只是一个原型式的异步应用,因为它等待许多响应而只做少量的计算。一次爬的网页越多,它就能越快的完成任务。如果它为每个动态的请求启动一个线程的话,随着并发请求数量的增加,它会在耗尽套接字之前,耗尽内存或者线程相关的资源。使用异步 I/O 可以避免这个的问题。

我们将分三个阶段展示这个例子。首先,我们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它很有效,但是当把它扩展成更复杂的问题时,就会导致无法管理的混乱代码。然后,由于 Python 的协程不仅有效而且可扩展,我们将用 Python 的生成器函数实现一个简单的协程。在最后一个阶段,我们将使用 Python 标准库“asyncio”中功能完整的协程, 并通过异步队列完成这个网络爬虫。(在 PyCon 2013 上,Guido 介绍了标准的 asyncio 库,当时称之为“Tulip”。)

任务

网络爬虫寻找并下载一个网站上的所有网页,也许还会把它们存档,为它们建立索引。从根 URL 开始,它获取每个网页,解析出没有遇到过的链接加到队列中。当网页没有未见到过的链接并且队列为空时,它便停止运行。

我们可以通过同时下载大量的网页来加快这一过程。当爬虫发现新的链接,它使用一个新的套接字并行的处理这个新链接,解析响应,添加新链接到队列。当并发很大时,可能会导致性能下降,所以我们会限制并发的数量,在队列保留那些未处理的链接,直到一些正在执行的任务完成。

传统方式

怎么使一个爬虫并发?传统的做法是创建一个线程池,每个线程使用一个套接字在一段时间内负责一个网页的下载。比如,下载 xkcd.com 网站的一个网页:

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

套接字操作默认是阻塞的:当一个线程调用一个类似 connectrecv 方法时,它会阻塞,直到操作完成。(即使是 send 也能被阻塞,比如接收端在接受外发消息时缓慢而系统的外发数据缓存已经满了的情况下)因此,为了同一时间内下载多个网页,我们需要很多线程。一个复杂的应用会通过线程池保持空闲的线程来分摊创建线程的开销。同样的做法也适用于套接字,使用连接池。

到目前为止,使用线程的是成本昂贵的,操作系统对一个进程、一个用户、一台机器能使用线程做了不同的硬性限制。在 作者 Jesse 的系统中,一个 Python 线程需要 50K 的内存,开启上万个线程就会失败。每个线程的开销和系统的限制就是这种方式的瓶颈所在。

在 Dan Kegel 那一篇很有影响力的文章“The C10K problem”中,它提出了多线程方式在 I/O 并发上的局限性。他在开始写道,

网络服务器到了要同时处理成千上万的客户的时代了,你不这样认为么?毕竟,现在网络规模很大了。

Kegel 在 1999 年创造出“C10K”这个术语。一万个连接在今天看来还是可接受的,但是问题依然存在,只不过大小不同。回到那时候,对于 C10K 问题,每个连接启一个线程是不切实际的。现在这个限制已经成指数级增长。确实,我们的玩具网络爬虫使用线程也可以工作的很好。但是,对于有着千万级连接的大规模应用来说,限制依然存在:它会消耗掉所有线程,即使套接字还够用。那么我们该如何解决这个问题?

异步

异步 I/O 框架在一个线程中完成并发操作。让我们看看这是怎么做到的。

异步框架使用非阻塞套接字。异步爬虫中,我们在发起到服务器的连接前把套接字设为非阻塞:

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

对一个非阻塞套接字调用 connect 方法会立即抛出异常,即使它可以正常工作。这个异常复现了底层 C 语言函数令人厌烦的行为,它把 errno 设置为 EINPROGRESS,告诉你操作已经开始。

现在我们的爬虫需要一种知道连接何时建立的方法,这样它才能发送 HTTP 请求。我们可以简单地使用循环来重试:

request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(url)
encoded = request.encode('ascii')

while True:
    try:
        sock.send(encoded)
        break  # Done.
    except OSError as e:
        pass

print('sent')

这种方法不仅消耗 CPU,也不能有效的等待多个套接字。在远古时代,BSD Unix 的解决方法是 select,这是一个 C 函数,它在一个或一组非阻塞套接字上等待事件发生。现在,互联网应用大量连接的需求,导致 selectpoll 所代替,在 BSD 上的实现是 kqueue ,在 Linux 上是 epoll。它们的 API 和 select 相似,但在大数量的连接中也能有较好的性能。

Python 3.4 的 DefaultSelector 会使用你系统上最好的 select 类函数。要注册一个网络 I/O 事件的提醒,我们会创建一个非阻塞套接字,并使用默认 selector 注册它。

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们不理会这个伪造的错误,调用 selector.register,传递套接字文件描述符和一个表示我们想要监听什么事件的常量表达式。为了当连接建立时收到提醒,我们使用 EVENT_WRITE :它表示什么时候这个套接字可写。我们还传递了一个 Python 函数 connected,当对应事件发生时被调用。这样的函数被称为回调

在一个循环中,selector 接收到 I/O 提醒时我们处理它们。

def loop():
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

connected 回调函数被保存在 event_key.data 中,一旦这个非阻塞套接字建立连接,它就会被取出来执行。

不像我们前面那个快速轮转的循环,这里的 select 调用会暂停,等待下一个 I/O 事件,接着执行等待这些事件的回调函数。没有完成的操作会保持挂起,直到进到下一个事件循环时执行。

到目前为止我们展现了什么?我们展示了如何开始一个 I/O 操作和当操作准备好时调用回调函数。异步框架,它在单线程中执行并发操作,其建立在两个功能之上,非阻塞套接字和事件循环。

我们这里达成了“ 并发性 concurrency ”,但不是传统意义上的“ 并行性 parallelism ”。也就是说,我们构建了一个可以进行重叠 I/O 的微小系统,它可以在其它操作还在进行的时候就开始一个新的操作。它实际上并没有利用多核来并行执行计算。这个系统是用于解决 I/O 密集 I/O-bound 问题的,而不是解决 CPU 密集 CPU-bound 问题的。(Python 的全局解释器锁禁止在一个进程中以任何方式并行执行 Python 代码。在 Python 中并行化 CPU 密集的算法需要多个进程,或者以将该代码移植为 C 语言并行版本。但是这是另外一个话题了。)

所以,我们的事件循环在并发 I/O 上是有效的,因为它并不用为每个连接拨付线程资源。但是在我们开始前,我们需要澄清一个常见的误解:异步比多线程快。通常并不是这样的,事实上,在 Python 中,在处理少量非常活跃的连接时,像我们这样的事件循环是慢于多线程的。在运行时环境中是没有全局解释器锁的,在同样的负载下线程会执行的更好。异步 I/O 真正适用于事件很少、有许多缓慢或睡眠的连接的应用程序。(Jesse 在“什么是异步,它如何工作,什么时候该用它?”一文中指出了异步所适用和不适用的场景。Mike Bayer 在“异步 Python 和数据库”一文中比较了不同负载情况下异步 I/O 和多线程的不同。)

回调

用我们刚刚建立的异步框架,怎么才能完成一个网络爬虫?即使是一个简单的网页下载程序也是很难写的。

首先,我们有一个尚未获取的 URL 集合,和一个已经解析过的 URL 集合。

urls_todo = set(['/'])
seen_urls = set(['/'])

seen_urls 集合包括 urls_todo 和已经完成的 URL。用根 URL / 初始化它们。

获取一个网页需要一系列的回调。在套接字连接建立时会触发 connected 回调,它向服务器发送一个 GET 请求。但是它要等待响应,所以我们需要注册另一个回调函数;当该回调被调用,它仍然不能读取到完整的请求时,就会再一次注册回调,如此反复。

让我们把这些回调放在一个 Fetcher 对象中,它需要一个 URL,一个套接字,还需要一个地方保存返回的字节:

class Fetcher:
    def __init__(self, url):
        self.response = b''  # Empty array of bytes.
        self.url = url
        self.sock = None

我们的入口点在 Fetcher.fetch

    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

fetch 方法从连接一个套接字开始。但是要注意这个方法在连接建立前就返回了。它必须将控制返回到事件循环中等待连接建立。为了理解为什么要这样做,假设我们程序的整体结构如下:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

当调用 select 函数后,所有的事件提醒才会在事件循环中处理,所以 fetch 必须把控制权交给事件循环,这样我们的程序才能知道什么时候连接已建立,接着循环调用 connected 回调,它已经在上面的 fetch 方法中注册过。

这里是我们的 connected 方法的实现:

    # Method on Fetcher class.
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(self.url)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

这个方法发送一个 GET 请求。一个真正的应用会检查 send 的返回值,以防所有的信息没能一次发送出去。但是我们的请求很小,应用也不复杂。它只是简单的调用 send,然后等待响应。当然,它必须注册另一个回调并把控制权交给事件循环。接下来也是最后一个回调函数 read_response,它处理服务器的响应:

    # Method on Fetcher class.
    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()

            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # <- New Fetcher.

            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

这个回调在每次 selector 发现套接字可读时被调用,可读有两种情况:套接字接受到数据或它被关闭。

这个回调函数从套接字读取 4K 数据。如果不到 4k,那么有多少读多少。如果比 4K 多,chunk 中只包 4K 数据并且这个套接字保持可读,这样在事件循环的下一个周期,会再次回到这个回调函数。当响应完成时,服务器关闭这个套接字,chunk 为空。

这里没有展示的 parse_links 方法,它返回一个 URL 集合。我们为每个新的 URL 启动一个 fetcher。注意一个使用异步回调方式编程的好处:我们不需要为共享数据加锁,比如我们往 seen_urls 增加新链接时。这是一种非抢占式的多任务,它不会在我们代码中的任意一个地方被打断。

我们增加了一个全局变量 stopped,用它来控制这个循环:

stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

一旦所有的网页被下载下来,fetcher 停止这个事件循环,程序退出。

这个例子让异步编程的一个问题明显的暴露出来:意大利面代码。

我们需要某种方式来表达一系列的计算和 I/O 操作,并且能够调度多个这样的系列操作让它们并发的执行。但是,没有线程你不能把这一系列操作写在一个函数中:当函数开始一个 I/O 操作,它明确的把未来所需的状态保存下来,然后返回。你需要考虑如何写这个状态保存的代码。

让我们来解释下这到底是什么意思。先来看一下在线程中使用通常的阻塞套接字来获取一个网页时是多么简单。

# Blocking version.
def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

在一个套接字操作和下一个操作之间这个函数到底记住了什么状态?它有一个套接字,一个 URL 和一个可增长的 response。运行在线程中的函数使用编程语言的基本功能来在栈中的局部变量保存这些临时状态。这样的函数也有一个“continuation”——它会在 I/O 结束后执行这些代码。运行时环境通过线程的指令指针来记住这个 continuation。你不必考虑怎么在 I/O 操作后恢复局部变量和这个 continuation。语言本身的特性帮你解决。

但是用一个基于回调的异步框架时,这些语言特性不能提供一点帮助。当等待 I/O 操作时,一个函数必须明确的保存它的状态,因为它会在 I/O 操作完成之前返回并清除栈帧。在我们基于回调的例子中,作为局部变量的替代,我们把 sockresponse 作为 Fetcher 实例 self 的属性来存储。而作为指令指针的替代,它通过注册 connectedread_response 回调来保存它的 continuation。随着应用功能的增长,我们需要手动保存的回调的复杂性也会增加。如此繁复的记账式工作会让编码者感到头痛。

更糟糕的是,当我们的回调函数抛出异常会发生什么?假设我们没有写好 parse_links 方法,它在解析 HTML 时抛出异常:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

这个堆栈回溯只能显示出事件循环调用了一个回调。我们不知道是什么导致了这个错误。这条链的两边都被破坏:不知道从哪来也不知到哪去。这种丢失上下文的现象被称为“ 堆栈撕裂 stack ripping ”,经常会导致无法分析原因。它还会阻止我们为回调链设置异常处理,即那种用“try / except”块封装函数调用及其调用树。(对于这个问题的更复杂的解决方案,参见 http://www.tornadoweb.org/en/stable/stack_context.html

所以,除了关于多线程和异步哪个更高效的长期争议之外,还有一个关于这两者之间的争论:谁更容易跪了。如果在同步上出现失误,线程更容易出现数据竞争的问题,而回调因为" 堆栈撕裂 stack ripping "问题而非常难于调试。

(题图素材来自:ruth-tay.deviantart.com


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出

最近我开始发力钻研 Python 的新 asyncio 模块。原因是我需要做一些事情,使用事件 IO 会使这些事情工作得更好,炙手可热的 asynio 正好可以用来牛刀小试。从试用的经历来看,该模块比我预想的复杂许多,我现在可以非常肯定地说,我不知道该如何恰当地使用 asyncio。

从 Twisted 框架借鉴一些经验来理解 asynio 并非难事,但是,asyncio 包含众多的元素,我开始动摇,不知道如何将这些孤立的零碎拼图组合成一副完整的图画。我已没有足够的智力提出任何更好的建议,在这里,只想分享我的困惑,求大神指点。

原语

asyncio 通过 协程 coroutines 的帮助来实现异步 IO。最初它是通过 yieldyield from 表达式实现的一个库,因为 Python 语言本身演进的缘故,现在它已经变成一个更复杂的怪兽。所以,为了在同一个频道讨论下去,你需要了解如下一些术语:

  • 事件循环
  • 事件循环策略
  • awaitable
  • 协程函数
  • 老式协程函数
  • 协程
  • 协程封装
  • 生成器 generator
  • future
  • 并发的future
  • 任务 task
  • 句柄
  • 执行器 executor
  • 传输 transport
  • 协议

此外,Python 还新增了一些新的特殊方法:

  • __aenter____aenter__,用于异步块操作
  • __aiter____anext__,用于异步迭代器(异步循环和异步推导)。为了更强大些,协议已经改变过一次了。 在 Python 3.5 它返回一个 awaitable(这是个协程);在 3.6它返回一个新的异步生成器。
  • __await__,用于自定义的 awaitable

你还需要了解相当多的内容,文档涵盖了那些部分。尽管如此,我做了一些额外说明以便对其有更好的理解:

事件循环

asyncio 事件循环和你第一眼看上去的略有不同。表面看,每个线程都有一个事件循环,然而事实并非如此。我认为它们应该按照如下的方式工作:

  • 如果是主线程,当调用 asyncio.get_event_loop() 时创建一个事件循环。
  • 如果是其它线程,当调用 asyncio.get_event_loop() 时返回运行时错误。
  • 当前线程可以使用 asyncio.set_event_loop() 在任何时间节点绑定事件循环。该事件循环可由 asyncio.new_evet_loop() 函数创建。
  • 事件循环可以在不绑定到当前线程的情况下使用。
  • asyncio.get_event_loop() 返回绑定线程的事件循环,而非当前运行的事件循环。

这些行为的组合是超混淆的,主要有以下几个原因。 首先,你需要知道这些函数被委托到全局设置的底层事件循环策略。 默认是将事件循环绑定到线程。 或者,如果需要的话,可以在理论上将事件循环绑定到一个 greenlet 或类似的。 然而,重要的是要知道库代码不控制策略,因此不能推断 asyncio 将适用于线程。

其次,asyncio 不需要通过策略将事件循环绑定到上下文。 事件循环可以单独工作。 但是这正是库代码的第一个问题,因为协同程序或类似的东西并不知道哪个事件循环负责调度它。 这意味着,如果从协程中调用 asyncio.get_event_loop(),你可能没有机会取得事件循环。 这也是所有 API 均采用可选的显式事件循环参数的原因。 举例来说,要弄清楚当前哪个协程正在运行,不能使用如下调用:

def get_task():
    loop = asyncio.get_event_loop()
    try:
        return asyncio.Task.get_current(loop)
    except RuntimeError:
        return None

相反,必须显式地传递事件循环。 这进一步要求你在库代码中显式地遍历事件循环,否则可能发生很奇怪的事情。 我不知道这种设计的思想是什么,但如果不解决这个问题(例如 get_event_loop() 返回实际运行的事件循环),那么唯一有意义的其它方案是明确禁止显式事件循环传递,并要求它绑定到当前上下文(线程等)。

由于事件循环策略不提供当前上下文的标识符,因此库也不可能以任何方式“索引”到当前上下文。 也没有回调函数用来监视这样的上下文的拆除,这进一步限制了实际可以开展的操作。

awaitable 与 协程 coroutine

以我的愚见,Python 最大的设计错误是过度重载迭代器。它们现在不仅用于迭代,而且用于各种类型的协程。 Python 中迭代器最大的设计错误之一是如果 StopIteration 没有被捕获形成的空泡。 这可能导致非常令人沮丧的问题,其中某处的异常可能导致其它地方的生成器或协同程序中止。 这是一个长期存在的问题,基于 Python 的模板引擎如 Jinja 经常面临这种问题。 该模板引擎在内部渲染为生成器,并且当由于某种原因的模板引起 StopIteration 时,渲染就停止在那里。

Python 慢慢认识到了过度重载的教训。 首先在 3.x 版本加入 asyncio 模块,并没有语言级支持。 所以自始至终它不过仅仅是装饰器和生成器而已。 为了实现 yield from 以及其它东西,StopIteration 再次重载。 这导致了令人困惑的行为,像这样:

>>> def foo(n):
...  if n in (0, 1):
...   return [1]
...  for item in range(n):
...   yield item * 2
...
>>> list(foo(0))
[]
>>> list(foo(1))
[]
>>> list(foo(2))
[0, 2]

没有错误,没有警告。只是不是你所期望的行为。 这是因为从一个作为生成器的函数中 return 的值实际上引发了一个带有单个参数的 StopIteration,它不是由迭代器协议捕获的,而只是在协程代码中处理。

在 3.5 和 3.6 有很多改变,因为现在除了生成器我们还有协程对象。除了通过封装生成器来生成协程,没有其它可以直接生成协程的单独对象。它是通过用给函数加 async 前缀来实现。 例如 async def x() 会产生这样的协程。 现在在 3.6,将有单独的异步生成器,它通过触发 AsyncStopIteration 保持其独立性。 此外,对于Python 3.5 和更高版本,导入新的 future 对象(generator_stop),如果代码在迭代步骤中触发 StopIteration,它将引发 RuntimeError

为什么我提到这一切? 因为老的实现方式并未真的消失。 生成器仍然具有 sendthrow 方法以及协程仍然在很大程度上表现为生成器。你需要知道这些东西,它们将在未来伴随你相当长的时间。

为了统一很多这样的重复,现在我们在 Python 中有更多的概念了:

  • awaitable:具有__await__方法的对象。 由本地协同程序和旧式协同程序以及一些其它程序实现。
  • 协程函数 coroutinefunction :返回原生协程的函数。 不要与返回协程的函数混淆。
  • 协程 coroutine : 原生的协程程序。 注意,目前为止,当前文档不认为老式 asyncio 协程是协程程序。 至少 inspect.iscoroutine 不认为它是协程。 尽管它被 future/awaitable 分支接纳。

特别令人困惑的是 asyncio.iscoroutinefunctioninspect.iscoroutinefunction 正在做不同的事情,这与 inspect.iscoroutineinspect.iscoroutinefunction 情况相同。 值得注意的是,尽管 inspect 在类型检查中不知道有关 asycnio 旧式协程函数的任何信息,但是当您检查 awaitable 状态时它显然知道它们,即使它与 **await** 不一致。

协程封装器 coroutine wrapper

每当你运行 async def ,Python 就会调用一个线程局部的协程封装器。它由 sys.set_coroutine_wrapper 设置,并且它是可以包装这些东西的一个函数。 看起来有点像如下代码:

>>> import sys
>>> sys.set_coroutine_wrapper(lambda x: 42)
>>> async def foo():
...  pass
...
>>> foo()
__main__:1: RuntimeWarning: coroutine 'foo' was never awaited
42

在这种情况下,我从来没有实际调用原始的函数,只是给你一个提示,说明这个函数可以做什么。 目前我只能说它总是线程局部有效,所以,如果替换事件循环策略,你需要搞清楚如何让协程封装器在相同的上下文同步更新。创建的新线程不会从父线程继承那些标识。

这不要与 asyncio 协程封装代码混淆。

awaitable 和 future

有些东西是 awaitable 的。 据我所见,以下概念被认为是 awaitable:

  • 原生的协程
  • 配置了假的 CO_ITERABLE_COROUTINE 标识的生成器(文中有涉及)
  • 具有 __await__ 方法的对象

除了生成器由于历史遗留的原因不使用之外,其它的对象都使用 __await__ 方法。 CO_ITERABLE_COROUTINE 标志来自哪里?它来自一个协程封装器(现在与 sys.set_coroutine_wrapper 有些混淆),即 @asyncio.coroutine。 通过一些间接方法,它使用 types.coroutine(现在与 types.CoroutineTypeasyncio.coroutine 有些混淆)封装生成器,并通过另外一个标志 CO_ITERABLE_COROUTINE 重新创建内部代码对象。

所以既然我们知道这些东西是什么,那么什么是 future? 首先,我们需要澄清一件事情:在 Python 3 中,实际上有两种(完全不兼容)的 future 类型:asyncio.futures.Futureconcurrent.futures.Future。 其中一个出现在另一个之前,但它们都仍然在 asyncio 中使用。 例如,asyncio.run_coroutine_threadsafe() 将调度一个协程到在另一个线程中运行的事件循环,但它返回一个 concurrent.futures.Future 对象,而不是 asyncio.futures.Future 对象。 这是有道理的,因为只有 concurrent.futures.Future 对象是线程安全的。

所以现在我们知道有两个不兼容的 future,我们应该澄清哪个 future 在 asyncio 中。 老实说,我不完全确定差异在哪里,但我打算暂时称之为“最终”。它是一个最终将持有一个值的对象,当还在计算时你可以对最终结果做一些处理。 future 对象的一些变种称为 deferred,还有一些叫做 promise。 我实在难以理解它们真正的区别。

你能用一个 future 对象做什么? 你可以关联一个准备就绪时将被调用的回调函数,或者你可以关联一个 future 失败时将被触发的回调函数。 此外,你可以 await 它(它实现__await__,因此可等待),此外,future 也可以取消。

那么你怎样才能得到这样的 future 对象? 通过在 awaitable 对象上调用 asyncio.ensure_future。它会把一个旧版的生成器转变为 future 对象。 然而,如果你阅读文档,你会读到 asyncio.ensure_future 实际上返回一个task(任务)。 那么问题来了,什么是任务?

任务

任务 task 某种意义上是一个封装了协程的 futur 对象。它的工作方式和 future 类似,但它也有一些额外的方法来提取所包含的协程的当前堆栈。 我们已经见过了在前面提到过的任务,因为它是通过 Task.get_current 确定事件循环当前正在做什么的主要方式。

在如何取消工作方面,任务和 future 也有区别,但这超出了本文的范围。“取消”是它们自己最大的问题。 如果你处于一个协程中,并且知道自己正在运行,你可以通过前面提到的 Task.get_current 获取自己的任务,但这需要你知道自己被派遣在哪个事件循环,该事件循环可能是、也可能不是已绑定的那个线程。

协程不可能知道它与哪个循环一起使用。task 也没有提供该信息的公共 API。 然而,如果你确实可以获得一个任务,你可以访问 task._loop,通过它反指到事件循环。

句柄

除了上面提到的所有一切还有句柄。 句柄是等待执行的不透明对象,不可等待,但可以被取消。 特别是如果你使用 call_soon 或者 call_soon_threadsafe(还有其它一些)调度执行一个调用,你可以获得句柄,然后使用它尽力尝试取消执行,但不能等待实际调用生效。

执行器 Executor

因为你可以有多个事件循环,但这并不意味着每个线程理所当然地应用多个事件循环,最常见的情形还是一个线程一个事件循环。 那么你如何通知另一个事件循环做一些工作? 你不能到另一个线程的事件循环中执行回调函数并获取结果。 这种情况下,你需要使用执行器。

执行器 Executor 来自 concurrent.futures,它允许你将工作安排到本身未发生事件的线程中。 例如,如果在事件循环中使用 run_in_executor 来调度将在另一个线程中调用的函数。 其返回结果是 asyncio 协程,而不是像 run_coroutine_threadsafe 这样的并发协程。 我还没有足够的心智来弄清楚为什么设计这样的 API,应该如何使用,以及什么时候使用。 文档中建议执行器可以用于构建多进程。

传输和协议

我总是认为传输与协议也凌乱不堪,实际这部分内容基本上是对 Twisted 的逐字拷贝。详情毋庸赘述,请直接阅读相关文档。

如何使用 asyncio

现在我们已经大致了解 asyncio,我发现了一些模式,人们似乎在写 asyncio 代码时使用:

  • 将事件循环传递给所有协程。 这似乎是社区中一部分人的做法。 把事件循环信息提供给协程为协程获取自己运行的任务提供了可能性。
  • 或者你要求事件循环绑定到线程,这也能达到同样的目的。 理想情况下两者都支持。 可悲的是,社区已经分化。
  • 如果想使用上下文数据(如线程本地数据),你可谓是运气不佳。 最流行的变通方法显然是 atlassian 的 aiolocals,它基本上需要你手动传递上下文信息到协程,因为解释器不为此提供支持。 这意味着如果你用一个工具类库生成协程,你将失去上下文。
  • 忽略 Python 中的旧式协程。 只使用 3.5 版本中 async def 关键字和协程。 你总可能要用到它们,因为在老版本中,没有异步上下文管理器,这是非常必要的资源管理。
  • 学习重新启动事件循环进行善后清理。 这部分功能和我预想的不同,我花了比较长的时间来厘清它的实现。清理操作的最好方式是不断重启事件循环直到没有等待事件。 遗憾的是没有什么通用的模式来处理清理操作,你只能用一些丑陋的临时方案糊口度日。 例如 aiohttp 的 web 支持也做这个模式,所以如果你想要结合两个清理逻辑,你可能需要重新实现它提供的工具助手,因为该助手功能实现后,它彻底破坏了事件循环的设计。 当然,它不是我见过的第一个干这种坏事的库 :(。
  • 使用子进程是不明显的。 你需要一个事件循环在主线程中运行,我想它是在监听信号事件,然后分派到其它事件循环。 这需要通过 asyncio.get_child_watcher().attach_loop(...) 通知循环。
  • 编写同时支持异步和同步的代码在某种程度上注定要失败。 尝试在同一个对象上支持 withasync with 是危险的事情。
  • 如果你想给一个协程起个更好的名字,弄清楚为什么它没有被等待,设置 __name__没有帮助。 你需要设置 __qualname__ 而不是打印出错误消息来。
  • 有时内部类型交换会使你麻痹。 特别是 asyncio.wait() 函数将确保所有的事情都是 future,这意味着如果你传递协程,你将很难发现你的协程是否已经完成或者正在等待,因为输入对象不再匹配输出对象。 在这种情况下,唯一真正理智的做法是确保前期一切都是 future。

上下文数据

除了疯狂的复杂性和对如何更好地编写 API 缺乏理解,我最大的问题是完全缺乏对上下文本地数据的考虑。这是 Node 社区现在学习的东西。continuation-local-storage 存在,但该实现被接受的太晚。持续本地存储和类似的概念常用于在并发环境中实施安全策略,并且该信息的损坏可能导致严重的安全问题。

事实上,Python 甚至没有任何存储,这令人失望至极。我正在研究这个内容,因为我正在调查如何最好地支持 Sentry's breadcrumbs 的 asyncio,然而我并没有看到一个合理的方式做到这一点。在 asyncio 中没有上下文的概念,没有办法从通用代码中找出您正在使用的事件循环,并且如果没有 monkeypatching(运行环境下的补丁),也无法获取这些信息。

Node 当前正在经历如何找到这个问题的长期解决方案的过程。这个问题不容忽视,因为它在所有生态系统中反复出现过,如 JavaScript、Python 和 .NET 环境。该问题被命名为异步上下文传播,其解决方案有许多名称。在 Go 中,需要使用上下文包,并明确地传递给所有 goroutine(不是一个完美的解决方案,但至少有一个)。.NET 具有本地调用上下文形式的最佳解决方案。它可以是线程上下文,Web 请求上下文或类似的东西,除非被抑制,否则它会自动传播。微软的解决方案是我们的黄金标准。我现在相信,微软在 15 年前已经解决了该问题。

我不知道该生态系统是否还够年轻,还可以添加逻辑调用上下文,可能现在仍然为时未晚。

个人感想

复杂的东西变得越来越复杂。 我没有随意使用 asyncio 的心智。它需要不断地更新所有 Python 语言的变化的知识,这很大程度上使语言本身变得复杂。 令人鼓舞的是,围绕着它的生态系统正在不断发展,只是不知道还需要几年的时间,才能带给开发者愉快和稳定的开发体验。

3.5 版本引入的东西(新的协程对象)非常棒。 特别是这些变化包括引入了一个合理的基础,这些都是我在早期的版本中一直期盼的。在我心中, 通过重载生成器实现协程是一个错误。 关于什么是 asyncio,我难以置喙。 这是一个非常复杂的事情,内部令人眼花缭乱。 我很难理解它工作的所有细节。你什么时候可以传递一个生成器,什么时候它必须是一个真正的协程,future 是什么,任务是什么,事件循环如何工作,这甚至还没有触碰到真正的 IO 部分。

最糟糕的是,asyncio 甚至不是特别快。 David Beazley 演示的它设计的 asyncio 的替代品是原生版本速度的两倍。 asyncio 巨复杂,很难理解,也无法兑现自己在主要特性上的承诺,对于它,我只想说我想静静。我知道,至少我对 asyncio 理解的不够透彻,没有足够的信心对人们如何用它构建代码给出建议。


作者:

Armin Ronacher

软件开发者和开源骨灰, Flask 框架的创造者。


via: http://lucumr.pocoo.org/2016/10/30/i-dont-understand-asyncio/

作者:Armin Ronacher 译者:firstadream 校对:jasminepeng

本文由 LCTT 原创编译,Linux中国 荣誉推出