IBM Cloud Docs
使用 Event Streams 模式注册表

使用 Event Streams 模式注册表

Schema Registry 提供用于管理和验证模式的集中存储库。 模式注册表中的模式提供生成事件的程序向使用这些事件的其他程序提供的显式合同。

模式概述

Apache Kafka 可以处理任何数据,但不会验证消息中的信息。 然而,要高效处理数据,通常需要数据包含特定格式的特定信息。 通过使用模式,可以定义消息中数据的结构,以确保生产者和使用者都使用正确的结构。

模式可帮助生产者创建符合预定义结构的数据,并定义需要显示的字段以及每个字段的类型。 然后,此定义可帮助使用者解析并正确解释该数据。Event Streams 企业套餐支持模式,并包含用于使用和管理模式的模式注册表。

通常,一个主题上的所有消息都会使用相同模式。 消息的每个键和值均可由模式来加以描述。

![模式概述图。](schema_registry1.svg "表示模式如何帮助定义消息的 "键/值" 对的结构的图")

模式注册表

模式存储在 Event Streams 模式注册表中。 除了存储已版本化的模式历史记录外,模式注册表还提供了用于检索模式的界面。 每个企业套餐 Event Streams 实例都有其自己的模式注册表。 企业实例中最多可存储 1000 个模式。

生产者和消费者根据存储在架构注册表中的指定架构验证数据(此外还要通过 Kafka 经纪人)。 这样,就无需在消息中传输模式,这意味着消息可以更小。

架构图。
模式注册表体系结构
中检索模式

Apache Avro 数据格式

模式是通过使用 Apache Avro 定义的,后者是通常与 Apache Kafka配合使用的开放式源代码数据序列化技术。 模式提供了有效的数据编码格式:使用精简二进制格式或更详细但人类可读的 JSON 格式。

Event Streams 模式注册表使用 Apache Avro 数据格式。 以 Avro 格式发送消息时,消息包含数据以及所使用模式的唯一标识。 该标识指定要将注册表中的哪个模式用于消息。

Avro 支持多种数据类型,包括基本类型(null、布尔值、整数、长整型、浮点值、双精度值、字节和字符串)和复杂类型(记录、枚举值、数组、映射、并集和定点数)。

Avro 格式图。
Avro 消息格式

序列化和反序列化

生产应用程序使用序列化程序来生成符合特定架构的消息。 如前所述,消息包含 Avro 格式的数据以及模式标识。

然后,一个消耗型应用程序使用反序列化程序来读取使用相同模式序列化的消息。 当消费者读取以Avro格式发送的消息时,反序列化程序会找到消息中的模式标识符,并从模式注册表中检索模式,以反序列化数据。

这个过程能够有效确保邮件中的数据符合要求的结构。

Event Streams 模式注册表支持 Kafka AVRO序列化和反序列化

序列化和反序列化图。
序列化器和反序列化器

兼容性和版本图。
兼容性和版本

版本和兼容性

无论何时添加架构,以及同一架构的任何后续版本,Event Streams 都可以自动验证格式,并在存在任何问题时拒绝该架构。 您可以随时间推移逐渐改进模式,以适应不断变化的需求。 创建现有架构的新版本,架构注册表确保新版本与现有版本兼容,这意味着使用现有版本的生产者和消费者不会受到新版本的干扰。

对模式进行比较以避免创建重复的模式,其中模式仅以不影响模式语义的方式存在差异。 在某些情况下,模式中 JSON 属性的排序对于如何使用该模式对数据进行编码和解码至关重要,但在其他情况下,它可能不相关。

例如,记录模式的 name 属性不用作编码和解码过程的一部分,因此您可以将其放在记录 JSON 对象中的任何位置。 所有这些变体都被视为相同的模式。

记录模式的 JSON 中的 fields 属性是其排序很重要的情况。 Avro 规范要求按照记录的字段在用于编码和解码操作的模式中的显示顺序对其进行编码和解码。

作为示例,请考虑以下三个模式。

模式 1

{
  "type": "record",
  "name": "book",
  "fields": [
    {
      "name": "title",
      "type": "string"
    },
    {
      "name": "author",
      "type": "string"
    }
  ]
}

模式 2

{
  "type": "record",
  "name": "book",
  "fields": [
    {
      "name": "author",
      "type": "string"
    },
    {
      "name": "title",
      "type": "string"
    }
  ]
}

