Krishna Mohan Koyya 发布的文章

微服务遵循领域驱动设计(DDD),与开发平台无关。Python 微服务也不例外。Python3 的面向对象特性使得按照 DDD 对服务进行建模变得更加容易。本系列的第 10 部分演示了如何将用户管理系统的查找服务作为 Python 微服务部署在 Kubernetes 上。

微服务架构的强大之处在于它的多语言性。企业将其功能分解为一组微服务,每个团队自由选择一个平台。

我们的用户管理系统已经分解为四个微服务,分别是添加、查找、搜索和日志服务。添加服务在 Java 平台上开发并部署在 Kubernetes 集群上,以实现弹性和可扩展性。这并不意味着其余的服务也要使用 Java 开发,我们可以自由选择适合个人服务的平台。

让我们选择 Python 作为开发查找服务的平台。查找服务的模型已经设计好了(参考 2022 年 3 月份的文章),我们只需要将这个模型转换为代码和配置。

Pythonic 方法

Python 是一种通用编程语言,已经存在了大约 30 年。早期,它是自动化脚本的首选。然而,随着 Django 和 Flask 等框架的出现,它的受欢迎程度越来越高,现在各种领域中都在应用它,如企业应用程序开发。数据科学和机器学习进一步推动了它的发展,Python 现在是三大编程语言之一。

许多人将 Python 的成功归功于它容易编码。这只是一部分原因。只要你的目标是开发小型脚本,Python 就像一个玩具,你会非常喜欢它。然而,当你进入严肃的大规模应用程序开发领域时,你将不得不处理大量的 ifelse,Python 变得与任何其他平台一样好或一样坏。例如,采用一种面向对象的方法!许多 Python 开发人员甚至可能没意识到 Python 支持类、继承等功能。Python 确实支持成熟的面向对象开发,但是有它自己的方式 -- Pythonic!让我们探索一下!

领域模型

AddService 通过将数据保存到一个 MySQL 数据库中来将用户添加到系统中。FindService 的目标是提供一个 REST API 按用户名查找用户。域模型如图 1 所示。它主要由一些值对象组成,如 User 实体的NamePhoneNumber 以及 UserRepository

图 1: 查找服务的域模型

让我们从 Name 开始。由于它是一个值对象,因此必须在创建时进行验证,并且必须保持不可变。基本结构如所示:

class Name:
    value: str
    def __post_init__(self):
        if self.value is None or len(self.value.strip()) < 8 or len(self.value.strip()) > 32:
            raise ValueError("Invalid Name")

如你所见,Name 包含一个字符串类型的值。作为后期初始化的一部分,我们会验证它。

Python 3.7 提供了 @dataclass 装饰器,它提供了许多开箱即用的数据承载类的功能,如构造函数、比较运算符等。如下是装饰后的 Name 类:

from dataclasses import dataclass

@dataclass
class Name:
    value: str
    def __post_init__(self):
        if self.value is None or len(self.value.strip()) < 8 or len(self.value.strip()) > 32:
            raise ValueError("Invalid Name")

以下代码可以创建一个 Name 对象:

name = Name("Krishna")

value 属性可以按照如下方式读取或写入:

name.value = "Mohan"
print(name.value)

可以很容易地与另一个 Name 对象比较,如下所示:

other = Name("Mohan")
if name == other:
    print("same")

如你所见,对象比较的是值而不是引用。这一切都是开箱即用的。我们还可以通过冻结对象使对象不可变。这是 Name 值对象的最终版本:

from dataclasses import dataclass

@dataclass(frozen=True)
class Name:
    value: str
    def __post_init__(self):
        if self.value is None or len(self.value.strip()) < 8 or len(self.value.strip()) > 32:
            raise ValueError("Invalid Name")

PhoneNumber 也遵循类似的方法,因为它也是一个值对象:

@dataclass(frozen=True)
class PhoneNumber:
    value: int
    def __post_init__(self):
        if self.value < 9000000000:
            raise ValueError("Invalid Phone Number")

User 类是一个实体,不是一个值对象。换句话说,User 是可变的。以下是结构:

from dataclasses import dataclass
import datetime

@dataclass
class User:
    _name: Name
    _phone: PhoneNumber
    _since: datetime.datetime

    def __post_init__(self):
        if self._name is None or self._phone is None:
            raise ValueError("Invalid user")
        if self._since is None:
            self.since = datetime.datetime.now()

你能观察到 User 并没有冻结,因为我们希望它是可变的。但是,我们不希望所有属性都是可变的。标识字段如 _name_since 是希望不会修改的。那么,这如何做到呢?

Python3 提供了所谓的描述符协议,它会帮助我们正确定义 getter 和 setter。让我们使用 @property 装饰器将 getter 添加到 User 的所有三个字段中。

