2018年7月

最近在开源社区发生了很多事情。首先,微软收购了 GitHub,然后人们开始寻找 GitHub 替代套餐,甚至在 Linus Torvalds 发布 Linux Kernel 4.17 时没有花一点时间考虑它。好吧,如果你一直关注我们,我认为你知道这一切。

但是,如今,GitLab 做出了一个明智的举措,为教育机构和开源项目免费提供高级套餐。当许多开发人员有兴趣将他们的开源项目迁移到 GitLab 时,没有更好的时机来提供这些了。

GitLab 的高级套餐现在对开源项目和教育机构免费

GitLab Logo

在今天(2018/6/7)发布的博客中,GitLab 宣布其旗舰和黄金套餐现在对教育机构和开源项目免费。虽然我们已经知道为什么 GitLab 做出这个举动(一个完美的时机!),但他们还是解释了他们让它免费的动机:

我们让 GitLab 对教育机构免费,因为我们希望学生使用我们最先进的功能。许多大学已经运行了 GitLab。如果学生使用 GitLab 旗舰和黄金套餐的高级功能,他们将把这些高级功能的经验带到他们的工作场所。

我们希望有更多的开源项目使用 GitLab。GitLab.com 上的公共项目已经拥有 GitLab 旗舰套餐的所有功能。像 GnomeDebian 这样的项目已经在自己的服务器运行开源版 GitLab 。随着今天的宣布,在专有软件上运行的开源项目可以使用 GitLab 提供的所有功能,同时我们通过向非开源组织收费来建立可持续的业务模式。

GitLab 提供的这些“免费”套餐是什么?

GitLab Pricing

GitLab 有两类产品。一个是你可以在自己的云托管服务如 Digital Ocean 上运行的软件。另一个是 Gitlab 软件既服务,其中托管由 GitLab 本身管理,你在 GitLab.com 上获得一个帐户。

GitLab Pricing for hosted service

黄金套餐是托管类别中最高的产品,而旗舰套餐是自托管类别中的最高产品。

你可以在 GitLab 定价页面上获得有关其功能的更多详细信息。请注意,支持服务不包括在套餐中。你必须单独购买。

你必须符合某些条件才能使用此优惠

GitLab 还提到 —— 该优惠对谁有效。以下是他们在博客文章中写的内容:

  1. 教育机构:任何为了学习、教育的机构,并且/或者由合格的教育机构、教职人员、学生训练。教育目的不包括商业,专业或任何其他营利目的。
  2. 开源项目:任何使用标准开源许可证且非商业性的项目。它不应该有付费支持或付费贡献者。

虽然免费套餐不包括支持,但是当你迫切需要专家帮助解决问题时,你仍然可以支付每用户每月 4.95 美元的额外费用 —— 当你特别需要一个专家来解决问题时,这是一个非常合理的价格。

GitLab 还为学生们添加了一条说明:

为减轻 GitLab 的管理负担,只有教育机构才能代表学生申请。如果你是学生并且你的教育机构不申请,你可以在 GitLab.com 上使用公共项目的所有功能,使用私人项目的免费功能,或者自己付费。

总结

现在 GitLab 正在加快脚步,你如何看待它?

你有 GitHub 上的项目吗?你会切换么?或者,幸运的是,你从一开始就碰巧使用 GitLab?

请在下面的评论栏告诉我们你的想法。


via: https://itsfoss.com/gitlab-free-open-source/

作者:Ankush Das 选题:lujun9972 译者:geekpi 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

简介

伙计们,请搬好小板凳坐好,下面将是一段漫长的旅程,期望你能够乐在其中。

我将基于 Kubernetes 部署一个分布式应用。我曾试图编写一个尽可能真实的应用,但由于时间和精力有限,最终砍掉了很多细节。

我将聚焦 Kubernetes 及其部署。

让我们开始吧。

应用

TL;DR

该应用本身由 6 个组件构成。代码可以从如下链接中找到:Kubenetes 集群示例

这是一个人脸识别服务,通过比较已知个人的图片,识别给定图片对应的个人。前端页面用表格形式简要的展示图片及对应的个人。具体而言,向 接收器 发送请求,请求包含指向一个图片的链接。图片可以位于任何位置。接受器将图片地址存储到数据库 (MySQL) 中,然后向队列发送处理请求,请求中包含已保存图片的 ID。这里我们使用 NSQ 建立队列。

图片处理 服务一直监听处理请求队列,从中获取任务。处理过程包括如下几步:获取图片 ID,读取图片,通过 gRPC 将图片路径发送至 Python 编写的 人脸识别 后端。如果识别成功,后端给出图片对应个人的名字。图片处理器进而根据个人 ID 更新图片记录,将其标记为处理成功。如果识别不成功,图片被标记为待解决。如果图片识别过程中出现错误,图片被标记为失败。

标记为失败的图片可以通过计划任务等方式进行重试。

那么具体是如何工作的呢?我们深入探索一下。

接收器

接收器服务是整个流程的起点,通过如下形式的 API 接收请求:

curl -d '{"path":"/unknown_images/unknown0001.jpg"}' http://127.0.0.1:8000/image/post

此时,接收器将 路径 path 存储到共享数据库集群中,该实体存储后将从数据库服务收到对应的 ID。本应用采用“ 实体对象 Entity Object 的唯一标识由持久层提供”的模型。获得实体 ID 后,接收器向 NSQ 发送消息,至此接收器的工作完成。

图片处理器

从这里开始变得有趣起来。图片处理器首次运行时会创建两个 Go 协程 routine ,具体为:

Consume

这是一个 NSQ 消费者,需要完成三项必需的任务。首先,监听队列中的消息。其次,当有新消息到达时,将对应的 ID 追加到一个线程安全的 ID 片段中,以供第二个协程处理。最后,告知第二个协程处理新任务,方法为 sync.Condition

ProcessImages

该协程会处理指定 ID 片段,直到对应片段全部处理完成。当处理完一个片段后,该协程并不是在一个通道上睡眠等待,而是进入悬挂状态。对每个 ID,按如下步骤顺序处理:

  • 与人脸识别服务建立 gRPC 连接,其中人脸识别服务会在人脸识别部分进行介绍
  • 从数据库获取图片对应的实体
  • 断路器 准备两个函数

    • 函数 1: 用于 RPC 方法调用的主函数
    • 函数 2: 基于 ping 的断路器健康检查
  • 调用函数 1 将图片路径发送至人脸识别服务,其中路径应该是人脸识别服务可以访问的,最好是共享的,例如 NFS
  • 如果调用失败,将图片实体状态更新为 FAILEDPROCESSING
  • 如果调用成功,返回值是一个图片的名字,对应数据库中的一个个人。通过联合 SQL 查询,获取对应个人的 ID
  • 将数据库中的图片实体状态更新为 PROCESSED,更新图片被识别成的个人的 ID

这个服务可以复制多份同时运行。

断路器

即使对于一个复制资源几乎没有开销的系统,也会有意外的情况发生,例如网络故障或任何两个服务之间的通信存在问题等。我在 gRPC 调用中实现了一个简单的断路器,这十分有趣。

下面给出工作原理:

当出现 5 次不成功的服务调用时,断路器启动并阻断后续的调用请求。经过指定的时间后,它对服务进行健康检查并判断是否恢复。如果问题依然存在,等待时间会进一步增大。如果已经恢复,断路器停止对服务调用的阻断,允许请求流量通过。

前端

前端只包含一个极其简单的表格视图,通过 Go 自身的 html/模板显示一系列图片。

人脸识别

人脸识别是整个识别的关键点。仅因为追求灵活性,我将这个服务设计为基于 gRPC 的服务。最初我使用 Go 编写,但后续发现基于 Python 的实现更加适合。事实上,不算 gRPC 部分的代码,人脸识别部分仅有 7 行代码。我使用的人脸识别库极为出色,它包含 OpenCV 的全部 C 绑定。维护 API 标准意味着只要标准本身不变,实现可以任意改变。

注意:我曾经试图使用 GoCV,这是一个极好的 Go 库,但欠缺所需的 C 绑定。推荐马上了解一下这个库,它会让你大吃一惊,例如编写若干行代码即可实现实时摄像处理。

这个 Python 库的工作方式本质上很简单。准备一些你认识的人的图片,把信息记录下来。对于我而言,我有一个图片文件夹,包含若干图片,名称分别为 hannibal_1.jpghannibal_2.jpggergely_1.jpgjohn_doe.jpg。在数据库中,我使用两个表记录信息,分别为 personperson_images,具体如下:

+----+----------+
| id | name     |
+----+----------+
|  1 | Gergely  |
|  2 | John Doe |
|  3 | Hannibal |
+----+----------+
+----+----------------+-----------+
| id | image_name     | person_id |
+----+----------------+-----------+
|  1 | hannibal_1.jpg |         3 |
|  2 | hannibal_2.jpg |         3 |
+----+----------------+-----------+

人脸识别库识别出未知图片后,返回图片的名字。我们接着使用类似下面的联合查询找到对应的个人。

select person.name, person.id from person inner join person_images as pi on person.id = pi.person_id where image_name = 'hannibal_2.jpg';

gRPC 调用返回的个人 ID 用于更新图片的 person 列。

NSQ

NSQ 是 Go 编写的小规模队列,可扩展且占用系统内存较少。NSQ 包含一个查询服务,用于消费者接收消息;包含一个守护进程,用于发送消息。

在 NSQ 的设计理念中,消息发送程序应该与守护进程在同一台主机上,故发送程序仅需发送至 localhost。但守护进程与查询服务相连接,这使其构成了全局队列。

这意味着有多少 NSQ 守护进程就有多少对应的发送程序。但由于其资源消耗极小,不会影响主程序的资源使用。

配置

为了尽可能增加灵活性以及使用 Kubernetes 的 ConfigSet 特性,我在开发过程中使用 .env 文件记录配置信息,例如数据库服务的地址以及 NSQ 的查询地址。在生产环境或 Kubernetes 环境中,我将使用环境变量属性配置。

应用小结

这就是待部署应用的全部架构信息。应用的各个组件都是可变更的,他们之间仅通过数据库、消息队列和 gRPC 进行耦合。考虑到更新机制的原理,这是部署分布式应用所必须的;在部署部分我会继续分析。

使用 Kubernetes 部署应用

基础知识

Kubernetes 是什么?

这里我会提到一些基础知识,但不会深入细节,细节可以用一本书的篇幅描述,例如 Kubernetes 构建与运行。另外,如果你愿意挑战自己,可以查看官方文档:Kubernetes 文档

Kubernetes 是容器化服务及应用的管理器。它易于扩展,可以管理大量容器;更重要的是,可以通过基于 yaml 的模板文件高度灵活地进行配置。人们经常把 Kubernetes 比作 Docker Swarm,但 Kubernetes 的功能不仅仅如此。例如,Kubernetes 不关心底层容器实现,你可以使用 LXC 与 Kubernetes 的组合,效果与使用 Docker 一样好。Kubernetes 在管理容器的基础上,可以管理已部署的服务或应用集群。如何操作呢?让我们概览一下用于构成 Kubernetes 的模块。

在 Kubernetes 中,你给出期望的应用状态,Kubernetes 会尽其所能达到对应的状态。状态可以是已部署、已暂停,有 2 个副本等,以此类推。

Kubernetes 使用标签和注释标记组件,包括服务、部署、副本组、守护进程组等在内的全部组件都被标记。考虑如下场景,为了识别 pod 与应用的对应关系,使用 app: myapp 标签。假设应用已部署 2 个容器,如果你移除其中一个容器的 app 标签,Kubernetes 只能识别到一个容器(隶属于应用),进而启动一个新的具有 myapp 标签的实例。

Kubernetes 集群

要使用 Kubernetes,需要先搭建一个 Kubernetes 集群。搭建 Kubernetes 集群可能是一个痛苦的经历,但所幸有工具可以帮助我们。Minikube 为我们在本地搭建一个单节点集群。AWS 的一个 beta 服务工作方式类似于 Kubernetes 集群,你只需请求节点并定义你的部署即可。Kubernetes 集群组件的文档如下:Kubernetes 集群组件

