使用 Kafka API
Kafka 在范围广泛的语言中提供一组丰富的 API 和客户机。 API 包括核心 API,Streams API 和 Connect API。
- Kafka 的核心应用程序接口(消费者应用程序接口、生产者应用程序接口和管理应用程序接口)
用于直接发送和接收来自一个或多个 Kafka 主题的消息。 Kafka 管理客户机通过 Kafka API 提供了简单的界面,用于管理 Kafka 资源。 您可以创建,删除和管理主题。 您还可以使用管理客户机来管理使用者组和配置。 - 流应用程序接口
更高级别的流处理 API,可在主题之间轻松消费、转换和生成事件。 - 连接应用程序接口
一个框架,允许可重复使用的或标准的集成,将事件流输入和输出到外部系统(如数据库)。
下表总结了使用 Event Streams 可实现的功能:
企业套餐 | 标准套餐 | Lite 套餐 | |
---|---|---|---|
集群上的 Kafka 版本 | Kafka 3.6 | Kafka 3.6 | Kafka 3.6 |
建议的最低 Kafka 客户机版本 | Kafka 2.6.0 或以后 | Kafka 2.6.0 或以后 | Kafka 2.6.0 或以后 |
支持的客户机版本 | 请参阅 所有建议客户机的支持摘要 | ||
Kafka Connect 支持 | 是 | 是 | 否 |
Kafka Streams 支持 | 是 | 是 | 否 |
ksqlDB 支持 | 是 | 否 | 否 |
认证需求 | 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 | 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 | 客户端必须支持使用 SASL Plain 机制进行身份验证,并使用 TLSv1.2 协议的服务器名称指示(SNI)扩展。 |
选择与 Event Streams 一起使用的 Kafka 客户机
Kafka API 的官方客户机以 Java 编写,因此包含最新的功能和错误修订。 有关此 API 的更多信息,请参阅 Kafka 生产者 API 3.6 和 Kafka 消费者 API 3.6。
对于其他语言,请运行下列其中一个客户机,所有这些客户机都通过 Event Streams进行测试。
所有建议的客户机的支持摘要
客户端 | 语言 | 建议的版本 | 支持的最低版本 [1] | 样本链接 |
---|---|---|---|---|
Apache Kafka 官方客户端: | ||||
Apache Kafka 客户机 | Java | 3.6.2或更高版本 | 2.5.0 | Java 控制台示例 |
第三方客户: | ||||
confluent-kafka-javascript | Node.js | 最新 | 1.0.0 | |
confluent-kafka-python | Python | 最新 | 1.4.0 | Kafka Python 样本 |
confluent-kafka-go | 执行 | 最新 | 1.4.0 | |
librdkafka | C 或 C++ | 最新 | 1.4.0 | |
节点-rdkafka | Node.js | 最新 | 2.8.0 | Node.js 样本 |
萨拉马 | 执行 | 最新 | 1.40.0 | Sarama 示例 |
将客户机连接到 Event Streams
有关如何配置 Java 客户机以连接到 Event Streams 的信息,请参阅配置客户机。
配置 Kafka API 客户机
要建立连接,必须至少将客户端配置为通过 TLSv1.2 使用 SASL PLAIN 或 SASL OAUTHBEARER,并要求提供用户名和引导服务器列表。 TLSv1.2 确保连接加密,并验证经纪人的真实性(防止中间人攻击)。 SASL 会对所有连接强制执行认证。
要检索用户名、密码和引导服务器列表,服务实例需要一个服务凭据对象或服务密钥。 有关创建这些对象的更多信息,请参阅 连接到 Event Streams。
使用 SASL PLAIN
使用以下字符串和属性。
- 使用
bootstrap_endpoints
字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。 - 使用
user
和api_key
属性作为用户名和密码。
对于 Java 客户端,下面的示例显示了最小属性集,其中 ${USERNAME}
、${PASSWORD}
和 ${BOOTSTRAP_ENDPOINTS}
将由您之前获取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
如果使用的 Kafka 客户端版本早于 0.10.2.1,则不支持 sasl.jaas.config
属性,必须在 JAAS 配置文件中提供客户端配置。
使用 SASL OAUTHBEARER
在为 Java 客户机配置 SASL 机制之前,有两个先决条件。
- 受支持的最低 Kafka Java 客户机版本为 3.1.0。
- 需要从 Maven Central 下载其他 jar 包并使其在类路径中可用。
如果在构建系统中使用 Maven,请将以下信息添加到依赖关系部分中的文件 pom.xml
。
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>1.4.0</version>
</dependency>
如果在构建系统中使用 Gradle,请将以下信息添加到依赖关系部分中的文件 build.gradle
。
implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0
IBM Cloud® Identity and Access Management 身份服务支持多种生成承载令牌的方式,本 oauth 客户端库支持其中两种。
- API 密钥。
- 可信配置文件和计算资源令牌。
使用带有 API 密钥的 SASL OAUTHBEARER
使用以下字符串和属性。
- 使用
BOOTSTRAP_ENDPOINTS
字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。 IAMOAuthBearerLoginCallbackHandler
由 jar 包com.ibm.cloud.eventstreams:oauth-client:+
提供。- IBM Cloud® Identity and Access Management的令牌端点
https://iam.cloud.ibm.com/identity/token
配置为使用 jaas 配置中的指定授权类型从 API 密钥生成令牌。 它是在客户机端完成的,因此不会将 API 密钥发送到服务器端,并且提供比长期存在的 API 密钥更好的安全性。 - Cloud Identity and Access Management的密钥端点
https://iam.cloud.ibm.com/identity/keys
配置为验证令牌。 grant_type
sasl.jaas.config
是urn:ibm:params:oauth:grant-type:apikey
apikey
sasl.jaas.config
中是用于在客户端生成承载令牌的 API 密钥。 它可以来自用户或服务 ID。
对于 Java 客户端,下面的示例显示了最小属性集,其中 ${BOOTSTRAP_ENDPOINTS}
和 ${APIKEY}
将由您之前获取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
使用带有可信配置文件和计算资源令牌的 SASL OAUTHBEARER
除了 sasl.jaas.config
不同之外,所有属性都与 API 密钥相同。
grant_type
sasl.jaas.config
是。urn:ibm:params:oauth:grant-type:cr-token
profile_id
中的 是存储可信配置文件 ID 的文件位置。sasl.jaas.config
该文件可作为只读卷挂载到运行 Kafka 客户端代码的 Kubernetes pod 上,并供 Kafka 客户端代码使用。cr_token
中的 是一个文件位置,存储来自运行 客户端代码的 pod 的服务帐户令牌。sasl.jaas.config
Kafka Kubernetes 请参阅 什么是服务帐户令牌。
请看下面的例子
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";
有关 如何设置受信任的个人资料的 更多详情
oauth 客户端的源代码请参阅 Event Streams Java SDK。
客户端代码示例请参阅 Event Streams Sample。
有关其他 Kafka 客户机属性,请参阅有关如何实现 OAUTHBEARER 支持的文档。 例如:。
- sarama: 需要
AccessTokenProvider
接口的实现。 - librdkafka: 需要
oauthbearer_token_refresh_cb
回调的实现。
有关如何使用 API 密钥生成 IBM Cloud IAM 令牌的信息,请参阅 IBM Cloud® Identity and Access Management的 文档。
-
在持续测试中验证的最早版本。 通常情况下,它是过去 12 个月内可用的初始版本,如果已知存在重大问题,则是更新的版本。 如果无法运行列出的任何客户端,可以使用符合以下最低要求的其他第三方客户端(例如 librdkafka )。1. 支持 Kafka 1.40或更高版本。2。 可使用 SASL PLAIN 与 TLSv1.2 进行连接和验证。3. 支持 TLS 的 SNI 扩展,在 TLS 握手过程中包含服务器的主机名。4. 支持椭圆曲线密码术。 在所有情况下,请使用最新版本的客户机。 ↩︎