IBM Cloud Docs
使用 Kafka API

使用 Kafka API

Kafka 在范围广泛的语言中提供一组丰富的 API 和客户机。 API 包括核心 API,Streams API 和 Connect API。

  • Kafka 的核心应用程序接口(消费者应用程序接口、生产者应用程序接口和管理应用程序接口)
    用于直接发送和接收来自一个或多个 Kafka 主题的消息。 Kafka 管理客户机通过 Kafka API 提供了简单的界面,用于管理 Kafka 资源。 您可以创建,删除和管理主题。 您还可以使用管理客户机来管理使用者组和配置。
  • 流应用程序接口
    更高级别的流处理 API,可在主题之间轻松消费、转换和生成事件。
  • 连接应用程序接口
    一个框架,允许可重复使用的或标准的集成,将事件流输入和输出到外部系统(如数据库)。

下表总结了使用 Event Streams 可实现的功能:

标准、企业和精简版计划均支持Kafka客户端。
企业套餐 标准套餐 Lite 套餐
集群上的 Kafka 版本 Kafka 3.6 Kafka 3.6 Kafka 3.6
建议的最低 Kafka 客户机版本 Kafka 2.6.0 或以后 Kafka 2.6.0 或以后 Kafka 2.6.0 或以后
支持的客户机版本 请参阅 所有建议客户机的支持摘要
Kafka Connect 支持
Kafka Streams 支持
ksqlDB 支持
认证需求 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。

选择与 Event Streams 一起使用的 Kafka 客户机

Kafka API 的官方客户机以 Java 编写,因此包含最新的功能和错误修订。 有关此 API 的更多信息,请参阅 Kafka 生产者 API 3.6Kafka 消费者 API 3.6

对于其他语言,请运行下列其中一个客户机,所有这些客户机都通过 Event Streams进行测试。

所有建议的客户机的支持摘要

客户支持摘要
客户端 语言 建议的版本 支持的最低版本 [1] 样本链接
Apache Kafka 官方客户端:
Apache Kafka 客户机 Java 3.6.2或更高版本 2.5.0 Java 控制台示例

Liberty 示例

第三方客户:
confluent-kafka-javascript Node.js 最新 1.0.0
confluent-kafka-python Python 最新 1.4.0 Kafka Python 样本
confluent-kafka-go 执行 最新 1.4.0
librdkafka C 或 C++ 最新 1.4.0
节点-rdkafka Node.js 最新 2.8.0 Node.js 样本
萨拉马 执行 最新 1.40.0 Sarama 示例

将客户机连接到 Event Streams

有关如何配置 Java 客户机以连接到 Event Streams 的信息,请参阅配置客户机

配置 Kafka API 客户机

要建立连接,必须至少将客户端配置为通过 TLSv1.2 使用 SASL PLAIN 或 SASL OAUTHBEARER,并要求提供用户名和引导服务器列表。 TLSv1.2 确保连接加密,并验证经纪人的真实性(防止中间人攻击)。 SASL 会对所有连接强制执行认证。

要检索用户名、密码和引导服务器列表,服务实例需要一个服务凭据对象或服务密钥。 有关创建这些对象的更多信息,请参阅 连接到 Event Streams

使用 SASL PLAIN

使用以下字符串和属性。

  • 使用 bootstrap_endpoints 字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。
  • 使用 userapi_key 属性作为用户名和密码。

对于 Java 客户端,下面的示例显示了最小属性集,其中 ${USERNAME}${PASSWORD}${BOOTSTRAP_ENDPOINTS} 将由您之前获取的值取代。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

如果使用的 Kafka 客户端版本早于 0.10.2.1,则不支持 sasl.jaas.config 属性,必须在 JAAS 配置文件中提供客户端配置。

使用 SASL OAUTHBEARER

在为 Java 客户机配置 SASL 机制之前,有两个先决条件。

  • 受支持的最低 Kafka Java 客户机版本为 3.1.0。
  • 需要从 Maven Central 下载其他 jar 包并使其在类路径中可用。

如果在构建系统中使用 Maven,请将以下信息添加到依赖关系部分中的文件 pom.xml

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.4.0</version>
</dependency>

如果在构建系统中使用 Gradle,请将以下信息添加到依赖关系部分中的文件 build.gradle

implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0

IBM Cloud® Identity and Access Management 身份服务支持多种生成承载令牌的方式,本 oauth 客户端库支持其中两种。

  • API 密钥。
  • 可信配置文件和计算资源令牌。

使用带有 API 密钥的 SASL OAUTHBEARER

使用以下字符串和属性。

  • 使用 BOOTSTRAP_ENDPOINTS 字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。
  • IAMOAuthBearerLoginCallbackHandler 由 jar 包 com.ibm.cloud.eventstreams:oauth-client:+ 提供。
  • IBM Cloud® Identity and Access Management的令牌端点 https://iam.cloud.ibm.com/identity/token 配置为使用 jaas 配置中的指定授权类型从 API 密钥生成令牌。 它是在客户机端完成的,因此不会将 API 密钥发送到服务器端,并且提供比长期存在的 API 密钥更好的安全性。
  • Cloud Identity and Access Management的密钥端点 https://iam.cloud.ibm.com/identity/keys 配置为验证令牌。
  • grant_type sasl.jaas.configurn:ibm:params:oauth:grant-type:apikey
  • apikey sasl.jaas.config 中是用于在客户端生成承载令牌的 API 密钥。 它可以来自用户或服务 ID。

对于 Java 客户端,下面的示例显示了最小属性集,其中 ${BOOTSTRAP_ENDPOINTS}${APIKEY} 将由您之前获取的值取代。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";

使用带有可信配置文件和计算资源令牌的 SASL OAUTHBEARER

除了 sasl.jaas.config 不同之外,所有属性都与 API 密钥相同。

  • grant_type sasl.jaas.config 是。urn:ibm:params:oauth:grant-type:cr-token
  • profile_id 中的 是存储可信配置文件 ID 的文件位置。sasl.jaas.config 该文件可作为只读卷挂载到运行 Kafka 客户端代码的 Kubernetes pod 上,并供 Kafka 客户端代码使用。
  • cr_token 中的 是一个文件位置,存储来自运行 客户端代码的 pod 的服务帐户令牌。sasl.jaas.config Kafka Kubernetes 请参阅 什么是服务帐户令牌

请看下面的例子

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";

有关 如何设置受信任的个人资料的 更多详情

oauth 客户端的源代码请参阅 Event Streams Java SDK

客户端代码示例请参阅 Event Streams Sample

有关其他 Kafka 客户机属性,请参阅有关如何实现 OAUTHBEARER 支持的文档。 例如:。

  • sarama: 需要 AccessTokenProvider 接口的实现。
  • librdkafka: 需要 oauthbearer_token_refresh_cb 回调的实现。

有关如何使用 API 密钥生成 IBM Cloud IAM 令牌的信息,请参阅 IBM Cloud® Identity and Access Management的 文档


  1. 在持续测试中验证的最早版本。 通常情况下,它是过去 12 个月内可用的初始版本,如果已知存在重大问题,则是更新的版本。 如果无法运行列出的任何客户端,可以使用符合以下最低要求的其他第三方客户端(例如 librdkafka )。1. 支持 Kafka 1.40或更高版本。2。 可使用 SASL PLAIN 与 TLSv1.2 进行连接和验证。3. 支持 TLS 的 SNI 扩展,在 TLS 握手过程中包含服务器的主机名。4. 支持椭圆曲线密码术。 在所有情况下,请使用最新版本的客户机。 ↩︎