IBM Cloud Docs
Kafkaイベントを購読する

Kafkaイベントを購読する

このチュートリアルでは、IBM Cloud® Code Engineて Kafkaイベントを購読する方法を学ぶことができます。CLIを使用します。

分散環境では多くの場合、他のコンポーネント (通常、イベント・プロデューサーと呼ばれる) で生成されたメッセージ (イベント) に対応する処理をアプリケーションまたはジョブに行わせます。 Code Engine では、イベント・プロデューサーをサブスクライブすることで、アプリケーションまたはジョブが関心対象のイベントを受信できるようになります。 イベント情報は、アプリケーションについては POST HTTP 要求、ジョブについては環境変数として受信されます。

Kafka イベント・プロデューサーは、 Kafka インスタンスに新規メッセージが表示されるかどうかを監視します。 一連のトピックに対して Code Engine Kafka サブスクリプションを作成すると、アプリケーションまたはジョブは、いずれかのトピックに表示される新規メッセージごとに別個のイベントを受け取ります。

任意の Kafka インスタンスを使用できますが、このチュートリアルの例では IBM® Event Streams for IBM Cloud® サービスを使用します。 Event Streams は、 Kafka イベント用の IBM イベント・ストリーミング・サービスです。 このサービスについて詳しくは、 Event Streams の資料 を参照してください。

開始前に

すべてのCode Engineユーザーは、従量課金 (PAYG) アカウントを持っている必要があります。 チュートリアルでは、費用が発生する場合があります。 コスト見積もりツールを使用して、使用量の見積もりに基づいてコスト見積もりを生成してください。 詳しくは、 Code Engine 価格設定 を参照してください。

Kafkaイベントプロデューサーのセットアップ

メッセージを Code Engine Kafka イベント・サブスクリプションに送信するように Kafka メッセージ・プロデューサーをセットアップできます。 Code Engine Kafka イベント・サブスクリプションを使用して、 Kafka メッセージを受信したときにアプリケーションまたはジョブをトリガーします。

開始するには、イベント・ストリーミング・サービス用の Event Streams サービス・インスタンスを作成 します。 コンソールまたは CLI を使用できますが、以下のステップでは、CLI を使用して Event Streams イベント・プロデューサーをセットアップする方法について説明します。

  1. Event Streamsのサービス・インスタンスを作成します。 Event Streams CLI サービスの名前は messagehub です。 この例では、 myeventstream という名前の Event Streams サービス・インスタンスを作成します。

    ibmcloud resource service-instance-create myeventstream messagehub lite us-south
    
  2. サービス・インスタンスに資格情報を提供するためのサービス・キーを作成します。

    ibmcloud resource service-key-create myeventstream-key Manager --instance-name myeventstream
    

    出力例

    Creating service key of service instance myeventstream under account <user_account>...
    OK
    Service key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd was created.
    
    Name:          myeventstream-key
    ID:            crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd
    Created At:    Mon Mar 21 18:36:09 UTC 2022
    State:         active
    Credentials:
                api_key:                  abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh
                apikey:                   abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh
                iam_apikey_description:   Auto-generated for key crn:v1:bluemix:public:messagehub:us-south:a/e43abfcbd191404cb17ef650e9681dd3:c0736069-3f4a-438a-b614-6846877d692d:resource-key:4c8edfdb-abcd-abcd-abcd-abcdabcdabcd
                iam_apikey_name:          myeventstream-key
                iam_role_crn:             crn:v1:bluemix:public:iam::::serviceRole:Manager
                iam_serviceid_crn:        crn:v1:bluemix:public:iam-identity::a/e43abfcbd191404cb17ef650e9681dd3::serviceid:ServiceId-3e99caa5-b174-4f04-9845-5c5d783b8bc7
                instance_id:              c0736069-3f4a-438a-b614-6846877d692d
                kafka_admin_url:          https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com
                kafka_brokers_sasl:       [broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093]
                kafka_http_url:           https://abcdabcdabcdabcd.svc07.us-south.eventstreams.cloud.ibm.com
                password:                 abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh
                user:                     token
    

    サービス・キーの userpassword、および kafka-brokers_sasl の値をメモします。 この情報は、 Code Engine Kafka サブスクリプションをセットアップするときに必要になります。 passwordapikey の値は、 Event Streams サービス・インスタンスのサービス・キーで同じです。 ibmcloud resource service-key myeventstream-key コマンドを使用して、サービス・キー情報を取得することもできます。

  3. Event Streams サービス・インスタンスに関連する Event Streams プラグインを初期化します。

    ibmcloud es init --instance-name myeventstream
    
  4. Event Streamsトピックを作成します。

    ibmcloud es topic-create kafka-topic1
    

