标签 stream 下的文章

这个例子让我们看到了在 Foreman 中管理和配置 CentOS Stream 内容的许多选项。

 title=

2021 年 12 月,CentOS 8 将达到生命终点,被 CentOS Stream 取代。CentOS Stream 和 CentOS 之前的迭代之间的主要变化之一是没有小版本。Centos Stream 采用了一个连续的发布周期。从今年年初开始,Foreman 社区的开发者开始看到 CentOS Stream 由于持续发布而提供的更早的错误检测和补丁的好处。我们不再需要等待下一个版本来利用最新的变化和错误修复。一位资深的 Linux 社区爱好者 指出,此举也使 RHEL 开发者比以往更接近 FOSS 社区。

然而,如果你是一个拥有数百或数千台服务器的管理员,你可能想控制新的软件包何时被添加到特定的服务器。如果你正在寻找一个免费的开源工具,帮助你确保生产服务器的稳定性,同时允许你安全地从 Centos Stream 中拉入最新的变化用于开发和测试,这就是 Foreman 可以帮助你的地方。有了 Foreman,你可以在生命周期环境的各个阶段管理你的 Centos Stream 内容。

Foreman 介绍

Foreman 是一个完整的物理和虚拟服务器的生命周期管理工具。有了 Foreman,系统管理员有能力轻松实现重复性任务的自动化,快速部署应用程序,并主动管理内部或云中的服务器。Foreman 为 配备 provisioning 管理、配置管理和监控提供了企业级解决方案。由于其插件架构,Foreman 可以以无数种方式进行扩展。使用 Katello 插件,你可以把 Foreman 作为一个完整的 内容管理 content management 工具来管理 CentOS Stream,以及其他许多内容类型。

通过 Foreman 和 Katello,你可以准确地定义你希望每个环境包含哪些软件包。例如,生产环境可能使用已被验证为稳定的软件包,而开发环境可能需要最新、最先进的软件包版本。你还可以跨生命周期环境推广 内容视图 content view 。让我们来看看 Foreman 是如何完成这个任务的。

我们在这篇文章中使用了网页用户界面,但 Foreman 也有一个强大的 CLI 和 API。Katello 插件为 Pulp 项目提供了一个工作流和网页用户界面,你可以在 这篇文章 中了解更多。我们在这里也提供了一个简单的工作流程,但是 Foreman 和 Katello 项目提供了许多不同的配置选项来满足你的具体需求。

本文假设 Foreman 和 Katello 已经安装完毕。关于如何安装的更多信息,请参阅 Katello 安装手册

创建一个产品

第一步是在 Foreman 中创建一个 产品 product 。该产品的功能是作为一个内部标签来存储 CentOS Stream 存储库。

  1. 在 Foreman 网页用户界面,导航到“ 内容 Content > 产品 Products ”,并点击“ 创建产品 Create Product ”。
  2. 在“ 名称 Name ”字段中,为产品输入一个名称。Foreman会根据你输入的“ 名称 Name ”自动完成“ 标签 Label ”字段,以后不能再更改。

将 CentOS Stream 存储库添加到产品中

现在你有了一个产品,你可以使用 AppStream 和 BaseOS 存储库的 URL,并将它们添加到你的新产品中。

  1. 在 Foreman 网页用户界面中,导航到 “ 内容 Content > 产品 Products ”,选择你要使用的产品,然后点击 “ 新存储库 New Repository ”。
  2. 在“ 名称 Name ”字段中,为存储库输入一个名称;例如,“Centos8StreamBaseOS”。Foreman 会根据你输入的“ 名称 Name ”,自动完成“ 标签 Label ”字段。
  3. 从“ 类型 Type ”列表中,选择存储库的类型,然后选择“Yum”。
  4. 在 “URL” 字段中,输入 CentOS Stream Baseos 存储库的 URL,作为源: http://mirror.centos.org/centos/8-stream/BaseOS/x86_64/os/
  5. 选择“ 下载规则 Download Policy ”列表。默认的是“ 按需 On Demand ”,这意味着 Katello 将只下载元数据。如果你想下载所有的软件包,请改成“ 即时 Immediate ”,它可以下载所有的软件包,可能会达到 20-30GB。
  6. 确保“ 与镜像同步 Mirror on Sync ”复选框被选中。这个选项确保在同步过程中,不再是上游存储库的一部分的内容被删除。
  7. 点击“ 保存 Save ”。

