IBM Cloud Docs
使用 Kafka API

使用 Kafka API

Kafka 提供跨越廣泛語言的豐富 API 及用戶端集合。 API 包括核心 API、Streams API 及 Connect API。

  • Kafka 的核心 API(消費者、生產者和管理 API)
    用來直接從一個以上的 Kafka 主題傳送及接收訊息。 Kafka 管理用戶端透過 Kafka API 提供簡單的介面,以便管理 Kafka 資源。 您可以建立、刪除及管理主題。 您也可以使用管理用戶端來管理消費者群組和配置。
  • 串流 API
    較高層級的串流處理 API,可輕鬆在主題間消耗、轉換和產生事件。
  • 連接 API
    一個框架,允許可重用或標準的整合,將事件串流進出外部系統,例如資料庫。

下表彙總您可以與 Event Streams 搭配使用的項目:

Standard、Enterprise 和 Lite 計畫中的Kafka客戶端支援。
企業方案 標準計畫 精簡方案
叢集上的 Kafka 版本 Kafka 3.6 Kafka 3.6 Kafka 3.6
最低建議 Kafka 用戶端版本 Kafka 2.6.0,或之後 Kafka 2.6.0,或之後 Kafka 2.6.0,或之後
受支援的用戶端版本 請參閱 所有建議用戶端的支援摘要
支援 Kafka Connect True True
支援 Kafka Streams True True
支援 ksqlDB True
鑑別需求 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。

選擇 Kafka 用戶端以搭配 Event Streams 使用

Kafka API 的正式用戶端是以 Java 撰寫,因此,會包含最新特性及錯誤修正程式。 如需更多關於此 API 的資訊,請參閱 Kafka Producer API 3.6Kafka Consumer API 3.6

若為其他語言,請執行下列其中一個用戶端,所有這些用戶端都已使用 Event Streams進行測試。

所有建議用戶端的支援摘要

客戶支援摘要
客戶 語言 建議版本 支援的最低版本 [1] 範例鏈結
官方 Apache Kafka 客戶端:
Apache Kafka 用戶端 Java 3.6.2或更新版本 2.5.0 Java 控制台樣本

Liberty 樣本

第三方客戶:
confluent-kafka-javascript Node.js 最新 1.0.0
confluent-kafka-python Python 最新 1.4.0 Kafka Python 範例
confluent-kafka-go 進行 最新 1.4.0
librdkafka C 或 C++ 最新 1.4.0
節點-rdkafka Node.js 最新 2.8.0 Node.js 範例
sarama 進行 最新 1.40.0 Sarama 範例

將用戶端連接至 Event Streams

如需如何配置 Java 用戶端以連接至 Event Streams 的相關資訊,請參閱配置用戶端

配置 Kafka API 用戶端

若要建立連線,用戶端必須設定為至少使用 SASL PLAIN 或 SASL OAUTHBEARER over TLSv1.2,並需要使用者名稱和開機伺服器清單。 TLSv1.2 確保連線經過加密,並驗證經紀人的真實性 (防止中間人攻擊)。 SASL 會對所有連線強制執行鑑別。

若要擷取使用者名稱、密碼和開機伺服器清單,服務範例需要服務憑證物件或服務金鑰。 有關建立這些物件的詳細資訊,請參閱 連線至 Event Streams

使用 SASL PLAIN

請使用下列字串及內容。

  • 使用 bootstrap_endpoints 字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。
  • 使用 userapi_key 屬性作為使用者名稱和密碼。

對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${USERNAME}, ${PASSWORD},和 ${BOOTSTRAP_ENDPOINTS} 將被您之前擷取的值取代。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

如果您使用的 Kafka 用戶端版本早於 0.10.2.1,則不支援 sasl.jaas.config 屬性,您必須在 JAAS 設定檔中提供用戶端設定。

使用 SASL OAUTHBEARER

在配置 Java 用戶端的 SASL 機制之前,有兩個必要條件。

  • 支援的最低 Kafka Java 用戶端版本為 3.1.0。
  • 需要從 Maven Central 下載其他 JAR 套件,並在類別路徑中提供。

如果在建置系統中使用 Maven,請將下列資訊新增至 pom.xml 檔的相依關係區段中。

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.3.1</version>
</dependency>

如果在建置系統中使用 Gradle,請將下列資訊新增至相依關係區段中的檔案 build.gradle

implementation com.ibm.cloud.eventstreams:oauth-client:1.3.1

請使用下列字串及內容。

  • 使用 bootstrap_endpoints 字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。
  • 使用 api_key 字串作為 API 金鑰。
  • IAMOAuthBearerLoginCallbackHandler 由 jar 套件 com.ibm.cloud.eventstreams:oauth-client:+ 提供。
  • IBM Cloud® Identity and Access Management的記號端點 https://iam.cloud.ibm.com/identity/token 配置為使用 jaas 配置中指定的授權類型從 API 金鑰產生記號。 它是在用戶端執行,因此 API 金鑰永遠不會傳送至伺服器端,且提供比長期 API 金鑰更好的安全。
  • Cloud Identity and Access Management的金鑰端點 https://iam.cloud.ibm.com/identity/keys 已配置為驗證記號。

對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${BOOTSTRAP_ENDPOINTS},和 ${APIKEY} 將被您之前擷取的值取代。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

範例程式碼參照 Event Streams 範例

如需其他 Kafka 用戶端程式庫,請參閱其說明文件,以瞭解如何實作 OAUTHBEARER 支援。 例如:。

  • sarama: 需要實作 AccessTokenProvider 介面。
  • librdkafka: 需要實作 oauthbearer_token_refresh_cb 回呼。

如需如何使用 API 金鑰來產生 IBM Cloud IAM 記號的相關資訊,請參閱 IBM Cloud® Identity and Access Management的 文件


  1. 在持續測試中驗證的最早版本。 一般而言,這是過去 12 個月內的初始版本,或已知存在重大問題時的更新版本。 如果您無法執行列出的任何用戶端,您可以使用符合下列最低需求的其他第三方用戶端(例如 librdkafka )。1. 支援 Kafka 1.40或更新版本。2. 可以使用 SASL PLAIN 與 TLSv1.2 進行連線和驗證。 支援 TLS 的 SNI 擴充,其中伺服器的主機名稱包含在 TLS 握手中。4. 支援橢圓曲線加密法。 無論如何,請使用最新版本的用戶端。 ↩︎