Consumo de mensajes
Un consumidor es una aplicación que consume secuencias de mensajes de temas Kafka. Un consumidor puede suscribirse a uno o más temas o particiones. Esta información se centra en la interfaz de programación Java™ que forma parte del proyecto Apache Kafka. Los conceptos se aplican también a otros idiomas, pero a veces son un poco distintos.
Cuando un consumidor se conecta a Kafka, realiza una conexión de arranque inicial. Esta conexión se puede realizar a cualquiera de los servidores del clúster. El consumidor solicita la información de partición y liderazgo sobre el tema del que desea consumir. A continuación, el consumidor establece otra conexión con la partición líder y puede consumir mensajes. Estas acciones se producen automáticamente de forma interna cuando el consumidor se conecta al clúster Kafka.
Un consumidor es normalmente una aplicación de larga ejecución. Un consumidor solicita mensajes de Kafka llamando a Consumer.poll(...)
de forma regular. El consumidor llama a poll()
, recibe un lote de mensajes, los procesa
de inmediato y vuelve a llamar a poll()
.
Cuando un consumidor procesa un mensaje, el mensaje no se elimina de su tema. En su lugar, los consumidores pueden elegir entre varias formas de notificar a Kafka qué mensajes se han procesado. Este proceso se conoce como confirmar el desplazamiento.
En las interfaces de programación, un mensaje se denomina registro. Por ejemplo, la clase Java org.apache.kafka.clients.consumer.ConsumerRecord se utiliza para representar un mensaje para la API de consumidor. Los términos registro y mensaje se pueden utilizar indistintamente, pero esencialmente se utiliza un registro para representar un mensaje.
Es posible que le resulte útil leer esta información junto con la producción de mensajes en Event Streams.
Configuración de propiedades de consumidor
Existen muchos ajustes de configuración para el consumidor que controlan aspectos de su comportamiento. Los siguientes valores son algunos de los más importantes.
Nombre | Descripción | Valores válidos | Valor predeterminado |
---|---|---|---|
key.deserializer | La clase utilizada para deserializar claves. | Clase Java que implementa la interfaz Deserializador, como org.apache.kafka.common.serialization.StringDeserializer. | Ningún valor predeterminado. Debe especificar un valor. |
value.deserializer | La clase utilizada para deserializar valores. | Clase Java que implementa la interfaz Deserializador, como org.apache.kafka.common.serialization.StringDeserializer. | Sin valor por defecto - debe especificar un valor. |
group.id | Un identificador para el grupo de consumidores al que pertenece el consumidor. | Serie | Ningún valor predeterminado |
auto.offset.reset | El comportamiento cuando el consumidor no tiene desplazamiento inicial o si el desplazamiento actual no está disponible en el clúster. | Latest, earliest, none | Más reciente |
enable.auto.commit | Determina si se debe confirmar el desplazamiento del consumidor automáticamente en segundo plano. | True, false | Sí |
auto.commit.interval.ms | El número de milisegundos entre confirmaciones periódicas de desplazamientos. | 0,... | 5000 (5 segundos) |
max.poll.records | El número máximo de registros devueltos en una llamada a poll(). | 1,... | 500 |
session.timeout.ms | El número de milisegundos en el que un latido del consumidor debe ser recibido para mantener la pertenencia de un consumidor de un grupo de consumidores. | 6000-300000 | 10000 (10 segundos) |
max.poll.interval.ms | El intervalo de tiempo máximo entre sondeos antes de que el consumidor deje el grupo. | 1,... | 300000 (5 minutos) |
Hay muchos más valores de configuración disponibles, pero lea la documentación deApache Kafka antes de empezar a experimentar con ellos.
Grupos de consumidores
Un grupo de consumidores es un grupo de consumidores que coopera para consumir mensajes de uno o más temas. Los consumidores de un grupo utilizan todos el mismo valor para la configuración de group.id
. Si necesita más de
un consumidor para gestionar su carga, puede ejecutar varios consumidores en el mismo grupo de consumidores. Incluso si sólo necesita un consumidor, es habitual especificar también un valor para group.id
.
Cada grupo de consumidores tiene un servidor en el clúster que se denomina coordinador responsable de asignar particiones a los consumidores del grupo. Esta responsabilidad se reparte entre los servidores del clúster para repartir la carga. La asignación de particiones a los consumidores puede cambiar en cada reequilibrio del grupo.
Cuando un consumidor se une a un grupo de consumidores, descubre el coordinador del grupo. A continuación, el consumidor indica al coordinador que desea unirse al grupo y el coordinador inicia un reequilibrio de las particiones en el grupo para incluir el nuevo miembro.
Los mensajes de una sola partición son procesados por solo un consumidor en cada grupo. Este mecanismo garantiza que los mensajes de cada partición se procesan en orden. Consulte el diagrama siguiente para ver un ejemplo en el que un tema contiene tres particiones y un grupo de consumidores, que está consumiendo ese tema, contiene dos consumidores. A un consumidor del grupo se le asignan dos particiones y al otro consumidor se le asigna una partición.

