标签 python 下的文章

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL (仁慈的终生大独裁者 (Benevolent Dictator For Life))——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/

协程

还记得我们对你许下的承诺么?我们可以写出这样的异步代码,它既有回调方式的高效,也有多线程代码的简洁。这个结合是同过一种称为 协程 coroutine 的模式来实现的。使用 Python3.4 标准库 asyncio 和一个叫“aiohttp”的包,在协程中获取一个网页是非常直接的( @asyncio.coroutine 修饰符并非魔法。事实上,如果它修饰的是一个生成器函数,并且没有设置 PYTHONASYNCIODEBUG 环境变量的话,这个修饰符基本上没啥用。它只是为了框架的其它部分方便,设置了一个属性 _is_coroutine 而已。也可以直接使用 asyncio 和裸生成器,而没有 @asyncio.coroutine 修饰符):

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

它也是可扩展的。在作者 Jesse 的系统上,与每个线程 50k 内存相比,一个 Python 协程只需要 3k 内存。Python 很容易就可以启动上千个协程。

协程的概念可以追溯到计算机科学的远古时代,它很简单,一个可以暂停和恢复的子过程。线程是被操作系统控制的抢占式多任务,而协程的多任务是可合作的,它们自己选择什么时候暂停去执行下一个协程。

有很多协程的实现。甚至在 Python 中也有几种。Python 3.4 标准库 asyncio 中的协程是建立在生成器之上的,这是一个 Future 类和“yield from”语句。从 Python 3.5 开始,协程变成了语言本身的特性(“PEP 492 Coroutines with async and await syntax” 中描述了 Python 3.5 内置的协程)。然而,理解 Python 3.4 中这个通过语言原有功能实现的协程,是我们处理 Python 3.5 中原生协程的基础。

要解释 Python 3.4 中基于生成器的协程,我们需要深入生成器的方方面面,以及它们是如何在 asyncio 中用作协程的。我很高兴就此写点东西,想必你也希望继续读下去。我们解释了基于生成器的协程之后,就会在我们的异步网络爬虫中使用它们。

生成器如何工作

在你理解生成器之前,你需要知道普通的 Python 函数是怎么工作的。正常情况下,当一个函数调用一个子过程,这个被调用函数获得控制权,直到它返回或者有异常发生,才把控制权交给调用者:

>>> def foo():
...     bar()
...
>>> def bar():
...     pass

标准的 Python 解释器是用 C 语言写的。一个 Python 函数被调用所对应的 C 函数是 PyEval_EvalFrameEx。它获得一个 Python 栈帧结构并在这个栈帧的上下文中执行 Python 字节码。这里是 foo 函数的字节码:

>>> import dis
>>> dis.dis(foo)
  2           0 LOAD_GLOBAL              0 (bar)
              3 CALL_FUNCTION            0 (0 positional, 0 keyword pair)
              6 POP_TOP
              7 LOAD_CONST               0 (None)
             10 RETURN_VALUE

foo 函数在它栈中加载 bar 函数并调用它,然后把 bar 的返回值从栈中弹出,加载 None 值到堆栈并返回。

PyEval_EvalFrameEx 遇到 CALL_FUNCTION 字节码时,它会创建一个新的栈帧,并用这个栈帧递归的调用 PyEval_EvalFrameEx 来执行 bar 函数。

非常重要的一点是,Python 的栈帧在堆中分配!Python 解释器是一个标准的 C 程序,所以它的栈帧是正常的栈帧。但是 Python 的栈帧是在堆中处理。这意味着 Python 栈帧在函数调用结束后依然可以存在。我们在 bar 函数中保存当前的栈帧,交互式的看看这种现象:

>>> import inspect
>>> frame = None
>>> def foo():
...     bar()
...
>>> def bar():
...     global frame
...     frame = inspect.currentframe()
...
>>> foo()
>>> # The frame was executing the code for 'bar'.
>>> frame.f_code.co_name
'bar'
>>> # Its back pointer refers to the frame for 'foo'.
>>> caller_frame = frame.f_back
>>> caller_frame.f_code.co_name
'foo'

Figure 5.1 - Function Calls

现在该说 Python 生成器了,它使用同样构件——代码对象和栈帧——去完成一个不可思议的任务。

这是一个生成器函数:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

在 Python 把 gen_fn 编译成字节码的过程中,一旦它看到 yield 语句就知道这是一个生成器函数而不是普通的函数。它就会设置一个标志来记住这个事实:

>>> # The generator flag is bit position 5.
>>> generator_bit = 1 << 5
>>> bool(gen_fn.__code__.co_flags & generator_bit)
True

当你调用一个生成器函数,Python 看到这个标志,就不会实际运行它而是创建一个生成器:

>>> gen = gen_fn()
>>> type(gen)
<class 'generator'>

Python 生成器封装了一个栈帧和函数体代码的引用:

>>> gen.gi_code.co_name
'gen_fn'

所有通过调用 gen_fn 的生成器指向同一段代码,但都有各自的栈帧。这些栈帧不再任何一个C函数栈中,而是在堆空间中等待被使用:

Figure 5.2 - Generators

栈帧中有一个指向“最后执行指令”的指针。初始化为 -1,意味着它没开始运行:

>>> gen.gi_frame.f_lasti
-1

当我们调用 send 时,生成器一直运行到第一个 yield 语句处停止,并且 send 返回 1,因为这是 gen 传递给 yield 表达式的值。

>>> gen.send(None)
1

现在,生成器的指令指针是 3,所编译的Python 字节码一共有 56 个字节:

>>> gen.gi_frame.f_lasti
3
>>> len(gen.gi_code.co_code)
56

这个生成器可以在任何时候、任何函数中恢复运行,因为它的栈帧并不在真正的栈中,而是堆中。在调用链中它的位置也是不固定的,它不必遵循普通函数先进后出的顺序。它像云一样自由。

我们可以传递一个值 hello 给生成器,它会成为 yield 语句的结果,并且生成器会继续运行到第二个 yield 语句处。

>>> gen.send('hello')
result of yield: hello
2

现在栈帧中包含局部变量 result

>>> gen.gi_frame.f_locals
{'result': 'hello'}

其它从 gen_fn 创建的生成器有着它自己的栈帧和局部变量。

当我们再一次调用 send,生成器继续从第二个 yield 开始运行,以抛出一个特殊的 StopIteration 异常为结束。

>>> gen.send('goodbye')
result of 2nd yield: goodbye
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration: done

这个异常有一个值 "done",它就是生成器的返回值。

使用生成器构建协程

所以生成器可以暂停,可以给它一个值让它恢复,并且它还有一个返回值。这些特性看起来很适合去建立一个不使用那种乱糟糟的意面似的回调异步编程模型。我们想创造一个这样的“协程”:一个在程序中可以和其他过程合作调度的过程。我们的协程将会是标准库 asyncio 中协程的一个简化版本,我们将使用生成器,futures 和 yield from 语句。

首先,我们需要一种方法去代表协程所需要等待的 future 事件。一个简化的版本是:

class Future:
    def __init__(self):
        self.result = None
        self._callbacks = []

    def add_done_callback(self, fn):
        self._callbacks.append(fn)

    def set_result(self, result):
        self.result = result
        for fn in self._callbacks:
            fn(self)

一个 future 初始化为“未解决的”,它通过调用 set_result 来“解决”。(这个 future 缺少很多东西,比如说,当这个 future 解决后, 生成 yield 的协程应该马上恢复而不是暂停,但是在我们的代码中却不没有这样做。参见 asyncio 的 Future 类以了解其完整实现。)

让我们用 future 和协程来改写我们的 fetcher。我们之前用回调写的 fetch 如下:

class Fetcher:
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

    def connected(self, key, mask):
        print('connected!')
        # And so on....

fetch 方法开始连接一个套接字,然后注册 connected 回调函数,它会在套接字建立连接后调用。现在我们使用协程把这两步合并:

    def fetch(self):
        sock = socket.socket()
        sock.setblocking(False)
        try:
            sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        f = Future()

        def on_connected():
            f.set_result(None)

        selector.register(sock.fileno(),
                          EVENT_WRITE,
                          on_connected)
        yield f
        selector.unregister(sock.fileno())
        print('connected!')

现在,fetch 是一个生成器,因为它有一个 yield 语句。我们创建一个未决的 future,然后 yield 它,暂停 fetch 直到套接字连接建立。内联函数 on_connected 解决这个 future。

但是当 future 被解决,谁来恢复这个生成器?我们需要一个协程驱动器。让我们叫它 “task”:

class Task:
    def __init__(self, coro):
        self.coro = coro
        f = Future()
        f.set_result(None)
        self.step(f)

    def step(self, future):
        try:
            next_future = self.coro.send(future.result)
        except StopIteration:
            return

        next_future.add_done_callback(self.step)

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
Task(fetcher.fetch())

loop()

task 通过传递一个 None 值给 fetch 来启动它。fetch 运行到它 yeild 出一个 future,这个 future 被作为 next_future 而捕获。当套接字连接建立,事件循环运行回调函数 on_connected,这里 future 被解决,step 被调用,fetch 恢复运行。

用 yield from 重构协程

