IBM Cloud Docs
Event Streams での Kafka コンソール・ツールの使用

Event Streams での Kafka コンソール・ツールの使用

Apache Kafka には、単純な管理およびメッセージング操作のためのさまざまなコンソール・ツールが付属しています。 それらの多くを Event Streams で使用できますが、Event Streams ではその ZooKeeper クラスターとの接続は許可されません。 Kafka の開発に伴い、以前に ZooKeeper への接続を必要としていた多くのツールには、この要件がなくなりました。

これらのコンソール・ツールは、 Kafka ダウンロードの bin ディレクトリーにあります。 クライアントは、 Apache Kafka のダウンロードからダウンロードできます。

それらのツールに SASL 資格情報を提供するには、以下の例に基づいてプロパティー・ファイルを作成します。

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2
    ssl.endpoint.identification.algorithm=HTTPS

USER と PASSWORD を、Event Streams コンソール内の IBM Cloud**「サービス資格情報」**タブにある値に置き換えます。

コンソール・プロデューサー

Kafka コンソール・プロデューサー・ツールを Event Streams で使用できます。 ブローカーのリストと SASL 資格情報を指定する必要があります。

プロパティー・ファイルを作成した後、以下のように端末でコンソール・プロデューサーを実行できます。

   kafka-console-producer.sh --broker-list BOOTSTRAP_ENDPOINTS --producer.config CONFIG_FILE --topic TOPIC_NAME

例にある以下の変数を独自の値に置き換えます。

  • IBM Cloud コンソールの「 Event Streams サービス資格情報 」タブの値を使用した BOOTSTRAP_ENDPOINTS
  • CONFIG_FILE を構成ファイルのパスに置き換えます。

ZooKeeperへのアクセスを必要とするオプションを除き、このツールの他の多くのオプションを使用できます。

コンソール・コンシューマー

Kafka コンソール・コンシューマー・ツールを Event Streams で使用できます。 ブートストラップ・サーバーと SASL 資格情報を指定する必要があります。

前述のようにプロパティー・ファイルを作成した後、以下のようにして端末でコンソール・コンシューマーを実行します。

   kafka-console-consumer.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --consumer.config CONFIG_FILE --topic TOPIC_NAME

例にある以下の変数を独自の値に置き換えます。

  • IBM Cloud コンソールの「 Event Streams サービス資格情報 」タブの値を使用した BOOTSTRAP_ENDPOINTS
  • CONFIG_FILE を構成ファイルのパスに置き換えます。

ZooKeeperへのアクセスを必要とするオプションを除き、このツールの他の多くのオプションを使用できます。

コンシューマー・グループ

Kafka コンシューマー・グループ・ツールを Event Streams で使用できます。 Event Streams ではその ZooKeeper クラスターとの接続は許可されないので、一部のオプションは使用できません。

前述のようにプロパティー・ファイルを作成した後、端末でコンシューマー・グループ・ツールを実行します。 例えば、以下のようにコンシューマー・グループをリストできます。

   kafka-consumer-groups.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --command-config CONFIG_FILE --list --timeout 60000

例にある以下の変数を独自の値に置き換えます。

  • IBM Cloud コンソールの「 Event Streams サービス資格情報 」タブの値を使用した BOOTSTRAP_ENDPOINTS
  • CONFIG_FILE を構成ファイルのパスに置き換えます。

このツールを使用して、コンシューマーの現在位置、それらのラグ、グループの各パーティションのクライアント ID などの詳細を表示することもできます。 以下に例を示します。

   kafka-consumer-groups.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --command-config CONFIG_FILE --describe --group GROUP --timeout 60000

例にある GROUP を、詳細情報の検索対象となるグループ名に置き換えます。

kafka-consumer-groups ツールを実行した場合の出力例を以下に示します。

GROUP              TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID      HOST          CLIENT-ID
consumer-group-1   foo        0          264             267            3          client-1-abc    example.com    client-1
consumer-group-1   foo        1          124             124            0          client-1-abc    example.com    client-1
consumer-group-1   foo        2          212             212            0          client-2-def    example.com    client-2

この例では、コンシューマー・グループ consumer-group-1 に、3 つのパーティションを持つトピック foo からのメッセージをコンシュームする 2 つのコンシューマー・メンバーがあることが分かります。 また、コンシューマーの現在のオフセットは 264 ですが、区画 0 上の最後のメッセージのオフセットは 267 であるため、区画 0 からコンシュームするコンシューマー client-1-abc が 3 つ遅れていることも示しています。

トピック

kafka-topics ツールを Event Streams で使用できます。 ツールの V2.8 は Zookeeper アクセスを必要としないため、必ず使用してください。

kafka-topics を使用するシナリオとして、既存のクラスター内のトピックとその構成に関する情報を見つけて、新しいクラスター内にそれらを再作成できるようにすることが考えられます。 kafka-topics からの出力となる情報を使用して、新しいクラスター内に同じ名前のトピックを作成できます。 トピックを作成する方法について詳しくは、管理 Kafka Java クライアント API の使用または ibmcloud es topic-create コマンドを参照してください。 代わりの方法として、IBM Event Streams コンソールを使用することもできます。

kafka-topics ツールの実行からの以下の出力例を参照してください。

   bin/kafka-topics.sh --bootstrap-server kafka03-prod01.messagehub.services.us-south.bluemix.net:9093 --command-config vcurr_dal06.properties --describe

Topic:sample-topic	PartitionCount:3	ReplicationFactor:3	 Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
    Topic: sample-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: sample-topic    Partition: 1    Leader: 1    Replicas: 1,2,0	   Isr: 0,2,1
    Topic: sample-topic    Partition: 2    Leader: 2    Replicas: 2,1,0	   Isr: 0,2,1
Topic:testtopic	 PartitionCount:1	 ReplicationFactor:3	Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
    Topic: testtopic    Partition: 0    Leader: 0    Replicas: 0,2,1   Isr: 0,2

このサンプルから、トピック sample-topic には 3 つのパーティションと 3 つの複製係数があることが分かります。 この例では、各パーティションのリーダーがどのブローカー上にあり、どのレプリカが同期しているかも示しています (Isr)。 例えば、区画 0 のリーダーがブローカー 0 上にあり、フォロワーがブローカー 2 および 1 上にあり、3 つのレプリカがすべて同期しているとします。 2 番目のトピック testtopic には、ブローカー 02、および 1 に複製されるパーティションが 1 つだけありますが、同期レプリカ・リストには 02 のみが表示されます。 これは、ブローカー 1のフォロワーが遅れているため、Isrリストに表示されないことを意味します。

Kafka Streams リセット

このツールを Event Streams で使用して、Kafka Streams アプリケーションの処理状態をリセットし、その入力を最初から再処理することができます。 このツールを実行する前に、Streams アプリケーションが完全に停止していることを確認してください。

以下に例を示します。

   kafka-streams-application-reset.sh --bootstrap-servers BOOTSTRAP_ENDPOINTS --config-file CONFIG_FILE --application-id APP_ID

例にある以下の変数を独自の値に置き換えます。

  • IBM Cloud コンソールの「 Event Streams サービス資格情報 」タブの値を使用した BOOTSTRAP_ENDPOINTS
  • CONFIG_FILE を構成ファイルのパスに置き換えます。
  • APP_ID を Streams アプリケーション ID に置き換えます。