Cuando se produce uno de los cambios siguientes en un grupo de consumidores, el grupo se reequilibra cambiando la asignación de particiones a los miembros del grupo para incorporar el cambio:
- Un consumidor se une al grupo.
- Un consumidor abandona el grupo.
- El coordinador considera que un consumidor ha dejado de estar activo.
- Se añaden nuevas particiones a un tema existente.
Si tienes un grupo de consumidores que se reequilibra, ten en cuenta que cualquier consumidor que haya abandonado el grupo tiene sus commits rechazados hasta que se reincorpore al grupo. En este caso, el consumidor deberá volverse a unir al grupo, donde se le podría asignar a una partición distinta de la que estaba consumiendo anteriormente.
Actividad de consumidor
Kafka detecta automáticamente los consumidores fallidos para poder reasignar particiones a los consumidores en funcionamiento. Utiliza dos mecanismos: el sondeo y la pulsación.
Si el lote de mensajes que se devuelven de Consumer.poll(...)
es grande o el procesamiento lleva mucho tiempo, el retraso antes de poll()
que se vuelva a llamar puede ser significativo o impredecible. En algunos casos,
es necesario configurar un intervalo de tiempo máximo de sondeo para que no se elimine a los consumidores de sus grupos sólo porque el procesamiento de mensajes tarda un tiempo. Si este mecanismo es el único disponible, el tiempo que se tarda
en detectar un consumidor anómalo también es largo.
Para que la actividad de los consumidores sea más fácil de gestionar, se añadió el latido de fondo en Kafka 0.10.1. El coordinador del grupo espera que los miembros del grupo le envíen latidos regulares para indicar que siguen activos. Una hebra de pulsación de fondo se ejecuta en el consumidor y envía pulsaciones regulares al coordinador. Si el coordinador no recibe una pulsación de un miembro de un grupo dentro del tiempo de espera de sesión, el coordinador elimina el miembro del grupo e inicia un reequilibrio del grupo. El tiempo de espera de la sesión puede ser mucho más corto que el intervalo máximo de sondeo, de modo que el tiempo que se tarda en detectar un consumidor fallido puede ser corto, incluso si el procesamiento de mensajes lleva mucho tiempo.
Puede configurar el intervalo máximo de sondeo utilizando la propiedad max.poll.interval.ms
y el tiempo de espera de sesión utilizando la propiedad session.timeout.ms
. No es necesario que utilice estos valores a menos
que tarde más de 5 minutos en procesar un lote de mensajes.
Gestión de desplazamientos
Para cada grupo de consumidores, Kafka mantiene el desplazamiento confirmado para cada partición que se consume. Cuando un consumidor procesa un mensaje, no lo elimina de la partición. En su lugar, sólo actualiza su desplazamiento actual utilizando un proceso que se llama confirmar el desplazamiento.
Event Streams retiene la información de desplazamientos confirmados durante 7 días.
¿Qué pasa si no existe ningún desplazamiento confirmado existente?
Cuando se inicia un consumidor y se le asigna una partición para consumir, se inicia en el desplazamiento confirmado de su grupo. Si no existe una compensación comprometida, el consumidor puede elegir entre empezar con el mensaje más antiguo
o el más reciente disponible, basándose en la configuración de la propiedad auto.offset.reset
de la siguiente manera:
latest
(el valor predeterminado): el consumidor recibe y consume sólo los mensajes que llegan después de suscribirse. El consumidor no tiene conocimiento de los mensajes que se enviaron antes de su suscripción, por lo tanto, no espere que se consuman todos los mensajes de un tema.earliest
: Su consumidor consume todos los mensajes desde el principio.
Si un consumidor falla después de procesar un mensaje pero antes de confirmar su desplazamiento, la información del desplazamiento confirmado no refleja el proceso del mensaje. Esto significa que el siguiente consumidor de ese grupo que debe asignar la partición vuelve a procesar el mensaje.
Cuando los desplazamientos confirmados se guardan en Kafka y se reinician los consumidores, los consumidores se reanudan a partir del punto en el que se detuvieron por última vez. Cuando existe un desplazamiento comprometido, la auto.offset.reset
propiedad no se utiliza.
Confirmación automática de desplazamientos
La forma más fácil de confirmar los desplazamientos es hacer que el consumidor de Kafka lo haga automáticamente. Es sencillo, pero ofrece menos control que si se hace manualmente. De forma predeterminada, un consumidor confirma automáticamente
los desplazamientos cada 5 segundos. Esta confirmación predeterminada se produce cada 5 segundos, independientemente del progreso que esté realizando el consumidor hacia procesar los mensajes. Además, cuando el consumidor llama a poll()
,
esto también hace que el último desplazamiento devuelto de la llamada anterior a poll()
sea consignado (porque se supone que los mensajes anteriores fueron todos procesados).
Si el desplazamiento comprometido supera el procesamiento de los mensajes y existe un fallo del consumidor, es posible que algunos mensajes no se procesen. Esto es debido a que el proceso se reinicia en el desplazamiento confirmado, que es después de que el último mensaje se proceso antes del error. Por esta razón, si la fiabilidad es más importante que la simplicidad, normalmente es mejor confirmar los desplazamientos manualmente.
Confirmación manual de desplazamientos
Si enable.auto.commit
se establece en false
, el consumidor confirmará sus desplazamientos manualmente. Puede hacerlo de forma síncrona o asíncrona. Un patrón común es confirmar el desplazamiento del último mensaje
procesado según un temporizador periódico. Este patrón significa que cada mensaje se procesa como mínimo una vez, pero el desplazamiento nunca supera el progreso de los mensajes que están procesados activamente. La frecuencia del temporizador
periódico controla el número de mensajes que se pueden reprocesar tras un error del consumidor. Los mensajes se vuelven a recuperar desde el último desplazamiento confirmado que se ha guardado cuando la aplicación se reinicia o cuando el
grupo se reequilibra.
El desplazamiento confirmado es el desplazamiento de los mensajes a partir del que se reanuda el procesamiento. Normalmente es el desplazamiento del mensaje procesado más recientemente más uno.
Retardo de consumidor
El retardo de consumidor de una partición es la diferencia entre el desplazamiento del mensaje publicado más recientemente y el desplazamiento confirmado del consumidor. En otras palabras, es la diferencia entre el número de registros que se han producido y el número que se han consumido. Aunque es normal que haya variaciones naturales en las velocidades de producción y consumo, la velocidad de consumo no puede ser inferior a la velocidad de producción durante un período largo.
Si observa que un consumidor está procesando mensajes correctamente pero, de vez en cuando, parece saltarse un grupo de mensajes, puede ser una señal de que el consumidor no puede mantenerse al día. En el caso de temas que no utilizan la compactación de registros, la cantidad de espacio de registro se gestiona suprimiendo periódicamente los segmentos de registro antiguos. Si un consumidor se retrasa tanto que está consumiendo mensajes en un segmento de registro que se borra, saltará de repente hacia delante hasta el comienzo del siguiente segmento de registro. Si es importante que el consumidor procese todos los mensajes, este comportamiento indica la pérdida de mensajes desde el punto de vista de este consumidor.
Puede utilizar la herramienta kafka-consumer-groups
para ver el retardo del consumidor. También puede utilizar la API de consumidor y las métricas de consumidor con el mismo propósito.
Control de la velocidad de consumo de mensajes
Si tiene problemas con la gestión de mensajes provocados por la inundación de mensajes, puede establecer una opción de consumidor para controlar la velocidad de consumo de mensajes. Utilice fetch.max.bytes
y max.poll.records
para controlar el volumen de datos que puede devolver una llamada a poll()
.
Gestión del reequilibrio de consumidores
Cuando se añaden o eliminan consumidores de un grupo, se produce un reequilibrio del grupo y los consumidores no pueden consumir mensajes. Esto provoca que todos los consumidores de un grupo de consumidores no estén disponibles durante un breve periodo.
Si se le notifica con el callback "on partitions revoked", utilice un ConsumerRebalanceListener para consignar manualmente los offsets (si no está utilizando auto-commit) y para pausar el procesamiento posterior hasta que se le notifique el reequilibrio exitoso mediante el callback "on partition assigned".
Fragmentos de código
Estos fragmentos de código están en un nivel alto para ilustrar los conceptos implicados. Para obtener ejemplos completos, consulte los ejemplos de Event Streams en GitHub.
Para conectar un consumidor a Event Streams, debe crear credenciales de servicio. Para obtener información sobre cómo obtener estas credenciales, consulte Conexión a Event Streams.
En el código de consumidor, primero debe crear el conjunto de propiedades de configuración. Todas las conexiones a Event Streams están aseguradas usando TLS y autenticación usuario-contraseña, por lo que necesitas al menos estas propiedades. Sustituya BOOTSTRAP_ENDPOINTS, USER y PASSWORD por los de sus propias credenciales de servicio:
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
Para consumir mensajes, también es necesario especificar deserializadores para las claves y los valores, como en el siguiente ejemplo.
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
Estos deserializadores deben coincidir con los serializadores utilizados por los productores.
A continuación, utilice un KafkaConsumer para consumir mensajes, donde cada mensaje está representado por un ConsumerRecord. La forma más común de consumir mensajes es poner el consumidor en un grupo de consumidores estableciendo el ID de grupo
y después llamar a subscribe()
para ver una lista de temas. Al consumidor se le asignan algunas particiones para consumir, aunque si existen más consumidores en el grupo que particiones del tema, es posible que al consumidor no
se le asigne ninguna partición. A continuación, llame a poll()
en bucle, recibiendo un lote de mensajes para procesar, donde cada mensaje está representado por un ConsumerRecord.
props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1")); // T1 is the topic name
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
Este bucle consumidor se ejecuta eternamente pero puede ser interrumpido desde otro hilo llamando a Consumer.wakeup()
para conseguir un cierre ordenado.
Para confirmar los desplazamientos manualmente, primero es necesario establecer la configuración de enable.auto.commit
en false
. A continuación, utilice Consumer.commmitSync()
o bien Consumer.commitAsync()
para actualizar el desplazamiento confirmado del consumidor periódicamente. Por simplicidad, este ejemplo procesa los registros para cada partición y confirma el último desplazamiento por separado.
props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (TopicPartition tp : records.partitions()) {
List<ConsumerRecord<String, String>> partRecords = records.records(tp);
long lastOffset = 0;
for (ConsumerRecord<String, String> record : partRecords) {
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
lastOffset = record.offset();
}
// having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
}
}
}
finally {
consumer.close();
}
Manejo de excepciones
Cualquier aplicación robusta que utiliza el cliente Kafka necesita gestionar excepciones para determinadas situaciones previstas. En algunos casos, las excepciones no se generan directamente porque algunos métodos son asíncronos y entregan sus
resultados utilizando un Future
o una devolución de llamada. Consulte el código de ejemplo en GitHub que muestra ejemplos completos.
Maneje la siguiente lista de excepciones en el código:
org.apache.kafka.common.errors.WakeupException
Lanzado por Consumer.poll(...)
como resultado de llamar a Consumer.wakeup()
. Es la forma estándar de interrumpir el bucle de sondeo del consumidor. El bucle de sondeo sale y se llama a Consumer.close()
para desconectarse limpiamente.
org.apache.kafka.common.errors.NotLeaderForPartitionException
Se lanza como resultado de Producer.send(...)
cuando cambia el liderazgo de una partición. El cliente renueva automáticamente sus metadatos para encontrar la información de líder actualizada. Vuelva a intentar la operación que
sea satisfactoria con los metadatos actualizados.
org.apache.kafka.common.errors.CommitFailedException
Se lanza como resultado de Consumer.commitSync(...)
cuando se produce un error irrecuperable. En algunos casos, no es posible repetir la operación porque la asignación de particiones ha cambiado y el consumidor ya no puede confirmar
sus desplazamientos. Puesto que Consumer.commitSync(...)
puede tener éxito parcialmente cuando se utiliza con diversas suscripciones en una única llamada, la recuperación de errores se puede simplificar utilizando una llamada
Consumer.commitSync(...)
separada para cada partición.
org.apache.kafka.common.errors.TimeoutException
Lanzado por Producer.send(...), Consumer.listTopics()
si no se pueden recuperar los metadatos. La excepción también se ve en la devolución de llamada de envío (o el Futuro devuelto) cuando el acuse de recibo solicitado no regresa
en request.timeout.ms
. El cliente puede reintentar la operación, pero el efecto de una operación repetida depende de la operación determinada. Por ejemplo, si se vuelve a intentar enviar un mensaje, es posible que éste se duplique.