标签 Kafka 下的文章

Apache Kafka 是最流行的开源消息代理之一。它已经成为了大数据操作的重要组成部分,你能够在几乎所有的微服务环境中找到它。本文对 Apache Kafka 进行了简要介绍,并提供了一个案例来展示它的使用方式。

你有没有想过,电子商务平台是如何在处理巨大的流量时,做到不会卡顿的呢?有没有想过,OTT 平台是如何在同时向数百万用户交付内容时,做到平稳运行的呢?其实,关键就在于它们的分布式架构。

采用分布式架构设计的系统由多个功能组件组成。这些功能组件通常分布在多个机器上,它们通过网络,异步地交换消息,从而实现相互协作。正是由于异步消息的存在,组件之间才能实现可伸缩、无阻塞的通信,整个系统才能够平稳运行。

异步消息

异步消息的常见特性有:

  • 消息的 生产者 producer 消费者 consumer 都不知道彼此的存在。它们在不知道对方的情况下,加入和离开系统。
  • 消息 代理 broker 充当了生产者和消费者之间的中介。
  • 生产者把每条消息,都与一个“ 主题 topic ”相关联。主题是一个简单的字符串。
  • 生产者可以在多个主题上发送消息,不同的生产者也可以在同一主题上发送消息。
  • 消费者向代理订阅一个或多个主题的消息。
  • 生产者只将消息发送给代理,而不发送给消费者。
  • 代理会把消息发送给订阅该主题的所有消费者。
  • 代理将消息传递给针对该主题注册的所有消费者。
  • 生产者并不期望得到消费者的任何回应。换句话说,生产者和消费者不会相互阻塞。

市场上的消息代理有很多,而 Apache Kafka 是其中最受欢迎的之一。

Apache Kafka

Apache Kafka 是一个支持流式处理的、开源的分布式消息系统,它由 Apache 软件基金会开发。在架构上,它是多个代理组成的集群,这些代理间通过 Apache ZooKeeper 服务来协调。在接收、持久化和发送消息时,这些代理分担集群上的负载。

分区

Kafka 将消息写入称为“ 分区 partition ”的桶中。一个特定分区只保存一个主题上的消息。例如,Kafka 会把 heartbeats 主题上的消息写入名为 heartbeats-0 的分区(假设它是个单分区主题),这个过程和生产者无关。

图 1:异步消息

不过,为了利用 Kafka 集群所提供的并行处理能力,管理员通常会为指定主题创建多个分区。举个例子,假设管理员为 heartbeats 主题创建了三个分区,Kafka 会将它们分别命名为 heartbeats-0heartbeats-1heartbeats-2。Kafka 会以某种方式,把消息分配到这三个分区中,并使它们均匀分布。

还有另一种可能的情况,生产者将每条消息与一个 消息键 key 相关联。例如,同样都是在 heartbeats 主题上发送消息,有个组件使用 C1 作为消息键,另一个则使用 C2。在这种情况下,Kafka 会确保,在一个主题中,带有相同消息键的消息,总是会被写入到同一个分区。不过,在一个分区中,消息的消息键却不一定相同。下面的图 2 显示了消息在不同分区中的一种可能分布。

图 2:消息在不同分区中的分布

领导者和同步副本

Kafka 在(由多个代理组成的)集群中维护了多个分区。其中,负责维护分区的那个代理被称为“ 领导者 leader ”。只有领导者能够在它的分区上接收和发送消息。

可是,万一分区的领导者发生故障了,又该怎么办呢?为了确保业务连续性,每个领导者(代理)都会把它的分区复制到其他代理上。此时,这些其他代理就称为该分区的 同步副本 in-sync-replicas (ISR)。一旦分区的领导者发生故障,ZooKeeper 就会发起一次选举,把选中的那个同步副本任命为新的领导者。此后,这个新的领导者将承担该分区的消息接受和发送任务。管理员可以指定分区需要维护的同步副本的大小。

图 3:生产者命令行工具

消息持久化

代理会将每个分区都映射到一个指定的磁盘文件,从而实现持久化。默认情况下,消息会在磁盘上保留一个星期。当消息写入分区后,它们的内容和顺序就不能更改了。管理员可以配置一些策略,如消息的保留时长、压缩算法等。

图 4:消费者命令行工具

消费消息

与大多数其他消息系统不同,Kafka 不会主动将消息发送给消费者。相反,消费者应该监听主题,并主动读取消息。一个消费者可以从某个主题的多个分区中读取消息。多个消费者也可以读取来自同一个分区的消息。Kafka 保证了同一条消息不会被同一个消费者重复读取。

Kafka 中的每个消费者都有一个组 ID。那些组 ID 相同的消费者们共同组成了一个消费者组。通常,为了从 N 个主题分区读取消息,管理员会创建一个包含 N 个消费者的消费者组。这样一来,组内的每个消费者都可以从它的指定分区中读取消息。如果组内的消费者比可用分区还要多,那么多出来的消费者就会处于闲置状态。

在任何情况下,Kafka 都保证:不管组内有多少个消费者,同一条消息只会被该消费者组读取一次。这个架构提供了一致性、高性能、高可扩展性、准实时交付和消息持久性,以及零消息丢失。

安装、运行 Kafka

尽管在理论上,Kafka 集群可以由任意数量的代理组成,但在生产环境中,大多数集群通常由三个或五个代理组成。

