IBM Cloud Docs
Consumo di messaggi

Consumo di messaggi

Un consumatore è un'applicazione che consuma flussi di messaggi da argomenti Kafka. Un consumatore può sottoscrivere uno o più argomenti o partizioni. Queste informazioni si concentrano sull'interfaccia di programmazione Java™ che fa parte del progetto Apache Kafka. I concetti si applicano anche ad altri linguaggi ma i nomi sono a volte leggermente differenti.

Quando stabilisce una connessione a Kafka, un consumatore esegue una connessione bootstrap iniziale. Tale connessione può essere stabilita con qualsiasi server nel cluster. Il consumatore richiede le informazioni della partizione e della leadership sull'argomento da cui desidera consumare. Quindi, il consumatore stabilisce un'altra connessione con il leader della partizione e può consumare i messaggi. Tali azioni si verificano automaticamente internamente quando il tuo consumatore stabilisce una connessione al cluster Kafka.

Un consumatore è di norma un'applicazione a lunga esecuzione. Un consumatore richiede messaggi da Kafka chiamando regolarmente Consumer.poll(...). Il consumatore richiama poll(), riceve un batch di messaggi, li elabora immediatamente e richiama quindi nuovamente poll().

Quando un consumatore elabora un messaggio, quest'ultimo non viene rimosso dal suo argomento. I consumatori possono invece scegliere tra diversi modi per notificare a Kafka quali messaggi sono stati elaborati. Questo processo è noto come esecuzione del commit dell'offset.

Nelle interfacce di programmazione, un messaggio è chiamato record. Ad esempio, la classe Java org.apache.kafka.clients.consumer.ConsumerRecord viene utilizzata per rappresentare un messaggio per l'API consumatore. I termini record e messaggio possono essere usati in modo intercambiabile, ma essenzialmente un record viene usato per rappresentare un messaggio.

Potrebbe essere utile leggere queste informazioni insieme alla produzione di messaggi in Event Streams.

Configurazione delle proprietà del consumatore

Esistono molte impostazioni di configurazione per il consumer che controllano gli aspetti del suo funzionamento. Le seguenti impostazioni sono alcune delle più importanti.

Configurazione delle proprietà del consumatore
Nome Descrizione Valori validi Valore predefinito
key.deserializer La classe utilizzata per deserializzare le chiavi. Classe Java che implementa l'interfaccia Deserializer, come org.apache.kafka.common.serialization.StringDeserializer. Nessun valore predefinito - devi specificare un valore
value.deserializer La classe utilizzata per deserializzare i valori. Classe Java che implementa l'interfaccia Deserializer, come org.apache.kafka.common.serialization.StringDeserializer. Nessun valore predefinito: è necessario specificare un valore.
group.id Un identificativo per il gruppo di consumatori a cui appartiene il consumatore. Stringa Nessun valore predefinito
auto.offset.reset La modalità di funzionamento quando il consumatore non ha alcun offset iniziale oppure l'offset corrente non è più disponibile nel cluster. latest, earliest, none Più recente
enable.auto.commit Determina se eseguire il commit dell'offset del consumatore automaticamente in background. true, false Vero
auto.commit.interval.ms Il numero di millisecondi tra i commit periodici degli offset. 0,... 5000 (5 secondi)
max.poll.records Il numero massimo di record restituiti in una chiamata a poll(). 1,... 500
session.timeout.ms Il numero di millisecondi entro cui deve essere ricevuto un heartbeat del consumatore per mantenere l'appartenenza del consumatore a un gruppo di consumatori. 6000-300000 10000 (10 secondi)
max.poll.interval.ms L'intervallo di tempo massimo tra i polling prima che il consumatore lasci il gruppo. 1,... 300000 (5 minuti)

Sono disponibili molte altre impostazioni di configurazione, ma leggi la documentazione diApache Kafka prima di iniziare a provarle.

Gruppi di consumer

Un gruppo di consumatori è un gruppo di consumatori che collabora per consumare i messaggi di uno o più argomenti. I consumatori in un gruppo utilizzano tutti lo stesso valore per la configurazione group.id. Se ti serve più di un consumatore per gestire il tuo carico di lavoro, puoi eseguire più consumatori nello stesso gruppo di consumatori. Anche se si ha bisogno di un solo consumatore, è normale specificare anche un valore per group.id.

Ogni gruppo di consumatori ha un server nel cluster, chiamato coordinatore, responsabile dell'assegnazione delle partizioni ai consumatori del gruppo. Questa responsabilità è distribuita tra i server nel cluster per uniformare il carico. L?assegnazione di partizioni ai consumatori può cambiare a ogni riequilibratura dei gruppi.