重复这些步骤,添加 AppStream 存储库及其 URL,例如,http://mirror.centos.org/centos/8-stream/AppStream/x86_64/os/。确保你使用最近的官方 CentOS 镜像来代替它。

要执行立即同步,在你的产品窗口,点击“ 立即同步 Sync Now ”。最初的同步可能需要一些时间。你可以从“ 内容 Content > 同步状态 Sync Status ”查看同步状态。

同步完成后,你可以在“ 主机 Hosts > 操作系统 Operating System ”中查看新的 CentOS Stream 操作系统。请随意编辑名称和描述以满足你的要求。

如果你打算使用 Ansible 或 Puppet 等配置管理软件,Foreman 会自动创建一个操作系统报告。你可以在“ 管理 Administe > 设置 Settings > 忽略操作系统状况 Ignore facts for operating system ”中关闭这个选项。重命名操作系统以匹配配置管理软件中的名称是一个好主意。例如,对于 Puppet,这将是“CentOS 8”。

定义你的基础设施的生命周期环境

应用程序生命周期是 Foreman 的内容管理功能的一个核心概念。应用程序的生命周期定义了一个特定的系统和它的软件在特定阶段的状况。例如,一个应用程序的生命周期可能很简单,你可能只有一个“开发”阶段和“生产”阶段。Foreman 提供了一些方法来以可控的方式定制每个应用生命周期阶段,以适应你的规范。

在这一点上,你必须创建你的生命周期环境路径。

  1. 在 Foreman 网页用户界面中,导航到“ 内容 Content > 生命周期环境 Lifecycle Environments ”。
  2. 点击“ 新环境路径 New Environment Path ”,开始一个新的应用生命周期。
  3. 在“ 名称 Name ”字段中,为你的环境输入一个名称。
  4. 在“ 描述 Description ”字段中,为你的环境输入一个描述。
  5. 点击“ 保存 Save ”。
  6. 根据你的需要添加尽可能多的环境路径。例如,你可以创建“dev”、“test”、“stage” 和 “production” 环境。要添加这些环境,点击“添加新环境”,完成“ 名称 Name ”和“ 描述 Description ”字段,并从“ 优先环境 Prior Environment* ”列表中选择先前的环境,这样你就可以按照你预期使用的顺序将它们串联起来。

创建和发布一个内容视图

在 Foreman 中,“ 内容视图 Content View ”是你的存储库在某个特定时间点的快照。内容视图提供了隔离软件包版本到你想保留的状态的机制。内容视图有很多可配置的功能,你可以用它来进一步细化。为了本教程的目的,让我们保持简单。

  1. 在 Foreman 网页用户界面中,导航到“ 内容 Content > 内容视图 Content View ”,并点击“ 创建新视图 Create New View ”。
  2. 在“ 名称 Name ”字段中,为视图输入一个名称。Foreman 会根据你输入的名称自动完成“ 标签 Label ”字段。
  3. 在“ 描述 Description ”字段中,输入视图的描述。
  4. 单击“ 保存 Save ”以创建内容视图。
  5. 在新的内容视图中,点击“ Yum 内容 Yum Contents > 添加存储库 Add Repositories ”,在“ 存储库选择 Repository Selection ”区域,点击“ 添加 Add ”。对于 BaseOS 和 Appstream 存储库,选择你想包括的软件包,然后点击“ 添加存储库 Add Repositories ”。
  6. 点击“ 发布新版本 Publish New Version ”,在“ 描述 Description ”区域,输入关于版本的信息以记录变化。
  7. 单击“ 保存 Save ”。

当你点击“ 发布新版本 Publish New Version ”时,你创建了一个你已同步的所有内容的快照。这意味着你订阅此内容视图的每台服务器将只能访问与此生命周期环境相关的内容视图中的软件包版本。

每一个新的内容视图和后续版本都会首先发布到库环境,然后你可以在那里推广到其他环境。

跨生命周期环境推广内容