在这里,我们将搭建一个单代理集群,对于生产环境来说,它已经够用了。

在浏览器中访问 https://kafka.apache.org/downloads,下载 Kafka 的最新版本。在 Linux 终端中,我们也可以使用下面的命令来下载它:

wget https://www.apache.org/dyn/closer.cgi?path=/kafka/2.8.0/kafka_2.12-2.8.0.tgz

如果需要的话,我们也可以把下载来的档案文件 kafka_2.12-2.8.0.tgz 移动到另一个目录下。解压这个档案,你会得到一个名为 kafka_2.12-2.8.0 的目录,它就是之后我们要设置的 KAFKA_HOME

打开 KAFKA_HOME/config 目录下的 server.properties 文件,取消注释下面这一行配置:

listeners=PLAINTEXT://:9092

这行配置的作用是让 Kafka 在本机的 9092 端口接收普通文本消息。我们也可以配置 Kafka 通过 安全通道 secure channel 接收消息,在生产环境中,我们也推荐这么做。

无论集群中有多少个代理,Kafka 都需要 ZooKeeper 来管理和协调它们。即使是单代理集群,也是如此。Kafka 在安装时,会附带安装 ZooKeeper,因此,我们可以在 KAFKA_HOME 目录下,在命令行中使用下面的命令来启动它:

./bin/zookeeper-server-start.sh ./config/zookeeper.properties

当 ZooKeeper 运行起来后,我们就可以在另一个终端中启动 Kafka 了,命令如下:

./bin/kafka-server-start.sh ./config/server.properties

到这里,一个单代理的 Kafka 集群就运行起来了。

验证 Kafka

让我们在 topic-1 主题上尝试下发送和接收消息吧!我们可以使用下面的命令,在创建主题时为它指定分区的个数:

./bin/kafka-topics.sh --create --topic topic-1 --zookeeper localhost:2181 --partitions 3 --replication-factor 1

上述命令还同时指定了 复制因子 replication factor ,它的值不能大于集群中代理的数量。我们使用的是单代理集群,因此,复制因子只能设置为 1。

当主题创建完成后,生产者和消费者就可以在上面交换消息了。Kafka 的发行版内附带了生产者和消费者的命令行工具,供测试时用。

打开第三个终端,运行下面的命令,启动生产者:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-1

上述命令显示了一个提示符,我们可以在后面输入简单文本消息。由于我们指定的命令选项,生产者会把 topic-1 上的消息,发送到运行在本机的 9092 端口的 Kafka 中。

打开第四个终端,运行下面的命令,启动消费者:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic topic-1 –-from-beginning

上述命令启动了一个消费者,并指定它连接到本机 9092 端口的 Kafka。它订阅了 topic-1 主题,以读取其中的消息。由于命令行的最后一个选项,这个消费者会从最开头的位置,开始读取该主题的所有消息。

我们注意到,生产者和消费者连接的是同一个代理,访问的是同一个主题,因此,消费者在收到消息后会把消息打印到终端上。

下面,让我们在实际应用场景中,尝试使用 Kafka 吧!

案例

假设有一家叫做 ABC 的公共汽车运输公司,它拥有一支客运车队,往返于全国不同城市之间。由于 ABC 希望实时跟踪每辆客车,以提高其运营质量,因此,它提出了一个基于 Apache Kafka 的解决方案。

首先,ABC 公司为所有公交车都配备了位置追踪设备。然后,它使用 Kafka 建立了一个操作中心,以接收来自数百辆客车的位置更新。它还开发了一个 仪表盘 dashboard ,以显示任一时间点所有客车的当前位置。图 5 展示了上述架构:

图 5:基于 Kafka 的架构

在这种架构下,客车上的设备扮演了消息生产者的角色。它们会周期性地把当前位置发送到 Kafka 的 abc-bus-location 主题上。ABC 公司选择以客车的 行程编号 trip code 作为消息键,以处理来自不同客车的消息。例如,对于从 Bengaluru 到 Hubballi 的客车,它的行程编号就会是 BLRHL003,那么在这段旅程中,对于所有来自该客车的消息,它们的消息键都会是 BLRHL003

仪表盘应用扮演了消息消费者的角色。它在代理上注册了同一个主题 abc-bus-location。如此,这个主题就成为了生产者(客车)和消费者(仪表盘)之间的虚拟通道。

客车上的设备不会期待得到来自仪表盘应用的任何回复。事实上,它们相互之间都不知道对方的存在。得益于这种架构,数百辆客车和操作中心之间实现了非阻塞通信。

实现

假设 ABC 公司想要创建三个分区来维护位置更新。由于我们的开发环境只有一个代理,因此复制因子应设置为 1。

相应地,以下命令创建了符合需求的主题:

./bin/kafka-topics.sh --create --topic abc-bus-location --zookeeper localhost:2181 --partitions 3 --replication-factor 1

生产者和消费者应用可以用多种语言编写,如 Java、Scala、Python 和 JavaScript 等。下面几节中的代码展示了它们在 Java 中的编写方式,好让我们有一个初步了解。

Java 生产者

下面的 Fleet 类模拟了在 ABC 公司的 6 辆客车上运行的 Kafka 生产者应用。它会把位置更新发送到指定代理的 abc-bus-location 主题上。请注意,简单起见,主题名称、消息键、消息内容和代理地址等,都在代码里硬编码的。