节点

节点 node 是工作单位,形式可以是虚拟机、物理机,也可以是各种类型的云主机。

Pod

Pod 是本地容器逻辑上组成的集合,即一个 Pod 中可能包含若干个容器。Pod 创建后具有自己的 DNS 和虚拟 IP,这样 Kubernetes 可以对到达流量进行负载均衡。你几乎不需要直接和容器打交道;即使是调试的时候,例如查看日志,你通常调用 kubectl logs deployment/your-app -f 查看部署日志,而不是使用 -c container_name 查看具体某个容器的日志。-f 参数表示从日志尾部进行流式输出。

部署

在 Kubernetes 中创建任何类型的资源时,后台使用一个 部署 deployment 组件,它指定了资源的期望状态。使用部署对象,你可以将 Pod 或服务变更为另外的状态,也可以更新应用或上线新版本应用。你一般不会直接操作副本组 (后续会描述),而是通过部署对象创建并管理。

服务

默认情况下,Pod 会获取一个 IP 地址。但考虑到 Pod 是 Kubernetes 中的易失性组件,我们需要更加持久的组件。不论是队列,MySQL、内部 API 或前端,都需要长期运行并使用保持不变的 IP 或更好的 DNS 记录。

为解决这个问题,Kubernetes 提供了 服务 service 组件,可以定义访问模式,支持的模式包括负载均衡、简单 IP 或内部 DNS。

Kubernetes 如何获知服务运行正常呢?你可以配置健康性检查和可用性检查。健康性检查是指检查容器是否处于运行状态,但容器处于运行状态并不意味着服务运行正常。对此,你应该使用可用性检查,即请求应用的一个特别 接口 endpoint

由于服务非常重要,推荐你找时间阅读以下文档:服务。严肃的说,需要阅读的东西很多,有 24 页 A4 纸的篇幅,涉及网络、服务及自动发现。这也有助于你决定是否真的打算在生产环境中使用 Kubernetes。

DNS / 服务发现

在 Kubernetes 集群中创建服务后,该服务会从名为 kube-proxykube-dns 的特殊 Kubernetes 部署中获取一个 DNS 记录。它们两个用于提供集群内的服务发现。如果你有一个正在运行的 MySQL 服务并配置 clusterIP: no,那么集群内部任何人都可以通过 mysql.default.svc.cluster.local 访问该服务,其中:

  • mysql – 服务的名称
  • default – 命名空间的名称
  • svc – 对应服务分类
  • cluster.local – 本地集群的域名

可以使用自定义设置更改本地集群的域名。如果想让服务可以从集群外访问,需要使用 DNS 服务,并使用例如 Nginx 将 IP 地址绑定至记录。服务对应的对外 IP 地址可以使用如下命令查询:

  • 节点端口方式 – kubectl get -o jsonpath="{.spec.ports[0].nodePort}" services mysql
  • 负载均衡方式 – kubectl get -o jsonpath="{.spec.ports[0].LoadBalancer}" services mysql

模板文件

类似 Docker Compose、TerraForm 或其它的服务管理工具,Kubernetes 也提供了基础设施描述模板。这意味着,你几乎不用手动操作。

以 Nginx 部署为例,查看下面的 yaml 模板:

apiVersion: apps/v1
kind: Deployment #(1)
metadata: #(2)
  name: nginx-deployment
  labels: #(3)
    app: nginx
spec: #(4)
  replicas: 3 #(5)
  selector:
    matchLabels:
      app: nginx
  template:
    metadata:
      labels:
        app: nginx
    spec:
      containers: #(6)
      - name: nginx
        image: nginx:1.7.9
        ports:
        - containerPort: 80

在这个示例部署中,我们做了如下操作:

  • (1) 使用 kind 关键字定义模板类型
  • (2) 使用 metadata 关键字,增加该部署的识别信息
  • (3) 使用 labels 标记每个需要创建的资源
  • (4) 然后使用 spec 关键字描述所需的状态
  • (5) nginx 应用需要 3 个副本
  • (6) Pod 中容器的模板定义部分
  • 容器名称为 nginx
  • 容器模板为 nginx:1.7.9 (本例使用 Docker 镜像)

副本组

副本组 ReplicaSet 是一个底层的副本管理器,用于保证运行正确数目的应用副本。相比而言,部署是更高层级的操作,应该用于管理副本组。除非你遇到特殊的情况,需要控制副本的特性,否则你几乎不需要直接操作副本组。

守护进程组

上面提到 Kubernetes 始终使用标签,还有印象吗? 守护进程组 DaemonSet 是一个控制器,用于确保守护进程化的应用一直运行在具有特定标签的节点中。

例如,你将所有节点增加 loggermission_critical 的标签,以便运行日志 / 审计服务的守护进程。接着,你创建一个守护进程组并使用 loggermission_critical 节点选择器。Kubernetes 会查找具有该标签的节点,确保守护进程的实例一直运行在这些节点中。因而,节点中运行的所有进程都可以在节点内访问对应的守护进程。

以我的应用为例,NSQ 守护进程可以用守护进程组实现。具体而言,将对应节点增加 recevier 标签,创建一个守护进程组并配置 receiver 应用选择器,这样这些节点上就会一直运行接收者组件。

守护进程组具有副本组的全部优势,可扩展且由 Kubernetes 管理,意味着 Kubernetes 管理其全生命周期的事件,确保持续运行,即使出现故障,也会立即替换。

扩展

在 Kubernetes 中,扩展是稀松平常的事情。副本组负责 Pod 运行的实例数目。就像你在 nginx 部署那个示例中看到的那样,对应设置项 replicas:3。我们可以按应用所需,让 Kubernetes 运行多份应用副本。

当然,设置项有很多。你可以指定让多个副本运行在不同的节点上,也可以指定各种不同的应用启动等待时间。想要在这方面了解更多,可以阅读 水平扩展Kubernetes 中的交互式扩展;当然 副本组 的细节对你也有帮助,毕竟 Kubernetes 中的扩展功能都来自于该模块。

Kubernetes 部分小结

Kubernetes 是容器编排的便捷工具,工作单元为 Pod,具有分层架构。最顶层是部署,用于操作其它资源,具有高度可配置性。对于你的每个命令调用,Kubernetes 提供了对应的 API,故理论上你可以编写自己的代码,向 Kubernetes API 发送数据,得到与 kubectl 命令同样的效果。

截至目前,Kubernetes 原生支持所有主流云服务供应商,而且完全开源。如果你愿意,可以贡献代码;如果你希望对工作原理有深入了解,可以查阅代码:GitHub 上的 Kubernetes 项目

Minikube

接下来我会使用 Minikube 这款本地 Kubernetes 集群模拟器。它并不擅长模拟多节点集群,但可以很容易地给你提供本地学习环境,让你开始探索,这很棒。Minikube 基于可高度调优的虚拟机,由 VirtualBox 类似的虚拟化工具提供。

我用到的全部 Kubernetes 模板文件可以在这里找到:Kubernetes 文件

注意:在你后续测试可扩展性时,会发现副本一直处于 Pending 状态,这是因为 minikube 集群中只有一个节点,不应该允许多副本运行在同一个节点上,否则明显只是耗尽了可用资源。使用如下命令可以查看可用资源:

kubectl get nodes -o yaml

构建容器

Kubernetes 支持大多数现有的容器技术。我这里使用 Docker。每一个构建的服务容器,对应代码库中的一个 Dockerfile 文件。我推荐你仔细阅读它们,其中大多数都比较简单。对于 Go 服务,我采用了最近引入的多步构建的方式。Go 服务基于 Alpine Linux 镜像创建。人脸识别程序使用 Python、NSQ 和 MySQL 使用对应的容器。

上下文

Kubernetes 使用命名空间。如果你不额外指定命名空间,Kubernetes 会使用 default 命名空间。为避免污染默认命名空间,我会一直指定命名空间,具体操作如下:

❯ kubectl config set-context kube-face-cluster --namespace=face
Context "kube-face-cluster" created.

创建上下文之后,应马上启用:

❯ kubectl config use-context kube-face-cluster
Switched to context "kube-face-cluster".

此后,所有 kubectl 命令都会使用 face 命名空间。

(LCTT 译注:作者后续并没有使用 face 命名空间,模板文件中的命名空间仍为 default,可能 face 命名空间用于开发环境。如果希望使用 face 命令空间,需要将内部 DNS 地址中的 default 改成 face;如果只是测试,可以不执行这两条命令。)

应用部署

Pods 和 服务概览:

MySQL

第一个要部署的服务是数据库。

按照 Kubernetes 的示例 Kubenetes MySQL 进行部署,即可以满足我的需求。注意:示例配置文件的 MYSQL\_PASSWORD 字段使用了明文密码,我将使用 Kubernetes Secrets 对象以提高安全性。

我创建了一个 Secret 对象,对应的本地 yaml 文件如下:

apiVersion: v1
kind: Secret
metadata:
  name: kube-face-secret
type: Opaque
data:
  mysql_password: base64codehere
  mysql_userpassword: base64codehere

其中 base64 编码通过如下命令生成:

echo -n "ubersecurepassword" | base64
echo -n "root:ubersecurepassword" | base64

(LCTT 译注:secret yaml 文件中的 data 应该有两条,一条对应 mysql_password,仅包含密码;另一条对应 mysql_userpassword,包含用户和密码。后文会用到 mysql_userpassword,但没有提及相应的生成)

我的部署 yaml 对应部分如下:

...
- name: MYSQL_ROOT_PASSWORD
  valueFrom:
    secretKeyRef:
      name: kube-face-secret
      key: mysql_password
...

另外值得一提的是,我使用卷将数据库持久化,卷对应的定义如下:

...
        volumeMounts:
        - name: mysql-persistent-storage
          mountPath: /var/lib/mysql
...
      volumes:
      - name: mysql-persistent-storage
        persistentVolumeClaim:
          claimName: mysql-pv-claim
...

其中 presistentVolumeClain 是关键,告知 Kubernetes 当前资源需要持久化存储。持久化存储的提供方式对用户透明。类似 Pods,如果想了解更多细节,参考文档:Kubernetes 持久化存储

(LCTT 译注:使用 presistentVolumeClain 之前需要创建 presistentVolume,对于单节点可以使用本地存储,对于多节点需要使用共享存储,因为 Pod 可以能调度到任何一个节点)

使用如下命令部署 MySQL 服务:

kubectl apply -f mysql.yaml

这里比较一下 createapplyapply 是一种 宣告式 declarative 的对象配置命令,而 create 命令式 imperative 的命令。当下我们需要知道的是, create 通常对应一项任务,例如运行某个组件或创建一个部署;相比而言,当我们使用 apply 的时候,用户并没有指定具体操作,Kubernetes 会根据集群目前的状态定义需要执行的操作。故如果不存在名为 mysql 的服务,当我执行 apply -f mysql.yaml 时,Kubernetes 会创建该服务。如果再次执行这个命令,Kubernetes 会忽略该命令。但如果我再次运行 create ,Kubernetes 会报错,告知服务已经创建。

想了解更多信息,请阅读如下文档:Kubernetes 对象管理命令式配置宣告式配置

运行如下命令查看执行进度信息:

# 描述完整信息
kubectl describe deployment mysql
# 仅描述 Pods 信息
kubectl get pods -l app=mysql

(第一个命令)输出示例如下:

...
  Type           Status  Reason
  ----           ------  ------
  Available      True    MinimumReplicasAvailable
  Progressing    True    NewReplicaSetAvailable
OldReplicaSets:  <none>
NewReplicaSet:   mysql-55cd6b9f47 (1/1 replicas created)
...

对于 get pods 命令,输出示例如下:

NAME                     READY     STATUS    RESTARTS   AGE
mysql-78dbbd9c49-k6sdv   1/1       Running   0          18s

可以使用下面的命令测试数据库实例:

kubectl run -it --rm --image=mysql:5.6 --restart=Never mysql-client -- mysql -h mysql -pyourpasswordhere

