Generación de mensajes
Un productor es una aplicación que publica secuencias de mensajes en temas Kafka. Esta información se centra en la interfaz de programación Java que forma parte del proyecto Apache Kafka. Los conceptos se aplican también a otros idiomas, pero a veces son un poco distintos.
En las interfaces de programación, un mensaje se denomina registro. Por ejemplo la clase Java org.apache.kafka.clients.producer.ProducerRecord se utiliza para representar un mensaje desde el punto de vista de la API de productor. Los términos registro y mensaje se pueden utilizar indistintamente, pero esencialmente se utiliza un registro para representar un mensaje.
Cuando un productor se conecta a Kafka, realiza una conexión de arranque inicial. Esta conexión se puede realizar a cualquiera de los servidores del clúster. El productor solicita la información de partición y liderazgo sobre el tema que desea publicar. A continuación, el productor establece otra conexión con el líder de la partición y puede publicar mensajes. Estas acciones se producen automáticamente de forma interna cuando el productor se conecta al clúster Kafka.
Para garantizar la disponibilidad, los intermediarios de Kafka replican mensajes, de modo que si un intermediario no está disponible, los demás pueden seguir recibiendo mensajes de los productores y enviarlos a los consumidores. Event Streams utiliza un factor de réplica de 3, lo que significa que cada mensaje se almacena en tres intermediarios. Cuando se envía un mensaje al líder de la partición, este mensaje no está disponible inmediatamente para los consumidores. El líder añade el registro del mensaje a la partición, asignándole el siguiente número de desplazamiento para dicha partición. Una vez que todos los seguidores de las réplicas sincronizadas han replicado el registro y han confirmado que han escrito el registro en sus réplicas, el registro queda consignado y disponible para los consumidores.
Cada mensaje se representa como un registro que consta de dos partes: clave y valor. La clave se utiliza habitualmente para los datos sobre el mensaje y el valor es el cuerpo del mensaje. Dado que muchas herramientas del ecosistema Kafka (como los conectores a otros sistemas) sólo utilizan el valor e ignoran la clave, es mejor poner todos los datos del mensaje en el valor y utilizar la clave para la partición o la compactación del registro. No confíe en que todo lo que lea de Kafka utilice la clave.
Muchos otros sistemas de mensajería también tienen una manera de llevar otra información junto con los mensajes. La versión 0.11 de Kafka introduce cabeceras de registro para este propósito.
Es posible que le resulte útil leer esta información junto con consumir mensajes en Event Streams.
Valores de configuración
Existen muchos valores de configuración para el productor. Puede controlar aspectos del productor, como el procesamiento por lotes, los reintentos y el acuse de recibo de mensajes. Estos son los más importantes:
| Nombre | Descripción | Valores válidos | Valor predeterminado |
|---|---|---|---|
| key.serializer | La clase utilizada para serializar claves. | Clase Java que implementa la interfaz Serializer, como org.apache.kafka.common.serialization.StringSerializer. | Sin valor por defecto - debe especificar un valor. |
| value.serializer | La clase utilizada para serializar valores. | Clase Java que implementa la interfaz Serializer, como org.apache.kafka.common.serialization.StringSerializer. | Sin valor por defecto - debe especificar un valor. |
| acks | El número de servidores necesarios para reconocer cada mensaje publicado. Controla las garantías de duración que requiere el productor. | 0, 1, all (o -1) | all (Kafka 3.0 y posterior) 1 (anterior a Kafka 3.0) |
| retries | El número de veces que el cliente reenvía un mensaje cuando el envío encuentra un error. | 0,... | 0 |
| max.block.ms | El número de milisegundos que una solicitud de envío o de metadatos puede bloquear la espera. | 0,... | 60000 (1 minuto) |
| max.in.flight.requests.per.connection | Número máximo de peticiones no reconocidas que el cliente envía en una conexión antes de bloquear más peticiones. | 1,... | 5 |
| request.timeout.ms | La cantidad máxima de tiempo que el productor esperará una respuesta a una solicitud. Si no se recibe la respuesta antes de que transcurra el tiempo de espera, se reintenta la solicitud, o falla si se ha agotado el número de reintentos. | 0,... | 30000 (30 segundos) |
Hay muchas más opciones de configuración disponibles, pero asegúrese de leer detenidamente la documentación deApache Kafka antes de experimentar con ellas.
Particionamiento
Con Kafka, las particiones constituyen la unidad de escalabilidad. Por lo tanto, la partición es una forma eficaz de aumentar su rendimiento, ya que permite que los datos temáticos fluyan en múltiples flujos paralelos.
Cuando el productor publica un mensaje en un tema, el productor puede elegir qué partición utilizar. Si el orden es importante, tenga en cuenta que una partición es una secuencia ordenada de registros, pero un tema comprende una o más particiones. Si desea un conjunto de mensajes se entreguen en orden, asegúrese de que todos ellos vayan en la misma partición. La forma más sencilla de lograrlo es dar la misma clave a todos los mensajes.
El productor puede especificar explícitamente un número de partición cuando publica un mensaje. Esto permite el control directo, pero hace que el código de productor sea más complejo porque toma la responsabilidad de gestionar la selección de partición. Para obtener más información, consulte la llamada de método Producer.partitionsFor. Por ejemplo, la llamada se describe para Kafka versión 2.2.0.
Si el productor no especifica un número de partición, un particionador realiza la selección de la partición. El particionador predeterminado incluido en el productor de Kafka funciona de la siguiente manera:
- Si el registro no tiene ninguna clave, seleccione la partición de forma rotativa.
- Si el registro tiene una clave, seleccione la partición calculando un valor hash para la clave. Esto tiene el efecto de seleccionar la misma partición para todos los mensajes con la misma clave.
También puede escribir su propio particionador personalizado. Un particionador personalizado puede elegir cualquier esquema para asignar registros a particiones. Por ejemplo, utilice sólo un subconjunto de la información en la clave o un identificador específico para la aplicación.
Ordenación de mensajes
Kafka normalmente escribe los mensajes en el orden en el que el productor los ha enviado. Sin embargo, en determinadas situaciones los reintentos pueden hacer que los mensajes se dupliquen o se reordenen. Si quieres que una secuencia de mensajes se envíe en orden, es importante asegurarse de que todos se escriben en la misma partición porque es la única forma de garantizar el orden de los mensajes.
El productor también puede reintentar el envío de mensajes automáticamente. Es una buena idea activar esta función de reintento porque la alternativa es que el código de su aplicación deba realizar cualquier reintento por sí mismo. La combinación de lotes en Kafka y reintentos automáticos puede tener el efecto de duplicar los mensajes y reordenarlos.
Por ejemplo, si publica una secuencia de tres mensajes <M1, M2, M3> sobre un tema. Es posible que todos los registros quepan en el mismo lote, por lo que se envían todos juntos al jefe de la partición. El líder luego los escribe en la partición y los replica como registros separados. Si se produce una anomalía, es posible que M1 y M2 se añadan a la partición, pero M3 no. El productor no recibe acuse de recibo, por lo que reintenta enviar <M1, M2, M3>. El nuevo líder escribe M1, M2 y M3 en la partición, que ahora contiene <M1, M2, M1, M2, M3>, donde el M1 duplicado sigue al M2 original. Si restringe el número de solicitudes en curso a cada intermediario a sólo una, puede evitar esta reordenación. Es posible que siga encontrando que un único registro está duplicado como, por ejemplo, <M1, M2, M2, M3>, pero nunca obtiene secuencias desordenadas. En Kafka versión 0.11 o posterior, también puede utilizar la función de productor idempotente para evitar la duplicación de M2.
Es una práctica normal con Kafka escribir las aplicaciones para manejar los duplicados ocasionales de mensajes porque el impacto en el rendimiento de tener una sola solicitud en vuelo es significativo.
Acuses de recibo de mensajes
Cuando publiques un mensaje, puedes elegir el nivel de acuses de recibo que se requieren mediante la configuración del productor acks. La elección representa un equilibrio entre rendimiento y fiabilidad. Existen los tres niveles
siguientes.
- acks=0 (menos fiable)
- El mensaje se considera enviado en cuanto se escribe en la red. No hay ningún acuse de recibo por parte del líder de la partición. En consecuencia, los mensajes se pueden perder si cambia el liderazgo de la partición. Este nivel de acuse de recibo es rápido, pero conlleva la posibilidad de pérdida del mensaje en algunas situaciones.
- acks=1 (valor predeterminado)
- El mensaje es confirmado al productor tan pronto como el líder de la partición haya escrito con éxito su registro en la partición. Dado que el acuse de recibo se produce antes de que el registro llegue a las réplicas sincronizadas, el mensaje podría perderse si el líder falla, pero los seguidores aún no tienen el mensaje. Si cambia el liderazgo de la partición, el antiguo líder informa al productor, que puede gestionar el error y volver a intentar enviar el mensaje al nuevo líder. Dado que los mensajes se acusan antes de que su recepción haya sido confirmada por todas las réplicas, los mensajes acusados pero aún no replicados por completo pueden perderse si cambia el liderazgo de la partición.
- acks=all (más fiable)
- El mensaje se confirma al productor cuando el líder de la partición ha escrito correctamente su registro y todas las réplicas sincronizadas han hecho lo mismo. El mensaje no se pierde si el liderazgo cambia siempre que como mínimo haya una réplica sincronizada disponible.
Incluso si no espera que el productor acuse el recibo de los mensajes, los mensajes seguirán estando disponibles sólo para ser consumidos cuando se confirmen y esto significa que la replicación a las réplicas sincronizadas está completa. En otras palabras, la latencia del envío de mensajes desde el punto de vista del productor es inferior a la latencia de extremo a extremo desde el productor que envía un mensaje a un consumidor que lo recibe.
Si es posible, evite esperar el acuse de recibo de un mensaje antes de publicar el siguiente. Esperar impide que el productor agrupe mensajes en lotes y también reduce la velocidad con la que se pueden publicar mensajes para reducir la latencia de ida y vuelva de la red.
Agrupación por lotes, limitación y compresión
Por razones de eficacia, el productor reúne lotes de registros para enviarlos a los servidores. Si habilita la compresión, el productor comprime cada lote, lo cual puede mejorar el rendimiento al requerir que se transfieran menos datos a través de la red.
Si intenta publicar mensajes de forma más rápida que la necesaria para que se envíen a un servidor, el productor las almacena automáticamente en solicitudes por lotes. El productor mantiene un almacenamiento intermedio de registros no enviados para cada partición. Llega un momento en que ni siquiera la dosificación permite alcanzar el ritmo deseado.
Hay otro factor que tiene un impacto. Para evitar que productores o consumidores individuales inunden el clúster, Event Streams aplica cuotas de rendimiento. Se calcula la velocidad con la que cada productor envía datos y se regula a cualquier productor que intente superar su cuota. La regulación se aplica retrasando ligeramente el envío de respuestas al productor. Normalmente, este método actúa como un freno natural.
Para obtener más información sobre las directrices de rendimiento, consulte Límites y cuotas.
En resumen, cuando un mensaje se publica, su primer registro se escribe primero en un almacenamiento intermedio en el productor. En segundo plano, el productor agrupa los registros en lotes y los envía al servidor. A continuación, el servidor responde al productor, posiblemente aplicando un retraso de limitación si el productor está publicando demasiado rápido. Si el búfer del productor se llena, la llamada de envío del productor se retrasa, pero en última instancia puede fallar con una excepción.
Semántica de la entrega
Kafka ofrece varios tipos distintos de semántica de entrega de mensajes:
- Como máximo una vez: Los mensajes podrían perderse y no volver a entregarse.
- Al menos una vez: Los mensajes nunca se pierden, pero puede haber duplicados.
- Exactamente una vez: Los mensajes nunca se pierden y no hay duplicados.
Los siguientes valores determinan la semántica de la entrega:
acksretriesenable.idempotence
Por defecto, Kafka utiliza al menos una semántica.
Para activar la semántica "exactamente una vez", debe utilizar los productores idempotentes o transaccionales. El productor idempotente se habilita establecimiento enable.idempotence en true y garantiza
que se escribe en Kafka exactamente una copia de cada mensaje, incluso si hay reintentos. El productor transaccional habilita el envío de datos a varias particiones de modo que todos los mensajes se entregan correctamente o bien no se entrega
ninguno. Es decir, una transacción se confirma por completo o se descarta por completo. También puede incluir offsets en las transacciones para crear aplicaciones que lean, procesen y escriban mensajes en Kafka.
Fragmentos de código
Estos fragmentos de código están en un nivel alto para ilustrar los conceptos implicados. Para obtener ejemplos completos, consulte los ejemplos de Event Streams en GitHub.
Para conectar un consumidor a Event Streams, debe crear credenciales de servicio. Para obtener más información, consulte Conexión a Event Streams.
En el código de productor, primero debe crear el conjunto de propiedades de configuración. Todas las conexiones a Event Streams están protegidas mediante el uso de TLS y autenticación de usuario y contraseña, por lo que necesita estas propiedades como mínimo. Sustituya BOOTSTRAP_ENDPOINTS, USER y PASSWORD por los de sus propias credenciales de servicio:
Properties props = new Properties();
props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
props.put("security.protocol", "SASL_SSL");
props.put("sasl.mechanism", "PLAIN");
props.put("ssl.protocol", "TLSv1.2");
props.put("ssl.enabled.protocols", "TLSv1.2");
props.put("ssl.endpoint.identification.algorithm", "HTTPS");
Para enviar mensajes, también es necesario especificar serializadores para las claves y los valores, por ejemplo:
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Estos serializadores deben coincidir con los deserializadores utilizados por los consumidores.
A continuación, utiliza un KafkaProducer para enviar mensajes, donde cada mensaje está representado por un ProducerRecord. No olvide cerrar el KafkaProducer cuando haya terminado. Este código sólo envía el mensaje, pero no espera para ver si
el envío ha sido satisfactorio. El mensaje se envía al tema T1, con la clave de la serie key y el valor de la serie value.
Producer<String, String> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
producer.close();
El método send() es asíncrono y devuelve un Future que puede utilizar para comprobar su finalización:
Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
// Do some other stuff
// Now wait for the result of the send
RecordMetadata rm = f.get();
long offset = rm.offset;
Alternativamente, puede proporcionar una llamada de retorno cuando envíe el mensaje:
producer.send(new ProducerRecord<String,String>("T1","key","value", new Callback() {
public void onCompletion(RecordMetadata metadata, Exception exception) {
// This is called when the send completes, either successfully or with an exception
}
});
Para obtener más información, consulte el Javadoc para el cliente Kafka.{: external}.