如果你已经测试了新的软件包,并且确信一切都很稳定,你可以把你的内容视图推广到另一个生命周期环境中。

  1. 导航到“ 内容 Content > 内容视图 Content Views ”,选择你想推广的内容视图。
  2. 点击内容视图的“ 版本 Versions ”标签。
  3. 选择你想推广的版本,并在“ 操作 Action ”栏中,点击“ 推广 Promote ”。
  4. 选择你要推广内容视图的环境,并点击“ 推广版本 Promote Version ”。
  5. 再次点击“ 推广 Promote ”按钮。这次选择生命周期环境,例如,“Test”,然后单击“ 推广版本 Promote Version ”。
  6. 最后,再次点击“ 推广 Promote ”按钮。例如,选择“Production”环境并点击“ 推广版本 Promote Version ”。

被分配到该特定环境的服务器现在可以从一套更新的软件包中提取。

创建一个激活密钥

为了将 CentOS Stream 服务器注册到你在特定生命周期中定义的内容,你必须创建一个激活密钥。激活密钥是一种与服务器共享凭证的安全方法。这使用了一个叫做“ 订阅管理器 subscription-manager 的工具来订阅 CentOS Stream 服务器的内容。

当你创建了激活密钥后,将 CentOS Stream 订阅添加到激活密钥中。

  1. 在 Foreman 网页用户界面中,导航到“ 内容 Content > 激活密钥 Activation keys ”,并点击“ 创建激活密钥 Create Activation Key ”。
  2. 在“ 名称 Name ”栏中,输入激活密钥的名称。
  3. 在“ 描述 Description ”栏中,输入激活密钥的描述。
  4. 从“ 环境 Environment ”列表中,选择要使用的环境。
  5. 从“ 内容视图 Content View ”列表中,选择你刚才创建的内容视图。
  6. 点击“ 保存 Save ”。

从 Foreman 管理的内容中创建一个 CentOS Stream 主机

现在一切都准备好了。随着你创建的内容包含在内容视图中,并在整个生命周期中推广,你现在可以准确地用你想使用的内容来配置主机,并订阅你想让它们接收的更新。

要在 Foreman 中创建一个主机,请导航到“主机 > 创建主机”。

  1. 在“ 名称 Name ”字段中,为主机输入一个名称。
  2. 单击“ 组织 Organization ”和“ 位置 Location ”选项卡,以确保配置环境自动设置为当前环境。
  3. 从“ 部署在 Deploy On ”列表中,选择“ 裸金属 Bare Metal ”。
  4. 单击“ 操作系统 Operating System ”选项卡。
  5. 从“ 架构 Architectures ”列表中,选择“x86\_64”。
  6. 从“ 操作系统 Operating System ”列表中,选择“CentOS\_Stream 8”。
  7. 勾选“ 构建模式 Build Mode ”框。
  8. 对于“ 媒体选择 Media Selection ”,选择“ 同步的内容 Synced Content ”来使用你之前同步的 CentOS Stream 内容。
  9. 从“ 同步的内容 Synced Content ”列表中,确保选择 “CentOS Stream”。
  10. 从“ 分区表 Partition Table ”列表中,对于这个演示,选择默认的 “Kickstart”,但有许多可用的选项。
  11. 在“ Root 密码 Root Password ”栏中,为你的新主机输入一个 root 密码。
  12. 点击“ 接口 Interface ”标签,并点击“ 编辑 Edit ”,并添加一个 “ Mac 地址 Mac address ”。
  13. 点击“ 参数 Parameters ”标签,并确保存在一个提供激活密钥的参数。如果没有,添加一个激活密钥。
  14. 点击“ 提交 Submit ”以保存主机条目。

现在,新的主机处于构建模式,这意味着当你打开它时,它将开始安装操作系统。

如果你导航到“ 主机 Hosts > 内容主机 Content Hosts ”,你可以看到你的主机所订阅的订阅、生命周期环境和内容视图的全部细节。

这个例子只是对你在 Foreman 中管理和配置 CentOS Stream 内容的众多选项的一个小窥视。如果你想了解更多关于如何管理 CentOS Stream 版本,控制你的服务器可以访问的内容,以及控制和保护你的基础设施的稳定性的详细信息,请查看 Foreman 内容管理 文档。当所有 CentOS Stream 内容在你的控制之下时,你可以创建和注册 Centos Stream,只使用你指定的内容。有关配备的更多详细信息,请参见 Foreman 配备 文档。如果你有任何问题、反馈或建议,你可以在 https://community.theforeman.org/ 找到 Foreman 社区。