模式 3

{
  "type": "record",
  "name": "book",
  "fields": [
    {
      "type": "string"
      "name": "author",
    },
    {
      "type": "string"
      "name": "title",
    }
  ]
}

模式 1 和模式 2 是不同的模式,注册表将它们存储为单独的模式。 它们无法互换使用,因为它们以不同的顺序列出 authortitle 字段。 如果解码过程使用了模式 2,那么将无法正确解码使用模式 1 编码的数据。

使用 SerDes 按顺序“模式 1”,“模式 2”和“模式 3”创建新模式时,结果将是两个新模式。 模式 1 和模式 2 不同,但模式 3 等同于模式 2。

使用 REST API 创建模式时,仅当模式文本相同 (包括所有属性排序和描述性字段) 时,才会将其视为匹配。 这是为了允许您希望模式 3 成为其他模式的情况。

启用模式注册表

对于 Event Streams 企业套餐服务实例,缺省情况下启用模式注册表。 对于其他 Event Streams 套餐,模式注册表不可用。

访问模式注册表

要访问架构注册表,您需要架构注册表的 URL,该邮箱位于您的服务凭据中。 要在用户界面中查看这些凭据,请点击您的服务实例,在左侧导航窗格中选择“服务凭据”,然后点击表格中列出的服务凭据之一旁边的 “查看凭据”链接

服务凭证图。
Kafka 凭证块的必需凭证字段的表示

kafka_http_url 的值也是Schema Registry的 URL。

认证

要访问模式注册表,您还需要一组可用于向注册表进行认证的凭证。 有两个选项,即使用 API 密钥的基本认证或不记名令牌认证。

本文档中的示例显示了如何使用 API 密钥,但可以使用任一选项。

使用 API 密钥进行认证

服务凭证具有可用作向模式注册表进行认证的凭证的 apikey

您也可以使用服务ID授予的API密钥进行身份验证,前提是该服务ID至少允许其以“读取器”角色访问 Event Streams 实例。 如果您要向其他多人或团队授予访问权,那么此方法更灵活,也是更好的选择。 有关更多详细信息,请参阅管理对 Event Streams 资源的访问权帮助主题。

API密钥作为 HTTP 基本认证标头的密码部分提供。 头的用户名部分是单词 "token"。

要使用的 curl 命令如下所示,其中 $APIKEY 将替换为您的 API 密钥:

curl -u token:$APIKEY ...

使用不记名令牌进行认证

也可以将系统ID或用户的无记名令牌作为凭证。 这通常是一种更安全的方法,因为它不太可能公开 API 密钥,并且不记名令牌会在一段时间后自动到期。

要获取令牌,请使用 IBM Cloud CLI ibmcloud iam oauth-tokens 命令来生成令牌。 在 HTTP 报头中包含此令牌,格式为“授权:持票人$TOKEN”,其中$TOKEN为持票人令牌:

curl -H "Authorization: Bearer $TOKEN" ...

从其他模式注册表导入数据

您可以将数据导入到已从其他模式注册表导出的模式注册表中。 导入数据时,将保留与每个工件版本相关联的全局标识。 这意味着您可以继续使用已使用相同模式全局标识值存储在 Kafka 中的数据。

Event Streams CLI 支持使用 Apicurio 注册表的导入和导出格式导入数据,如以下示例中所示。

ibmcloud es schema-import import.zip

您可以使用 Apicurio 注册表 exportConfluent 实用程序生成要导入的数据,该实用程序将从 Confluent 模式注册表导出数据。 Event Streams 已在 2.6.x 版本的此实用程序中进行了测试。

如果 Event Streams 模式注册表已具有与要导入的工件版本具有相同全局标识的条目,那么导入操作将失败,如果要继续,将提示您除去工件版本。

模式注册表 REST 端点

该 REST API 提供了四个主要功能:

  1. 创建、读取和删除模式。
  2. 创建、读取和删除模式的各个版本。
  3. 读取和更新注册表的全局兼容性规则。
  4. 创建、读取、更新和删除应用于各个模式的兼容性规则。

对于变更模式版本的操作(例如,创建、更新或删除工件、工件版本和规则),将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

错误

如果遇到错误情况,架构注册表将返回一个 non-2XX 范围 HTTP 状态代码。 响应主体包含一个JSON对象,格式如下:

{
    "error_code":404,
    "message":"No artifact with id 'my-schema' might be found."
}

错误 JSON 对象的属性如下:

属性名 描述
error_code 响应的 HTTP 状态码。
消息 问题原因的描述。
事件 仅当错误是由于模式注册表发生问题而导致时,才会包含此字段。 此值可由 IBM 服务用于将请求与注册表捕获的诊断信息关联。

设置模式状态

此端点用于将注册表中模式的状态设置为 ENABLEDDISABLED。 可以通过向 /artifacts/{schema-id}/state 端点发出 PUT 请求来设置模式的状态 (其中 {schema-id} 是模式的标识)。 如果请求成功,则返回一个空响应和状态代码204(无内容)。

示例 curl 请求:

curl -u token:$APIKEY –X PUT $URL/artifacts/my-schema/state -d '{"state": "DISABLED"}'

设置模式状态需要:

  • 经理角色访问与被修改的架构匹配的架构资源。

设置模式版本状态

此端点用于将注册表中模式版本的状态设置为 ENABLEDDISABLED。 可以通过向 /artifacts/{schema-id}/versions/{version}/state 端点发出 PUT 请求来设置模式版本的状态 (其中 {schema-id} 是模式的标识,{version} 是模式版本的版本号)。 如果请求成功,则返回一个空响应和状态代码204(无内容)。

示例 curl 请求:

curl -u token:$APIKEY –X PUT $URL/artifacts/my-schema/versions/1/state -d '{"state": "DISABLED"}'

设置模式版本状态需要:

  • 经理角色访问与正在修改的模式匹配的模式资源。

创建模式

此端点用于在注册表中存储模式。 模式数据作为 POST 请求的主体发送。 可以通过使用 ‘X-Registry-ArtifactId' 请求头来包含模式标识符。 如果请求中不存在此标头,则生成一个ID。 内容类型头必须设置为“application/json”。

示例 curl 请求:

curl -u token:$APIKEY -H 'Content-Type: application/json' -H 'X-Registry-ArtifactId: my-schema' $URL/artifacts -d '{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}'

示例响应:

{"id":"my-schema","type":"AVRO","version":1,"createdBy":"","createdOn":1579267788258,"modifiedBy":"","modifiedOn":1579267788258,"globalId":75}

创建模式至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 创建的架构所对应的架构资源访问权限。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

列出模式

您可以通过向 /artifacts 终端发送GET请求,生成一个包含注册表中所有模式ID的列表。 您可以使用 jsonformat 参数来格式化响应 (仅支持 stringobject 格式)。 字符串格式是缺省值,它返回工件标识 (字符串) 的数组。 仅当设置了此选项时,才会在数组中包含已启用的工件。 对象格式返回一个 JSON 对象,该对象包含一个数组,其中数组中的每个条目都对应于注册表中的工件。 设置此选项时,将返回已启用和已禁用的工件。

示例 curl 请求:

curl -u token:$APIKEY $URL/artifacts

curl -u token:$APIKEY $URL/artifacts?jsonformat=string

curl -u token:$APIKEY $URL/artifacts?jsonformat=object

jsonformat 为字符串或未提供 (缺省为字符串) 时的示例响应:

["my-schema-2","my-schema-4"]

jsonformat 为对象时的响应示例:

{"artifacts":[{"id":"my-schema","state":"DISABLED"},{"id":"my-schema-2","state":"ENABLED"},{"id":"my-schema-3","state":"DISABLED"},{"id":"my-schema-4","state":"ENABLED"}],"count":4}

列出模式至少需要以下角色:

  • Event Streams 集群资源类型的读取者角色访问权。

模式的状态和删除

模式删除是一个两阶段过程。 删除的第一阶段会将模式保留在注册表中,但会将其隐藏在某些操作中。 第二个阶段将永久除去模式,但只能在第一个阶段之后应用。 两阶段删除过程适用于工件级别,也适用于版本级别。

通过具有与工件和版本相关联的已启用或已禁用状态 (第一个阶段) 以及删除资源和版本的 API (第二个阶段) 来完成两个删除阶段。

可以使用列示工件或版本的操作所返回的 "state" 属性或者通过获取工件或版本的详细信息来发现已禁用的工件或版本。 已禁用的模式将计入每个企业实例 1000 个模式的模式配额。

删除模式