特别提醒:如果你在这里修改了密码,重新 apply 你的 yaml 文件并不能更新容器。因为数据库是持久化的,密码并不会改变。你需要先使用 kubectl delete -f mysql.yaml 命令删除整个部署。

运行 show databases 后,应该可以看到如下信息:

If you don't see a command prompt, try pressing enter.

mysql>
mysql>
mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| kube               |
| mysql              |
| performance_schema |
+--------------------+
4 rows in set (0.00 sec)

mysql> exit
Bye

你会注意到,我还将一个数据库初始化 SQL 文件挂载到容器中,MySQL 容器会自动运行该文件,导入我将用到的部分数据和模式。

对应的卷定义如下:

  volumeMounts:
  - name: mysql-persistent-storage
    mountPath: /var/lib/mysql
  - name: bootstrap-script
    mountPath: /docker-entrypoint-initdb.d/database_setup.sql
volumes:
- name: mysql-persistent-storage
  persistentVolumeClaim:
    claimName: mysql-pv-claim
- name: bootstrap-script
  hostPath:
    path: /Users/hannibal/golang/src/github.com/Skarlso/kube-cluster-sample/database_setup.sql
    type: File

(LCTT 译注:数据库初始化脚本需要改成对应的路径,如果是多节点,需要是共享存储中的路径。另外,作者给的 sql 文件似乎有误,person_images 表中的 person_id 列数字都小 1,作者默认 id 从 0 开始,但应该是从 1 开始)

运行如下命令查看引导脚本是否正确执行:

~/golang/src/github.com/Skarlso/kube-cluster-sample/kube_files master*
❯ kubectl run -it --rm --image=mysql:5.6 --restart=Never mysql-client -- mysql -h mysql -uroot -pyourpasswordhere kube
If you don't see a command prompt, try pressing enter.

mysql> show tables;
+----------------+
| Tables_in_kube |
+----------------+
| images         |
| person         |
| person_images  |
+----------------+
3 rows in set (0.00 sec)

mysql>

(LCTT 译注:上述代码块中的第一行是作者执行命令所在路径,执行第二行的命令无需在该目录中进行)

上述操作完成了数据库服务的初始化。使用如下命令可以查看服务日志:

kubectl logs deployment/mysql -f

NSQ 查询

NSQ 查询将以内部服务的形式运行。由于不需要外部访问,这里使用 clusterIP: None 在 Kubernetes 中将其设置为 无头服务 headless service ,意味着该服务不使用负载均衡模式,也不使用单独的服务 IP。DNS 将基于服务 选择器 selectors

我们的 NSQ 查询服务对应的选择器为:

  selector:
    matchLabels:
      app: nsqlookup

那么,内部 DNS 对应的实体类似于:nsqlookup.default.svc.cluster.local

无头服务的更多细节,可以参考:无头服务

NSQ 服务与 MySQL 服务大同小异,只需要少许修改即可。如前所述,我将使用 NSQ 原生的 Docker 镜像,名称为 nsqio/nsq。镜像包含了全部的 nsq 命令,故 nsqd 也将使用该镜像,只是使用的命令不同。对于 nsqlookupd,命令如下:

command: ["/nsqlookupd"]
args: ["--broadcast-address=nsqlookup.default.svc.cluster.local"]

你可能会疑惑,--broadcast-address 参数是做什么用的?默认情况下,nsqlookup 使用容器的主机名作为广播地址;这意味着,当用户运行回调时,回调试图访问的地址类似于 http://nsqlookup-234kf-asdf:4161/lookup?topics=image,但这显然不是我们期望的。将广播地址设置为内部 DNS 后,回调地址将是 http://nsqlookup.default.svc.cluster.local:4161/lookup?topic=images,这正是我们期望的。

NSQ 查询还需要转发两个端口,一个用于广播,另一个用于 nsqd 守护进程的回调。在 Dockerfile 中暴露相应端口,在 Kubernetes 模板中使用它们,类似如下:

容器模板:

        ports:
        - containerPort: 4160
          hostPort: 4160
        - containerPort: 4161
          hostPort: 4161

服务模板:

spec:
  ports:
  - name: main
    protocol: TCP
    port: 4160
    targetPort: 4160
  - name: secondary
    protocol: TCP
    port: 4161
    targetPort: 4161

端口名称是必须的,Kubernetes 基于名称进行区分。(LCTT 译注:端口名更新为作者 GitHub 对应文件中的名称)

像之前那样,使用如下命令创建服务:

kubectl apply -f nsqlookup.yaml

nsqlookupd 部分到此结束。截至目前,我们已经准备好两个主要的组件。

接收器

这部分略微复杂。接收器需要完成三项工作:

  • 创建一些部署
  • 创建 nsq 守护进程
  • 将本服务对外公开

部署

第一个要创建的部署是接收器本身,容器镜像为 skarlso/kube-receiver-alpine

NSQ 守护进程

接收器需要使用 NSQ 守护进程。如前所述,接收器在其内部运行一个 NSQ,这样与 nsq 的通信可以在本地进行,无需通过网络。为了让接收器可以这样操作,NSQ 需要与接收器部署在同一个节点上。

NSQ 守护进程也需要一些调整的参数配置:

        ports:
        - containerPort: 4150
          hostPort: 4150
        - containerPort: 4151
          hostPort: 4151
        env:
        - name: NSQLOOKUP_ADDRESS
          value: nsqlookup.default.svc.cluster.local
        - name: NSQ_BROADCAST_ADDRESS
          value: nsqd.default.svc.cluster.local
        command: ["/nsqd"]
        args: ["--lookupd-tcp-address=$(NSQLOOKUP_ADDRESS):4160", "--broadcast-address=$(NSQ_BROADCAST_ADDRESS)"]

其中我们配置了 lookup-tcp-addressbroadcast-address 参数。前者是 nslookup 服务的 DNS 地址,后者用于回调,就像 nsqlookupd 配置中那样。

对外公开

下面即将创建第一个对外公开的服务。有两种方式可供选择。考虑到该 API 负载较高,可以使用负载均衡的方式。另外,如果希望将其部署到生产环境中的任选节点,也应该使用负载均衡方式。

但由于我使用的本地集群只有一个节点,那么使用 NodePort 的方式就足够了。NodePort 方式将服务暴露在对应节点的固定端口上。如果未指定端口,将从 30000-32767 数字范围内随机选其一个。也可以指定端口,可以在模板文件中使用 nodePort 设置即可。可以通过 <NodeIP>:<NodePort> 访问该服务。如果使用多个节点,负载均衡可以将多个 IP 合并为一个 IP。

更多信息,请参考文档:服务发布

结合上面的信息,我们定义了接收器服务,对应的模板如下:

apiVersion: v1
kind: Service
metadata:
  name: receiver-service
spec:
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000
  selector:
    app: receiver
  type: NodePort

如果希望固定使用 8000 端口,需要增加 nodePort 配置,具体如下:

apiVersion: v1
kind: Service
metadata:
  name: receiver-service
spec:
  ports:
  - protocol: TCP
    port: 8000
    targetPort: 8000
  selector:
    app: receiver
  type: NodePort
  nodePort: 8000

(LCTT 译注:虽然作者没有写,但我们应该知道需要运行的部署命令 kubectl apply -f receiver.yaml。)

图片处理器

图片处理器用于将图片传送至识别组件。它需要访问 nslookupd、 mysql 以及后续部署的人脸识别服务的 gRPC 接口。事实上,这是一个无聊的服务,甚至其实并不是服务(LCTT 译注:第一个服务是指在整个架构中,图片处理器作为一个服务;第二个服务是指 Kubernetes 服务)。它并需要对外暴露端口,这是第一个只包含部署的组件。长话短说,下面是完整的模板:

---
apiVersion: apps/v1
kind: Deployment
metadata:
  name: image-processor-deployment
spec:
  selector:
    matchLabels:
      app: image-processor
  replicas: 1
  template:
    metadata:
      labels:
        app: image-processor
    spec:
      containers:
      - name: image-processor
        image: skarlso/kube-processor-alpine:latest
        env:
        - name: MYSQL_CONNECTION
          value: "mysql.default.svc.cluster.local"
        - name: MYSQL_USERPASSWORD
          valueFrom:
            secretKeyRef:
              name: kube-face-secret
              key: mysql_userpassword
        - name: MYSQL_PORT
          # TIL: If this is 3306 without " kubectl throws an error.
          value: "3306"
        - name: MYSQL_DBNAME
          value: kube
        - name: NSQ_LOOKUP_ADDRESS
          value: "nsqlookup.default.svc.cluster.local:4161"
        - name: GRPC_ADDRESS
          value: "face-recog.default.svc.cluster.local:50051"

文件中唯一需要提到的是用于配置应用的多个环境变量属性,主要关注 nsqlookupd 地址 和 gRPC 地址。

运行如下命令完成部署:

kubectl apply -f image_processor.yaml

人脸识别

人脸识别服务的确包含一个 Kubernetes 服务,具体而言是一个比较简单、仅供图片处理器使用的服务。模板如下:

apiVersion: v1
kind: Service
metadata:
  name: face-recog
spec:
  ports:
  - protocol: TCP
    port: 50051
    targetPort: 50051
  selector:
    app: face-recog
  clusterIP: None

更有趣的是,该服务涉及两个卷,分别为 known_peopleunknown_people。你能猜到卷中包含什么内容吗?对,是图片。known_people 卷包含所有新图片,接收器收到图片后将图片发送至该卷对应的路径,即挂载点。在本例中,挂载点为 /unknown_people,人脸识别服务需要能够访问该路径。

对于 Kubernetes 和 Docker 而言,这很容易。卷可以使用挂载的 S3 或 某种 nfs,也可以是宿主机到虚拟机的本地挂载。可选方式有很多 (至少有一打那么多)。为简洁起见,我将使用本地挂载方式。

挂载卷分为两步。第一步,需要在 Dockerfile 中指定卷:

VOLUME [ "/unknown_people", "/known_people" ]

第二步,就像之前为 MySQL Pod 挂载卷那样,需要在 Kubernetes 模板中配置;相比而言,这里使用 hostPath,而不是 MySQL 例子中的 PersistentVolumeClaim

        volumeMounts:
        - name: known-people-storage
          mountPath: /known_people
        - name: unknown-people-storage
          mountPath: /unknown_people
      volumes:
      - name: known-people-storage
        hostPath:
          path: /Users/hannibal/Temp/known_people
          type: Directory
      - name: unknown-people-storage
        hostPath:
          path: /Users/hannibal/Temp/
          type: Directory

(LCTT 译注:对于多节点模式,由于人脸识别服务和接收器服务可能不在一个节点上,故需要使用共享存储而不是节点本地存储。另外,出于 Python 代码的逻辑,推荐保持两个文件夹的嵌套结构,即 known\_people 作为子目录。)

我们还需要为 known_people 文件夹做配置设置,用于人脸识别程序。当然,使用环境变量属性可以完成该设置:

        env:
        - name: KNOWN_PEOPLE
          value: "/known_people"

Python 代码按如下方式搜索图片:

        known_people = os.getenv('KNOWN_PEOPLE', 'known_people')
        print("Known people images location is: %s" % known_people)
        images = self.image_files_in_folder(known_people)

其中 image_files_in_folder 函数定义如下:

    def image_files_in_folder(self, folder):
        return [os.path.join(folder, f) for f in os.listdir(folder) if re.match(r'.*\.(jpg|jpeg|png)', f, flags=re.I)]

看起来不错。

如果接收器现在收到一个类似下面的请求(接收器会后续将其发送出去):

curl -d '{"path":"/unknown_people/unknown220.jpg"}' http://192.168.99.100:30251/image/post

图像处理器会在 /unknown_people 目录搜索名为 unknown220.jpg 的图片,接着在 known_folder 文件中找到 unknown220.jpg 对应个人的图片,最后返回匹配图片的名称。

查看日志,大致信息如下:

# 接收器
❯ curl -d '{"path":"/unknown_people/unknown219.jpg"}' http://192.168.99.100:30251/image/post
got path: {Path:/unknown_people/unknown219.jpg}
image saved with id: 4
image sent to nsq