public class Fleet {
    public static void main(String[] args) throws Exception {
        String broker = “localhost:9092”;
        Properties props = new Properties();
        props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
        props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());
        props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class.getName());

        Producer<String, String> producer = new KafkaProducer<String, String>(props);
        String topic = “abc-bus-location”;
        Map<String, String> locations = new HashMap<>();
        locations.put(“BLRHBL001”, “13.071362, 77.461906”);
        locations.put(“BLRHBL002”, “14.399654, 76.045834”);
        locations.put(“BLRHBL003”, “15.183959, 75.137622”);
        locations.put(“BLRHBL004”, “13.659576, 76.944675”);
        locations.put(“BLRHBL005”, “12.981337, 77.596181”);
        locations.put(“BLRHBL006”, “13.024843, 77.546983”);

        IntStream.range(0, 10).forEach(i -> {
            for (String trip : locations.keySet()) {
                ProducerRecord<String, String> record
                    = new ProducerRecord<String, String>(
                        topic, trip, locations.get(trip));
                producer.send(record);
            }
        });
        producer.flush();
        producer.close();
    }
}
Java 消费者

下面的 Dashboard 类实现了一个 Kafka 消费者应用,运行在 ABC 公司的操作中心。它会监听 abc-bus-location 主题,并且它的消费者组 ID 是 abc-dashboard。当收到消息后,它会立即显示来自客车的详细位置信息。我们本该配置这些详细位置信息,但简单起见,它们也是在代码里硬编码的:

public static void main(String[] args) {
    String broker = “127.0.0.1:9092”;
    String groupId = “abc-dashboard”;
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, broker);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        StringDeserializer.class.getName());
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);

    @SuppressWarnings(“resource”)
    Consumer<String, String> consumer = new KafkaConsumer<String, String>(props);
    consumer.subscribe(Arrays.asList(“abc-bus-location”));
    while (true) {
        ConsumerRecords<String, String> records
            = consumer.poll(Duration.ofMillis(1000));

        for (ConsumerRecord<String, String> record : records) {
            String topic = record.topic();
            int partition = record.partition();
            String key = record.key();
            String value = record.value();
            System.out.println(String.format(
                “Topic=%s, Partition=%d, Key=%s, Value=%s”,
                topic, partition, key, value));
        }
    }
}
依赖

为了编译和运行这些代码,我们需要 JDK 8 及以上版本。看到下面的 pom.xml 文件中的 Maven 依赖了吗?它们会把所需的 Kafka 客户端库下载并添加到类路径中:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>2.8.0</version>
</dependency>
<dependency>
  <groupId>org.slf4j</groupId>
  <artifactId>slf4j-simple</artifactId>
  <version>1.7.25</version>
</dependency>

部署

由于 abc-bus-location 主题在创建时指定了 3 个分区,我们自然就会想要运行 3 个消费者,来让读取位置更新的过程更快一些。为此,我们需要同时在 3 个不同的终端中运行仪表盘。因为所有这 3 个仪表盘都注册在同一个组 ID 下,它们自然就构成了一个消费者组。Kafka 会为每个仪表盘都分配一个特定的分区(来消费)。

当所有仪表盘实例都运行起来后,在另一个终端中启动 Fleet 类。图 6、7、8 展示了仪表盘终端中的控制台示例输出。

图 6:仪表盘终端之一

仔细看看控制台消息,我们会发现第一个、第二个和第三个终端中的消费者,正在分别从 partition-2partition-1partition-0 中读取消息。另外,我们还能发现,消息键为 BLRHBL002BLRHBL004BLRHBL006 的消息写入了 partition-2,消息键为 BLRHBL005 的消息写入了 partition-1,剩下的消息写入了 partition-0

图 7:仪表盘终端之二

使用 Kafka 的好处在于,只要集群设计得当,它就可以水平扩展,从而支持大量客车和数百万条消息。

图 8:仪表盘终端之三

不止是消息

根据 Kafka 官网上的数据,在《财富》100 强企业中,超过 80% 都在使用 Kafka。它部署在许多垂直行业,如金融服务、娱乐等。虽然 Kafka 起初只是一种简单的消息服务,但它已凭借行业级的流处理能力,成为了大数据生态系统的一环。对于那些喜欢托管解决方案的企业,Confluent 提供了基于云的 Kafka 服务,只需支付订阅费即可。(LCTT 译注:Confluent 是一个基于 Kafka 的商业公司,它提供的 Confluent Kafka 在 Apache Kafka 的基础上,增加了许多企业级特性,被认为是“更完整的 Kafka”。)


via: https://www.opensourceforu.com/2021/11/apache-kafka-asynchronous-messaging-for-seamless-systems/

作者:Krishna Mohan Koyya 选题:lkxed 译者:lkxed 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

SigNoz 帮助开发者使用最小的精力快速实现他们的可观测性目标。

 title=

SigNoz 是一个开源的应用可观察性平台。SigNoz 是用 React 和 Go 编写的,它从头到尾都是为了让开发者能够以最小的精力尽快实现他们的可观察性目标。

本文将详细介绍该软件,包括架构、基于 Kubernetes 的部署以及一些常见的 SigNoz 用途。

SigNoz 架构

SigNoz 将几个组件捆绑在一起,创建了一个可扩展的、耦合松散的系统,很容易上手使用。其中一些最重要的组件有:

  • OpenTelemetry Collector
  • Apache Kafka
  • Apache Druid