Quando si unisce a un gruppo di consumatori, un consumatore scopre il coordinatore per il gruppo. Il consumatore comunica quindi al coordinatore di volersi unire al gruppo e il coordinatore avvia un ribilanciamento delle partizioni del gruppo per includere il nuovo membro.

I messaggi provenienti da una singola partizione vengono elaborati da un solo consumatore in ogni gruppo. Questo meccanismo garantisce che i messaggi di ogni partizione vengano elaborati in ordine. Consultare il diagramma riportato di seguito per un esempio in cui un argomento contiene tre partizioni e un gruppo di consumatori, che utilizza tale argomento, contiene due consumatori. A un consumer nel gruppo vengono assegnate due partizioni e all'altro consumer viene assegnata una partizione.

Diagramma dei gruppi di consumatori.
Consumer group example

Quando in un gruppo di consumatori si verifica una delle seguenti modifiche, il gruppo si riequilibra spostando l'assegnazione delle partizioni ai membri del gruppo per adattarsi alla modifica:

  • Un consumatore si unisce al gruppo.
  • Un consumatore lascia il gruppo.
  • Un consumatore è considerato non più in vita dal coordinatore.
  • Vengono aggiunte nuove partizioni a un tema esistente.

Se avete un gruppo di consumatori che viene riequilibrato, sappiate che qualsiasi consumatore che ha lasciato il gruppo ha i suoi commit che vengono rifiutati fino a quando non si ricongiunge al gruppo. in questo caso, il consumatore deve unirsi nuovamente al gruppo, dove gli potrebbe essere assegnata una partizione diversa da quella da cui stava consumando in precedenza.

Attività del consumatore

Kafka rileva automaticamente i consumatori in errore in modo da poter riassegnare le partizioni ai consumatori funzionanti. Utilizza due meccanismi: il polling e l'heartbeat.

Se il batch di messaggi restituito da Consumer.poll(...) è grande o l'elaborazione richiede tempo, il ritardo prima che poll() venga richiamato di nuovo può essere significativo o imprevedibile. In alcuni casi, è necessario configurare un lungo intervallo di polling massimo in modo che i consumatori non vengano rimossi dai loro gruppi solo perché l'elaborazione dei messaggi sta impiegando del tempo. Se questo meccanismo è l'unico disponibile, anche il tempo impiegato per rilevare un consumer non riuscito è lungo.

Per rendere lo stato di attività del consumatore facile da gestire, in Kafka 0.10.1 è stato aggiunto un heartbeat di background. Il coordinatore del gruppo prevede che i membri del gruppo gli inviino degli heartbeat regolari per indicare che rimangono attivi. Un thread di heartbeat in background viene eseguito nel consumatore e invia regolarmente heartbeat al coordinatore. Se il coordinatore non riceve un heartbeat da un membro del gruppo entro il timeout della sessione, il coordinatore rimuove il membro dal gruppo e avvia un ribilanciamento del gruppo. Il timeout di sessione può essere molto più breve dell'intervallo massimo di polling, in modo che il tempo necessario per rilevare un consumatore fallito possa essere breve, anche se l'elaborazione dei messaggi richiede molto tempo.

È possibile configurare l'intervallo massimo di polling mediante la proprietà max.poll.interval.ms e il timeout della sessione mediante la proprietà session.timeout.ms. Non è necessario utilizzare queste impostazioni a meno che non ci vogliano più di 5 minuti per elaborare un gruppo di messaggi.

Gestione degli offset

Per ogni gruppo di consumatori, Kafka mantiene l'offset impegnato per ogni partizione consumata. Quando un consumatore elabora un messaggio, non lo rimuove dalla partizione. Invece, si limita ad aggiornare l'offset corrente utilizzando un processo che viene chiamato "committing" dell'offset.

Event Streams conserva le informazioni sugli offset di cui è stato eseguito il commit per 7 giorni.

Cosa succede se non esiste alcun offset di cui è stato eseguito il commit?

Quando un consumatore si avvia e gli viene assegnata una partizione da consumare, parte dall'offset impegnato del suo gruppo. Se non esiste un offset impegnato, il consumatore può scegliere se iniziare con il primo o l'ultimo messaggio disponibile in base all'impostazione della proprietà * come segue dell'impostazione della proprietà auto.offset.reset come segue:

  • latest (impostazione predefinita): il consumer riceve e utilizza solo i messaggi che arrivano dopo la sottoscrizione. Il consumatore non è a conoscenza dei messaggi inviati prima della sottoscrizione, quindi non ci si deve aspettare che tutti i messaggi siano consumati da un argomento.
  • earliest: il cliente consuma tutti i messaggi dall'inizio.

