Produzione di messaggi
Un produttore è un'applicazione che pubblica flussi di messaggi ad argomenti Kafka. Queste informazioni si concentrano sull'interfaccia di programmazione Java, che fa parte del progetto Apache Kafka. I concetti si applicano anche ad altri linguaggi ma i nomi sono a volte leggermente differenti.
Nelle interfacce di programmazione, un messaggio è chiamato record. Ad esempio, la classe Java org.apache.kafka.clients.producer.ProducerRecord viene utilizzata per rappresentare un messaggio dal punto di vista dell'API del produttore. I termini record e messaggio possono essere usati in modo intercambiabile, ma essenzialmente un record viene usato per rappresentare un messaggio.
Quando stabilisce una connessione a Kafka, un produttore esegue una connessione bootstrap iniziale. Tale connessione può essere stabilita con qualsiasi server nel cluster. Il produttore richiede le informazioni della partizione e della leadership sull'argomento in cui desidera pubblicare. Quindi, il produttore stabilisce un'altra connessione con il leader della partizione e può pubblicare i messaggi. Tali azioni si verificano automaticamente internamente quando il tuo produttore stabilisce una connessione al cluster Kafka.
Per garantire la disponibilità, i broker Kafka replicano i messaggi, in modo che se un broker non è disponibile, gli altri possono ancora ricevere i messaggi dai produttori e inviarli ai consumatori. Event Streams usa un fattore di replica di 3, il che significa che ogni messaggio è memorizzato su tre broker. Quando viene inviato al leader della partizione, tale messaggio non è immediatamente disponibile per i consumatori. Il leader accoda il record per il messaggio alla partizione, assegnando ad esso il numero di offset successivo per tale partizione. Dopo che tutti i follower delle repliche in-sync hanno replicato il record e hanno riconosciuto di aver scritto il record sulle loro repliche, il record è ora impegnato e diventa disponibile per i consumatori.
Ogni messaggio è rappresentato come un record che comprende due parti: chiave e valore. La chiave viene di solito usata per i dati relativi al messaggio e il valore è il corpo del messaggio. Poiché molti strumenti dell'ecosistema Kafka (come i connettori ad altri sistemi) usano solo il valore e ignorano la chiave, è meglio mettere tutti i dati del messaggio nel valore e usare la chiave per il partizionamento o la compattazione del registro. Non fare affidamento su tutto ciò che legge da Kafka per utilizzare la chiave.
Anche molti altri sistemi di messaggistica hanno un modo per trasportare altre informazioni insieme ai messaggi. La versione 0.11 di Kafka introduce le intestazioni dei record a questo scopo.
Potresti trovare utile leggere queste informazioni insieme ai messaggi di consumo in Event Streams.
Impostazioni di configurazione
Esistono molte impostazioni di configurazione per il produttore. È possibile controllare alcuni aspetti del produttore, tra cui il batching, i tentativi e il riconoscimento dei messaggi. Sono qui di seguito riportati quelli più importanti:
Nome | Descrizione | Valori validi | Valore predefinito |
---|---|---|---|
key.serializer | La classe utilizzata per serializzare le chiavi. | Classe Java che implementa l'interfaccia Serializer, come org.apache.kafka.common.serialization.StringSerializer. | Nessun valore predefinito: è necessario specificare un valore. |
value.serializer | La classe utilizzata per serializzare i valori. | Classe Java che implementa l'interfaccia Serializer, come org.apache.kafka.common.serialization.StringSerializer. | Nessun valore predefinito: è necessario specificare un valore. |
acks | Il numero di server a cui è richiesto di riconoscere ogni messaggio pubblicato. Questo controlla le garanzie di durabilità richieste dal produttore. | 0, 1, all (oppure -1) | tutto (Kafka 3.0 and later) 1 (precedente a Kafka 3.0) |
retries | Quante volte il client invia nuovamente un messaggio quando l'invio riscontra un errore. | 0,... | 0 |
max.block.ms | Il numero di millisecondi per cui una richiesta di invio o di metadati può bloccarsi in attesa. | 0,... | 60000 (1 minuto) |
max.in.flight.requests.per.connection | Il numero massimo di richieste non riconosciute che il client invia su una connessione prima di bloccare ulteriori richieste. | 1,... | 5 |
request.timeout.ms | Il periodo di tempo massimo per cui un produttore attende una risposta a una richiesta. Se la risposta non viene ricevuta prima dello scadere del timeout, la richiesta viene ritentata o fallisce se il numero di tentativi è esaurito. | 0,... | 30000 (30 secondi) |
Sono disponibili molte altre impostazioni di configurazione, ma prima di sperimentarle è necessario leggere attentamente il documento Apache Kafka documentation.
Partizionamento
Con Kafka, le partizioni sono le unità di scalabilità. Pertanto, il partizionamento è un modo efficace per aumentare il throughput, perché consente ai dati degli argomenti di fluire in più flussi paralleli.
Quando pubblica un messaggio su un argomento, il produttore può scegliere quale partizione utilizzare. Se l'ordine è importante, si noti che una partizione è una sequenza ordinata di record, mentre un argomento comprende una o più partizioni. Se vuoi che un insieme di messaggi venga recapitato in ordine, assicurati che vadano tutti nella stessa partizione. La soluzione migliore per ottenere questo risultato consiste nel dare a tutti questi messaggi la stessa chiave.
Il produttore può specificare esplicitamente un numero di partizione quando pubblica un messaggio. Ciò offre un controllo diretto ma rende più complesso il codice del produttore poiché si assume la responsabilità della gestione della selezione della partizione. Per ulteriori informazioni, vedi la chiamata al metodo Producer.partitionsFor. Ad esempio, la chiamata è descritta per Kafka versione 2.2.0.
Se il produttore non specifica un numero partizione, la selezione della partizione viene eseguita da un partitioner. Il partitioner predefinito integrato nel produttore Kafka funziona nel seguente modo:
- Se il record non ha una chiave, seleziona la partizione in modo round-robin.
- Se il record ha una chiave, seleziona la partizione calcolando un valore hash per la chiave. In questo modo, viene selezionata la stessa partizione per tutti i messaggi con la stessa chiave.
Puoi anche scrivere un tuo partitioner personalizzato. Un partitioner personalizzato può scegliere qualsiasi schema per assegnare i record alle partizioni. Usa ad esempio solo un sottoinsieme delle informazioni nella chiave oppure un identificativo specifico dell'applicazione.
Ordinamento dei messaggi
Kafka di norma scrive i messaggi nell'ordine in cui vengono inviati dal produttore. Tuttavia, in alcune situazioni i tentativi possono causare la duplicazione o il riordino dei messaggi. Se si desidera che una sequenza di messaggi venga inviata in ordine, è importante assicurarsi che siano tutti scritti sulla stessa partizione perché questo è l'unico modo per garantire l'ordinamento dei messaggi.
Il produttore è anche in grado di riprovare a inviare i messaggi automaticamente. È una buona idea abilitare questa funzione di ripetizione, perché l'alternativa è che il codice dell'applicazione debba eseguire autonomamente qualsiasi ripetizione. La combinazione di organizzazione in batch in Kafka e dei nuovi tentativi automatici può avere l'effetto di duplicare i messaggi e riordinarli.
Ad esempio, se si pubblica una sequenza di tre messaggi <M1, M2, M3> su un argomento. I record potrebbero rientrare tutti nello stesso lotto, quindi vengono inviati tutti insieme al capo partizione. Il leader li scrive quindi nella partizione e li replica come record separati. Se si verifica un errore, è probabile che M1 e M2 vengano aggiunti alla partizione, ma M3 non lo è. Il produttore non riceve un riconoscimento, quindi tenta di inviare <M1, M2, M3>. Il nuovo leader scrive M1, M2 e M3 nella partizione, che ora contiene <M1, M2, M1, M2, M3>, dove l' M1 duplicato segue l' M2originale. Se limiti il numero di richieste in-flight a ciascun broker a uno solo, puoi evitare questo riordino. È ancora possibile che un singolo record venga duplicato, ad esempio <M1, M2, M2, M3>, ma non si ottengono mai sequenze non ordinate. In Kafka versione 0.11 o successiva, è possibile utilizzare la funzione di produttore idempotente per evitare la duplicazione di M2.
È prassi normale con Kafka scrivere le applicazioni per gestire i duplicati occasionali dei messaggi, perché l'impatto sulle prestazioni di una sola richiesta in volo è significativo.
Riconoscimenti dei messaggi
Quando si pubblica un messaggio, si può scegliere il livello di riconoscimento richiesto utilizzando la configurazione acks
del produttore. La scelta rappresenta un bilanciamento tra la velocità effettiva e l'affidabilità. Esistono
i seguenti tre livelli.
- acks=0 (meno affidabile)
- Il messaggio viene considerato inviato non appena viene scritto sulla rete. Non c'è alcun riconoscimento dal leader della partizione. Di conseguenza, i messaggi possono andare perduti se la leadership della partizione cambia. Questo livello di riconoscimento è veloce, ma comporta la possibilità di perdita del messaggio in alcune situazioni.
- acks=1 (il valore predefinito)
- Il messaggio viene confermato al produttore non appena il leader della partizione ha scritto con successo il suo record nella partizione. Poiché il riconoscimento avviene prima che il record abbia raggiunto le repliche in-sync, il messaggio potrebbe andare perso se il leader fallisce, ma i follower non hanno ancora il messaggio. Se la leadership della partizione cambia, il vecchio leader informa il produttore, che può gestire l'errore e riprovare a inviare il messaggio al nuovo leader. Poiché i messaggi vengono riconosciuti prima che la loro ricezione sia stata confermata da tutte le repliche, i messaggi riconosciuti ma non ancora completamente replicati possono andare persi se la leadership della partizione cambia.
- acks=all (più affidabile)
- Il messaggio viene riconosciuto al produttore quando il leader della partizione ha scritto con successo il suo record e tutte le repliche in-sync hanno fatto lo stesso. Il messaggio non va perduto se il leader della partizione cambia, a condizione che sia disponibile almeno una replica sincronizzata.
Anche se non attendi che i messaggi vengano riconosciuti al produttore, i messaggi sono comunque disponibili per il consumo solo quando ne è stato eseguito il commit, cosa che significa che la replica alle repliche sincronizzate è completa. In altre parole, la latenza dell'invio di messaggi dal punto di vista del produttore è inferiore alla latenza end-to-end misurata dal produttore che invia un messaggio a un consumatore che riceve il messaggio.
Se possibile, evitate di attendere il riconoscimento di un messaggio prima di pubblicare il messaggio successivo. L'attesa impedisce al produttore di poter organizzare insieme in batch i messaggi e riduce anche il tasso a cui messaggi possono essere pubblicati al di sotto della latenza di round-trip della rete.
Organizzazione in batch, limitazione e compressione
Per motivi di efficienza, il produttore raccoglie lotti di record per inviarli ai server. Se abiliti la compressione, il produttore comprime ogni batch, il che può migliorare le prestazioni perché richiede il trasferimento di una quantità inferiore di dati sulla rete.
Se probi a pubblicare i messaggi più rapidamente di quanto possano essere inviati al server, il produttore li memorizza automaticamente in buffer in richieste organizzate in batch. Il produttore conserva un buffer di record non inviati per ciascuna partizione. Arriva un momento in cui anche il dosaggio non consente di raggiungere la velocità desiderata.
C'è un altro fattore che ha un impatto. per evitare che i singoli produttori o consumatori sommergano il cluster, Event Streams applica delle quote di velocità effettiva. Il tasso a cui ciascun produttore sta inviando i dati viene calcolato e ogni produttore che prova a superare la sua quota viene limitato. La limitazione viene applicata ritardando leggermente l'invio di risposte al produttore. Di norma, questo agisce solo come un freno naturale.
Per ulteriori informazioni sulla guida della velocità di trasmissione, vedi Limiti e quote.
Riepilogando, quando un messaggio viene pubblicato, il suo record viene prima scritto in un buffer nel produttore. In background, il produttore organizza in batch e invia i record al server. Il server risponde quindi al produttore, possibilmente applicando un ritardo di limitazione se il produttore sta pubblicando troppo rapidamente. Se il buffer nel produttore si riempie, la chiamata di invio del produttore viene ritardata, ma alla fine potrebbe fallire con un'eccezione.
Semantiche di fornitura
Kafka offre le seguenti diverse semantiche di fornitura del messaggio:
- Al massimo una volta: I messaggi possono andare persi e non vengono riconsegnati.
- Almeno una volta: I messaggi non vanno mai persi, ma potrebbero esserci dei duplicati.
- Esattamente una volta: I messaggi non vanno mai persi e non ci sono duplicati.
Le semantiche di fornitura vengono determinate dalle seguenti impostazioni:
acks
retries
enable.idempotence
Per impostazione predefinita, Kafka utilizza almeno una volta la semantica.
Per abilitare la semantica exactly once, è necessario utilizzare i produttori idempotenti o transazionali. Il produttore idempotente viene abilitato impostando enable.idempotence
su true
e garantisce che esattamente
una copia di ogni messaggio viene scritta in Kafka, anche nel caso di ulteriori tentativi. Il produttore transazionale abilita l'invio dei dati a più partizioni in modo tale che tutti i messaggi, o nessuno di essi, siano correttamente forniti.
Vale a dire che viene eseguito completamente il commit di una transazione oppure viene completamente scartata. È anche possibile includere gli offset nelle transazioni per creare applicazioni che leggono, elaborano e scrivono messaggi su Kafka.
Frammenti di codice
Questi frammenti di codice sono di alto livello per illustrare i concetti coinvolti. Per esempi completi, vedi gli esempi di Event Streams in GitHub.
Per connettere un consumatore a Event Streams, devi creare le credenziali del servizio. Per ulteriori informazioni, vedi Connessione a Event Streams.
Nel codice produttore, è necessario prima creare la serie di proprietà di configurazione. Tutte le connessioni a Event Streams sono protette dall'uso di TLS e dall'autenticazione di utente e password, quindi sono necessarie almeno queste proprietà. Sostituire BOOTSTRAP_ENDPOINTS, USER e PASSWORD con le credenziali del proprio servizio:
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
Per inviare i messaggi, è necessario specificare anche i serializzatori per le chiavi e i valori, ad esempio:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Questi serializer devono corrispondere ai deserializer utilizzati dai consumer.
Quindi, utilizzare un KafkaProducer per inviare messaggi, dove ogni messaggio è rappresentato da un ProducerRecord. Non dimenticarti di chiudere il KafkaProducer, quando hai finito. Questo codice invia solo il messaggio ma non attende per appurare
se l'invio è riuscito. Il messaggio viene inviato all'argomento T1
, con la chiave key
e il valore della stringa value
.
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
producer.close();
Il metodo send()
è asincrono e restituisce una Future che puoi usare per controllarne il completamento:
Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
// Do some other stuff
// Now wait for the result of the send
RecordMetadata rm = f.get();
long offset = rm.offset;
In alternativa, è possibile fornire un callback quando si invia il messaggio:
producer.send(new ProducerRecord<String,String>("T1","key","value", new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
// This is called when the send completes, either successfully or with an exception
}
});
Per ulteriori informazioni, vedi il Javadoc per il client Kafka.{: external}.