通过向 /artifacts/{schema-id} 终端发送DELETE请求(其中 {schema-id} 是架构的ID),架构将从注册表中删除。 如果成功,将返回空响应和状态码 204(无内容)。

示例 curl 请求:

curl -u token:$APIKEY -X DELETE $URL/artifacts/my-schema

删除模式至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 经理角色访问与被删除模式匹配的模式资源。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

创建模式的新版本

要创建架构的新版本,请向 /artifacts/{schema-id}/versions 端点发送POST请求(其中 {schema-id} 是架构的ID)。 请求主体必须包含模式的新版本。

如果请求成功,则新架构将作为架构的最新版本创建,并带有适当的版本号,同时返回状态码为200(OK)的响应,以及包含描述新版本(包括版本号)的元数据的有效负载。

示例 curl 请求:

curl -u token:$APIKEY -H 'Content-Type: application/json' $URL/artifacts/my-schema/versions -d '{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}'

示例响应:

{"id":"my-schema","type":"AVRO","version":2,"createdBy":"","createdOn": 1579267978382,"modifiedBy":"","modifiedOn":1579267978382,"globalId":83}

创建模式的新版本至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 当模式资源与获取新版本的模式匹配时,写入器角色可以访问该资源。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

获取模式的最新版本

要检索特定架构的最新版本,请向 /artifacts/{schema-id} 端点发送GET请求(其中 {schema-id} 是架构的ID)。 如果成功,将在响应的有效内容中返回模式的最新版本。

示例 curl 请求:

curl -u token:$APIKEY $URL/artifacts/my-schema

示例响应:

{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}

获取模式的最新版本至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 读取器角色访问与检索到的架构匹配的架构资源。

获取模式的特定版本

要检索特定版本的架构,请向 /artifacts/{schema-id}/versions/{version} 端点发送GET请求(其中 {schema-id} 是架构的ID,{version} 是您要检索的特定版本的版本号)。 如果成功,将在响应的有效内容中返回模式的指定版本。

示例 curl 请求

curl -u token:$APIKEY $URL/artifacts/my-schema/versions/3

示例响应:

{"type":"record","name":"Citizen","fields":[{"name": "firstName","type":"string"},{"name":"lastName","type":"string"},{"name":"age","type":"int"},{"name":"phoneNumber","type":"string"}]}

获取模式的最新版本至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 读取器角色访问与检索到的架构匹配的架构资源。

列出模式的所有版本

要列出当前存储在注册表中的模式的所有版本,请向 /artifacts/{schema-id}/versions 端点发送GET请求(其中 {schema-id} 是模式的ID)。 如果成功,将在响应的有效内容中返回模式的所有当前版本号的列表。 您可以使用 jsonformat 参数来格式化响应 (仅支持 numberobject 格式)。 如果指定 "number" (缺省值),那么响应是对应于工件的已启用版本 (将省略禁用版本) 的数字值的数组。 它与当前生成的端点格式相同。 如果指定了 "object",那么响应是一个 JSON 对象,其中包含表示工件版本的 JSON 对象数组。 已启用和已禁用的版本都包含在阵列中。

示例 curl 请求:

curl -u token:$APIKEY $URL/artifacts/my-schema/versions

curl -u token:$APIKEY $URL/artifacts/my-schema/versions?jsonformat=number

curl -u token:$APIKEY $URL/artifacts/my-schema/versions?jsonformat=object

当 jsonformat 为 number 或未提供 (缺省为 number) 时的示例响应:

[1,3,4,6,7]

jsonformat 为对象时的响应示例:

{"versions":[{"id":1,"state":"ENABLED"},{"id":2,"state":"DISABLED"},{"id":3,"state":"ENABLED"},{"id":4,"state":"ENABLED"},{"id":5,"state":"DISABLED"},{"id":6,"state":"ENABLED"},{"id":7,"state":"ENABLED"}],"count":7}

获取模式的可用版本列表至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 读取器角色访问与检索到的架构匹配的架构资源。

删除模式的版本

通过向 /artifacts/{schema-id}/versions/{version} 端点发送DELETE请求,可以从注册表中删除架构版本(其中 {schema-id} 是架构的ID,{version} 是架构版本的版本号)。 如果成功,将返回空的响应和状态码 204(无内容)。 删除唯一剩余的架构版本也会删除该架构。

示例 curl 请求:

curl -u token:$APIKEY -X DELETE $URL/artifacts/my-schema/versions/3

删除模式版本至少需要两个角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 经理角色访问与被删除模式匹配的模式资源。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

获取模式版本的特定全局唯一标识

要检索架构版本的特定全局唯一ID,请向 /artifacts/{artifactId}/versions/{version}/meta 端点发送GET请求(其中 {artifactId} 是工件的ID,{version} 是您要检索的特定版本的版本号)。 如果成功,那么将在响应的有效内容中返回模式版本的特定全局唯一标识。

示例 curl 请求:

curl -u token:$APIKEY $URL/artifacts/9030f450-45fb-4750-bb37-771ad49ee0e8/versions/1/meta

示例响应:

{"id":"9030f450-45fb-4750-bb37-771ad49ee0e8","type":"AVRO","version":1,"createdOn":1682340169202,"modifiedOn":1682340169202,"globalId":1}

获取模式版本的全局唯一标识至少需要以下两种类型的访问权:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 读取器角色访问与检索到的架构匹配的架构资源。

更新全局规则

可通过向/rules/ {rule-type} 终端发送PUT请求来更新全局兼容性规则(其中,{rule-type} 标识要更新的全局规则类型——目前仅支持COMPATIBILITY类型),并在请求正文中包含新的规则配置。 如果请求成功,将在响应的有效内容中返回新更新的规则配置以及状态码 200(正常)。

请求正文中发送的JSON文档必须具有以下属性:

属性名 描述
Type 必须始终设置为值 COMPATIBILITY。
配置 必须设置为下列其中一个值:NONE、BACKWARD、BACKWARD_TRANSITIVE、FORWARD、FORWARD_TRANSITIVE、FULL 或 FULL_TRANSITIVE(请参阅有关兼容性规则的部分,以获取有关其中每个值的详细信息)。

示例 curl 请求:

curl -u token:$APIKEY -X PUT $URL/rules/COMPATIBILITY -d '{"type":"COMPATIBILITY","config":"BACKWARD"}'

示例响应:

{"type":"COMPATIBILITY","config":"BACKWARD"}

更新全局规则配置至少需要以下角色:

  • Event Streams 集群资源类型的管理者角色访问权。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

获取全局规则的当前值

通过向/rules/ {rule-type} 终端发送GET请求,可以获取全局规则的当前值(其中 {rule-type} 是要获取的全局规则的类型——目前仅支持兼容性类型)。 如果请求成功,将在响应的有效内容中返回当前规则配置以及状态码 200(正常)。

示例 curl 请求:

curl -u token:$APIKEY $URL/rules/COMPATIBILITY

示例响应:

{"type":"COMPATIBILITY","config":"BACKWARD"}

获取全局规则配置至少需要以下角色:

  • Event Streams 集群资源类型的读取者角色访问权。

创建基于模式的规则

规则可以应用于特定架构,通过向 /artifacts/{schema-id}/rules 端点发送POST请求(其中 {schema-id} 是架构的ID),覆盖任何已设置的通用规则,请求正文中包含新规则的类型和值(目前仅支持兼容性类型)。 如果成功,将返回空的响应和状态码 204(无内容)。

示例 curl 请求:

curl -u token:$APIKEY $URL/artifacts/my-schema/rules -d '{"type":"COMPATIBILITY","config":"FORWARD"}'

创建基于模式的规则至少需要以下角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 经理角色可访问规则所适用的架构资源。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

获取基于模式的规则

要检索应用于特定架构的规则类型的当前值,请向 /artifacts/{schema-id}/rules/{rule-type} 端点发送GET请求(其中 {schema-id} 是架构的ID,{rule-type} 是要检索的全局规则的类型——目前仅支持的类型是兼容性)。 如果请求成功,将在响应的有效内容中返回当前规则值以及状态码 200(正常)。

示例 curl 请求:

curl -u token:$APIKEY $URL/artifacts/my-schema/rules/COMPATIBILITY

示例响应:

{"type":"COMPATIBILITY","config":"FORWARD"}

获取基于模式的规则至少需要以下角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 要应用规则的模式资源的读取者角色访问权。

更新基于模式的规则

通过向 /artifacts/{schema-id}/rules/{rule-type} 终端发送PUT请求,可以修改特定架构的规则(其中 {schema-id} 是架构的ID,{rule-type} 是要检索的全局规则的类型——目前仅支持兼容性类型)。 如果请求成功,将在响应的有效内容中返回新更新的规则配置以及状态码 200(正常)。

