使用 Kafka API
Kafka 提供跨越廣泛語言的豐富 API 及用戶端集合。 API 包括核心 API、Streams API 及 Connect API。
- Kafka 的核心 API(消費者、生產者及管理者 API) 用來直接從一個以上的 Kafka 主題傳送及接收訊息。 Kafka 管理用戶端透過 Kafka API 提供簡單的介面,以便管理 Kafka 資源。 您可以建立、刪除及管理主題。 您也可以使用管理用戶端來管理消費者群組和配置。
- 串流 API 較高層級的串流處理 API,可輕鬆在主題間消耗、轉換和產生事件。
- 連接 API 一個框架,允許可重用或標準的整合,將事件串流進出外部系統,例如資料庫。
下表彙總您可以與 Event Streams 搭配使用的項目:
| 企業方案 | 標準計畫 | 精簡方案 | |
|---|---|---|---|
| 叢集上的 Kafka 版本 | Kafka 3.8 | Kafka 3.8 | Kafka 3.8 |
| 最低建議 Kafka 用戶端版本 | Kafka 2.6.0,或之後 | Kafka 2.6.0,或之後 | Kafka 2.6.0,或之後 |
| 受支援的用戶端版本 | 請參閱 所有建議用戶端的支援摘要 | ||
| 支援 Kafka Connect | 是 | 是 | 否 |
| 支援 Kafka Streams | 是 | 是 | 否 |
| 支援 ksqlDB | 是 | 否 | 否 |
| 鑑別需求 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 | 用戶端必須使用 SASL Plain 機制支援驗證,並使用 TLSv1.2 通訊協定的伺服器名稱指示 (SNI) 延伸。 |
選擇 Kafka 用戶端以搭配 Event Streams 使用
Kafka API 的正式用戶端是以 Java 撰寫,因此,會包含最新特性及錯誤修正程式。 如需更多關於此 API 的資訊,請參閱 Kafka Producer API 3.8 及 Kafka Consumer API 3.8。
若為其他語言,請執行下列其中一個用戶端,所有這些用戶端都已使用 Event Streams進行測試。
所有建議用戶端的支援摘要
| 客戶 | 語言 | 建議版本 | 支援的最低版本 [1] | 範例鏈結 |
|---|---|---|---|---|
| 官方 Apache Kafka 客戶端: | ||||
| Apache Kafka 用戶端 | Java | 3.8.1,或之後 | 2.5.0 | Java 控制台樣本 |
| 第三方客戶: | ||||
| confluent-kafka-javascript | Node.js | 最新 | 1.0.0 | |
| confluent-kafka-python | Python | 最新 | 1.4.0 | Kafka Python 範例 |
| confluent-kafka-go | 進行 | 最新 | 1.4.0 | |
| librdkafka | C 或 C++ | 最新 | 1.4.0 | |
| 節點-rdkafka | Node.js | 最新 | 2.8.0 | Node.js 範例 |
| sarama | 進行 | 最新 | 1.40.0 | Sarama 範例 |
將用戶端連接至 Event Streams
如需如何配置 Java 用戶端以連接至 Event Streams 的相關資訊,請參閱配置用戶端。
配置 Kafka API 用戶端
若要建立連線,用戶端必須設定為至少使用 SASL PLAIN 或 SASL OAUTHBEARER over TLSv1.2,並需要使用者名稱和開機伺服器清單。 TLSv1.2 確保連線經過加密,並驗證經紀人的真實性 (防止中間人攻擊)。 SASL 會對所有連線強制執行鑑別。
若要擷取使用者名稱、密碼和開機伺服器清單,服務範例需要服務憑證物件或服務金鑰。 有關建立這些物件的詳細資訊,請參閱 連線至 Event Streams。
使用 SASL PLAIN
請使用下列字串及內容。
- 使用
bootstrap_endpoints字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。 - 使用
user和api_key屬性作為使用者名稱和密碼。
對於 Java 用戶端,以下範例顯示了最小屬性集,其中 ${USERNAME}, ${PASSWORD},和 ${BOOTSTRAP_ENDPOINTS} 將被您之前擷取的值取代。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
如果您使用的 Kafka 用戶端版本早於 0.10.2.1,則不支援 sasl.jaas.config 屬性,您必須在 JAAS 設定檔中提供用戶端設定。
使用 SASL OAUTHBEARER 與 Java 客戶端 v3.4- 4.0
在為 Java 用戶端設定 SASL 機制之前,有兩個先決條件:
- 最低支援 Kafka Java 用戶端版本為 3.4 ( 3.6 或更高版本為佳)。
- 需要從 Maven Central 下載額外的 jar 套件,並在 classpath 中提供。
如果在建立系統中使用 Maven,請將下列資訊加入 pom.xml 檔案中的 dependencies 區段:
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>1.4.0</version>
</dependency>
如果在建立系統中使用 Gradle,請將下列資訊加入 build.gradle 檔案中的 dependencies 區段:
implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0
IBM Cloud® Identity and Access Management Identity Service 支援多種產生承載令牌的方式,本 oauth 用戶端函式庫支援其中兩種:
- API 金鑰
- 受信任的設定檔和運算資源標記
使用 SASL OAUTHBEARER 與 API 金鑰
請使用下列字串及內容。
- 使用
BOOTSTRAP_ENDPOINTS字串作為引導伺服器的清單,並將此主機及埠配對字串傳遞至 Kafka 用戶端。 IAMOAuthBearerLoginCallbackHandler由 jar 套件com.ibm.cloud.eventstreams:oauth-client:+提供。- IBM Cloud® Identity and Access Management 的令牌端點
https://iam.cloud.ibm.com/identity/token已設定為使用 jaas config 中指定的授予類型從 API 金鑰產生令牌。 它是在客戶端完成的,因此 API 金鑰永遠不會傳送到伺服器端,這提供了比長期使用的 API 金鑰更好的安全性。 - Cloud Identity and Access Management的金鑰端點
https://iam.cloud.ibm.com/identity/keys已配置為驗證記號。 grant_type在 是sasl.jaas.configurn:ibm:params:oauth:grant-type:apikeyapikeysasl.jaas.config中是用於在用戶端生成承載令牌的 API 密鑰。 它可以來自使用者或服務 ID。
對於 Java 用戶端,以下範例顯示最小的屬性集,其中您將 ${BOOTSTRAP_ENDPOINTS},和 ${APIKEY} 替換為之前擷取的值。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.cloud.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
使用 SASL OAUTHBEARER 與可信設定檔和運算資源標記
除了 sasl.jaas.config 不同之外,所有屬性都與 API 金鑰 的描述相同。
grant_type在 是。sasl.jaas.configurn:ibm:params:oauth:grant-type:cr-tokenprofile_id中sasl.jaas.config是儲存可信設定檔 ID 的檔案位置。 此檔案可以只讀磁碟區的方式掛載到執行 Kafka 用戶端程式碼的 Kubernetes pod 上,並提供給 Kafka 用戶端程式碼使用。cr_token中的 是一個檔案位置,儲存來自執行 用戶端程式碼的 pod 的服務帳號符記。sasl.jaas.configKafka Kubernetes 如需詳細資訊,請參閱 何謂服務帳戶令牌。
例如:
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";
您可以找到更多關於 如何設定受信任的個人資料的 詳細資訊。
oauth 客戶端的原始碼參考 Event Streams Java SDK。
範例用戶端程式碼是指 Event Streams 範例。
使用 Java 客戶端 v4.1 及更新版本的 SASL OAUTHBEARER
當使用 v4.1 或更高版本的 Kafka Java 客戶端時,客戶端需要使用更新版本的 Event Streams oauth 客戶端,它依賴 Kafka 的預設回呼處理程式和適當的 Token Retriever。
如果在建立系統中使用 Maven,請將下列資訊加入 pom.xml 檔案中的 dependencies 區段:
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>2.0.0</version>
</dependency>
如果在建立系統中使用 Gradle,請將下列資訊加入 build.gradle 檔案中的 dependencies 區段:
implementation com.ibm.cloud.eventstreams:oauth-client:2.0.+
IBM Cloud® Identity and Access Management Identity Service 支援多種產生承載令牌的方式,本 oauth 用戶端函式庫支援其中兩種:
- API 金鑰
- 受信任的設定檔和運算資源標記
使用 SASL OAUTHBEARER 與 API 金鑰
除了必須的 bootstrap.servers 屬性以及任何特定的生產者、消費者和管理者設定之外,請使用下列字串和屬性。
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
grant_type="urn:ibm:params:oauth:grant-type:apikey" \
apikey="${YOUR_IBM_CLOUD_API_KEY}";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.jwt.retriever.class=com.ibm.cloud.eventstreams.oauth.client.IAMTokenRetriever
sasl.oauthbearer.token.endpoint.url=https://private.iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://private.iam.cloud.ibm.com/identity/keys
在容器化環境中使用 SASL OAUTHBEARER 與可信設定檔和運算資源標記
如需詳細資訊,請參閱 為計算資源產生 IAM 令牌。
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
grant_type="urn:ibm:params:oauth:grant-type:cr-token" \
cr_token="/path/to/cr-token-file" \
profile_id="/path/to/profile-id-file";
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
sasl.oauthbearer.jwt.retriever.class=com.ibm.cloud.eventstreams.oauth.client.IAMTokenRetriever
sasl.oauthbearer.token.endpoint.url=https://private.iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://private.iam.cloud.ibm.com/identity/keys
系統內容 org.apache.kafka.sasl.oauthbearer.allowed.urls
從 Kafka 4.0,用戶端需要一個系統屬性來設定 SASL OAUTHBEARER 令牌和 jwks 端點的允許 URL。
如需詳細資訊,請參閱 系統屬性。
使用 Apache Kafka 發行版提供的 CLI 用戶端 shell 腳本時,也可以使用 KAFKA_OPTS 環境變數設定系統屬性。
export KAFKA_OPTS="-Dorg.apache.kafka.sasl.oauthbearer.allowed.urls=https://private.iam.cloud.ibm.com/identity/keys,https://private.iam.cloud.ibm.com/identity/token,https://api.metadata.cloud.ibm.com/identity/v1/iam_tokens"
使用非 Java 用戶端 SASL OAUTHBEARER
如需其他 Kafka 用戶端程式庫,請參閱其說明文件,以瞭解如何實作 OAUTHBEARER 支援。 例如:
- sarama: 需要實作
AccessTokenProvider介面。 - librdkafka: 需要實作
oauthbearer_token_refresh_cb回呼。
如需如何使用 API 金鑰來產生 IBM Cloud IAM 記號的相關資訊,請參閱 IBM Cloud® Identity and Access Management的 文件。
-
在持續測試中驗證的最早版本。 一般而言,這是過去 12 個月內的初始版本,或已知存在重大問題時的更新版本。 如果您無法執行列出的任何用戶端,您可以使用符合下列最低需求的其他第三方用戶端(例如 librdkafka )。1. 支援 Kafka 1.40或更新版本。2. 可以使用 SASL PLAIN 與 TLSv1.2 進行連線和驗證。 支援 TLS 的 SNI 擴充,其中伺服器的主機名稱包含在 TLS 的握手中。4. 支援橢圓曲線加密法。 無論如何,請使用最新版本的用戶端。 ↩︎