IBM Cloud Docs
メッセージのプロデュース

メッセージのプロデュース

プロデューサーは、メッセージのストリームを Kafka トピックにパブリッシュするアプリケーションです。 ここでは、Apache Kafka プロジェクトの一部である Java プログラミング・インターフェースについて重点的に説明します。 概念は他の言語にも当てはまりますが、名前は少し異なる場合があります。

プログラミング・インターフェースでは、メッセージはレコードと呼ばれます。 例えば、Java クラス org.apache.kafka.clients.producer.ProducerRecord が、プロデューサー API の観点からメッセージを表すために使用されます。 _記録_と_メッセージ_という用語は同じ意味で使用できますが、基本的には、メッセージを表すためにレコードが使用されます。

プロデューサーは Kafka に接続するときに初期ブートストラップ接続を行います。 この接続の接続先はクラスター内のどのサーバーでもかまいません。 プロデューサーは、パブリッシュ先にするトピックのパーティションおよびリーダーの情報を要求します。 その後、プロデューサはパーティション・リーダーへの別の接続を確立し、メッセージを公開できるようになる。 これらのアクションは、プロデューサーが Kafka クラスターに接続すると、内部で自動的に発生します。

可用性を確保するために、 Kafka ブローカーはメッセージを複製します。これにより、1 つのブローカーが使用不可の場合でも、他のブローカーはプロデューサーからメッセージを受信してコンシューマーに送信することができます。 Event Streams は、複製係数 3 を使用します。これは、各メッセージが 3 つのブローカーに保管されることを意味します。 パーティション・リーダーにメッセージが送信されても、そのメッセージはすぐにコンシューマーで使用可能にはなりません。 リーダーはメッセージのレコードをパーティションに付加し、その際、そのパーティションの次のオフセット数値をそのレコードに割り当てます。 同期しているレプリカのすべてのフォロワーがレコードをレプリケートし、そのレプリカにレコードを書き込んだことを認めた後、レコードはコミットされ、コンシューマーが利用できるようになる。

各メッセージは、キーと値の2つの部分からなるレコードとして表現される。 キーは一般的にメッセージに関するデータ用に使用され、値はメッセージ本体です。 Kafkaエコシステムの多くのツール(他のシステムへのコネクターなど)は、値のみを使用し、キーを無視するため、すべてのメッセージデータを値に入れ、パーティショニングやログのコンパクションにキーを使用するのが最善です。 Kafkaから読み込むものすべてがキーを使用することに依存してはならない。

その他の多くのメッセージング・システムにも、他の情報をメッセージと共に受け渡す方法があります。 Kafkaバージョン0.11では、この目的のためにレコードヘッダが導入されている。

Event Streamsで メッセージのコンシューム とともにこの情報を読むと役立つ場合があります。

構成設定

プロデューサーには、多くの構成設定が存在します。 バッチ処理、再試行、メッセージの確認など、プロデューサーの側面を制御できる。 以下に、最重要な設定を示します。

プロデューサー・コンフィギュレーション設定
名前 説明 有効な値 デフォルト
key.serializer キーをシリアライズするために使用されるクラス。 org.apache.kafka.common.serialization.StringSerializer などの Serializer インターフェースを実装するJavaクラス。 デフォルトなし - 値を指定する必要があります。
value.serializer 値をシリアライズするために使用されるクラス。 org.apache.kafka.common.serialization.StringSerializer などの Serializer インターフェースを実装するJavaクラス。 デフォルトなし - 値を指定する必要があります。
acks 各メッセージのパブリッシュに対して確認応答を行う必要があるサーバーの数。 これは、プロデューサーが必要とする耐久性保証を制御します。 0、1、all (または -1) all (Kafka 3.0 以降) 1 ( Kafka 3.0より前)
再試行 送信でエラーが発生したときにクライアントがメッセージを再送信する回数。 0,... 0
max.block.ms 送信要求またはメタデータ要求が待機をブロックできる時間 (ミリ秒)。 0,... 60000 (1 分)
max.in.flight.requests.per.connection クライアントがそれ以降のリクエストをブロックするまでに、コネクション上で送信する未承認リクエストの最大数。 1,... 5
request.timeout.ms プロデューサーが要求への応答を待機する最大時間。 タイムアウトが経過する前に応答を受け取らなかった場合、リクエストは再試行 され、再試行回数が尽きた場合は失敗する。 0,... 30000 (30 秒)

他にも多くのコンフィギュレーション設定がありますが、試す前に Apache Kafkaのドキュメントをよく読んでください。

区画化

Kafka では、パーティションはスケーラビリティーの単位です。 したがって、パーティショニングは、トピックデータを複数の並列ストリームで流すことができるため、スループットを向上させる効果的な方法である。

プロデューサーは、トピックにメッセージをパブリッシュするときに、どのパーティションを使用するのかを選択できます。 順序付けが重要な場合、パーティションは順序付けられたレコードのシーケンスですが、トピックは1つ以上のパーティションで構成されることに注意してください。 一群のメッセージが順序通りに配信されるようにする必要がある場合、それらのメッセージが確実に同じパーティションに送信されるようにします。 これを行うための最も単純な方法は、それらのメッセージのすべてに同じキーを付与することです。

