IBM Cloud Docs
Kafka API の使用

Kafka API の使用

Kafka は、広範囲の言語に渡る豊富な API とクライアントのセットを提供しています。 API には、コア API、Streams API、および Connect API が含まれます。

  • Kafka のコアAPI(Consumer API、Producer API、Admin API)。
    1 つ以上の Kafka トピックとの間で直接メッセージを送受信するために使用します。 Kafka Admin クライアントは、Kafka リソースを管理するための Kafka API を通じてシンプルなインターフェースを提供します。 トピックを作成、削除、および管理できます。 コンシューマー・グループおよび構成の管理のためにも Admin クライアントを使用できます。
  • ストリームAPI
    トピック間のイベントを簡単に消費、変換、生成するための、より高度なストリーム処理API。
  • コネクトAPI
    再利用可能な、あるいは標準的な統合によって、データベースなどの外部システムとの間でイベントをストリーミングできるようにするフレームワーク。

以下の表に、Event Streams で使用可能なものを要約します。

Kafka スタンダード、エンタープライズ、ライトの各プランでクライアントをサポート。
Enterprise プラン 標準プラン ライト・プラン
クラスターの Kafka バージョン Kafka 3.6 Kafka 3.6 Kafka 3.6
推奨される Kafka クライアントの最小バージョン Kafka 2.6.0 またはそれ以降 Kafka 2.6.0 またはそれ以降 Kafka 2.6.0 またはそれ以降
サポートされるクライアント・バージョン すべての推奨クライアントのサポートの要約 を参照してください。
Kafka Connect のサポート ある ある いいえ
Kafka Streams のサポート ある ある いいえ
ksqlDB のサポート ある いいえ いいえ
認証要件 クライアントは、SASL Plainメカニズムを使用した認証をサポートし、 TLSv1.2 プロトコルのServer Name Indication(SNI)拡張を使用しなければならない。 クライアントは、SASL Plainメカニズムを使用した認証をサポートし、 TLSv1.2 プロトコルのServer Name Indication(SNI)拡張を使用しなければならない。 クライアントは、SASL Plainメカニズムを使用した認証をサポートし、 TLSv1.2 プロトコルのServer Name Indication(SNI)拡張を使用しなければならない。

Event Streams で使用する Kafka クライアントの選択

Kafka API の公式クライアントは Java で書かれており、最新の機能やバグ修正が含まれています。 このAPIの詳細については、 Kafka Producer API 3.6 および Kafka Consumer API 3.6 を参照のこと。

その他の言語の場合は、以下のいずれかのクライアントを実行します。すべてのクライアントは Event Streamsでテストされます。

すべての推奨されるクライアントのサポートの概要

クライアント・サポートの概要
クライアント 言語 推奨されるバージョン サポートされる最小バージョン [1] サンプルのリンク先
公式の Apache Kafka クライアント:
Apache Kafka クライアント Java 3.6.2またはそれ以降 2.5.0 Java コンソール・サンプル

Liberty サンプル

サード・パーティー・クライアント:
confluent-kafka-javascript Node.js 最新バージョン 1.0.0
confluent-kafka-python Python 最新バージョン 1.4.0 Kafka Python サンプル
confluent-kafka-go Go 最新バージョン 1.4.0
librdkafka C または C++ 最新バージョン 1.4.0
node-rdkafka Node.js 最新バージョン 2.8.0 Node.js サンプル
サラマ Go 最新バージョン 1.40.0 サラマの例

クライアントの Event Streams への接続

Event Streams に接続するように Java クライアントを構成する方法については、クライアントの構成を参照してください。

Kafka API クライアントの構成

接続を確立するために、クライアントは最低でも TLSv1.2 上で SASL PLAIN または SASL OAUTHBEARER を使い、ユーザー名とブートストラップ・サーバーのリストを要求するように設定されなければならない。 TLSv1.2 接続が暗号化されていることを確認し、ブローカーの真正性を検証する(中間者攻撃を防ぐため)。 SASL により、すべての接続について認証が適用されます。

ユーザー名、パスワード、およびブートストラップ・サーバーのリストを取得するには、サービス・インスタンスに対してサービス・クレデンシャル・オブジェクトまたはサービス・キーが必要です。 これらのオブジェクトの作成について詳しくは、Event Streams への接続を参照してください。

SASL PLAIN の使用

以下のストリングとプロパティーを使用します。

  • bootstrap_endpoints ストリングをブートストラップ・サーバーのリストとして使用し、このホストとポートのペアのストリングを Kafka クライアントに渡します。
  • ユーザー名とパスワードとして user プロパティーと api_key プロパティーを使用します。

Java ク ラ イ ア ン ト に対 し て、 以下の例は、 プ ロ パテ ィ の最小セ ッ ト を示 し ます。 ${USERNAME}${PASSWORD}${BOOTSTRAP_ENDPOINTS} は、 以前に取得 し た値に置 き 換え ら れます。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${USERNAME}" password="${PASSWORD}";
security.protocol=SASL_SSL
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

バージョン 0.10.2.1 より前の Kafka クライアントを使用する場合、 sasl.jaas.config プロパティはサポートされていませんので、代わりに JAAS 構成ファイルでクライアント構成を提供する必要があります。

SASL OAUTHBEARER の使用

Java クライアントの SASL メカニズムを構成する前に、2 つの前提条件があります。

  • サポートされる Kafka Java クライアントの最小バージョンは 3.1.0です。
  • 追加の jar パッケージを Maven Central からダウンロードして、クラスパスで使用可能にする必要があります。

