メッセージのコンシューム
コンシューマーは、メッセージのストリームを Kafka トピックからコンシュームするアプリケーションです。 1 つのコンシューマーが 1 つ以上のトピックまたはパーティションにサブスクライブできます。 この情報は、Apache Kafka プロジェクトの一部である Java™ プログラミング・インターフェースに焦点を当てています。 概念は他の言語にも当てはまりますが、名前は少し異なる場合があります。
コンシューマーは Kafka に接続するときに初期ブートストラップ接続を行います。 この接続の接続先はクラスター内のどのサーバーでもかまいません。 コンシューマーは、コンシューム元にするトピックのパーティションおよびリーダーの情報を要求します。 その後、コンシューマーはパーティション・リーダーへの別の接続を確立し、メッセージをコンシュームできます。 これらのアクションは、コンシューマーが Kafka クラスターに接続すると、内部で自動的に発生します。
通常、コンシューマーは長時間実行アプリケーションです。 コンシューマーは、Consumer.poll(...)
を定期的に呼び出すことによって、Kafka にメッセージを要求します。 コンシューマーは、poll()
を呼び出し、メッセージのバッチを受け取り、それらをすぐに処理し、もう一度 poll()
を呼び出します。
コンシューマーがメッセージを処理しても、そのメッセージはトピックから削除されません。 代わりに、コンシューマーは、どのメッセージが処理されたかを Kafka に通知する方法をいくつか選択できます。 この処理は、オフセットのコミットと呼ばれます。
プログラミング・インターフェースでは、メッセージはレコードと呼ばれます。 例えば、Java クラス org.apache.kafka.clients.consumer.ConsumerRecord が、コンシューマー API のメッセージを表すために使用されます。 _記録_と_メッセージ_という用語は同じ意味で使用できますが、基本的には、メッセージを表すためにレコードが使用されます。
この情報は、Event Streams のメッセージの生成と一緒に読むと役立つ場合があります。
コンシューマー・プロパティーの構成
コンシューマーには、その動作の側面を制御する多くの構成設定があります。 以下の設定は、最も重要な設定の一部です。
名前 | 説明 | 有効な値 | デフォルト |
---|---|---|---|
key.deserializer | キーをデシリアライズするために使用されるクラス。 | デシリアライザー・インターフェースを実装する Java クラス (org.apache.kafka.common.serialization.StringDeserializer など)。 | デフォルトなし - 値を指定する必要があります |
value.deserializer | 値をデシリアライズするために使用されるクラス。 | デシリアライザー・インターフェースを実装する Java クラス (org.apache.kafka.common.serialization.StringDeserializer など)。 | デフォルトはありません-値を指定する必要があります。 |
group.id | コンシューマーが所属するコンシューマー・グループの ID。 | ストリング | デフォルトなし |
auto.offset.reset | コンシューマーに初期オフセットがないか、現行オフセットがクラスター内でもう使用可能でない場合の動作。 | 最新、最古、なし | 最新バージョン |
enable.auto.commit | コンシューマーのオフセットを自動的にバックグラウンドでコミットするかどうかを決定します。 | True、false | はい |
auto.commit.interval.ms | オフセットの定期的コミットの間隔 (ミリ秒)。 | 0,... | 5000 (5 秒) |
max.poll.records | poll () への呼び出しで戻されるレコードの最大数。 | 1,... | 500 |
session.timeout.ms | コンシューマーがコンシューマー・グループのメンバーであり続けるためにコンシューマー・ハートビートがその間に受信されなければならない時間 (ミリ秒)。 | 6000-300000 | 10000 (10 秒) |
max.poll.interval.ms | コンシューマーがグループを抜ける前の最大ポーリング時間間隔。 | 1,... | 300000 (5 分) |
さらに多くの構成設定を使用できますが、それらを試す前に、 Apache Kafka の資料 をお読みください。
コンシューマー・グループ
_コンシューマー・グループ_は、1 つ以上のトピックからメッセージをコンシュームするために連携するコンシューマーのグループです。 同じグループ内のすべてのコンシューマーは group.id
構成に同じ値を使用します。 作業負荷を処理するために複数のコンシューマーが必要な場合、同じコンシューマー・グループ内の複数のコンシューマーを実行できます。 1 つのコンシューマーのみが必要な場合でも、通常はgroup.id
の値も指定します。
各コンシューマー・グループには、グループ内のコンシューマーへのパーティションの割り当てを担当する_コーディネーター_と呼ばれるサーバーがクラスター内にあります。 コーディネーターの責任は、負荷を平均化するためにクラスター内のすべてのサーバーで負います。 コンシューマーへのパーティションの割り当ては、グループのバランスが再調整されるたびに変化する可能性があります。
コンシューマー・グループにコンシューマーが加わると、そのコンシューマーはグループのコーディネーターを検出します。 その後、コンシューマーはコーディネーターにグループに参加することを通知し、コーディネーターはグループ全体のパーティションのリバランスを開始して新規メンバーを組み込みます。
単一パーティションからのメッセージは、各グループ内の 1 つのコンシューマーによってのみ処理されます。 このメカニズムにより、各パーティション上のメッセージが順番に処理されるようになります。 トピックに 3 つのパーティションが含まれており、そのトピックをコンシュームしているコンシューマー・グループに 2 つのコンシューマーが含まれている例については、以下の図を参照してください。 グループ内の 1 つのコンシューマーには 2 つのパーティションが割り当てられ、もう 1 つのコンシューマーには 1 つのパーティションが割り当てられます。
コンシューマー・グループで以下のいずれかの変更が行われると、変更に対応するためにグループ・メンバーにパーティションの割り当てをシフトすることにより、グループのバランスが再調整されます。
- コンシューマーがグループに参加します。
- コンシューマーがグループを離脱します。
- コンシューマーは、コーディネーターによってもはやライブではないと見なされます。
- 新しいパーティションが既存のトピックに追加されます。
リバランスされたコンシューマー・グループがある場合は、そのグループを離れたすべてのコンシューマーが、そのグループに再結合するまで拒否されるコミットを持っていることに注意してください。 この場合、コンシューマーはグループに再加入する必要があります。その際、以前にコンシューム元にしていたパーティションとは異なるパーティションが割り当てられる可能性があります。
コンシューマーの活動性
Kafka は、障害が起こったコンシューマーを自動的に検出して、動作しているコンシューマーにパーティションを割り当て直します。 ポーリングとハートビートの 2 つのメカニズムを使用します。
Consumer.poll(...)
から返されるメッセージのバッチが大きい場合、または処理に時間がかかる場合、 poll()
が再び呼び出されるまでの遅延は、重大なものや予測不能なものになる可能性があります。 場合によっては、長い最大ポーリング間隔を構成して、メッセージ処理に時間がかかっているというだけの理由でコンシューマーがグループから削除されないようにする必要があります。 このメカニズムが使用可能な唯一のメカニズムである場合、失敗したコンシューマーの検出にかかる時間も長くなります。
コンシューマーの活動性をより簡単に処理できるようにするため、バックグラウンドでのハートビート処理が Kafka 0.10.1 で追加されました。 グループのコーディネーターは、グループ・メンバーがアクティブ状態のままであることを示すために定期的にハートビートを送信してくると予期しています。 バックグラウンド・ハートビート・スレッドはコンシューマーで実行され、通常のハートビートをコーディネーターに送信します。 コーディネーターが_セッション・タイムアウト_内のグループ・メンバーからハートビートを受信しない場合、コーディネーターはグループからメンバーを除去し、グループのリバランスを開始します。 セッション・タイムアウトは最大ポーリング間隔よりもはるかに短い可能性があるため、メッセージ処理に長い時間がかかる場合でも、失敗したコンシューマーの検出にかかる時間が短くなる可能性があります。
最大ポーリング間隔を構成するには、max.poll.interval.ms
プロパティーを使用し、session.timeout.ms
プロパティーを使用してセッション・タイムアウトを構成します。 メッセージのバッチの処理に 5 分以上かかる場合を除き、これらの設定を使用する必要はありません。
オフセットの管理
コンシューマー・グループごとに、Kafka は、コンシュームされるパーティションごとにコミットされたオフセットを維持します。 コンシューマーは、メッセージを処理してもパーティションからそのメッセージを削除しません。 代わりに、オフセットのコミットと呼ばれるプロセスを使用して、現在のオフセットを更新するだけです。
Event Streams は、コミット済みオフセットの情報を 7 日間保持します。
既存のコミット済みオフセットが存在しない場合はどうなりますか?
コンシューマーが開始され、コンシュームするパーティションが割り当てられると、コンシューマーはそのグループのコミット済みオフセットから開始します。 既存のコミット済みオフセットが存在しない場合、コンシューマーは、以下のように、 auto.offset.reset
プロパティーの設定に基づいて、使用可能な最も古いメッセージから開始するか、最も新しいメッセージから開始するかを選択できます。
latest
(デフォルト) : コンシューマーは、サブスクライブ後に到着したメッセージのみを受信してコンシュームします。 コンシューマーは、サブスクライブ前に送信されたメッセージを認識しないため、すべてのメッセージがトピックからコンシュームされるとは限りません。earliest
: コンシューマーは最初からすべてのメッセージをコンシュームします。
コンシューマーがメッセージの処理後、オフセットのコミット前に失敗した場合、コミットされたオフセット情報はメッセージの処理を反映しません。 これは、そのグループ内の次のコンシューマーによって、パーティションに割り当てられるメッセージが再び処理されることを意味します。
コミット済みオフセットが Kafka に保存されている場合、コンシューマーが再始動されると、コンシューマーは最後に停止したポイントから再開します。 コミットされたオフセットが存在する場合、 auto.offset.reset
プロパティーは使用されません。
オフセットの自動コミット
オフセットをコミットする最も簡単な方法は、Kafka コンシューマーに自動的にコミットさせることです。 これは簡単ですが、手動でコミットするよりも制御が少なくてすみます。 デフォルトでは、コンシューマーは 5 秒ごとに自動的にオフセットをコミットします。 このデフォルトのコミットは、コンシューマーによるメッセージ処理の進行状況とは無関係に、5 秒おきに発生します。 さらに、コンシューマーが poll()
を呼び出すと、前の呼び出しから poll()
に戻された最新のオフセットもコミットされます (前のメッセージがすべて処理されたと想定されるため)。
コミットされたオフセットがメッセージの処理を超過し、コンシューマーの障害が存在する場合、一部のメッセージが処理されない可能性があります。 これは、処理はコミット済みオフセットで再び開始するが、そのオフセットは障害の前に処理された最後のメッセージよりも後であることが原因です。 この理由のため、一般的には、単純さよりも信頼性に重きが置かれる場合はオフセットを手動でコミットするのが最適です。
オフセットの手動コミット
enable.auto.commit
が false
に設定されている場合、コンシューマーはオフセットを手動でコミットします。 これは同期的または非同期的に行うことができます。 最後に処理されたメッセージのオフセットを周期的タイマーに基づいてコミットするのが一般的なパターンです。 このパターンは、各メッセージが少なくとも一度は処理されるが、コミットされたオフセットが現在アクティブに進行しているメッセージの処理を追い越すことは決してないことを意味します。
周期的タイマーの頻度によって、コンシューマー障害の後に再処理される可能性のあるメッセージの数が制御されます。 アプリケーションが再始動された場合、またはグループのバランスが再調整された場合、最後に保存されたコミット済みオフセットからメッセージがもう一度取り出されます。
コミット済みオフセットは、処理が再開される開始点となるメッセージのオフセットです。 これは、通常、最後に処理されたメッセージのオフセットに* 1 を加算*したものです。
コンシューマー・ラグ
パーティションのコンシューマー・ラグは、最後にパブリッシュされたメッセージのオフセットと、コンシューマーのコミット済みオフセットとの差です。 つまり、作成されたレコードの数と消費されたレコードの数との差です。 プロデュース速度とコンシューム速度には自然な変動があるのが一般的ですが、長時間にわたってコンシューム速度がプロデュース速度よりも遅くてはなりません。
コンシューマーがメッセージを正常に処理しているにもかかわらず、メッセージのグループを飛び越えることがあると思われる場合は、コンシューマーが対応できないことを示している可能性があります。 ログ圧縮を使用していないトピックの場合、定期的に古いログ・セグメントを削除することによってログ容量が管理されます。 コンシューマーが、削除されたログ・セグメント内のメッセージをコンシュームしているほどに遅れている場合、次のログ・セグメントの先頭に突然ジャンプします。 コンシューマーがすべてのメッセージを処理することが重要である場合、このコンシューマーの観点からすると、この動作はメッセージを失うことを意味します。
kafka-consumer-groups
ツールを使用して、コンシューマー・ラグを確認できます。 同じ目的のためにコンシューマー API およびコンシューマー・メトリックを使用することもできます。
メッセージをコンシュームする速度の制御
メッセージあふれによるメッセージ処理に問題がある場合は、コンシューマー・オプションを設定して、メッセージの消費速度を制御することができます。 fetch.max.bytes
および max.poll.records
を使用して、1 回の poll()
呼び出しが返すことのできるデータ量を制御します。
コンシューマーのバランス再調整の処理
コンシューマーがグループに追加されるか、グループから削除されると、グループのリバランスが行われ、コンシューマーはメッセージをコンシュームできません。 これにより、コンシューマー・グループ内のすべてのコンシューマーが短期間使用不可になります。
「on partitions revoked」コールバックが通知された場合は、 ConsumerRebalanceリスナーを使用してオフセットを手動でコミットし (自動コミットを使用していない場合)、「on partition assigned」コールバックを使用して正常なリバランスが通知されるまで、以降の処理を一時停止します。
コード・スニペット
これらのコード・スニペットは、関連する概念を説明するための概要レベルです。 完全な例については、GitHub の Event Streams サンプルを参照してください。
コンシューマーを Event Streamsに接続するには、サービス資格情報を作成する必要があります。 これらの資格情報の取得方法については、 Event Streams を参照してください。
コンシューマー・コードでは、まず構成プロパティーのセットをビルドする必要があります。 Event Streams へのすべての接続は、TLS およびユーザー・パスワード認証を使用して保護されているため、少なくともこれらのプロパティーが必要です。 BOOTSTRAP_ENDPOINTS、USER、および PASSWORD を独自のサービス資格情報のものに置き換えます。
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
メッセージをコンシュームするには、以下の例のように、キーと値にデシリアライザーを指定する必要もあります。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
これらのデシリアライザーは、プロデューサーによって使用されるシリアライザーと一致する必要があります。
次に、KafkaConsumer を使用してメッセージをコンシュームします。ここでは、各メッセージが 1 つの ConsumerRecord で表されます。 メッセージをコンシュームする最も一般的な方法は、グループ ID を設定することによってコンシューマーをコンシューマー・グループに入れ、次に、トピックのリストに対して subscribe()
を呼び出すことです。 コンシューマーにはコンシュームするいくつかのパーティションが割り当てられますが、トピック内のパーティションよりも多くのコンシューマーがグループ内に存在する場合は、コンシューマーにパーティションが割り当てられない可能性があります。
次に、ループ内で poll()
を呼び出して、処理するメッセージのバッチを受け取ります。それらのメッセージはそれぞれ 1 つの ConsumerRecord で表されます。
props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1")); // T1 is the topic name
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
このコンシューマー・ループは永久に実行されますが、 Consumer.wakeup()
を呼び出して整理シャットダウンを行うことにより、別のスレッドから中断される可能性があります。
オフセットを手動でコミットするには、最初に enable.auto.commit
構成を false
に設定する必要があります。 次に、Consumer.commmitSync()
または Consumer.commitAsync()
のいずれかを使用して、コンシューマーのコミット済みオフセットを定期的に更新します。 単純にするため、この例では、各パーティションのレコードを処理し、最後のオフセットを別個にコミットしています。
props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords = records.records(tp);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : partRecords) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
lastOffset = record.offset();
}
// having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
finally {
consumer.close();
}
例外処理
Kafka クライアントを使用する堅固なアプリケーションは、特定の予期される状態の例外を処理する必要があります。 一部のメソッドは非同期であり、
Future
またはコールバックを使用して結果を送信するため、例外が直接スローされない場合があります。 完全な例を示すコード例を GitHub で確認してください。
コード内の以下の例外リストを処理します。
org.apache.kafka.common.errors.WakeupException
Consumer.poll(...)
によって Consumer.wakeup()
の呼び出しの結果としてスローされます。 これは、コンシューマーのポーリング・ループを中断するための標準的な方法です。 ポーリング・ループが終了し、 Consumer.close()
が呼び出されて完全に切断されます。
org.apache.kafka.common.errors.NotLeaderForPartitionException
パーティションのリーダーが変わった場合に Producer.send(...)
の結果としてスローされます。 クライアントは自動的にメタデータをリフレッシュして、最新のリーダー情報を検出します。 更新されたメタデータで成功した操作を再試行してください。
org.apache.kafka.common.errors.CommitFailedException
リカバリー不能エラーが発生した場合に Consumer.commitSync(...)
の結果としてスローされます。 場合によっては、パーティション割り当てが変更され、コンシューマーがそのオフセットをコミットできなくなったために、操作を繰り返すことができないことがあります。 Consumer.commitSync(...)
は、1 回の呼び出しで複数のパーティションと共に使用されると、部分的に成功することがあるため、パーティションごとに別個の
Consumer.commitSync(...)
呼び出しを使用することによってエラー・リカバリーを単純化できます。
org.apache.kafka.common.errors.TimeoutException
メタデータを取得できない場合に Producer.send(...), Consumer.listTopics()
によってスローされます。 この例外は、要求された確認応答が request.timeout.ms
内に返されない場合、送信コールバック (または返される Future) でも発生します。 クライアントは操作を再試行できますが、反復された操作の効果は具体的な操作によって異なります。 例えば、メッセージの送信が再試行された場合、メッセージが重複している可能性があります。