via: https://opensource.com/article/21/9/centos-stream-foreman

作者:Melanie Corr 选题:lujun9972 译者:wxy 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出

对大多数开发者来说,与 RxJS 的初次接触是通过库的形式,就像 Angular。一些函数会返回 stream ,要使用它们就得把注意力放在操作符上。

有些时候,混用响应式和非响应式代码似乎很有用。然后大家就开始热衷流的创造。不论是在编写异步代码或者是数据处理时,流都是一个不错的方案。

RxJS 提供很多方式来创建流。不管你遇到的是什么情况,都会有一个完美的创建流的方式。你可能根本用不上它们,但了解它们可以节省你的时间,让你少码一些代码。

我把所有可能的方法,按它们的主要目的,放在四个分类当中:

  • 流式化现有数据
  • 生成数据
  • 使用现有 API 进行交互
  • 选择现有的流,并结合起来

注意:示例用的是 RxJS 6,可能会以前的版本有所不同。已知的区别是你导入函数的方式不同了。

RxJS 6

import {of, from} from 'rxjs';

of(...);
from(...);

RxJS < 6

import { Observable } from 'rxjs/Observable';
import 'rxjs/add/observable/of';
import 'rxjs/add/observable/from';

Observable.of(...);
Observable.from(...);

//或

import { of } from 'rxjs/observable/of';
import { from } from 'rxjs/observable/from';

of(...);
from(...);

流的图示中的标记:

  • | 表示流结束了
  • X 表示流出现错误并被终结
  • ... 表示流的走向不定

流式化已有数据

你有一些数据,想把它们放到流中。有三种方式,并且都允许你把调度器当作最后一个参数传入(你如果想深入了解调度器,可以看看我的 上一篇文章)。这些生成的流都是静态的。

of

如果只有一个或者一些不同的元素,使用 of

of(1,2,3)
  .subscribe();
// 结果
// 1 2 3 |

from

如果有一个数组或者 可迭代的对象 ,而且你想要其中的所有元素发送到流中,使用 from。你也可以用它来把一个 promise 对象变成可观测的。

const foo = [1,2,3];

from(foo)
  .subscribe();
// 结果
// 1 2 3 |

pairs

流式化一个对象的键/值对。用这个对象表示字典时特别有用。

const foo = { a: 1, b: 2};

pairs(foo)
  .subscribe();
// 结果
// [a,1] [b,2] |

那么其他的数据结构呢?

也许你的数据存储在自定义的结构中,而它又没有实现 可迭代的对象 接口,又或者说你的结构是递归的、树状的。也许下面某种选择适合这些情况:

  1. 先将数据提取到数组里
  2. 使用下一节将会讲到的 generate 函数,遍历所有数据
  3. 创建一个自定义流(见下一节)
  4. 创建一个迭代器

稍后会讲到选项 2 和 3 ,因此这里的重点是创建一个迭代器。我们可以对一个 可迭代的对象 调用 from 创建一个流。 可迭代的对象 是一个对象,可以产生一个迭代器(如果你对细节感兴趣,参考 这篇 mdn 文章)。

创建一个迭代器的简单方式是 生成函数 generator function 。当你调用一个生成函数时,它返回一个对象,该对象同时遵循 可迭代的对象 接口和 迭代器 接口。

// 自定义的数据结构
class List {
  add(element) ...
  get(index) ...
  get size() ...
  ...
}

function* listIterator(list) {
  for (let i = 0; i<list.size; i++) {
    yield list.get(i);
  }
}

const myList = new List();
myList.add(1);
myList.add(3);

from(listIterator(myList))
  .subscribe(console.log);
// 结果
// 1 3 |    

调用 listIterator 函数时,返回值是一个 可迭代的对象 / 迭代器 。函数里面的代码在调用 subscribe 前不会执行。

生成数据

你知道要发送哪些数据,但想(或者必须)动态生成它。所有函数的最后一个参数都可以用来接收一个调度器。他们产生静态的流。