Kafka メッセージを生成するための Code Engine サンプル・アプリのセットアップ

このチュートリアルでは、 Kafka メッセージのイベント・プロデューサーとして機能するように Code Engine アプリケーションをセットアップします。 この kafka-sender-app アプリケーションの目的は、 Event Streams インスタンスに接続し、メッセージの受信側 (Kafka コンシューマー) に Kafka メッセージを生成 (送信) することです。 Kafka メッセージのイベントを生成するこのアプリケーションは、 Code Engine Kafka 送信側サンプル・アプリケーション を使用して Kafka メッセージを送信します。 このサンプル送信側イメージには、 BROKERS 環境変数と、 password 資格情報を含むシークレットが必要です。

  1. Code Engine Kafka サンプルで必要な資格情報を使用して秘密を作成します。 例えば、 Kafka 送信側サンプル・アプリケーションと、 Kafka 受信側サンプルを使用する Kafka イベント・サブスクリプションの両方に必要な資格情報を含めるために、 kafka-subscription-secret 秘密を作成します。 これらの資格情報は、サンプルの Kafka 送信側アプリおよび Code Engine Kafka イベント・サブスクリプションが Event Streamsのサービス・インスタンスと通信するために必要です。 Kafka 送信側アプリおよびイベント・サブスクリプションを作成する前にこのシークレットを作成する必要はありませんが、このアクションにより、必要なステップが単純化されます。

    kafka-subscription-secret シークレットを作成するには、 password および username のリテラル環境変数を追加します。 詳しくは、 CLI でのシークレットの作成 を参照してください。

    • Event Streams サービス・インスタンスのサービス資格情報の詳細にリストされている user の値を使用して、 username キーを指定します。 Event Streams サービス・インスタンスの場合、この値は token です。 このキーは、 Code Engine Kafka イベント・サブスクリプションと Kafka メッセージ・ブローカーとの間の認証に必要です。

    • Event Streams サービス・インスタンスのサービス資格情報の詳細にリストされている apikey の値を使用して、 password キーを指定します。 このキーは、送信側サンプルで、 Code Engine Kafka イベント・サブスクリプションと Kafka メッセージ・ブローカーとの間の通信を有効にするために必要です。

      ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=<value_of_apikey> --from-literal username=<value_of_user>
      

      以下に例を示します。

      ibmcloud ce secret create --name kafka-subscription-secret --from-literal password=abcdeH9tu3qE5Sn8VbJfcDEWtjR_l0iPisB3abcdefgh --from-literal username=token
      
  2. kafka-sender-app 以下の内容で作成する。

    • icr.io/codeengine/kafka-sender コンテナー・イメージを参照するには、 --image オプションを指定します。 このイメージは sender.go からビルドされています。これは、 Samples for IBM Cloud Code Engine GitHub repoから入手できます。 このサンプル送信側アプリケーションには、 kafka-subscription-secret に保管されている password 資格情報と、 BROKERS 環境変数が必要です。

    • password 資格情報を含む完全なシークレット kafka-subscription-secret を参照するには、 --env-from-secret オプションを指定します。

    • --env オプションを指定してリテラル環境変数 BROKERS を追加し、 Event Streams サービス・インスタンスのサービス資格情報の詳細にリストされているいずれかのブローカー・ホストの名前を指定します。 ただし、複数のブローカー・ホスト名を指定する場合は、 --env BROKERS-broker1,broker2,broker3 の形式を使用します。

    • (オプション)--min-scale=1 オプションを指定することで、アプリが常に実行中のインスタンスを持ち、スケールがゼロにならないようにします。 ログを参照するときには、実行中のインスタンスが常に存在するようにアプリを構成すると役に立ちます。 実稼働環境で実行している場合は、アプリの実行インスタンスを保持するコスト、または Code Engine をゼロに自動スケーリングするかどうかを考慮してください。 デフォルトでは、使用されていない場合、アプリはゼロにスケーリングされます。

      ibmcloud ce app create --name kafka-sender-app --image icr.io/codeengine/kafka-sender --env-from-secret kafka-subscription-secret --env BROKERS=broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --min-scale 1
      
  3. このアプリをデプロイした後、 app get コマンドを実行して、アプリの状況が ready であることを確認します。

    ibmcloud ce app get -n kafka-sender-app
    

