取用訊息
消費者是一個取用 Kafka 主題之訊息串流的應用程式。 消費者可以訂閱一個以上的主題或分割區。 此資訊重點介紹屬於Apache Kafka專案一部分的Java™程式介面。 這些概念也套用於其他語言,但名稱有時會稍有不同。
當消費者連接至 Kafka 時,即會起始引導連線。 此連線可連接至叢集裡的任何伺服器。 消費者會要求與其想要取用的主題有關的分割區及領導權資訊。 然後,消費者與分區領導者建立另一個連結並可以消費訊息。 當您的消費者連接至 Kafka 叢集時,即會在內部自動發生這些動作。
消費者通常是一個長時間執行的應用程式。 消費者藉由定期呼叫 Consumer.poll(...) 來要求 Kafka 的訊息。 消費者會呼叫 poll(),接收訊息批次,及時進行處理,然後再重新呼叫 poll()。
當消費者處理訊息時,訊息並不會從其主題中移除。 相反,消費者可以選擇多種方式來通知Kafka處理了哪些訊息。 此處理程序稱為確定偏移。
在程式設計介面中,訊息稱為記錄。 例如,Java 類別 org.apache.kafka.clients.consumer.ConsumerRecord 用來表示消費者 API 的訊息。 術語_記錄_和_訊息_可以互換使用,但本質上記錄用於表示訊息。
在 Event Streams中 產生訊息 時,您可能會發現閱讀此資訊很有用。
配置消費者內容
消費者有許多配置設定,可控制其行為的各個層面。 下列設定是一些最重要的設定。
| 名稱 | 說明 | 有效值 | 預設值 |
|---|---|---|---|
| key.deserializer | 此類別用來解除索引鍵的序列化。 | 實作 Deserializer 介面的Java類,例如org.apache.kafka.common.serialization.StringDeserializer。 | 無預設值 - 您必須指定一個值 |
| value.deserializer | 此類別用來解除值的序列化。 | 實作 Deserializer 介面的Java類,例如org.apache.kafka.common.serialization.StringDeserializer。 | 無預設值 - 您必須指定一個值。 |
| group.id | 消費者所屬的消費者群組的 ID。 | 字串 | 無預設值 |
| auto.offset.reset | 消費者沒有起始偏移、或現行偏移不再適用於叢集時的行為。 | 最新、最舊、無 | 最新 |
| enable.auto.commit | 判斷是否在背景中自動確定消費者的偏移。 | true、false | 是 |
| auto.commit.interval.ms | 偏移定期確定之間的毫秒數。 | 0,... | 5000(5 秒) |
| max.poll.records | 呼叫 poll() 時傳回的最大記錄數。 | 1,... | 500 |
| session.timeout.ms | 必須接收消費者活動訊號以維持消費者群組的消費者成員資格的毫秒數。 | 6000-300000 | 10000(10 秒) |
| max.poll.interval.ms | 消費者離開群組之前,輪詢之間的最大時間間隔。 | 1,... | 300000(5 分鐘) |
還有許多其他配置設定可用,但在開始實驗之前,請先閱讀 Apache Kafka 說明文件。
消耗者群組
_消費者組_是一組合作消費來自一個或多個主題的訊息的消費者。 群組中的消費者都使用相同的 group.id 配置值。 如果您需要有多個消費者來處理工作負載,則可以在同一個消費者群組中執行多個消費者。 即使您只需要一個使用者,通常也會為 group.id 指定一個值。
每個消費者群組在叢集中都有一個伺服器,稱為_協調器,_負責為群組中的消費者分配分區。 此責任分佈在叢集裡的伺服器上,甚至是負載。 每次群組重新平衡時,都可以變更消費者的分割區指派。
當消費者結合消費者群組時,它會探索群組的協調者。 然後,消費者告訴協調器它想要加入該組,協調器開始重新平衡該組中的分區以包含新成員。
單一分割區中的訊息只由每一個群組中的一個消費者處理。 這種機制保證了每個分區上的消息都按順序處理。 請參閱下圖,以取得一個範例,其中主題包含三個分割區,而耗用該主題的消費者群組包含兩個消費者。 群組中的一個消費者獲指派兩個分割區,而另一個消費者則獲指派一個分割區。
當消費者群組中發生以下變更之一時,該群組會透過將分割區分配轉移給群組成員來重新平衡以適應變更:
- 消費者加入該組。
- 消費者離開該組。
- 協調者認為消費者不再存在。
- 新分區將會新增到現有主題中。
如果您有一個重新平衡的消費者群組,請注意,任何離開該群組的消費者的提交都會被拒絕,直到它重新加入該群組。 在此情況下,消費者需要重新加入群組,在群組中它可能獲指派一個不同於先前所取用的分割區。
消費者活動
Kafka 會自動偵測失敗的消費者,以將分割區重新指派給工作中消費者。 它使用兩種機制: 輪詢和活動訊號。
如果從 Consumer.poll(...) 返回的訊息批次很大或處理很耗時,則再次呼叫 poll() 之前的延遲可能會很大或不可預測。 在某些情況下,需要配置較長的最大輪詢間隔,這樣消費者就不會因為訊息處理需要較長的時間而被從其群組移除。 如果此機制是唯一可用的機制,則偵測失敗消費者所花費的時間也會很長。
為了讓消費者活動更容易處理,Kafka 0.10.1 新增了背景活動訊號。 群組協調者期望群組成員會定期傳送活動訊號,以表示它們仍處於活動狀態。 後台心跳線程在消費者中運行,並向協調器發送定期心跳。 如果協調器在_會話逾時_內沒有收到來自組成員的心跳,則協調器將從群組中刪除該成員並開始群組的重新平衡。 會話逾時可以比最大輪詢間隔短得多,這樣即使訊息處理需要很長時間,檢測失敗的消費者所需的時間也可以很短。
您可以使用 max.poll.interval.ms 屬性配置最大輪詢間隔,並使用 session.timeout.ms 屬性配置會話逾時。 除非處理一批訊息需要超過 5 分鐘,否則不需要使用這些設定。
管理偏移
對於每個消費者群組,Kafka維護每個消費分區的提交偏移量。 當消費者處理訊息時,並不會將訊息從分割區移除。 相反,它只是透過使用稱為提交偏移量的過程來更新其當前偏移量。
Event Streams 會將已確定的偏移資訊保留 7 天。
如果不存在現有的已確定偏移,怎麼辦?
當消費者啟動並被分配一個要消費的分區時,它從其組的提交偏移量開始。 如果不存在現有的已提交偏移量,消費者可以根據 auto.offset.reset 屬性的設定選擇是從最早的還是最新的可用訊息開始,如下所示:
latest(預設值): 您的消費者只會接收及耗用在您訂閱之後送達的訊息。 您的消費者不知道在訂閱之前發送的訊息,因此不要期望所有訊息都是從某個主題消費的。earliest: 您的消費者從頭開始會耗用所有訊息。
如果消費者在處理訊息之後但在提交其偏移量之前失敗,則提交的偏移量資訊不會反映訊息的處理情況。 這意味著該訊息將由該群組中的下一個使用者再次處理,以分配該分區。
當已確定偏移儲存在 Kafka 中且消費者已重新啟動時,消費者將從它們前次停止的點繼續。 當存在提交的偏移量時,不使用 auto.offset.reset 屬性。
自動確定偏移
提交偏移量最簡單的方法是讓Kafka消費者自動執行。 它很簡單,但它確實比手動提交提供更少的控制。 依預設,消費者每 5 秒會自動確定一次偏移。 這項預設確定每 5 秒鐘進行一次,不論消費者處理訊息的進度如何。 此外,當消費者呼叫 poll() 時,這也會導致上次呼叫 poll() 返回的最新偏移量被提交(因為假設先前的訊息已全部處理)。
如果提交的偏移量超過了訊息的處理並且存在使用者故障,則某些訊息可能未處理。 這是因為處理在已確定偏移處重新啟動,該偏移比失敗之前要處理的最後一則訊息晚。 因此,如果可靠性比簡單更重要,通常最好手動確定偏移。
手動確定偏移
如果 enable.auto.commit 設為 false,則消費者會手動確定其偏移。 它可以採取同步或非同步方式執行。 常見的模式是根據定期計時器確定最新處理訊息的偏移。 這個模式表示每個訊息至少被處理一次,但已確定的偏移永遠不會超過積極處理中的訊息進度。 定期計時器的頻率控制了消費者失敗後可以重新處理的訊息數。 當應用程式重新啟動或群組重新平衡時,會從上次儲存的已確定偏移中再次擷取訊息。
已確定偏移是開始繼續處理的訊息的偏移。 這通常是最近已處理的訊息加上一 的偏移。
消費者遲滯
分割區的消費遲滯是最近發佈訊息的偏移與消費者已確定偏移之間的差額。 換句話說,它是已產生的記錄數與已耗用的記錄數之間的差異。 雖然生產率與消費率通常會有自然的變異,但消費率不應該長期低於生產率。
如果您觀察到消費者正在成功處理訊息,但偶爾會跳過一組訊息,則可能表示消費者無法跟上。 對於未使用日誌壓縮的主題,可藉由定期刪除舊的日誌區段來管理日誌空間量。 如果消費者落後太多以至於它正在消費已刪除的日誌段中的消息,那麼它會突然向前跳到下一個日誌段的開頭。 如果讓消費者處理所有訊息是很重要的,則此行為會從該消費者的觀點指出訊息流失。
您可以使用 kafka-consumer-groups 工具來查看消費者滯後。 您也可以使用消費者 API 及消費度量值來達到相同的目的。
控制訊息取用速度
如果您遇到因訊息氾濫而導致的訊息處理問題,您可以設定消費者選項來控制訊息消費的速度。 使用 fetch.max.bytes 及 max.poll.records 來控制呼叫 poll() 可傳回的資料量。
處理消費者重新平衡
當消費者加入群組或從群組中刪除時,會發生群組重新平衡,且消費者無法消費訊息。 這會導致消費者組中的所有消費者在短時間內無法使用。
如果您收到「分區撤銷」回呼通知,請使用ConsumerRebalanceListener手動提交偏移量(如果您不使用自動提交)並暫停進一步處理,直到使用「分區分配」回呼通知您成功重新平衡。
程式碼 Snippet
這些程式碼片段在較高層次上說明了所涉及的概念。 如需完整範例,請參閱 GitHub中的 Event Streams 範例。
若要將消費者連接至 Event Streams,您需要建立服務認證。 如需如何取得這些認證的相關資訊,請參閱 連接至 Event Streams。
在消費者程式碼中,您首先需要建置一組配置內容。 與Event Streams的所有連線均透過使用 TLS 和使用者密碼驗證進行保護,因此您至少需要這些屬性。 將 BOOTSTRAP_ENDPOINTS、USER 和 PASSWORD 替換為您自己的服務憑證中的內容:
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
若要耗用訊息,您還需要指定索引鍵及值的解除序列化程式,如下列範例所示。
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
這些解除序列化程式必須符合生產者使用的序列化程式。
然後,使用 KafkaConsumer 來取用訊息,每一則訊息都由一個 ConsumerRecord 表示。 取用訊息最常見的方式是藉由設定群組 ID 來將消費者放入消費者群組中,然後呼叫 subscribe() 以取得主題清單。 消費者被分配了一些要消費的分區,但如果組中存在的消費者多於主題中的分區,則可能不會為消費者分配任何分區。 接下來,請在迴圈中呼叫 poll(),接收要處理的訊息批次,每一則訊息都由一個 ConsumerRecord
表示。
props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1")); // T1 is the topic name
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
這個消費者循環永遠運行,但可以透過呼叫 Consumer.wakeup() 來從另一個線程中斷它,以實現整齊的關閉。
若要手動確定偏移,必須先將 enable.auto.commit 配置設為 false。 然後,使用 Consumer.commmitSync() 或 Consumer.commitAsync(),定期更新消費者的已確定偏移。 為了簡單起見,此範例處理每一個分割區的記錄並分別確定最後的偏移。
props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords = records.records(tp);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : partRecords) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
lastOffset = record.offset();
}
// having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
finally {
consumer.close();
}
異常狀況處理
任何使用 Kafka 用戶端的健全應用程式都需要處理某些預期狀況下的異常狀況。 在某些情況下,不會直接引發異常,因為某些方法是非同步的,並透過使用 Future 或回調來傳遞其結果。 請參閱 GitHub 中顯示完整範例的範例程式碼。
處理程式碼中的下列異常狀況清單:
org.apache.kafka.common.errors.WakeupException
由 Consumer.poll(...) 擲出,為呼叫 Consumer.wakeup() 的結果。 這是中斷消費者輪詢循環的標準方法。 輪詢迴圈會結束,並呼叫 Consumer.close() 以完全中斷連線。
org.apache.kafka.common.errors.NotLeaderForPartitionException
分割區領導權變更時,因為 Producer.send(...) 而擲出。 用戶端會自動重新整理其 meta 資料,以尋找最新的領導者資訊。 請使用更新的 meta 資料重試成功的作業。
org.apache.kafka.common.errors.CommitFailedException
發生無法復原的錯誤時,因為 Consumer.commitSync(...) 而擲出。 在某些情況下,不可能重複該操作,因為分區分配已更改且使用者不再能夠提交其偏移量。 因為在單一呼叫中與多個分割區搭配使用時 Consumer.commitSync(...) 可能會局部成功,所以可針對每一個分割區使用個別的 Consumer.commitSync(...) 呼叫來簡化錯誤回復。
org.apache.kafka.common.errors.TimeoutException
如果 meta 資料無法擷取,由 Producer.send(...), Consumer.listTopics() 擲出。 當所要求的確認通知未在 request.timeout.ms 內送回,則傳送的回呼(或傳回的 Future)中也會出現異常狀況。 用戶端可以重試作業,但重複作業的效果取決於具體的作業。 例如,如果重試發送訊息,則該訊息可能是重複的。