プロデューサーは、メッセージをパブリッシュするときにパーティション番号を明示的に指定できます。 この方法は直接制御を可能にしますが、プロデューサーのコードはパーティション選択の管理について責任を担うため、より複雑になります。 詳しくは、Producer.partitionsFor メソッド呼び出しを参照してください。 例えば Kafkaバージョン2.2.0

プロデューサーがパーティション番号を指定しない場合、パーティションの選択はパーティショナーによって行われます。 Kafka プロデューサーに組み込まれたデフォルト・パーティショナーは、以下のように動作します。

  • レコードにキーが付いていない場合、ラウンドロビン方式でパーティションを選択します。
  • レコードにキーが付いている場合、そのキーのハッシュ値を計算してパーティションを選択します。 これは、同じキーを持つすべてのメッセージに対して同じパーティションを選択する効果があります。

ユーザー独自のカスタム・パーティショナーを作成することもできます。 カスタム・パーティショナーは、レコードをパーティションに割り当てる方式を任意に選択できます。 例えば、キー内の情報のサブセットのみを使用したり、アプリケーション固有の ID を使用したりできます。

メッセージ順序付け

Kafka は、通常、プロデューサーによって送信された順序でメッセージを書き込みます。 しかし、状況によっては、再試行によってメッセージが重複したり、順序が入れ替わったりすることがある。 メッセージの順序を保証する唯一の方法だからだ。

プロデューサーはまた、自動的にメッセージの送信を再試行することもできる。 このリトライ機能を有効にしておくと、アプリケーション・コード自身がリトライを行わなければならなくなるからだ。 Kafka でのバッチ処理と自動再試行を組み合わせると、メッセージの重複および順序変更という結果が生じる可能性があります。

例えば、トピックについて 3 つのメッセージ <M1、M2、M3> のシーケンスをパブリッシュするとします。 レコードはすべて同じバッチ内に収まるかもしれないので、すべて一緒にパーティションリーダーに送られる。 リーダーは、それらをパーティションに書き込み、別々のレコードとして複製します。 障害が発生した場合、 M1 および M2 が区画に追加されているが、 M3 は追加されていない可能性があります。 The producer doesn't receive an acknowledgment, so it retries sending <M1, M2, M3>. The new leader writes M1, M2 and M3 onto the partition, which now contains <M1, M2, M1, M2, M3>, where the duplicated M1 follows the original M2. 各ブローカーへの送信中の要求の数を 1 つのみに制限すると、このような順序の変更を防止できます。 <M1、 M2、 M2、 M3>などの 1 つのレコードが重複していても、順序が乱れることはありません。 バージョン0.11以降のKafkaでは、M2の重複を防ぐためにidempotent producer機能を使うこともできる。

Kafkaでは、フライト中のリクエストが1つしかない場合のパフォーマンスへの影響が大きいため、時折メッセージの重複を処理するようにアプリケーションを記述するのが通常のプラクティスである。

メッセージ確認応答

メッセージを発行するとき、acks producer configurationを使うことで、必要な確認メッセージのレベルを選択することができます。 選択項目はスループットと信頼性の間のバランスを表しています。 以下の 3 つのレベルが存在します。

acks=0 (最も信頼性が低い)
メッセージはネットワークに書き込まれた時点で送信されたとみなされる。 パーティション・リーダーからの確認応答はありません。 結果として、パーティション・リーダーが変わった場合、メッセージが失われる可能性があります。 このレベルの確認応答は高速だが、状況によってはメッセージが失われる可能性がある。
acks=1 (デフォルト)
このメッセージは、パーティションリーダーがそのレコードをパーティションに正常に書き込むとすぐに、プロデューサーに確認される。 確認応答は、レコードが同期しているレプリカに到達する前に発生するため、リーダーが失敗した場合、メッセージは失われるかもしれないが、フォロワーはまだメッセージを持っていない。 もしパーティションのリーダが変われば、古いリーダがプロデューサに通知し、プロデューサはエラーを処理し、新しいリーダにメッセージを送るために再試行することができる。 メッセージはすべてのレプリカで受信が確認される前に承認されるため、パーティションのリーダーシップが変わると、承認されたがまだ完全に複製されていないメッセージが失われる可能性がある。
acks=all (最も信頼性が高い)
パーティション・リーダーがそのレコードの書き込みに成功し、同期しているすべてのレプリカが同じことを行ったとき、メッセージはプロデューサーに確認される。 少なくとも 1 つの同期レプリカが使用可能であれば、パーティション・リーダーが変わってもメッセージは失われません。

プロデューサーに対してメッセージが確認応答されるのを待たない場合でも、メッセージがコンシュームに使用可能になるのは、コミットされた場合のみであり、それは同期レプリカへの複製が完了していることを意味します。 言い換えると、プロデューサーの観点から見たメッセージ送信の待ち時間は、プロデューサーがメッセージ送信してからコンシューマーがそのメッセージを受け取るまでを測定したエンドツーエンドの待ち時間よりも短いということです。

