分类 软件开发 下的文章

在这篇文章中,我们将介绍单元测试的布尔断言方法 assertTrueassertFalse 与身份断言 assertIs 之间的区别。

定义

下面是目前单元测试模块文档中关于 assertTrueassertFalse 的说明,代码进行了高亮:

assertTrue(expr, msg=None)

assertFalse(expr, msg=None)

测试该表达式是真值(或假值)。

注:这等价于

bool(expr) is True

而不等价于

expr is True

(后一种情况请使用 assertIs(expr, True))。

Mozilla 开发者网络中定义 真值 如下:

在一个布尔值的上下文环境中能变成“真”的值

在 Python 中等价于:

bool(expr) is True

这个和 assertTrue 的测试目的完全匹配。

因此该文档中已经指出 assertTrue 返回真值,assertFalse 返回假值。这些断言方法从接受到的值构造出一个布尔值,然后判断它。同样文档中也建议我们根本不应该使用 assertTrueassertFalse

在实践中怎么理解?

我们使用一个非常简单的例子 - 一个名称为 always_true 的函数,它返回 True。我们为它写一些测试用例,然后改变代码,看看测试用例的表现。

作为开始,我们先写两个测试用例。一个是“宽松的”:使用 assertTrue 来测试真值。另外一个是“严格的”:使用文档中建议的 assertIs 函数。

import unittest
from func import always_true

class TestAlwaysTrue(unittest.TestCase):
    def test_assertTrue(self):
        """
        always_true returns a truthy value
        """
        result = always_true()

        self.assertTrue(result)

    def test_assertIs(self):
        """
        always_true returns True
        """
        result = always_true()

        self.assertIs(result, True)

下面是 func.py 中的非常简单的函数代码:

def always_true():
    """
    I'm always True.

    Returns:
        bool: True
    """
    return True

当你运行时,所有测试都通过了:

always_true returns True ... ok
always_true returns a truthy value ... ok

----------------------------------------------------------------------
Ran 2 tests in 0.004s

OK

开心ing~

现在,某个人将 always_true 函数改变成下面这样:

def always_true():
    """
    I'm always True.

    Returns:
        bool: True
    """
    return 'True'

它现在是用返回字符串 "True" 来替代之前反馈的 True (布尔值)。(当然,那个“某人”并没有更新文档 - 后面我们会增加难度。)

这次结果并不如开心了:

always_true returns True ... FAIL
always_true returns a truthy value ... ok

======================================================================
FAIL: always_true returns True
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/tmp/assertttt/test.py", line 22, in test_is_true
    self.assertIs(result, True)
AssertionError: 'True' is not True

----------------------------------------------------------------------
Ran 2 tests in 0.004s

FAILED (failures=1)

只有一个测试用例失败了!这意味着 assertTrue 给了我们一个 误判 false-positive 。在它不应该通过测试时,它通过了。很幸运的是我们第二个测试是使用 assertIs 来写的。

因此,跟手册上了解到的信息一样,为了保证 always_true 的功能和更严格测试的结果保持一致,应该使用 assertIs 而不是 assertTrue

使用断言的辅助方法

使用 assertIs 来测试返回 TrueFalse 来冗长了。因此,如果你有个项目需要经常检查是否是返回了 True 或者 False,那们你可以自己编写一些断言的辅助方法。

这好像并没有节省大量的代码,但是我个人觉得提高了代码的可读性。

def assertIsTrue(self, value):
    self.assertIs(value, True)

def assertIsFalse(self, value):
    self.assertIs(value, False)

总结

一般来说,我的建议是让测试越严格越好。如果你想测试 True 或者 False,听从文档的建议,使用 assertIs。除非不得已,否则不要使用 assertTrueassertFalse

如果你面对的是一个可以返回多种类型的函数,例如,有时候返回布尔值,有时候返回整形,那么考虑重构它。这是代码的异味。在 Python 中,抛出一个异常比使用 False 表示错误更好。

此外,如果你确实想使用断言来判断函数的返回值是否是真,可能还存在第二个代码异味 - 代码是正确封装了吗?如果 assertTrueassertFalse 是根据正确的 if 语句来执行,那么值得检查下你是否把所有你想要的东西都封装在合适的位置。也许这些 if 语句应该封装在测试的函数中。

测试开心!


via: http://jamescooke.info/python-unittest-asserttrue-is-truthy-assertfalse-is-falsy.html

作者:James Cooke 译者:chunyang-wen 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

这是这个系列发布的第三篇关于如何构建数据 科学作品集 Data Science Portfolio 的文章。如果你喜欢这个系列并且想继续关注,你可以在订阅页面的底部找到链接

数据科学公司在决定雇佣时越来越关注你在数据科学方面的 作品集 Portfolio 。这其中的一个原因是,这样的作品集是判断某人的实际技能的最好的方法。好消息是构建这样的作品集完全要看你自己。只要你在这方面付出了努力,你一定可以取得让这些公司钦佩的作品集。

构建高质量的作品集的第一步就是知道需要什么技能。公司想要在数据科学方面拥有的、他们希望你能够运用的主要技能有:

  • 沟通能力
  • 协作能力
  • 技术能力
  • 数据推理能力
  • 动机和主动性

任何好的作品集都由多个项目表现出来,其中每个都能够表现出以上一到两点。这是本系列的第三篇,本系列我们主要讲包括如何打造面面俱到的数据科学作品集。在这一篇中,我们主要涵盖了如何构建组成你的作品集的第二个项目,以及如何创建一个端对端的机器学习项目。在最后,我们将拥有一个展示你的数据推理能力和技术能力的项目。如果你想看一下的话,这里有一个完整的例子。

一个端到端的项目

作为一个数据科学家,有时候你会拿到一个数据集并被问如何用它来讲故事。在这个时候,沟通就是非常重要的,你需要用它来完成这个事情。像我们在前一篇文章中用过的,类似 Jupyter notebook 这样的工具,将对你非常有帮助。在这里你能找到一些可以用的报告或者总结文档。

不管怎样,有时候你会被要求创建一个具有操作价值的项目。具有操作价值的项目将直接影响到公司的日常业务,它会使用不止一次,经常是许多人使用。这个任务可能像这样 “创建一个算法来预测周转率”或者“创建一个模型来自动对我们的文章打标签”。在这种情况下,技术能力比讲故事更重要。你必须能够得到一个数据集,并且理解它,然后创建脚本处理该数据。这个脚本要运行的很快,占用系统资源很小。通常它可能要运行很多次,脚本的可使用性也很重要,并不仅仅是一个演示版。可使用性是指整合进操作流程,并且甚至是是面向用户的。

端对端项目的主要组成部分:

  • 理解背景
  • 浏览数据并找出细微差别
  • 创建结构化项目,那样比较容易整合进操作流程
  • 运行速度快、占用系统资源小的高性能代码
  • 写好安装和使用文档以便其他人用

为了有效的创建这种类型的项目,我们可能需要处理多个文件。强烈推荐使用 Atom 这样的文本编辑器或者 PyCharm 这样的 IDE。这些工具允许你在文件间跳转,编辑不同类型的文件,例如 markdown 文件,Python 文件,和 csv 文件等等。结构化你的项目还利于版本控制,并上传一个类似 Github 这样的协作开发工具上也很有用。

Github 上的这个项目

在这一节中我们将使用 Pandasscikit-learn 这样的库,我们还将大量用到 Pandas DataFrames,它使得 python 读取和处理表格数据更加方便。

找到好的数据集

为一个端到端的作品集项目的找到好的数据集很难。在内存和性能的限制下,数据集需要尽量的大。它还需要是实际有用的。例如,这个数据集,它包含有美国院校的录取标准、毕业率以及毕业以后的收入,是个很好的可以讲故事的数据集。但是,不管你如何看待这个数据,很显然它不适合创建端到端的项目。比如,你能告诉人们他们去了这些大学以后的未来收入,但是这个快速检索却并不足够呈现出你的技术能力。你还能找出院校的招生标准和更高的收入相关,但是这更像是常理而不是你的技术结论。

这里还有内存和性能约束的问题,比如你有几千兆的数据,而且当你需要找到一些差异时,就需要对数据集一遍遍运行算法。

一个好的可操作的数据集可以让你构建一系列脚本来转换数据、动态地回答问题。一个很好的例子是股票价格数据集,当股市关闭时,就会给算法提供新的数据。这可以让你预测明天的股价,甚至预测收益。这不是讲故事,它带来的是真金白银。

一些找到数据集的好地方:

  • /r/datasets – 有上百的有趣数据的 subreddit(Reddit 是国外一个社交新闻站点,subreddit 指该论坛下的各不同版块)。
  • Google Public Datasets – 通过 Google BigQuery 使用的公开数据集。
  • Awesome datasets – 一个数据集列表,放在 Github 上。

当你查看这些数据集时,想一下人们想要在这些数据集中得到什么答案,哪怕这些问题只想过一次(“房价是如何与标准普尔 500 指数关联的?”),或者更进一步(“你能预测股市吗?”)。这里的关键是更进一步地找出问题,并且用相同的代码在不同输入(不同的数据)上运行多次。

对于本文的目标,我们来看一下 房利美 Fannie Mae 贷款数据。房利美是一家在美国的政府赞助的企业抵押贷款公司,它从其他银行购买按揭贷款,然后捆绑这些贷款为贷款证券来转卖它们。这使得贷款机构可以提供更多的抵押贷款,在市场上创造更多的流动性。这在理论上会带来更多的住房和更好的贷款期限。从借款人的角度来说,它们大体上差不多,话虽这样说。

房利美发布了两种类型的数据 – 它获得的贷款的数据,和贷款偿还情况的数据。在理想的情况下,有人向贷款人借钱,然后还款直到还清。不管怎样,有些人多次不还,从而丧失了抵押品赎回权。抵押品赎回权是指没钱还了被银行把房子给收走了。房利美会追踪谁没还钱,并且哪个贷款需要收回抵押的房屋(取消赎回权)。每个季度会发布此数据,发布的是滞后一年的数据。当前可用是 2015 年第一季度数据。

“贷款数据”是由房利美发布的贷款发放的数据,它包含借款人的信息、信用评分,和他们的家庭贷款信息。“执行数据”,贷款发放后的每一个季度公布,包含借贷人的还款信息和是否丧失抵押品赎回权的状态,一个“贷款数据”的“执行数据”可能有十几行。可以这样理解,“贷款数据”告诉你房利美所控制的贷款,“执行数据”包含该贷款一系列的状态更新。其中一个状态更新可以告诉我们一笔贷款在某个季度被取消赎回权了。

一个没有及时还贷的房子就这样的被卖了

选择一个角度

这里有几个我们可以去分析房利美数据集的方向。我们可以:

  • 预测房屋的销售价格。
  • 预测借款人还款历史。
  • 在获得贷款时为每一笔贷款打分。

最重要的事情是坚持单一的角度。同时关注太多的事情很难做出效果。选择一个有着足够细节的角度也很重要。下面的角度就没有太多细节:

  • 找出哪些银行将贷款出售给房利美的多数被取消赎回权。
  • 计算贷款人的信用评分趋势。
  • 找到哪些类型的家庭没有偿还贷款的能力。
  • 找到贷款金额和抵押品价格之间的关系。

上面的想法非常有趣,如果我们关注于讲故事,那是一个不错的角度,但是不是很适合一个操作性项目。

在房利美数据集中,我们将仅使用申请贷款时有的那些信息来预测贷款是否将来会被取消赎回权。实际上, 我们将为每一笔贷款建立“分数”来告诉房利美买还是不买。这将给我们打下良好的基础,并将组成这个漂亮的作品集的一部分。

理解数据

我们来简单看一下原始数据文件。下面是 2012 年 1 季度前几行的贷款数据:

100000853384|R|OTHER|4.625|280000|360|02/2012|04/2012|31|31|1|23|801|N|C|SF|1|I|CA|945||FRM|
100003735682|R|SUNTRUST MORTGAGE INC.|3.99|466000|360|01/2012|03/2012|80|80|2|30|794|N|P|SF|1|P|MD|208||FRM|788
100006367485|C|PHH MORTGAGE CORPORATION|4|229000|360|02/2012|04/2012|67|67|2|36|802|N|R|SF|1|P|CA|959||FRM|794

下面是 2012 年 1 季度的前几行执行数据:

100000853384|03/01/2012|OTHER|4.625||0|360|359|03/2042|41860|0|N||||||||||||||||
100000853384|04/01/2012||4.625||1|359|358|03/2042|41860|0|N||||||||||||||||
100000853384|05/01/2012||4.625||2|358|357|03/2042|41860|0|N||||||||||||||||

在开始编码之前,花些时间真正理解数据是值得的。这对于操作性项目优为重要,因为我们没有交互式探索数据,将很难察觉到细微的差别,除非我们在前期发现他们。在这种情况下,第一个步骤是阅读房利美站点的资料:

在看完这些文件后后,我们了解到一些能帮助我们的关键点:

  • 从 2000 年到现在,每季度都有一个贷款和执行文件,因数据是滞后一年的,所以到目前为止最新数据是 2015 年的。
  • 这些文件是文本格式的,采用管道符号|进行分割。
  • 这些文件是没有表头的,但我们有个文件列明了各列的名称。
  • 所有一起,文件包含 2200 万个贷款的数据。
  • 由于执行数据的文件包含过去几年获得的贷款的信息,在早些年获得的贷款将有更多的执行数据(即在 2014 获得的贷款没有多少历史执行数据)。

这些小小的信息将会为我们节省很多时间,因为这样我们就知道如何构造我们的项目和利用这些数据了。

构造项目

在我们开始下载和探索数据之前,先想一想将如何构造项目是很重要的。当建立端到端项目时,我们的主要目标是:

  • 创建一个可行解决方案
  • 有一个快速运行且占用最小资源的解决方案
  • 容易可扩展
  • 写容易理解的代码
  • 写尽量少的代码

为了实现这些目标,需要对我们的项目进行良好的构造。一个结构良好的项目遵循几个原则:

  • 分离数据文件和代码文件
  • 从原始数据中分离生成的数据。
  • 有一个 README.md 文件帮助人们安装和使用该项目。
  • 有一个 requirements.txt 文件列明项目运行所需的所有包。
  • 有一个单独的 settings.py 文件列明其它文件中使用的所有的设置

    • 例如,如果从多个 Python 脚本读取同一个文件,让它们全部 import 设置并从一个集中的地方获得文件名是有用的。
  • 有一个 .gitignore 文件,防止大的或密码文件被提交。
  • 分解任务中每一步可以单独执行的步骤到单独的文件中。

    • 例如,我们将有一个文件用于读取数据,一个用于创建特征,一个用于做出预测。
  • 保存中间结果,例如,一个脚本可以输出下一个脚本可读取的文件。

    • 这使我们无需重新计算就可以在数据处理流程中进行更改。

我们的文件结构大体如下:

loan-prediction
├── data
├── processed
├── .gitignore
├── README.md
├── requirements.txt
├── settings.py

创建初始文件

首先,我们需要创建一个 loan-prediction 文件夹,在此文件夹下面,再创建一个 data 文件夹和一个 processed 文件夹。data 文件夹存放原始数据,processed 文件夹存放所有的中间计算结果。

其次,创建 .gitignore 文件,.gitignore 文件将保证某些文件被 git 忽略而不会被推送至 GitHub。关于这个文件的一个好的例子是由 OSX 在每一个文件夹都会创建的 .DS_Store 文件,.gitignore 文件一个很好的范本在这里。我们还想忽略数据文件,因为它们实在是太大了,同时房利美的条文禁止我们重新分发该数据文件,所以我们应该在我们的文件后面添加以下 2 行:

data
processed

这里是该项目的一个关于 .gitignore 文件的例子。

再次,我们需要创建 README.md 文件,它将帮助人们理解该项目。后缀 .md 表示这个文件采用 markdown 格式。Markdown 使你能够写纯文本文件,同时还可以添加你想要的神奇的格式。这里是关于 markdown 的导引。如果你上传一个叫 README.md 的文件至 Github,Github 会自动处理该 markdown,同时展示给浏览该项目的人。例子在这里

至此,我们仅需在 README.md 文件中添加简单的描述:

Loan Prediction
-----------------------

Predict whether or not loans acquired by Fannie Mae will go into foreclosure.  Fannie Mae acquires loans from other lenders as a way of inducing them to lend more.  Fannie Mae releases data on the loans it has acquired and their performance afterwards [here](http://www.fanniemae.com/portal/funding-the-market/data/loan-performance-data.html).

现在,我们可以创建 requirements.txt 文件了。这会帮助其它人可以很方便地安装我们的项目。我们还不知道我们将会具体用到哪些库,但是以下几个库是需要的:

pandas
matplotlib
scikit-learn
numpy
ipython
scipy

以上几个是在 python 数据分析任务中最常用到的库。可以认为我们将会用到大部分这些库。这里是该项目 requirements.txt 文件的一个例子。

创建 requirements.txt 文件之后,你应该安装这些包了。我们将会使用 python3。如果你没有安装 python,你应该考虑使用 Anaconda,它是一个 python 安装程序,同时安装了上面列出的所有包。

最后,我们可以建立一个空白的 settings.py 文件,因为我们的项目还没有任何设置。

获取数据

一旦我们有了项目的基本架构,我们就可以去获得原始数据。

房利美对获取数据有一些限制,所以你需要去注册一个账户。在创建完账户之后,你可以找到在这里的下载页面,你可以按照你所需要的下载或多或少的贷款数据文件。文件格式是 zip,在解压后当然是非常大的。

为了达到我们这个文章的目的,我们将要下载从 2012 年 1 季度到 2015 年 1 季度的所有数据。接着我们需要解压所有的文件。解压过后,删掉原来的 .zip 格式的文件。最后,loan-prediction 文件夹看起来应该像下面的一样:

loan-prediction
├── data
│   ├── Acquisition_2012Q1.txt
│   ├── Acquisition_2012Q2.txt
│   ├── Performance_2012Q1.txt
│   ├── Performance_2012Q2.txt
│   └── ...
├── processed
├── .gitignore
├── README.md
├── requirements.txt
├── settings.py

在下载完数据后,你可以在 shell 命令行中使用 headtail 命令去查看文件中的行数据,你看到任何的不需要的数据列了吗?在做这件事的同时查阅列名称的 pdf 文件可能有帮助。

读入数据

有两个问题让我们的数据难以现在就使用:

  • 贷款数据和执行数据被分割在多个文件中
  • 每个文件都缺少列名标题

在我们开始使用数据之前,我们需要首先明白我们要在哪里去存一个贷款数据的文件,同时到哪里去存储一个执行数据的文件。每个文件仅仅需要包括我们关注的那些数据列,同时拥有正确的列名标题。这里有一个小问题是执行数据非常大,因此我们需要尝试去修剪一些数据列。

第一步是向 settings.py 文件中增加一些变量,这个文件中同时也包括了我们原始数据的存放路径和处理出的数据存放路径。我们同时也将添加其他一些可能在接下来会用到的设置数据:

DATA_DIR = "data"
PROCESSED_DIR = "processed"
MINIMUM_TRACKING_QUARTERS = 4
TARGET = "foreclosure_status"
NON_PREDICTORS = [TARGET, "id"]
CV_FOLDS = 3

把路径设置在 settings.py 中使它们放在一个集中的地方,同时使其修改更加的容易。当在多个文件中用到相同的变量时,你想改变它的话,把他们放在一个地方比分散放在每一个文件时更加容易。这里的是一个这个工程的示例 settings.py 文件

第二步是创建一个文件名为 assemble.py,它将所有的数据分为 2 个文件。当我们运行 Python assemble.py,我们在处理数据文件的目录会获得 2 个数据文件。

接下来我们开始写 assemble.py 文件中的代码。首先我们需要为每个文件定义相应的列名标题,因此我们需要查看列名称的 pdf 文件,同时创建在每一个贷款数据和执行数据的文件的数据列的列表:

HEADERS = {
    "Acquisition": [
        "id",
        "channel",
        "seller",
        "interest_rate",
        "balance",
        "loan_term",
        "origination_date",
        "first_payment_date",
        "ltv",
        "cltv",
        "borrower_count",
        "dti",
        "borrower_credit_score",
        "first_time_homebuyer",
        "loan_purpose",
        "property_type",
        "unit_count",
        "occupancy_status",
        "property_state",
        "zip",
        "insurance_percentage",
        "product_type",
        "co_borrower_credit_score"
    ],
    "Performance": [
        "id",
        "reporting_period",
        "servicer_name",
        "interest_rate",
        "balance",
        "loan_age",
        "months_to_maturity",
        "maturity_date",
        "msa",
        "delinquency_status",
        "modification_flag",
        "zero_balance_code",
        "zero_balance_date",
        "last_paid_installment_date",
        "foreclosure_date",
        "disposition_date",
        "foreclosure_costs",
        "property_repair_costs",
        "recovery_costs",
        "misc_costs",
        "tax_costs",
        "sale_proceeds",
        "credit_enhancement_proceeds",
        "repurchase_proceeds",
        "other_foreclosure_proceeds",
        "non_interest_bearing_balance",
        "principal_forgiveness_balance"
    ]
}

接下来一步是定义我们想要保留的数据列。因为我们要预测一个贷款是否会被撤回,我们可以丢弃执行数据中的许多列。我们将需要保留贷款数据中的所有数据列,因为我们需要尽量多的了解贷款发放时的信息(毕竟我们是在预测贷款发放时这笔贷款将来是否会被撤回)。丢弃数据列将会使我们节省下内存和硬盘空间,同时也会加速我们的代码。

SELECT = {
    "Acquisition": HEADERS["Acquisition"],
    "Performance": [
        "id",
        "foreclosure_date"
    ]
}

下一步,我们将编写一个函数来连接数据集。下面的代码将:

  • 引用一些需要的库,包括 settings
  • 定义一个函数 concatenate,目的是:

    • 获取到所有 data 目录中的文件名。
    • 遍历每个文件。

      • 如果文件不是正确的格式 (不是以我们需要的格式作为开头),我们将忽略它。
      • 通过使用 Pandas 的 read\_csv 函数及正确的设置把文件读入一个 DataFrame

        • 设置分隔符为,以便所有的字段能被正确读出。
        • 数据没有标题行,因此设置 headerNone 来进行标示。
        • HEADERS 字典中设置正确的标题名称 – 这将会是我们的 DataFrame 中的数据列名称。
        • 仅选择我们加在 SELECT 中的 DataFrame 的列。
  • 把所有的 DataFrame 共同连接在一起。
  • 把已经连接好的 DataFrame 写回一个文件。
import os
import settings
import pandas as pd

def concatenate(prefix="Acquisition"):
    files = os.listdir(settings.DATA_DIR)
    full = []
    for f in files:
        if not f.startswith(prefix):
            continue

        data = pd.read_csv(os.path.join(settings.DATA_DIR, f), sep="|", header=None, names=HEADERS[prefix], index_col=False)
        data = data[SELECT[prefix]]
        full.append(data)

    full = pd.concat(full, axis=0)

    full.to_csv(os.path.join(settings.PROCESSED_DIR, "{}.txt".format(prefix)), sep="|", header=SELECT[prefix], index=False)

我们可以通过调用上面的函数,通过传递的参数 AcquisitionPerformance 两次以将所有的贷款和执行文件连接在一起。下面的代码将:

  • 仅在命令行中运行 python assemble.py 时执行。
  • 将所有的数据连接在一起,并且产生 2 个文件:

    • processed/Acquisition.txt
    • processed/Performance.txt
if __name__ == "__main__":
    concatenate("Acquisition")
    concatenate("Performance")

我们现在拥有了一个漂亮的,划分过的 assemble.py 文件,它很容易执行,也容易建立。通过像这样把问题分解为一块一块的,我们构建工程就会变的容易许多。不用一个可以做所有工作的凌乱脚本,我们定义的数据将会在多个脚本间传递,同时使脚本间完全的彼此隔离。当你正在一个大的项目中工作时,这样做是一个好的想法,因为这样可以更加容易修改其中的某一部分而不会引起其他项目中不关联部分产生预料之外的结果。

一旦我们完成 assemble.py 脚本文件,我们可以运行 python assemble.py 命令。你可以在这里查看完整的 assemble.py 文件。

这将会在 processed 目录下产生 2 个文件:

loan-prediction
├── data
│   ├── Acquisition_2012Q1.txt
│   ├── Acquisition_2012Q2.txt
│   ├── Performance_2012Q1.txt
│   ├── Performance_2012Q2.txt
│   └── ...
├── processed
│   ├── Acquisition.txt
│   ├── Performance.txt
├── .gitignore
├── assemble.py
├── README.md
├── requirements.txt
├── settings.py

计算来自执行数据的值

接下来我们会计算来自 processed/Performance.txt 中的值。我们要做的就是推测这些资产是否被取消赎回权。如果能够计算出来,我们只要看一下关联到贷款的执行数据的参数 foreclosure_date 就可以了。如果这个参数的值是 None,那么这些资产肯定没有收回。为了避免我们的样例中只有少量的执行数据,我们会为每个贷款计算出执行数据文件中的行数。这样我们就能够从我们的训练数据中筛选出贷款数据,排除了一些执行数据。

下面是我认为贷款数据和执行数据的关系:

在上面的表格中,贷款数据中的每一行数据都关联到执行数据中的多行数据。在执行数据中,在取消赎回权的时候 foreclosure_date 就会出现在该季度,而之前它是空的。一些贷款还没有被取消赎回权,所以与执行数据中的贷款数据有关的行在 foreclosure_date 列都是空格。

我们需要计算 foreclosure_status 的值,它的值是布尔类型,可以表示一个特殊的贷款数据 id 是否被取消赎回权过,还有一个参数 performance_count ,它记录了执行数据中每个贷款 id 出现的行数。 

计算这些行数有多种不同的方法:

  • 我们能够读取所有的执行数据,然后我们用 Pandas 的 groupby 方法在 DataFrame 中计算出与每个贷款 id 有关的行的行数,然后就可以查看贷款 idforeclosure_date 值是否为 None

    • 这种方法的优点是从语法上来说容易执行。
    • 它的缺点需要读取所有的 129236094 行数据,这样就会占用大量内存,并且运行起来极慢。
  • 我们可以读取所有的执行数据,然后在贷款 DataFrame 上使用 apply 去计算每个贷款 id 出现的次数。

    • 这种方法的优点是容易理解。
    • 缺点是需要读取所有的 129236094 行数据。这样会占用大量内存,并且运行起来极慢。
  • 我们可以在迭代访问执行数据中的每一行数据,而且会建立一个单独的计数字典。

    • 这种方法的优点是数据不需要被加载到内存中,所以运行起来会很快且不需要占用内存。
    • 缺点是这样的话理解和执行上可能有点耗费时间,我们需要对每一行数据进行语法分析。

加载所有的数据会非常耗费内存,所以我们采用第三种方法。我们要做的就是迭代执行数据中的每一行数据,然后为每一个贷款 id 在字典中保留一个计数。在这个字典中,我们会计算出贷款 id 在执行数据中出现的次数,而且看看 foreclosure_date 是否是 None 。我们可以查看 foreclosure_statusperformance_count 的值 。

我们会新建一个 annotate.py 文件,文件中的代码可以计算这些值。我们会使用下面的代码:

  • 导入需要的库
  • 定义一个函数 count_performance_rows

    • 打开 processed/Performance.txt 文件。这不是在内存中读取文件而是打开了一个文件标识符,这个标识符可以用来以行为单位读取文件。
    • 迭代文件的每一行数据。
    • 使用分隔符|分开每行的不同数据。
    • 检查 loan_id 是否在计数字典中。

      • 如果不存在,把它加进去。
    • loan_idperformance_count 参数自增 1 次,因为我们这次迭代也包含其中。
    • 如果 date 不是 None ,我们就会知道贷款被取消赎回权了,然后为foreclosure\_status` 设置合适的值。
import os
import settings
import pandas as pd

def count_performance_rows():
    counts = {}
    with open(os.path.join(settings.PROCESSED_DIR, "Performance.txt"), 'r') as f:
        for i, line in enumerate(f):
            if i == 0:
                # Skip header row
                continue
            loan_id, date = line.split("|")
            loan_id = int(loan_id)
            if loan_id not in counts:
                counts[loan_id] = {
                    "foreclosure_status": False,
                    "performance_count": 0
                }
            counts[loan_id]["performance_count"] += 1
            if len(date.strip()) > 0:
                counts[loan_id]["foreclosure_status"] = True
    return counts

获取值

只要我们创建了计数字典,我们就可以使用一个函数通过一个 loan_id 和一个 key 从字典中提取到需要的参数的值:

def get_performance_summary_value(loan_id, key, counts):
    value = counts.get(loan_id, {
        "foreclosure_status": False,
        "performance_count": 0
    })
    return value[key]

上面的函数会从 counts 字典中返回合适的值,我们也能够为贷款数据中的每一行赋一个 foreclosure_status 值和一个 performance_count 值。如果键不存在,字典的 get 方法会返回一个默认值,所以在字典中不存在键的时候我们就可以得到一个可知的默认值。

转换数据

我们已经在 annotate.py 中添加了一些功能,现在我们来看一看数据文件。我们需要将贷款到的数据转换到训练数据集来进行机器学习算法的训练。这涉及到以下几件事情:

  • 转换所有列为数字。
  • 填充缺失值。
  • 为每一行分配 performance_countforeclosure_status
  • 移除出现执行数据很少的行(performance_count 计数低)。

我们有几个列是文本类型的,看起来对于机器学习算法来说并不是很有用。然而,它们实际上是分类变量,其中有很多不同的类别代码,例如 RS 等等. 我们可以把这些类别标签转换为数值:

通过这种方法转换的列我们可以应用到机器学习算法中。

还有一些包含日期的列 (first_payment_dateorigination_date)。我们可以将这些日期放到两个列中:

在下面的代码中,我们将转换贷款数据。我们将定义一个函数如下:

  • acquisition 中创建 foreclosure_status 列,并从 counts 字典中得到值。
  • acquisition 中创建 performance_count 列,并从 counts 字典中得到值。
  • 将下面的列从字符串转换为整数:

    • channel
    • seller
    • first_time_homebuyer
    • loan_purpose
    • property_type
    • occupancy_status
    • property_state
    • product_type
  • first_payment_dateorigination_date 分别转换为两列:

    • 通过斜杠分离列。
    • 将第一部分分离成 month 列。
    • 将第二部分分离成 year 列。
    • 删除该列。
    • 最后,我们得到 first_payment_monthfirst_payment_yearrigination_monthorigination_year
  • 所有缺失值填充为 -1
def annotate(acquisition, counts):
    acquisition["foreclosure_status"] = acquisition["id"].apply(lambda x: get_performance_summary_value(x, "foreclosure_status", counts))
    acquisition["performance_count"] = acquisition["id"].apply(lambda x: get_performance_summary_value(x, "performance_count", counts))
    for column in [
        "channel",
        "seller",
        "first_time_homebuyer",
        "loan_purpose",
        "property_type",
        "occupancy_status",
        "property_state",
        "product_type"
    ]:
        acquisition[column] = acquisition[column].astype('category').cat.codes

    for start in ["first_payment", "origination"]:
        column = "{}_date".format(start)
        acquisition["{}_year".format(start)] = pd.to_numeric(acquisition[column].str.split('/').str.get(1))
        acquisition["{}_month".format(start)] = pd.to_numeric(acquisition[column].str.split('/').str.get(0))
        del acquisition[column]

    acquisition = acquisition.fillna(-1)
    acquisition = acquisition[acquisition["performance_count"] > settings.MINIMUM_TRACKING_QUARTERS]
    return acquisition

聚合到一起

我们差不多准备就绪了,我们只需要再在 annotate.py 添加一点点代码。在下面代码中,我们将:

  • 定义一个函数来读取贷款的数据。
  • 定义一个函数来写入处理过的数据到 processed/train.csv
  • 如果该文件在命令行以 python annotate.py 的方式运行:

    • 读取贷款数据。
    • 计算执行数据的计数,并将其赋予 counts
    • 转换 acquisition DataFrame。
    • acquisition DataFrame 写入到 train.csv
def read():
    acquisition = pd.read_csv(os.path.join(settings.PROCESSED_DIR, "Acquisition.txt"), sep="|")
    return acquisition

def write(acquisition):
    acquisition.to_csv(os.path.join(settings.PROCESSED_DIR, "train.csv"), index=False)

if __name__ == "__main__":
    acquisition = read()
    counts = count_performance_rows()
    acquisition = annotate(acquisition, counts)
    write(acquisition)

修改完成以后,确保运行 python annotate.py 来生成 train.csv 文件。 你可以在这里找到完整的 annotate.py 文件。

现在文件夹看起来应该像这样:

loan-prediction
├── data
│   ├── Acquisition_2012Q1.txt
│   ├── Acquisition_2012Q2.txt
│   ├── Performance_2012Q1.txt
│   ├── Performance_2012Q2.txt
│   └── ...
├── processed
│   ├── Acquisition.txt
│   ├── Performance.txt
│   ├── train.csv
├── .gitignore
├── annotate.py
├── assemble.py
├── README.md
├── requirements.txt
├── settings.py

找到误差标准

我们已经完成了训练数据表的生成,现在我们需要最后一步,生成预测。我们需要找到误差的标准,以及该如何评估我们的数据。在这种情况下,因为有很多的贷款没有被取消赎回权,所以根本不可能做到精确的计算。

我们需要读取训练数据,并且计算 foreclosure_status 列的计数,我们将得到如下信息:

import pandas as pd
import settings

train = pd.read_csv(os.path.join(settings.PROCESSED_DIR, "train.csv"))
train["foreclosure_status"].value_counts()
False    4635982
True        1585
Name: foreclosure_status, dtype: int64

因为只有很少的贷款被取消赎回权,只需要检查正确预测的标签的百分比就意味着我们可以创建一个机器学习模型,来为每一行预测 False,并能取得很高的精确度。相反,我们想要使用一个度量来考虑分类的不平衡,确保我们可以准确预测。我们要避免太多的误报率(预测贷款被取消赎回权,但是实际没有),也要避免太多的漏报率(预测贷款没有别取消赎回权,但是实际被取消了)。对于这两个来说,漏报率对于房利美来说成本更高,因为他们买的贷款可能是他们无法收回投资的贷款。

所以我们将定义一个漏报率,就是模型预测没有取消赎回权但是实际上取消了,这个数除以总的取消赎回权数。这是“缺失的”实际取消赎回权百分比的模型。下面看这个图表:

通过上面的图表,有 1 个贷款预测不会取消赎回权,但是实际上取消了。如果我们将这个数除以实际取消赎回权的总数 2,我们将得到漏报率 50%。 我们将使用这个误差标准,因此我们可以评估一下模型的行为。

设置机器学习分类器

我们使用交叉验证预测。通过交叉验证法,我们将数据分为3组。按照下面的方法来做:

  • 用组 1 和组 2 训练模型,然后用该模型来预测组 3
  • 用组 1 和组 3 训练模型,然后用该模型来预测组 2
  • 用组 2 和组 3 训练模型,然后用该模型来预测组 1

将它们分割到不同的组,这意味着我们永远不会用相同的数据来为其预测训练模型。这样就避免了过拟合。如果过拟合,我们将错误地拉低了漏报率,这使得我们难以改进算法或者应用到现实生活中。

Scikit-learn 有一个叫做 crossvalpredict ,它可以帮助我们理解交叉算法.

我们还需要一种算法来帮我们预测。我们还需要一个分类器来做二元分类。目标变量 foreclosure_status 只有两个值, TrueFalse

这里我们用 逻辑回归算法,因为它能很好的进行二元分类,并且运行很快,占用内存很小。我们来说一下它是如何工作的:不使用像随机森林一样多树结构,也不像支持向量机一样做复杂的转换,逻辑回归算法涉及更少的步骤和更少的矩阵。

我们可以使用 scikit-learn 实现的逻辑回归分类器算法。我们唯一需要注意的是每个类的权重。如果我们使用等权重的类,算法将会预测每行都为 false,因为它总是试图最小化误差。不管怎样,我们重视有多少贷款要被取消赎回权而不是有多少不能被取消。因此,我们给逻辑回归类的 class_weight 关键字传递 balanced 参数,让算法可以为不同 counts 的每个类考虑不同的取消赎回权的权重。这将使我们的算法不会为每一行都预测 false,而是两个类的误差水平一致。

做出预测

既然完成了前期准备,我们可以开始准备做出预测了。我将创建一个名为 predict.py 的新文件,它会使用我们在最后一步创建的 train.csv 文件。下面的代码:

  • 导入所需的库
  • 创建一个名为 cross_validate 的函数:

    • 使用正确的关键词参数创建逻辑回归分类器
    • 创建用于训练模型的数据列的列表,移除 idforeclosure_status
    • 交叉验证 train DataFrame
    • 返回预测结果
import os
import settings
import pandas as pd
from sklearn import cross_validation
from sklearn.linear_model import LogisticRegression
from sklearn import metrics

def cross_validate(train):
    clf = LogisticRegression(random_state=1, class_weight="balanced")

    predictors = train.columns.tolist()
    predictors = [p for p in predictors if p not in settings.NON_PREDICTORS]

    predictions = cross_validation.cross_val_predict(clf, train[predictors], train[settings.TARGET], cv=settings.CV_FOLDS)
    return predictions

预测误差

现在,我们仅仅需要写一些函数来计算误差。下面的代码:

  • 创建函数 compute_error

    • 使用 scikit-learn 计算一个简单的精确分数(与实际 foreclosure_status 值匹配的预测百分比)
  • 创建函数 compute_false_negatives

    • 为了方便,将目标和预测结果合并到一个 DataFrame
    • 查找漏报率

      • 找到原本应被预测模型取消赎回权,但实际没有取消的贷款数目
      • 除以没被取消赎回权的贷款总数目
def compute_error(target, predictions):
    return metrics.accuracy_score(target, predictions)

def compute_false_negatives(target, predictions):
    df = pd.DataFrame({"target": target, "predictions": predictions})
    return df[(df["target"] == 1) & (df["predictions"] == 0)].shape[0] / (df[(df["target"] == 1)].shape[0] + 1)

def compute_false_positives(target, predictions):
    df = pd.DataFrame({"target": target, "predictions": predictions})
    return df[(df["target"] == 0) & (df["predictions"] == 1)].shape[0] / (df[(df["target"] == 0)].shape[0] + 1)

聚合到一起

现在,我们可以把函数都放在 predict.py。下面的代码:

  • 读取数据集
  • 计算交叉验证预测
  • 计算上面的 3 个误差
  • 打印误差
def read():
    train = pd.read_csv(os.path.join(settings.PROCESSED_DIR, "train.csv"))
    return train

if __name__ == "__main__":
    train = read()
    predictions = cross_validate(train)
    error = compute_error(train[settings.TARGET], predictions)
    fn = compute_false_negatives(train[settings.TARGET], predictions)
    fp = compute_false_positives(train[settings.TARGET], predictions)
    print("Accuracy Score: {}".format(error))
    print("False Negatives: {}".format(fn))
    print("False Positives: {}".format(fp))

一旦你添加完代码,你可以运行 python predict.py 来产生预测结果。运行结果向我们展示漏报率为 .26,这意味着我们没能预测 26% 的取消贷款。这是一个好的开始,但仍有很多改善的地方!

你可以在这里找到完整的 predict.py 文件。

你的文件树现在看起来像下面这样:

loan-prediction
├── data
│   ├── Acquisition_2012Q1.txt
│   ├── Acquisition_2012Q2.txt
│   ├── Performance_2012Q1.txt
│   ├── Performance_2012Q2.txt
│   └── ...
├── processed
│   ├── Acquisition.txt
│   ├── Performance.txt
│   ├── train.csv
├── .gitignore
├── annotate.py
├── assemble.py
├── predict.py
├── README.md
├── requirements.txt
├── settings.py

撰写 README

既然我们完成了端到端的项目,那么我们可以撰写 README.md 文件了,这样其他人便可以知道我们做的事,以及如何复制它。一个项目典型的 README.md 应该包括这些部分:

  • 一个高水准的项目概览,并介绍项目目的
  • 任何必需的数据和材料的下载地址
  • 安装命令

    • 如何安装要求依赖
  • 使用命令

    • 如何运行项目
    • 每一步之后会看到的结果
  • 如何为这个项目作贡献

    • 扩展项目的下一步计划

这里 是这个项目的一个 README.md 样例。

下一步

恭喜你完成了端到端的机器学习项目!你可以在这里找到一个完整的示例项目。一旦你完成了项目,把它上传到 Github 是一个不错的主意,这样其他人也可以看到你的文件夹的部分项目。

这里仍有一些留待探索数据的角度。总的来说,我们可以把它们分割为 3 类: 扩展这个项目并使它更加精确,发现其他可以预测的列,并探索数据。这是其中一些想法:

  • annotate.py 中生成更多的特性
  • 切换 predict.py 中的算法
  • 尝试使用比我们发表在这里的更多的房利美数据
  • 添加对未来数据进行预测的方法。如果我们添加更多数据,我们所写的代码仍然可以起作用,这样我们可以添加更多过去和未来的数据。
  • 尝试看看是否你能预测一个银行原本是否应该发放贷款(相对地,房利美是否应该获得贷款)

    • 移除 train 中银行在发放贷款时间的不知道的任何列

      • 当房利美购买贷款时,一些列是已知的,但之前是不知道的
    • 做出预测
  • 探索是否你可以预测除了 foreclosure_status 的其他列

    • 你可以预测在销售时资产值是多少?
  • 探索探索执行数据更新之间的细微差别

    • 你能否预测借款人会逾期还款多少次?
    • 你能否标出的典型贷款周期?
  • 将数据按州或邮政编码标出

    • 你看到一些有趣的模式了吗?

如果你建立了任何有趣的东西,请在评论中让我们知道!

如果你喜欢这篇文章,或许你也会喜欢阅读“构建你的数据科学作品集”系列的其他文章:


via: https://www.dataquest.io/blog/data-science-portfolio-machine-learning/

作者:Vik Paruchuri 译者:kokialoves, zky001, vim-kakali, cposture, ideas4u 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

序言

这篇教程将会教你怎么制作你的第一个 Atom 文本编辑器的插件。我们将会制作一个山寨版的 Sourcerer,这是一个从 StackOverflow 查询并使用代码片段的插件。到教程结束时,你将会制作好一个将编程问题(用英语描述的)转换成获取自 StackOverflow 的代码片段的插件,像这样:

教程须知

Atom 文本编辑器是用 web 技术创造出来的。我们将完全使用 JavaScript 的 EcmaScript 6 规范来制作插件。你需要熟悉以下内容:

教程的仓库

你可以跟着教程一步一步走,或者看看 放在 GitHub 上的仓库,这里有插件的源代码。这个仓库的历史提交记录包含了这里每一个标题。

开始

安装 Atom

根据 Atom 官网 的说明来下载 Atom。我们同时还要安装上 apm(Atom 包管理器的命令行工具)。你可以打开 Atom 并在应用菜单中导航到 Atom > Install Shell Commands 来安装。打开你的命令行终端,运行 apm -v 来检查 apm 是否已经正确安装好,安装成功的话打印出来的工具版本和相关环境信息应该是像这样的:

apm -v
> apm  1.9.2
> npm  2.13.3
> node 0.10.40
> python 2.7.10
> git 2.7.4

生成骨架代码

让我们使用 Atom 提供的一个实用工具创建一个新的 package(软件包)来开始这篇教程。

  • 启动编辑器,按下 Cmd+Shift+P(MacOS)或者 Ctrl+Shift+P(Windows/Linux)来打开 命令面板 Command Palette
  • 搜索“Package Generator: Generate Package”并点击列表中正确的条目,你会看到一个输入提示,输入软件包的名称:“sourcefetch”。
  • 按下回车键来生成这个骨架代码包,它会自动在 Atom 中打开。

如果你在侧边栏没有看到软件包的文件,依次按下 Cmd+K Cmd+B(MacOS)或者 Ctrl+K Ctrl+B(Windows/Linux)。

命令面板 Command Palette 可以让你通过模糊搜索来找到并运行软件包。这是一个执行命令比较方便的途径,你不用去找导航菜单,也不用刻意去记快捷键。我们将会在整篇教程中使用这个方法。

运行骨架代码包

在开始编程前让我们来试用一下这个骨架代码包。我们首先需要重启 Atom,这样它才可以识别我们新增的软件包。再次打开命令面板,执行 Window: Reload 命令。

重新加载当前窗口以确保 Atom 执行的是我们最新的源代码。每当需要测试我们对软件包的改动的时候,就需要运行这条命令。

通过导航到编辑器菜单的 Packages > sourcefetch > Toggle 或者在命令面板执行 sourcefetch:toggle 来运行软件包的 toggle 命令。你应该会看到屏幕的顶部出现了一个小黑窗。再次运行这条命令就可以隐藏它。

“toggle”命令

打开 lib/sourcefetch.js,这个文件包含有软件包的逻辑和 toggle 命令的定义。

toggle() {
 console.log('Sourcefetch was toggled!');
 return (
   this.modalPanel.isVisible() ?
   this.modalPanel.hide() :
   this.modalPanel.show()
 );
}

toggle 是这个模块导出的一个函数。根据模态面板的可见性,它通过一个三目运算符 来调用 showhide 方法。modalPanelPanel(一个由 Atom API 提供的 UI 元素) 的一个实例。我们需要在 export default 内部声明 modalPanel 才可以让我们通过一个实例变量 this 来访问它。

this.subscriptions.add(atom.commands.add('atom-workspace', {
  'sourcefetch:toggle': () => this.toggle()
}));

上面的语句让 Atom 在用户运行 sourcefetch:toggle 的时候执行 toggle 方法。我们指定了一个 匿名函数 () => this.toggle(),每次执行这条命令的时候都会执行这个函数。这是事件驱动编程(一种常用的 JavaScript 模式)的一个范例。

Atom 命令

命令只是用户触发事件时使用的一些字符串标识符,它定义在软件包的命名空间内。我们已经用过的命令有:

  • package-generator:generate-package
  • Window:reload
  • sourcefetch:toggle

软件包对应到命令,以执行代码来响应事件。

进行你的第一次代码更改

让我们来进行第一次代码更改——我们将通过改变 toggle 函数来实现逆转用户选中文本的功能。

改变 “toggle” 函数

如下更改 toggle 函数。

toggle() {
  let editor
  if (editor = atom.workspace.getActiveTextEditor()) {
    let selection = editor.getSelectedText()
    let reversed = selection.split('').reverse().join('')
    editor.insertText(reversed)
  }
}

测试你的改动

  • 通过在命令面板运行 Window: Reload 来重新加载 Atom。
  • 通过导航到 File > New 来创建一个新文件,随便写点什么并通过光标选中它。
  • 通过命令面板、Atom 菜单或者右击文本然后选中 Toggle sourcefetch 来运行 sourcefetch:toggle 命令。

更新后的命令将会改变选中文本的顺序:

sourcefetch 教程仓库 查看这一步的全部代码更改。

Atom 编辑器 API

我们添加的代码通过用 TextEditor API 来访问编辑器内的文本并进行操作。让我们来仔细看看。

let editor
if (editor = atom.workspace.getActiveTextEditor()) { /* ... */ }

头两行代码获取了 TextEditor 实例的一个引用。变量的赋值和后面的代码被包在一个条件结构里,这是为了处理没有可用的编辑器实例的情况,例如,当用户在设置菜单中运行该命令时。

let selection = editor.getSelectedText()

调用 getSelectedText 方法可以让我们访问到用户选中的文本。如果当前没有文本被选中,函数将返回一个空字符串。

let reversed = selection.split('').reverse().join('')
editor.insertText(reversed)

我们选中的文本通过一个 JavaScript 字符串方法 来逆转。最后,我们调用 insertText 方法来将选中的文本替换为逆转后的文本副本。通过阅读 Atom API 文档,你可以学到更多关于 TextEditor 的不同的方法。

浏览骨架代码

现在我们已经完成第一次代码更改了,让我们浏览骨架代码包的代码来深入了解一下 Atom 的软件包是怎样构成的。

主文件

主文件是 Atom 软件包的入口文件。Atom 通过 package.json 里的条目设置来找到主文件的位置:

"main": "./lib/sourcefetch",

这个文件导出一个带有生命周期函数(Atom 在特定的事件发生时调用的处理函数)的对象。

  • activate 会在 Atom 初次加载软件包的时候调用。这个函数用来初始化一些诸如软件包所需的用户界面元素的对象,以及订阅软件包命令的处理函数。
  • deactivate 会在软件包停用的时候调用,例如,当用户关闭或者刷新编辑器的时候。
  • serialize Atom 调用它在使用软件包的过程中保存软件包的当前状态。它的返回值会在 Atom 下一次加载软件包的时候作为一个参数传递给 activate

我们将会重命名我们的软件包命令为 fetch,并移除一些我们不再需要的用户界面元素。按照如下更改主文件:

'use babel';

import { CompositeDisposable } from 'atom'

export default {

  subscriptions: null,

  activate() {
    this.subscriptions = new CompositeDisposable()

    this.subscriptions.add(atom.commands.add('atom-workspace', {
      'sourcefetch:fetch': () => this.fetch()
    }))
  },

  deactivate() {
    this.subscriptions.dispose()
  },

  fetch() {
    let editor
    if (editor = atom.workspace.getActiveTextEditor()) {
      let selection = editor.getSelectedText()
      selection = selection.split('').reverse().join('')
      editor.insertText(selection)
    }
  }
};

“启用”命令

为了提升性能,Atom 软件包可以用时加载。我们可以让 Atom 在用户执行特定的命令的时候才加载我们的软件包。这些命令被称为 启用命令,它们在 package.json 中定义:

"activationCommands": {
  "atom-workspace": "sourcefetch:toggle"
},

更新一下这个条目设置,让 fetch 成为一个启用命令。

"activationCommands": {
  "atom-workspace": "sourcefetch:fetch"
},

有一些软件包需要在 Atom 启动的时候被加载,例如那些改变 Atom 外观的软件包。在那样的情况下,activationCommands 会被完全忽略。

“触发”命令

菜单项

menus 目录下的 JSON 文件指定了哪些菜单项是为我们的软件包而建的。让我们看看 menus/sourcefetch.json

"context-menu": {
  "atom-text-editor": [
    {
      "label": "Toggle sourcefetch",
      "command": "sourcefetch:toggle"
    }
  ]
},

这个 context-menu 对象可以让我们定义右击菜单的一些新条目。每一个条目都是通过一个显示在菜单的 label 属性和一个点击后执行的命令的 command 属性来定义的。

"context-menu": {
  "atom-text-editor": [
    {
      "label": "Fetch code",
      "command": "sourcefetch:fetch"
    }
  ]
},

同一个文件中的这个 menu 对象用来定义插件的自定义应用菜单。我们如下重命名它的条目:

"menu": [
  {
    "label": "Packages",
    "submenu": [
      {
        "label": "sourcefetch",
        "submenu": [
          {
            "label": "Fetch code",
            "command": "sourcefetch:fetch"
          }
        ]
      }
    ]
  }
]

键盘快捷键

命令还可以通过键盘快捷键来触发。快捷键通过 keymaps 目录的 JSON 文件来定义:

{
  "atom-workspace": {
    "ctrl-alt-o": "sourcefetch:toggle"
  }
}

以上代码可以让用户通过 Ctrl+Alt+O(Windows/Linux) 或 Cmd+Alt+O(MacOS) 来触发 toggle 命令。

重命名引用的命令为 fetch

"ctrl-alt-o": "sourcefetch:fetch"

通过执行 Window: Reload 命令来重启 Atom。你应该会看到 Atom 的右击菜单更新了,并且逆转文本的功能应该还可以像之前一样使用。

sourcefetch 教程仓库 查看这一步所有的代码更改。

使用 NodeJS 模块

现在我们已经完成了第一次代码更改并且了解了 Atom 软件包的结构,让我们介绍一下 Node 包管理器(npm) 中的第一个依赖项模块。我们将使用 request 模块发起 HTTP 请求来下载网站的 HTML 文件。稍后将会用到这个功能来扒 StackOverflow 的页面。

安装依赖

打开你的命令行工具,切换到你的软件包的根目录并运行:

npm install --save [email protected]
apm install

这两条命令将 request 模块添加到我们软件包的依赖列表并将模块安装到 node_modules 目录。你应该会在 package.json 看到一个新条目。@ 符号的作用是让 npm 安装我们这篇教程需要用到的特定版本的模块。运行 apm install 是为了让 Atom 知道使用我们新安装的模块。

"dependencies": {
  "request": "^2.73.0"
}

下载 HTML 并将记录打印在开发者控制台

通过在 lib/sourcefetch.js 的顶部添加一条引用语句引入 request 模块到我们的主文件:

import { CompositeDisposable } from 'atom'
import request from 'request'

现在,在 fetch 函数下面添加一个新函数 download 作为模块的导出项:

export default {  

  /* subscriptions, activate(), deactivate() */

  fetch() {
    ...
  },

  download(url) {
    request(url, (error, response, body) => {
      if (!error && response.statusCode == 200) {
        console.log(body)
      }
    })
  }
}

这个函数用 request 模块来下载一个页面的内容并将记录输出到控制台。当 HTTP 请求完成之后,我们的回调函数会将响应体作为参数来被调用。

最后一步是更新 fetch 函数以调用 download 函数:

fetch() {
  let editor
  if (editor = atom.workspace.getActiveTextEditor()) {
    let selection = editor.getSelectedText()
    this.download(selection)
  }
},

fetch 函数现在的功能是将 selection 当作一个 URL 传递给 download 函数,而不再是逆转选中的文本了。让我们来看看这次的更改:

  • 通过执行 Window: Reload 命令来重新加载 Atom。
  • 打开开发者工具。为此,导航到菜单中的 View > Developer > Toggle Developer Tools
  • 新建一个文件,导航到 File > New
  • 输入一个 URL 并选中它,例如:http://www.atom.io
  • 用上述的任意一种方法执行我们软件包的命令:

开发者工具让 Atom 软件包的调试更轻松。每个 console.log 语句都可以将信息打印到交互控制台,你还可以使用 Elements 选项卡来浏览整个应用的可视化结构——即 HTML 的文本对象模型(DOM)

sourcefetch 教程仓库 查看这一步所有的代码更改。

用 Promises 来将下载好的 HTML 插入到编辑器中

理想情况下,我们希望 download 函数可以将 HTML 作为一个字符串来返回,而不仅仅是将页面的内容打印到控制台。然而,返回文本内容是无法实现的,因为我们要在回调函数里面访问内容而不是在 download 函数那里。

我们会通过返回一个 Promise 来解决这个问题,而不再是返回一个值。让我们改动 download 函数来返回一个 Promise:

download(url) {
  return new Promise((resolve, reject) => {
    request(url, (error, response, body) => {
      if (!error && response.statusCode == 200) {
        resolve(body)
      } else {
        reject({
          reason: 'Unable to download page'
        })
      }
    })
  })
}

Promises 允许我们通过将异步逻辑封装在一个提供两个回调方法的函数里来返回获得的值(resolve 用来处理请求成功的返回值,reject 用来向使用者报错)。如果请求返回了错误我们就调用 reject,否则就用 resolve 来处理 HTML。

让我们更改 fetch 函数来使用 download 返回的 Promise:

fetch() {
  let editor
  if (editor = atom.workspace.getActiveTextEditor()) {
    let selection = editor.getSelectedText()
    this.download(selection).then((html) => {
      editor.insertText(html)
    }).catch((error) => {
      atom.notifications.addWarning(error.reason)
    })
  }
},

在我们新版的 fetch 函数里,我们通过在 download 返回的 Promise 调用 then 方法来对 HTML 进行操作。这会将 HTML 插入到编辑器中。我们同样会通过调用 catch 方法来接收并处理所有的错误。我们通过用 Atom Notification API 来显示警告的形式来处理错误。

看看发生了什么变化。重新加载 Atom 并在一个选中的 URL 上执行软件包命令:

如果这个 URL 是无效的,一个警告通知将会弹出来:

sourcefetch 教程仓库 查看这一步所有的代码更改。

编写一个爬虫来提取 StackOverflow 页面的代码片段

下一步涉及用我们前面扒到的 StackOverflow 的页面的 HTML 来提取代码片段。我们尤其关注那些来自采纳答案(提问者选择的一个正确答案)的代码。我们可以在假设这类答案都是相关且正确的前提下大大简化我们这个软件包的实现。

使用 jQuery 和 Chrome 开发者工具来构建查询

这一部分假设你使用的是 Chrome 浏览器。你接下来可以使用其它浏览器,但是提示可能会不一样。

让我们先看看一张典型的包含采纳答案和代码片段的 StackOverflow 页面。我们将会使用 Chrome 开发者工具来浏览 HTML:

  • 打开 Chrome 并跳到任意一个带有采纳答案和代码的 StackOverflow 页面,比如像这个用 Python 写的 hello world 的例子或者这个关于 C 来读取文本内容的问题
  • 滚动窗口到采纳答案的位置并选中一部分代码。
  • 右击选中文本并选择 检查
  • 使用元素侦察器来检查代码片段在 HTML 中的位置。

注意文本结构应该是这样的:

<div class="accepted-answer">
  ...
    ...
      <pre>
        <code>
          ...snippet elements...
        </code>
      </pre>
    ...
  ...
</div>
  • 采纳的答案通过一个 class 为 accepted-answerdiv 来表示
  • 代码块位于 pre 元素的内部
  • 呈现代码片段的元素就是里面那一对 code 标签

现在让我们写一些 jQuery 代码来提取代码片段:

  • 在开发者工具那里点击 Console 选项卡来访问 Javascript 控制台。
  • 在控制台中输入 $('div.accepted-answer pre code').text() 并按下回车键。

你应该会看到控制台中打印出采纳答案的代码片段。我们刚刚运行的代码使用了一个 jQuery 提供的特别的 $ 函数。$ 接收要选择的查询字符串并返回网站中的某些 HTML 元素。让我们通过思考几个查询案例看看这段代码的工作原理:

$('div.accepted-answer')
> [<div id="answer-1077349" class="answer accepted-answer" ... ></div>]

上面的查询会匹配所有 class 为 accepted-answer<div> 元素,在我们的案例中只有一个 div。

$('div.accepted-answer pre code')
> [<code>...</code>]

在前面的基础上改造了一下,这个查询会匹配所有在之前匹配的 <div> 内部的 <pre> 元素内部的 <code> 元素。

$('div.accepted-answer pre code').text()
> "print("Hello World!")"

text 函数提取并连接原本将由上一个查询返回的元素列表中的所有文本。这也从代码中去除了用来使语法高亮的元素。

介绍 Cheerio

我们的下一步涉及使用我们创建好的查询结合 Cheerio(一个服务器端实现的 jQuery)来实现扒页面的功能。

安装 Cheerio

打开你的命令行工具,切换到你的软件包的根目录并执行:

npm install --save [email protected]
apm install

实现扒页面的功能

lib/sourcefetch.jscheerio 添加一条引用语句:

import { CompositeDisposable } from 'atom'
import request from 'request'
import cheerio from 'cheerio'

现在创建一个新函数 scrape,它用来提取 StackOverflow HTML 里面的代码片段:

fetch() {
  ...
},

scrape(html) {
  $ = cheerio.load(html)
  return $('div.accepted-answer pre code').text()
},

download(url) {
  ...
}

最后,让我们更改 fetch 函数以传递下载好的 HTML 给 scrape 而不是将其插入到编辑器:

fetch() {
  let editor
  let self = this

  if (editor = atom.workspace.getActiveTextEditor()) {
    let selection = editor.getSelectedText()
    this.download(selection).then((html) => {
      let answer = self.scrape(html)
      if (answer === '') {
        atom.notifications.addWarning('No answer found :(')
      } else {
        editor.insertText(answer)
      }
    }).catch((error) => {
      console.log(error)
      atom.notifications.addWarning(error.reason)
    })
  }
},

我们扒取页面的功能仅仅用两行代码就实现了,因为 cheerio 已经替我们做好了所有的工作!我们通过调用 load 方法加载 HTML 字符串来创建一个 $ 函数,然后用这个函数来执行 jQuery 语句并返回结果。你可以在官方 开发者文档 查看完整的 Cheerio API

测试更新后的软件包

重新加载 Atom 并在一个选中的 StackOverflow URL 上运行 soucefetch:fetch 以查看到目前为止的进度。

如果我们在一个有采纳答案的页面上运行这条命令,代码片段将会被插入到编辑器中:

如果我们在一个没有采纳答案的页面上运行这条命令,将会弹出一个警告通知:

我们最新的 fetch 函数给我们提供了一个 StackOverflow 页面的代码片段而不再是整个 HTML 内容。要注意我们更新的 fetch 函数会检查有没有答案并显示通知以提醒用户。

sourcefetch 教程仓库 查看这一步所有的代码更改。

实现用来查找相关的 StackOverflow URL 的谷歌搜索功能

现在我们已经将 StackOverflow 的 URL 转化为代码片段了,让我们来实现最后一个函数——search,它应该要返回一个相关的 URL 并附加一些像“hello world”或者“快速排序”这样的描述。我们会通过一个非官方的 google npm 模块来使用谷歌搜索功能,这样可以让我们以编程的方式来搜索。

安装这个 Google npm 模块

通过在软件包的根目录打开命令行工具并执行命令来安装 google 模块:

npm install --save [email protected]
apm install

引入并配置模块

lib/sourcefetch.js 的顶部为 google 模块添加一条引用语句:

import google from "google"

我们将配置一下 google 以限制搜索期间返回的结果数。将下面这行代码添加到引用语句下面以限制搜索返回最热门的那个结果。

google.resultsPerPage = 1

实现 search 函数

接下来让我们来实现我们的 search 函数:

fetch() {
  ...
},

search(query, language) {
  return new Promise((resolve, reject) => {
    let searchString = `${query} in ${language} site:stackoverflow.com`

    google(searchString, (err, res) => {
      if (err) {
        reject({
          reason: 'A search error has occured :('
        })
      } else if (res.links.length === 0) {
        reject({
          reason: 'No results found :('
        })
      } else {
        resolve(res.links[0].href)
      }
    })
  })
},

scrape() {
  ...
}

以上代码通过谷歌来搜索一个和指定的关键词以及编程语言相关的 StackOverflow 页面,并返回一个最热门的 URL。让我们看看这是怎样来实现的:

let searchString = `${query} in ${language} site:stackoverflow.com`

我们使用用户输入的查询和当前所选的语言来构造搜索字符串。比方说,当用户在写 Python 的时候输入“hello world”,查询语句就会变成 hello world in python site:stackoverflow.com。字符串的最后一部分是谷歌搜索提供的一个过滤器,它让我们可以将搜索结果的来源限制为 StackOverflow。

google(searchString, (err, res) => {
  if (err) {
    reject({
      reason: 'A search error has occured :('
    })
  } else if (res.links.length === 0) {
    reject({
      reason: 'No results found :('
    })
  } else {
    resolve(res.links[0].href)
  }
})

我们将 google 方法放在一个 Promise 里面,这样我们可以异步地返回我们的 URL。我们会传递由 google 返回的所有错误并且会在没有可用的搜索结果的时候返回一个错误。否则我们将通过 resolve 来解析最热门结果的 URL。

更新 fetch 来使用 search

我们的最后一步是更新 fetch 函数来使用 search 函数:

fetch() {
  let editor
  let self = this

  if (editor = atom.workspace.getActiveTextEditor()) {
    let query = editor.getSelectedText()
    let language = editor.getGrammar().name

    self.search(query, language).then((url) => {
      atom.notifications.addSuccess('Found google results!')
      return self.download(url)
    }).then((html) => {
      let answer = self.scrape(html)
      if (answer === '') {
        atom.notifications.addWarning('No answer found :(')
      } else {
        atom.notifications.addSuccess('Found snippet!')
        editor.insertText(answer)
      }
    }).catch((error) => {
      atom.notifications.addWarning(error.reason)
    })
  }
}

让我们看看发生了什么变化:

  • 我们选中的文本现在变成了用户输入的 query
  • 我们使用 TextEditor API 来获取当前编辑器选项卡使用的 language
  • 我们调用 search 方法来获取一个 URL,然后通过在得到的 Promise 上调用 then 方法来访问这个 URL

我们不在 download 返回的 Promise 上调用 then 方法,而是在前面 search 方法本身链式调用的另一个 then 方法返回的 Promise 上面接着调用 then 方法。这样可以帮助我们避免回调地狱

sourcefetch 教程仓库 查看这一步所有的代码更改。

测试最终的插件

大功告成了!重新加载 Atom,对一个“问题描述”运行软件包的命令来看看我们最终的插件是否工作,不要忘了在编辑器右下角选择一种语言。

下一步

现在你知道怎么去 “hack” Atom 的基本原理了,通过 分叉 sourcefetch 这个仓库并添加你的特性 来随心所欲地实践你所学到的知识。


via: https://github.com/blog/2231-building-your-first-atom-plugin

作者:NickTikhonov 译者:OneNewLife 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

很久之前,我的个人网站被攻击了。我不知道它是如何发生的,但它确实发生了。幸运的是,攻击带来的破坏是很小的:一小段 JavaScript 被注入到了某些页面的底部。我更新了 FTP 和其它的口令,清理了一些文件,事情就这样结束了。

有一点使我很恼火:在当时,还没有一种简便的方案能够使我知道那里有问题,更重要的是能够保护网站的访客不被这段恼人的代码所扰。

现在有一种方案出现了,这种技术在上述两方面都十分的成功。它就是 内容安全策略 content security policy (CSP)。

什么是 CSP?

其核心思想十分简单:网站通过发送一个 CSP 头部,来告诉浏览器什么是被授权执行的与什么是需要被禁止的。

这里有一个 PHP 的例子:

<?php
    header("Content-Security-Policy: <your directives>");
?>

一些指令

你可以定义一些全局规则或者定义一些涉及某一类资源的规则:

default-src 'self' ;
     # self = 同端口,同域名,同协议 => 允许

基础参数是 default-src:如果没有为某一类资源设置指令规则,那么浏览器就会使用这个默认参数值。

script-src 'self' www.google-analytics.com ;
     # 来自这些域名的 JS 文件 => 允许

在这个例子中,我们已经授权了 www.google-analytics.com 这个域名来源的 JavaScript 文件使用到我们的网站上。我们也添加了 'self' 这个关键词;如果我们通过 script-src 来重新设置其它的规则指令,它将会覆盖 default-src 规则。

如果没有指明协议(scheme)或端口,它就会强制选择与当前页面相同的协议或端口。这样做防止了混合内容(LCTT 译注:混合内容指 HTTPS 页面中也有非 HTTPS 资源,可参见: https://developer.mozilla.org/zh-CN/docs/Security/MixedContent )。如果页面是 https://example.com,那么你将无法加载 http://www.google-analytics.com/file.js 因为它已经被禁止了(协议不匹配)。然而,有一个例外就是协议的提升是被允许的。如果 http://example.com 尝试加载 https://www.google-analytics.com/file.js,接着协议或端口允许被更改以便协议的提升。

style-src 'self' data: ;
     # Data-Uri 嵌入 CSS => 允许

在这个例子中,关键词 data: 授权了在 CSS 文件中 data 内嵌内容。

在 CSP 1 规范下,你也可以设置如下规则:

  • img-src 有效的图片来源
  • connect-src 应用于 XMLHttpRequest(AJAX),WebSocket 或 EventSource
  • font-src 有效的字体来源
  • object-src 有效的插件来源(例如,<object><embed><applet>
  • media-src 有效的 <audio><video> 来源

CSP 2 规范包含了如下规则:

  • child-src 有效的 web workers 和 元素来源,如 <frame><iframe> (这个指令用来替代 CSP 1 中废弃了的 frame-src 指令)
  • form-action 可以作为 HTML <form> 的 action 的有效来源
  • frame-ancestors 使用 <frame><iframe><object><embed><applet> 内嵌资源的有效来源
  • upgrade-insecure-requests 命令用户代理来重写 URL 协议,将 HTTP 改到 HTTPS (为一些需要重写大量陈旧 URL 的网站提供了方便)。

为了更好的向后兼容一些废弃的属性,你可以简单的复制当前指令的内容同时为那个废弃的指令创建一个相同的副本。例如,你可以复制 child-src 的内容同时在 frame-src 中添加一份相同的副本。

CSP 2 允许你添加路径到白名单中(CSP 1 只允许域名被添加到白名单中)。因此,相较于将整个 www.foo.com 域添加到白名单,你可以通过添加 www.foo.com/some/folder 这样的路径到白名单中来作更多的限制。这个需要浏览器中 CSP 2 的支持,但它很明显更安全。

一个例子

我为 Web 2015 巴黎大会上我的演讲 “CSP in Action”制作了一个简单的例子。

在没有 CSP 的情况下,页面展示如下图所示:

不是十分优美。要是我们启用了如下的 CSP 指令又会怎样呢?

<?php
    header("Content-Security-Policy: 
      default-src 'self' ;
      script-src 'self' www.google-analytics.com stats.g.doubleclick.net ; 
      style-src 'self' data: ;
      img-src 'self' www.google-analytics.com stats.g.doubleclick.net data: ;
      frame-src 'self' ;");
?>

浏览器将会作什么呢?它会(非常严格的)在 CSP 基础规则之下应用这些指令,这意味着任何没有在 CSP 指令中被授权允许的都将会被禁止(“blocked” 指的是不被执行、不被显示并且不被使用在网站中)。

在 CSP 的默认设置中,内联脚本和样式是不被授权的,意味着每一个 <script>onclick 事件属性或 style 属性都将会被禁止。你可以使用 style-src 'unsafe-inline' ; 指令来授权使用内联 CSS。

在一个支持 CSP 的现代浏览器中,上述示例看起来如下图:

发生了什么?浏览器应用了指令并且拒绝了所有没有被授权的内容。它在浏览器调试终端中发送了这些通知:

如果你依然不确定 CSP 的价值,请看一下 Aaron Gustafson 文章 “More Proof We Don't Control Our Web Pages”。

当然,你可以使用比我们在示例中提供的更严格的指令:

  • 设置 default-src 为 'none'
  • 为每条规则指定你的设置
  • 为请求的文件指定它的绝对路径

更多关于 CSP 的信息

支持

CSP 不是一个需要复杂的配置才能正常工作的每日构建特性。CSP 1 和 2 是候选推荐标准!浏览器可以非常完美的支持 CSP 1

CSP 2 是较新的规范,因此对它的支持会少那么一点。

现在 CSP 3 还是一个早期草案,因此还没有被支持,但是你依然可以使用 CSP 1 和 2 来做一些重大的事。

其他需要考虑的因素

CSP 被设计用来降低跨站脚本攻击(XSS)的风险,这就是不建议开启内联脚本和 script-src 指令的原因。Firefox 对这个问题做了很好的说明:在浏览器中,敲击 Shift + F2 并且键入 security csp,它就会向你展示指令和对应的建议。这里有一个在 Twitter 网站中应用的例子:

如果你确实需要使用内联脚本和样式的话,另一种可能就是生成一份散列值。例如,我们假定你需要使用如下的内联脚本:

<script>alert('Hello, world.');</script>

你应该在 script-src 指令中添加 sha256-qznLcsROx4GACP2dm0UCKCzCG-HiZ1guq6ZZDob_Tng= 作为有效来源。这个散列值用下面的 PHP 脚本执行获得的结果:

<?php
    echo base64_encode(hash('sha256', "alert('Hello, world.');", true));
?>

我在前文中说过 CSP 被设计用来降低 XSS 风险,我还得加上“……与降低未经请求内容的风险。”伴随着 CSP 的使用,你必须知道你内容的来源是哪里它们在你的前端都作了些什么(内联样式,等)。CSP 同时可以帮助你让贡献者、开发人员和其他人员来遵循你内容来源的规则!

现在你的问题就只是,“不错,这很好,但是我们如何在生产环境中使用它呢?”

如何在现实世界中使用它

想要在第一次使用 CSP 之后就失望透顶的方法就是在生产环境中测试。不要想当然的认为,“这会很简单。我的代码是完美并且相当清晰的。”不要这样作。我这样干过。相信我,这相当的蠢。

正如我之前说明的,CSP 指令由 CSP 头部来激活,这中间没有过渡阶段。你恰恰就是其中的薄弱环节。你可能会忘记授权某些东西或者遗忘了你网站中的一小段代码。CSP 不会饶恕你的疏忽。然而,CSP 的两个特性将这个问题变得相当的简单。

report-uri

还记得 CSP 发送到终端中的那些通知么?report-uri 指令可以被用来告诉浏览器发送那些通知到指定的地址。报告以 JSON 格式送出。

report-uri /csp-parser.php ;

因此,我们可以在 csp-parser.php 文件中处理有浏览器送出的数据。这里有一个由 PHP 实现的最基础的例子:

$data = file_get_contents('php://input');

    if ($data = json_decode($data, true)) {
     $data = json_encode(
      $data,
      JSON_PRETTY_PRINT | JSON_UNESCAPED_SLASHES
      );
     mail(EMAIL, SUBJECT, $data);
    }

这个通知将会被转换成为一封邮件。在开发过程中,你可能不会需要比这更复杂的其它东西。

对于一个生产环境(或者是一个有较多访问的开发环境),你应该使用一种比邮件更好的收集信息的方式,因为这种方式在节点上没有验证和速率限制,并且 CSP 可能变得乱哄哄的。只需想像一个会产生 100 个 CSP 通知(例如,一个从未授权来源展示图片的脚本)并且每天会被浏览 100 次的页面,那你就会每天收到 10000 个通知啊!

例如 report-uri.io 这样的服务可以用来简化你的通知管理。你也可以在 GitHub上看一些另外的使用 report-uri (与数据库搭配,添加一些优化,等)的简单例子。

report-only

正如我们所见的,最大的问题就是在使用和不使用 CSP 之间没有中间地带。然而,一个名为 report-only 的特性会发送一个稍有不同的头部:

<?php
    header("Content-Security-Policy-Report-Only: <your directives>");
?>

总的来说,这个头部就是告诉浏览器,“表现得似乎所有的 CSP 指令都被应用了,但是不禁止任何东西。只是发送通知给自己。”这是一种相当棒的测试指令的方式,避免了任何有价值的东西被禁止的风险。

report-onlyreport-uri 的帮助下你可以毫无风险的测试 CSP 指令,并且可以实时的监控网站上一切与 CSP 相关的内容。这两个特性对部署和维护 CSP 来说真是相当的有用!

结论

为什么 CSP 很酷

CSP 对你的用户来说是尤其重要的:他们在你的网站上不再需要遭受任何的未经请求的脚本,内容或 XSS 的威胁了。

对于网站维护者来说 CSP 最重要的优势就是可感知。如果你对图片来源设置了严格的规则,这时一个脚本小子尝试在你的网站上插入一张未授权来源的图片,那么这张图片就会被禁止,并且你会在第一时间收到提醒。

开发者也需要确切的知道他们的前端代码都在做些什么,CSP 可以帮助他们掌控一切。会促使他们去重构他们代码中的某些部分(避免内联函数和样式,等)并且促使他们遵循最佳实践。

如何让 CSP 变得更酷

讽刺的是,CSP 在一些浏览器中过分的高效了,在和书签栏小程序一起使用时会产生一些 bug。因此,不要更新你的 CSP 指令来允许书签栏小程序。我们无法单独的责备任何一个浏览器;它们都有些问题:

  • Firefox
  • Chrome (Blink)
  • WebKit

大多数情况下,这些 bug 都是禁止通知中的误报。所有的浏览器提供者都在努力解决这些问题,因此我们可以期待很快就会被解决。无论怎样,这都不会成为你使用 CSP 的绊脚石。


via: https://www.smashingmagazine.com/2016/09/content-security-policy-your-future-best-friend/

作者:Nicolas Hoffmann 译者:wcnnbdk1 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

“只有在创造中才能够学到更多。” ——皮亚杰

在本系列的第二部分中,你创造了一个可以处理基本 HTTP GET 请求的、朴素的 WSGI 服务器。当时我问了一个问题:“你该如何让你的服务器在同一时间处理多个请求呢?”在这篇文章中,你会找到答案。系好安全带,我们要认真起来,全速前进了!你将会体验到一段非常快速的旅程。准备好你的 Linux、Mac OS X(或者其他 *nix 系统),还有你的 Python。本文中所有源代码均可在 GitHub 上找到。

服务器的基本结构及如何处理请求

首先,我们来回顾一下 Web 服务器的基本结构,以及服务器处理来自客户端的请求时,所需的必要步骤。你在第一部分第二部分中创建的轮询服务器只能够一次处理一个请求。在处理完当前请求之前,它不能够接受新的客户端连接。所有请求为了等待服务都需要排队,在服务繁忙时,这个队伍可能会排的很长,一些客户端可能会感到不开心。

这是轮询服务器 webserver3a.py 的代码:

#####################################################################
# 轮询服务器 - webserver3a.py                                       #
#                                                                   #
# 使用 Python 2.7.9 或 3.4                                          #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过                        #
#####################################################################
import socket

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 5


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    while True:
        client_connection, client_address = listen_socket.accept()
        handle_request(client_connection)
        client_connection.close()

if __name__ == '__main__':
    serve_forever()

为了观察到你的服务器在同一时间只能处理一个请求的行为,我们对服务器的代码做一点点修改:在将响应发送至客户端之后,将程序阻塞 60 秒。这个修改只需要一行代码,来告诉服务器进程暂停 60 秒钟。

这是我们更改后的代码,包含暂停语句的服务器 webserver3b.py

######################################################################
# 轮询服务器 - webserver3b.py                                         #
#                                                                    #
# 使用 Python 2.7.9 或 3.4                                            #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过                           #
#                                                                    #
# - 服务器向客户端发送响应之后,会阻塞 60 秒                             #
######################################################################
import socket
import time

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 5


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)
    time.sleep(60)  ### 睡眠语句,阻塞该进程 60 秒


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    while True:
        client_connection, client_address = listen_socket.accept()
        handle_request(client_connection)
        client_connection.close()

if __name__ == '__main__':
    serve_forever()

用以下命令启动服务器:

$ python webserver3b.py

现在,打开一个新的命令行窗口,然后运行 curl 语句。你应该可以立刻看到屏幕上显示的字符串“Hello, World!”:

$ curl http://localhost:8888/hello
Hello, World!

然后,立刻打开第二个命令行窗口,运行相同的 curl 命令:

$ curl http://localhost:8888/hello

如果你在 60 秒之内完成了以上步骤,你会看到第二条 curl 指令不会立刻产生任何输出,而只是挂在了哪里。同样,服务器也不会在标准输出流中输出新的请求内容。这是这个过程在我的 Mac 电脑上的运行结果(在右下角用黄色框标注出来的窗口中,我们能看到第二个 curl 指令被挂起,正在等待连接被服务器接受):

当你等待足够长的时间(60 秒以上)后,你会看到第一个 curl 程序完成,而第二个 curl 在屏幕上输出了“Hello, World!”,然后休眠 60 秒,进而终止。

这样运行的原因是因为在服务器在处理完第一个来自 curl 的请求之后,只有等待 60 秒才能开始处理第二个请求。这个处理请求的过程按顺序进行(也可以说,迭代进行),一步一步进行,在我们刚刚给出的例子中,在同一时间内只能处理一个请求。

现在,我们来简单讨论一下客户端与服务器的交流过程。为了让两个程序在网络中互相交流,它们必须使用套接字。你应当在本系列的前两部分中见过它几次了。但是,套接字是什么?

套接字 socket 是一个通讯通道 端点 endpoint 的抽象描述,它可以让你的程序通过文件描述符来与其它程序进行交流。在这篇文章中,我只会单独讨论 Linux 或 Mac OS X 中的 TCP/IP 套接字。这里有一个重点概念需要你去理解:TCP 套接字对 socket pair

TCP 连接使用的套接字对是一个由 4 个元素组成的元组,它确定了 TCP 连接的两端:本地 IP 地址、本地端口、远端 IP 地址及远端端口。一个套接字对唯一地确定了网络中的每一个 TCP 连接。在连接一端的两个值:一个 IP 地址和一个端口,通常被称作一个套接字。(引自《UNIX 网络编程 卷1:套接字联网 API (第3版)》

所以,元组 {10.10.10.2:49152, 12.12.12.3:8888} 就是一个能够在客户端确定 TCP 连接两端的套接字对,而元组 {12.12.12.3:8888, 10.10.10.2:49152} 则是在服务端确定 TCP 连接两端的套接字对。在这个例子中,确定 TCP 服务端的两个值(IP 地址 12.12.12.3 及端口 8888),代表一个套接字;另外两个值则代表客户端的套接字。

一个服务器创建一个套接字并开始建立连接的基本工作流程如下:

  1. 服务器创建一个 TCP/IP 套接字。我们可以用这条 Python 语句来创建:
listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  1. 服务器可能会设定一些套接字选项(这个步骤是可选的,但是你可以看到上面的服务器代码做了设定,这样才能够在重启服务器时多次复用同一地址):
listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
  1. 然后,服务器绑定一个地址。绑定函数 bind 可以将一个本地协议地址赋给套接字。若使用 TCP 协议,调用绑定函数 bind 时,需要指定一个端口号,一个 IP 地址,或两者兼有,或两者全无。(引自《UNIX网络编程 卷1:套接字联网 API (第3版)》
listen_socket.bind(SERVER_ADDRESS)
  1. 然后,服务器开启套接字的监听模式。
listen_socket.listen(REQUEST_QUEUE_SIZE)

监听函数 listen 只应在服务端调用。它会通知操作系统内核,表明它会接受所有向该套接字发送的入站连接请求。

以上四步完成后,服务器将循环接收来自客户端的连接,一次循环处理一条。当有连接可用时,接受请求函数 accept 将会返回一个已连接的客户端套接字。然后,服务器从这个已连接的客户端套接字中读取请求数据,将数据在其标准输出流中输出出来,并向客户端回送一条消息。然后,服务器会关闭这个客户端连接,并准备接收一个新的客户端连接。

这是客户端使用 TCP/IP 协议与服务器通信的必要步骤:

下面是一段示例代码,使用这段代码,客户端可以连接你的服务器,发送一个请求,并输出响应内容:

import socket

### 创建一个套接字,并连接值服务器
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', 8888))

### 发送一段数据,并接收响应数据
sock.sendall(b'test')
data = sock.recv(1024)
print(data.decode())

在创建套接字后,客户端需要连接至服务器。我们可以调用连接函数 connect 来完成这个操作:

sock.connect(('localhost', 8888))

客户端只需提供待连接的远程服务器的 IP 地址(或主机名),及端口号,即可连接至远端服务器。

你可能已经注意到了,客户端不需要调用 bindaccept 函数,就可以与服务器建立连接。客户端不需要调用 bind 函数是因为客户端不需要关注本地 IP 地址及端口号。操作系统内核中的 TCP/IP 协议栈会在客户端调用 connect 函数时,自动为套接字分配本地 IP 地址及本地端口号。这个本地端口被称为 临时端口 ephemeral port ,即一个短暂开放的端口。

服务器中有一些端口被用于承载一些众所周知的服务,它们被称作 通用 well-known 端口:如 80 端口用于 HTTP 服务,22 端口用于 SSH 服务。打开你的 Python shell,与你在本地运行的服务器建立一个连接,来看看内核给你的客户端套接字分配了哪个临时端口(在尝试这个例子之前,你需要运行服务器程序 webserver3a.pywebserver3b.py):

>>> import socket
>>> sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> sock.connect(('localhost', 8888))
>>> host, port = sock.getsockname()[:2]
>>> host, port
('127.0.0.1', 60589)

在上面的例子中,内核将临时端口 60589 分配给了你的套接字。

在我开始回答我在第二部分中提出的问题之前,我还需要快速讲解一些概念。你很快就会明白这些概念为什么非常重要。这两个概念,一个是进程,另外一个是文件描述符。

什么是进程?进程就是一个程序执行的实体。举个例子:当你的服务器代码被执行时,它会被载入内存,而内存中表现此次程序运行的实体就叫做进程。内核记录了进程的一系列有关信息——比如进程 ID——来追踪它的运行情况。当你在执行轮询服务器 webserver3a.pywebserver3b.py 时,你其实只是启动了一个进程。

我们在终端窗口中运行 webserver3b.py

$ python webserver3b.py

在另一个终端窗口中,我们可以使用 ps 命令获取该进程的相关信息:

$ ps | grep webserver3b | grep -v grep
7182 ttys003    0:00.04 python webserver3b.py

ps 命令显示,我们刚刚只运行了一个 Python 进程 webserver3b.py。当一个进程被创建时,内核会为其分配一个进程 ID,也就是 PID。在 UNIX 中,所有用户进程都有一个父进程;当然,这个父进程也有进程 ID,叫做父进程 ID,缩写为 PPID。假设你默认使用 BASH shell,那当你启动服务器时,就会启动一个新的进程,同时被赋予一个 PID,而它的父进程 PID 会被设为 BASH shell 的 PID。

自己尝试一下,看看这一切都是如何工作的。重新开启你的 Python shell,它会创建一个新进程,然后在其中使用系统调用 os.getpid()os.getppid() 来获取 Python shell 进程的 PID 及其父进程 PID(也就是你的 BASH shell 的 PID)。然后,在另一个终端窗口中运行 ps 命令,然后用 grep 来查找 PPID(父进程 ID,在我的例子中是 3148)。在下面的屏幕截图中,你可以看到一个我的 Mac OS X 系统中关于进程父子关系的例子,在这个例子中,子进程是我的 Python shell 进程,而父进程是 BASH shell 进程:

另外一个需要了解的概念,就是文件描述符。什么是文件描述符?文件描述符是一个非负整数,当进程打开一个现有文件、创建新文件或创建一个新的套接字时,内核会将这个数返回给进程。你以前可能听说过,在 UNIX 中,一切皆是文件。内核会按文件描述符来找到一个进程所打开的文件。当你需要读取文件或向文件写入时,我们同样通过文件描述符来定位这个文件。Python 提供了高层次的操作文件(或套接字)的对象,所以你不需要直接通过文件描述符来定位文件。但是,在高层对象之下,我们就是用它来在 UNIX 中定位文件及套接字,通过这个整数的文件描述符。

一般情况下,UNIX shell 会将一个进程的标准输入流(STDIN)的文件描述符设为 0,标准输出流(STDOUT)设为 1,而标准错误打印(STDERR)的文件描述符会被设为 2。

我之前提到过,即使 Python 提供了高层次的文件对象或类文件对象来供你操作,你仍然可以在对象上使用 fileno() 方法,来获取与该文件相关联的文件描述符。回到 Python shell 中,我们来看看你该怎么做到这一点:

>>> import sys
>>> sys.stdin
<open file '<stdin>', mode 'r' at 0x102beb0c0>
>>> sys.stdin.fileno()
0
>>> sys.stdout.fileno()
1
>>> sys.stderr.fileno()
2

当你在 Python 中操作文件及套接字时,你可能会使用高层次的文件/套接字对象,但是你仍然有可能会直接使用文件描述符。下面有一个例子,来演示如何用文件描述符做参数来进行一次写入的系统调用:

>>> import sys
>>> import os
>>> res = os.write(sys.stdout.fileno(), 'hello\n')
hello

下面是比较有趣的部分——不过你可能不会为此感到惊讶,因为你已经知道在 Unix 中,一切皆为文件——你的套接字对象同样有一个相关联的文件描述符。和刚才操纵文件时一样,当你在 Python 中创建一个套接字时,你会得到一个对象而不是一个非负整数,但你永远可以用我之前提到过的 fileno() 方法获取套接字对象的文件描述符,并可以通过这个文件描述符来直接操纵套接字。

>>> import socket
>>> sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
>>> sock.fileno()
3

我还想再提一件事:不知道你有没有注意到,在我们的第二个轮询服务器 webserver3b.py 中,当你的服务器休眠 60 秒的过程中,你仍然可以通过第二个 curl 命令连接至服务器。当然 curl 命令并没有立刻输出任何内容而是挂在哪里,但是既然服务器没有接受连接,那它为什么不立即拒绝掉连接,而让它还能够继续与服务器建立连接呢?这个问题的答案是:当我在调用套接字对象的 listen 方法时,我为该方法提供了一个 BACKLOG 参数,在代码中用 REQUEST_QUEUE_SIZE 常量来表示。BACKLOG 参数决定了在内核中为存放即将到来的连接请求所创建的队列的大小。当服务器 webserver3b.py 在睡眠的时候,你运行的第二个 curl 命令依然能够连接至服务器,因为内核中用来存放即将接收的连接请求的队列依然拥有足够大的可用空间。

尽管增大 BACKLOG 参数并不能神奇地使你的服务器同时处理多个请求,但当你的服务器很繁忙时,将它设置为一个较大的值还是相当重要的。这样,在你的服务器调用 accept 方法时,不需要再等待一个新的连接建立,而可以立刻直接抓取队列中的第一个客户端连接,并不加停顿地立刻处理它。

欧耶!现在你已经了解了一大块内容。我们来快速回顾一下我们刚刚讲解的知识(当然,如果这些对你来说都是基础知识的话,那我们就当复习好啦)。

  • 轮询服务器
  • 服务端套接字创建流程(创建套接字,绑定,监听及接受)
  • 客户端连接创建流程(创建套接字,连接)
  • 套接字对
  • 套接字
  • 临时端口及通用端口
  • 进程
  • 进程 ID(PID),父进程 ID(PPID),以及进程父子关系
  • 文件描述符
  • 套接字的 listen 方法中,BACKLOG 参数的含义

如何并发处理多个请求

现在,我可以开始回答第二部分中的那个问题了:“你该如何让你的服务器在同一时间处理多个请求呢?”或者换一种说法:“如何编写一个并发服务器?”

在 UNIX 系统中编写一个并发服务器最简单的方法,就是使用系统调用 fork()

下面是全新出炉的并发服务器 webserver3c.py 的代码,它可以同时处理多个请求(和我们之前的例子 webserver3b.py 一样,每个子进程都会休眠 60 秒):

#######################################################
# 并发服务器 - webserver3c.py                          #
#                                                     #
# 使用 Python 2.7.9 或 3.4                             #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过            #
#                                                     #
# - 完成客户端请求处理之后,子进程会休眠 60 秒             #
# - 父子进程会关闭重复的描述符                           #
#                                                     #
#######################################################
import os
import socket
import time

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 5


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(
        'Child PID: {pid}. Parent PID {ppid}'.format(
            pid=os.getpid(),
            ppid=os.getppid(),
        )
    )
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)
    time.sleep(60)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))
    print('Parent PID (PPID): {pid}\n'.format(pid=os.getpid()))

    while True:
        client_connection, client_address = listen_socket.accept()
        pid = os.fork()
        if pid == 0:  ### 子进程
            listen_socket.close()  ### 关闭子进程中复制的套接字对象
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)  ### 子进程在这里退出
        else:  ### 父进程
            client_connection.close()  ### 关闭父进程中的客户端连接对象,并循环执行

if __name__ == '__main__':
    serve_forever()

在深入研究代码、讨论 fork 如何工作之前,先尝试运行它,自己看一看这个服务器是否真的可以同时处理多个客户端请求,而不是像轮询服务器 webserver3a.pywebserver3b.py 一样。在命令行中使用如下命令启动服务器:

$ python webserver3c.py

然后,像我们之前测试轮询服务器那样,运行两个 curl 命令,来看看这次的效果。现在你可以看到,即使子进程在处理客户端请求后会休眠 60 秒,但它并不会影响其它客户端连接,因为他们都是由完全独立的进程来处理的。你应该看到你的 curl 命令立即输出了“Hello, World!”然后挂起 60 秒。你可以按照你的想法运行尽可能多的 curl 命令(好吧,并不能运行特别特别多 ^_^),所有的命令都会立刻输出来自服务器的响应 “Hello, World!”,并不会出现任何可被察觉到的延迟行为。试试看吧。

如果你要理解 fork(),那最重要的一点是:你调用了它一次,但是它会返回两次 —— 一次在父进程中,另一次是在子进程中。当你创建了一个新进程,那么 fork() 在子进程中的返回值是 0。如果是在父进程中,那 fork() 函数会返回子进程的 PID。

我依然记得在第一次看到它并尝试使用 fork() 的时候,我是多么的入迷。它在我眼里就像是魔法一样。这就好像我在读一段顺序执行的代码,然后“砰!”地一声,代码变成了两份,然后出现了两个实体,同时并行地运行相同的代码。讲真,那个时候我觉得它真的跟魔法一样神奇。

当父进程创建出一个新的子进程时,子进程会复制从父进程中复制一份文件描述符:

你可能注意到,在上面的代码中,父进程关闭了客户端连接:

else:  ### 父进程
    client_connection.close()  # 关闭父进程的副本并循环

不过,既然父进程关闭了这个套接字,那为什么子进程仍然能够从来自客户端的套接字中读取数据呢?答案就在上面的图片中。内核会使用描述符引用计数器来决定是否要关闭一个套接字。当你的服务器创建一个子进程时,子进程会复制父进程的所有文件描述符,内核中该描述符的引用计数也会增加。如果只有一个父进程及一个子进程,那客户端套接字的文件描述符引用数应为 2;当父进程关闭客户端连接的套接字时,内核只会减少它的引用计数,将其变为 1,但这仍然不会使内核关闭该套接字。子进程也关闭了父进程中 listen_socket 的复制实体,因为子进程不需要关注新的客户端连接,而只需要处理已建立的客户端连接中的请求。

listen_socket.close()  ### 关闭子进程中的复制实体

我们将会在后文中讨论,如果你不关闭那些重复的描述符,会发生什么。

你可以从你的并发服务器源码中看到,父进程的主要职责为:接受一个新的客户端连接,复制出一个子进程来处理这个连接,然后继续循环来接受另外的客户端连接,仅此而已。服务器父进程并不会处理客户端连接——子进程才会做这件事。

打个岔:当我们说两个事件并发执行时,我们所要表达的意思是什么?

当我们说“两个事件并发执行”时,它通常意味着这两个事件同时发生。简单来讲,这个定义没问题,但你应该记住它的严格定义:

如果你不能在代码中判断两个事件的发生顺序,那这两个事件就是并发执行的。(引自《信号系统简明手册 (第二版): 并发控制深入浅出及常见错误》

好的,现在你又该回顾一下你刚刚学过的知识点了。

  • 在 Unix 中,编写一个并发服务器的最简单的方式——使用 fork() 系统调用;
  • 当一个进程分叉(fork)出另一个进程时,它会变成刚刚分叉出的进程的父进程;
  • 在进行 fork 调用后,父进程和子进程共享相同的文件描述符;
  • 系统内核通过描述符的引用计数来决定是否要关闭该描述符对应的文件或套接字;
  • 服务器父进程的主要职责:现在它做的只是从客户端接受一个新的连接,分叉出子进程来处理这个客户端连接,然后开始下一轮循环,去接收新的客户端连接。

进程分叉后不关闭重复的套接字会发生什么?

我们来看看,如果我们不在父进程与子进程中关闭重复的套接字描述符会发生什么。下面是刚才的并发服务器代码的修改版本,这段代码(webserver3d.py 中,服务器不会关闭重复的描述符):

#######################################################
# 并发服务器 - webserver3d.py                          #
#                                                     #
# 使用 Python 2.7.9 或 3.4                             #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过            #
#######################################################
import os
import socket

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 5


def handle_request(client_connection):
    request = client_connection.recv(1024)
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    clients = []
    while True:
        client_connection, client_address = listen_socket.accept()
        ### 将引用存储起来,否则在下一轮循环时,他们会被垃圾回收机制销毁
        clients.append(client_connection)
        pid = os.fork()
        if pid == 0:  ### 子进程
            listen_socket.close()  ### 关闭子进程中多余的套接字
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)  ### 子进程在这里结束
        else:  ### 父进程
            # client_connection.close()
            print(len(clients))

if __name__ == '__main__':
    serve_forever()

用以下命令来启动服务器:

$ python webserver3d.py

curl 命令连接服务器:

$ curl http://localhost:8888/hello
Hello, World!

好,curl 命令输出了来自并发服务器的响应内容,但程序并没有退出,而是仍然挂起。到底发生了什么?这个服务器并不会挂起 60 秒:子进程只处理客户端连接,关闭连接然后退出,但客户端的 curl 命令并没有终止。

所以,为什么 curl 不终止呢?原因就在于文件描述符的副本。当子进程关闭客户端连接时,系统内核会减少客户端套接字的引用计数,将其变为 1。服务器子进程退出了,但客户端套接字并没有被内核关闭,因为该套接字的描述符引用计数并没有变为 0,所以,这就导致了连接终止包(在 TCP/IP 协议中称作 FIN)不会被发送到客户端,所以客户端会一直保持连接。这里也会出现另一个问题:如果你的服务器长时间运行,并且不关闭文件描述符的副本,那么可用的文件描述符会被消耗殆尽:

使用 Control-C 关闭服务器 webserver3d.py,然后在 shell 中使用内置命令 ulimit 来查看系统默认为你的服务器进程分配的可用资源数:

$ ulimit -a
core file size          (blocks, -c) 0
data seg size           (kbytes, -d) unlimited
scheduling priority             (-e) 0
file size               (blocks, -f) unlimited
pending signals                 (-i) 3842
max locked memory       (kbytes, -l) 64
max memory size         (kbytes, -m) unlimited
open files                      (-n) 1024
pipe size            (512 bytes, -p) 8
POSIX message queues     (bytes, -q) 819200
real-time priority              (-r) 0
stack size              (kbytes, -s) 8192
cpu time               (seconds, -t) unlimited
max user processes              (-u) 3842
virtual memory          (kbytes, -v) unlimited
file locks                      (-x) unlimited

你可以从上面的结果看到,在我的 Ubuntu 机器中,系统为我的服务器进程分配的最大可用文件描述符(文件打开)数为 1024。

现在我们来看一看,如果你的服务器不关闭重复的描述符,它会如何消耗可用的文件描述符。在一个已有的或新建的终端窗口中,将你的服务器进程的最大可用文件描述符设为 256:

$ ulimit -n 256

在你刚刚运行 ulimit -n 256 的终端窗口中运行服务器 webserver3d.py

$ python webserver3d.py

然后使用下面的客户端 client3.py 来测试你的服务器。

#######################################################
# 测试客户端 - client3.py                              #
#                                                     #
# 使用 Python 2.7.9 或 3.4                             #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过            #
#######################################################
import argparse
import errno
import os
import socket


SERVER_ADDRESS = 'localhost', 8888
REQUEST = b"""\
GET /hello HTTP/1.1
Host: localhost:8888

"""


def main(max_clients, max_conns):
    socks = []
    for client_num in range(max_clients):
        pid = os.fork()
        if pid == 0:
            for connection_num in range(max_conns):
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.connect(SERVER_ADDRESS)
                sock.sendall(REQUEST)
                socks.append(sock)
                print(connection_num)
                os._exit(0)


if __name__ == '__main__':
    parser = argparse.ArgumentParser(
        description='Test client for LSBAWS.',
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )
    parser.add_argument(
        '--max-conns',
        type=int,
        default=1024,
        help='Maximum number of connections per client.'
    )
    parser.add_argument(
        '--max-clients',
        type=int,
        default=1,
        help='Maximum number of clients.'
    )
    args = parser.parse_args()
    main(args.max_clients, args.max_conns)

在一个新建的终端窗口中,运行 client3.py 然后让它与服务器同步创建 300 个连接:

$ python client3.py --max-clients=300

过一会,你的服务器进程就该爆了。这是我的环境中出现的异常截图:

这个例子很明显——你的服务器应该关闭描述符副本。

僵尸进程

但是,即使你关闭了描述符副本,你依然没有摆脱险境,因为你的服务器还有一个问题,这个问题在于“ 僵尸 zombies ”!

没错,这个服务器代码确实在制造僵尸进程。我们来看看怎么回事。重新运行你的服务器:

$ python webserver3d.py

在另一个终端窗口中运行以下 curl 命令:

$ curl http://localhost:8888/hello

现在,运行 ps 环境,来查看正在运行的 Python 进程。下面是我的环境中 ps 的运行结果:

$ ps auxw | grep -i python | grep -v grep
vagrant   9099  0.0  1.2  31804  6256 pts/0    S+   16:33   0:00 python webserver3d.py
vagrant   9102  0.0  0.0      0     0 pts/0    Z+   16:33   0:00 [python] <defunct>

你看到第二行中,pid 为 9102,状态为 Z+,名字里面有个 <defunct> 的进程了吗?那就是我们的僵尸进程。这个僵尸进程的问题在于:你无法将它杀掉!

就算你尝试使用 kill -9 来杀死僵尸进程,它们仍旧会存活。自己试试看,看看结果。

这个僵尸到底是什么,为什么我们的服务器会造出它们呢?一个 僵尸进程 zombie 是一个已经结束的进程,但它的父进程并没有等待(waited)它结束,并且也没有收到它的终结状态。如果一个进程在父进程退出之前退出,系统内核会把它变为一个僵尸进程,存储它的部分信息,以便父进程读取。内核保存的进程信息通常包括进程 ID、进程终止状态,以及进程的资源占用情况。OK,所以僵尸进程确实有存在的意义,但如果服务器不管这些僵尸进程,你的系统将会被壅塞。我们来看看这个会如何发生。首先,关闭你运行的服务器;然后,在一个新的终端窗口中,使用 ulimit 命令将最大用户进程数设为 400(同时,要确保你的最大可用描述符数大于这个数字,我们在这里设为 500):

$ ulimit -u 400
$ ulimit -n 500

在你刚刚运行 ulimit -u 400 命令的终端中,运行服务器 webserver3d.py

$ python webserver3d.py

在一个新的终端窗口中,运行 client3.py,并且让它与服务器同时创建 500 个连接:

$ python client3.py --max-clients=500

然后,过一会,你的服务器进程应该会再次爆了,它会在创建新进程时抛出一个 OSError: 资源暂时不可用 的异常。但它并没有达到系统允许的最大进程数。这是我的环境中输出的异常信息截图:

你可以看到,如果服务器不管僵尸进程,它们会引发问题。接下来我会简单探讨一下僵尸进程问题的解决方案。

我们来回顾一下你刚刚掌握的知识点:

  • 如果你不关闭文件描述符副本,客户端就不会在请求处理完成后终止,因为客户端连接没有被关闭;
  • 如果你不关闭文件描述符副本,长久运行的服务器最终会把可用的文件描述符(最大文件打开数)消耗殆尽;
  • 当你创建一个新进程,而父进程不等待(wait)子进程,也不在子进程结束后收集它的终止状态,它会变为一个僵尸进程;
  • 僵尸通常都会吃东西,在我们的例子中,僵尸进程会吃掉资源。如果你的服务器不管僵尸进程,它最终会消耗掉所有的可用进程(最大用户进程数);
  • 你不能杀死(kill)僵尸进程,你需要等待(wait)它。

如何处理僵尸进程?

所以,你需要做什么来处理僵尸进程呢?你需要修改你的服务器代码,来等待(wait)僵尸进程,并收集它们的终止信息。你可以在代码中使用系统调用 wait 来完成这个任务。不幸的是,这个方法离理想目标还很远,因为在没有终止的子进程存在的情况下调用 wait 会导致服务器进程阻塞,这会阻碍你的服务器处理新的客户端连接请求。那么,我们有其他选择吗?嗯,有的,其中一个解决方案需要结合信号处理以及 wait 系统调用。

这是它的工作流程。当一个子进程退出时,内核会发送 SIGCHLD 信号。父进程可以设置一个信号处理器,它可以异步响应 SIGCHLD 信号,并在信号响应函数中等待(wait)子进程收集终止信息,从而阻止了僵尸进程的存在。

顺便说一下,异步事件意味着父进程无法提前知道事件的发生时间。

修改你的服务器代码,设置一个 SIGCHLD 信号处理器,在信号处理器中等待(wait)终止的子进程。修改后的代码如下(webserver3e.py):

#######################################################
# 并发服务器 - webserver3e.py                          #
#                                                     #
# 使用 Python 2.7.9 或 3.4                             #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过            #
#######################################################
import os
import signal
import socket
import time

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 5


def grim_reaper(signum, frame):
    pid, status = os.wait()
    print(
        'Child {pid} terminated with status {status}'
        '\n'.format(pid=pid, status=status)
    )


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)
    ### 挂起进程,来允许父进程完成循环,并在 "accept" 处阻塞
    time.sleep(3)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    signal.signal(signal.SIGCHLD, grim_reaper)

    while True:
        client_connection, client_address = listen_socket.accept()
        pid = os.fork()
        if pid == 0:  ### 子进程
            listen_socket.close()  ### 关闭子进程中多余的套接字
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)
        else:  ### 父进程
            client_connection.close()

if __name__ == '__main__':
    serve_forever()

运行服务器:

$ python webserver3e.py

使用你的老朋友——curl 命令来向修改后的并发服务器发送一个请求:

$ curl http://localhost:8888/hello

再来看看服务器:

刚刚发生了什么?accept 调用失败了,错误信息为 EINTR

当子进程退出并触发 SIGCHLD 事件时,父进程的 accept 调用被阻塞了,系统转去运行信号处理器,当信号处理函数完成时,accept 系统调用被打断:

别担心,这个问题很好解决。你只需要重新运行 accept 系统调用即可。这是修改后的服务器代码 webserver3f.py,它可以解决这个问题:

#######################################################
# 并发服务器 - webserver3f.py                          #
#                                                     #
# 使用 Python 2.7.9 或 3.4                             #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过            #
#######################################################
import errno
import os
import signal
import socket

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 1024


def grim_reaper(signum, frame):
    pid, status = os.wait()


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    signal.signal(signal.SIGCHLD, grim_reaper)

    while True:
        try:
            client_connection, client_address = listen_socket.accept()
        except IOError as e:
            code, msg = e.args
            ### 若 'accept' 被打断,那么重启它
            if code == errno.EINTR:
                continue
            else:
                raise

        pid = os.fork()
        if pid == 0:  ### 子进程
            listen_socket.close()  ### 关闭子进程中多余的描述符
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)
        else:  ### 父进程
            client_connection.close()  ### 关闭父进程中多余的描述符,继续下一轮循环


if __name__ == '__main__':
    serve_forever()

运行更新后的服务器 webserver3f.py

$ python webserver3f.py

curl 来向更新后的并发服务器发送一个请求:

$ curl http://localhost:8888/hello

看到了吗?没有 EINTR 异常出现了。现在检查一下,确保没有僵尸进程存活,调用 wait 函数的 SIGCHLD 信号处理器能够正常处理被终止的子进程。我们只需使用 ps 命令,然后看看现在没有处于 Z+ 状态(或名字包含 <defunct> )的 Python 进程就好了。很棒!僵尸进程没有了,我们很安心。

  • 如果你创建了一个子进程,但是不等待它,它就会变成一个僵尸进程;
  • 使用 SIGCHLD 信号处理器可以异步地等待子进程终止,并收集其终止状态;
  • 当使用事件处理器时,你需要牢记,系统调用可能会被打断,所以你需要处理这种情况发生时带来的异常。

正确处理 SIGCHLD 信号

好的,一切顺利。是不是没问题了?额,几乎是。重新尝试运行 webserver3f.py 但我们这次不会只发送一个请求,而是同步创建 128 个连接:

$ python client3.py --max-clients 128

现在再次运行 ps 命令:

$ ps auxw | grep -i python | grep -v grep

看到了吗?天啊,僵尸进程又出来了!

这回怎么回事?当你同时运行 128 个客户端,建立 128 个连接时,服务器的子进程几乎会在同一时间处理好你的请求,然后退出。这会导致非常多的 SIGCHLD 信号被发送到父进程。问题在于,这些信号不会存储在队列中,所以你的服务器进程会错过很多信号,这也就导致了几个僵尸进程处于无主状态:

这个问题的解决方案依然是设置 SIGCHLD 事件处理器。但我们这次将会用 WNOHANG 参数循环调用 waitpid 来替代 wait,以保证所有处于终止状态的子进程都会被处理。下面是修改后的代码,webserver3g.py

#######################################################
# 并发服务器 - webserver3g.py                          #
#                                                     #
# 使用 Python 2.7.9 或 3.4                             #
# 在 Ubuntu 14.04 及 Mac OS X 环境下测试通过            #
#######################################################
import errno
import os
import signal
import socket

SERVER_ADDRESS = (HOST, PORT) = '', 8888
REQUEST_QUEUE_SIZE = 1024


def grim_reaper(signum, frame):
    while True:
        try:
            pid, status = os.waitpid(
                -1,          ### 等待所有子进程
                 os.WNOHANG  ### 无终止进程时,不阻塞进程,并抛出 EWOULDBLOCK 错误
            )
        except OSError:
            return

        if pid == 0:  ### 没有僵尸进程存在了
            return


def handle_request(client_connection):
    request = client_connection.recv(1024)
    print(request.decode())
    http_response = b"""\
HTTP/1.1 200 OK

Hello, World!
"""
    client_connection.sendall(http_response)


def serve_forever():
    listen_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    listen_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    listen_socket.bind(SERVER_ADDRESS)
    listen_socket.listen(REQUEST_QUEUE_SIZE)
    print('Serving HTTP on port {port} ...'.format(port=PORT))

    signal.signal(signal.SIGCHLD, grim_reaper)

    while True:
        try:
            client_connection, client_address = listen_socket.accept()
        except IOError as e:
            code, msg = e.args
            ### 若 'accept' 被打断,那么重启它
            if code == errno.EINTR:
                continue
            else:
                raise

        pid = os.fork()
        if pid == 0:  ### 子进程
            listen_socket.close()  ### 关闭子进程中多余的描述符
            handle_request(client_connection)
            client_connection.close()
            os._exit(0)
        else:  ### 父进程
            client_connection.close()  ### 关闭父进程中多余的描述符,继续下一轮循环

if __name__ == '__main__':
    serve_forever()

运行服务器:

$ python webserver3g.py

使用测试客户端 client3.py

$ python client3.py --max-clients 128

现在来查看一下,确保没有僵尸进程存在。耶!没有僵尸的生活真美好 ^_^

大功告成

恭喜!你刚刚经历了一段很长的旅程,我希望你能够喜欢它。现在你拥有了自己的简易并发服务器,并且这段代码能够为你在继续研究生产级 Web 服务器的路上奠定基础。

我将会留一个作业:你需要将第二部分中的 WSGI 服务器升级,将它改造为一个并发服务器。你可以在这里找到更改后的代码。但是,当你实现了自己的版本之后,你才应该来看我的代码。你已经拥有了实现这个服务器所需的所有信息。所以,快去实现它吧 ^_^

然后要做什么呢?乔希·比林斯说过:

“就像一枚邮票一样——专注于一件事,不达目的不罢休。”

开始学习基本知识。回顾你已经学过的知识。然后一步一步深入。

“如果你只学会了方法,你将会被这些方法所困。但如果你学会了原理,那你就能发明出新的方法。”——拉尔夫·沃尔多·爱默生

“有道无术,术尚可求也,有术无道,止于术”——中国古代也有这样的话,LCTT 译注

下面是一份书单,我从这些书中提炼出了这篇文章所需的素材。他们能助你在我刚刚所述的几个方面中发掘出兼具深度和广度的知识。我极力推荐你们去搞到这几本书看看:从你的朋友那里借,在当地的图书馆中阅读,或者直接在亚马逊上把它买回来。下面是我的典藏秘籍:

  1. 《UNIX 网络编程 卷1:套接字联网 API (第3版)》
  2. 《UNIX 环境高级编程(第3版)》
  3. 《Linux/UNIX 系统编程手册》
  4. 《TCP/IP 详解 卷1:协议(第2版)
  5. 《信号系统简明手册 (第二版): 并发控制深入浅出及常见错误》,这本书也可以从作者的个人网站中免费下载到。

顺便,我在撰写一本名为《搭个 Web 服务器:从头开始》的书。这本书讲解了如何从头开始编写一个基本的 Web 服务器,里面包含本文中没有的更多细节。订阅原文下方的邮件列表,你就可以获取到这本书的最新进展,以及发布日期。


via: https://ruslanspivak.com/lsbaws-part3/

作者:Ruslan 译者:StdioA 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

聊天机器人(Bot) 是一种像 Slack 一样的实用的互动聊天服务方式。如果你之前从来没有建立过聊天机器人,那么这篇文章提供了一个简单的入门指南,告诉你如何用 Python 结合 Slack API 建立你第一个聊天机器人。

我们通过搭建你的开发环境, 获得一个 Slack API 的聊天机器人令牌,并用 Pyhon 开发一个简单聊天机器人。

我们所需的工具

我们的聊天机器人我们将它称作为“StarterBot”,它需要 Python 和 Slack API。要运行我们的 Python 代码,我们需要:

当你在本教程中进行构建时,Slack API 文档 是很有用的。

本教程中所有的代码都放在 slack-starterbot 公共库里,并以 MIT 许可证开源。

搭建我们的环境

我们现在已经知道我们的项目需要什么样的工具,因此让我们来搭建我们所的开发环境吧。首先到终端上(或者 Windows 上的命令提示符)并且切换到你想要存储这个项目的目录。在那个目录里,创建一个新的 virtualenv 以便和其他的 Python 项目相隔离我们的应用程序依赖关系。

virtualenv starterbot

激活 virtualenv:

source starterbot/bin/activate

你的提示符现在应该看起来如截图:

已经激活的 starterbot 的 virtualenv的命令提示符

这个官方的 slack 客户端 API 帮助库是由 Slack 建立的,它可以通过 Slack 通道发送和接收消息。通过这个 pip 命令安装 slackclient 库:

pip install slackclient

pip 命令完成时,你应该看到类似这样的输出,并返回提示符。

在已经激活的 virtualenv 用 pip 安装 slackclient 的输出

我们也需要为我们的 Slack 项目获得一个访问令牌,以便我们的聊天机器人可以用它来连接到 Slack API。

Slack 实时消息传递(RTM)API

Slack 允许程序通过一个 Web API 来访问他们的消息传递通道。去这个 Slack Web API 页面 注册建立你自己的 Slack 项目。你也可以登录一个你拥有管理权限的已有账号。

使用 Web API页面的右上角登录按钮

登录后你会到达 聊天机器人用户页面

定制聊天机器人用户页面

给你的聊天机器人起名为“starterbot”然后点击 “Add bot integration” 按钮。

添加一个bot integration 并起名为“starterbot”

这个页面将重新加载,你将看到一个新生成的访问令牌。你还可以将标志改成你自己设计的。例如我给的这个“Full Stack Python”标志。

为你的新 Slack 聊天机器人复制和粘贴访问令牌

在页面底部点击“Save Integration”按钮。你的聊天机器人现在已经准备好连接 Slack API。

Python 开发人员的一个常见的做法是以环境变量输出秘密令牌。输出的 Slack 令牌名字为SLACK_BOT_TOKEN

export SLACK_BOT_TOKEN='你的 slack 令牌粘帖在这里'

好了,我们现在得到了将这个 Slack API 用作聊天机器人的授权。

我们建立聊天机器人还需要更多信息:我们的聊天机器人的 ID。接下来我们将会写一个简短的脚本,从 Slack API 获得该 ID。

获得我们聊天机器人的 ID

这是最后写一些 Python 代码的时候了! 我们编写一个简短的 Python 脚本获得 StarterBot 的 ID 来热身一下。这个 ID 基于 Slack 项目而不同。

我们需要该 ID,当解析从 Slack RTM 上发给 StarterBot 的消息时,它用于对我们的应用验明正身。我们的脚本也会测试我们 SLACK_BOT_TOKEN 环境变量是否设置正确。

建立一个命名为 printbotid.py 的新文件,并且填入下面的代码:

import os
from slackclient import SlackClient

BOT_NAME = 'starterbot'

slack_client = SlackClient(os.environ.get('SLACK_BOT_TOKEN'))

if __name__ == "__main__":
    api_call = slack_client.api_call("users.list")
    if api_call.get('ok'):
        # retrieve all users so we can find our bot
        users = api_call.get('members')
        for user in users:
            if 'name' in user and user.get('name') == BOT_NAME:
                print("Bot ID for '" + user['name'] + "' is " + user.get('id'))
    else:
        print("could not find bot user with the name " + BOT_NAME)

我们的代码导入 SlackClient,并用我们设置的环境变量 SLACK_BOT_TOKEN 实例化它。 当该脚本通过 python 命令执行时,我们通过会访问 Slack API 列出所有的 Slack 用户并且获得匹配一个名字为“satrterbot”的 ID。

这个获得聊天机器人的 ID 的脚本我们仅需要运行一次。

python print_bot_id.py

当它运行为我们提供了聊天机器人的 ID 时,脚本会打印出简单的一行输出。

在你的 Slack 项目中用 Python 脚本打印 Slack 聊天机器人的 ID

复制这个脚本打印出的唯一 ID。并将该 ID 作为一个环境变量 BOT_ID 输出。

(starterbot)$ export BOT_ID='bot id returned by script'

这个脚本仅仅需要运行一次来获得聊天机器人的 ID。 我们现在可以在我们的运行 StarterBot 的 Python应用程序中使用这个 ID 。

编码我们的 StarterBot

现在我们拥有了写我们的 StarterBot 代码所需的一切。 创建一个新文件命名为 starterbot.py ,它包括以下代码。

import os
import time
from slackclient import SlackClient

osSlackClient 的导入我们看起来很熟悉,因为我们已经在 theprintbotid.py 中用过它们了。

通过我们导入的依赖包,我们可以使用它们获得环境变量值,并实例化 Slack 客户端。

# starterbot 的 ID 作为一个环境变量
BOT_ID = os.environ.get("BOT_ID")

# 常量
AT_BOT = "<@" + BOT_ID + ">:"
EXAMPLE_COMMAND = "do"

# 实例化 Slack 和 Twilio 客户端
slack_client = SlackClient(os.environ.get('SLACK_BOT_TOKEN'))

该代码通过我们以输出的环境变量 SLACK_BOT_TOKEN 实例化SlackClient` 客户端。

if __name__ == "__main__":
    READ_WEBSOCKET_DELAY = 1 # 1 从 firehose 读取延迟 1 秒
    if slack_client.rtm_connect():
        print("StarterBot connected and running!")
        while True:
            command, channel = parse_slack_output(slack_client.rtm_read())
            if command and channel:
                handle_command(command, channel)
            time.sleep(READ_WEBSOCKET_DELAY)
    else:
        print("Connection failed. Invalid Slack token or bot ID?")

Slack 客户端会连接到 Slack RTM API WebSocket,然后当解析来自 firehose 的消息时会不断循环。如果有任何发给 StarterBot 的消息,那么一个被称作 handle_command 的函数会决定做什么。

接下来添加两个函数来解析 Slack 的输出并处理命令。

def handle_command(command, channel):
    """
        Receives commands directed at the bot and determines if they
        are valid commands. If so, then acts on the commands. If not,
        returns back what it needs for clarification.
    """
    response = "Not sure what you mean. Use the *" + EXAMPLE_COMMAND + \
               "* command with numbers, delimited by spaces."
    if command.startswith(EXAMPLE_COMMAND):
        response = "Sure...write some more code then I can do that!"
    slack_client.api_call("chat.postMessage", channel=channel,
                          text=response, as_user=True)

def parse_slack_output(slack_rtm_output):
    """
        The Slack Real Time Messaging API is an events firehose.
        this parsing function returns None unless a message is
        directed at the Bot, based on its ID.
    """
    output_list = slack_rtm_output
    if output_list and len(output_list) > 0:
        for output in output_list:
            if output and 'text' in output and AT_BOT in output['text']:
                # 返回 @ 之后的文本,删除空格
                return output['text'].split(AT_BOT)[1].strip().lower(), \
                       output['channel']
    return None, None

parse_slack_output 函数从 Slack 接受信息,并且如果它们是发给我们的 StarterBot 时会作出判断。消息以一个给我们的聊天机器人 ID 的直接命令开始,然后交由我们的代码处理。目前只是通过 Slack 管道发布一个消息回去告诉用户去多写一些 Python 代码!

这是整个程序组合在一起的样子 (你也可以 在 GitHub 中查看该文件):

import os
import time
from slackclient import SlackClient

# starterbot 的 ID 作为一个环境变量
BOT_ID = os.environ.get("BOT_ID")

# 常量
AT_BOT = "<@" + BOT_ID + ">:"
EXAMPLE_COMMAND = "do"

# 实例化 Slack 和 Twilio 客户端
slack_client = SlackClient(os.environ.get('SLACK_BOT_TOKEN'))

def handle_command(command, channel):
    """
        Receives commands directed at the bot and determines if they
        are valid commands. If so, then acts on the commands. If not,
        returns back what it needs for clarification.
    """
    response = "Not sure what you mean. Use the *" + EXAMPLE_COMMAND + \
               "* command with numbers, delimited by spaces."
    if command.startswith(EXAMPLE_COMMAND):
        response = "Sure...write some more code then I can do that!"
    slack_client.api_call("chat.postMessage", channel=channel,
                          text=response, as_user=True)

def parse_slack_output(slack_rtm_output):
    """
        The Slack Real Time Messaging API is an events firehose.
        this parsing function returns None unless a message is
        directed at the Bot, based on its ID.
    """
    output_list = slack_rtm_output
    if output_list and len(output_list) > 0:
        for output in output_list:
            if output and 'text' in output and AT_BOT in output['text']:
                # 返回 @ 之后的文本,删除空格
                return output['text'].split(AT_BOT)[1].strip().lower(), \
                       output['channel']
    return None, None

if __name__ == "__main__":
    READ_WEBSOCKET_DELAY = 1 # 1 second delay between reading from firehose
    if slack_client.rtm_connect():
        print("StarterBot connected and running!")
        while True:
            command, channel = parse_slack_output(slack_client.rtm_read())
            if command and channel:
                handle_command(command, channel)
            time.sleep(READ_WEBSOCKET_DELAY)
    else:
        print("Connection failed. Invalid Slack token or bot ID?")

现在我们的代码已经有了,我们可以通过 python starterbot.py 来运行我们 StarterBot 的代码了。

当 StarterBot 开始运行而且连接到 API 的输出通道

在 Slack 中创建新通道,并且把 StarterBot 邀请进来,或者把 StarterBot 邀请进一个已经存在的通道中。

在 Slack 界面创建一个新通道并且邀请 StarterBot

现在在你的通道中给 StarterBot 发命令。

在你的 Slack 通道里给你的 StarterBot 发命令

如果你从聊天机器人得到的响应中遇见问题,你可能需要做一个修改。正如上面所写的这个教程,其中一行 AT_BOT = "<@" + BOT_ID + ">:",在“@starter”(你给你自己的聊天机器人起的名字)后需要一个冒号。从 AT_BOT 字符串后面移除:。Slack 似乎需要在@ 一个人名后加一个冒号,但这好像是有些不协调的。

结束

好吧,你现在已经获得一个简易的聊天机器人,你可以在代码中很多地方加入你想要创建的任何特性。

我们能够使用 Slack RTM API 和 Python 完成很多功能。看看通过这些文章你还可以学习到什么:

有问题? 通过 Twitter 联系我 @fullstackpython@mattmakai。 我在 GitHub 上的用户名是 mattmakai

这篇文章感兴趣? Fork 这个 GitHub 上的页面吧。


via: https://www.fullstackpython.com/blog/build-first-slack-bot-python.html

作者:Matt Makai 译者:jiajia9llinuxer 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出aa