使用 Kafka API
Kafka 提供跨越廣泛語言的豐富 API 及用戶端集合。 API 包括核心 API、Streams API 及 Connect API。
- Kafka 的核心 API(消費者、生產者和管理 API)
用來直接從一個以上的 Kafka 主題傳送及接收訊息。 Kafka 管理用戶端透過 Kafka API 提供簡單的介面,以便管理 Kafka 資源。 您可以建立、刪除及管理主題。 您也可以使用管理用戶端來管理消費者群組和配置。 - 串流 API
較高層級的串流處理 API,可輕鬆在主題間消耗、轉換和產生事件。 - 連接 API
一個框架,允許可重用或標準的整合,將事件串流進出外部系統,例如資料庫。
下表彙總您可以與 Event Streams 搭配使用的項目:
企業方案 | 標準計畫 | 精簡方案 | |
---|---|---|---|
叢集上的 Kafka 版本 | Kafka 3.6 | Kafka 3.6 | Kafka 3.6 |
最低建議 Kafka 用戶端版本 | Kafka 2.6.0,或之後 | Kafka 2.6.0,或之後 | Kafka 2.6.0,或之後 |
受支援的用戶端版本 | 請參閱 所有建議用戶端的支援摘要 | ||
支援 Kafka Connect | True | True | 否 |
支援 Kafka Streams | True | True | 否 |
支援 ksqlDB | True | 否 | 否 |
鑑別需求 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 |
選擇 Kafka 用戶端以搭配 Event Streams 使用
Kafka API 的正式用戶端是以 Java 撰寫,因此,會包含最新特性及錯誤修正程式。 如需更多關於此 API 的資訊,請參閱 Kafka Producer API 3.6 及 Kafka Consumer API 3.6。
若為其他語言,請執行下列其中一個用戶端,所有這些用戶端都已使用 Event Streams進行測試。
所有建議用戶端的支援摘要
客戶 | 語言 | 建議版本 | 支援的最低版本 [1] | 範例鏈結 |
---|---|---|---|---|
官方 Apache Kafka 客戶端: | ||||
Apache Kafka 用戶端 | Java | 3.6.2或更新版本 | 2.5.0 | Java 控制台樣本 |
第三方客戶: | ||||
confluent-kafka-javascript | Node.js | 最新 | 1.0.0 | |
confluent-kafka-python | Python | 最新 | 1.4.0 | Kafka Python 範例 |
confluent-kafka-go | 進行 | 最新 | 1.4.0 | |
librdkafka | C 或 C++ | 最新 | 1.4.0 | |
節點-rdkafka | Node.js | 最新 | 2.8.0 | Node.js 範例 |
sarama | 進行 | 最新 | 1.40.0 | Sarama 範例 |
將用戶端連接至 Event Streams
如需如何配置 Java 用戶端以連接至 Event Streams 的相關資訊,請參閱配置用戶端。
配置 Kafka API 用戶端
若要建立連線,用戶端必須設定為至少使用 SASL PLAIN 或 SASL OAUTHBEARER over TLSv1.2,並需要使用者名稱和開機伺服器清單。 TLSv1.2 確保連線經過加密,並驗證經紀人的真實性 (防止中間人攻擊)。 SASL 會對所有連線強制執行鑑別。
若要擷取使用者名稱、密碼和開機伺服器清單,服務範例需要服務憑證物件或服務金鑰。 有關建立這些物件的詳細資訊,請參閱 連線至 Event Streams。
使用 SASL PLAIN
請使用下列字串及內容。
- 使用
bootstrap_endpoints
字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。 - 使用
user
和api_key
屬性作為使用者名稱和密碼。
對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${USERNAME}
, ${PASSWORD}
,和 ${BOOTSTRAP_ENDPOINTS}
將被您之前擷取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
如果您使用的 Kafka 用戶端版本早於 0.10.2.1,則不支援 sasl.jaas.config
屬性,您必須在 JAAS 設定檔中提供用戶端設定。
使用 SASL OAUTHBEARER
在配置 Java 用戶端的 SASL 機制之前,有兩個必要條件。
- 支援的最低 Kafka Java 用戶端版本為 3.1.0。
- 需要從 Maven Central 下載其他 JAR 套件,並在類別路徑中提供。
如果在建置系統中使用 Maven,請將下列資訊新增至 pom.xml
檔的相依關係區段中。
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>1.3.1</version>
</dependency>
如果在建置系統中使用 Gradle,請將下列資訊新增至相依關係區段中的檔案 build.gradle
。
implementation com.ibm.cloud.eventstreams:oauth-client:1.3.1
請使用下列字串及內容。
- 使用
bootstrap_endpoints
字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。 - 使用
api_key
字串作為 API 金鑰。 IAMOAuthBearerLoginCallbackHandler
由 jar 套件com.ibm.cloud.eventstreams:oauth-client:+
提供。- IBM Cloud® Identity and Access Management的記號端點
https://iam.cloud.ibm.com/identity/token
配置為使用 jaas 配置中指定的授權類型從 API 金鑰產生記號。 它是在用戶端執行,因此 API 金鑰永遠不會傳送至伺服器端,且提供比長期 API 金鑰更好的安全。 - Cloud Identity and Access Management的金鑰端點
https://iam.cloud.ibm.com/identity/keys
已配置為驗證記號。
對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${BOOTSTRAP_ENDPOINTS}
,和 ${APIKEY}
將被您之前擷取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
範例程式碼參照 Event Streams 範例。
如需其他 Kafka 用戶端程式庫,請參閱其說明文件,以瞭解如何實作 OAUTHBEARER 支援。 例如:。
- sarama: 需要實作
AccessTokenProvider
介面。 - librdkafka: 需要實作
oauthbearer_token_refresh_cb
回呼。
如需如何使用 API 金鑰來產生 IBM Cloud IAM 記號的相關資訊,請參閱 IBM Cloud® Identity and Access Management的 文件。
-
在持續測試中驗證的最早版本。 一般而言,這是過去 12 個月內的初始版本,或已知存在重大問題時的更新版本。 如果您無法執行列出的任何用戶端,您可以使用符合下列最低需求的其他第三方用戶端(例如 librdkafka )。1. 支援 Kafka 1.40或更新版本。2. 可以使用 SASL PLAIN 與 TLSv1.2 進行連線和驗證。 支援 TLS 的 SNI 擴充,其中伺服器的主機名稱包含在 TLS 握手中。4. 支援橢圓曲線加密法。 無論如何,請使用最新版本的用戶端。 ↩︎