可能であれば、次のメッセージを発行する前に、メッセージの肯定応答を 待つことは避けてください。 待機すると、プロデューサーはメッセージをまとめるバッチ処理を行うことができず、メッセージをパブリッシュできる速度がネットワークの往復待ち時間を下回ることになります。

バッチ処理、抑制、および圧縮

効率化のため、プロデューサーはレコードのバッチをまとめてサーバーに送信する。 圧縮を有効にするとプロデューサーは各バッチを圧縮します。圧縮によって、ネットワークを介して転送する必要のあるデータ量が減り、パフォーマンスが向上する可能性があります。

サーバーへ送信できるよりも速くメッセージをパブリッシュしようとすると、プロデューサーは自動的にそれらをバッチ処理された要求のバッファーに入れます。 プロデューサーは、パーティションごとに未送信レコードのバッファーを保守します。 バッチ処理でさえも、望むレートを達成できないときが来る。

影響を持つ別の要素もあります。 個々のプロデューサーまたはコンシューマーがクラスターを過負荷にするのを防止するため、Event Streams はスループット割り当て量を適用します。 各プロデューサーがデータを送信する速度が計算され、割り当て量を超えようとしたプロデューサーは抑えられます。 この抑制は、プロデューサーへの応答の送信を少し遅らせることによって適用されます。 通常、これは単に自然なブレーキとして動作します。

スループットのガイダンスについて詳しくは、 制限と割り当て量 を参照してください。

要約すると、メッセージがパブリッシュされると、そのレコードはまずプロデューサーのバッファーに書き込まれます。 バックグラウンドでは、プロデューサーがレコードをバッチにまとめて、サーバーに送信します。 次に、サーバーはプロデューサーに応答しますが、プロデューサーがパブリッシュするのが速すぎる場合は、抑制のための遅延が適用されることがあります。 プロデューサのバッファがいっぱいになると、プロデューサの送信コールは遅延されるが、最終的には例外で失敗するかもしれない。

配信セマンティクス

Kafka は、以下の複数の異なるメッセージ配信セマンティクスを提供します。

  • 多くても1回です:メッセージが紛失し、再配達されない可能性があります。
  • 少なくとも一度は:メッセージが失われることはありませんが、重複する可能性があります。
  • 正確に一度だけ:メッセージが失われることはなく、重複もありません。

配信セマンティクスは以下の設定で決まります。

  • acks
  • retries
  • enable.idempotence

デフォルトでは、Kafkaは 少なくとも1回のセマンティクスを使用する。

exactly onceセマンティクスを有効にするには、idempotentまたはtransactionalプロデューサーを使わなければならない。 idempotent プロデューサーを有効にするには、enable.idempotencetrue に設定します。これによって、再試行される場合でも、各メッセージの正確に 1 つのコピーが Kafka に書き込まれることが保証されます。 transactional プロデューサーは、すべてのメッセージが正常に配信されるか、どのメッセージも配信されないように、データを複数のパーティションに送信できるようにします。 すなわち、トランザクションは完全にコミットされるか、完全に破棄されます。 また、Kafkaへのメッセージの読み取り、処理、書き込みを行うアプリケーションを構築するために、トランザクションにオフセットを含めることもできる。

コード・スニペット

これらのコード・スニペットは、関連する概念を説明するための概要レベルです。 完全な例については、GitHub の Event Streams サンプルを参照してください。

コンシューマーを Event Streamsに接続するには、サービス資格情報を作成する必要があります。 詳しくは、Event Streams への接続を参照してください。

プロデューサー・コードでは、まず構成プロパティーのセットをビルドする必要があります。 Event Streamsへのすべての接続は、TLSとユーザー認証とパスワード認証を使って保護されているので、最低限これらのプロパティが必要です。 BOOTSTRAP_ENDPOINTS、USER、および PASSWORD を、自分のサービス資格情報のものに置き換えます:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

メッセージを送信するには、例えばキーと値にシリアライザーを指定する必要もある:

 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

これらのシリアライザーは、コンシューマーによって使用されるデシリアライザーと一致する必要があります。

そして、KafkaProducerてメッセージを送信する。各メッセージはProducerRecordで表される。 終了したら KafkaProducer を閉じることを忘れないでください。 このコードは、メッセージを送信するだけであり、送信が成功したかどうかを調べるための待機はしていません。 メッセージはトピック T1 に送信されます。キーはストリング key で、値はストリング value です。

 Producer<String, String> producer = new KafkaProducer<>(props);
 producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
 producer.close();

send() メソッドは非同期であり、Future を返します。それを使用して完了を確認できます。

 Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
// Do some other stuff
// Now wait for the result of the send
 RecordMetadata rm = f.get();
 long offset = rm.offset;

あるいは、メッセージ送信時にコールバックを指定することもできます:

producer.send(new ProducerRecord<String,String>("T1","key","value", new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception exception) {
                     // This is called when the send completes, either successfully or with an exception
          }
});

詳しくは、 Kafka クライアントの Javadoc を参照してください。