# 图片处理器
2018/03/26 18:11:21 INF    1 [images/ch] querying nsqlookupd http://nsqlookup.default.svc.cluster.local:4161/lookup?topic=images
2018/03/26 18:11:59 Got a message: 4
2018/03/26 18:11:59 Processing image id:  4
2018/03/26 18:12:00 got person:  Hannibal
2018/03/26 18:12:00 updating record with person id
2018/03/26 18:12:00 done

我们已经使用 Kubernetes 部署了应用正常工作所需的全部服务。

前端

更进一步,可以使用简易的 Web 应用更好的显示数据库中的信息。这也是一个对外公开的服务,使用的参数可以参考接收器。

部署后效果如下:

回顾

到目前为止我们做了哪些操作呢?我一直在部署服务,用到的命令汇总如下:

kubectl apply -f mysql.yaml
kubectl apply -f nsqlookup.yaml
kubectl apply -f receiver.yaml
kubectl apply -f image_processor.yaml
kubectl apply -f face_recognition.yaml
kubectl apply -f frontend.yaml

命令顺序可以打乱,因为除了图片处理器的 NSQ 消费者外的应用在启动时并不会建立连接,而且图片处理器的 NSQ 消费者会不断重试。

使用 kubectl get pods 查询正在运行的 Pods,示例如下:

❯ kubectl get pods
NAME                                          READY     STATUS    RESTARTS   AGE
face-recog-6bf449c6f-qg5tr                    1/1       Running   0          1m
image-processor-deployment-6467468c9d-cvx6m   1/1       Running   0          31s
mysql-7d667c75f4-bwghw                        1/1       Running   0          36s
nsqd-584954c44c-299dz                         1/1       Running   0          26s
nsqlookup-7f5bdfcb87-jkdl7                    1/1       Running   0          11s
receiver-deployment-5cb4797598-sf5ds          1/1       Running   0          26s

运行 minikube service list

❯ minikube service list
|-------------|----------------------|-----------------------------|
|  NAMESPACE  |         NAME         |             URL             |
|-------------|----------------------|-----------------------------|
| default     | face-recog           | No node port                |
| default     | kubernetes           | No node port                |
| default     | mysql                | No node port                |
| default     | nsqd                 | No node port                |
| default     | nsqlookup            | No node port                |
| default     | receiver-service     | http://192.168.99.100:30251 |
| kube-system | kube-dns             | No node port                |
| kube-system | kubernetes-dashboard | http://192.168.99.100:30000 |
|-------------|----------------------|-----------------------------|

滚动更新

滚动更新 Rolling Update 过程中会发生什么呢?

在软件开发过程中,需要变更应用的部分组件是常有的事情。如果我希望在不影响其它组件的情况下变更一个组件,我们的集群会发生什么变化呢?我们还需要最大程度的保持向后兼容性,以免影响用户体验。谢天谢地,Kubernetes 可以帮我们做到这些。

目前的 API 一次只能处理一个图片,不能批量处理,对此我并不满意。

代码

目前,我们使用下面的代码段处理单个图片的情形:

// PostImage 对图片提交做出响应,将图片信息保存到数据库中
// 并将该信息发送给 NSQ 以供后续处理使用
func PostImage(w http.ResponseWriter, r *http.Request) {
...
}

func main() {
    router := mux.NewRouter()
    router.HandleFunc("/image/post", PostImage).Methods("POST")
    log.Fatal(http.ListenAndServe(":8000", router))
}

我们有两种选择。一种是增加新接口 /images/post 给用户使用;另一种是在原接口基础上修改。

新版客户端有回退特性,在新接口不可用时回退使用旧接口。但旧版客户端没有这个特性,故我们不能马上修改代码逻辑。考虑如下场景,你有 90 台服务器,计划慢慢执行滚动更新,依次对各台服务器进行业务更新。如果一台服务需要大约 1 分钟更新业务,那么整体更新完成需要大约 1 个半小时的时间(不考虑并行更新的情形)。

更新过程中,一些服务器运行新代码,一些服务器运行旧代码。用户请求被负载均衡到各个节点,你无法控制请求到达哪台服务器。如果客户端的新接口请求被调度到运行旧代码的服务器,请求会失败;客户端可能会回退使用旧接口,(但由于我们已经修改旧接口,本质上仍然是调用新接口),故除非请求刚好到达到运行新代码的服务器,否则一直都会失败。这里我们假设不使用 粘性会话 sticky sessions

而且,一旦所有服务器更新完毕,旧版客户端不再能够使用你的服务。

这里,你可能会说你并不需要保留旧代码;某些情况下,确实如此。因此,我们打算直接修改旧代码,让其通过少量参数调用新代码。这样操作操作相当于移除了旧代码。当所有客户端迁移完毕后,这部分代码也可以安全地删除。

新的接口

让我们添加新的路由方法:

...
router.HandleFunc("/images/post", PostImages).Methods("POST")
...

更新旧的路由方法,使其调用新的路由方法,修改部分如下:

// PostImage 对图片提交做出响应,将图片信息保存到数据库中
// 并将该信息发送给 NSQ 以供后续处理使用
func PostImage(w http.ResponseWriter, r *http.Request) {
    var p Path
    err := json.NewDecoder(r.Body).Decode(&p)
    if err != nil {
      fmt.Fprintf(w, "got error while decoding body: %s", err)
      return
    }
    fmt.Fprintf(w, "got path: %+v\n", p)
    var ps Paths
    paths := make([]Path, 0)
    paths = append(paths, p)
    ps.Paths = paths
    var pathsJSON bytes.Buffer
    err = json.NewEncoder(&pathsJSON).Encode(ps)
    if err != nil {
      fmt.Fprintf(w, "failed to encode paths: %s", err)
      return
    }
    r.Body = ioutil.NopCloser(&pathsJSON)
    r.ContentLength = int64(pathsJSON.Len())
    PostImages(w, r)
}

当然,方法名可能容易混淆,但你应该能够理解我想表达的意思。我将请求中的单个路径封装成新方法所需格式,然后将其作为请求发送给新接口处理。仅此而已。在 滚动更新批量图片的 PR 中可以找到更多的修改方式。

至此,我们使用两种方法调用接收器:

# 单路径模式
curl -d '{"path":"unknown4456.jpg"}' http://127.0.0.1:8000/image/post

# 多路径模式
curl -d '{"paths":[{"path":"unknown4456.jpg"}]}' http://127.0.0.1:8000/images/post

这里用到的客户端是 curl。一般而言,如果客户端本身是一个服务,我会做一些修改,在新接口返回 404 时继续尝试旧接口。

为了简洁,我不打算为 NSQ 和其它组件增加批量图片处理的能力。这些组件仍然是一次处理一个图片。这部分修改将留给你作为扩展内容。 :)

新镜像

为实现滚动更新,我首先需要为接收器服务创建一个新的镜像。新镜像使用新标签,告诉大家版本号为 v1.1。

docker build -t skarlso/kube-receiver-alpine:v1.1 .

新镜像创建后,我们可以开始滚动更新了。

滚动更新

在 Kubernetes 中,可以使用多种方式完成滚动更新。

手动更新

不妨假设在我配置文件中使用的容器版本为 v1.0,那么实现滚动更新只需运行如下命令:

kubectl rolling-update receiver --image:skarlso/kube-receiver-alpine:v1.1

如果滚动更新过程中出现问题,我们总是可以回滚:

kubectl rolling-update receiver --rollback

容器将回滚到使用上一个版本镜像,操作简捷无烦恼。

应用新的配置文件

手动更新的不足在于无法版本管理。

试想下面的场景。你使用手工更新的方式对若干个服务器进行滚动升级,但其它人并不知道这件事。之后,另外一个人修改了模板文件并将其应用到集群中,更新了全部服务器;更新过程中,突然发现服务不可用了。

长话短说,由于模板无法识别已经手动更新的服务器,这些服务器会按模板变更成错误的状态。这种做法很危险,千万不要这样做。

推荐的做法是,使用新版本信息更新模板文件,然后使用 apply 命令应用模板文件。

对于滚动扩展,Kubernetes 推荐通过部署结合副本组完成。但这意味着待滚动更新的应用至少有 2 个副本,否则无法完成 (除非将 maxUnavailable 设置为 1)。我在模板文件中增加了副本数量、设置了接收器容器的新镜像版本。

  replicas: 2
...
    spec:
      containers:
      - name: receiver
        image: skarlso/kube-receiver-alpine:v1.1
...

更新过程中,你会看到如下信息:

❯ kubectl rollout status deployment/receiver-deployment
Waiting for rollout to finish: 1 out of 2 new replicas have been updated...

通过在模板中增加 strategy 段,你可以增加更多的滚动扩展配置:

  strategy:
    type: RollingUpdate
    rollingUpdate:
      maxSurge: 1
      maxUnavailable: 0

关于滚动更新的更多信息,可以参考如下文档:部署的滚动更新部署的更新部署的管理使用副本控制器完成滚动更新等。

MINIKUBE 用户需要注意:由于我们使用单个主机上使用单节点配置,应用只有 1 份副本,故需要将 maxUnavailable 设置为 1。否则 Kubernetes 会阻止更新,新版本会一直处于 Pending 状态;这是因为我们在任何时刻都不允许出现没有(正在运行的) receiver 容器的场景。

扩展

Kubernetes 让扩展成为相当容易的事情。由于 Kubernetes 管理整个集群,你仅需在模板文件中添加你需要的副本数目即可。

这篇文章已经比较全面了,但文章的长度也越来越长。我计划再写一篇后续文章,在 AWS 上使用多节点、多副本方式实现扩展。敬请期待。

清理环境

kubectl delete deployments --all
kubectl delete services -all

写在最后的话

各位看官,本文就写到这里了。我们在 Kubernetes 上编写、部署、更新和扩展(老实说,并没有实现)了一个分布式应用。

如果你有任何疑惑,请在下面的评论区留言交流,我很乐意回答相关问题。

希望阅读本文让你感到愉快。我知道,这是一篇相对长的文章,我也曾经考虑进行拆分;但整合在一起的单页教程也有其好处,例如利于搜索、保存页面或更进一步将页面打印为 PDF 文档。

Gergely 感谢你阅读本文。


via: https://skarlso.github.io/2018/03/15/kubernetes-distributed-application/

作者:hannibal 译者:pinewall 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

了解区块链是如何工作的最快的方法是构建一个。

你看到这篇文章是因为和我一样,对加密货币的大热而感到兴奋。并且想知道区块链是如何工作的 —— 它们背后的技术基础是什么。

但是理解区块链并不容易 —— 至少对我来说是这样。我徜徉在各种难懂的视频中,并且因为示例太少而陷入深深的挫败感中。

我喜欢在实践中学习。这会使得我在代码层面上处理主要问题,从而可以让我坚持到底。如果你也是这么做的,在本指南结束的时候,你将拥有一个功能正常的区块链,并且实实在在地理解了它的工作原理。

开始之前 …

记住,区块链是一个 不可更改的、有序的 记录(被称为区块)的链。它们可以包括 交易 transaction 、文件或者任何你希望的真实数据。最重要的是它们是通过使用哈希链接到一起的。

如果你不知道哈希是什么,这里有解释

本指南的目标读者是谁? 你应该能轻松地读、写一些基本的 Python 代码,并能够理解 HTTP 请求是如何工作的,因为我们讨论的区块链将基于 HTTP。

我需要做什么? 确保安装了 Python 3.6+(以及 pip),还需要去安装 Flask 和非常好用的 Requests 库:

pip install Flask==0.12.2 requests==2.18.4 

当然,你也需要一个 HTTP 客户端,像 Postman 或者 cURL。哪个都行。

最终的代码在哪里可以找到? 源代码在 这里

第 1 步:构建一个区块链

打开你喜欢的文本编辑器或者 IDE,我个人喜欢 PyCharm。创建一个名为 blockchain.py 的新文件。我将仅使用一个文件,如果你看晕了,可以去参考 源代码

描述一个区块链

我们将创建一个 Blockchain 类,它的构造函数将去初始化一个空列表(去存储我们的区块链),以及另一个列表去保存交易。下面是我们的类规划:

class Blockchain(object):
    def __init__(self):
        self.chain = []
        self.current_transactions = []

    def new_block(self):
        # Creates a new Block and adds it to the chain
        pass

    def new_transaction(self):
        # Adds a new transaction to the list of transactions
        pass

    @staticmethod
    def hash(block):
        # Hashes a Block
        pass

    @property
    def last_block(self):
        # Returns the last Block in the chain
        pass

我们的 Blockchain 类的原型

我们的 Blockchain 类负责管理链。它将存储交易并且有一些为链中增加新区块的辅助性质的方法。现在我们开始去充实一些类的方法。

区块是什么样子的?

每个区块有一个索引、一个时间戳(Unix 时间)、一个交易的列表、一个证明(后面会详细解释)、以及前一个区块的哈希。

单个区块的示例应该是下面的样子:

block = {
    'index': 1,
    'timestamp': 1506057125.900785,
    'transactions': [
        {
            'sender': "8527147fe1f5426f9dd545de4b27ee00",
            'recipient': "a77f5cdfa2934df3954a5c7c7da5df1f",
            'amount': 5,
        }
    ],
    'proof': 324984774000,
    'previous_hash': "2cf24dba5fb0a30e26e83b2ac5b9e29e1b161e5c1fa7425e73043362938b9824"
}

我们的区块链中的块示例

此刻,链的概念应该非常明显 —— 每个新区块包含它自身的信息和前一个区域的哈希。这一点非常重要,因为这就是区块链不可更改的原因:如果攻击者修改了一个早期的区块,那么所有的后续区块将包含错误的哈希。

这样做有意义吗?如果没有,就让时间来埋葬它吧 —— 这就是区块链背后的核心思想。

添加交易到一个区块

我们将需要一种区块中添加交易的方式。我们的 new_transaction() 就是做这个的,它非常简单明了:

class Blockchain(object):
    ...

    def new_transaction(self, sender, recipient, amount):
        """
        Creates a new transaction to go into the next mined Block
        :param sender: <str> Address of the Sender
        :param recipient: <str> Address of the Recipient
        :param amount: <int> Amount
        :return: <int> The index of the Block that will hold this transaction
        """

        self.current_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount,
        })

        return self.last_block['index'] + 1

new_transaction() 运行后将在列表中添加一个交易,它返回添加交易后的那个区块的索引 —— 那个区块接下来将被挖矿。提交交易的用户后面会用到这些。

创建新区块

当我们的 Blockchain 被实例化后,我们需要一个创世区块(一个没有祖先的区块)来播种它。我们也需要去添加一些 “证明” 到创世区块,它是挖矿(工作量证明 PoW)的成果。我们在后面将讨论更多挖矿的内容。

除了在我们的构造函数中创建创世区块之外,我们还需要写一些方法,如 new_block()new_transaction() 以及 hash()

import hashlib
import json
from time import time


class Blockchain(object):
    def __init__(self):
        self.current_transactions = []
        self.chain = []

        # Create the genesis block
        self.new_block(previous_hash=1, proof=100)

    def new_block(self, proof, previous_hash=None):
        """
        Create a new Block in the Blockchain
        :param proof: <int> The proof given by the Proof of Work algorithm
        :param previous_hash: (Optional) <str> Hash of previous Block
        :return: <dict> New Block
        """

        block = {
            'index': len(self.chain) + 1,
            'timestamp': time(),
            'transactions': self.current_transactions,
            'proof': proof,
            'previous_hash': previous_hash or self.hash(self.chain[-1]),
        }

        # Reset the current list of transactions
        self.current_transactions = []

        self.chain.append(block)
        return block

    def new_transaction(self, sender, recipient, amount):
        """
        Creates a new transaction to go into the next mined Block
        :param sender: <str> Address of the Sender
        :param recipient: <str> Address of the Recipient
        :param amount: <int> Amount
        :return: <int> The index of the Block that will hold this transaction
        """
        self.current_transactions.append({
            'sender': sender,
            'recipient': recipient,
            'amount': amount,
        })

        return self.last_block['index'] + 1

    @property
    def last_block(self):
        return self.chain[-1]

    @staticmethod
    def hash(block):
        """
        Creates a SHA-256 hash of a Block
        :param block: <dict> Block
        :return: <str>
        """

        # We must make sure that the Dictionary is Ordered, or we'll have inconsistent hashes
        block_string = json.dumps(block, sort_keys=True).encode()
        return hashlib.sha256(block_string).hexdigest()

上面的内容简单明了 —— 我添加了一些注释和文档字符串,以使代码清晰可读。到此为止,表示我们的区块链基本上要完成了。但是,你肯定想知道新区块是如何被创建、打造或者挖矿的。

理解工作量证明

工作量证明 Proof of Work (PoW)算法是在区块链上创建或者挖出新区块的方法。PoW 的目标是去撞出一个能够解决问题的数字。这个数字必须满足“找到它很困难但是验证它很容易”的条件 —— 网络上的任何人都可以计算它。这就是 PoW 背后的核心思想。

我们来看一个非常简单的示例来帮助你了解它。

我们来解决一个问题,一些整数 x 乘以另外一个整数 y 的结果的哈希值必须以 0 结束。因此,hash(x * y) = ac23dc…0。为简单起见,我们先把 x = 5 固定下来。在 Python 中的实现如下:

from hashlib import sha256

x = 5
y = 0  # We don't know what y should be yet...

while sha256(f'{x*y}'.encode()).hexdigest()[-1] != "0":
    y += 1

print(f'The solution is y = {y}')

在这里的答案是 y = 21。因为它产生的哈希值是以 0 结尾的:

hash(5 * 21) = 1253e9373e...5e3600155e860

在比特币中,工作量证明算法被称之为 Hashcash。与我们上面的例子没有太大的差别。这就是矿工们进行竞赛以决定谁来创建新块的算法。一般来说,其难度取决于在一个字符串中所查找的字符数量。然后矿工会因其做出的求解而得到奖励的币——在一个交易当中。

网络上的任何人都可以很容易地去核验它的答案。

实现基本的 PoW

为我们的区块链来实现一个简单的算法。我们的规则与上面的示例类似:

找出一个数字 p,它与前一个区块的答案进行哈希运算得到一个哈希值,这个哈希值的前四位必须是由 0 组成。
import hashlib
import json

from time import time
from uuid import uuid4


class Blockchain(object):
    ...

    def proof_of_work(self, last_proof):
        """
        Simple Proof of Work Algorithm:
         - Find a number p' such that hash(pp') contains leading 4 zeroes, where p is the previous p'
         - p is the previous proof, and p' is the new proof
        :param last_proof: <int>
        :return: <int>
        """

        proof = 0
        while self.valid_proof(last_proof, proof) is False:
            proof += 1

        return proof

    @staticmethod
    def valid_proof(last_proof, proof):
        """
        Validates the Proof: Does hash(last_proof, proof) contain 4 leading zeroes?
        :param last_proof: <int> Previous Proof
        :param proof: <int> Current Proof
        :return: <bool> True if correct, False if not.
        """

        guess = f'{last_proof}{proof}'.encode()
        guess_hash = hashlib.sha256(guess).hexdigest()
        return guess_hash[:4] == "0000"

为了调整算法的难度,我们可以修改前导 0 的数量。但是 4 个零已经足够难了。你会发现,将前导 0 的数量每增加一,那么找到正确答案所需要的时间难度将大幅增加。

我们的类基本完成了,现在我们开始去使用 HTTP 请求与它交互。

第 2 步:以 API 方式去访问我们的区块链

我们将使用 Python Flask 框架。它是个微框架,使用它去做端点到 Python 函数的映射很容易。这样我们可以使用 HTTP 请求基于 web 来与我们的区块链对话。

我们将创建三个方法:

  • /transactions/new 在一个区块上创建一个新交易
  • /mine 告诉我们的服务器去挖矿一个新区块
  • /chain 返回完整的区块链

配置 Flask

我们的 “服务器” 将在我们的区块链网络中产生一个单个的节点。我们来创建一些样板代码:

import hashlib
import json
from textwrap import dedent
from time import time
from uuid import uuid4

from flask import Flask


class Blockchain(object):
    ...


# Instantiate our Node
app = Flask(__name__)

# Generate a globally unique address for this node
node_identifier = str(uuid4()).replace('-', '')

# Instantiate the Blockchain
blockchain = Blockchain()


@app.route('/mine', methods=['GET'])
def mine():
    return "We'll mine a new Block"

@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    return "We'll add a new transaction"

@app.route('/chain', methods=['GET'])
def full_chain():
    response = {
        'chain': blockchain.chain,
        'length': len(blockchain.chain),
    }
    return jsonify(response), 200

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

对上面的代码,我们做添加一些详细的解释:

  • Line 15:实例化我们的节点。更多关于 Flask 的知识读 这里
  • Line 18:为我们的节点创建一个随机的名字。
  • Line 21:实例化我们的区块链类。
  • Line 24–26:创建 /mine 端点,这是一个 GET 请求。
  • Line 28–30:创建 /transactions/new 端点,这是一个 POST 请求,因为我们要发送数据给它。
  • Line 32–38:创建 /chain 端点,它返回全部区块链。
  • Line 40–41:在 5000 端口上运行服务器。

交易端点

这就是对一个交易的请求,它是用户发送给服务器的:

{
 "sender": "my address",
 "recipient": "someone else's address",
 "amount": 5
}

因为我们已经有了添加交易到块中的类方法,剩下的就很容易了。让我们写个函数来添加交易:

import hashlib
import json
from textwrap import dedent
from time import time
from uuid import uuid4

from flask import Flask, jsonify, request

...

@app.route('/transactions/new', methods=['POST'])
def new_transaction():
    values = request.get_json()

    # Check that the required fields are in the POST'ed data
    required = ['sender', 'recipient', 'amount']
    if not all(k in values for k in required):
        return 'Missing values', 400

    # Create a new Transaction
    index = blockchain.new_transaction(values['sender'], values['recipient'], values['amount'])

    response = {'message': f'Transaction will be added to Block {index}'}
    return jsonify(response), 201

创建交易的方法

挖矿端点

我们的挖矿端点是见证奇迹的地方,它实现起来很容易。它要做三件事情:

  1. 计算工作量证明
  2. 因为矿工(我们)添加一个交易而获得报酬,奖励矿工(我们) 1 个币
  3. 通过将它添加到链上而打造一个新区块
import hashlib
import json

from time import time
from uuid import uuid4

from flask import Flask, jsonify, request

...

@app.route('/mine', methods=['GET'])
def mine():
    # We run the proof of work algorithm to get the next proof...
    last_block = blockchain.last_block
    last_proof = last_block['proof']
    proof = blockchain.proof_of_work(last_proof)

    # We must receive a reward for finding the proof.
    # The sender is "0" to signify that this node has mined a new coin.
    blockchain.new_transaction(
        sender="0",
        recipient=node_identifier,
        amount=1,
    )

    # Forge the new Block by adding it to the chain
    previous_hash = blockchain.hash(last_block)
    block = blockchain.new_block(proof, previous_hash)

    response = {
        'message': "New Block Forged",
        'index': block['index'],
        'transactions': block['transactions'],
        'proof': block['proof'],
        'previous_hash': block['previous_hash'],
    }
    return jsonify(response), 200

注意,挖掘出的区块的接收方是我们的节点地址。现在,我们所做的大部分工作都只是与我们的 Blockchain 类的方法进行交互的。到目前为止,我们已经做完了,现在开始与我们的区块链去交互。

第 3 步:与我们的区块链去交互

你可以使用简单的 cURL 或者 Postman 通过网络与我们的 API 去交互。

启动服务器:

$ python blockchain.py
* Running on http://127.0.0.1:5000/ (Press CTRL+C to quit)

我们通过生成一个 GET 请求到 http://localhost:5000/mine 去尝试挖一个区块:

使用 Postman 去生成一个 GET 请求

我们通过生成一个 POST 请求到 http://localhost:5000/transactions/new 去创建一个区块,请求数据包含我们的交易结构:

使用 Postman 去生成一个 POST 请求