一旦套接字连接建立,我们就可以发送 HTTP GET 请求,然后读取服务器响应。不再需要哪些分散在各处的回调函数,我们把它们放在同一个生成器函数中:

    def fetch(self):
        # ... connection logic from above, then:
        sock.send(request.encode('ascii'))

        while True:
            f = Future()

            def on_readable():
                f.set_result(sock.recv(4096))

            selector.register(sock.fileno(),
                              EVENT_READ,
                              on_readable)
            chunk = yield f
            selector.unregister(sock.fileno())
            if chunk:
                self.response += chunk
            else:
                # Done reading.
                break

从套接字中读取所有信息的代码看起来很通用。我们能不把它从 fetch 中提取成一个子过程?现在该 Python 3 热捧的 yield from 登场了。它能让一个生成器委派另一个生成器。

让我们先回到原来那个简单的生成器例子:

>>> def gen_fn():
...     result = yield 1
...     print('result of yield: {}'.format(result))
...     result2 = yield 2
...     print('result of 2nd yield: {}'.format(result2))
...     return 'done'
...     

为了从其他生成器调用这个生成器,我们使用 yield from 委派它:

>>> # Generator function:
>>> def caller_fn():
...     gen = gen_fn()
...     rv = yield from gen
...     print('return value of yield-from: {}'
...           .format(rv))
...
>>> # Make a generator from the
>>> # generator function.
>>> caller = caller_fn()

这个 caller 生成器的行为的和它委派的生成器 gen 表现的完全一致:

>>> caller.send(None)
1
>>> caller.gi_frame.f_lasti
15
>>> caller.send('hello')
result of yield: hello
2
>>> caller.gi_frame.f_lasti  # Hasn't advanced.
15
>>> caller.send('goodbye')
result of 2nd yield: goodbye
return value of yield-from: done
Traceback (most recent call last):
  File "<input>", line 1, in <module>
StopIteration

callergen 生成(yield),caller 就不再前进。注意到 caller 的指令指针保持15不变,就是 yield from 的地方,即使内部的生成器 gen 从一个 yield 语句运行到下一个 yield,它始终不变。(事实上,这就是“yield from”在 CPython 中工作的具体方式。函数会在执行每个语句之前提升其指令指针。但是在外部生成器执行“yield from”后,它会将其指令指针减一,以保持其固定在“yield form”语句上。然后其生成其 caller。这个循环不断重复,直到内部生成器抛出 StopIteration,这里指向外部生成器最终允许它自己进行到下一条指令的地方。)从 caller 外部来看,我们无法分辨 yield 出的值是来自 caller 还是它委派的生成器。而从 gen 内部来看,我们也不能分辨传给它的值是来自 caller 还是 caller 的外面。yield from 语句是一个光滑的管道,值通过它进出 gen,一直到 gen 结束。

协程可以用 yield from 把工作委派给子协程,并接收子协程的返回值。注意到上面的 caller 打印出“return value of yield-from: done”。当 gen 完成后,它的返回值成为 calleryield from 语句的值。

    rv = yield from gen

前面我们批评过基于回调的异步编程模式,其中最大的不满是关于 “ 堆栈撕裂 stack ripping ”:当一个回调抛出异常,它的堆栈回溯通常是毫无用处的。它只显示出事件循环运行了它,而没有说为什么。那么协程怎么样?

>>> def gen_fn():
...     raise Exception('my error')
>>> caller = caller_fn()
>>> caller.send(None)
Traceback (most recent call last):
  File "<input>", line 1, in <module>
  File "<input>", line 3, in caller_fn
  File "<input>", line 2, in gen_fn
Exception: my error

这还是非常有用的,当异常抛出时,堆栈回溯显示出 caller_fn 委派了 gen_fn。令人更欣慰的是,你可以在一次异常处理器中封装这个调用到一个子过程中,像正常函数一样:

>>> def gen_fn():
...     yield 1
...     raise Exception('uh oh')
...
>>> def caller_fn():
...     try:
...         yield from gen_fn()
...     except Exception as exc:
...         print('caught {}'.format(exc))
...
>>> caller = caller_fn()
>>> caller.send(None)
1
>>> caller.send('hello')
caught uh oh

所以我们可以像提取子过程一样提取子协程。让我们从 fetcher 中提取一些有用的子协程。我们先写一个可以读一块数据的协程 read

def read(sock):
    f = Future()

    def on_readable():
        f.set_result(sock.recv(4096))

    selector.register(sock.fileno(), EVENT_READ, on_readable)
    chunk = yield f  # Read one chunk.
    selector.unregister(sock.fileno())
    return chunk

read 的基础上,read_all 协程读取整个信息:

def read_all(sock):
    response = []
    # Read whole response.
    chunk = yield from read(sock)
    while chunk:
        response.append(chunk)
        chunk = yield from read(sock)

    return b''.join(response)

如果你换个角度看,抛开 yield form 语句的话,它们就像在做阻塞 I/O 的普通函数一样。但是事实上,readread_all 都是协程。yield from read 暂停 read_all 直到 I/O 操作完成。当 read_all 暂停时,asyncio 的事件循环正在做其它的工作并等待其他的 I/O 操作。read 在下次循环中当事件就绪,完成 I/O 操作时,read_all 恢复运行。

最终,fetch 调用了 read_all

class Fetcher:
    def fetch(self):
         # ... connection logic from above, then:
        sock.send(request.encode('ascii'))
        self.response = yield from read_all(sock)

神奇的是,Task 类不需要做任何改变,它像以前一样驱动外部的 fetch 协程:

Task(fetcher.fetch())
loop()

read yield 一个 future 时,task 从 yield from 管道中接收它,就像这个 future 直接从 fetch yield 一样。当循环解决一个 future 时,task 把它的结果送给 fetch,通过管道,read 接受到这个值,这完全就像 task 直接驱动 read 一样:

Figure 5.3 - Yield From

为了完善我们的协程实现,我们再做点打磨:当等待一个 future 时,我们的代码使用 yield;而当委派一个子协程时,使用 yield from。不管是不是协程,我们总是使用 yield form 会更精炼一些。协程并不需要在意它在等待的东西是什么类型。

在 Python 中,我们从生成器和迭代器的高度相似中获得了好处,将生成器进化成 caller,迭代器也可以同样获得好处。所以,我们可以通过特殊的实现方式来迭代我们的 Future 类:

    # Method on Future class.
    def __iter__(self):
        # Tell Task to resume me here.
        yield self
        return self.result

future 的 __iter__ 方法是一个 yield 它自身的一个协程。当我们将代码替换如下时:

# f is a Future.
yield f

以及……:

# f is a Future.
yield from f

……结果是一样的!驱动 Task 从它的调用 send 中接收 future,并当 future 解决后,它发回新的结果给该协程。

在每个地方都使用 yield from 的好处是什么?为什么比用 field 等待 future 并用 yield from 委派子协程更好?之所以更好的原因是,一个方法可以自由地改变其实行而不影响到其调用者:它可以是一个当 future 解决后返回一个值的普通方法,也可以是一个包含 yield from 语句并返回一个值的协程。无论是哪种情况,调用者仅需要 yield from 该方法以等待结果就行。

亲爱的读者,我们已经完成了对 asyncio 协程探索。我们深入观察了生成器的机制,实现了简单的 future 和 task。我们指出协程是如何利用两个世界的优点:比线程高效、比回调清晰的并发 I/O。当然真正的 asyncio 比我们这个简化版本要复杂的多。真正的框架需要处理zero-copy I/0、公平调度、异常处理和其他大量特性。

使用 asyncio 编写协程代码比你现在看到的要简单的多。在前面的代码中,我们从基本原理去实现协程,所以你看到了回调,task 和 future,甚至非阻塞套接字和 select 调用。但是当用 asyncio 编写应用,这些都不会出现在你的代码中。我们承诺过,你可以像这样下载一个网页:

    @asyncio.coroutine
    def fetch(self, url):
        response = yield from self.session.get(url)
        body = yield from response.read()

对我们的探索还满意么?回到我们原始的任务:使用 asyncio 写一个网络爬虫。

