メッセージのプロデュース
プロデューサーは、メッセージのストリームを Kafka トピックにパブリッシュするアプリケーションです。 ここでは、Apache Kafka プロジェクトの一部である Java プログラミング・インターフェースについて重点的に説明します。 概念は他の言語にも当てはまりますが、名前は少し異なる場合があります。
プログラミング・インターフェースでは、メッセージはレコードと呼ばれます。 例えば、Java クラス org.apache.kafka.clients.producer.ProducerRecord が、プロデューサー API の観点からメッセージを表すために使用されます。 _記録_と_メッセージ_という用語は同じ意味で使用できますが、基本的には、メッセージを表すためにレコードが使用されます。
プロデューサーは Kafka に接続するときに初期ブートストラップ接続を行います。 この接続の接続先はクラスター内のどのサーバーでもかまいません。 プロデューサーは、パブリッシュ先にするトピックのパーティションおよびリーダーの情報を要求します。 その後、プロデューサはパーティション・リーダーへの別の接続を確立し、メッセージを公開できるようになる。 これらのアクションは、プロデューサーが Kafka クラスターに接続すると、内部で自動的に発生します。
可用性を確保するために、 Kafka ブローカーはメッセージを複製します。これにより、1 つのブローカーが使用不可の場合でも、他のブローカーはプロデューサーからメッセージを受信してコンシューマーに送信することができます。 Event Streams は、複製係数 3 を使用します。これは、各メッセージが 3 つのブローカーに保管されることを意味します。 パーティション・リーダーにメッセージが送信されても、そのメッセージはすぐにコンシューマーで使用可能にはなりません。 リーダーはメッセージのレコードをパーティションに付加し、その際、そのパーティションの次のオフセット数値をそのレコードに割り当てます。 同期しているレプリカのすべてのフォロワーがレコードをレプリケートし、そのレプリカにレコードを書き込んだことを認めた後、レコードはコミットされ、コンシューマーが利用できるようになる。
各メッセージは、キーと値の2つの部分からなるレコードとして表現される。 キーは一般的にメッセージに関するデータ用に使用され、値はメッセージ本体です。 Kafkaエコシステムの多くのツール(他のシステムへのコネクターなど)は、値のみを使用し、キーを無視するため、すべてのメッセージデータを値に入れ、パーティショニングやログのコンパクションにキーを使用するのが最善です。 Kafkaから読み込むものすべてがキーを使用することに依存してはならない。
その他の多くのメッセージング・システムにも、他の情報をメッセージと共に受け渡す方法があります。 Kafkaバージョン0.11では、この目的のためにレコードヘッダが導入されている。
Event Streamsで メッセージのコンシューム とともにこの情報を読むと役立つ場合があります。
構成設定
プロデューサーには、多くの構成設定が存在します。 バッチ処理、再試行、メッセージの確認など、プロデューサーの側面を制御できる。 以下に、最重要な設定を示します。
| 名前 | 説明 | 有効な値 | デフォルト |
|---|---|---|---|
| key.serializer | キーをシリアライズするために使用されるクラス。 | org.apache.kafka.common.serialization.StringSerializer などの Serializer インターフェースを実装するJavaクラス。 | デフォルトなし - 値を指定する必要があります。 |
| value.serializer | 値をシリアライズするために使用されるクラス。 | org.apache.kafka.common.serialization.StringSerializer などの Serializer インターフェースを実装するJavaクラス。 | デフォルトなし - 値を指定する必要があります。 |
| acks | 各メッセージのパブリッシュに対して確認応答を行う必要があるサーバーの数。 これは、プロデューサーが必要とする耐久性保証を制御します。 | 0、1、all (または -1) | all (Kafka 3.0 以降) 1 ( Kafka 3.0より前) |
| 再試行 | 送信でエラーが発生したときにクライアントがメッセージを再送信する回数。 | 0,... | 0 |
| max.block.ms | 送信要求またはメタデータ要求が待機をブロックできる時間 (ミリ秒)。 | 0,... | 60000 (1 分) |
| max.in.flight.requests.per.connection | クライアントがそれ以降のリクエストをブロックするまでに、コネクション上で送信する未承認リクエストの最大数。 | 1,... | 5 |
| request.timeout.ms | プロデューサーが要求への応答を待機する最大時間。 タイムアウトが経過する前に応答を受け取らなかった場合、リクエストは再試行 され、再試行回数が尽きた場合は失敗する。 | 0,... | 30000 (30 秒) |
他にも多くのコンフィギュレーション設定がありますが、試す前に Apache Kafkaのドキュメントをよく読んでください。
区画化
Kafka では、パーティションはスケーラビリティーの単位です。 したがって、パーティショニングは、トピックデータを複数の並列ストリームで流すことができるため、スループットを向上させる効果的な方法である。
プロデューサーは、トピックにメッセージをパブリッシュするときに、どのパーティションを使用するのかを選択できます。 順序付けが重要な場合、パーティションは順序付けられたレコードのシーケンスですが、トピックは1つ以上のパーティションで構成されることに注意してください。 一群のメッセージが順序通りに配信されるようにする必要がある場合、それらのメッセージが確実に同じパーティションに送信されるようにします。 これを行うための最も単純な方法は、それらのメッセージのすべてに同じキーを付与することです。
プロデューサーは、メッセージをパブリッシュするときにパーティション番号を明示的に指定できます。 この方法は直接制御を可能にしますが、プロデューサーのコードはパーティション選択の管理について責任を担うため、より複雑になります。 詳しくは、Producer.partitionsFor メソッド呼び出しを参照してください。 例えば Kafkaバージョン2.2.0。
プロデューサーがパーティション番号を指定しない場合、パーティションの選択はパーティショナーによって行われます。 Kafka プロデューサーに組み込まれたデフォルト・パーティショナーは、以下のように動作します。
- レコードにキーが付いていない場合、ラウンドロビン方式でパーティションを選択します。
- レコードにキーが付いている場合、そのキーのハッシュ値を計算してパーティションを選択します。 これは、同じキーを持つすべてのメッセージに対して同じパーティションを選択する効果があります。
ユーザー独自のカスタム・パーティショナーを作成することもできます。 カスタム・パーティショナーは、レコードをパーティションに割り当てる方式を任意に選択できます。 例えば、キー内の情報のサブセットのみを使用したり、アプリケーション固有の ID を使用したりできます。
メッセージ順序付け
Kafka は、通常、プロデューサーによって送信された順序でメッセージを書き込みます。 しかし、状況によっては、再試行によってメッセージが重複したり、順序が入れ替わったりすることがある。 メッセージの順序を保証する唯一の方法だからだ。
プロデューサーはまた、自動的にメッセージの送信を再試行することもできる。 このリトライ機能を有効にしておくと、アプリケーション・コード自身がリトライを行わなければならなくなるからだ。 Kafka でのバッチ処理と自動再試行を組み合わせると、メッセージの重複および順序変更という結果が生じる可能性があります。
例えば、トピックについて 3 つのメッセージ <M1、M2、M3> のシーケンスをパブリッシュするとします。 レコードはすべて同じバッチ内に収まるかもしれないので、すべて一緒にパーティションリーダーに送られる。 リーダーは、それらをパーティションに書き込み、別々のレコードとして複製します。 障害が発生した場合、 M1 および M2 が区画に追加されているが、 M3 は追加されていない可能性があります。 The producer doesn't receive an acknowledgment, so it retries sending <M1, M2, M3>. The new leader writes M1, M2 and M3 onto the partition, which now contains <M1, M2, M1, M2, M3>, where the duplicated M1 follows the original M2. 各ブローカーへの送信中の要求の数を 1 つのみに制限すると、このような順序の変更を防止できます。 <M1、 M2、 M2、 M3>などの 1 つのレコードが重複していても、順序が乱れることはありません。 バージョン0.11以降のKafkaでは、M2の重複を防ぐためにidempotent producer機能を使うこともできる。
Kafkaでは、フライト中のリクエストが1つしかない場合のパフォーマンスへの影響が大きいため、時折メッセージの重複を処理するようにアプリケーションを記述するのが通常のプラクティスである。
メッセージ確認応答
メッセージを発行するとき、acks producer configurationを使うことで、必要な確認メッセージのレベルを選択することができます。 選択項目はスループットと信頼性の間のバランスを表しています。 以下の 3 つのレベルが存在します。
- acks=0 (最も信頼性が低い)
- メッセージはネットワークに書き込まれた時点で送信されたとみなされる。 パーティション・リーダーからの確認応答はありません。 結果として、パーティション・リーダーが変わった場合、メッセージが失われる可能性があります。 このレベルの確認応答は高速だが、状況によってはメッセージが失われる可能性がある。
- acks=1 (デフォルト)
- このメッセージは、パーティションリーダーがそのレコードをパーティションに正常に書き込むとすぐに、プロデューサーに確認される。 確認応答は、レコードが同期しているレプリカに到達する前に発生するため、リーダーが失敗した場合、メッセージは失われるかもしれないが、フォロワーはまだメッセージを持っていない。 もしパーティションのリーダが変われば、古いリーダがプロデューサに通知し、プロデューサはエラーを処理し、新しいリーダにメッセージを送るために再試行することができる。 メッセージはすべてのレプリカで受信が確認される前に承認されるため、パーティションのリーダーシップが変わると、承認されたがまだ完全に複製されていないメッセージが失われる可能性がある。
- acks=all (最も信頼性が高い)
- パーティション・リーダーがそのレコードの書き込みに成功し、同期しているすべてのレプリカが同じことを行ったとき、メッセージはプロデューサーに確認される。 少なくとも 1 つの同期レプリカが使用可能であれば、パーティション・リーダーが変わってもメッセージは失われません。
プロデューサーに対してメッセージが確認応答されるのを待たない場合でも、メッセージがコンシュームに使用可能になるのは、コミットされた場合のみであり、それは同期レプリカへの複製が完了していることを意味します。 言い換えると、プロデューサーの観点から見たメッセージ送信の待ち時間は、プロデューサーがメッセージ送信してからコンシューマーがそのメッセージを受け取るまでを測定したエンドツーエンドの待ち時間よりも短いということです。
可能であれば、次のメッセージを発行する前に、メッセージの肯定応答を 待つことは避けてください。 待機すると、プロデューサーはメッセージをまとめるバッチ処理を行うことができず、メッセージをパブリッシュできる速度がネットワークの往復待ち時間を下回ることになります。
バッチ処理、抑制、および圧縮
効率化のため、プロデューサーはレコードのバッチをまとめてサーバーに送信する。 圧縮を有効にするとプロデューサーは各バッチを圧縮します。圧縮によって、ネットワークを介して転送する必要のあるデータ量が減り、パフォーマンスが向上する可能性があります。
サーバーへ送信できるよりも速くメッセージをパブリッシュしようとすると、プロデューサーは自動的にそれらをバッチ処理された要求のバッファーに入れます。 プロデューサーは、パーティションごとに未送信レコードのバッファーを保守します。 バッチ処理でさえも、望むレートを達成できないときが来る。
影響を持つ別の要素もあります。 個々のプロデューサーまたはコンシューマーがクラスターを過負荷にするのを防止するため、Event Streams はスループット割り当て量を適用します。 各プロデューサーがデータを送信する速度が計算され、割り当て量を超えようとしたプロデューサーは抑えられます。 この抑制は、プロデューサーへの応答の送信を少し遅らせることによって適用されます。 通常、これは単に自然なブレーキとして動作します。
スループットのガイダンスについて詳しくは、 制限と割り当て量 を参照してください。
要約すると、メッセージがパブリッシュされると、そのレコードはまずプロデューサーのバッファーに書き込まれます。 バックグラウンドでは、プロデューサーがレコードをバッチにまとめて、サーバーに送信します。 次に、サーバーはプロデューサーに応答しますが、プロデューサーがパブリッシュするのが速すぎる場合は、抑制のための遅延が適用されることがあります。 プロデューサのバッファがいっぱいになると、プロデューサの送信コールは遅延されるが、最終的には例外で失敗するかもしれない。
配信セマンティクス
Kafka は、以下の複数の異なるメッセージ配信セマンティクスを提供します。
- 多くても1回です:メッセージが紛失し、再配達されない可能性があります。
- 少なくとも一度は:メッセージが失われることはありませんが、重複する可能性があります。
- 正確に一度だけ:メッセージが失われることはなく、重複もありません。
配信セマンティクスは以下の設定で決まります。
acksretriesenable.idempotence
デフォルトでは、Kafkaは 少なくとも1回のセマンティクスを使用する。
exactly onceセマンティクスを有効にするには、idempotentまたはtransactionalプロデューサーを使わなければならない。 idempotent プロデューサーを有効にするには、enable.idempotence を true に設定します。これによって、再試行される場合でも、各メッセージの正確に 1 つのコピーが Kafka に書き込まれることが保証されます。 transactional
プロデューサーは、すべてのメッセージが正常に配信されるか、どのメッセージも配信されないように、データを複数のパーティションに送信できるようにします。 すなわち、トランザクションは完全にコミットされるか、完全に破棄されます。 また、Kafkaへのメッセージの読み取り、処理、書き込みを行うアプリケーションを構築するために、トランザクションにオフセットを含めることもできる。
コード・スニペット
これらのコード・スニペットは、関連する概念を説明するための概要レベルです。 完全な例については、GitHub の Event Streams サンプルを参照してください。
コンシューマーを Event Streamsに接続するには、サービス資格情報を作成する必要があります。 詳しくは、Event Streams への接続を参照してください。
プロデューサー・コードでは、まず構成プロパティーのセットをビルドする必要があります。 Event Streamsへのすべての接続は、TLSとユーザー認証とパスワード認証を使って保護されているので、最低限これらのプロパティが必要です。 BOOTSTRAP_ENDPOINTS、USER、および PASSWORD を、自分のサービス資格情報のものに置き換えます:
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
メッセージを送信するには、例えばキーと値にシリアライザーを指定する必要もある:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
これらのシリアライザーは、コンシューマーによって使用されるデシリアライザーと一致する必要があります。
そして、KafkaProducerてメッセージを送信する。各メッセージはProducerRecordで表される。 終了したら KafkaProducer を閉じることを忘れないでください。 このコードは、メッセージを送信するだけであり、送信が成功したかどうかを調べるための待機はしていません。 メッセージはトピック T1 に送信されます。キーはストリング key で、値はストリング value です。
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
producer.close();
send() メソッドは非同期であり、Future を返します。それを使用して完了を確認できます。
Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
// Do some other stuff
// Now wait for the result of the send
RecordMetadata rm = f.get();
long offset = rm.offset;
あるいは、メッセージ送信時にコールバックを指定することもできます:
producer.send(new ProducerRecord<String,String>("T1","key","value", new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
// This is called when the send completes, either successfully or with an exception
}
});
詳しくは、 Kafka クライアントの Javadoc を参照してください。