Code Engine のイベント・サブスクリプション用の Kafka メッセージを生成する kafka-sender-app アプリを作成し、必要な資格情報を含む kafka-subscription-secret シークレットを作成しました。

Code Engine Kafka サブスクリプションのセットアップ

Code Engine で Kafka イベントを処理するには、 Code Engine Kafka イベント・サブスクリプションをセットアップして Kafka イベント・ブローカーに接続し、 Kafka イベントを listen します。 また、 Kafka イベントの受信側として機能するように Code Engine アプリをセットアップします。 Kafka イベント・サブスクリプションは、イベントの Kafka プロデューサー (送信側) とコンシューマー (受信側) の間の関係を定義します。

Code Engine Kafka イベント・サブスクリプションは、 Kafka メッセージ・ブローカーに接続し、着信する Kafka メッセージごとに HTTP POST 要求を受信側アプリケーションに送信します。 Kafka イベントに含まれる情報について詳しくは、 アプリに配信されるイベントの HTTP ヘッダーおよび本文情報 を参照してください。

  1. Kafka メッセージのイベント・コンシューマーとして機能し、 Kafka イベントを受信する Code Engine アプリケーションを作成します。 例えば、icr.io/codeengine/kafka-receiverイメージを使用するkafka-receiver-appというアプリケーションを作成します。 このイメージは receiver.go からビルドされています。これは、 Samples for IBM Cloud Code Engine GitHub repoから入手できます。 このサンプルでは、環境変数は必要ありません。 オプションで --min-scale=1 を指定すると、アプリが常に実行中のインスタンスを持ち、スケールがゼロになることはない。 ログを参照するときには、実行中のインスタンスが常に存在するようにアプリを構成すると役に立ちます。 実稼働環境で実行している場合は、アプリの実行インスタンスを保持するコスト、または Code Engine をゼロに自動スケーリングするかどうかを考慮してください。 デフォルトでは、使用されていない場合、アプリはゼロにスケーリングされます。

    ibmcloud ce app create -n kafka-receiver-app --image icr.io/codeengine/kafka-receiver --min-scale 1
    

    デフォルトでは、イベントは、宛先アプリケーションのルート URL にルーティングされます。 --path オプションを使用すると、アプリ内の異なる宛先にイベントを送信できます。 例えば、サブスクリプションで--path /eventが指定されている場合、イベントはhttps://<base application URL>/eventsに送信されます。

  2. このアプリをデプロイした後、 app get コマンドを実行して、アプリの状況が ready であることを確認します。

    ibmcloud ce app get -n kafka-receiver-app
    
  3. ibmcloud ce sub kafka create コマンドを使用して、 Kafka イベント用の Code Engine Kafka イベント・サブスクリプションを作成します。 以前に作成した kafka-subscription-secret シークレットを使用して、メッセージ・ブローカーにアクセスします。 Kafka リソースのサービス資格情報に基づいてブローカー情報を指定します。 この例では、 ibmcloud resource service-key myeventstream-key コマンドの出力からブローカー情報を取得できます。 トピックのブローカーごとに --broker オプションを指定する必要があることに注意してください。 --destination オプションは、イベントを受信する Code Engine リソースを指定します。

    ibmcloud ce sub kafka create --name mykafkasubscription --destination kafka-receiver-app --secret kafka-subscription-secret --topic kafka-topic1 --broker broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker  broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093 --broker broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    
  4. Kafkaイベントサブスクリプションの詳細を表示します。

    ibmcloud ce sub kafka get -n mykafkasubscription
    

    出力例

    Getting Kafka event subscription 'mykafkasubscription'...
    OK
    
    Name:          mykafkasubscription
    [...]
    Destination Type:                 app
    Destination:                      kafka-receiver-app
    Brokers:
    broker-3-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-5-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-0-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-1-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-4-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    broker-2-abcdabcdabcdabcd.kafka.svc07.us-south.eventstreams.cloud.ibm.com:9093
    Consumer Group:                   knative-kafka-source-a4072fe1-1dfa-4470-9d07-bf7a0ff8e340
    Topics:
    kafka-topic1
    Secret key reference (user):      kafka-subscription-secret.username
    Secret key reference (password):  kafka-subscription-secret.password
    Ready:                            true
    
    Conditions:
    Type                     OK    Age  Reason
    ConnectionEstablished    true  24s
    InitialOffsetsCommitted  true  24s
    Ready                    true  24s
    Scheduled                true  24s
    SinkProvided             true  24s
    
    Events:
    Type     Reason           Age  Source                  Messages
    Normal   FinalizerUpdate  26s  kafkasource-controller  Updated "mykafkasubscription" finalizers
    

