使用 Kafka API
Kafka 在范围广泛的语言中提供一组丰富的 API 和客户机。 API 包括核心 API,Streams API 和 Connect API。
- Kafka 的核心API(消费者、生产者和管理员API)
用于直接发送和接收来自一个或多个 Kafka 主题的消息。 Kafka 管理客户机通过 Kafka API 提供了简单的界面,用于管理 Kafka 资源。 您可以创建,删除和管理主题。 您还可以使用管理客户机来管理使用者组和配置。 - 流API
一个更高级的流处理API,用于轻松地在主题之间消费、转换和生成事件。 - 连接API
一个框架,允许可重复使用或标准集成,将事件流输入或输出到外部系统,如数据库。
下表总结了使用 Event Streams 可实现的功能:
企业套餐 | 标准套餐 | Lite 套餐 | |
---|---|---|---|
集群上的 Kafka 版本 | Kafka 3.6 | Kafka 3.6 | Kafka 3.6 |
建议的最低 Kafka 客户机版本 | Kafka 2.6.0 或更高版本 | Kafka 2.6.0 或更高版本 | Kafka 2.6.0 或更高版本 |
支持的客户机版本 | 请参阅 所有建议客户机的支持摘要 | ||
Kafka Connect 支持 | 是 | 是 | 否 |
Kafka Streams 支持 | 是 | 是 | 否 |
ksqlDB 支持 | 是 | 否 | 否 |
认证需求 | 客户必须使用SASL Plain机制进行身份验证,并使用 TLSv1.2 协议的扩展名服务器名称指示(SNI)。 | 客户必须使用SASL Plain机制进行身份验证,并使用 TLSv1.2 协议的扩展名服务器名称指示(SNI)。 | 客户必须使用SASL Plain机制进行身份验证,并使用 TLSv1.2 协议的扩展名服务器名称指示(SNI)。 |
选择与 Event Streams 一起使用的 Kafka 客户机
Kafka API 的官方客户机以 Java 编写,因此包含最新的功能和错误修订。 如需了解有关此API的更多信息,请参阅 Kafka 生产者API 3.6 和 Kafka 消费者API 3.6。
对于其他语言,请运行下列其中一个客户机,所有这些客户机都通过 Event Streams进行测试。
所有建议的客户机的支持摘要
客户端 | 语言 | 建议的版本 | 支持的最低版本 [1] | 样本链接 |
---|---|---|---|---|
官方 Apache Kafka 客户端: | ||||
Apache Kafka 客户机 | Java | 3.6.2或更高版本 | 2.5.0 | Java 控制台示例 |
第三方客户: | ||||
confluent-kafka-javascript | Node.js | 最新 | 1.0.0 | |
confluent-kafka-python | Python | 最新 | 1.4.0 | Kafka Python 样本 |
confluent-kafka-go | 执行 | 最新 | 1.4.0 | |
librdkafka | C 或 C++ | 最新 | 1.4.0 | |
节点-rdkafka | Node.js | 最新 | 2.8.0 | Node.js 样本 |
萨拉马 | 执行 | 最新 | 1.40.0 | Sarama 示例 |
将客户机连接到 Event Streams
有关如何配置 Java 客户机以连接到 Event Streams 的信息,请参阅配置客户机。
配置 Kafka API 客户机
要建立连接,客户必须至少通过 TLSv1.2 配置为使用SASL PLAIN或SASL OAUTHBEARER,并需要用户名和引导服务器列表。 TLSv1.2 确保连接加密并验证经纪人的真实性(以防止中间人攻击)。 SASL 会对所有连接强制执行认证。
要检索用户名、密码和引导服务器列表,服务实例需要服务凭证对象或服务密钥。 有关创建这些对象的更多信息,请参阅 “连接到 Event Streams”。
使用 SASL PLAIN
使用以下字符串和属性。
- 使用
bootstrap_endpoints
字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。 - 请将
user
和api_key
作为用户名和密码。
对于 Java 客户,以下示例显示了属性集合的最小值,其中 ${USERNAME}
、${PASSWORD}
和 ${BOOTSTRAP_ENDPOINTS}
应替换为您之前检索的值。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
如果您使用的 Kafka 客户端版本低于 0.10.2.1,则不支持 sasl.jaas.config
属性,您必须在 JAAS 配置文件中提供客户端配置。
使用 SASL OAUTHBEARER
在为 Java 客户机配置 SASL 机制之前,有两个先决条件。
- 受支持的最低 Kafka Java 客户机版本为 3.1.0。
- 需要从 Maven Central 下载其他 jar 包并使其在类路径中可用。
如果在构建系统中使用 Maven,请将以下信息添加到依赖关系部分中的文件 pom.xml
。
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>1.3.1</version>
</dependency>
如果在构建系统中使用 Gradle,请将以下信息添加到依赖关系部分中的文件 build.gradle
。
implementation com.ibm.cloud.eventstreams:oauth-client:1.3.1
使用以下字符串和属性。
- 使用
bootstrap_endpoints
字符串作为引导服务器的列表,并将此字符串的主机和端口对传递到 Kafka 客户机。 - 使用
api_key
字符串作为 API 密钥。 IAMOAuthBearerLoginCallbackHandler
由 jar 包com.ibm.cloud.eventstreams:oauth-client:+
提供。- IBM Cloud® Identity and Access Management的令牌端点
https://iam.cloud.ibm.com/identity/token
配置为使用 jaas 配置中的指定授权类型从 API 密钥生成令牌。 它是在客户机端完成的,因此不会将 API 密钥发送到服务器端,并且提供比长期存在的 API 密钥更好的安全性。 - Cloud Identity and Access Management的密钥端点
https://iam.cloud.ibm.com/identity/keys
配置为验证令牌。
对于 Java 客户,以下示例显示了最小属性集,其中 ${BOOTSTRAP_ENDPOINTS}
和 ${APIKEY}
应替换为您之前检索的值。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
样本代码引用 Event Streams 样本。
有关其他 Kafka 客户机属性,请参阅有关如何实现 OAUTHBEARER 支持的文档。 例如:。
- sarama: 需要
AccessTokenProvider
接口的实现。 - librdkafka: 需要
oauthbearer_token_refresh_cb
回调的实现。
有关如何使用 API 密钥生成 IBM Cloud IAM 令牌的信息,请参阅 IBM Cloud® Identity and Access Management的 文档。
-
在持续测试中验证的最早版本。 通常情况下,是指最近12个月内推出的初始版本,如果已知存在重大问题,则提供更新的版本。 如果您无法运行列出的任何客户端,可以使用满足以下最低要求的其他第三方客户端(例如 librdkafka )。1. 支持 Kafka 1.40或更高版本。2。 可以使用SASL PLAIN与 TLSv1.2 进行连接和身份验证。3. 支持 TLS 的 SNI 扩展,其中服务器的主机名包含在 TLS 握手中。4. 支持椭圆曲线密码术。 在所有情况下,请使用最新版本的客户机。 ↩︎