@property
def name(self) -> Name:
    return self._name

@property
def phone(self) -> PhoneNumber:
    return self._phone

@property
def since(self) -> datetime.datetime:
    return self._since

phone 字段的 setter 可以使用 @<字段>.setter 来装饰:

@phone.setter
def phone(self, phone: PhoneNumber) -> None:
    if phone is None:
        raise ValueError("Invalid phone")
    self._phone = phone

通过重写 __str__() 函数,也可以为 User 提供一个简单的打印方法:

def __str__(self):
    return self.name.value + " [" + str(self.phone.value) + "] since " + str(self.since)

这样,域模型的实体和值对象就准备好了。创建异常类如下所示:

class UserNotFoundException(Exception):
    pass

域模型现在只剩下 UserRepository 了。Python 提供了一个名为 abc 的有用模块来创建抽象方法和抽象类。因为 UserRepository 只是一个接口,所以我们可以使用 abc 模块。

任何继承自 abc.ABC 的类都将变为抽象类,任何带有 @abc.abstractmethod 装饰器的函数都会变为一个抽象函数。下面是 UserRepository 的结构:

from abc import ABC, abstractmethod

class UserRepository(ABC):
    @abstractmethod
    def fetch(self, name:Name) -> User:
        pass

UserRepository 遵循仓储模式。换句话说,它在 User 实体上提供适当的 CRUD 操作,而不会暴露底层数据存储语义。在本例中,我们只需要 fetch() 操作,因为 FindService 只查找用户。

因为 UserRepository 是一个抽象类,我们不能从抽象类创建实例对象。创建对象必须依赖于一个具体类实现这个抽象类。数据层 UserRepositoryImpl 提供了 UserRepository 的具体实现:

class UserRepositoryImpl(UserRepository):
    def fetch(self, name:Name) -> User:
        pass

由于 AddService 将用户数据存储在一个 MySQL 数据库中,因此 UserRepositoryImpl 也必须连接到相同的数据库去检索数据。下面是连接到数据库的代码。注意,我们正在使用 MySQL 的连接库。

from mysql.connector import connect, Error

class UserRepositoryImpl(UserRepository):
    def fetch(self, name:Name) -> User:
        try:
            with connect(
                    host="mysqldb",
                    user="root",
                    password="admin",
                    database="glarimy",
                ) as connection:
                with connection.cursor() as cursor:
                    cursor.execute("SELECT * FROM ums_users where name=%s", (name.value,))
                    row = cursor.fetchone()
                    if cursor.rowcount == -1:
                        raise UserNotFoundException()
                    else:
                        return User(Name(row[0]), PhoneNumber(row[1]), row[2])
        except Error as e:
            raise e

在上面的片段中,我们使用用户 root / 密码 admin 连接到一个名为 mysqldb 的数据库服务器,使用名为 glarimy 的数据库(模式)。在演示代码中是可以包含这些信息的,但在生产中不建议这么做,因为这会暴露敏感信息。

fetch() 操作的逻辑非常直观,它对 ums_users 表执行 SELECT 查询。回想一下,AddService 正在将用户数据写入同一个表中。如果 SELECT 查询没有返回记录,fetch() 函数将抛出 UserNotFoundException 异常。否则,它会从记录中构造 User 实体并将其返回给调用者。这没有什么特殊的。

应用层

最终,我们需要创建应用层。此模型如图 2 所示。它只包含两个类:控制器和一个 DTO。

图 2: 添加服务的应用层

众所周知,一个 DTO 只是一个没有任何业务逻辑的数据容器。它主要用于在 FindService 和外部之间传输数据。我们只是提供了在 REST 层中将 UserRecord 转换为字典以便用于 JSON 传输:

class UserRecord:
    def toJSON(self):
        return {
            "name": self.name,
            "phone": self.phone,
            "since": self.since
        }

控制器的工作是将 DTO 转换为用于域服务的域对象,反之亦然。可以从 find() 操作中观察到这一点。

class UserController:

    def __init__(self):
        self._repo = UserRepositoryImpl()

    def find(self, name: str):
        try:
            user: User = self._repo.fetch(Name(name))
            record: UserRecord = UserRecord()
            record.name = user.name.value
            record.phone = user.phone.value
            record.since = user.since
            return record
        except UserNotFoundException as e:
            return None

