使用 Kafka API

Kafka 在范围广泛的语言中提供一组丰富的 API 和客户机。 API 包括核心 API,Streams API 和 Connect API。

  • Kafka 的核心 API(Consumer、Producer 和 Admin API) 用于直接发送和接收来自一个或多个 Kafka 主题的消息。 Kafka 管理客户机通过 Kafka API 提供了简单的界面,用于管理 Kafka 资源。 您可以创建,删除和管理主题。 您还可以使用管理客户机来管理使用者组和配置。
  • 流应用程序接口 更高级别的流处理 API,可在主题之间轻松消费、转换和生成事件。
  • 连接应用程序接口 一个框架,允许可重复使用的或标准的集成将事件流输入和输出到外部系统(如数据库)。

下表总结了使用 Event Streams 可实现的功能:

标准、企业和精简版计划均支持Kafka客户端。
企业套餐 标准套餐 Lite 套餐
集群上的 Kafka 版本 Kafka 3.8 Kafka 3.8 Kafka 3.8
建议的最低 Kafka 客户机版本 Kafka 2.6.0 或以后 Kafka 2.6.0 或以后 Kafka 2.6.0 或以后
支持的客户机版本 请参阅 所有建议客户机的支持摘要
Kafka Connect 支持
Kafka Streams 支持
ksqlDB 支持
认证需求 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。

选择与 Event Streams 一起使用的 Kafka 客户机

Kafka API 的官方客户机以 Java 编写,因此包含最新的功能和错误修订。 有关此 API 的更多信息,请参阅 Kafka 生产者 API 3.8Kafka 消费者 API 3.8

对于其他语言,请运行下列其中一个客户机,所有这些客户机都通过 Event Streams进行测试。

所有建议的客户机的支持摘要

客户支持摘要
客户端 语言 建议的版本 支持的最低版本 [1] 样本链接
Apache Kafka 官方客户端:
Apache Kafka 客户机 Java 3.8.1 或以后 2.5.0 Java 控制台示例

Liberty 示例

第三方客户:
confluent-kafka-javascript Node.js 最新 1.0.0
confluent-kafka-python Python 最新 1.4.0 Kafka Python 样本
confluent-kafka-go 执行 最新 1.4.0
librdkafka C 或 C++ 最新 1.4.0
节点-rdkafka Node.js 最新 2.8.0 Node.js 样本
萨拉马 执行 最新 1.40.0 Sarama 示例

将客户机连接到 Event Streams

有关如何配置 Java 客户机以连接到 Event Streams 的信息,请参阅配置客户机

配置 Kafka API 客户机

要建立连接,必须至少将客户端配置为通过 TLSv1.2 使用 SASL PLAIN 或 SASL OAUTHBEARER,并要求提供用户名和引导服务器列表。 TLSv1.2 确保连接加密,并验证经纪人的真实性(防止中间人攻击)。 SASL 会对所有连接强制执行认证。

要检索用户名、密码和引导服务器列表,服务实例需要一个服务凭据对象或服务密钥。 有关创建这些对象的更多信息,请参阅 连接到 Event Streams

使用 SASL PLAIN

使用以下字符串和属性。

  • 使用 bootstrap_endpoints 字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。
  • 使用 userapi_key 属性作为用户名和密码。

对于 Java 客户端,下面的示例显示了最小属性集,其中 ${USERNAME}${PASSWORD}${BOOTSTRAP_ENDPOINTS} 将由您之前获取的值代替。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

如果使用的 Kafka 客户端版本早于 0.10.2.1,则不支持 sasl.jaas.config 属性,必须在 JAAS 配置文件中提供客户端配置。

使用 SASL OAUTHBEARER 与 Java 客户端 v3.4- 4.0

在为 Java 客户端配置 SASL 机制之前,有两个先决条件:

  • Kafka Java 客户端的最低支持版本是 3.4 (最好是 3.6 或更高)。
  • 需要从 Maven Central 下载一个额外的 jar 包,并将其放在类路径中。

如果在构建系统中使用了 Maven,请在 pom.xml 文件的依赖项部分添加以下信息:

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.4.0</version>
</dependency>

如果在构建系统中使用 Gradle,请在依赖项部分的 build.gradle 文件中添加以下信息:

implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0

IBM Cloud® Identity and Access Management 身份服务支持多种生成承载令牌的方式,本 oauth 客户端库支持其中两种:

  • API 密钥
  • 可信配置文件和计算资源令牌

使用带有 API 密钥的 SASL OAUTHBEARER

使用以下字符串和属性。

  • 使用 BOOTSTRAP_ENDPOINTS 字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。
  • IAMOAuthBearerLoginCallbackHandler 由 jar 包 com.ibm.cloud.eventstreams:oauth-client:+ 提供。
  • IBM Cloud® Identity and Access Management 的令牌端点 https://iam.cloud.ibm.com/identity/token 被配置为使用 jaas 配置中指定的授予类型从 API 密钥生成令牌。 它是在客户端完成的,因此 API 密钥永远不会发送到服务器端,这比长期存在的 API 密钥提供了更好的安全性。
  • Cloud Identity and Access Management的密钥端点 https://iam.cloud.ibm.com/identity/keys 配置为验证令牌。
  • grant_type sasl.jaas.configurn:ibm:params:oauth:grant-type:apikey
  • apikey sasl.jaas.config 中是用于在客户端生成承载令牌的 API 密钥。 它可以来自用户或服务 ID。

