使用 Kafka API
Kafka 提供跨越廣泛語言的豐富 API 及用戶端集合。 API 包括核心 API、Streams API 及 Connect API。
- Kafka 的核心 API(消費者、生產者和管理 API)
用來直接從一個以上的 Kafka 主題傳送及接收訊息。 Kafka 管理用戶端透過 Kafka API 提供簡單的介面,以便管理 Kafka 資源。 您可以建立、刪除及管理主題。 您也可以使用管理用戶端來管理消費者群組和配置。 - 串流 API
較高層級的串流處理 API,可輕鬆在主題間消耗、轉換和產生事件。 - 連接 API
一個框架,允許可重用或標準的整合,將事件串流進出外部系統,例如資料庫。
下表彙總您可以與 Event Streams 搭配使用的項目:
| 企業方案 | 標準計畫 | 精簡方案 | |
|---|---|---|---|
| 叢集上的 Kafka 版本 | Kafka 3.8 | Kafka 3.8 | Kafka 3.8 |
| 最低建議 Kafka 用戶端版本 | Kafka 2.6.0,或之後 | Kafka 2.6.0,或之後 | Kafka 2.6.0,或之後 |
| 受支援的用戶端版本 | 請參閱 所有建議用戶端的支援摘要 | ||
| 支援 Kafka Connect | 是 | 是 | 否 |
| 支援 Kafka Streams | 是 | 是 | 否 |
| 支援 ksqlDB | 是 | 否 | 否 |
| 鑑別需求 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 |
選擇 Kafka 用戶端以搭配 Event Streams 使用
Kafka API 的正式用戶端是以 Java 撰寫,因此,會包含最新特性及錯誤修正程式。 如需更多關於此 API 的資訊,請參閱 Kafka Producer API 3.8 及 Kafka Consumer API 3.8。
若為其他語言,請執行下列其中一個用戶端,所有這些用戶端都已使用 Event Streams進行測試。
所有建議用戶端的支援摘要
| 客戶 | 語言 | 建議版本 | 支援的最低版本 [1] | 範例鏈結 |
|---|---|---|---|---|
| 官方 Apache Kafka 客戶端: | ||||
| Apache Kafka 用戶端 | Java | 3.8.1,或之後 | 2.5.0 | Java 控制台樣本 |
| 第三方客戶: | ||||
| confluent-kafka-javascript | Node.js | 最新 | 1.0.0 | |
| confluent-kafka-python | Python | 最新 | 1.4.0 | Kafka Python 範例 |
| confluent-kafka-go | 進行 | 最新 | 1.4.0 | |
| librdkafka | C 或 C++ | 最新 | 1.4.0 | |
| 節點-rdkafka | Node.js | 最新 | 2.8.0 | Node.js 範例 |
| sarama | 進行 | 最新 | 1.40.0 | Sarama 範例 |
將用戶端連接至 Event Streams
如需如何配置 Java 用戶端以連接至 Event Streams 的相關資訊,請參閱配置用戶端。
配置 Kafka API 用戶端
若要建立連線,用戶端必須設定為至少使用 SASL PLAIN 或 SASL OAUTHBEARER over TLSv1.2,並需要使用者名稱和開機伺服器清單。 TLSv1.2 確保連線經過加密,並驗證經紀人的真實性 (防止中間人攻擊)。 SASL 會對所有連線強制執行鑑別。
若要擷取使用者名稱、密碼和開機伺服器清單,服務範例需要服務憑證物件或服務金鑰。 有關建立這些物件的詳細資訊,請參閱 連線至 Event Streams。
使用 SASL PLAIN
請使用下列字串及內容。
- 使用
bootstrap_endpoints字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。 - 使用
user和api_key屬性作為使用者名稱和密碼。
對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${USERNAME}, ${PASSWORD},和 ${BOOTSTRAP_ENDPOINTS} 將被您之前擷取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
如果您使用的 Kafka 用戶端版本早於 0.10.2.1,則不支援 sasl.jaas.config 屬性,您必須在 JAAS 設定檔中提供用戶端設定。
使用 SASL OAUTHBEARER
在配置 Java 用戶端的 SASL 機制之前,有兩個必要條件。
- 支援的最低 Kafka Java 用戶端版本為 3.1.0。
- 需要從 Maven Central 下載其他 JAR 套件,並在類別路徑中提供。
如果在建置系統中使用 Maven,請將下列資訊新增至 pom.xml 檔的相依關係區段中。
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>1.4.0</version>
</dependency>
如果在建置系統中使用 Gradle,請將下列資訊新增至相依關係區段中的檔案 build.gradle。
implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0
IBM Cloud® Identity and Access Management Identity Service 支援多種產生承載令牌的方式,本 oauth 用戶端函式庫支援其中兩種。
- API 金鑰。
- 受信任的設定檔和運算資源標記。
使用 SASL OAUTHBEARER 與 API 金鑰
請使用下列字串及內容。
- 使用
BOOTSTRAP_ENDPOINTS字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。 IAMOAuthBearerLoginCallbackHandler由 jar 套件com.ibm.cloud.eventstreams:oauth-client:+提供。- IBM Cloud® Identity and Access Management的記號端點
https://iam.cloud.ibm.com/identity/token配置為使用 jaas 配置中指定的授權類型從 API 金鑰產生記號。 它是在用戶端執行,因此 API 金鑰永遠不會傳送至伺服器端,且提供比長期 API 金鑰更好的安全。 - Cloud Identity and Access Management的金鑰端點
https://iam.cloud.ibm.com/identity/keys已配置為驗證記號。 grant_type在 是sasl.jaas.configurn:ibm:params:oauth:grant-type:apikeyapikeysasl.jaas.config中是用於在用戶端生成承載令牌的 API 密鑰。 它可以來自使用者或服務 Id。
對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${BOOTSTRAP_ENDPOINTS},和 ${APIKEY} 將被您之前擷取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
使用 SASL OAUTHBEARER 與可信設定檔和運算資源標記
除了 sasl.jaas.config 不同之外,所有屬性都與 API key 相同。
grant_type在 是。sasl.jaas.configurn:ibm:params:oauth:grant-type:cr-tokenprofile_id中 是儲存可信設定檔 ID 的檔案位置。sasl.jaas.config此檔案可以只讀磁碟區的方式掛載到執行 Kafka 用戶端程式碼的 Kubernetes pod 上,並提供給 Kafka 用戶端程式碼使用。cr_token中的 是一個檔案位置,儲存來自執行 用戶端程式碼的 pod 的服務帳號符記。sasl.jaas.configKafka Kubernetes 請參閱 什麼是服務帳戶令牌。
請參閱以下範例
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";
更多關於 如何設定可信設定檔的 詳細資訊
oauth 客戶端的原始碼請參考 Event Streams Java SDK。
客戶端程式碼範例請參考 Event Streams Sample。
如需其他 Kafka 用戶端程式庫,請參閱其說明文件,以瞭解如何實作 OAUTHBEARER 支援。 例如:。
- sarama: 需要實作
AccessTokenProvider介面。 - librdkafka: 需要實作
oauthbearer_token_refresh_cb回呼。
如需如何使用 API 金鑰來產生 IBM Cloud IAM 記號的相關資訊,請參閱 IBM Cloud® Identity and Access Management的 文件。
-
在持續測試中驗證的最早版本。 一般而言,這是過去 12 個月內的初始版本,或已知存在重大問題時的更新版本。 如果您無法執行列出的任何用戶端,您可以使用符合下列最低需求的其他第三方用戶端(例如 librdkafka )。1. 支援 Kafka 1.40或更新版本。2. 可以使用 SASL PLAIN 與 TLSv1.2 進行連線和驗證。 支援 TLS 的 SNI 擴充,其中伺服器的主機名稱包含在 TLS 握手中。4. 支援橢圓曲線加密法。 無論如何,請使用最新版本的用戶端。 ↩︎