OpenTelemetry Collector 是跟踪或度量数据收集引擎。这使得 SigNoz 能够以行业标准格式获取数据,包括 Jaeger、Zipkin 和 OpenConsensus。之后,收集的数据被转发到 Apache Kafka。

SigNoz 使用 Kafka 和流处理器来实时获取大量的可观测数据。然后,这些数据被传递到 Apache Druid,它擅长于存储这些数据,用于短期和长期的 SQL 分析。

当数据被扁平化并存储在 Druid 中,SigNoz 的查询服务可以查询并将数据传递给 SigNoz React 前端。然后,前端为用户创建漂亮的图表,使可观察性数据可视化。

 title=

安装 SigNoz

SigNoz 的组件包括 Apache Kafka 和 Druid。这些组件是松散耦合的,并协同工作,以确保终端用户的无缝体验。鉴于这些组件,最好将 SigNoz 作为 Kubernetes 或 Docker Compose(用于本地测试)上的微服务组合来运行。

这个例子使用基于 Kubernetes Helm Chart 的部署在 Kubernetes 上安装 SigNoz。作为先决条件,你需要一个 Kubernetes 集群。如果你没有可用的 Kubernetes 集群,你可以使用 MiniKubeKind 等工具,在你的本地机器上创建一个测试集群。注意,这台机器至少要有 4GB 的可用空间才能工作。

当你有了可用的集群,并配置了 kubectl 来与集群通信,运行:

$ git clone https://github.com/SigNoz/signoz.git && cd signoz
$ helm dependency update deploy/kubernetes/platform
$ kubectl create ns platform
$ helm -n platform install signoz deploy/kubernetes/platform
$ kubectl -n platform apply -Rf deploy/kubernetes/jobs
$ kubectl -n platform apply -f deploy/kubernetes/otel-collector

这将在集群上安装 SigNoz 和相关容器。要访问用户界面 (UI),运行 kubectl port-forward 命令。例如:

$ kubectl -n platform port-forward svc/signoz-frontend 3000:3000

现在你应该能够使用本地浏览器访问你的 SigNoz 仪表板,地址为 http://localhost:3000

现在你的可观察性平台已经建立起来了,你需要一个能产生可观察性数据的应用来进行可视化和追踪。对于这个例子,你可以使用 HotROD,一个由 Jaegar 团队开发的示例应用。

要安装它,请运行:

$ kubectl create ns sample-application
$ kubectl -n sample-application apply -Rf sample-apps/hotrod/

探索功能

现在你应该有一个已经安装合适仪表的应用,并可在演示设置中运行。看看 SigNoz 仪表盘上的指标和跟踪数据。当你登录到仪表盘的主页时,你会看到一个所有已配置的应用列表,这些应用正在向 SigNoz 发送仪表数据。

 title=

指标

当你点击一个特定的应用时,你会登录到该应用的主页上。指标页面显示最近 15 分钟的信息(这个数字是可配置的),如应用的延迟、平均吞吐量、错误率和应用目前访问最高的接口。这让你对应用的状态有一个大概了解。任何错误、延迟或负载的峰值都可以立即看到。

 title=

追踪

追踪页面按时间顺序列出了每个请求的高层细节。当你发现一个感兴趣的请求(例如,比预期时间长的东西),你可以点击追踪,查看该请求中发生的每个行为的单独时间跨度。下探模式提供了对每个请求的彻底检查。

 title=

 title=

用量资源管理器

大多数指标和跟踪数据都非常有用,但只在一定时期内有用。随着时间的推移,数据在大多数情况下不再有用。这意味着为数据计划一个适当的保留时间是很重要的。否则,你将为存储支付更多的费用。用量资源管理器提供了每小时、每一天和每一周获取数据的概况。

 title=

添加仪表

到目前为止,你一直在看 HotROD 应用的指标和追踪。理想情况下,你会希望对你的应用进行检测,以便它向 SigNoz 发送可观察数据。参考 SigNoz 网站上的仪表概览

SigNoz 支持一个与供应商无关的仪表库,OpenTelemetry,作为配置仪表的主要方式。OpenTelemetry 提供了各种语言的仪表库,支持自动和手动仪表。

了解更多

SigNoz 帮助开发者快速开始度量和跟踪应用。要了解更多,你可以查阅 文档,加入社区,并访问 GitHub 上的源代码。


via: https://opensource.com/article/21/4/observability-apache-kafka-signoz

作者:Nitish Tiwari 选题:lujun9972 译者:geekpi 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

在我前面的博客文章 “我的第一个 Go 微服务:使用 MongoDB 和 Docker 多阶段构建” 中,我创建了一个 Go 微服务示例,它发布一个 REST 式的 http 端点,并将从 HTTP POST 中接收到的数据保存到 MongoDB 数据库。

在这个示例中,我将数据的保存和 MongoDB 分离,并创建另一个微服务去处理它。我还添加了 Kafka 为消息层服务,这样微服务就可以异步处理它自己关心的东西了。

如果你有时间去看,我将这个博客文章的整个过程录制到 这个视频中了 :)

下面是这个使用了两个微服务的简单的异步处理示例的上层架构图。

rest-kafka-mongo-microservice-draw-io