ビルド・システムで Maven を使用する場合は、ファイル pom.xml の dependencies セクションに以下の情報を追加します。

<dependency>
    <groupId>com.ibm.cloud.eventstreams</groupId>
    <artifactId>oauth-client</artifactId>
    <version>1.4.0</version>
</dependency>

Gradle がビルド・システムで使用されている場合は、 build.gradle ファイルの dependencies セクションに以下の情報を追加します。

implementation com.ibm.cloud.eventstreams:oauth-client:1.4.0

IBM Cloud® Identity and Access Management Identity Service はベアラートークンを生成する複数の方法をサポートしており、この oauth クライアントライブラリではそのうちの 2 つをサポートしている。

  • APIキー
  • 信頼されたプロファイルとコンピュート・リソース・トークン。

APIキーでSASL OAUTHBEARERを使用する

以下のストリングとプロパティーを使用します。

  • BOOTSTRAP_ENDPOINTS ストリングをブートストラップ・サーバーのリストとして使用し、このホストとポートのペアのストリングを Kafka クライアントに渡します。
  • IAMOAuthBearerLoginCallbackHandler は、jar パッケージ com.ibm.cloud.eventstreams:oauth-client:+ によって提供されます。
  • IBM Cloud® Identity and Access Managementのトークン・エンドポイント https://iam.cloud.ibm.com/identity/token は、jaas 構成で指定された付与タイプを使用して API キーからトークンを生成するように構成されています。 これはクライアント・サイドで行われるため、API キーがサーバー・サイドに送信されることはなく、存続期間の長い API キーよりも優れたセキュリティーを提供します。
  • Cloud Identity and Access Managementの鍵エンドポイント https://iam.cloud.ibm.com/identity/keys は、トークンを検証するように構成されています。
  • grant_type sasl.jaas.configurn:ibm:params:oauth:grant-type:apikey
  • apikey の は、クライアント側でベアラートークンを生成するために使用されるAPIキーです。 sasl.jaas.config ユーザーIDまたはサービスIDのいずれかである。

Java ク ラ イ ア ン ト に対 し て、 以下の例は、 プ ロ パテ ィ の最小セ ッ ト を示 し ます。 こ こ では、 ${BOOTSTRAP_ENDPOINTS}${APIKEY} は、 以前に取得 し た値に置 き 換え ら れます。

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:apikey" apikey="${APIKEY}";

信頼されたプロファイルとコンピュート・リソース・トークンを使用したSASL OAUTHBEARERの使用

sasl.jaas.config が異なるだけで、すべてのプロパティはAPIキーと同じである。

  • grant_type sasl.jaas.config は。 urn:ibm:params:oauth:grant-type:cr-token
  • profile_id の は、信頼されたプロファイルIDを格納するファイルの場所である。 sasl.jaas.config このファイルは、 Kafka クライアント・コードを実行している Kubernetes ポッドに読み取り専用ボリュームとしてマウントし、 Kafka クライアント・コードが利用できるようにすることができる。
  • cr_token sasl.jaas.config にあるのは、 クライアント・コードを実行している ポッドからのサービス・アカウント・トークンを保存するファイルの場所です。 Kafka Kubernetes サービス・アカウント・トークンとは 」を参照。

以下の例を参照のこと

bootstrap.servers=${BOOTSTRAP_ENDPOINTS}
security.protocol=SASL_SSL
sasl.mechanism=OAUTHBEARER
sasl.oauthbearer.token.endpoint.url=https://iam.cloud.ibm.com/identity/token
sasl.oauthbearer.jwks.endpoint.url=https://iam.cloud.ibm.com/identity/keys
sasl.login.callback.handler.class=com.ibm.eventstreams.oauth.client.IAMOAuthBearerLoginCallbackHandler
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required grant_type="urn:ibm:params:oauth:grant-type:cr-token" profile_id="${TRUSTED_PROFILE_ID_FILE_PATH}" cr_token="${SERVICE_ACCOUNT_TOKEN_FILE_PATH}";

信頼できるプロフィールの設定方法の 詳細

oauth クライアントのソースコードは Event Streams Java SDK を参照してください。

クライアントコードのサンプルは、 Event Streams Sampleを参照してください。

その他の Kafka クライアント・リバーについては、OAUTHBEARER サポートの実装方法に関する資料を参照してください。 例えば、次のようになります。

  • sarama: AccessTokenProvider インターフェースの実装が必要です。
  • librdkafka: oauthbearer_token_refresh_cb コールバックの実装が必要です。

API キーを使用して IBM Cloud IAM トークンを生成する方法については、 IBM Cloud® Identity and Access Managementの 文書 を参照してください。


  1. 継続的なテストで検証された最も初期のバージョン。 一般的には、過去12ヶ月以内に入手可能な最初のバージョン、または重大な問題が存在することが知られている場合は新しいバージョンとなります。 リストにあるクライアントのいずれかを実行できない場合は、以下の最小要件を満たす他のサードパーティクライアント(例えば、 librdkafka )を使用することができます。 1. Kafka 1.40以降をサポートします。 2. TLSv1.2 で SASL PLAIN を使用して接続し、認証することができる。 サーバーのホスト名がTLSハンドシェイクに含まれるTLSのSNI拡張をサポートする。 4. 楕円曲線暗号化をサポートします。 いずれの場合も、最新バージョンのクライアントを使用してください。 ↩︎