IBM Cloud Docs
使用 Kafka API

使用 Kafka API

Kafka 在范围广泛的语言中提供一组丰富的 API 和客户机。 API 包括核心 API,Streams API 和 Connect API。

  • Kafka 的核心API(消费者、生产者和管理员API)
    用于直接发送和接收来自一个或多个 Kafka 主题的消息。 Kafka 管理客户机通过 Kafka API 提供了简单的界面,用于管理 Kafka 资源。 您可以创建,删除和管理主题。 您还可以使用管理客户机来管理使用者组和配置。
  • 流API
    一个更高级的流处理API,用于轻松地在主题之间消费、转换和生成事件。
  • 连接API
    一个框架,允许可重复使用或标准集成,将事件流输入或输出到外部系统,如数据库。

下表总结了使用 Event Streams 可实现的功能:

标准、企业和精简版计划均支持Kafka客户端。
企业套餐 标准套餐 Lite 套餐
集群上的 Kafka 版本 Kafka 3.6 Kafka 3.6 Kafka 3.6
建议的最低 Kafka 客户机版本 Kafka 2.6.0 或更高版本 Kafka 2.6.0 或更高版本 Kafka 2.6.0 或更高版本
支持的客户机版本 请参阅 所有建议客户机的支持摘要
Kafka Connect 支持
Kafka Streams 支持
ksqlDB 支持
认证需求 客户必须使用SASL Plain机制进行身份验证,并使用 TLSv1.2 协议的扩展名服务器名称指示(SNI)。 客户必须使用SASL Plain机制进行身份验证,并使用 TLSv1.2 协议的扩展名服务器名称指示(SNI)。 客户必须使用SASL Plain机制进行身份验证,并使用 TLSv1.2 协议的扩展名服务器名称指示(SNI)。

选择与 Event Streams 一起使用的 Kafka 客户机

Kafka API 的官方客户机以 Java 编写,因此包含最新的功能和错误修订。 如需了解有关此API的更多信息,请参阅 Kafka 生产者API 3.6Kafka 消费者API 3.6

对于其他语言,请运行下列其中一个客户机,所有这些客户机都通过 Event Streams进行测试。

所有建议的客户机的支持摘要

客户支持摘要
客户端 语言 建议的版本 支持的最低版本 [1] 样本链接
官方 Apache Kafka 客户端:
Apache Kafka 客户机 Java 3.6.2或更高版本 2.5.0 Java 控制台示例

Liberty 示例

第三方客户:
confluent-kafka-javascript Node.js 最新 1.0.0
confluent-kafka-python Python 最新 1.4.0 Kafka Python 样本
confluent-kafka-go 执行 最新 1.4.0
librdkafka C 或 C++ 最新 1.4.0
节点-rdkafka Node.js 最新 2.8.0 Node.js 样本
萨拉马 执行 最新 1.40.0 Sarama 示例

将客户机连接到 Event Streams

有关如何配置 Java 客户机以连接到 Event Streams 的信息,请参阅配置客户机

配置 Kafka API 客户机

要建立连接,客户必须至少通过 TLSv1.2 配置为使用SASL PLAIN或SASL OAUTHBEARER,并需要用户名和引导服务器列表。 TLSv1.2 确保连接加密并验证经纪人的真实性(以防止中间人攻击)。 SASL 会对所有连接强制执行认证。

要检索用户名、密码和引导服务器列表,服务实例需要服务凭证对象或服务密钥。 有关创建这些对象的更多信息,请参阅 “连接到 Event Streams”。

使用 SASL PLAIN

使用以下字符串和属性。

  • 使用 bootstrap_endpoints 字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。
  • 请将 userapi_key 作为用户名和密码。

对于 Java 客户,以下示例显示了属性集合的最小值,其中 ${USERNAME}${PASSWORD}${BOOTSTRAP_ENDPOINTS} 应替换为您之前检索的值。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

如果您使用的 Kafka 客户端版本低于 0.10.2.1,则不支持 sasl.jaas.config 属性,您必须在 JAAS 配置文件中提供客户端配置。

使用 SASL OAUTHBEARER

在为 Java 客户机配置 SASL 机制之前,有两个先决条件。

  • 受支持的最低 Kafka Java 客户机版本为 3.1.0。
  • 需要从 Maven Central 下载其他 jar 包并使其在类路径中可用。

如果在构建系统中使用 Maven,请将以下信息添加到依赖关系部分中的文件 pom.xml

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.3.1</version>
</dependency>

如果在构建系统中使用 Gradle,请将以下信息添加到依赖关系部分中的文件 build.gradle

implementation com.ibm.cloud.eventstreams:oauth-client:1.3.1

使用以下字符串和属性。

  • 使用 bootstrap_endpoints 字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。
  • 使用 api_key 字符串作为 API 密钥。
  • IAMOAuthBearerLoginCallbackHandler 由 jar 包 com.ibm.cloud.eventstreams:oauth-client:+ 提供。
  • IBM Cloud® Identity and Access Management的令牌端点 https://iam.cloud.ibm.com/identity/token 配置为使用 jaas 配置中的指定授权类型从 API 密钥生成令牌。 它是在客户机端完成的,因此不会将 API 密钥发送到服务器端,并且提供比长期存在的 API 密钥更好的安全性。
  • Cloud Identity and Access Management的密钥端点 https://iam.cloud.ibm.com/identity/keys 配置为验证令牌。

对于 Java 客户,以下示例显示了最小属性集,其中 ${BOOTSTRAP_ENDPOINTS}${APIKEY} 应替换为您之前检索的值。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

样本代码引用 Event Streams 样本

有关其他 Kafka 客户机属性,请参阅有关如何实现 OAUTHBEARER 支持的文档。 例如:。

  • sarama: 需要 AccessTokenProvider 接口的实现。
  • librdkafka: 需要 oauthbearer_token_refresh_cb 回调的实现。

有关如何使用 API 密钥生成 IBM Cloud IAM 令牌的信息,请参阅 IBM Cloud® Identity and Access Management的 文档


  1. 在持续测试中验证的最早版本。 通常情况下,是指最近12个月内推出的初始版本,如果已知存在重大问题,则提供更新的版本。 如果您无法运行列出的任何客户端,可以使用满足以下最低要求的其他第三方客户端(例如 librdkafka )。1. 支持 Kafka 1.40或更高版本。2。 可以使用SASL PLAIN与 TLSv1.2 进行连接和身份验证。3. 支持 TLS 的 SNI 扩展,其中服务器的主机名包含在 TLS 握手中。4. 支持椭圆曲线密码术。 在所有情况下,请使用最新版本的客户机。 ↩︎