微服务 1 —— 是一个 REST 式微服务,它从一个 /POST http 调用中接收数据。接收到请求之后,它从 http 请求中检索数据,并将它保存到 Kafka。保存之后,它通过 /POST 发送相同的数据去响应调用者。

微服务 2 —— 是一个订阅了 Kafka 中的一个主题的微服务,微服务 1 的数据保存在该主题。一旦消息被微服务消费之后,它接着保存数据到 MongoDB 中。

在你继续之前,我们需要能够去运行这些微服务的几件东西:

  1. 下载 Kafka —— 我使用的版本是 kafka\_2.11-1.1.0
  2. 安装 librdkafka —— 不幸的是,这个库应该在目标系统中
  3. 安装 Kafka Go 客户端
  4. 运行 MongoDB。你可以去看我的 以前的文章 中关于这一块的内容,那篇文章中我使用了一个 MongoDB docker 镜像。

我们开始吧!

首先,启动 Kafka,在你运行 Kafka 服务器之前,你需要运行 Zookeeper。下面是示例:

$ cd /<download path>/kafka_2.11-1.1.0
$ bin/zookeeper-server-start.sh config/zookeeper.properties

接着运行 Kafka —— 我使用 9092 端口连接到 Kafka。如果你需要改变端口,只需要在 config/server.properties 中配置即可。如果你像我一样是个新手,我建议你现在还是使用默认端口。

$ bin/kafka-server-start.sh config/server.properties

Kafka 跑起来之后,我们需要 MongoDB。它很简单,只需要使用这个 docker-compose.yml 即可。

version: '3'
services:
  mongodb:
    image: mongo
    ports:
      - "27017:27017"
    volumes:
      - "mongodata:/data/db"
    networks:
      - network1

volumes:
   mongodata:

networks:
   network1:

使用 Docker Compose 去运行 MongoDB docker 容器。

docker-compose up

这里是微服务 1 的相关代码。我只是修改了我前面的示例去保存到 Kafka 而不是 MongoDB:

rest-to-kafka/rest-kafka-sample.go

func jobsPostHandler(w http.ResponseWriter, r *http.Request) {

    //Retrieve body from http request
    b, err := ioutil.ReadAll(r.Body)
    defer r.Body.Close()
    if err != nil {
        panic(err)
    }

    //Save data into Job struct
    var _job Job
    err = json.Unmarshal(b, &_job)
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    saveJobToKafka(_job)

    //Convert job struct into json
    jsonString, err := json.Marshal(_job)
    if err != nil {
        http.Error(w, err.Error(), 500)
        return
    }

    //Set content-type http header
    w.Header().Set("content-type", "application/json")

    //Send back data as response
    w.Write(jsonString)

}

func saveJobToKafka(job Job) {

    fmt.Println("save to kafka")

    jsonString, err := json.Marshal(job)

    jobString := string(jsonString)
    fmt.Print(jobString)

    p, err := kafka.NewProducer(&kafka.ConfigMap{"bootstrap.servers": "localhost:9092"})
    if err != nil {
        panic(err)
    }

    // Produce messages to topic (asynchronously)
    topic := "jobs-topic1"
    for _, word := range []string{string(jobString)} {
        p.Produce(&kafka.Message{
            TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
            Value:          []byte(word),
        }, nil)
    }
}

这里是微服务 2 的代码。在这个代码中最重要的东西是从 Kafka 中消费数据,保存部分我已经在前面的博客文章中讨论过了。这里代码的重点部分是从 Kafka 中消费数据:

kafka-to-mongo/kafka-mongo-sample.go

func main() {

    //Create MongoDB session
    session := initialiseMongo()
    mongoStore.session = session

    receiveFromKafka()

}

func receiveFromKafka() {

    fmt.Println("Start receiving from Kafka")
    c, err := kafka.NewConsumer(&kafka.ConfigMap{
        "bootstrap.servers": "localhost:9092",
        "group.id":          "group-id-1",
        "auto.offset.reset": "earliest",
    })

    if err != nil {
        panic(err)
    }

    c.SubscribeTopics([]string{"jobs-topic1"}, nil)

    for {
        msg, err := c.ReadMessage(-1)

        if err == nil {
            fmt.Printf("Received from Kafka %s: %s\n", msg.TopicPartition, string(msg.Value))
            job := string(msg.Value)
            saveJobToMongo(job)
        } else {
            fmt.Printf("Consumer error: %v (%v)\n", err, msg)
            break
        }
    }

    c.Close()

}

func saveJobToMongo(jobString string) {

    fmt.Println("Save to MongoDB")
    col := mongoStore.session.DB(database).C(collection)

    //Save data into Job struct
    var _job Job
    b := []byte(jobString)
    err := json.Unmarshal(b, &_job)
    if err != nil {
        panic(err)
    }

    //Insert job into MongoDB
    errMongo := col.Insert(_job)
    if errMongo != nil {
        panic(errMongo)
    }

    fmt.Printf("Saved to MongoDB : %s", jobString)

}

我们来演示一下,运行微服务 1。确保 Kafka 已经运行了。

$ go run rest-kafka-sample.go

我使用 Postman 向微服务 1 发送数据。

Screenshot-2018-04-29-22.20.33

这里是日志,你可以在微服务 1 中看到。当你看到这些的时候,说明已经接收到了来自 Postman 发送的数据,并且已经保存到了 Kafka。

Screenshot-2018-04-29-22.22.00