如果你不使用 Postman,也可以使用 cURL 去生成一个等价的请求:

$ curl -X POST -H "Content-Type: application/json" -d '{
 "sender": "d4ee26eee15148ee92c6cd394edd974e",
 "recipient": "someone-other-address",
 "amount": 5
}' "http://localhost:5000/transactions/new"

我重启动我的服务器,然后我挖到了两个区块,这样总共有了 3 个区块。我们通过请求 http://localhost:5000/chain 来检查整个区块链:

{
  "chain": [
    {
      "index": 1,
      "previous_hash": 1,
      "proof": 100,
      "timestamp": 1506280650.770839,
      "transactions": []
    },
    {
      "index": 2,
      "previous_hash": "c099bc...bfb7",
      "proof": 35293,
      "timestamp": 1506280664.717925,
      "transactions": [
        {
          "amount": 1,
          "recipient": "8bbcb347e0634905b0cac7955bae152b",
          "sender": "0"
        }
      ]
    },
    {
      "index": 3,
      "previous_hash": "eff91a...10f2",
      "proof": 35089,
      "timestamp": 1506280666.1086972,
      "transactions": [
        {
          "amount": 1,
          "recipient": "8bbcb347e0634905b0cac7955bae152b",
          "sender": "0"
        }
      ]
    }
  ],
  "length": 3
}

第 4 步:共识

这是很酷的一个地方。我们已经有了一个基本的区块链,它可以接收交易并允许我们去挖掘出新区块。但是区块链的整个重点在于它是 去中心化的 decentralized 。而如果它们是去中心化的,那我们如何才能确保它们表示在同一个区块链上?这就是 共识 Consensus 问题,如果我们希望在我们的网络上有多于一个的节点运行,那么我们将必须去实现一个共识算法。

注册新节点

在我们能实现一个共识算法之前,我们需要一个办法去让一个节点知道网络上的邻居节点。我们网络上的每个节点都保留有一个该网络上其它节点的注册信息。因此,我们需要更多的端点:

  1. /nodes/register 以 URL 的形式去接受一个新节点列表
  2. /nodes/resolve 去实现我们的共识算法,由它来解决任何的冲突 —— 确保节点有一个正确的链。

我们需要去修改我们的区块链的构造函数,来提供一个注册节点的方法:

...
from urllib.parse import urlparse
...


class Blockchain(object):
    def __init__(self):
        ...
        self.nodes = set()
        ...

    def register_node(self, address):
        """
        Add a new node to the list of nodes
        :param address: <str> Address of node. Eg. 'http://192.168.0.5:5000'
        :return: None
        """

        parsed_url = urlparse(address)
        self.nodes.add(parsed_url.netloc)

一个添加邻居节点到我们的网络的方法

注意,我们将使用一个 set() 去保存节点列表。这是一个非常合算的方式,它将确保添加的节点是 幂等 idempotent 的 —— 这意味着不论你将特定的节点添加多少次,它都是精确地只出现一次。

实现共识算法

正如前面提到的,当一个节点与另一个节点有不同的链时就会产生冲突。为解决冲突,我们制定一个规则,即最长的有效的链才是权威的链。换句话说就是,网络上最长的链就是事实上的区块链。使用这个算法,可以在我们的网络上节点之间达到共识。

...
import requests


class Blockchain(object)
    ...

    def valid_chain(self, chain):
        """
        Determine if a given blockchain is valid
        :param chain: <list> A blockchain
        :return: <bool> True if valid, False if not
        """

        last_block = chain[0]
        current_index = 1

        while current_index < len(chain):
            block = chain[current_index]
            print(f'{last_block}')
            print(f'{block}')
            print("\n-----------\n")
            # Check that the hash of the block is correct
            if block['previous_hash'] != self.hash(last_block):
                return False

            # Check that the Proof of Work is correct
            if not self.valid_proof(last_block['proof'], block['proof']):
                return False

            last_block = block
            current_index += 1

        return True

    def resolve_conflicts(self):
        """
        This is our Consensus Algorithm, it resolves conflicts
        by replacing our chain with the longest one in the network.
        :return: <bool> True if our chain was replaced, False if not
        """

        neighbours = self.nodes
        new_chain = None

        # We're only looking for chains longer than ours
        max_length = len(self.chain)

        # Grab and verify the chains from all the nodes in our network
        for node in neighbours:
            response = requests.get(f'http://{node}/chain')

            if response.status_code == 200:
                length = response.json()['length']
                chain = response.json()['chain']

                # Check if the length is longer and the chain is valid
                if length > max_length and self.valid_chain(chain):
                    max_length = length
                    new_chain = chain

        # Replace our chain if we discovered a new, valid chain longer than ours
        if new_chain:
            self.chain = new_chain
            return True

        return False

第一个方法 valid_chain() 是负责来检查链是否有效,它通过遍历区块链上的每个区块并验证它们的哈希和工作量证明来检查这个区块链是否有效。

resolve_conflicts() 方法用于遍历所有的邻居节点,下载它们的链并使用上面的方法去验证它们是否有效。如果找到有效的链,确定谁是最长的链,然后我们就用最长的链来替换我们的当前的链。

在我们的 API 上来注册两个端点,一个用于添加邻居节点,另一个用于解决冲突:

@app.route('/nodes/register', methods=['POST'])
def register_nodes():
    values = request.get_json()

    nodes = values.get('nodes')
    if nodes is None:
        return "Error: Please supply a valid list of nodes", 400

    for node in nodes:
        blockchain.register_node(node)

    response = {
        'message': 'New nodes have been added',
        'total_nodes': list(blockchain.nodes),
    }
    return jsonify(response), 201


@app.route('/nodes/resolve', methods=['GET'])
def consensus():
    replaced = blockchain.resolve_conflicts()

    if replaced:
        response = {
            'message': 'Our chain was replaced',
            'new_chain': blockchain.chain
        }
    else:
        response = {
            'message': 'Our chain is authoritative',
            'chain': blockchain.chain
        }

    return jsonify(response), 200

这种情况下,如果你愿意,可以使用不同的机器来做,然后在你的网络上启动不同的节点。或者是在同一台机器上使用不同的端口启动另一个进程。我是在我的机器上使用了不同的端口启动了另一个节点,并将它注册到了当前的节点上。因此,我现在有了两个节点:http://localhost:5000http://localhost:5001

注册一个新节点

我接着在节点 2 上挖出一些新区块,以确保这个链是最长的。之后我在节点 1 上以 GET 方式调用了 /nodes/resolve,这时,节点 1 上的链被共识算法替换成节点 2 上的链了:

工作中的共识算法

然后将它们封装起来 … 找一些朋友来帮你一起测试你的区块链。


我希望以上内容能够鼓舞你去创建一些新的东西。我是加密货币的狂热拥护者,因此我相信区块链将迅速改变我们对经济、政府和记录保存的看法。

更新: 我正计划继续它的第二部分,其中我将扩展我们的区块链,使它具备交易验证机制,同时讨论一些你可以在其上产生你自己的区块链的方式。(LCTT 译注:第二篇并没有~!)


via: https://hackernoon.com/learn-blockchains-by-building-one-117428612f46

作者:Daniel van Flymen 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Python Sets: What, Why and How

Python 配备了几种内置数据类型来帮我们组织数据。这些结构包括列表、字典、元组和集合。

根据 Python 3 文档:

集合是一个无序集合,没有重复元素。基本用途包括成员测试消除重复的条目。集合对象还支持数学运算,如并集交集差集对等差分

在本文中,我们将回顾并查看上述定义中列出的每个要素的示例。让我们马上开始,看看如何创建它。

初始化一个集合

有两种方法可以创建一个集合:一个是给内置函数 set() 提供一个元素列表,另一个是使用花括号 {}

使用内置函数 set() 来初始化一个集合:

>>> s1 = set([1, 2, 3])
>>> s1
{1, 2, 3}
>>> type(s1)
<class 'set'>

使用 {}

>>> s2 = {3, 4, 5}
>>> s2
{3, 4, 5}
>>> type(s2)
<class 'set'>
>>>

如你所见,这两种方法都是有效的。但问题是,如果我们想要一个空的集合呢?

>>> s = {}
>>> type(s)
<class 'dict'>

没错,如果我们使用空花括号,我们将得到一个字典而不是一个集合。=)

值得一提的是,为了简单起见,本文中提供的所有示例都将使用整数集合,但集合可以包含 Python 支持的所有 可哈希的 hashable 数据类型。换句话说,即整数、字符串和元组,而不是列表字典这样的可变类型。

>>> s = {1, 'coffee', [4, 'python']}
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: unhashable type: 'list'

既然你知道了如何创建一个集合以及它可以包含哪些类型的元素,那么让我们继续看看为什么我们总是应该把它放在我们的工具箱中。

为什么你需要使用它

写代码时,你可以用不止一种方法来完成它。有些被认为是相当糟糕的,另一些则是清晰的、简洁的和可维护的,或者是 “ Python 式的 pythonic ”。

根据 Hitchhiker 对 Python 的建议:

当一个经验丰富的 Python 开发人员( Python 人 Pythonista )调用一些不够 “ Python 式的 pythonic ” 的代码时,他们通常认为着这些代码不遵循通用指南,并且无法被认为是以一种好的方式(可读性)来表达意图。

让我们开始探索 Python 集合那些不仅可以帮助我们提高可读性,还可以加快程序执行时间的方式。

无序的集合元素

首先你需要明白的是:你无法使用索引访问集合中的元素。

>>> s = {1, 2, 3}
>>> s[0]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'set' object does not support indexing

或者使用切片修改它们:

>>> s[0:2]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
TypeError: 'set' object is not subscriptable

但是,如果我们需要删除重复项,或者进行组合列表(与)之类的数学运算,那么我们可以,并且应该始终使用集合。

我不得不提一下,在迭代时,集合的表现优于列表。所以,如果你需要它,那就加深对它的喜爱吧。为什么?好吧,这篇文章并不打算解释集合的内部工作原理,但是如果你感兴趣的话,这里有几个链接,你可以阅读它:

没有重复项

写这篇文章的时候,我总是不停地思考,我经常使用 for 循环和 if 语句检查并删除列表中的重复元素。记得那时我的脸红了,而且不止一次,我写了类似这样的代码:

>>> my_list = [1, 2, 3, 2, 3, 4]
>>> no_duplicate_list = []
>>> for item in my_list:
...     if item not in no_duplicate_list:
...             no_duplicate_list.append(item)
...
>>> no_duplicate_list
[1, 2, 3, 4]

或者使用列表解析:

>>> my_list = [1, 2, 3, 2, 3, 4]
>>> no_duplicate_list = []
>>> [no_duplicate_list.append(item) for item in my_list if item not in no_duplicate_list]
[None, None, None, None]
>>> no_duplicate_list
[1, 2, 3, 4]

但没关系,因为我们现在有了武器装备,没有什么比这更重要的了:

>>> my_list = [1, 2, 3, 2, 3, 4]
>>> no_duplicate_list = list(set(my_list))
>>> no_duplicate_list
[1, 2, 3, 4]
>>>

现在让我们使用 timeit 模块,查看列表和集合在删除重复项时的执行时间:

>>> from timeit import timeit
>>> def no_duplicates(list):
...     no_duplicate_list = []
...     [no_duplicate_list.append(item) for item in list if item not in no_duplicate_list]
...     return no_duplicate_list
...
>>> # 首先,让我们看看列表的执行情况:
>>> print(timeit('no_duplicates([1, 2, 3, 1, 7])', globals=globals(), number=1000))
0.0018683355819786227
>>> from timeit import timeit
>>> # 使用集合:
>>> print(timeit('list(set([1, 2, 3, 1, 2, 3, 4]))', number=1000))
0.0010220493243764395
>>> # 快速而且干净 =)

使用集合而不是列表推导不仅让我们编写更少的代码,而且还能让我们获得更具可读性高性能的代码。

注意:请记住集合是无序的,因此无法保证在将它们转换回列表时,元素的顺序不变。

Python 之禅

优美胜于丑陋 Beautiful is better than ugly.

明了胜于晦涩 Explicit is better than implicit.