范围(range

从初始值开始,发送一系列数字,直到完成了指定次数的迭代。

range(10, 2)  // 从 10 开始,发送两个值
  .subscribe();
// 结果
// 10 11 |

间隔(interval) / 定时器(timer

有点像范围,但定时器是周期性的发送累加的数字(就是说,不是立即发送)。两者的区别在于在于定时器允许你为第一个元素设定一个延迟。也可以只产生一个值,只要不指定周期。

interval(1000) // 每 1000ms = 1 秒 发送数据
  .subscribe()
// 结果
// 0  1  2  3  4 ...
delay(5000, 1000) // 和上面相同,在开始前先等待 5000ms

delay(5000)
.subscribe(i => console.log("foo");
// 5 秒后打印 foo

大多数定时器将会用来周期性的处理数据:

interval(10000).pipe(
  flatMap(i => fetch("https://server/stockTicker")
).subscribe(updateChart)

这段代码每 10 秒获取一次数据,更新屏幕。

生成(generate

这是个更加复杂的函数,允许你发送一系列任意类型的对象。它有一些重载,这里你看到的是最有意思的部分:

generate(
  0,           // 从这个值开始
  x => x < 10, // 条件:只要值小于 10,就一直发送
  x => x*2     // 迭代:前一个值加倍
).subscribe();
// 结果
// 1 2 4 8 |

你也可以用它来迭代值,如果一个结构没有实现 可迭代的对象 接口。我们用前面的列表例子来进行演示:

const myList = new List();
myList.add(1);
myList.add(3);

generate(
  0,                  // 从这个值开始
  i => i < list.size, // 条件:发送数据,直到遍历完整个列表
  i => ++i,           // 迭代:获取下一个索引
  i => list.get(i)    // 选择器:从列表中取值
).subscribe();
// 结果
// 1 3 |

如你所见,我添加了另一个参数:选择器。它和 map 操作符作用类似,将生成的值转换为更有用的东西。

空的流

有时候你要传递或返回一个不用发送任何数据的流。有三个函数分别用于不同的情况。你可以给这三个函数传递调度器。emptythrowError 接收一个调度器参数。

empty

创建一个空的流,一个值也不发送。

empty()
  .subscribe();
// 结果
// |

never

创建一个永远不会结束的流,仍然不发送值。

never()
  .subscribe();
// 结果
// ...

throwError

创建一个流,流出现错误,不发送数据。

throwError('error')
  .subscribe();
// 结果
// X

挂钩已有的 API

不是所有的库和所有你之前写的代码使用或者支持流。幸运的是 RxJS 提供函数用来桥接非响应式和响应式代码。这一节仅仅讨论 RxJS 为桥接代码提供的模版。

你可能还对这篇出自 Ben Lesh全面的文章 感兴趣,这篇文章讲了几乎所有能与 promises 交互操作的方式。

from

我们已经用过它,把它列在这里是因为,它可以封装一个含有 observable 对象的 promise 对象。

from(new Promise(resolve => resolve(1)))
  .subscribe();
// 结果
// 1 |

fromEvent

fromEvent 为 DOM 元素添加一个事件监听器,我确定你知道这个。但你可能不知道的是,也可以通过其它类型来添加事件监听器,例如,一个 jQuery 对象。

const element = $('#fooButton'); // 从 DOM 元素中创建一个 jQuery 对象

from(element, 'click')
  .subscribe();
// 结果
// clickEvent ...

fromEventPattern

要理解为什么有 fromEvent 了还需要 fromEventPattern,我们得先理解 fromEvent 是如何工作的。看这段代码:

from(document, 'click')
  .subscribe();

这告诉 RxJS 我们想要监听 document 中的点击事件。在提交过程中,RxJS 发现 document 是一个 EventTarget 类型,因此它可以调用它的 addEventListener 方法。如果我们传入的是一个 jQuery 对象而非 document,那么 RxJs 知道它得调用 on 方法。

这个例子用的是 fromEventPattern ,和 fromEvent 的工作基本上一样:

function addClickHandler(handler) {
  document.addEventListener('click', handler);
}

function removeClickHandler(handler) {
  document.removeEventListener('click', handler);
}

fromEventPattern(
  addClickHandler,
  removeClickHandler,
)
.subscribe(console.log);

// 等效于
fromEvent(document, 'click')

RxJS 自动创建实际的监听器( handler )你的工作是添加或者移除监听器。fromEventPattern 的目的基本上是告诉 RxJS 如何注册和移除事件监听器。

现在想象一下你使用了一个库,你可以调用一个叫做 registerListener 的方法。我们不能再用 fromEvent,因为它并不知道该怎么处理这个对象。

const listeners = [];

class Foo {
  registerListener(listener) {
    listeners.push(listener);
  }

  emit(value) {
    listeners.forEach(listener => listener(value));
  }
}

const foo = new Foo();

fromEventPattern(listener => foo.registerListener(listener))
  .subscribe();

foo.emit(1);
// 结果
// 1 ...

当我们调用 foo.emit(1) 时,RxJS 中的监听器将被调用,然后它就能把值发送到流中。

你也可以用它来监听多个事件类型,或者结合所有可以通过回调进行通讯的 API,例如,WebWorker API:

const myWorker = new Worker('worker.js');

fromEventPattern(
  handler => { myWorker.onmessage = handler },
  handler => { myWorker.onmessage = undefined }
)
.subscribe();
// 结果
// workerMessage ...

bindCallback

它和 fromEventPattern 相似,但它能用于单个值。就在回调函数被调用时,流就结束了。用法当然也不一样 —— 你可以用 bindCallBack 封装函数,然后它就会在调用时魔术般的返回一个流:

function foo(value, callback) {
  callback(value);
}

// 没有流
foo(1, console.log); //prints 1 in the console

// 有流
const reactiveFoo = bindCallback(foo); 
// 当我们调用 reactiveFoo 时,它返回一个 observable 对象

reactiveFoo(1)
  .subscribe(console.log); // 在控制台打印 1
// 结果
// 1 |

websocket

是的,你完全可以创建一个 websocket 连接然后把它暴露给流:

import { webSocket } from 'rxjs/webSocket'; 

let socket$ = webSocket('ws://localhost:8081');

// 接收消息
socket$.subscribe(
  (msg) => console.log('message received: ' + msg),
  (err) => console.log(err),
  () => console.log('complete') * );

// 发送消息
socket$.next(JSON.stringify({ op: 'hello' }));

把 websocket 功能添加到你的应用中真的很简单。websocket 创建一个 subject。这意味着你可以订阅它,通过调用 next 来获得消息和发送消息。

ajax

如你所知:类似于 websocket,提供 AJAX 查询的功能。你可能用了一个带有 AJAX 功能的库或者框架。或者你没有用,那么我建议使用 fetch(或者必要的话用 polyfill),把返回的 promise 封装到一个 observable 对象中(参考稍后会讲到的 defer 函数)。

定制流

有时候已有的函数用起来并不是足够灵活。或者你需要对订阅有更强的控制。

主题(Subject

Subject 是一个特殊的对象,它使得你的能够把数据发送到流中,并且能够控制数据。Subject 本身就是一个可观察对象,但如果你想要把流暴露给其它代码,建议你使用 asObservable 方法。这样你就不能意外调用原始方法。

const subject = new Subject();
const observable = subject.asObservable();

observable.subscribe();

subject.next(1);
subject.next(2);
subject.complete();
// 结果
// 1 2 |

注意在订阅前发送的值将会“丢失”:

const subject = new Subject();
const observable = subject.asObservable();

subject.next(1);

observable.subscribe(console.log);

subject.next(2);
subject.complete();
// 结果
// 2

除了常规的 Subject,RxJS 还提供了三种特殊的版本。

AsyncSubject 在结束后只发送最后的一个值。

const subject = new AsyncSubject();
const observable = subject.asObservable();

observable.subscribe(console.log);

subject.next(1);
subject.next(2);
subject.complete();
// 输出
// 2

BehaviorSubject 使得你能够提供一个(默认的)值,如果当前没有其它值发送的话,这个值会被发送给每个订阅者。否则订阅者收到最后一个发送的值。

const subject = new BehaviorSubject(1);
const observable = subject.asObservable();

const subscription1 = observable.subscribe(console.log);

subject.next(2);
subscription1.unsubscribe();
// 输出
// 1
// 2
const subscription2 = observable.subscribe(console.log);

// 输出
// 2

ReplaySubject 存储一定数量、或一定时间或所有的发送过的值。所有新的订阅者将会获得所有存储了的值。

const subject = new ReplaySubject();
const observable = subject.asObservable();

subject.next(1);

observable.subscribe(console.log);

subject.next(2);
subject.complete();
// 输出
// 1
// 2

你可以在 ReactiveX 文档(它提供了一些其它的连接) 里面找到更多关于 Subject 的信息。Ben LeshOn The Subject Of Subjects 上面提供了一些关于 Subject 的理解,Nicholas Jamiesonin RxJS: Understanding Subjects 上也提供了一些理解。

可观察对象

你可以简单地用 new 操作符创建一个可观察对象。通过你传入的函数,你可以控制流,只要有人订阅了或者它接收到一个可以当成 Subject 使用的观察者,这个函数就会被调用,比如,调用 nextcompleterror

让我们回顾一下列表示例:

const myList = new List();
myList.add(1);
myList.add(3);

new Observable(observer => {
  for (let i = 0; i<list.size; i++) {
    observer.next(list.get(i));
  }

  observer.complete();
})
.subscribe();
// 结果
// 1 3 |

这个函数可以返回一个 unsubcribe 函数,当有订阅者取消订阅时这个函数就会被调用。你可以用它来清楚或者执行一些收尾操作。

new Observable(observer => {
  // 流式化

  return () => {
                 //clean up
               };
})
.subscribe();

继承可观察对象

在有可用的操作符前,这是一种实现自定义操作符的方式。RxJS 在内部扩展了 可观察对象Subject 就是一个例子,另一个是 publisher 操作符。它返回一个 ConnectableObservable 对象,该对象提供额外的方法 connect

实现 Subscribable 接口

有时候你已经用一个对象来保存状态,并且能够发送值。如果你实现了 Subscribable 接口,你可以把它转换成一个可观察对象。Subscribable 接口中只有一个 subscribe 方法。

interface Subscribable<T> {  subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void), error?: (error: any) => void, complete?: () => void): Unsubscribable}

结合和选择现有的流

知道怎么创建一个独立的流还不够。有时候你有好几个流但其实只需要一个。有些函数也可作为操作符,所以我不打算在这里深入展开。推荐看看 Max NgWizard K 所写的一篇 文章,它还包含一些有趣的动画。

还有一个建议:你可以通过拖拽元素的方式交互式的使用结合操作,参考 RxMarbles

ObservableInput 类型

期望接收流的操作符和函数通常不单独和可观察对象一起工作。相反,它们实际上期望的参数类型是 ObservableInput,定义如下:

type ObservableInput<T> = SubscribableOrPromise<T> | ArrayLike<T> | Iterable<T>;

这意味着你可以传递一个 promises 或者数组却不需要事先把他们转换成可观察对象。

defer

主要的目的是把一个 observable 对象的创建延迟(defer)到有人想要订阅的时间。在以下情况,这很有用:

  • 创建可观察对象的开销较大
  • 你想要给每个订阅者新的可观察对象
  • 你想要在订阅时候选择不同的可观察对象
  • 有些代码必须在订阅之后执行

最后一点包含了一个并不起眼的用例:Promises(defer 也可以返回一个 promise 对象)。看看这个用到了 fetch API 的例子:

function getUser(id) {
  console.log("fetching data");
  return fetch(`https://server/user/${id}`);
}

const userPromise = getUser(1);
console.log("I don't want that request now");

// 其它地方
userPromise.then(response => console.log("done");
// 输出
// fetching data
// I don't want that request now
// done

只要流在你订阅的时候执行了,promise 就会立即执行。我们调用 getUser 的瞬间,就发送了一个请求,哪怕我们这个时候不想发送请求。当然,我们可以使用 from 来把一个 promise 对象转换成可观察对象,但我们传递的 promise 对象已经创建或执行了。defer 让我们能够等到订阅才发送这个请求:

const user$ = defer(() => getUser(1));

console.log("I don't want that request now");

// 其它地方
user$.subscribe(response => console.log("done");
// 输出
// I don't want that request now
// fetching data
// done

iif

iif 包含了一个关于 defer 的特殊用例:在订阅时选择两个流中的一个:

iif(
  () => new Date().getHours() < 12,
  of("AM"),
  of("PM")
)
.subscribe();
// 结果
// AM before noon, PM afterwards

引用该文档:

实际上 iif 能够轻松地用 defer 实现,它仅仅是出于方便和可读性的目的。

onErrorResumeNext

开启第一个流并且在失败的时候继续进行下一个流。错误被忽略掉。

const stream1$ = of(1, 2).pipe(
  tap(i => { if(i>1) throw 'error'}) //fail after first element
);

const stream2$ = of(3,4);

onErrorResumeNext(stream1$, stream2$)
  .subscribe(console.log);
// 结果
// 1 3 4 |

如果你有多个 web 服务,这就很有用了。万一主服务器开启失败,那么备份的服务就能自动调用。

forkJoin

它让流并行运行,当流结束时发送存在数组中的最后的值。由于每个流只有最后一个值被发送,它一般用在只发送一个元素的流的情况,就像 HTTP 请求。你让请求并行运行,在所有流收到响应时执行某些任务。

function handleResponses([user, account]) {
  // 执行某些任务
}

forkJoin(
  fetch("https://server/user/1"),
  fetch("https://server/account/1")
)
.subscribe(handleResponses);

merge / concat

发送每一个从可观察对象源中发出的值。

merge 接收一个参数,让你定义有多少流能被同时订阅。默认是无限制的。设为 1 就意味着监听一个源流,在它结束的时候订阅下一个。由于这是一个常见的场景,RxJS 为你提供了一个显示的函数:concat

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2)),
  timer(0, 1000).pipe(mapTo("Stream 3"), take(2)),
  2 //two concurrent streams
)
.subscribe();

// 只订阅流 1 和流 2

// 输出
// Stream 1 -> after 1000ms
// Stream 2 -> after 1200ms
// Stream 1 -> after 2000ms

// 流 1 结束后,开始订阅流 3

// 输出
// Stream 3 -> after 0 ms
// Stream 2 -> after 400 ms (2400ms from beginning)
// Stream 3 -> after 1000ms

merge(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
  1
)
// 等效于
concat(
  interval(1000).pipe(mapTo("Stream 1"), take(2)),
  interval(1200).pipe(mapTo("Stream 2"), take(2))
)

// 输出
// Stream 1 -> after 1000ms
// Stream 1 -> after 2000ms
// Stream 2 -> after 3200ms
// Stream 2 -> after 4400ms

zip / combineLatest

mergeconcat 一个接一个的发送所有从源流中读到的值,而 zipcombineLatest 是把每个流中的一个值结合起来一起发送。zip 结合所有源流中发送的第一个值。如果流的内容相关联,那么这就很有用。

zip(
  interval(1000),
  interval(1200),
)
.subscribe();
// 结果
// [0, 0] [1, 1] [2, 2] ...

combineLatest 与之类似,但结合的是源流中发送的最后一个值。直到所有源流至少发送一个值之后才会触发事件。这之后每次源流发送一个值,它都会把这个值与其他流发送的最后一个值结合起来。

combineLatest(
  interval(1000),
  interval(1200),
)
.subscribe();
// 结果
// [0, 0] [1, 0] [1, 1] [2, 1] ...

两个函数都让允许传递一个选择器函数,把元素结合成其它对象而不是数组:

zip(
  interval(1000),
  interval(1200),
  (e1, e2) -> e1 + e2
)
.subscribe();
// 结果
// 0 2 4 6 ...

race

选择第一个发送数据的流。产生的流基本是最快的。

race(
  interval(1000),
  of("foo")
)
.subscribe();
// 结果
// foo |

由于 of 立即产生一个值,因此它是最快的流,然而这个流就被选中了。

总结

已经有很多创建可观察对象的方式了。如果你想要创造响应式的 API 或者想用响应式的 API 结合传统 API,那么了解这些方法很重要。

我已经向你展示了所有可用的方法,但它们其实还有很多内容可以讲。如果你想更加深入地了解,我极力推荐你查阅 文档 或者阅读相关文章。

RxViz 是另一种值得了解的有意思的方式。你编写 RxJS 代码,产生的流可以用图形或动画进行显示。


via: https://blog.angularindepth.com/the-extensive-guide-to-creating-streams-in-rxjs-aaa02baaff9a

作者:Oliver Flaggl 译者:BriFuture 校对:wxy

本文由 LCTT 原创编译,Linux中国 荣誉推出