サブスクリプションのテスト

kafka-receiver-app アプリケーションを参照する Kafka イベント・サブスクリプションが作成されたので、 kafka-sender-app を使用してメッセージ・イベントを受信側アプリケーションに送信します。

  1. --output url オプションを指定した ibmcloud ce app get コマンドを使用して、宛先アプリケーションのパブリック URL kafka-sender-app を取得し、アプリケーションの URL を見つけます。

    ibmcloud ce app get -n kafka-sender-app --output url
    

    出力例

    https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud
    
  2. Kafka イベント・プロデューサー・アプリ kafka-sender-app を実行して、宛先 Code Engine アプリケーションにイベントを送信します。 curl を指定して kafka-sender-app アプリケーションを呼び出し、トピックの値とメッセージ数を指定します。 ibmcloud ce app get の出力を使って、アプリの公開URLを見つける。 値が単一ストリングとして扱われるように、必ず値を curl に引用符で囲んでください。 以下に例を示します。

    curl "<public_URL_of_Kafka_sender_app>?topic=<your_topic_name>&num=<number_of_messages_to_produce>"
    

    以下に例を示します。

    curl "https://kafka-sender-app.abcdabcdabc.us-south.codeengine.appdomain.cloud?topic=kafka-topic1&num=1"
    
  3. ログ内のイベントを表示します。 有効なブローカー、トピック、およびアクセス・シークレットを使用して Kafka イベント・サブスクリプションが作成され、そのトピックに関するメッセージを生成する Kafka アプリケーション ( kafka-sender-app など) がある場合、 kafka-receiver-app などの Kafka メッセージを受信する宛先 Code Engine アプリケーションのログにイベントを表示できます。 Kafka 受信側アプリケーション (icr.io/codeengine/kafka-receiver) を使用する場合は、受信側アプリケーションのログで Event data を検索して、受信したメッセージを確認します。

    ibmcloud ce app logs -n kafka-receiver-app
    

    出力例

    Getting logs for all instances of application 'kafka-receiver-app'...
    OK
    
    kafka-receiver-app-00001-deployment-66976f7988-9xttm/user-container:
    2022/03/31 22:19:45 Listening on port 8080
    2022/03/31 22:19:46 ----------
    2022/03/31 22:19:46 Path: /
    2022/03/31 22:19:46 Header: Accept-Encoding=[gzip]
    2022/03/31 22:19:46 Header: Ce-Id=[partition:0/offset:167]
    2022/03/31 22:19:46 Header: Ce-Source=[/apis/v1/namespaces/glxo4k7nj7d/kafkasources/mykafkasubscription#kafka-topic1]
    2022/03/31 22:19:46 Header: Ce-Specversion=[1.0]
    2022/03/31 22:19:46 Header: Ce-Subject=[partition:0#167]
    2022/03/31 22:19:46 Header: Ce-Time=[2022-03-31T22:19:36.499Z]
    2022/03/31 22:19:46 Header: Ce-Type=[dev.knative.kafka.event]
    2022/03/31 22:19:46 Header: Content-Length=[8]
    2022/03/31 22:19:46 Header: Forwarded=[for=172.30.208.213;proto=http, for=127.0.0.6]
    2022/03/31 22:19:46 Header: K-Proxy-Request=[activator]
    2022/03/31 22:19:46 Header: Traceparent=[00-b033708685c715a7c2384cdf05797785-65540b0937e9b0ce-00]
    2022/03/31 22:19:46 Header: User-Agent=[Go-http-client/1.1]
    2022/03/31 22:19:46 Header: X-B3-Parentspanid=[e1a785d7fdbead6c]
    2022/03/31 22:19:46 Header: X-B3-Sampled=[1]
    2022/03/31 22:19:46 Header: X-B3-Spanid=[abcde9901e6bf83f]
    2022/03/31 22:19:46 Header: X-B3-Traceid=[abcde490a426573772fa0bf60caf5ddb]
    2022/03/31 22:19:46 Header: X-Envoy-Attempt-Count=[1]
    2022/03/31 22:19:46 Header: X-Forwarded-For=[172.30.208.213, 127.0.0.6, 127.0.0.6]
    2022/03/31 22:19:46 Header: X-Forwarded-Proto=[http]
    2022/03/31 22:19:46 Header: X-Request-Id=[abcdeb4e-c5ac-abcd-abcd-60e6278abcde]
    2022/03/31 22:19:46 Event data: test1: 1
    

    なお、アプリのログ情報は1時間しか保存されない。 アプリ (またはジョブ) のログの表示について詳しくは、 ログの表示 を参照してください。

サブスクリプションの更新

CLI でイベント・サブスクリプションを更新するには、 ibmcloud ce subscription kafka update コマンドを使用します。 以下の例では、トピック名を更新します。

ibmcloud ce sub kafka update -n mykafkasubscription --topic kafka-topic2

ibmcloud ce subscription kafka update コマンドを使用して、 Kafka サブスクリプションの値を更新できます。 ただし、このコマンドを使用してコンシューマー・グループの値を変更することはできません。 別のトピックを参照するようにサブスクリプションを更新する場合は、サブスクリプションを更新する前に、 Kafka トピックが存在することを確認してください。

Kafkaサブスクリプション・チュートリアルのクリーンアップ

Kafka サブスクリプション、送信側アプリと受信側アプリ、およびシークレットを削除する準備ができましたか? 「 ibmcloud ce app delete」、「 ibmcloud ce sub kafka delete」、および「 ibmcloud ce sub kafka delete 」コマンドを使用できます。 オプションで、確認なしにコンポーネントを強制的に削除することもできる -f

Kafka サブスクリプションを削除しても、サブスクリプションによって参照されているアプリは削除されません。

サブスクリプションを削除するには、以下のようにします。

ibmcloud ce sub kafka delete --name mykafkasubscription -f

Kafka メッセージ受信アプリケーションを削除するには、以下のようにします。

ibmcloud ce app delete --name kafka-receiver-app -f

同様に、 kafka-sender-app を削除できます。

ibmcloud ce app delete --name kafka-sender-app -f

kafka-subscription-secret を削除する場合:

ibmcloud ce secret delete --name kafka-subscription-secret -f

Event Streams サービス・インスタンスを削除する準備ができましたか? --recursive オプションは、関連付けられたサービス・キーを含む、サービス・インスタンスのすべてのリソースを削除することを指定します。

ibmcloud resource service-instance-delete myeventstream --recursive -f

次のステップ

Kafka イベント・サブスクリプションの操作について詳しくは、 Kafka イベント・プロデューサーの操作 を参照してください。

コード・サンプルがさらに必要ですか? IBM Cloud Code Engine GitHub repoを確認してください。