Se un consumatore fallisce dopo aver elaborato un messaggio ma prima di impegnare il suo offset, le informazioni sull'offset impegnato non riflettono l'elaborazione del messaggio. Ciò significa che il messaggio viene elaborato nuovamente dal successivo consumatore del gruppo a cui viene assegnata la partizione.

Quando gli offset di cui è stato eseguito il commit vengono salvati in Kafka e i consumatori vengono riavviati, i consumatori riprendono dal punto del loro ultimo arresto. Quando esiste un offset impegnato, la proprietà auto.offset.reset non viene utilizzata.

Esecuzione automatica del commit degli offset

Il modo più semplice per impegnare gli offset è che il consumatore di Kafka lo faccia automaticamente. È semplice, ma offre un controllo minore rispetto all'impegno manuale. Per impostazione predefinita, un consumatore esegue automaticamente il commit degli offset ogni 5 secondi. Questo commit predefinito avviene ogni 5 secondi a prescindere dai progressi che il consumatore sta facendo relativamente all'elaborazione dei messaggi. Inoltre, quando il consumatore chiama poll(), questo causa anche l'impegno dell'ultimo offset restituito dalla precedente chiamata a poll() (perché si presume che i messaggi precedenti siano stati tutti elaborati).

Se l'offset impegnato supera l'elaborazione dei messaggi e c'è un errore del consumatore, è possibile che alcuni messaggi non vengano elaborati. Ciò è dovuto al fatto che l'elaborazione ricomincia dall'offset di cui era stato eseguito il commit, che è posteriore all'ultimo messaggio da elaborare prima dell'errore. Per questo motivo, se l'affidabilità è più importante della semplicità, è di norma meglio eseguire il commit degli offset manualmente.

Esecuzione manuale del commit degli offset

Se enable.auto.commit è impostata su false, il consumatore esegue il commit dei suoi offset manualmente. Può eseguire tale operazione in modo sincrono o asincrono. Uno schema comune consiste nell'eseguire il commit dell'offset del messaggio elaborato più recente sulla base di un timer periodico. Questo modello significa che ogni messaggio viene elaborato almeno una volta ma l'offset di cui viene eseguito il commit non subentra mai all'avanzamento dei messaggi che stanno venendo elaborati in modo attivo. La frequenza del timer periodico controlla il numero di messaggi che possono essere rielaborati dopo un errore del consumatore. I messaggi vengono recuperati nuovamente dall'offset di cui è stato eseguito il commit salvato per ultimo quando l'applicazione viene riavviata o quando viene eseguito il ribilanciamento del gruppo.

L'offset di cui è stato eseguito il commit è l'offset dei messaggi da cui viene ripresa l'elaborazione. Si tratta di norma dell'offset del messaggio elaborato più di recente più uno.

Ritardo del consumatore

Il ritardo del consumatore per una partizione è la differenza tra l'offset del messaggio pubblicato più di recente e l'offset di cui è stato eseguito il commit del consumatore. In altre parole, è la differenza tra il numero di record che sono stati prodotti e il numero che sono stati consumati. Anche se è comune avere delle variazioni naturali nei tassi di produzione e consumo, il tasso di consumo non deve essere più lento del tasso di produzione per un lungo periodo.

Se si osserva che un consumatore elabora i messaggi con successo, ma occasionalmente sembra saltare un gruppo di messaggi, può essere un segno che il consumatore non è in grado di tenere il passo. Per gli argomenti che non stanno utilizzando la compressione del log, la quantità di spazio del log viene gestita eliminando periodicamente i vecchi segmenti del log. Se un consumatore è rimasto così indietro da consumare messaggi in un segmento di log che viene cancellato, salterà improvvisamente in avanti all'inizio del segmento di log successivo. Se è importante che il consumatore elabori tutti i messaggi, questa modalità di funzionamento indica una perdita di messaggi dal punto di vista di questo consumatore.

È possibile utilizzare lo strumento kafka-consumer-groups per vedere il ritardo dei consumatori. Per lo stesso scopo puoi anche utilizzare l'API consumatore e le metriche del consumatore.

Controllo della velocità di consumo dei messaggi

Se si verificano problemi di gestione dei messaggi causati dall'ingolfamento degli stessi, è possibile impostare un'opzione consumer per controllare la velocità di consumo dei messaggi. Utilizza fetch.max.bytes e max.poll.records per controllare quanti dati può restituire una chiamata a poll().

Gestione del ribilanciamento dei consumatori

Quando i consumatori vengono aggiunti o rimossi da un gruppo, si verifica un riequilibrio del gruppo e i consumatori non sono in grado di consumare i messaggi. In questo modo tutti i consumatori di un gruppo di consumatori non sono disponibili per un breve periodo.