因为我们尚未运行微服务 2,数据被微服务 1 只保存在了 Kafka。我们来消费它并通过运行的微服务 2 来将它保存到 MongoDB。

$ go run kafka-mongo-sample.go

现在,你将在微服务 2 上看到消费的数据,并将它保存到了 MongoDB。

Screenshot-2018-04-29-22.24.15

检查一下数据是否保存到了 MongoDB。如果有数据,我们成功了!

Screenshot-2018-04-29-22.26.39

完整的源代码可以在这里找到:

https://github.com/donvito/learngo/tree/master/rest-kafka-mongo-microservice

现在是广告时间:如果你喜欢这篇文章,请在 Twitter @donvito 上关注我。我的 Twitter 上有关于 Docker、Kubernetes、GoLang、Cloud、DevOps、Agile 和 Startups 的内容。欢迎你们在 GitHubLinkedIn 关注我。

开心地玩吧!


via: https://www.melvinvivas.com/developing-microservices-using-kafka-and-mongodb/

作者:Melvin Vivas 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

介绍

KSQL 是 Apache Kafka 中的开源的流式 SQL 引擎。它可以让你在 Kafka 主题 topic 上,使用一个简单的并且是交互式的 SQL 接口,很容易地做一些复杂的流处理。在这个短文中,我们将看到如何轻松地配置并运行在一个沙箱中去探索它,并使用大家都喜欢的演示数据库源: Twitter。我们将从推文的原始流中获取,通过使用 KSQL 中的条件去过滤它,来构建一个聚合,如统计每个用户每小时的推文数量。

Confluent

首先, 获取一个 Confluent 平台的副本。我使用的是 RPM 包,但是,如果你需要的话,你也可以使用 tar、 zip 等等 。启动 Confluent 系统:

$ confluent start

(如果你感兴趣,这里有一个 Confluent 命令行的快速教程

我们将使用 Kafka Connect 从 Twitter 上拉取数据。 这个 Twitter 连接器可以在 GitHub 上找到。要安装它,像下面这样操作:

# Clone the git repo
cd /home/rmoff
git clone https://github.com/jcustenborder/kafka-connect-twitter.git
# Compile the code
cd kafka-connect-twitter
mvn clean package

要让 Kafka Connect 去使用我们构建的连接器, 你要去修改配置文件。因为我们使用 Confluent 命令行,真实的配置文件是在 etc/schema-registry/connect-avro-distributed.properties,因此去修改它并增加如下内容:

plugin.path=/home/rmoff/kafka-connect-twitter/target/kafka-connect-twitter-0.2-SNAPSHOT.tar.gz

重启动 Kafka Connect:

confluent stop connect
confluent start connect

一旦你安装好插件,你可以很容易地去配置它。你可以直接使用 Kafka Connect 的 REST API ,或者创建你的配置文件,这就是我要在这里做的。如果你需要全部的方法,请首先访问 Twitter 来获取你的 API 密钥

{
 "name": "twitter_source_json_01",
 "config": {
   "connector.class": "com.github.jcustenborder.kafka.connect.twitter.TwitterSourceConnector",
   "twitter.oauth.accessToken": "xxxx",
   "twitter.oauth.consumerSecret": "xxxxx",
   "twitter.oauth.consumerKey": "xxxx",
   "twitter.oauth.accessTokenSecret": "xxxxx",
   "kafka.delete.topic": "twitter_deletes_json_01",
   "value.converter": "org.apache.kafka.connect.json.JsonConverter",
   "key.converter": "org.apache.kafka.connect.json.JsonConverter",
   "value.converter.schemas.enable": false,
   "key.converter.schemas.enable": false,
   "kafka.status.topic": "twitter_json_01",
   "process.deletes": true,
   "filter.keywords": "rickastley,kafka,ksql,rmoff"
 }
}

假设你写这些到 /home/rmoff/twitter-source.json,你可以现在运行:

$ confluent load twitter_source -d /home/rmoff/twitter-source.json

然后推文就从大家都喜欢的网络明星 [rick] 滚滚而来……

$ kafka-console-consumer --bootstrap-server localhost:9092 --from-beginning --topic twitter_json_01|jq '.Text'
{
  "string": "RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB"
}
{
  "string": "RT @mariteg10: @rickastley @Carfestevent Wonderful Rick!!\nDo not forget Chile!!\nWe hope you get back someday!!\nHappy weekend for you!!\n❤…"
}

KSQL

现在我们从 KSQL 开始 ! 马上去下载并构建它:

cd /home/rmoff
git clone https://github.com/confluentinc/ksql.git
cd /home/rmoff/ksql
mvn clean compile install -DskipTests

构建完成后,让我们来运行它:

./bin/ksql-cli local --bootstrap-server localhost:9092
                       ======================================
                       =      _  __ _____  ____  _          =
                       =     | |/ // ____|/ __ \| |         =
                       =     | ' /| (___ | |  | | |         =
                       =     |  <  \___ \| |  | | |         =
                       =     | . \ ____) | |__| | |____     =
                       =     |_|\_\_____/ \___\_\______|    =
                       =                                    =
                       =   Streaming SQL Engine for Kafka   =
Copyright 2017 Confluent Inc.

CLI v0.1, Server v0.1 located at http://localhost:9098

Having trouble? Type 'help' (case-insensitive) for a rundown of how things work!

ksql> 