简洁胜于复杂 Simple is better than complex.

扁平胜于嵌套 Flat is better than nested.

集合不正是这样美丽、明了、简单且扁平吗?

成员测试

每次我们使用 if 语句来检查一个元素,例如,它是否在列表中时,意味着你正在进行成员测试:

my_list = [1, 2, 3]
>>> if 2 in my_list:
...     print('Yes, this is a membership test!')
...
Yes, this is a membership test!

在执行这些操作时,集合比列表更高效:

>>> from timeit import timeit
>>> def in_test(iterable):
...     for i in range(1000):
...             if i in iterable:
...                     pass
...
>>> timeit('in_test(iterable)',
... setup="from __main__ import in_test; iterable = list(range(1000))",
... number=1000)
12.459663048726043
>>> from timeit import timeit
>>> def in_test(iterable):
...     for i in range(1000):
...             if i in iterable:
...                     pass
...
>>> timeit('in_test(iterable)',
... setup="from __main__ import in_test; iterable = set(range(1000))",
... number=1000)
.12354438152988223

注意:上面的测试来自于这个 StackOverflow 话题。

因此,如果你在巨大的列表中进行这样的比较,尝试将该列表转换为集合,它应该可以加快你的速度。

如何使用

现在你已经了解了集合是什么以及为什么你应该使用它,现在让我们快速浏览一下,看看我们如何修改和操作它。

添加元素

根据要添加的元素数量,我们要在 add()update() 方法之间进行选择。

add() 适用于添加单个元素:

>>> s = {1, 2, 3}
>>> s.add(4)
>>> s
{1, 2, 3, 4}

update() 适用于添加多个元素:

>>> s = {1, 2, 3}
>>> s.update([2, 3, 4, 5, 6])
>>> s
{1, 2, 3, 4, 5, 6}

请记住,集合会移除重复项。

移除元素

如果你希望在代码中尝试删除不在集合中的元素时收到警报,请使用 remove()。否则,discard() 提供了一个很好的选择:

>>> s = {1, 2, 3}
>>> s.remove(3)
>>> s
{1, 2}
>>> s.remove(3)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
KeyError: 3

discard() 不会引起任何错误:

>>> s = {1, 2, 3}
>>> s.discard(3)
>>> s
{1, 2}
>>> s.discard(3)
>>> # 什么都不会发生

我们也可以使用 pop() 来随机丢弃一个元素:

>>> s = {1, 2, 3, 4, 5}
>>> s.pop()  # 删除一个任意的元素
1
>>> s
{2, 3, 4, 5}

或者 clear() 方法来清空一个集合:

>>> s = {1, 2, 3, 4, 5}
>>> s.clear()  # 清空集合
>>> s
set()

union()

union() 或者 | 将创建一个新集合,其中包含我们提供集合中的所有元素:

>>> s1 = {1, 2, 3}
>>> s2 = {3, 4, 5}
>>> s1.union(s2)  # 或者 's1 | s2'
{1, 2, 3, 4, 5}

intersection()

intersection& 将返回一个由集合共同元素组成的集合:

>>> s1 = {1, 2, 3}
>>> s2 = {2, 3, 4}
>>> s3 = {3, 4, 5}
>>> s1.intersection(s2, s3)  # 或者 's1 & s2 & s3'
{3}

difference()

使用 diference()- 创建一个新集合,其值在 “s1” 中但不在 “s2” 中:

>>> s1 = {1, 2, 3}
>>> s2 = {2, 3, 4}
>>> s1.difference(s2)  # 或者 's1 - s2'
{1}

symmetric\_diference()

symetric_difference^ 将返回集合之间的不同元素。

>>> s1 = {1, 2, 3}
>>> s2 = {2, 3, 4}
>>> s1.symmetric_difference(s2)  # 或者 's1 ^ s2'
{1, 4}

结论

我希望在阅读本文之后,你会知道集合是什么,如何操纵它的元素以及它可以执行的操作。知道何时使用集合无疑会帮助你编写更清晰的代码并加速你的程序。

如果你有任何疑问,请发表评论,我很乐意尝试回答。另外,不要忘记,如果你已经理解了集合,它们在 Python Cheatsheet 中有自己的一席之地,在那里你可以快速参考并重新认知你已经知道的内容。


via: https://www.pythoncheatsheet.org/blog/python-sets-what-why-how

作者:wilfredinni 译者:MjSeven 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

wttr.in 是一个功能丰富的天气预报服务,它支持在命令行显示天气。它可以(根据你的 IP 地址)自动检测你的位置,也支持指定位置或搜索地理位置(如城市、山区等)等。哦,另外你不需要安装它 —— 你只需要使用 cURL 或 Wget(见下文)。

wttr.in 功能包括:

  • 显示当前天气以及 3 天内的天气预报,分为早晨、中午、傍晚和夜晚(包括温度范围、风速和风向、可见度、降水量和概率)
  • 可以显示月相
  • 基于你的 IP 地址自动检测位置
  • 允许指定城市名称、3 字母的机场代码、区域代码、GPS 坐标、IP 地址或域名。你还可以指定地理位置,如湖泊、山脉、地标等)
  • 支持多语言位置名称(查询字符串必须以 Unicode 指定)
  • 支持指定天气预报显示的语言(它支持超过 50 种语言)
  • 来自美国的查询使用 USCS 单位用于,世界其他地方使用公制系统,但你可以通过附加 ?u 使用 USCS,附加 ?m 使用公制系统。 )
  • 3 种输出格式:终端的 ANSI,浏览器的 HTML 和 PNG

就像我在文章开头提到的那样,使用 wttr.in,你只需要 cURL 或 Wget,但你也可以在你的服务器上安装它。 或者你可以安装 wego,这是一个使用 wtter.in 的终端气候应用,虽然 wego 要求注册一个 API 密钥来安装。

在使用 wttr.in 之前,请确保已安装 cURL。在 Debian、Ubuntu 或 Linux Mint(以及其他基于 Debian 或 Ubuntu 的 Linux 发行版)中,使用以下命令安装 cURL:

sudo apt install curl

wttr.in 命令行示例

获取你所在位置的天气(wttr.in 会根据你的 IP 地址猜测你的位置):

curl wttr.in

通过在 curl 之后添加 -4,强制 cURL 将名称解析为 IPv4 地址(如果你用 IPv6 访问 wttr.in 有问题):

curl -4 wttr.in

如果你想检索天气预报保存为 png,还可以使用 Wget(而不是 cURL),或者你想这样使用它:

wget -O- -q wttr.in

如果相对 cURL 你更喜欢 Wget ,可以在下面的所有命令中用 wget -O- -q 替换 curl

指定位置:

curl wttr.in/Dublin

显示地标的天气信息(本例中为艾菲尔铁塔):

curl wttr.in/~Eiffel+Tower

获取 IP 地址位置的天气信息(以下 IP 属于 GitHub):

curl wttr.in/@192.30.253.113

使用 USCS 单位检索天气:

curl wttr.in/Paris?u

如果你在美国,强制 wttr.in 使用公制系统(SI):

curl wttr.in/New+York?m

使用 Wget 将当前天气和 3 天预报下载为 PNG 图像:

wget wttr.in/Istanbul.png

你可以指定 PNG 的透明度,这在你要使用一个脚本自动添加天气信息到某些图片(比如墙纸)上有用。

对于其他示例,请查看 wttr.in 项目页面或在终端中输入:

curl wttr.in/:help

via: https://www.linuxuprising.com/2018/07/display-weather-forecast-in-your.html

作者:Logix 选题:lujun9972 译者:geekpi 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

直到几个月以前,对于我来说,在消息传递的环境中, streams 只是一个有趣且相对简单的概念。这个概念在 Kafka 流行之后,我主要研究它们在 Disque 案例中的应用,Disque 是一个消息队列,它将在 Redis 4.2 中被转换为 Redis 的一个模块。后来我决定让 Disque 都用 AP 消息(LCTT 译注:参见 CAP 定理),也就是说,它将在不需要客户端过多参与的情况下实现容错和可用性,这样一来,我更加确定地认为流的概念在那种情况下并不适用。

然而在那时 Redis 有个问题,那就是缺省情况下导出数据结构并不轻松。它在 Redis 列表 list 有序集 sorted list 发布/订阅 Pub/Sub 功能之间有某些缺陷。你可以权衡使用这些工具对一系列消息或事件建模。

有序集是内存消耗大户,那自然就不能对投递的相同消息进行一次又一次的建模,客户端不能阻塞新消息。因为有序集并不是一个序列化的数据结构,它是一个元素可以根据它们量的变化而移动的集合:所以它不像时序性的数据那样。

列表有另外的问题,它在某些特定的用例中会产生类似的适用性问题:你无法浏览列表中间的内容,因为在那种情况下,访问时间是线性的。此外,没有任何指定输出的功能,列表上的阻塞操作仅为单个客户端提供单个元素。列表中没有固定的元素标识,也就是说,不能指定从哪个元素开始给我提供内容。

对于一对多的工作任务,有发布/订阅机制,它在大多数情况下是非常好的,但是,对于某些不想 “即发即弃” fire-and-forget 的东西:保留一个历史是很重要的,不只是因为是断开之后会重新获得消息,也因为某些如时序性的消息列表,用范围查询浏览是非常重要的:比如在这 10 秒范围内温度读数是多少?

我试图解决上述问题,我想规划一个通用的有序集合,并列入一个独特的、更灵活的数据结构,然而,我的设计尝试最终以生成一个比当前的数据结构更加矫揉造作的结果而告终。Redis 有个好处,它的数据结构导出更像自然的计算机科学的数据结构,而不是 “Salvatore 发明的 API”。因此,我最终停止了我的尝试,并且说,“ok,这是我们目前能提供的”,或许我会为发布/订阅增加一些历史信息,或者为列表访问增加一些更灵活的方式。然而,每次在会议上有用户对我说 “你如何在 Redis 中模拟时间系列” 或者类似的问题时,我的脸就绿了。

起源

在 Redis 4.0 中引入模块之后,用户开始考虑他们自己怎么去修复这些问题。其中一个用户 Timothy Downs 通过 IRC 和我说道:

\<forkfork> 我计划给这个模块增加一个事务日志式的数据类型 —— 这意味着大量的订阅者可以在不导致 redis 内存激增的情况下做一些像发布/订阅那样的事情
\<forkfork> 订阅者持有他们在消息队列中的位置,而不是让 Redis 必须维护每个消费者的位置和为每个订阅者复制消息

他的思路启发了我。我想了几天,并且意识到这可能是我们马上同时解决上面所有问题的契机。我需要去重新构思 “日志” 的概念是什么。日志是个基本的编程元素,每个人都使用过它,因为它只是简单地以追加模式打开一个文件,并以一定的格式写入数据。然而 Redis 数据结构必须是抽象的。它们在内存中,并且我们使用内存并不是因为我们懒,而是因为使用一些指针,我们可以概念化数据结构并把它们抽象,以使它们摆脱明确的限制。例如,一般来说日志有几个问题:偏移不是逻辑化的,而是真实的字节偏移,如果你想要与条目插入的时间相关的逻辑偏移应该怎么办?我们有范围查询可用。同样,日志通常很难进行垃圾回收:在一个只能进行追加操作的数据结构中怎么去删除旧的元素?好吧,在我们理想的日志中,我们只需要说,我想要数字最大的那个条目,而旧的元素一个也不要,等等。

当我从 Timothy 的想法中受到启发,去尝试着写一个规范的时候,我使用了 Redis 集群中的 radix 树去实现,优化了它内部的某些部分。这为实现一个有效利用空间的日志提供了基础,而且仍然有可能在 对数时间 logarithmic time 内访问范围。同时,我开始去读关于 Kafka 的流相关的内容以获得另外的灵感,它也非常适合我的设计,最后借鉴了 Kafka 消费组 consumer groups 的概念,并且再次针对 Redis 进行优化,以适用于 Redis 在内存中使用的情况。然而,该规范仅停留在纸面上,在一段时间后我几乎把它从头到尾重写了一遍,以便将我与别人讨论的所得到的许多建议一起增加到 Redis 升级中。我希望 Redis 流能成为对于时间序列有用的特性,而不仅是一个常见的事件和消息类的应用程序。