find() 操作接收一个字符串作为用户名,然后将其转换为 Name 对象,并调用 UserRepository 获取相应的 User 对象。如果找到了,则使用检索到的 User` 对象创建UserRecord。回想一下,将域对象转换为 DTO 是很有必要的,这样可以对外部服务隐藏域模型。

UserController 不需要有多个实例,它也可以是单例的。通过重写 __new__,可以将其建模为一个单例。

class UserController:
    def __new__(self):
        if not hasattr(self, ‘instance’):
            self.instance = super().__new__(self)
        return self.instance

    def __init__(self):
        self._repo = UserRepositoryImpl()

    def find(self, name: str):
        try:
            user: User = self._repo.fetch(Name(name))
            record: UserRecord = UserRecord()
            record.name = user.name.getValue()
            record.phone = user.phone.getValue()
            record.since = user.since
            return record
        except UserNotFoundException as e:
            return None

我们已经完全实现了 FindService 的模型,剩下的唯一任务是将其作为 REST 服务公开。

REST API

FindService 只提供一个 API,那就是通过用户名查找用户。显然 URI 如下所示:

GET /user/{name}

此 API 希望根据提供的用户名查找用户,并以 JSON 格式返回用户的电话号码等详细信息。如果没有找到用户,API 将返回一个 404 状态码。

我们可以使用 Flask 框架来构建 REST API,它最初的目的是使用 Python 开发 Web 应用程序。除了 HTML 视图,它还进一步扩展到支持 REST 视图。我们选择这个框架是因为它足够简单。 创建一个 Flask 应用程序:

from flask import Flask
app = Flask(__name__)

然后为 Flask 应用程序定义路由,就像函数一样简单:

@app.route('/user/<name>')
def get(name):
    pass

注意 @app.route 映射到 API /user/<name>,与之对应的函数的 get()

如你所见,每次用户访问 API 如 http://server:port/user/Krishna 时,都将调用这个 get() 函数。Flask 足够智能,可以从 URL 中提取 Krishna 作为用户名,并将其传递给 get() 函数。

get() 函数很简单。它要求控制器找到该用户,并将其与通常的 HTTP 头一起打包为 JSON 格式后返回。如果控制器返回 None,则 get() 函数返回合适的 HTTP 状态码。

from flask import jsonify, abort

controller = UserController()
record = controller.find(name)
if record is None:
    abort(404)
else:
    resp = jsonify(record.toJSON())
    resp.status_code = 200
    return resp

最后,我们需要 Flask 应用程序提供服务,可以使用 waitress 服务:

from waitress import serve
serve(app, host="0.0.0.0", port=8080)

在上面的片段中,应用程序在本地主机的 8080 端口上提供服务。最终代码如下所示:

from flask import Flask, jsonify, abort
from waitress import serve

app = Flask(__name__)

@app.route('/user/<name>')
def get(name):
    controller = UserController()
    record = controller.find(name)
    if record is None:
        abort(404)
    else:
        resp = jsonify(record.toJSON())
        resp.status_code = 200
        return resp

serve(app, host="0.0.0.0", port=8080)

部署

FindService 的代码已经准备完毕。除了 REST API 之外,它还有域模型、数据层和应用程序层。下一步是构建此服务,将其容器化,然后部署到 Kubernetes 上。此过程与部署其他服务妹有任何区别,但有一些 Python 特有的步骤。

在继续前进之前,让我们来看下文件夹和文件结构:

+ ums-find-service
+ ums
- domain.py
- data.py
- app.py
- Dockerfile
- requirements.txt
- kube-find-deployment.yml

如你所见,整个工作文件夹都位于 ums-find-service 下,它包含了 ums 文件夹中的代码和一些配置文件,例如 Dockerfilerequirements.txtkube-find-deployment.yml

domain.py 包含域模型,data.py 包含 UserRepositoryImplapp.py 包含剩余代码。我们已经阅读过代码了,现在我们来看看配置文件。

第一个是 requirements.txt,它声明了 Python 系统需要下载和安装的外部依赖项。我们需要用查找服务中用到的每个外部 Python 模块来填充它。如你所见,我们使用了 MySQL 连接器、Flask 和 Waitress 模块。因此,下面是 requirements.txt 的内容。

Flask==2.1.1
Flask_RESTful
mysql-connector-python
waitress

第二步是在 Dockerfile 中声明 Docker 相关的清单,如下:

FROM python:3.8-slim-buster

WORKDIR /ums
ADD ums /ums
ADD requirements.txt requirements.txt
RUN pip3 install -r requirements.txt

EXPOSE 8080
ENTRYPOINT ["python"]
CMD ["/ums/app.py"]

总的来说,我们使用 Python 3.8 作为基线,除了移动 requirements.txt 之外,我们还将代码从 ums 文件夹移动到 Docker 容器中对应的文件夹中。然后,我们指示容器运行 pip3 install 命令安装对应模块。最后,我们向外暴露 8080 端口(因为 waitress 运行在此端口上)。

为了运行此服务,我们指示容器使用使用以下命令:

python /ums/app.py

一旦 Dockerfile 准备完成,在 ums-find-service 文件夹中运行以下命令,创建 Docker 镜像:

docker build -t glarimy/ums-find-service

它会创建 Docker 镜像,可以使用以下命令查找镜像:

docker images

尝试将镜像推送到 Docker Hub,你也可以登录到 Docker。

docker login
docker push glarimy/ums-find-service

最后一步是为 Kubernetes 部署构建清单。

在之前的文章中,我们已经介绍了如何建立 Kubernetes 集群、部署和使用服务的方法。我假设仍然使用之前文章中的清单文件来部署添加服务、MySQL、Kafka 和 Zookeeper。我们只需要将以下内容添加到 kube-find-deployment.yml 文件中:

apiVersion: apps/v1
kind: Deployment
metadata:
name: ums-find-service
labels:
app: ums-find-service
spec:
replicas: 3
selector:
matchLabels:
app: ums-find-service
template:
metadata:
labels:
app: ums-find-service
spec:
containers:
- name: ums-find-service
image: glarimy/ums-find-service
ports:
- containerPort: 8080
---
apiVersion: v1
kind: Service
metadata:
name: ums-find-service
labels:
name: ums-find-service
spec:
type: LoadBalancer
ports:
- port: 8080
selector:
app: ums-find-service

上面清单文件的第一部分声明了 glarimy/ums-find-service 镜像的 FindService,它包含三个副本。它还暴露 8080 端口。清单的后半部分声明了一个 Kubernetes 服务作为 FindService 部署的前端。请记住,在之前文章中,mysqldb 服务已经是上述清单的一部分了。

运行以下命令在 Kubernetes 集群上部署清单文件:

kubectl create -f kube-find-deployment.yml

部署完成后,可以使用以下命令验证容器组和服务:

kubectl get services

输出如图 3 所示:

图 3: Kubernetes 服务

它会列出集群上运行的所有服务。注意查找服务的外部 IP,使用 curl 调用此服务:

curl http://10.98.45.187:8080/user/KrishnaMohan

注意:10.98.45.187 对应查找服务,如图 3 所示。

如果我们使用 AddService 创建一个名为 KrishnaMohan 的用户,那么上面的 curl 命令看起来如图 4 所示:

图 4: 查找服务

用户管理系统(UMS)的体系结构包含 AddServiceFindService,以及存储和消息传递所需的后端服务,如图 5 所示。可以看到终端用户使用 ums-add-service 的 IP 地址添加新用户,使用 ums-find-service 的 IP 地址查找已有用户。每个 Kubernetes 服务都由三个对应容器的节点支持。还要注意:同样的 mysqldb 服务用于存储和检索用户数据。

图 5: UMS 的添加服务和查找服务

其他服务

UMS 系统还包含两个服务:SearchServiceJournalService。在本系列的下一部分中,我们将在 Node 平台上设计这些服务,并将它们部署到同一个 Kubernetes 集群,以演示多语言微服务架构的真正魅力。最后,我们将观察一些与微服务相关的设计模式。


via: https://www.opensourceforu.com/2022/09/python-microservices-using-flask-on-kubernetes/

作者:Krishna Mohan Koyya 选题:lkxed 译者:MjSeven 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

无服务器计算将服务器从规划中移除,使企业能够专注于应用功能。那么,企业是不是都应该选择无服务器计算呢?让我们来探究一下吧!

直至不久之前,几乎每个产品经理都会将他/她的工程资源,分成两个独立的团队 —— 开发团队和运维团队。开发团队通常参与编码、测试和构建应用功能,而运维团队负责应用程序的交付、部署和运行维护。

当开发团队构建电商应用时,运维团队会搭建好服务器来托管该应用。搭建服务器涉及到许多方面,其中包括:

  • 选择合适的硬件和操作系统
  • 应用所需的补丁集
  • 搭建所需服务器环境,如 JDK、Python、Tomcat、NodeJS 等
  • 部署、配置和提供实际的应用
  • 打开并固定合适的端口
  • 搭建所需的数据库引擎

……这个名单还在继续。

除此之外,管理人员还对容量规划感到头疼。毕竟,任何重要应用都应始终保持 100% 可用、可靠且可扩展。这需要对硬件进行最佳投资。众所周知,在一些关键时期,硬件短缺会导致业务损失,而硬件冗余又会损害利润。因此,无论应用是针对本地数据中心,还是针对云基础架构,容量规划都是至关重要的。到目前为止,很明显,企业不仅在功能构建上投入了大量的精力,还在功能交付上也花费了大量的时间。

无服务器计算 Serverless computing 旨在提供一种无缝的方式来交付功能,而无需担心服务器的设置和维护。换句话说,无服务器计算平台提供了一个“ 即用型 ready-to-use ”环境,企业可以尽快将应用程序构建和部署为一些较小的功能。这就是为什么这种方法被称为“ 功能即服务 Function as a Service ”(FaaS)。

请记住,无服务器计算中仍然存在服务器,但它由 AWS、微软和谷歌等 FaaS 供应商负责。

例如,AWS 以 “Lambda 函数”的形式提供了一个无服务器计算环境。开发人员可以选择将应用程序构建为一组 Lambda 函数,这些函数可以用 NodeJS、Java、Python 和其他一些语言编写。AWS 提供了一个现成的环境来部署这些函数。它还提供了即用​​型数据库服务器、文件服务器、应用程序网关和身份验证服务器等。

同样,微软 Azure 也提供了一个环境,它可以用 C# 等语言构建和部署 Azure 函数。

为什么选择无服务器?

有两个主要因素推动了无服务器计算的普及。

1、即用型环境

显然,这是无服务器计算的最大卖点。企业无需提前采购/预订硬件或实例,也无需操心许可证,以及设置和配置服务器。他们不需要为扩大和缩小规模而烦恼。所有这些都由 FaaS 供应商负责。

2、最优成本

由于 FaaS 供应商总是根据环境的利用率向客户收费(按使用付费模式),因此企业无需担心前期成本和资源浪费。例如,AWS 根据 Lambda 函数接收的请求数量、在数据表上运行的查询数量等指标来向客户端收费。

无服务器计算的挑战

与任何其他方法一样,无服务器计算也不是每个人都可以盲目遵循的完美方法。它本身也有一系列限制。以下是其中的几个。

1、供应商锁定

当使用无服务器计算时,第一个也是最重要的问题就是,Lambda 或 Azure 等函数将使用供应商提供的 API 来编写。例如,使用 AWS Lambda API 编写的函数无法部署到 Google Cloud 中,反之亦然。因此,无服务器计算迫使企业在许多年内,只能使用同一家供应商。并且,应用的成功或失败不仅取决于它的功能,还取决于供应商在性能等方面的能力。

2、编程语言

没有哪家无服务器计算平台支持所有的编程语言。此外,对于它支持的编程语言,它也可能不支持其所有版本。这样一来,应用开发团队只能选择供应商提供的语言。就团队的能力而言,这可能是非常关键的。

3、最优成本,真的吗?

其实也不一定,这一切都取决于资源的使用情况。如果你的应用正在承受巨大的负载,例如每秒数百万个请求,那么你所支付的费用可能会过高。在这样的规模下,在本地或云端拥有自己的服务器可能会更便宜。这并不意味着具有 Web 规模的应用不适合用无服务器计算。归根结底,它还是取决于你的平台的构建方式,以及你与供应商签署的协议。

4、生态系统

没有哪个应用是为了一个孤立的环境而编写的。它总是需要其他组件,如数据存储、数据库、安全引擎、网关、消息服务器、队列、缓存等。每个平台都提供自己的一组此类工具。例如,AWS 提供了 Dynamo DB 作为其 NoSQL 解决方案之一。显然,其他供应商也提供了自己的 NoSQL 解决方案。因此,团队又会被迫地基于所选平台来构建应用程序。尽管大多数商业 FaaS 供应商都为特定需求提供了多个组件,但并非每个组件都可能是同类型中最佳的。

为什么不考虑容器呢?

在过去十年中,我们中的许多人都迁移到了容器化部署模型,因为它们为昂贵的物理机或虚拟机提供了一种轻量级的替代方案。有了 Kubernetes 等编排工具后,我们乐于部署容器化应用,同时也满足了 Web 规模的要求。容器提供了与底层环境一定程度的隔离,这使得部署相对容易。但是,我们仍然需要在硬件(本地或云)、许可证、网络、配置等方面进行投资,这需要具有前瞻性的规划、合适的技术能力和仔细的监控。无服务器计算,尽管它也有自己的优点和缺点,但它让我们把这些责任也摆脱了。

展望未来

我们正处于持续开发、持续集成和持续部署的时代。每个企业都面临着竞争。产品 上市时间 Time to market (TTM)在吸引客户、留住客户这两个方面,发挥着重要作用。在这种背景下,企业喜欢花更多时间来尽可能快地推出功能,而不是在部署和维护的细节上苦苦挣扎。无服务器计算有可能满足这些需求。大玩家们正在投入巨额资金,以使 FaaS 尽可能地无缝且经济。无服务器计算的未来看起来是一片光明。


via: https://www.opensourceforu.com/2021/12/should-businesses-opt-for-serverless-computing/

作者:Krishna Mohan Koyya 选题:lkxed 译者:lkxed 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

Apache Kafka 是最流行的开源消息代理之一。它已经成为了大数据操作的重要组成部分,你能够在几乎所有的微服务环境中找到它。本文对 Apache Kafka 进行了简要介绍,并提供了一个案例来展示它的使用方式。

你有没有想过,电子商务平台是如何在处理巨大的流量时,做到不会卡顿的呢?有没有想过,OTT 平台是如何在同时向数百万用户交付内容时,做到平稳运行的呢?其实,关键就在于它们的分布式架构。

采用分布式架构设计的系统由多个功能组件组成。这些功能组件通常分布在多个机器上,它们通过网络,异步地交换消息,从而实现相互协作。正是由于异步消息的存在,组件之间才能实现可伸缩、无阻塞的通信,整个系统才能够平稳运行。

异步消息

异步消息的常见特性有:

  • 消息的 生产者 producer 消费者 consumer 都不知道彼此的存在。它们在不知道对方的情况下,加入和离开系统。
  • 消息 代理 broker 充当了生产者和消费者之间的中介。
  • 生产者把每条消息,都与一个“ 主题 topic ”相关联。主题是一个简单的字符串。
  • 生产者可以在多个主题上发送消息,不同的生产者也可以在同一主题上发送消息。
  • 消费者向代理订阅一个或多个主题的消息。
  • 生产者只将消息发送给代理,而不发送给消费者。
  • 代理会把消息发送给订阅该主题的所有消费者。
  • 代理将消息传递给针对该主题注册的所有消费者。
  • 生产者并不期望得到消费者的任何回应。换句话说,生产者和消费者不会相互阻塞。

市场上的消息代理有很多,而 Apache Kafka 是其中最受欢迎的之一。

Apache Kafka

Apache Kafka 是一个支持流式处理的、开源的分布式消息系统,它由 Apache 软件基金会开发。在架构上,它是多个代理组成的集群,这些代理间通过 Apache ZooKeeper 服务来协调。在接收、持久化和发送消息时,这些代理分担集群上的负载。

分区

Kafka 将消息写入称为“ 分区 partition ”的桶中。一个特定分区只保存一个主题上的消息。例如,Kafka 会把 heartbeats 主题上的消息写入名为 heartbeats-0 的分区(假设它是个单分区主题),这个过程和生产者无关。

图 1:异步消息

不过,为了利用 Kafka 集群所提供的并行处理能力,管理员通常会为指定主题创建多个分区。举个例子,假设管理员为 heartbeats 主题创建了三个分区,Kafka 会将它们分别命名为 heartbeats-0heartbeats-1heartbeats-2。Kafka 会以某种方式,把消息分配到这三个分区中,并使它们均匀分布。

还有另一种可能的情况,生产者将每条消息与一个 消息键 key 相关联。例如,同样都是在 heartbeats 主题上发送消息,有个组件使用 C1 作为消息键,另一个则使用 C2。在这种情况下,Kafka 会确保,在一个主题中,带有相同消息键的消息,总是会被写入到同一个分区。不过,在一个分区中,消息的消息键却不一定相同。下面的图 2 显示了消息在不同分区中的一种可能分布。

图 2:消息在不同分区中的分布

领导者和同步副本

Kafka 在(由多个代理组成的)集群中维护了多个分区。其中,负责维护分区的那个代理被称为“ 领导者 leader ”。只有领导者能够在它的分区上接收和发送消息。

可是,万一分区的领导者发生故障了,又该怎么办呢?为了确保业务连续性,每个领导者(代理)都会把它的分区复制到其他代理上。此时,这些其他代理就称为该分区的 同步副本 in-sync-replicas (ISR)。一旦分区的领导者发生故障,ZooKeeper 就会发起一次选举,把选中的那个同步副本任命为新的领导者。此后,这个新的领导者将承担该分区的消息接受和发送任务。管理员可以指定分区需要维护的同步副本的大小。

图 3:生产者命令行工具

消息持久化

代理会将每个分区都映射到一个指定的磁盘文件,从而实现持久化。默认情况下,消息会在磁盘上保留一个星期。当消息写入分区后,它们的内容和顺序就不能更改了。管理员可以配置一些策略,如消息的保留时长、压缩算法等。

图 4:消费者命令行工具

消费消息

与大多数其他消息系统不同,Kafka 不会主动将消息发送给消费者。相反,消费者应该监听主题,并主动读取消息。一个消费者可以从某个主题的多个分区中读取消息。多个消费者也可以读取来自同一个分区的消息。Kafka 保证了同一条消息不会被同一个消费者重复读取。

Kafka 中的每个消费者都有一个组 ID。那些组 ID 相同的消费者们共同组成了一个消费者组。通常,为了从 N 个主题分区读取消息,管理员会创建一个包含 N 个消费者的消费者组。这样一来,组内的每个消费者都可以从它的指定分区中读取消息。如果组内的消费者比可用分区还要多,那么多出来的消费者就会处于闲置状态。

在任何情况下,Kafka 都保证:不管组内有多少个消费者,同一条消息只会被该消费者组读取一次。这个架构提供了一致性、高性能、高可扩展性、准实时交付和消息持久性,以及零消息丢失。

安装、运行 Kafka

尽管在理论上,Kafka 集群可以由任意数量的代理组成,但在生产环境中,大多数集群通常由三个或五个代理组成。

在这里,我们将搭建一个单代理集群,对于生产环境来说,它已经够用了。

在浏览器中访问 https://kafka.apache.org/downloads,下载 Kafka 的最新版本。在 Linux 终端中,我们也可以使用下面的命令来下载它:

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz

如果需要的话,我们也可以把下载来的档案文件 kafka_2.12-2.8.0.tgz 移动到另一个目录下。解压这个档案,你会得到一个名为 kafka_2.12-2.8.0 的目录,它就是之后我们要设置的 KAFKA_HOME

打开 KAFKA_HOME/config 目录下的 server.properties 文件,取消注释下面这一行配置:

listeners=PLAINTEXT://:9092

这行配置的作用是让 Kafka 在本机的 9092 端口接收普通文本消息。我们也可以配置 Kafka 通过 安全通道 secure channel 接收消息,在生产环境中,我们也推荐这么做。

无论集群中有多少个代理,Kafka 都需要 ZooKeeper 来管理和协调它们。即使是单代理集群,也是如此。Kafka 在安装时,会附带安装 ZooKeeper,因此,我们可以在 KAFKA_HOME 目录下,在命令行中使用下面的命令来启动它:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

当 ZooKeeper 运行起来后,我们就可以在另一个终端中启动 Kafka 了,命令如下:

./bin/kafka-server-start.sh ./config/server.properties

到这里,一个单代理的 Kafka 集群就运行起来了。

验证 Kafka

让我们在 topic-1 主题上尝试下发送和接收消息吧!我们可以使用下面的命令,在创建主题时为它指定分区的个数:

./bin/kafka-topics.sh --create --topic topic-1 --zookeeper localhost:2181 --partitions 3 --replication-factor 1

上述命令还同时指定了 复制因子 replication factor ,它的值不能大于集群中代理的数量。我们使用的是单代理集群,因此,复制因子只能设置为 1。

当主题创建完成后,生产者和消费者就可以在上面交换消息了。Kafka 的发行版内附带了生产者和消费者的命令行工具,供测试时用。

打开第三个终端,运行下面的命令,启动生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1

上述命令显示了一个提示符,我们可以在后面输入简单文本消息。由于我们指定的命令选项,生产者会把 topic-1 上的消息,发送到运行在本机的 9092 端口的 Kafka 中。

打开第四个终端,运行下面的命令,启动消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-1 –-from-beginning

上述命令启动了一个消费者,并指定它连接到本机 9092 端口的 Kafka。它订阅了 topic-1 主题,以读取其中的消息。由于命令行的最后一个选项,这个消费者会从最开头的位置,开始读取该主题的所有消息。

我们注意到,生产者和消费者连接的是同一个代理,访问的是同一个主题,因此,消费者在收到消息后会把消息打印到终端上。

下面,让我们在实际应用场景中,尝试使用 Kafka 吧!

案例

假设有一家叫做 ABC 的公共汽车运输公司,它拥有一支客运车队,往返于全国不同城市之间。由于 ABC 希望实时跟踪每辆客车,以提高其运营质量,因此,它提出了一个基于 Apache Kafka 的解决方案。

首先,ABC 公司为所有公交车都配备了位置追踪设备。然后,它使用 Kafka 建立了一个操作中心,以接收来自数百辆客车的位置更新。它还开发了一个 仪表盘 dashboard ,以显示任一时间点所有客车的当前位置。图 5 展示了上述架构:

图 5:基于 Kafka 的架构

在这种架构下,客车上的设备扮演了消息生产者的角色。它们会周期性地把当前位置发送到 Kafka 的 abc-bus-location 主题上。ABC 公司选择以客车的 行程编号 trip code 作为消息键,以处理来自不同客车的消息。例如,对于从 Bengaluru 到 Hubballi 的客车,它的行程编号就会是 BLRHL003,那么在这段旅程中,对于所有来自该客车的消息,它们的消息键都会是 BLRHL003

仪表盘应用扮演了消息消费者的角色。它在代理上注册了同一个主题 abc-bus-location。如此,这个主题就成为了生产者(客车)和消费者(仪表盘)之间的虚拟通道。

客车上的设备不会期待得到来自仪表盘应用的任何回复。事实上,它们相互之间都不知道对方的存在。得益于这种架构,数百辆客车和操作中心之间实现了非阻塞通信。

实现

假设 ABC 公司想要创建三个分区来维护位置更新。由于我们的开发环境只有一个代理,因此复制因子应设置为 1。

相应地,以下命令创建了符合需求的主题:

./bin/kafka-topics.sh --create --topic abc-bus-location --zookeeper localhost:2181 --partitions 3 --replication-factor 1

生产者和消费者应用可以用多种语言编写,如 Java、Scala、Python 和 JavaScript 等。下面几节中的代码展示了它们在 Java 中的编写方式,好让我们有一个初步了解。

Java 生产者

下面的 Fleet 类模拟了在 ABC 公司的 6 辆客车上运行的 Kafka 生产者应用。它会把位置更新发送到指定代理的 abc-bus-location 主题上。请注意,简单起见,主题名称、消息键、消息内容和代理地址等,都在代码里硬编码的。

public class Fleet {
    public static void main(String[] args) throws Exception {
        String broker = “localhost:9092”;
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String topic = “abc-bus-location”;
        Map<String, String> locations = new HashMap<>();
        locations.put(“BLRHBL001”, “13.071362, 77.461906”);
        locations.put(“BLRHBL002”, “14.399654, 76.045834”);
        locations.put(“BLRHBL003”, “15.183959, 75.137622”);
        locations.put(“BLRHBL004”, “13.659576, 76.944675”);
        locations.put(“BLRHBL005”, “12.981337, 77.596181”);
        locations.put(“BLRHBL006”, “13.024843, 77.546983”);

        IntStream.range(0, 10).forEach(i -> {
            for (String trip : locations.keySet()) {
                ProducerRecord<String, String> record
                    = new ProducerRecord<String, String>(
                        topic, trip, locations.get(trip));
                producer.send(record);
            }
        });
        producer.flush();
        producer.close();
    }
}
Java 消费者

下面的 Dashboard 类实现了一个 Kafka 消费者应用,运行在 ABC 公司的操作中心。它会监听 abc-bus-location 主题,并且它的消费者组 ID 是 abc-dashboard。当收到消息后,它会立即显示来自客车的详细位置信息。我们本该配置这些详细位置信息,但简单起见,它们也是在代码里硬编码的:

public static void main(String[] args) {
    String broker = “127.0.0.1:9092”;
    String groupId = “abc-dashboard”;
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    @SuppressWarnings(“resource”)
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(“abc-bus-location”));
    while (true) {
        ConsumerRecords<String, String> records
            = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            int partition = record.partition();
            String key = record.key();
            String value = record.value();
            System.out.println(String.format(
                “Topic=%s, Partition=%d, Key=%s, Value=%s”,
                topic, partition, key, value));
        }
    }
}
依赖

为了编译和运行这些代码,我们需要 JDK 8 及以上版本。看到下面的 pom.xml 文件中的 Maven 依赖了吗?它们会把所需的 Kafka 客户端库下载并添加到类路径中:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>1.7.25</version>
</dependency>

部署

由于 abc-bus-location 主题在创建时指定了 3 个分区,我们自然就会想要运行 3 个消费者,来让读取位置更新的过程更快一些。为此,我们需要同时在 3 个不同的终端中运行仪表盘。因为所有这 3 个仪表盘都注册在同一个组 ID 下,它们自然就构成了一个消费者组。Kafka 会为每个仪表盘都分配一个特定的分区(来消费)。

当所有仪表盘实例都运行起来后,在另一个终端中启动 Fleet 类。图 6、7、8 展示了仪表盘终端中的控制台示例输出。

图 6:仪表盘终端之一

仔细看看控制台消息,我们会发现第一个、第二个和第三个终端中的消费者,正在分别从 partition-2partition-1partition-0 中读取消息。另外,我们还能发现,消息键为 BLRHBL002BLRHBL004BLRHBL006 的消息写入了 partition-2,消息键为 BLRHBL005 的消息写入了 partition-1,剩下的消息写入了 partition-0

图 7:仪表盘终端之二

使用 Kafka 的好处在于,只要集群设计得当,它就可以水平扩展,从而支持大量客车和数百万条消息。

图 8:仪表盘终端之三

不止是消息

根据 Kafka 官网上的数据,在《财富》100 强企业中,超过 80% 都在使用 Kafka。它部署在许多垂直行业,如金融服务、娱乐等。虽然 Kafka 起初只是一种简单的消息服务,但它已凭借行业级的流处理能力,成为了大数据生态系统的一环。对于那些喜欢托管解决方案的企业,Confluent 提供了基于云的 Kafka 服务,只需支付订阅费即可。(LCTT 译注:Confluent 是一个基于 Kafka 的商业公司,它提供的 Confluent Kafka 在 Apache Kafka 的基础上,增加了许多企业级特性,被认为是“更完整的 Kafka”。)


via: https://www.opensourceforu.com/2021/11/apache-kafka-asynchronous-messaging-for-seamless-systems/

作者:Krishna Mohan Koyya 选题:lkxed 译者:lkxed 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出