使用 KSQL, 我们可以让我们的数据保留在 Kafka 主题上并可以查询它。首先,我们需要去告诉 KSQL 主题上的 数据模式 schema 是什么,一个 twitter 消息实际上是一个非常巨大的 JSON 对象, 但是,为了简洁,我们只选出其中几行:

ksql> CREATE STREAM twitter_raw (CreatedAt BIGINT, Id BIGINT, Text VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON');

Message  
----------------
Stream created

在定义的模式中,我们可以查询这些流。要让 KSQL 从该主题的开始展示数据(而不是默认的当前时间点),运行如下命令:

ksql> SET 'auto.offset.reset' = 'earliest';  
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'

现在,让我们看看这些数据,我们将使用 LIMIT 从句仅检索一行:

ksql> SELECT text FROM twitter_raw LIMIT 1;  
RT @rickastley: 30 years ago today I said I was Never Gonna Give You Up. I am a man of my word - Rick x https://t.co/VmbMQA6tQB
LIMIT reached for the partition.  
Query terminated
ksql>

现在,让我们使用刚刚定义和可用的推文内容的全部数据重新定义该流:

ksql> DROP stream twitter_raw;
Message
--------------------------------
Source TWITTER_RAW was dropped

ksql> CREATE STREAM twitter_raw (CreatedAt bigint,Id bigint, Text VARCHAR, SOURCE VARCHAR, Truncated VARCHAR, InReplyToStatusId VARCHAR, InReplyToUserId VARCHAR, InReplyToScreenName VARCHAR, GeoLocation VARCHAR, Place VARCHAR, Favorited VARCHAR, Retweeted VARCHAR, FavoriteCount VARCHAR, User VARCHAR, Retweet VARCHAR, Contributors VARCHAR, RetweetCount VARCHAR, RetweetedByMe VARCHAR, CurrentUserRetweetId VARCHAR, PossiblySensitive VARCHAR, Lang VARCHAR, WithheldInCountries VARCHAR, HashtagEntities VARCHAR, UserMentionEntities VARCHAR, MediaEntities VARCHAR, SymbolEntities VARCHAR, URLEntities VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01',VALUE_FORMAT='JSON');
Message
----------------
Stream created

ksql>

现在,我们可以操作和检查更多的最近的数据,使用一般的 SQL 查询:

ksql> SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
EXTRACTJSONFIELD(user,'$.ScreenName') as ScreenName,Text \
FROM twitter_raw \
WHERE LCASE(hashtagentities) LIKE '%oow%' OR \
LCASE(hashtagentities) LIKE '%ksql%';  

2017-09-29 13:59:58.000 | rmoff | Looking forward to talking all about @apachekafka & @confluentinc’s #KSQL at #OOW17 on Sunday 13:45 https://t.co/XbM4eIuzeG

注意这里没有 LIMIT 从句,因此,你将在屏幕上看到 “continuous query” 的结果。不像关系型数据表中返回一个确定数量结果的查询,一个持续查询会运行在无限的流式数据上, 因此,它总是可能返回更多的记录。点击 Ctrl-C 去中断然后返回到 KSQL 提示符。在以上的查询中我们做了一些事情:

  • TIMESTAMPTOSTRING 将时间戳从 epoch 格式转换到人类可读格式。(LCTT 译注: epoch 指的是一个特定的时间 1970-01-01 00:00:00 UTC)
  • EXTRACTJSONFIELD 来展示数据源中嵌套的用户域中的一个字段,它看起来像:
{
"CreatedAt": 1506570308000,
"Text": "RT @gwenshap: This is the best thing since partitioned bread :) https://t.co/1wbv3KwRM6",
[...]
"User": {
    "Id": 82564066,
    "Name": "Robin Moffatt \uD83C\uDF7B\uD83C\uDFC3\uD83E\uDD53",
    "ScreenName": "rmoff",
    [...]
  • 应用断言去展示内容,对 #(hashtag)使用模式匹配, 使用 LCASE 去强制小写字母。(LCTT 译注:hashtag 是twitter 中用来标注线索主题的标签)

关于支持的函数列表,请查看 KSQL 文档

我们可以创建一个从这个数据中得到的流:

ksql> CREATE STREAM twitter AS \
SELECT TIMESTAMPTOSTRING(CreatedAt, 'yyyy-MM-dd HH:mm:ss.SSS') AS CreatedAt,\
EXTRACTJSONFIELD(user,'$.Name') AS user_Name,\
EXTRACTJSONFIELD(user,'$.ScreenName') AS user_ScreenName,\
EXTRACTJSONFIELD(user,'$.Location') AS user_Location,\
EXTRACTJSONFIELD(user,'$.Description') AS  user_Description,\
Text,hashtagentities,lang \
FROM twitter_raw ;

Message  
----------------------------  
Stream created and running  

ksql> DESCRIBE twitter;
Field            | Type  
------------------------------------  
ROWTIME          | BIGINT  
ROWKEY           | VARCHAR(STRING)  
CREATEDAT        | VARCHAR(STRING)  
USER_NAME        | VARCHAR(STRING)  
USER_SCREENNAME  | VARCHAR(STRING)  
USER_LOCATION    | VARCHAR(STRING)  
USER_DESCRIPTION | VARCHAR(STRING)  
TEXT             | VARCHAR(STRING)  
HASHTAGENTITIES  | VARCHAR(STRING)  
LANG             | VARCHAR(STRING)  
ksql>

并且查询这个得到的流:

ksql> SELECT CREATEDAT, USER_NAME, TEXT \
FROM TWITTER \
WHERE TEXT LIKE '%KSQL%';  

2017-10-03 23:39:37.000 | Nicola Ferraro | RT @flashdba: Again, I'm really taken with the possibilities opened up by @confluentinc's KSQL engine #Kafka https://t.co/aljnScgvvs

聚合

在我们结束之前,让我们去看一下怎么去做一些聚合。

ksql> SELECT user_screenname, COUNT(*) \
FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
GROUP BY user_screenname HAVING COUNT(*) > 1;  

oracleace | 2  
rojulman | 2
smokeinpublic | 2  
ArtFlowMe | 2  
[...]

你将可能得到满屏幕的结果;这是因为 KSQL 在每次给定的时间窗口更新时实际发出聚合值。因为我们设置 KSQL 去读取在主题上的全部消息(SET 'auto.offset.reset' = 'earliest';),它是一次性读取这些所有的消息并计算聚合更新。这里有一个微妙之处值得去深入研究。我们的入站推文流正好就是一个流。但是,现有它不能创建聚合,我们实际上是创建了一个表。一个表是在给定时间点的给定键的值的一个快照。 KSQL 聚合数据基于消息的事件时间,并且如果它更新了,通过简单的相关窗口重申去操作后面到达的数据。困惑了吗? 我希望没有,但是,让我们看一下,如果我们可以用这个例子去说明。 我们将申明我们的聚合作为一个真实的表:

ksql> CREATE TABLE user_tweet_count AS \
SELECT user_screenname, count(*) AS  tweet_count \
FROM twitter WINDOW TUMBLING (SIZE 1 HOUR) \
GROUP BY user_screenname ;

Message  
---------------------------  
Table created and running

看表中的列,这里除了我们要求的外,还有两个隐含列:

ksql> DESCRIBE user_tweet_count;

Field           | Type  
-----------------------------------  
ROWTIME         | BIGINT  
ROWKEY          | VARCHAR(STRING)  
USER_SCREENNAME | VARCHAR(STRING)  
TWEET_COUNT     | BIGINT  
ksql>

我们看一下这些是什么:

ksql> SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') , \
ROWKEY, USER_SCREENNAME, TWEET_COUNT \
FROM user_tweet_count \
WHERE USER_SCREENNAME= 'rmoff';  

2017-09-29 11:00:00.000 | rmoff : Window{start=1506708000000 end=-} | rmoff | 2  
2017-09-29 12:00:00.000 | rmoff : Window{start=1506711600000 end=-} | rmoff | 4  
2017-09-28 22:00:00.000 | rmoff : Window{start=1506661200000 end=-} | rmoff | 2  
2017-09-29 09:00:00.000 | rmoff : Window{start=1506700800000 end=-} | rmoff | 4  
2017-09-29 15:00:00.000 | rmoff : Window{start=1506722400000 end=-} | rmoff | 2  
2017-09-29 13:00:00.000 | rmoff : Window{start=1506715200000 end=-} | rmoff | 6

ROWTIME 是窗口开始时间, ROWKEYGROUP BYUSER_SCREENNAME)加上窗口的组合。因此,我们可以通过创建另外一个衍生的表来整理一下:

ksql> CREATE TABLE USER_TWEET_COUNT_DISPLAY AS \
SELECT TIMESTAMPTOSTRING(ROWTIME, 'yyyy-MM-dd HH:mm:ss.SSS') AS WINDOW_START ,\
USER_SCREENNAME, TWEET_COUNT \
FROM user_tweet_count;

Message  
---------------------------  
Table created and running

现在它更易于查询和查看我们感兴趣的数据:

ksql> SELECT WINDOW_START ,  USER_SCREENNAME, TWEET_COUNT \
FROM USER_TWEET_COUNT_DISPLAY WHERE TWEET_COUNT> 20;  

2017-09-29 12:00:00.000 | VikasAatOracle | 22  
2017-09-28 14:00:00.000 | Throne_ie | 50  
2017-09-28 14:00:00.000 | pikipiki_net | 22  
2017-09-29 09:00:00.000 | johanlouwers | 22  
2017-09-28 09:00:00.000 | yvrk1973 | 24  
2017-09-28 13:00:00.000 | cmosoares | 22  
2017-09-29 11:00:00.000 | ypoirier | 24  
2017-09-28 14:00:00.000 | pikisec | 22  
2017-09-29 07:00:00.000 | Throne_ie | 22  
2017-09-29 09:00:00.000 | ChrisVoyance | 24  
2017-09-28 11:00:00.000 | ChrisVoyance | 28

结论

所以我们有了它! 我们可以从 Kafka 中取得数据, 并且很容易使用 KSQL 去探索它。 而不仅是去浏览和转换数据,我们可以很容易地使用 KSQL 从流和表中建立流处理。

如果你对 KSQL 能够做什么感兴趣,去查看:

记住,KSQL 现在正处于开发者预览阶段。 欢迎在 KSQL 的 GitHub 仓库上提出任何问题, 或者去我们的 community Slack group 的 #KSQL 频道。


via: https://www.confluent.io/blog/using-ksql-to-analyse-query-and-transform-data-in-kafka

作者:Robin Moffatt 译者:qhwdw 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出