Se si riceve la notifica con il callback "on partitions revoked", utilizzare un ConsumerRebalanceListener per impegnare manualmente gli offset (se non si utilizza l'autocommit) e per sospendere l'ulteriore elaborazione fino alla notifica dell'avvenuto ribilanciamento tramite il callback "on partition assigned".

Frammenti di codice

Questi frammenti di codice sono di alto livello per illustrare i concetti coinvolti. Per esempi completi, vedi gli esempi di Event Streams in GitHub.

Per connettere un consumer a Event Streams, devi creare le credenziali del servizio. Per informazioni su come ottenere queste credenziali, vedi Connessione a Event Streams.

Nel codice consumer, è necessario prima creare la serie di proprietà di configurazione. Tutte le connessioni a Event Streams sono protette dall'uso di TLS e dall'autenticazione utente-password, per cui sono necessarie almeno queste proprietà. Sostituire BOOTSTRAP_ENDPOINTS, USER e PASSWORD con le credenziali del proprio servizio:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

Per utilizzare i messaggi, è anche necessario specificare deserializzatori per le chiavi e i valori, come nel seguente esempio.

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Questi deserializer devono corrispondere ai serializer utilizzati dai producer.

Usa quindi un KafkaConsumer per consumare i messaggi, dove ogni messaggio è rappresentato da un ConsumerRecord. Il modo più comune per consumare i messaggi consiste nell'inserire il consumatore in un gruppo di consumatori impostando l'ID gruppo e richiamare quindi subscribe() per un elenco di argomenti. Al consumatore vengono assegnate alcune partizioni da consumare, anche se se nel gruppo esistono più consumatori che partizioni nell'argomento, al consumatore potrebbe non essere assegnata alcuna partizione. Richiama quindi poll() in un loop, ricevendo un batch di messaggi da elaborare, dove ogni messaggio è rappresentato da un ConsumerRecord.

props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));  // T1 is the topic name
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   for (ConsumerRecord<String, String> record : records)
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Questo ciclo consumer viene eseguito per sempre, ma può essere interrotto da un altro thread chiamando Consumer.wakeup() per ottenere una chiusura ordinata.

Per eseguire il commit degli offset manualmente, è prima necessario impostare la configurazione enable.auto.commit su false. Usa quindi Consumer.commmitSync() o Consumer.commitAsync() per aggiornare periodicamente l'offset di cui è stato eseguito il commit del consumatore. Per semplicità, questo esempio elabora i record per ciascuna partizione ed esegue il commit dell'ultimo offset separatamente.

props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partRecords = records.records(tp);
      long lastOffset = 0;
      for (ConsumerRecord<String, String> record : partRecords) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        lastOffset = record.offset();
      }
      // having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
  }
}
finally {
  consumer.close();
}

Gestione delle eccezioni

Qualsiasi solida applicazione che utilizza il client Kafka deve gestire le eccezioni per delle specifiche situazioni previste. In alcuni casi, le eccezioni non vengono lanciate direttamente, perché alcuni metodi sono asincroni e forniscono i loro risultati utilizzando un Future o un callback. Controlla il codice di esempio in GitHub che mostra gli esempi completi.

Gestire il seguente elenco di eccezioni nel codice:

org.apache.kafka.common.errors.WakeupException

Generato da Consumer.poll(...) come risultato della chiamata di Consumer.wakeup(). È il modo standard per interrompere il ciclo di polling del consumatore. Il loop di polling termina e Consumer.close() viene richiamato per disconnettersi correttamente.

org.apache.kafka.common.errors.NotLeaderForPartitionException

Generata come risultato di Producer.send(...) quando la leadership per una partizione cambia. Il client aggiorna automaticamente i suoi metadati per trovare le informazioni aggiornate sul leader. Ritentare l'operazione che ha esito positivo con i metadati aggiornati.

org.apache.kafka.common.errors.CommitFailedException

Generato come risultato di Consumer.commitSync(...) quando si verifica un errore irreversibile. In alcuni casi, non è possibile ripetere l'operazione perché l'assegnazione della partizione è cambiata e il consumatore non è più in grado di impegnare i suoi offset. Poiché Consumer.commitSync(...) può avere un successo parziale quando viene utilizzato con più partizioni in una singola chiamata, il recupero degli errori può essere semplificato utilizzando una chiamata Consumer.commitSync(...) separata per ogni partizione.

org.apache.kafka.common.errors.TimeoutException

Generata da Producer.send(...), Consumer.listTopics() se non è possibile richiamare i metadati. L'eccezione viene osservata anche nella callback send (o nella Future restituita) quando il riconoscimento richiesto non viene restituito entro request.timeout.ms. Il client può ritentare l'operazione ma l'effetto di un'operazione ripetuta dipende dalla specifica operazione. Ad esempio, se l'invio di un messaggio viene ripetuto, è possibile che il messaggio venga duplicato.