对于 Java 客户端,下面的示例显示了最小属性集,其中用先前获取的值替换了 ${BOOTSTRAP_ENDPOINTS}${APIKEY}

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.cloud.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";

使用带有可信配置文件和计算资源令牌的 SASL OAUTHBEARER

除了 sasl.jaas.config 不同之外,所有属性都与 API 密钥 相同。

  • grant_type sasl.jaas.config 是。urn:ibm:params:oauth:grant-type:cr-token
  • profile_id sasl.jaas.config 中是存储受信任配置文件 ID 的文件位置。 该文件可作为只读卷挂载到运行 Kafka 客户端代码的 Kubernetes pod 上,并供 Kafka 客户端代码使用。
  • cr_token 中的 是一个文件位置,存储来自运行 客户端代码的 pod 的服务帐户令牌。sasl.jaas.config Kafka Kubernetes 更多信息,请参阅 什么是服务帐户令牌

例如:

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";

您可以了解有关 如何设置受信任的个人资料的 更多详情。

oauth 客户端的源代码参考了 Event Streams Java SDK

示例客户端代码指的是 Event Streams 示例。

使用 Java 客户端 v4.1 及更高版本的 SASL OAUTHBEARER

当使用 v4.1 或更高版本的 Kafka Java 客户端时,客户端需要使用较新版本的 Event Streams oauth 客户端,它依赖于 Kafka 的默认回调处理程序和适当的令牌检索器。

如果在构建系统中使用了 Maven,请在 pom.xml 文件的依赖项部分添加以下信息:

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>2.0.0</version>
</dependency>

如果在构建系统中使用 Gradle,请在依赖项部分的 build.gradle 文件中添加以下信息:

implementation com.ibm.cloud.eventstreams:oauth-client:2.0.+

IBM Cloud® Identity and Access Management 身份服务支持多种生成承载令牌的方式,本 oauth 客户端库支持其中两种:

  • API 密钥
  • 可信配置文件和计算资源令牌

使用带有 API 密钥的 SASL OAUTHBEARER

除了必须的 bootstrap.servers 属性和任何特定的生产者、消费者和管理者设置外,请使用以下字符串和属性。

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    grant_type="urn:ibm:params:oauth:grant-type:apikey" \
    apikey="${YOUR_IBM_CLOUD_API_KEY}";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.jwt.retriever.class=com.ibm.cloud.eventstreams.oauth.client.IAMTokenRetriever
sasl.oauthbearer.token.endpoint.url=https://private.iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://private.iam.cloud.ibm.com/identity/keys

在容器化环境中使用带有可信配置文件和计算资源令牌的 SASL OAUTHBEARER

有关详细信息,请参阅 为计算资源生成 IAM 令牌

security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
    grant_type="urn:ibm:params:oauth:grant-type:cr-token" \
    cr_token="/path/to/cr-token-file" \
    profile_id="/path/to/profile-id-file";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.jwt.retriever.class=com.ibm.cloud.eventstreams.oauth.client.IAMTokenRetriever
sasl.oauthbearer.token.endpoint.url=https://private.iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://private.iam.cloud.ibm.com/identity/keys

系统属性 org.apache.kafka.sasl.oauthbearer.allowed.urls

从 Kafka 4.0,客户端需要一个系统属性来设置 SASL OAUTHBEARER 令牌和 jwks 端点允许的 URL。

更多信息,请参阅 系统属性

使用 Apache Kafka 发行版提供的 CLI 客户端 shell 脚本时,也可以使用 KAFKA_OPTS 环境变量设置系统属性。

export KAFKA_OPTS="-Dorg.apache.kafka.sasl.oauthbearer.allowed.urls=https://private.iam.cloud.ibm.com/identity/keys,https://private.iam.cloud.ibm.com/identity/token,https://api.metadata.cloud.ibm.com/identity/v1/iam_tokens"

与非 Java 客户端一起使用 SASL OAUTHBEARER

有关其他 Kafka 客户机属性,请参阅有关如何实现 OAUTHBEARER 支持的文档。 例如:

  • sarama: 需要 AccessTokenProvider 接口的实现。
  • librdkafka: 需要 oauthbearer_token_refresh_cb 回调的实现。

有关如何使用 API 密钥生成 IBM Cloud IAM 令牌的信息,请参阅 IBM Cloud® Identity and Access Management的 文档


  1. 在持续测试中验证的最早版本。 通常情况下,它是过去 12 个月内可用的初始版本,如果已知存在重大问题,则是更新的版本。 如果无法运行列出的任何客户端,可以使用符合以下最低要求的其他第三方客户端(例如 librdkafka )。1. 支持 Kafka 1.40或更高版本。2。 可使用 SASL PLAIN 与 TLSv1.2 进行连接和验证。3. 支持用于 TLS 的 SNI 扩展,其中服务器的主机名包含在 TLS 握手中。4. 支持椭圆曲线密码术。 在所有情况下,请使用最新版本的客户机。 ↩︎