示例 curl 请求:

curl -u token:$APIKEY -X PUT $URL/artifacts/my-schema/rules/COMPATIBILITY -d '{"type":"COMPATIBILITY","config":"BACKWARD"}'

示例响应:

{"type":"COMPATIBILITY","config":"BACKWARD"}

更新基于模式的规则至少需要以下角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 要应用规则的模式资源的管理者角色访问权。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

删除基于模式的规则

通过向 /artifacts/{schema-id}/rules/{rule-type} 终端发送DELETE请求,可以删除特定架构所适用的规则(其中,{schema-id} 是架构的ID,{rule-type} 是要检索的全局规则的类型——目前仅支持兼容性类型)。 如果请求成功,将返回空的响应和状态码 204(无内容)。

示例 curl 请求:

curl -u token:$APIKEY -X DELETE $URL/artifacts/my-schema/rules/COMPATIBILITY

删除基于模式的规则至少需要以下角色:

  • Event Streams 集群资源类型的读取者角色访问权。
  • 要应用规则的模式资源的管理者角色访问权。

这将生成 Activity Tracker 事件以报告相应操作。 有关更多信息,请参阅 Activity Tracker 事件

将兼容性规则应用于模式的新版本

当您创建架构的新版本时,架构注册表支持执行兼容性规则。 如果发出了创建新模式版本的请求,但此版本不符合必需的兼容性规则,那么注册表将拒绝该请求。 支持以下规则:

兼容性规则 测试对象 描述
不适用 创建新模式版本时,不执行兼容性检查。
BACKWARD 模式的最新版本 模式的新版本可以省略模式现有版本中存在的字段。
BACKWARD_TRANSITIVE 模式的所有版本 模式的新版本可以添加模式现有版本中不存在的可选字段。
FORWARD 模式的最新版本 模式的新版本可以添加模式现有版本中不存在的字段。
FORWARD_TRANSITIVE 模式的所有版本 模式的新版本可以省略模式现有版本中存在的可选字段。
FULL 模式的最新版本 模式的新版本可以添加模式现有版本中不存在的可选字段。
FULL_TRANSITIVE 模式的所有版本 模式的新版本可以省略模式现有版本中存在的可选字段。

可以在以下两个作用域应用这些规则:

  1. 全局作用域,这是创建新模式版本时使用的缺省值。
  2. 基于模式的级别。 如果定义了基于模式级别的规则,那么该规则会覆盖该特定模式的全局缺省值。

缺省情况下,注册表的全局兼容性规则设置为 NONE。 必须定义每个模式级别的规则,否则模式将使用全局设置。

完整 API 描述

有关包含示例的 REST API 的描述,请参阅 Event Streams schema-registry-rest

您可以 从 Event Streams Schema Registry REST API YAML文件下载API的完整规格。 要查看Swagger文件,请使用Swagger工具,例如 Swagger编辑器

有关使用 SDK 访问模式注册表的更多信息,请参阅 Event Streams 模式注册表 REST API

有关 Event Streams 资源和 Terraform 上的数据源的信息,请参阅 资源和数据源

将Schema Registry与第三方 SerDes

Schema Registry支持使用以下第三方 SerDes:

  • Confluent SerDes

要将 Confluent SerDes 配置为使用模式注册表,您需要在 Kafka 客户机的配置中指定以下两个属性:

属性名
SCHEMA_REGISTRY_URL_CONFIG 将其设置为Schema Registry的 URL,包括您的基本认证凭证,路径为 /confluent。 例如,如果 $APIKEY 是API密钥, $HOST“服务凭证”选项卡中“ kafka_http_url ”字段的主机,则其值的形式为: https://token:{$APIKEY}@{$HOST}/{confluent}
BASIC_AUTH_CREDENTIALS_SOURCE 发送至 URL。 这指示 SerDes 使用模式注册表 URL 中提供的凭证来执行 HTTP 基本认证。

您还可以选择提供以下属性来控制模式选择 (主题命名策略):