让我们写一些代码吧

从 Redis 大会回来后,整个夏天我都在实现一个叫 listpack 的库。这个库是 ziplist.c 的继任者,那是一个表示在单个分配中的字符串元素列表的数据结构。它是一个非常特殊的序列化格式,其特点在于也能够以逆序(从右到左)解析:以便在各种用例中替代 ziplists。

结合 radix 树和 listpacks 的特性,它可以很容易地去构建一个空间高效的日志,并且还是可索引的,这意味着允许通过 ID 和时间进行随机访问。自从这些就绪后,我开始去写一些代码以实现流数据结构。我还在完成这个实现,不管怎样,现在在 Github 上的 Redis 的 streams 分支里它已经可以跑起来了。我并没有声称那个 API 是 100% 的最终版本,但是,这有两个有意思的事实:一,在那时只有消费群组是缺失的,加上一些不太重要的操作流的命令,但是,所有的大的方面都已经实现了。二,一旦各个方面比较稳定了之后,我决定大概用两个月的时间将所有的流的特性 向后移植 backport 到 4.0 分支。这意味着 Redis 用户想要使用流,不用等待 Redis 4.2 发布,它们在生产环境马上就可用了。这是可能的,因为作为一个新的数据结构,几乎所有的代码改变都出现在新的代码里面。除了阻塞列表操作之外:该代码被重构了,我们对于流和列表阻塞操作共享了相同的代码,而极大地简化了 Redis 内部实现。

教程:欢迎使用 Redis 的 streams

在某些方面,你可以认为流是 Redis 列表的一个增强版本。流元素不再是一个单一的字符串,而是一个 字段 field value 组成的对象。范围查询更适用而且更快。在流中,每个条目都有一个 ID,它是一个逻辑偏移量。不同的客户端可以 阻塞等待 blocking-wait 比指定的 ID 更大的元素。Redis 流的一个基本的命令是 XADD。是的,所有的 Redis 流命令都是以一个 X 为前缀的。

> XADD mystream * sensor-id 1234 temperature 10.5
1506871964177.0

这个 XADD 命令将追加指定的条目作为一个指定的流 —— “mystream” 的新元素。上面示例中的这个条目有两个字段:sensor-idtemperature,每个条目在同一个流中可以有不同的字段。使用相同的字段名可以更好地利用内存。有意思的是,字段的排序是可以保证顺序的。XADD 仅返回插入的条目的 ID,因为在第三个参数中是星号(*),表示由命令自动生成 ID。通常这样做就够了,但是也可以去强制指定一个 ID,这种情况用于复制这个命令到 从服务器 slave server AOF append-only file 文件。

这个 ID 是由两部分组成的:一个毫秒时间和一个序列号。1506871964177 是毫秒时间,它只是一个毫秒级的 UNIX 时间戳。圆点(.)后面的数字 0 是一个序号,它是为了区分相同毫秒数的条目增加上去的。这两个数字都是 64 位的无符号整数。这意味着,我们可以在流中增加所有想要的条目,即使是在同一毫秒中。ID 的毫秒部分使用 Redis 服务器的当前本地时间生成的 ID 和流中的最后一个条目 ID 两者间的最大的一个。因此,举例来说,即使是计算机时间回跳,这个 ID 仍然是增加的。在某些情况下,你可以认为流条目的 ID 是完整的 128 位数字。然而,事实上它们与被添加到的实例的本地时间有关,这意味着我们可以在毫秒级的精度的范围随意查询。

正如你想的那样,快速添加两个条目后,结果是仅一个序号递增了。我们可以用一个 MULTI/EXEC 块来简单模拟“快速插入”:

> MULTI
OK
> XADD mystream * foo 10
QUEUED
> XADD mystream * bar 20
QUEUED
> EXEC
1) 1506872463535.0
2) 1506872463535.1

在上面的示例中,也展示了无需指定任何初始 模式 schema 的情况下,对不同的条目使用不同的字段。会发生什么呢?就像前面提到的一样,只有每个块(它通常包含 50-150 个消息内容)的第一个消息被使用。并且,相同字段的连续条目都使用了一个标志进行了压缩,这个标志表示与“它们与这个块中的第一个条目的字段相同”。因此,使用相同字段的连续消息可以节省许多内存,即使是字段集随着时间发生缓慢变化的情况下也很节省内存。

为了从流中检索数据,这里有两种方法:范围查询,它是通过 XRANGE 命令实现的; 流播 streaming ,它是通过 XREAD 命令实现的。XRANGE 命令仅取得包括从开始到停止范围内的全部条目。因此,举例来说,如果我知道它的 ID,我可以使用如下的命名取得单个条目:

> XRANGE mystream 1506871964177.0 1506871964177.0
1) 1) 1506871964177.0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "10.5"

不管怎样,你都可以使用指定的开始符号 - 和停止符号 + 表示最小和最大的 ID。为了限制返回条目的数量,也可以使用 COUNT 选项。下面是一个更复杂的 XRANGE 示例:

> XRANGE mystream - + COUNT 2
1) 1) 1506871964177.0
   2) 1) "sensor-id"
      2) "1234"
      3) "temperature"
      4) "10.5"
2) 1) 1506872463535.0
   2) 1) "foo"
      2) "10"

这里我们讲的是 ID 的范围,然后,为了取得在一个给定时间范围内的特定范围的元素,你可以使用 XRANGE,因为 ID 的“序号” 部分可以省略。因此,你可以只指定“毫秒”时间即可,下面的命令的意思是:“从 UNIX 时间 1506872463 开始给我 10 个条目”:

127.0.0.1:6379> XRANGE mystream 1506872463000 + COUNT 10
1) 1) 1506872463535.0
   2) 1) "foo"
      2) "10"
2) 1) 1506872463535.1
   2) 1) "bar"
      2) "20"

关于 XRANGE 需要注意的最重要的事情是,假设我们在回复中收到 ID,随后连续的 ID 只是增加了序号部分,所以可以使用 XRANGE 遍历整个流,接收每个调用的指定个数的元素。Redis 中的*SCAN 系列命令允许迭代 Redis 数据结构,尽管事实上它们不是为迭代设计的,但这样可以避免再犯相同的错误。

使用 XREAD 处理流播:阻塞新的数据

当我们想通过 ID 或时间去访问流中的一个范围或者是通过 ID 去获取单个元素时,使用 XRANGE 是非常完美的。然而,在使用流的案例中,当数据到达时,它必须由不同的客户端来消费时,这就不是一个很好的解决方案,这需要某种形式的 汇聚池 pooling 。(对于 某些 应用程序来说,这可能是个好主意,因为它们仅是偶尔连接查询的)

XREAD 命令是为读取设计的,在同一个时间,从多个流中仅指定我们从该流中得到的最后条目的 ID。此外,如果没有数据可用,我们可以要求阻塞,当数据到达时,就解除阻塞。类似于阻塞列表操作产生的效果,但是这里并没有消费从流中得到的数据,并且多个客户端可以同时访问同一份数据。

这里有一个典型的 XREAD 调用示例:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ $

它的意思是:从 mystreamotherstream 取得数据。如果没有数据可用,阻塞客户端 5000 毫秒。在 STREAMS 选项之后指定我们想要监听的关键字,最后的是指定想要监听的 ID,指定的 ID 为 $ 的意思是:假设我现在需要流中的所有元素,因此,只需要从下一个到达的元素开始给我。

如果我从另一个客户端发送这样的命令:

> XADD otherstream * message “Hi There”

XREAD 侧会出现什么情况呢?

1) 1) "otherstream"
   2) 1) 1) 1506935385635.0
         2) 1) "message"
            2) "Hi There"

与收到的数据一起,我们也得到了数据的关键字。在下次调用中,我们将使用接收到的最新消息的 ID:

> XREAD BLOCK 5000 STREAMS mystream otherstream $ 1506935385635.0

依次类推。然而需要注意的是使用方式,客户端有可能在一个非常大的延迟之后再次连接(因为它处理消息需要时间,或者其它什么原因)。在这种情况下,期间会有很多消息堆积,为了确保客户端不被消息淹没,以及服务器不会因为给单个客户端提供大量消息而浪费太多的时间,使用 XREADCOUNT 选项是非常明智的。

流封顶

目前看起来还不错……然而,有些时候,流需要删除一些旧的消息。幸运的是,这可以使用 XADD 命令的 MAXLEN 选项去做:

> XADD mystream MAXLEN 1000000 * field1 value1 field2 value2

它是基本意思是,如果在流中添加新元素后发现消息数量超过了 1000000 个,那么就删除旧的消息,以便于元素总量重新回到 1000000 以内。它很像是在列表中使用的 RPUSH + LTRIM,但是,这次我们是使用了一个内置机制去完成的。然而,需要注意的是,上面的意思是每次我们增加一个新的消息时,我们还需要另外的工作去从流中删除旧的消息。这将消耗一些 CPU 资源,所以在计算 MAXLEN 之前,尽可能使用 ~ 符号,以表明我们不要求非常 精确 的 1000000 个消息,就是稍微多一些也不是大问题:

> XADD mystream MAXLEN ~ 1000000 * foo bar

这种方式的 XADD 仅当它可以删除整个节点的时候才会删除消息。相比普通的 XADD,这种方式几乎可以自由地对流进行封顶。

消费组(开发中)

这是第一个 Redis 中尚未实现而在开发中的特性。灵感也是来自 Kafka,尽管在这里是以不同的方式实现的。重点是使用了 XREAD,客户端也可以增加一个 GROUP <name> 选项。相同组的所有客户端将自动得到 不同的 消息。当然,同一个流可以被多个组读取。在这种情况下,所有的组将收到流中到达的消息的相同副本。但是,在每个组内,消息是不会重复的。

当指定组时,能够指定一个 RETRY <milliseconds> 选项去扩展组:在这种情况下,如果消息没有通过 XACK 进行确认,它将在指定的毫秒数后进行再次投递。这将为消息投递提供更佳的可靠性,这种情况下,客户端没有私有的方法将消息标记为已处理。这一部分也正在开发中。

内存使用和节省加载时间

因为用来建模 Redis 流的设计,内存使用率是非常低的。这取决于它们的字段、值的数量和长度,对于简单的消息,每使用 100MB 内存可以有几百万条消息。此外,该格式设想为需要极少的序列化:listpack 块以 radix 树节点方式存储,在磁盘上和内存中都以相同方式表示的,因此它们可以很轻松地存储和读取。例如,Redis 可以在 0.3 秒内从 RDB 文件中读取 500 万个条目。这使流的复制和持久存储非常高效。

我还计划允许从条目中间进行部分删除。现在仅实现了一部分,策略是在条目在标记中标识条目为已删除,并且,当已删除条目占全部条目的比例达到指定值时,这个块将被回收重写,如果需要,它将被连到相邻的另一个块上,以避免碎片化。

关于最终发布时间的结论

Redis 的流特性将包含在年底前(LCTT 译注:本文原文发布于 2017 年 10 月)推出的 Redis 4.0 系列的稳定版中。我认为这个通用的数据结构将为 Redis 提供一个巨大的补丁,以用于解决很多现在很难以解决的情况:那意味着你(之前)需要创造性地“滥用”当前提供的数据结构去解决那些问题。一个非常重要的使用场景是时间序列,但是,我觉得对于其它场景来说,通过 TREAD 来流播消息将是非常有趣的,因为对于那些需要更高可靠性的应用程序,可以使用发布/订阅模式来替换“即用即弃”,还有其它全新的使用场景。现在,如果你想在有问题环境中评估这个新数据结构,可以更新 GitHub 上的 streams 分支开始试用。欢迎向我们报告所有的 bug。:-)

如果你喜欢观看视频的方式,这里有一个现场演示:https://www.youtube.com/watch?v=ELDzy9lCFHQ


via: http://antirez.com/news/114

作者:antirez 译者:qhwdw 校对:wxy, pityonline

本文由 LCTT 原创编译,Linux中国 荣誉推出