(题图素材来自:ruth-tay.deviantart.com


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出

本文作者:

A. Jesse Jiryu Davis 是纽约 MongoDB 的工程师。他编写了异步 MongoDB Python 驱动程序 Motor,也是 MongoDB C 驱动程序的开发领袖和 PyMongo 团队成员。 他也为 asyncio 和 Tornado 做了贡献,在 http://emptysqua.re 上写作。

Guido van Rossum 是主流编程语言 Python 的创造者,Python 社区称他为 BDFL ( 仁慈的终生大独裁者 Benevolent Dictator For Life )——这是一个来自 Monty Python 短剧的称号。他的主页是 http://www.python.org/~guido/

介绍

经典的计算机科学强调高效的算法,尽可能快地完成计算。但是很多网络程序的时间并不是消耗在计算上,而是在等待许多慢速的连接或者低频事件的发生。这些程序暴露出一个新的挑战:如何高效的等待大量网络事件。一个现代的解决方案是异步 I/O。

这一章我们将实现一个简单的网络爬虫。这个爬虫只是一个原型式的异步应用,因为它等待许多响应而只做少量的计算。一次爬的网页越多,它就能越快的完成任务。如果它为每个动态的请求启动一个线程的话,随着并发请求数量的增加,它会在耗尽套接字之前,耗尽内存或者线程相关的资源。使用异步 I/O 可以避免这个的问题。

我们将分三个阶段展示这个例子。首先,我们会实现一个事件循环并用这个事件循环和回调来勾画出一只网络爬虫。它很有效,但是当把它扩展成更复杂的问题时,就会导致无法管理的混乱代码。然后,由于 Python 的协程不仅有效而且可扩展,我们将用 Python 的生成器函数实现一个简单的协程。在最后一个阶段,我们将使用 Python 标准库“asyncio”中功能完整的协程, 并通过异步队列完成这个网络爬虫。(在 PyCon 2013 上,Guido 介绍了标准的 asyncio 库,当时称之为“Tulip”。)

任务

网络爬虫寻找并下载一个网站上的所有网页,也许还会把它们存档,为它们建立索引。从根 URL 开始,它获取每个网页,解析出没有遇到过的链接加到队列中。当网页没有未见到过的链接并且队列为空时,它便停止运行。

我们可以通过同时下载大量的网页来加快这一过程。当爬虫发现新的链接,它使用一个新的套接字并行的处理这个新链接,解析响应,添加新链接到队列。当并发很大时,可能会导致性能下降,所以我们会限制并发的数量,在队列保留那些未处理的链接,直到一些正在执行的任务完成。

传统方式

怎么使一个爬虫并发?传统的做法是创建一个线程池,每个线程使用一个套接字在一段时间内负责一个网页的下载。比如,下载 xkcd.com 网站的一个网页:

def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

套接字操作默认是阻塞的:当一个线程调用一个类似 connectrecv 方法时,它会阻塞,直到操作完成。(即使是 send 也能被阻塞,比如接收端在接受外发消息时缓慢而系统的外发数据缓存已经满了的情况下)因此,为了同一时间内下载多个网页,我们需要很多线程。一个复杂的应用会通过线程池保持空闲的线程来分摊创建线程的开销。同样的做法也适用于套接字,使用连接池。

到目前为止,使用线程的是成本昂贵的,操作系统对一个进程、一个用户、一台机器能使用线程做了不同的硬性限制。在 作者 Jesse 的系统中,一个 Python 线程需要 50K 的内存,开启上万个线程就会失败。每个线程的开销和系统的限制就是这种方式的瓶颈所在。

在 Dan Kegel 那一篇很有影响力的文章“The C10K problem”中,它提出了多线程方式在 I/O 并发上的局限性。他在开始写道,

网络服务器到了要同时处理成千上万的客户的时代了,你不这样认为么?毕竟,现在网络规模很大了。

Kegel 在 1999 年创造出“C10K”这个术语。一万个连接在今天看来还是可接受的,但是问题依然存在,只不过大小不同。回到那时候,对于 C10K 问题,每个连接启一个线程是不切实际的。现在这个限制已经成指数级增长。确实,我们的玩具网络爬虫使用线程也可以工作的很好。但是,对于有着千万级连接的大规模应用来说,限制依然存在:它会消耗掉所有线程,即使套接字还够用。那么我们该如何解决这个问题?

异步

异步 I/O 框架在一个线程中完成并发操作。让我们看看这是怎么做到的。

异步框架使用非阻塞套接字。异步爬虫中,我们在发起到服务器的连接前把套接字设为非阻塞:

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

对一个非阻塞套接字调用 connect 方法会立即抛出异常,即使它可以正常工作。这个异常复现了底层 C 语言函数令人厌烦的行为,它把 errno 设置为 EINPROGRESS,告诉你操作已经开始。

现在我们的爬虫需要一种知道连接何时建立的方法,这样它才能发送 HTTP 请求。我们可以简单地使用循环来重试:

request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(url)
encoded = request.encode('ascii')

while True:
    try:
        sock.send(encoded)
        break  # Done.
    except OSError as e:
        pass

print('sent')

这种方法不仅消耗 CPU,也不能有效的等待多个套接字。在远古时代,BSD Unix 的解决方法是 select,这是一个 C 函数,它在一个或一组非阻塞套接字上等待事件发生。现在,互联网应用大量连接的需求,导致 selectpoll 所代替,在 BSD 上的实现是 kqueue ,在 Linux 上是 epoll。它们的 API 和 select 相似,但在大数量的连接中也能有较好的性能。

Python 3.4 的 DefaultSelector 会使用你系统上最好的 select 类函数。要注册一个网络 I/O 事件的提醒,我们会创建一个非阻塞套接字,并使用默认 selector 注册它。

from selectors import DefaultSelector, EVENT_WRITE

selector = DefaultSelector()

sock = socket.socket()
sock.setblocking(False)
try:
    sock.connect(('xkcd.com', 80))
except BlockingIOError:
    pass

def connected():
    selector.unregister(sock.fileno())
    print('connected!')

selector.register(sock.fileno(), EVENT_WRITE, connected)

我们不理会这个伪造的错误,调用 selector.register,传递套接字文件描述符和一个表示我们想要监听什么事件的常量表达式。为了当连接建立时收到提醒,我们使用 EVENT_WRITE :它表示什么时候这个套接字可写。我们还传递了一个 Python 函数 connected,当对应事件发生时被调用。这样的函数被称为回调

在一个循环中,selector 接收到 I/O 提醒时我们处理它们。

def loop():
    while True:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

connected 回调函数被保存在 event_key.data 中,一旦这个非阻塞套接字建立连接,它就会被取出来执行。

不像我们前面那个快速轮转的循环,这里的 select 调用会暂停,等待下一个 I/O 事件,接着执行等待这些事件的回调函数。没有完成的操作会保持挂起,直到进到下一个事件循环时执行。

到目前为止我们展现了什么?我们展示了如何开始一个 I/O 操作和当操作准备好时调用回调函数。异步框架,它在单线程中执行并发操作,其建立在两个功能之上,非阻塞套接字和事件循环。

我们这里达成了“ 并发性 concurrency ”,但不是传统意义上的“ 并行性 parallelism ”。也就是说,我们构建了一个可以进行重叠 I/O 的微小系统,它可以在其它操作还在进行的时候就开始一个新的操作。它实际上并没有利用多核来并行执行计算。这个系统是用于解决 I/O 密集 I/O-bound 问题的,而不是解决 CPU 密集 CPU-bound 问题的。(Python 的全局解释器锁禁止在一个进程中以任何方式并行执行 Python 代码。在 Python 中并行化 CPU 密集的算法需要多个进程,或者以将该代码移植为 C 语言并行版本。但是这是另外一个话题了。)

所以,我们的事件循环在并发 I/O 上是有效的,因为它并不用为每个连接拨付线程资源。但是在我们开始前,我们需要澄清一个常见的误解:异步比多线程快。通常并不是这样的,事实上,在 Python 中,在处理少量非常活跃的连接时,像我们这样的事件循环是慢于多线程的。在运行时环境中是没有全局解释器锁的,在同样的负载下线程会执行的更好。异步 I/O 真正适用于事件很少、有许多缓慢或睡眠的连接的应用程序。(Jesse 在“什么是异步,它如何工作,什么时候该用它?”一文中指出了异步所适用和不适用的场景。Mike Bayer 在“异步 Python 和数据库”一文中比较了不同负载情况下异步 I/O 和多线程的不同。)

回调

用我们刚刚建立的异步框架,怎么才能完成一个网络爬虫?即使是一个简单的网页下载程序也是很难写的。

首先,我们有一个尚未获取的 URL 集合,和一个已经解析过的 URL 集合。

urls_todo = set(['/'])
seen_urls = set(['/'])

seen_urls 集合包括 urls_todo 和已经完成的 URL。用根 URL / 初始化它们。

获取一个网页需要一系列的回调。在套接字连接建立时会触发 connected 回调,它向服务器发送一个 GET 请求。但是它要等待响应,所以我们需要注册另一个回调函数;当该回调被调用,它仍然不能读取到完整的请求时,就会再一次注册回调,如此反复。

让我们把这些回调放在一个 Fetcher 对象中,它需要一个 URL,一个套接字,还需要一个地方保存返回的字节:

class Fetcher:
    def __init__(self, url):
        self.response = b''  # Empty array of bytes.
        self.url = url
        self.sock = None

我们的入口点在 Fetcher.fetch

    # Method on Fetcher class.
    def fetch(self):
        self.sock = socket.socket()
        self.sock.setblocking(False)
        try:
            self.sock.connect(('xkcd.com', 80))
        except BlockingIOError:
            pass

        # Register next callback.
        selector.register(self.sock.fileno(),
                          EVENT_WRITE,
                          self.connected)

fetch 方法从连接一个套接字开始。但是要注意这个方法在连接建立前就返回了。它必须将控制返回到事件循环中等待连接建立。为了理解为什么要这样做,假设我们程序的整体结构如下:

# Begin fetching http://xkcd.com/353/
fetcher = Fetcher('/353/')
fetcher.fetch()

while True:
    events = selector.select()
    for event_key, event_mask in events:
        callback = event_key.data
        callback(event_key, event_mask)

当调用 select 函数后,所有的事件提醒才会在事件循环中处理,所以 fetch 必须把控制权交给事件循环,这样我们的程序才能知道什么时候连接已建立,接着循环调用 connected 回调,它已经在上面的 fetch 方法中注册过。

这里是我们的 connected 方法的实现:

    # Method on Fetcher class.
    def connected(self, key, mask):
        print('connected!')
        selector.unregister(key.fd)
        request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(self.url)
        self.sock.send(request.encode('ascii'))

        # Register the next callback.
        selector.register(key.fd,
                          EVENT_READ,
                          self.read_response)

这个方法发送一个 GET 请求。一个真正的应用会检查 send 的返回值,以防所有的信息没能一次发送出去。但是我们的请求很小,应用也不复杂。它只是简单的调用 send,然后等待响应。当然,它必须注册另一个回调并把控制权交给事件循环。接下来也是最后一个回调函数 read_response,它处理服务器的响应:

    # Method on Fetcher class.
    def read_response(self, key, mask):
        global stopped

        chunk = self.sock.recv(4096)  # 4k chunk size.
        if chunk:
            self.response += chunk
        else:
            selector.unregister(key.fd)  # Done reading.
            links = self.parse_links()

            # Python set-logic:
            for link in links.difference(seen_urls):
                urls_todo.add(link)
                Fetcher(link).fetch()  # <- New Fetcher.

            seen_urls.update(links)
            urls_todo.remove(self.url)
            if not urls_todo:
                stopped = True

这个回调在每次 selector 发现套接字可读时被调用,可读有两种情况:套接字接受到数据或它被关闭。

这个回调函数从套接字读取 4K 数据。如果不到 4k,那么有多少读多少。如果比 4K 多,chunk 中只包 4K 数据并且这个套接字保持可读,这样在事件循环的下一个周期,会再次回到这个回调函数。当响应完成时,服务器关闭这个套接字,chunk 为空。

这里没有展示的 parse_links 方法,它返回一个 URL 集合。我们为每个新的 URL 启动一个 fetcher。注意一个使用异步回调方式编程的好处:我们不需要为共享数据加锁,比如我们往 seen_urls 增加新链接时。这是一种非抢占式的多任务,它不会在我们代码中的任意一个地方被打断。

我们增加了一个全局变量 stopped,用它来控制这个循环:

stopped = False

def loop():
    while not stopped:
        events = selector.select()
        for event_key, event_mask in events:
            callback = event_key.data
            callback()

一旦所有的网页被下载下来,fetcher 停止这个事件循环,程序退出。

这个例子让异步编程的一个问题明显的暴露出来:意大利面代码。

我们需要某种方式来表达一系列的计算和 I/O 操作,并且能够调度多个这样的系列操作让它们并发的执行。但是,没有线程你不能把这一系列操作写在一个函数中:当函数开始一个 I/O 操作,它明确的把未来所需的状态保存下来,然后返回。你需要考虑如何写这个状态保存的代码。

让我们来解释下这到底是什么意思。先来看一下在线程中使用通常的阻塞套接字来获取一个网页时是多么简单。

# Blocking version.
def fetch(url):
    sock = socket.socket()
    sock.connect(('xkcd.com', 80))
    request = 'GET {} HTTP/1.0
Host: xkcd.com

'.format(url)
    sock.send(request.encode('ascii'))
    response = b''
    chunk = sock.recv(4096)
    while chunk:
        response += chunk
        chunk = sock.recv(4096)

    # Page is now downloaded.
    links = parse_links(response)
    q.add(links)

在一个套接字操作和下一个操作之间这个函数到底记住了什么状态?它有一个套接字,一个 URL 和一个可增长的 response。运行在线程中的函数使用编程语言的基本功能来在栈中的局部变量保存这些临时状态。这样的函数也有一个“continuation”——它会在 I/O 结束后执行这些代码。运行时环境通过线程的指令指针来记住这个 continuation。你不必考虑怎么在 I/O 操作后恢复局部变量和这个 continuation。语言本身的特性帮你解决。

但是用一个基于回调的异步框架时,这些语言特性不能提供一点帮助。当等待 I/O 操作时,一个函数必须明确的保存它的状态,因为它会在 I/O 操作完成之前返回并清除栈帧。在我们基于回调的例子中,作为局部变量的替代,我们把 sockresponse 作为 Fetcher 实例 self 的属性来存储。而作为指令指针的替代,它通过注册 connectedread_response 回调来保存它的 continuation。随着应用功能的增长,我们需要手动保存的回调的复杂性也会增加。如此繁复的记账式工作会让编码者感到头痛。

更糟糕的是,当我们的回调函数抛出异常会发生什么?假设我们没有写好 parse_links 方法,它在解析 HTML 时抛出异常:

Traceback (most recent call last):
  File "loop-with-callbacks.py", line 111, in <module>
    loop()
  File "loop-with-callbacks.py", line 106, in loop
    callback(event_key, event_mask)
  File "loop-with-callbacks.py", line 51, in read_response
    links = self.parse_links()
  File "loop-with-callbacks.py", line 67, in parse_links
    raise Exception('parse error')
Exception: parse error

这个堆栈回溯只能显示出事件循环调用了一个回调。我们不知道是什么导致了这个错误。这条链的两边都被破坏:不知道从哪来也不知到哪去。这种丢失上下文的现象被称为“ 堆栈撕裂 stack ripping ”,经常会导致无法分析原因。它还会阻止我们为回调链设置异常处理,即那种用“try / except”块封装函数调用及其调用树。(对于这个问题的更复杂的解决方案,参见 http://www.tornadoweb.org/en/stable/stack_context.html

所以,除了关于多线程和异步哪个更高效的长期争议之外,还有一个关于这两者之间的争论:谁更容易跪了。如果在同步上出现失误,线程更容易出现数据竞争的问题,而回调因为" 堆栈撕裂 stack ripping "问题而非常难于调试。

(题图素材来自:ruth-tay.deviantart.com


via: http://aosabook.org/en/500L/pages/a-web-crawler-with-asyncio-coroutines.html

作者:A. Jesse Jiryu Davis , Guido van Rossum 译者:qingyunha 校对:wxy

本文由 LCTT 原创翻译,Linux中国 荣誉推出

简介

Linux 经常被看成是一个远离外部世界,只有极客才会使用的操作系统,虽然这是一个误解,但事实上,如果你想开发软件,那么 Linux 系统能够为你提供一个很好的开发环境。

刚开始学习编程的新手们经常会问这样一个问题:应该使用哪种语言?当涉及到 Linux 系统的时候,通常的选择是 C、C++、Python、Java、PHP、Perl 和 Ruby On Rails。

Linux 系统的许多核心程序都是用 C 语言写的,但是如果离开 Linux 系统的世界, C 语言就不如其它语言比如 Java 和 Python 那么常用。

对于学习编程的人来说, Python 和 Java 都是不错的选择,因为它们是跨平台的,因此,你在 Linux 系统上写的程序在 Windows 系统和 Mac 系统上也能够很好的工作。

虽然你可以使用任何编辑器来开发 Python 程序,但是如果你使用一个同时包含编辑器和调试器的优秀的集成开发环境(IDE)来进行开发,那么你的编程生涯将会变得更加轻松。

PyCharm 是由 Jetbrains 公司开发的一个跨平台编辑器。如果你之前是在 Windows 环境下进行开发,那么你会立刻认出 Jetbrains 公司,它就是那个开发了 Resharper 的公司。 Resharper 是一个用于重构代码的优秀产品,它能够指出代码可能存在的问题以及自动添加声明:比如当你在使用一个类的时候它会自动为你导入。

这篇文章将讨论如何在 Linux 系统上获取、安装和运行 PyCharm 。

如何获取 PyCharm

你可以通过访问这儿获取 PyCharm 。屏幕中央有一个很大的 'Download' 按钮。

你可以选择下载专业版或者社区版。如果你刚刚接触 Python 编程那么推荐下载社区版。然而,如果你打算发展到专业化的编程,那么专业版的一些优秀特性是不容忽视的。

如何安装 PyCharm

下载好的文件的名称可能是 ‘pycharm-professional-2016.2.3.tar.gz’。

以 “tar.gz” 结尾的文件是被 gzip 工具压缩过的,并且把文件夹用 tar 工具归档到了一起。你可以阅读关于提取 tar.gz 文件指南的更多信息。

加快节奏,为了解压文件,你需要做的是首先打开终端,然后通过下面的命令进入下载文件所在的文件夹:

cd ~/Downloads

现在,通过运行下面的命令找到你下载的文件的名字:

ls pycharm*

然后运行下面的命令解压文件:

tar -xvzf pycharm-professional-2016.2.3.tar.gz -C ~

记得把上面命令中的文件名替换成通过 ls 命令获知的 pycharm 文件名。(也就是你下载的文件的名字)。上面的命令将会把 PyCharm 软件安装在 home 目录中。

如何运行 PyCharm

要运行 PyCharm, 首先需要进入 home 目录:

cd ~

运行 ls 命令查找文件夹名:

ls

查找到文件名以后,运行下面的命令进入 PyCharm 目录:

cd pycharm-2016.2.3/bin

最后,通过运行下面的命令来运行 PyCharm:

sh pycharm.sh &

如果你是在一个桌面环境比如 GNOME、KDE、Unity、Cinnamon 或者其他现代桌面上运行,那么你也可以通过桌面环境的菜单或者快捷方式来找到 PyCharm 。

总结

现在, PyCharm 已经安装好了,你可以开始使用它来开发一个桌面应用、 web 应用和各种工具。

如果你想学习如何使用 Python 编程,那么这儿有很好的学习资源值得一看。里面的文章更多的是关于 Linux 学习,但也有一些资源比如 Pluralsight 和 Udemy 提供了关于 Python 学习的一些很好的教程。

如果想了解 PyCharm 的更多特性,请点击这儿来查看。它覆盖了从创建项目到描述用户界面、调试以及代码重构的全部内容。


via: https://www.lifewire.com/how-to-install-the-pycharm-python-ide-in-linux-4091033

作者:Gary Newell 译者:ucasFL 校对:wxy

本文由 LCTT 组织编译,Linux中国 荣誉推出

介绍

在这篇指南中,我将向你介绍一个集成开发环境 - PyCharm, 你可以在它上面使用 Python 编程语言开发专业应用。

Python 是一门优秀的编程语言,因为它真正实现了跨平台,用它开发的应用程序在 Windows、Linux 以及 Mac 系统上均可运行,无需重新编译任何代码。

PyCharm 是由 Jetbrains 开发的一个编辑器和调试器,Jetbrains 就是那个开发了 Resharper 的公司。不得不说,Resharper 是一个很优秀的工具,它被 Windows 开发者们用来重构代码,同时,它也使得 Windows 开发者们写 .NET 代码更加轻松。Resharper 的许多原则也被加入到了 PyCharm 专业版中。

如何安装 PyCharm

我已经写了一篇关于如何获取 PyCharm 的指南,下载、解压文件,然后运行。

欢迎界面

当你第一次运行 PyCharm 或者关闭一个项目的时候,会出现一个屏幕,上面显示一系列近期项目。

你也会看到下面这些菜单选项:

  • 创建新项目
  • 打开项目
  • 从版本控制仓库检出

还有一个配置设置选项,你可以通过它设置默认 Python 版本或者一些其他设置。

创建一个新项目

当你选择‘创建一个新项目’以后,它会提供下面这一系列可能的项目类型供你选择:

  • Pure Python
  • Django
  • Flask
  • Google App Engine
  • Pyramid
  • Web2Py
  • Angular CLI
  • AngularJS
  • Foundation
  • HTML5 Bolierplate
  • React Starter Kit
  • Twitter Bootstrap
  • Web Starter Kit

这不是一个编程教程,所以我没必要说明这些项目类型是什么。如果你想创建一个可以运行在 Windows、Linux 和 Mac 上的简单桌面运行程序,那么你可以选择 Pure Python 项目,然后使用 Qt 库来开发图形应用程序,这样的图形应用程序无论在何种操作系统上运行,看起来都像是原生的,就像是在该系统上开发的一样。

选择了项目类型以后,你需要输入一个项目名字并且选择一个 Python 版本来进行开发。

打开一个项目

你可以通过单击‘最近打开的项目’列表中的项目名称来打开一个项目,或者,你也可以单击‘打开’,然后浏览到你想打开的项目所在的文件夹,找到该项目,然后选择‘确定’。

从源码控制进行查看

PyCharm 提供了从各种在线资源查看项目源码的选项,在线资源包括 GitHubCVS、Git、Mercurial 以及 Subversion

PyCharm IDE(集成开发环境)

PyCharm IDE 中可以打开顶部的菜单,在这个菜单下方你可以看到每个打开的项目的标签。

屏幕右方是调试选项区,可以单步运行代码。

左侧面板有项目文件和外部库的列表。

如果想在项目中新建一个文件,你可以鼠标右击项目的名字,然后选择‘新建’。然后你可以在下面这些文件类型中选择一种添加到项目中:

  • 文件
  • 目录
  • Python 包
  • Python 包
  • Jupyter 笔记
  • HTML 文件
  • Stylesheet
  • JavaScript
  • TypeScript
  • CoffeeScript
  • Gherkin
  • 数据源

当添加了一个文件,比如 Python 文件以后,你可以在右边面板的编辑器中进行编辑。

文本是全彩色编码的,并且有黑体文本。垂直线显示缩进,从而能够确保缩进正确。

编辑器具有智能补全功能,这意味着当你输入库名字或可识别命令的时候,你可以按 'Tab' 键补全命令。

调试程序

你可以利用屏幕右上角的’调试选项’调试程序的任何一个地方。

如果你是在开发一个图形应用程序,你可以点击‘绿色按钮’来运行程序,你也可以通过 'shift+F10' 快捷键来运行程序。

为了调试应用程序,你可以点击紧挨着‘绿色按钮’的‘绿色箭头’或者按 ‘shift+F9’ 快捷键。你可以点击一行代码的灰色边缘,从而设置断点,这样当程序运行到这行代码的时候就会停下来。

你可以按 'F8' 单步向前运行代码,这意味着你只是运行代码但无法进入函数内部,如果要进入函数内部,你可以按 'F7'。如果你想从一个函数中返回到调用函数,你可以按 'shift+F8'。

调试过程中,你会在屏幕底部看到许多窗口,比如进程和线程列表,以及你正在监视的变量。

当你运行到一行代码的时候,你可以对这行代码中出现的变量进行监视,这样当变量值改变的时候你能够看到。

另一个不错的选择是使用覆盖检查器运行代码。在过去这些年里,编程界发生了很大的变化,现在,对于开发人员来说,进行测试驱动开发是很常见的,这样他们可以检查对程序所做的每一个改变,确保不会破坏系统的另一部分。

覆盖检查器能够很好的帮助你运行程序,执行一些测试,运行结束以后,它会以百分比的形式告诉你测试运行所覆盖的代码有多少。

还有一个工具可以显示‘类函数’或‘类’的名字,以及一个项目被调用的次数和在一个特定代码片段运行所花费的时间。

代码重构

PyCharm 一个很强大的特性是代码重构选项。

当你开始写代码的时候,会在右边缘出现一个小标记。如果你写的代码可能出错或者写的不太好, PyCharm 会标记上一个彩色标记。

点击彩色标记将会告诉你出现的问题并提供一个解决方法。

比如,你通过一个导入语句导入了一个库,但没有使用该库中的任何东西,那么不仅这行代码会变成灰色,彩色标记还会告诉你‘该库未使用’。

对于正确的代码,也可能会出现错误提示,比如在导入语句和函数起始之间只有一个空行。当你创建了一个名称非小写的函数时它也会提示你。

你不必遵循 PyCharm 的所有规则。这些规则大部分只是好的编码准则,与你的代码是否能够正确运行无关。

代码菜单还有其它的重构选项。比如,你可以进行代码清理以及检查文件或项目问题。

总结

PyCharm 是 Linux 系统上开发 Python 代码的一个优秀编辑器,并且有两个可用版本。社区版可供临时开发者使用,专业版则提供了开发者开发专业软件可能需要的所有工具。


via: https://www.lifewire.com/pycharm-the-best-linux-python-ide-4091045

作者:Gary Newell 译者:ucasFL 校对:wxy

本文由 LCTT 组织编译,Linux中国 荣誉推出

最近我开始发力钻研 Python 的新 asyncio 模块。原因是我需要做一些事情,使用事件 IO 会使这些事情工作得更好,炙手可热的 asynio 正好可以用来牛刀小试。从试用的经历来看,该模块比我预想的复杂许多,我现在可以非常肯定地说,我不知道该如何恰当地使用 asyncio。

从 Twisted 框架借鉴一些经验来理解 asynio 并非难事,但是,asyncio 包含众多的元素,我开始动摇,不知道如何将这些孤立的零碎拼图组合成一副完整的图画。我已没有足够的智力提出任何更好的建议,在这里,只想分享我的困惑,求大神指点。

原语

asyncio 通过 协程 coroutines 的帮助来实现异步 IO。最初它是通过 yieldyield from 表达式实现的一个库,因为 Python 语言本身演进的缘故,现在它已经变成一个更复杂的怪兽。所以,为了在同一个频道讨论下去,你需要了解如下一些术语:

  • 事件循环
  • 事件循环策略
  • awaitable
  • 协程函数
  • 老式协程函数
  • 协程
  • 协程封装
  • 生成器 generator
  • future
  • 并发的future
  • 任务 task
  • 句柄
  • 执行器 executor
  • 传输 transport
  • 协议

此外,Python 还新增了一些新的特殊方法:

  • __aenter____aenter__,用于异步块操作
  • __aiter____anext__,用于异步迭代器(异步循环和异步推导)。为了更强大些,协议已经改变过一次了。 在 Python 3.5 它返回一个 awaitable(这是个协程);在 3.6它返回一个新的异步生成器。
  • __await__,用于自定义的 awaitable

你还需要了解相当多的内容,文档涵盖了那些部分。尽管如此,我做了一些额外说明以便对其有更好的理解:

事件循环

asyncio 事件循环和你第一眼看上去的略有不同。表面看,每个线程都有一个事件循环,然而事实并非如此。我认为它们应该按照如下的方式工作:

  • 如果是主线程,当调用 asyncio.get_event_loop() 时创建一个事件循环。
  • 如果是其它线程,当调用 asyncio.get_event_loop() 时返回运行时错误。
  • 当前线程可以使用 asyncio.set_event_loop() 在任何时间节点绑定事件循环。该事件循环可由 asyncio.new_evet_loop() 函数创建。
  • 事件循环可以在不绑定到当前线程的情况下使用。
  • asyncio.get_event_loop() 返回绑定线程的事件循环,而非当前运行的事件循环。

这些行为的组合是超混淆的,主要有以下几个原因。 首先,你需要知道这些函数被委托到全局设置的底层事件循环策略。 默认是将事件循环绑定到线程。 或者,如果需要的话,可以在理论上将事件循环绑定到一个 greenlet 或类似的。 然而,重要的是要知道库代码不控制策略,因此不能推断 asyncio 将适用于线程。

其次,asyncio 不需要通过策略将事件循环绑定到上下文。 事件循环可以单独工作。 但是这正是库代码的第一个问题,因为协同程序或类似的东西并不知道哪个事件循环负责调度它。 这意味着,如果从协程中调用 asyncio.get_event_loop(),你可能没有机会取得事件循环。 这也是所有 API 均采用可选的显式事件循环参数的原因。 举例来说,要弄清楚当前哪个协程正在运行,不能使用如下调用:

def get_task():
    loop = asyncio.get_event_loop()
    try:
        return asyncio.Task.get_current(loop)
    except RuntimeError:
        return None

相反,必须显式地传递事件循环。 这进一步要求你在库代码中显式地遍历事件循环,否则可能发生很奇怪的事情。 我不知道这种设计的思想是什么,但如果不解决这个问题(例如 get_event_loop() 返回实际运行的事件循环),那么唯一有意义的其它方案是明确禁止显式事件循环传递,并要求它绑定到当前上下文(线程等)。

由于事件循环策略不提供当前上下文的标识符,因此库也不可能以任何方式“索引”到当前上下文。 也没有回调函数用来监视这样的上下文的拆除,这进一步限制了实际可以开展的操作。

awaitable 与 协程 coroutine

以我的愚见,Python 最大的设计错误是过度重载迭代器。它们现在不仅用于迭代,而且用于各种类型的协程。 Python 中迭代器最大的设计错误之一是如果 StopIteration 没有被捕获形成的空泡。 这可能导致非常令人沮丧的问题,其中某处的异常可能导致其它地方的生成器或协同程序中止。 这是一个长期存在的问题,基于 Python 的模板引擎如 Jinja 经常面临这种问题。 该模板引擎在内部渲染为生成器,并且当由于某种原因的模板引起 StopIteration 时,渲染就停止在那里。

Python 慢慢认识到了过度重载的教训。 首先在 3.x 版本加入 asyncio 模块,并没有语言级支持。 所以自始至终它不过仅仅是装饰器和生成器而已。 为了实现 yield from 以及其它东西,StopIteration 再次重载。 这导致了令人困惑的行为,像这样:

>>> def foo(n):
...  if n in (0, 1):
...   return [1]
...  for item in range(n):
...   yield item * 2
...
>>> list(foo(0))
[]
>>> list(foo(1))
[]
>>> list(foo(2))
[0, 2]

没有错误,没有警告。只是不是你所期望的行为。 这是因为从一个作为生成器的函数中 return 的值实际上引发了一个带有单个参数的 StopIteration,它不是由迭代器协议捕获的,而只是在协程代码中处理。

在 3.5 和 3.6 有很多改变,因为现在除了生成器我们还有协程对象。除了通过封装生成器来生成协程,没有其它可以直接生成协程的单独对象。它是通过用给函数加 async 前缀来实现。 例如 async def x() 会产生这样的协程。 现在在 3.6,将有单独的异步生成器,它通过触发 AsyncStopIteration 保持其独立性。 此外,对于Python 3.5 和更高版本,导入新的 future 对象(generator_stop),如果代码在迭代步骤中触发 StopIteration,它将引发 RuntimeError

为什么我提到这一切? 因为老的实现方式并未真的消失。 生成器仍然具有 sendthrow 方法以及协程仍然在很大程度上表现为生成器。你需要知道这些东西,它们将在未来伴随你相当长的时间。

为了统一很多这样的重复,现在我们在 Python 中有更多的概念了:

  • awaitable:具有__await__方法的对象。 由本地协同程序和旧式协同程序以及一些其它程序实现。
  • 协程函数 coroutinefunction :返回原生协程的函数。 不要与返回协程的函数混淆。
  • 协程 coroutine : 原生的协程程序。 注意,目前为止,当前文档不认为老式 asyncio 协程是协程程序。 至少 inspect.iscoroutine 不认为它是协程。 尽管它被 future/awaitable 分支接纳。

特别令人困惑的是 asyncio.iscoroutinefunctioninspect.iscoroutinefunction 正在做不同的事情,这与 inspect.iscoroutineinspect.iscoroutinefunction 情况相同。 值得注意的是,尽管 inspect 在类型检查中不知道有关 asycnio 旧式协程函数的任何信息,但是当您检查 awaitable 状态时它显然知道它们,即使它与 **await** 不一致。

协程封装器 coroutine wrapper

每当你运行 async def ,Python 就会调用一个线程局部的协程封装器。它由 sys.set_coroutine_wrapper 设置,并且它是可以包装这些东西的一个函数。 看起来有点像如下代码:

>>> import sys
>>> sys.set_coroutine_wrapper(lambda x: 42)
>>> async def foo():
...  pass
...
>>> foo()
__main__:1: RuntimeWarning: coroutine 'foo' was never awaited
42

在这种情况下,我从来没有实际调用原始的函数,只是给你一个提示,说明这个函数可以做什么。 目前我只能说它总是线程局部有效,所以,如果替换事件循环策略,你需要搞清楚如何让协程封装器在相同的上下文同步更新。创建的新线程不会从父线程继承那些标识。

这不要与 asyncio 协程封装代码混淆。

awaitable 和 future

有些东西是 awaitable 的。 据我所见,以下概念被认为是 awaitable:

  • 原生的协程
  • 配置了假的 CO_ITERABLE_COROUTINE 标识的生成器(文中有涉及)
  • 具有 __await__ 方法的对象

除了生成器由于历史遗留的原因不使用之外,其它的对象都使用 __await__ 方法。 CO_ITERABLE_COROUTINE 标志来自哪里?它来自一个协程封装器(现在与 sys.set_coroutine_wrapper 有些混淆),即 @asyncio.coroutine。 通过一些间接方法,它使用 types.coroutine(现在与 types.CoroutineTypeasyncio.coroutine 有些混淆)封装生成器,并通过另外一个标志 CO_ITERABLE_COROUTINE 重新创建内部代码对象。

所以既然我们知道这些东西是什么,那么什么是 future? 首先,我们需要澄清一件事情:在 Python 3 中,实际上有两种(完全不兼容)的 future 类型:asyncio.futures.Futureconcurrent.futures.Future。 其中一个出现在另一个之前,但它们都仍然在 asyncio 中使用。 例如,asyncio.run_coroutine_threadsafe() 将调度一个协程到在另一个线程中运行的事件循环,但它返回一个 concurrent.futures.Future 对象,而不是 asyncio.futures.Future 对象。 这是有道理的,因为只有 concurrent.futures.Future 对象是线程安全的。

所以现在我们知道有两个不兼容的 future,我们应该澄清哪个 future 在 asyncio 中。 老实说,我不完全确定差异在哪里,但我打算暂时称之为“最终”。它是一个最终将持有一个值的对象,当还在计算时你可以对最终结果做一些处理。 future 对象的一些变种称为 deferred,还有一些叫做 promise。 我实在难以理解它们真正的区别。

你能用一个 future 对象做什么? 你可以关联一个准备就绪时将被调用的回调函数,或者你可以关联一个 future 失败时将被触发的回调函数。 此外,你可以 await 它(它实现__await__,因此可等待),此外,future 也可以取消。

那么你怎样才能得到这样的 future 对象? 通过在 awaitable 对象上调用 asyncio.ensure_future。它会把一个旧版的生成器转变为 future 对象。 然而,如果你阅读文档,你会读到 asyncio.ensure_future 实际上返回一个task(任务)。 那么问题来了,什么是任务?

任务

任务 task 某种意义上是一个封装了协程的 futur 对象。它的工作方式和 future 类似,但它也有一些额外的方法来提取所包含的协程的当前堆栈。 我们已经见过了在前面提到过的任务,因为它是通过 Task.get_current 确定事件循环当前正在做什么的主要方式。

在如何取消工作方面,任务和 future 也有区别,但这超出了本文的范围。“取消”是它们自己最大的问题。 如果你处于一个协程中,并且知道自己正在运行,你可以通过前面提到的 Task.get_current 获取自己的任务,但这需要你知道自己被派遣在哪个事件循环,该事件循环可能是、也可能不是已绑定的那个线程。

协程不可能知道它与哪个循环一起使用。task 也没有提供该信息的公共 API。 然而,如果你确实可以获得一个任务,你可以访问 task._loop,通过它反指到事件循环。

句柄

除了上面提到的所有一切还有句柄。 句柄是等待执行的不透明对象,不可等待,但可以被取消。 特别是如果你使用 call_soon 或者 call_soon_threadsafe(还有其它一些)调度执行一个调用,你可以获得句柄,然后使用它尽力尝试取消执行,但不能等待实际调用生效。

执行器 Executor

因为你可以有多个事件循环,但这并不意味着每个线程理所当然地应用多个事件循环,最常见的情形还是一个线程一个事件循环。 那么你如何通知另一个事件循环做一些工作? 你不能到另一个线程的事件循环中执行回调函数并获取结果。 这种情况下,你需要使用执行器。

执行器 Executor 来自 concurrent.futures,它允许你将工作安排到本身未发生事件的线程中。 例如,如果在事件循环中使用 run_in_executor 来调度将在另一个线程中调用的函数。 其返回结果是 asyncio 协程,而不是像 run_coroutine_threadsafe 这样的并发协程。 我还没有足够的心智来弄清楚为什么设计这样的 API,应该如何使用,以及什么时候使用。 文档中建议执行器可以用于构建多进程。

传输和协议

我总是认为传输与协议也凌乱不堪,实际这部分内容基本上是对 Twisted 的逐字拷贝。详情毋庸赘述,请直接阅读相关文档。

如何使用 asyncio

现在我们已经大致了解 asyncio,我发现了一些模式,人们似乎在写 asyncio 代码时使用:

  • 将事件循环传递给所有协程。 这似乎是社区中一部分人的做法。 把事件循环信息提供给协程为协程获取自己运行的任务提供了可能性。
  • 或者你要求事件循环绑定到线程,这也能达到同样的目的。 理想情况下两者都支持。 可悲的是,社区已经分化。
  • 如果想使用上下文数据(如线程本地数据),你可谓是运气不佳。 最流行的变通方法显然是 atlassian 的 aiolocals,它基本上需要你手动传递上下文信息到协程,因为解释器不为此提供支持。 这意味着如果你用一个工具类库生成协程,你将失去上下文。
  • 忽略 Python 中的旧式协程。 只使用 3.5 版本中 async def 关键字和协程。 你总可能要用到它们,因为在老版本中,没有异步上下文管理器,这是非常必要的资源管理。
  • 学习重新启动事件循环进行善后清理。 这部分功能和我预想的不同,我花了比较长的时间来厘清它的实现。清理操作的最好方式是不断重启事件循环直到没有等待事件。 遗憾的是没有什么通用的模式来处理清理操作,你只能用一些丑陋的临时方案糊口度日。 例如 aiohttp 的 web 支持也做这个模式,所以如果你想要结合两个清理逻辑,你可能需要重新实现它提供的工具助手,因为该助手功能实现后,它彻底破坏了事件循环的设计。 当然,它不是我见过的第一个干这种坏事的库 :(。
  • 使用子进程是不明显的。 你需要一个事件循环在主线程中运行,我想它是在监听信号事件,然后分派到其它事件循环。 这需要通过 asyncio.get_child_watcher().attach_loop(...) 通知循环。
  • 编写同时支持异步和同步的代码在某种程度上注定要失败。 尝试在同一个对象上支持 withasync with 是危险的事情。
  • 如果你想给一个协程起个更好的名字,弄清楚为什么它没有被等待,设置 __name__没有帮助。 你需要设置 __qualname__ 而不是打印出错误消息来。
  • 有时内部类型交换会使你麻痹。 特别是 asyncio.wait() 函数将确保所有的事情都是 future,这意味着如果你传递协程,你将很难发现你的协程是否已经完成或者正在等待,因为输入对象不再匹配输出对象。 在这种情况下,唯一真正理智的做法是确保前期一切都是 future。

上下文数据

除了疯狂的复杂性和对如何更好地编写 API 缺乏理解,我最大的问题是完全缺乏对上下文本地数据的考虑。这是 Node 社区现在学习的东西。continuation-local-storage 存在,但该实现被接受的太晚。持续本地存储和类似的概念常用于在并发环境中实施安全策略,并且该信息的损坏可能导致严重的安全问题。

事实上,Python 甚至没有任何存储,这令人失望至极。我正在研究这个内容,因为我正在调查如何最好地支持 Sentry's breadcrumbs 的 asyncio,然而我并没有看到一个合理的方式做到这一点。在 asyncio 中没有上下文的概念,没有办法从通用代码中找出您正在使用的事件循环,并且如果没有 monkeypatching(运行环境下的补丁),也无法获取这些信息。

Node 当前正在经历如何找到这个问题的长期解决方案的过程。这个问题不容忽视,因为它在所有生态系统中反复出现过,如 JavaScript、Python 和 .NET 环境。该问题被命名为异步上下文传播,其解决方案有许多名称。在 Go 中,需要使用上下文包,并明确地传递给所有 goroutine(不是一个完美的解决方案,但至少有一个)。.NET 具有本地调用上下文形式的最佳解决方案。它可以是线程上下文,Web 请求上下文或类似的东西,除非被抑制,否则它会自动传播。微软的解决方案是我们的黄金标准。我现在相信,微软在 15 年前已经解决了该问题。

我不知道该生态系统是否还够年轻,还可以添加逻辑调用上下文,可能现在仍然为时未晚。

个人感想

复杂的东西变得越来越复杂。 我没有随意使用 asyncio 的心智。它需要不断地更新所有 Python 语言的变化的知识,这很大程度上使语言本身变得复杂。 令人鼓舞的是,围绕着它的生态系统正在不断发展,只是不知道还需要几年的时间,才能带给开发者愉快和稳定的开发体验。

3.5 版本引入的东西(新的协程对象)非常棒。 特别是这些变化包括引入了一个合理的基础,这些都是我在早期的版本中一直期盼的。在我心中, 通过重载生成器实现协程是一个错误。 关于什么是 asyncio,我难以置喙。 这是一个非常复杂的事情,内部令人眼花缭乱。 我很难理解它工作的所有细节。你什么时候可以传递一个生成器,什么时候它必须是一个真正的协程,future 是什么,任务是什么,事件循环如何工作,这甚至还没有触碰到真正的 IO 部分。

最糟糕的是,asyncio 甚至不是特别快。 David Beazley 演示的它设计的 asyncio 的替代品是原生版本速度的两倍。 asyncio 巨复杂,很难理解,也无法兑现自己在主要特性上的承诺,对于它,我只想说我想静静。我知道,至少我对 asyncio 理解的不够透彻,没有足够的信心对人们如何用它构建代码给出建议。


作者:

Armin Ronacher

软件开发者和开源骨灰, Flask 框架的创造者。


via: http://lucumr.pocoo.org/2016/10/30/i-dont-understand-asyncio/

作者:Armin Ronacher 译者:firstadream 校对:jasminepeng

本文由 LCTT 原创编译,Linux中国 荣誉推出

聊天机器人(Bot) 是一种像 Slack 一样的实用的互动聊天服务方式。如果你之前从来没有建立过聊天机器人,那么这篇文章提供了一个简单的入门指南,告诉你如何用 Python 结合 Slack API 建立你第一个聊天机器人。

我们通过搭建你的开发环境, 获得一个 Slack API 的聊天机器人令牌,并用 Pyhon 开发一个简单聊天机器人。

我们所需的工具

我们的聊天机器人我们将它称作为“StarterBot”,它需要 Python 和 Slack API。要运行我们的 Python 代码,我们需要:

当你在本教程中进行构建时,Slack API 文档 是很有用的。

本教程中所有的代码都放在 slack-starterbot 公共库里,并以 MIT 许可证开源。

搭建我们的环境

我们现在已经知道我们的项目需要什么样的工具,因此让我们来搭建我们所的开发环境吧。首先到终端上(或者 Windows 上的命令提示符)并且切换到你想要存储这个项目的目录。在那个目录里,创建一个新的 virtualenv 以便和其他的 Python 项目相隔离我们的应用程序依赖关系。

virtualenv starterbot

激活 virtualenv:

source starterbot/bin/activate

你的提示符现在应该看起来如截图:

已经激活的 starterbot 的 virtualenv的命令提示符

这个官方的 slack 客户端 API 帮助库是由 Slack 建立的,它可以通过 Slack 通道发送和接收消息。通过这个 pip 命令安装 slackclient 库:

pip install slackclient

pip 命令完成时,你应该看到类似这样的输出,并返回提示符。

在已经激活的 virtualenv 用 pip 安装 slackclient 的输出

我们也需要为我们的 Slack 项目获得一个访问令牌,以便我们的聊天机器人可以用它来连接到 Slack API。

Slack 实时消息传递(RTM)API

Slack 允许程序通过一个 Web API 来访问他们的消息传递通道。去这个 Slack Web API 页面 注册建立你自己的 Slack 项目。你也可以登录一个你拥有管理权限的已有账号。

使用 Web API页面的右上角登录按钮

登录后你会到达 聊天机器人用户页面

定制聊天机器人用户页面

给你的聊天机器人起名为“starterbot”然后点击 “Add bot integration” 按钮。

添加一个bot integration 并起名为“starterbot”

这个页面将重新加载,你将看到一个新生成的访问令牌。你还可以将标志改成你自己设计的。例如我给的这个“Full Stack Python”标志。

为你的新 Slack 聊天机器人复制和粘贴访问令牌

在页面底部点击“Save Integration”按钮。你的聊天机器人现在已经准备好连接 Slack API。

Python 开发人员的一个常见的做法是以环境变量输出秘密令牌。输出的 Slack 令牌名字为SLACK_BOT_TOKEN

export SLACK_BOT_TOKEN='你的 slack 令牌粘帖在这里'

好了,我们现在得到了将这个 Slack API 用作聊天机器人的授权。

我们建立聊天机器人还需要更多信息:我们的聊天机器人的 ID。接下来我们将会写一个简短的脚本,从 Slack API 获得该 ID。

获得我们聊天机器人的 ID

这是最后写一些 Python 代码的时候了! 我们编写一个简短的 Python 脚本获得 StarterBot 的 ID 来热身一下。这个 ID 基于 Slack 项目而不同。

我们需要该 ID,当解析从 Slack RTM 上发给 StarterBot 的消息时,它用于对我们的应用验明正身。我们的脚本也会测试我们 SLACK_BOT_TOKEN 环境变量是否设置正确。

建立一个命名为 printbotid.py 的新文件,并且填入下面的代码:

import os
from slackclient import SlackClient

BOT_NAME = 'starterbot'

slack_client = SlackClient(os.environ.get('SLACK_BOT_TOKEN'))

if __name__ == "__main__":
    api_call = slack_client.api_call("users.list")
    if api_call.get('ok'):
        # retrieve all users so we can find our bot
        users = api_call.get('members')
        for user in users:
            if 'name' in user and user.get('name') == BOT_NAME:
                print("Bot ID for '" + user['name'] + "' is " + user.get('id'))
    else:
        print("could not find bot user with the name " + BOT_NAME)

我们的代码导入 SlackClient,并用我们设置的环境变量 SLACK_BOT_TOKEN 实例化它。 当该脚本通过 python 命令执行时,我们通过会访问 Slack API 列出所有的 Slack 用户并且获得匹配一个名字为“satrterbot”的 ID。

这个获得聊天机器人的 ID 的脚本我们仅需要运行一次。

python print_bot_id.py

当它运行为我们提供了聊天机器人的 ID 时,脚本会打印出简单的一行输出。

在你的 Slack 项目中用 Python 脚本打印 Slack 聊天机器人的 ID

复制这个脚本打印出的唯一 ID。并将该 ID 作为一个环境变量 BOT_ID 输出。

(starterbot)$ export BOT_ID='bot id returned by script'

这个脚本仅仅需要运行一次来获得聊天机器人的 ID。 我们现在可以在我们的运行 StarterBot 的 Python应用程序中使用这个 ID 。

编码我们的 StarterBot

现在我们拥有了写我们的 StarterBot 代码所需的一切。 创建一个新文件命名为 starterbot.py ,它包括以下代码。

import os
import time
from slackclient import SlackClient

osSlackClient 的导入我们看起来很熟悉,因为我们已经在 theprintbotid.py 中用过它们了。

通过我们导入的依赖包,我们可以使用它们获得环境变量值,并实例化 Slack 客户端。

# starterbot 的 ID 作为一个环境变量
BOT_ID = os.environ.get("BOT_ID")

# 常量
AT_BOT = "<@" + BOT_ID + ">:"
EXAMPLE_COMMAND = "do"

# 实例化 Slack 和 Twilio 客户端
slack_client = SlackClient(os.environ.get('SLACK_BOT_TOKEN'))

该代码通过我们以输出的环境变量 SLACK_BOT_TOKEN 实例化SlackClient` 客户端。

if __name__ == "__main__":
    READ_WEBSOCKET_DELAY = 1 # 1 从 firehose 读取延迟 1 秒
    if slack_client.rtm_connect():
        print("StarterBot connected and running!")
        while True:
            command, channel = parse_slack_output(slack_client.rtm_read())
            if command and channel:
                handle_command(command, channel)
            time.sleep(READ_WEBSOCKET_DELAY)
    else:
        print("Connection failed. Invalid Slack token or bot ID?")

Slack 客户端会连接到 Slack RTM API WebSocket,然后当解析来自 firehose 的消息时会不断循环。如果有任何发给 StarterBot 的消息,那么一个被称作 handle_command 的函数会决定做什么。

接下来添加两个函数来解析 Slack 的输出并处理命令。

def handle_command(command, channel):
    """
        Receives commands directed at the bot and determines if they
        are valid commands. If so, then acts on the commands. If not,
        returns back what it needs for clarification.
    """
    response = "Not sure what you mean. Use the *" + EXAMPLE_COMMAND + \
               "* command with numbers, delimited by spaces."
    if command.startswith(EXAMPLE_COMMAND):
        response = "Sure...write some more code then I can do that!"
    slack_client.api_call("chat.postMessage", channel=channel,
                          text=response, as_user=True)

def parse_slack_output(slack_rtm_output):
    """
        The Slack Real Time Messaging API is an events firehose.
        this parsing function returns None unless a message is
        directed at the Bot, based on its ID.
    """
    output_list = slack_rtm_output
    if output_list and len(output_list) > 0:
        for output in output_list:
            if output and 'text' in output and AT_BOT in output['text']:
                # 返回 @ 之后的文本,删除空格
                return output['text'].split(AT_BOT)[1].strip().lower(), \
                       output['channel']
    return None, None

parse_slack_output 函数从 Slack 接受信息,并且如果它们是发给我们的 StarterBot 时会作出判断。消息以一个给我们的聊天机器人 ID 的直接命令开始,然后交由我们的代码处理。目前只是通过 Slack 管道发布一个消息回去告诉用户去多写一些 Python 代码!

这是整个程序组合在一起的样子 (你也可以 在 GitHub 中查看该文件):

import os
import time
from slackclient import SlackClient

# starterbot 的 ID 作为一个环境变量
BOT_ID = os.environ.get("BOT_ID")

# 常量
AT_BOT = "<@" + BOT_ID + ">:"
EXAMPLE_COMMAND = "do"

# 实例化 Slack 和 Twilio 客户端
slack_client = SlackClient(os.environ.get('SLACK_BOT_TOKEN'))

def handle_command(command, channel):
    """
        Receives commands directed at the bot and determines if they
        are valid commands. If so, then acts on the commands. If not,
        returns back what it needs for clarification.
    """
    response = "Not sure what you mean. Use the *" + EXAMPLE_COMMAND + \
               "* command with numbers, delimited by spaces."
    if command.startswith(EXAMPLE_COMMAND):
        response = "Sure...write some more code then I can do that!"
    slack_client.api_call("chat.postMessage", channel=channel,
                          text=response, as_user=True)

def parse_slack_output(slack_rtm_output):
    """
        The Slack Real Time Messaging API is an events firehose.
        this parsing function returns None unless a message is
        directed at the Bot, based on its ID.
    """
    output_list = slack_rtm_output
    if output_list and len(output_list) > 0:
        for output in output_list:
            if output and 'text' in output and AT_BOT in output['text']:
                # 返回 @ 之后的文本,删除空格
                return output['text'].split(AT_BOT)[1].strip().lower(), \
                       output['channel']
    return None, None

if __name__ == "__main__":
    READ_WEBSOCKET_DELAY = 1 # 1 second delay between reading from firehose
    if slack_client.rtm_connect():
        print("StarterBot connected and running!")
        while True:
            command, channel = parse_slack_output(slack_client.rtm_read())
            if command and channel:
                handle_command(command, channel)
            time.sleep(READ_WEBSOCKET_DELAY)
    else:
        print("Connection failed. Invalid Slack token or bot ID?")

现在我们的代码已经有了,我们可以通过 python starterbot.py 来运行我们 StarterBot 的代码了。

当 StarterBot 开始运行而且连接到 API 的输出通道

在 Slack 中创建新通道,并且把 StarterBot 邀请进来,或者把 StarterBot 邀请进一个已经存在的通道中。

在 Slack 界面创建一个新通道并且邀请 StarterBot

现在在你的通道中给 StarterBot 发命令。

在你的 Slack 通道里给你的 StarterBot 发命令

如果你从聊天机器人得到的响应中遇见问题,你可能需要做一个修改。正如上面所写的这个教程,其中一行 AT_BOT = "<@" + BOT_ID + ">:",在“@starter”(你给你自己的聊天机器人起的名字)后需要一个冒号。从 AT_BOT 字符串后面移除:。Slack 似乎需要在@ 一个人名后加一个冒号,但这好像是有些不协调的。

结束

好吧,你现在已经获得一个简易的聊天机器人,你可以在代码中很多地方加入你想要创建的任何特性。

我们能够使用 Slack RTM API 和 Python 完成很多功能。看看通过这些文章你还可以学习到什么:

有问题? 通过 Twitter 联系我 @fullstackpython@mattmakai。 我在 GitHub 上的用户名是 mattmakai

这篇文章感兴趣? Fork 这个 GitHub 上的页面吧。


via: https://www.fullstackpython.com/blog/build-first-slack-bot-python.html

作者:Matt Makai 译者:jiajia9llinuxer 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出aa