属性名
VALUE_SUBJECT_NAME_STRATEGY 支持 TopicNameStrategy(缺省值),RecordNameStrategyTopicRecordNameStrategy。 例如,要指定使用 TopicRecordNameStrategy 选择消息值的模式,可以使用以下客户端属性:configs.put(KafkaAvroSerializerConfig.VALUE_SUBJECT_NAME_STRATEGY, TopicRecordNameStrategy.class.getName());
KEY_SUBJECT_NAME_STRATEGY 支持 TopicNameStrategy (缺省值),RecordNameStrategyTopicRecordNameStrategy。 参见VALUE_SUBJECT_NAME_STRATEGY示例。

下图显示了创建一个 Kafka 生产器的属性示例,该生产器使用Confluent SerDes,并且可以连接到 Event Streams 服务:

Kafka properties for Confluent Serdes
Kafka properties for Confluent Serdes

如果使用不在注册表中的模式发送消息,那么 SerDes 会尝试在注册表中创建新模式或模式版本。 如果不需要此行为,那么可以通过从应用程序中除去对模式资源的写程序许可权来将其禁用。 请参阅 管理对模式注册表的访问权

不支持用于模式查找和注册的 normalize 选项。

将模式注册表与使用 Confluent 注册表 API 的工具配合使用

模式注册表支持由 Confluent 模式注册表的 7.2 版本提供的 API 子集。 这旨在提供与旨在与 Confluent 模式注册表配合使用的工具的有限兼容性。 仅实现了带有以下路径的 HTTP REST端点:

  • 兼容性
  • 配置
  • 模式
  • subjects

要配置应用程序以使用此兼容性 API,请按以下格式指定模式注册表端点:

https://token:{$APIKEY}@{$HOST}/{confluent}

其中:

  • $APIKEY 是要从 服务凭证 选项卡使用的 API 密钥
  • $HOST 是来自 服务凭证 选项卡中 kafka_http_url 字段的主机

将Schema Registry与第三方工具结合使用

Schema Registry可以使用第三方工具进行测试,例如 kafka-avro-console-producer.shkafka-avro-console-consumer.sh,它们允许使用Confluent SerDes 测试与架构的一致性。

要运行生产者或消费者工具,需要与 Event Streams 企业实例的连接选项具有相同的属性。

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="apikey";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

Avro 控制台生产者和使用者

您可以将 Kafka Avro 控制台生产者和使用者工具与 Event Streams 配合使用。 您必须提供一个客户端属性,此外,还需要将Schema Registry的连接方法和凭据作为命令行 --property 参数提供。 有两种连接方法,分别使用USER_INFO或 URL 作为凭证源。

要使用 URL 的凭据源方法进行执行,请使用以下代码。

    ./kafka-avro-console-[producer|consumer] --broker-list $BOOTSTRAP_ENDPOINTS --topic schema-test --property schema.registry.url=$SCHEMA_REGISTRY_URL --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property basic.auth.credentials.source=URL --producer.config $CONFIG_FILE

请将示例中的以下变量替换为您自己的值。

  • 将 Event Streams IBM Cloud 控制台中的 服务凭证 选项卡中的值作为引导服务器列表的 BOOTSTRAP_ENDPOINTS。
  • Event Streams SCHEMA_REGISTRY_URL,其中 kafka_http_url 的值来自 IBM Cloud 控制台中的 “服务凭据”选项卡,用户名是 token,apikey是 /confluent (例如,https://{token}:{apikey}@{kafka_http_url}/{confluent} )。
  • 将 CONFIG_FILE 替换为配置文件的路径。

要使用USER_INFO的凭据源方法执行,请使用以下代码。

    ./kafka-avro-console-[producer|consumer] --broker-list $BOOTSTRAP_ENDPOINTS --topic schema-test --property schema.registry.url=$SCHEMA_REGISTRY_URL --property value.schema='{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' --property basic.auth.credentials.source=USER_INFO --property basic.auth.user.info=token:apikey --producer.config $CONFIG_FILE

请将示例中的以下变量替换为您自己的值。

  • 将 Event Streams IBM Cloud 控制台中的 服务凭证 选项卡中的值作为引导服务器列表的 BOOTSTRAP_ENDPOINTS。
  • Event Streams SCHEMA_REGISTRY_URL,其中 kafka_http_url 的值来自 IBM Cloud 控制台中的 “服务凭据”选项卡,路径为 /confluent (例如 https://{kafka_http_url}/{confluent} )。
  • 将 CONFIG_FILE 替换为配置文件的路径。