Kafka API の使用
Kafka は、広範囲の言語に渡る豊富な API とクライアントのセットを提供しています。 API には、コア API、Streams API、および Connect API が含まれます。
- Kafka のコアAPI(Consumer API、Producer API、Admin API)。
1 つ以上の Kafka トピックとの間で直接メッセージを送受信するために使用します。 Kafka Admin クライアントは、Kafka リソースを管理するための Kafka API を通じてシンプルなインターフェースを提供します。 トピックを作成、削除、および管理できます。 コンシューマー・グループおよび構成の管理のためにも Admin クライアントを使用できます。 - ストリームAPI
トピック間のイベントを簡単に消費、変換、生成するための、より高度なストリーム処理API。 - コネクトAPI
再利用可能な、あるいは標準的な統合によって、データベースなどの外部システムとの間でイベントをストリーミングできるようにするフレームワーク。
以下の表に、Event Streams で使用可能なものを要約します。
Enterprise プラン | 標準プラン | ライト・プラン | |
---|---|---|---|
クラスターの Kafka バージョン | Kafka 3.6 | Kafka 3.6 | Kafka 3.6 |
推奨される Kafka クライアントの最小バージョン | Kafka 2.6.0 またはそれ以降 | Kafka 2.6.0 またはそれ以降 | Kafka 2.6.0 またはそれ以降 |
サポートされるクライアント・バージョン | すべての推奨クライアントのサポートの要約 を参照してください。 | ||
Kafka Connect のサポート | ある | ある | いいえ |
Kafka Streams のサポート | ある | ある | いいえ |
ksqlDB のサポート | ある | いいえ | いいえ |
認証要件 | クライアントは、SASL Plainメカニズムを使用した認証をサポートし、 TLSv1.2 プロトコルのServer Name Indication(SNI)拡張を使用しなければならない。 | クライアントは、SASL Plainメカニズムを使用した認証をサポートし、 TLSv1.2 プロトコルのServer Name Indication(SNI)拡張を使用しなければならない。 | クライアントは、SASL Plainメカニズムを使用した認証をサポートし、 TLSv1.2 プロトコルのServer Name Indication(SNI)拡張を使用しなければならない。 |
Event Streams で使用する Kafka クライアントの選択
Kafka API の公式クライアントは Java で書かれており、最新の機能やバグ修正が含まれています。 このAPIの詳細については、 Kafka Producer API 3.6 および Kafka Consumer API 3.6 を参照のこと。
その他の言語の場合は、以下のいずれかのクライアントを実行します。すべてのクライアントは Event Streamsでテストされます。
すべての推奨されるクライアントのサポートの概要
クライアント | 言語 | 推奨されるバージョン | サポートされる最小バージョン [1] | サンプルのリンク先 |
---|---|---|---|---|
公式の Apache Kafka クライアント: | ||||
Apache Kafka クライアント | Java | 3.6.2またはそれ以降 | 2.5.0 | Java コンソール・サンプル |
サード・パーティー・クライアント: | ||||
confluent-kafka-javascript | Node.js | 最新バージョン | 1.0.0 | |
confluent-kafka-python | Python | 最新バージョン | 1.4.0 | Kafka Python サンプル |
confluent-kafka-go | Go | 最新バージョン | 1.4.0 | |
librdkafka | C または C++ | 最新バージョン | 1.4.0 | |
node-rdkafka | Node.js | 最新バージョン | 2.8.0 | Node.js サンプル |
サラマ | Go | 最新バージョン | 1.40.0 | サラマの例 |
クライアントの Event Streams への接続
Event Streams に接続するように Java クライアントを構成する方法については、クライアントの構成を参照してください。
Kafka API クライアントの構成
接続を確立するために、クライアントは最低でも TLSv1.2 上で SASL PLAIN または SASL OAUTHBEARER を使い、ユーザー名とブートストラップ・サーバーのリストを要求するように設定されなければならない。 TLSv1.2 接続が暗号化されていることを確認し、ブローカーの真正性を検証する(中間者攻撃を防ぐため)。 SASL により、すべての接続について認証が適用されます。
ユーザー名、パスワード、およびブートストラップ・サーバーのリストを取得するには、サービス・インスタンスに対してサービス・クレデンシャル・オブジェクトまたはサービス・キーが必要です。 これらのオブジェクトの作成について詳しくは、Event Streams への接続を参照してください。
SASL PLAIN の使用
以下のストリングとプロパティーを使用します。
bootstrap_endpoints
ストリングをブートストラップ・サーバーのリストとして使用し、このホストとポートのペアのストリングを Kafka クライアントに渡します。- ユーザー名とパスワードとして
user
プロパティーとapi_key
プロパティーを使用します。
Java ク ラ イ ア ン ト に対 し て、 以下の例は、 プ ロ パテ ィ の最小セ ッ ト を示 し ます。 ${USERNAME}
、 ${PASSWORD}
、 ${BOOTSTRAP_ENDPOINTS}
は、 以前に取得 し た値に置 き 換え ら れます。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
バージョン 0.10.2.1 より前の Kafka クライアントを使用する場合、 sasl.jaas.config
プロパティはサポートされていませんので、代わりに JAAS 構成ファイルでクライアント構成を提供する必要があります。
SASL OAUTHBEARER の使用
Java クライアントの SASL メカニズムを構成する前に、2 つの前提条件があります。
- サポートされる Kafka Java クライアントの最小バージョンは 3.1.0です。
- 追加の jar パッケージを Maven Central からダウンロードして、クラスパスで使用可能にする必要があります。
ビルド・システムで Maven を使用する場合は、ファイル pom.xml
の dependencies セクションに以下の情報を追加します。
<dependency>
<groupId>com.ibm.cloud.eventstreams</groupId>
<artifactId>oauth-client</artifactId>
<version>1.4.0</version>
</dependency>
Gradle がビルド・システムで使用されている場合は、 build.gradle
ファイルの dependencies セクションに以下の情報を追加します。
implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0
IBM Cloud® Identity and Access Management Identity Service はベアラートークンを生成する複数の方法をサポートしており、この oauth クライアントライブラリではそのうちの 2 つをサポートしている。
- APIキー
- 信頼されたプロファイルとコンピュート・リソース・トークン。
APIキーでSASL OAUTHBEARERを使用する
以下のストリングとプロパティーを使用します。
BOOTSTRAP_ENDPOINTS
ストリングをブートストラップ・サーバーのリストとして使用し、このホストとポートのペアのストリングを Kafka クライアントに渡します。IAMOAuthBearerLoginCallbackHandler
は、jar パッケージcom.ibm.cloud.eventstreams:oauth-client:+
によって提供されます。- IBM Cloud® Identity and Access Managementのトークン・エンドポイント
https://iam.cloud.ibm.com/identity/token
は、jaas 構成で指定された付与タイプを使用して API キーからトークンを生成するように構成されています。 これはクライアント・サイドで行われるため、API キーがサーバー・サイドに送信されることはなく、存続期間の長い API キーよりも優れたセキュリティーを提供します。 - Cloud Identity and Access Managementの鍵エンドポイント
https://iam.cloud.ibm.com/identity/keys
は、トークンを検証するように構成されています。 grant_type
sasl.jaas.config
はurn:ibm:params:oauth:grant-type:apikey
apikey
の は、クライアント側でベアラートークンを生成するために使用されるAPIキーです。sasl.jaas.config
ユーザーIDまたはサービスIDのいずれかである。
Java ク ラ イ ア ン ト に対 し て、 以下の例は、 プ ロ パテ ィ の最小セ ッ ト を示 し ます。 こ こ では、 ${BOOTSTRAP_ENDPOINTS}
と ${APIKEY}
は、 以前に取得 し た値に置 き 換え ら れます。
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";
信頼されたプロファイルとコンピュート・リソース・トークンを使用したSASL OAUTHBEARERの使用
sasl.jaas.config
が異なるだけで、すべてのプロパティはAPIキーと同じである。
grant_type
sasl.jaas.config
は。urn:ibm:params:oauth:grant-type:cr-token
profile_id
の は、信頼されたプロファイルIDを格納するファイルの場所である。sasl.jaas.config
このファイルは、 Kafka クライアント・コードを実行している Kubernetes ポッドに読み取り専用ボリュームとしてマウントし、 Kafka クライアント・コードが利用できるようにすることができる。cr_token
sasl.jaas.config
にあるのは、 クライアント・コードを実行している ポッドからのサービス・アカウント・トークンを保存するファイルの場所です。 Kafka Kubernetes サービス・アカウント・トークンとは 」を参照。
以下の例を参照のこと
bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";
oauth クライアントのソースコードは Event Streams Java SDK を参照してください。
クライアントコードのサンプルは、 Event Streams Sampleを参照してください。
その他の Kafka クライアント・リバーについては、OAUTHBEARER サポートの実装方法に関する資料を参照してください。 例えば、次のようになります。
- sarama:
AccessTokenProvider
インターフェースの実装が必要です。 - librdkafka:
oauthbearer_token_refresh_cb
コールバックの実装が必要です。
API キーを使用して IBM Cloud IAM トークンを生成する方法については、 IBM Cloud® Identity and Access Managementの 文書 を参照してください。
-
継続的なテストで検証された最も初期のバージョン。 一般的には、過去12ヶ月以内に入手可能な最初のバージョン、または重大な問題が存在することが知られている場合は新しいバージョンとなります。 リストにあるクライアントのいずれかを実行できない場合は、以下の最小要件を満たす他のサードパーティクライアント(例えば、 librdkafka )を使用することができます。 1. Kafka 1.40以降をサポートします。 2. TLSv1.2 で SASL PLAIN を使用して接続し、認証することができる。 サーバーのホスト名がTLSハンドシェイクに含まれるTLSのSNI拡張をサポートする。 4. 楕円曲線暗号化をサポートします。 いずれの場合も、最新バージョンのクライアントを使用してください。 ↩︎