IBM Cloud Docs
Consommation de messages

Consommation de messages

Un consommateur est une application qui consomme les flux de messages issus des rubriques Kafka. Un consommateur peut s'abonner à une ou plusieurs rubriques ou partitions. Ces informations se concentrent sur l'interface de programmation Java™ qui fait partie du projet Apache Kafka. Les concepts s'appliquent également à d'autres langages, mais avec des noms parfois légèrement différents.

Lorsqu'un consommateur se connecte à Kafka, il établit une connexion d'amorçage initiale. Cette connexion peut viser n'importe quel serveur du cluster. Le consommateur demande des informations concernant la partition et le responsable de la rubrique qu'il veut "consommer". Ensuite, le consommateur établit une autre connexion au chef de partition et peut consommer des messages. Ces actions s'exécutent automatiquement en interne lorsque votre consommateur se connecte au cluster Kafka.

En général, un consommateur est une application à exécution longue. Un consommateur demande des messages à partir de Kafka en appelant Consumer.poll(...) régulièrement. Le consommateur appelle poll(), reçoit un lot de messages, les traite rapidement, puis appelle de nouveau poll().

Lorsqu'un consommateur traite un message, ce dernier n'est pas supprimé de la rubrique dont il fait partie. En revanche, les consommateurs peuvent choisir parmi plusieurs manières de notifier Kafka quels messages ont été traités. Ce processus est appelé validation de la position.

Dans les interfaces de programmation, un message est appelé un enregistrement. Par exemple, la classe Java org.apache.kafka.clients.consumer.ConsumerRecord est utilisée pour représenter l'API consommateur. Les termes enregistrement et message peuvent être utilisés de manière interchangeable, mais, en résumé, un enregistrement est utilisé pour représenter un message.

Il peut être utile de lire ces informations ainsi que celles sur la production de messages dans Event Streams.

Configuration des propriétés du consommateur

Il existe de nombreux paramètres de configuration pour le consommateur qui contrôlent certains aspects de son comportement. Les paramètres suivants sont parmi les plus importants.

Configuration des propriétés du consommateur
Nom Description Valeur valides Valeur par défaut
key.deserializer Classe utilisée pour désérialiser les clés. Une classe Java qui prend en charge l'interface Deserializer, telle que org.apache.kafka.common.serialization.StringDeserializer. Aucune valeur par défaut - vous devez indiquer une valeur
value.deserializer Classe utilisée pour désérialiser les valeurs. Une classe Java qui prend en charge l'interface Deserializer, telle que org.apache.kafka.common.serialization.StringDeserializer. Pas de valeur par défaut - vous devez spécifier une valeur.
group.id Identificateur du groupe de consommateur auquel appartient le consommateur. Chaîne Aucune valeur par défaut
auto.offset.reset Comportement lorsque le consommateur n'a pas de position initiale ou que la position en cours n'est plus disponible dans le cluster. Latest, earliest, none Le plus récent
enable.auto.commit Détermine si la position du consommateur doit être automatiquement validée en arrière-plan. True, false Oui
auto.commit.interval.ms Nombre de millisecondes entre chaque validation régulière des positions. 0,... 5000 (5 secondes)
max.poll.records Le nombre maximum d'enregistrements renvoyés lors d'un appel à poll(). 1,... 500
session.timeout.ms Nombre de millisecondes pendant lesquelles le signal de présence d'un consommateur doit être reçu pour conserver le consommateur comme membre d'un groupe de consommateurs. 6000-300000 10000 (10 secondes)
max.poll.interval.ms Intervalle de temps maximum entre les interrogations avant que le consommateur quitte le groupe. 1,... 300000 (5 minutes)

De nombreux autres paramètres de configuration sont disponibles, mais lisez la documentationApache Kafka avant de commencer à les expérimenter.

Groupes de consommateurs stables

Un groupe de consommateurs est un groupe de consommateurs qui utilisent des messages provenant d'une ou de plusieurs rubriques. Les consommateurs d'un groupe utilisent tous la même valeur pour le paramètre de configuration group.id. Si vous avez besoin de plusieurs consommateurs pour gérer votre charge de travail, vous pouvez exécuter plusieurs consommateurs dans le même groupe de consommateurs. Même si vous avez besoin d'un seul consommateur, il est normal de spécifier également une valeur pour group.id.

Chaque groupe de consommateurs dispose d'un serveur dans le cluster appelé coordinateur responsable de l'affectation de partitions aux consommateurs du groupe. Cette responsabilité est répartie dans le cluster afin d'équilibrer la charge. L'affectation des partitions aux consommateurs peut se modifier à chaque rééquilibrage du groupe.

Lorsqu'un consommateur rejoint un groupe de consommateurs, il découvre le coordinateur du groupe. Le consommateur indique ensuite au coordinateur qu'il souhaite rejoindre le groupe et le coordinateur démarre un rééquilibrage des partitions de l'ensemble du groupe pour y inclure le nouveau membre.

Les messages d'une seule partition sont traités par un consommateur de chaque groupe uniquement. Ce mécanisme garantit que les messages de chaque partition sont traités dans l'ordre. Consultez le diagramme suivant pour un exemple où une rubrique contient trois partitions et un groupe de consommateurs, qui consomme cette rubrique, contient deux consommateurs. Deux partitions sont affectées à un consommateur du groupe et une partition est affectée à l'autre consommateur.

Diagramme des groupes de consommateurs.
Consumer group example

Lorsque l'une des modifications suivantes a lieu dans un groupe de consommateurs, le groupe effectue un rééquilibrage en déplaçant l'affectation des partitions vers les membres du groupe afin de prendre en charge la modification :

  • Un consommateur rejoint le groupe.
  • Un consommateur quitte le groupe.
  • Un consommateur est considéré comme n'étant plus en production par le coordinateur.
  • De nouvelles partitions sont ajoutées à une rubrique existante.

Si vous avez un groupe de consommateurs qui est rééquilibré, sachez que tout consommateur qui a quitté le groupe voit ses engagements rejetés jusqu'à ce qu'il rejoigne le groupe. Dans ce cas, le consommateur doit rejoindre le groupe, où il lui sera affecté une autre partition que celle à partir de laquelle il consommait précédemment.

Continuité des consommateurs

Kafka détecte automatiquement les consommateurs en échec de manière à réaffecter les partitions aux consommateurs actifs. Il utilise deux mécanismes : l'interrogation et la pulsation.

Si le lot de messages renvoyés par Consumer.poll(...) est important ou si le traitement prend du temps, le délai avant que poll() ne soit appelé à nouveau peut être important ou imprévisible. Dans certains cas, il est nécessaire de configurer un long intervalle maximum entre les interrogations pour que des consommateurs ne soient pas supprimés de leurs groupes uniquement parce que le traitement des messages prend du temps. Si ce mécanisme est le seul disponible, le temps nécessaire à la détection d'un consommateur défaillant est également long.

Afin de faciliter la gestion de la continuité des consommateurs, un mécanisme de signal de présence en arrière-plan a été ajouté dans Kafka 0.10.1. Le coordinateur des groupes attend des membres des groupes qu'ils envoient régulièrement un signal de présence pour indiquer qu'ils sont toujours actifs. Une unité d'exécution de signal de présence en arrière-plan s'exécute dans le consommateur et envoie des signaux de présence réguliers au coordinateur. Si le coordinateur ne reçoit pas de signal de présence d'un membre de groupe dans délai d'attente de session, le coordinateur retire le membre du groupe et démarre un rééquilibrage du groupe. Le délai d'attente de la session peut être beaucoup plus court que l'intervalle d'interrogation maximal, de sorte que le temps nécessaire à la détection d'un consommateur défaillant peut être court, même si le traitement des messages prend beaucoup de temps.

Vous pouvez configurer l'intervalle d'interrogation maximal avec la propriété max.poll.interval.ms et le délai d'attente de session avec la propriété session.timeout.ms. Vous n'avez pas besoin d'utiliser ces paramètres, sauf s'il faut plus de 5 minutes pour traiter un lot de messages.

Gestion des positions

Pour chaque groupe de consommateurs, Kafka maintient le décalage validé pour chaque partition consommée. Lorsqu'un consommateur traite un message, ce dernier n'est pas supprimé de la partition. Au lieu de cela, il met simplement à jour son décalage actuel à l'aide d'un processus appelé validation du décalage.

Event Streams conserve les informations de validation de position pendant 7 jours.

Que se passe-t-il s'il n'existe aucun décalage validé existant?

Lorsqu'un consommateur démarre et qu'il est affecté à une partition à consommer, il commence au niveau de décalage validé de son groupe. S'il n'y a pas de décalage existant, le consommateur peut choisir de commencer par le message disponible le plus ancien ou le plus récent, en fonction des éléments suivants de la propriété auto.offset.reset, comme suit :

  • latest (valeur par défaut) : votre consommateur reçoit et consomme uniquement les messages qui arrivent après votre abonnement. Votre consommateur n'a aucune connaissance des messages qui ont été envoyés avant son abonnement. Par conséquent, ne vous attendez pas à ce que tous les messages d'une rubrique soient consommés.
  • earliest: votre consommateur consomme tous les messages depuis le début.

Si un consommateur échoue après avoir traité un message, mais avant de valider son décalage, les informations de décalage validées ne reflètent pas le traitement du message. Cela signifie que le message est à nouveau traité par le destinataire suivant de ce groupe à affecter à la partition.

Une fois les positions validées et enregistrées dans Kafka, lorsque les consommateurs redémarrent, ils reprennent au point où ils s'étaient précédemment arrêtés. Lorsqu'il existe un décalage engagé, la propriété auto.offset.reset n'est pas utilisée.

Validation automatique des positions

La méthode la plus simple pour valider les décalages consiste à faire en sorte que le consommateur Kafka le fasse automatiquement. C'est simple, mais cela donne moins de contrôle que l'engagement manuel. Par défaut, les positions d'un consommateur sont validées automatiquement toutes les 5 secondes. Cette validation par défaut s'effectue toutes les 5 secondes, quelle que soit la progression du consommateur dans le traitement des messages. En outre, lorsque le consommateur appelle poll(), le dernier décalage renvoyé par l'appel précédent à poll() est également validé (car il est supposé que les messages précédents ont tous été traités).

Si le décalage engagé prend le pas sur le traitement des messages et qu'il existe une défaillance du consommateur, il est possible que certains messages ne soient pas traités. Ceci découle du fait que le traitement redémarre à la position validée, qui est postérieure au dernier message à traiter avant l'échec. C'est pourquoi, si la fiabilité est plus importante que la simplicité, il est généralement préférable de valider les positions manuellement.

Validation manuelle des positions

Si enable.auto.commit est défini sur false, le consommateur valide manuellement ses positions. Il peut effectuer cette opération de manière synchrone ou asynchrone. Un schéma classique consiste à valider la position du dernier message traité sur la base d'un temporisateur périodique. Ce schéma implique que chaque message est traité au moins une fois, mais que la position validée ne supplante jamais la progression des messages en cours de traitement. La fréquence du temporisateur périodique contrôle le nombre de messages pouvant être de nouveau traités suite à un échec de consommateur. Les messages sont de nouveau extraits à partir de la dernière position validée enregistrée lorsque l'application redémarre ou que le groupe se rééquilibre.

La position validée est celle des messages à partir desquels le traitement reprend. Il s'agit généralement de la position du dernier message traité plus un.

Décalage d'un consommateur

Le décalage d'un consommateur pour une partition est la différence entre la position du plus récent message publié et la position validée du consommateur. En d'autres termes, il s'agit de la différence entre le nombre d'enregistrements produits et le nombre d'enregistrements consommés. Même si des variations naturelles sont courantes dans les débits de génération et de consommation, le débit de consommation ne doit pas être inférieur au débit de génération sur une longue période de temps.

Si vous observez qu'un consommateur traite les messages avec succès, mais qu'il arrive parfois qu'il ignore un groupe de messages, cela peut être un signe que le consommateur n'est pas en mesure de suivre le rythme. Pour les rubriques qui n'utilisent pas la compression des journaux, la quantité d'espace occupée par les journaux est gérée en supprimant régulièrement les anciens segments des journaux. Si un consommateur a pris tellement de retard qu'il consomme des messages dans un segment de journal qui a été supprimé, il sautera soudainement vers le début du segment de journal suivant. S'il est essentiel que le consommateur traite tous les messages, ce comportement signale une perte de messages du point de vue de ce consommateur.

Vous pouvez utiliser l'outil kafka-consumer-groups pour indiquer le décalage du consommateur. Pour ce faire, vous pouvez également utiliser l'API consommateur et les mesures du consommateur.

Contrôle de la vitesse de consommation des messages

Si vous rencontrez des problèmes de gestion des messages dus à l'afflux de messages, vous pouvez définir une option de consommation pour contrôler la vitesse de consommation des messages. Utilisez fetch.max.bytes et max.poll.records pour contrôler la quantité de données qu'un appel de poll() peut renvoyer.

Gestion du rééquilibrage des consommateurs

Lorsque des consommateurs sont ajoutés à un groupe ou en sont retirés, un rééquilibrage du groupe a lieu et les consommateurs ne sont pas en mesure de consommer des messages. Il en résulte que tous les consommateurs d'un groupe de consommateurs sont indisponibles pendant une courte période.

Si vous êtes informé par le rappel "on partitions revoked", utilisez un ConsumerRebalanceListener pour valider manuellement les décalages (si vous n'utilisez pas l'auto-commit) et pour mettre en pause la suite du traitement jusqu'à ce que vous soyez informé de la réussite du rééquilibrage par l'utilisation du rappel "on partition assigned".

Fragments de code

Ces fragments de code sont à un niveau élevé pour illustrer les concepts impliqués. Pour des exemples complets, voir les exemples Event Streams dans GitHub.

Pour connecter un consommateur à Event Streams, vous devez créer des données d'identification de service. Pour plus d'informations sur l'obtention de ces données d'identification, voir Connexion à Event Streams.

Dans le code de consommateur, vous devez d'abord générer l'ensemble des propriétés de configuration. Toutes les connexions à Event Streams sont sécurisées par l'utilisation de TLS et d'une authentification par mot de passe, vous avez donc besoin d'au moins ces propriétés. Remplacez BOOTSTRAP_ENDPOINTS, USER et PASSWORD par les informations d'identification de votre propre service :

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

Pour consommer les messages, vous devez également spécifier des désérialiseurs pour les clés et les valeurs, comme dans l'exemple suivant.

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

Ces désérialiseurs doivent correspondre aux sérialiseurs utilisés par les producteurs.

Utilisez ensuite un KafkaConsumer pour consommer des messages, chaque message étant représenté par un ConsumerRecord. La méthode la plus courante pour consommer des messages consiste à placer le consommateur dans un groupe de consommateurs en définissant l'ID de groupe, puis d'appeler subscribe() afin d'obtenir une liste de rubriques. Le consommateur est affecté à certaines partitions à consommer, mais, s'il y a plus de consommateurs dans le groupe que de partitions dans la rubrique, il se peut que le consommateur ne soit affecté à aucune partition. Ensuite, appelez poll() dans une boucle pour réceptionner un lot de messages à traiter, chaque message étant représenté par un ConsumerRecord.

props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));  // T1 is the topic name
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   for (ConsumerRecord<String, String> record : records)
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

Cette boucle de consommation s'exécute indéfiniment, mais elle peut être interrompue à partir d'un autre fil d'exécution en appelant Consumer.wakeup() pour obtenir un arrêt en bon ordre.

Pour valider manuellement des positions, vous devez commencer par définir le paramètre de configuration enable.auto.commit sur false. Ensuite, utilisez Consumer.commmitSync() ou Consumer.commitAsync() pour mettre à jour régulièrement la validation des positions des consommateurs. Par souci de simplicité, cet exemple traite les enregistrements de chaque partition et valide la dernière position séparément.

props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partRecords = records.records(tp);
      long lastOffset = 0;
      for (ConsumerRecord<String, String> record : partRecords) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        lastOffset = record.offset();
      }
      // having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
  }
}
finally {
  consumer.close();
}

Gestion des exceptions

Toute application puissante qui utilise le client Kafka a besoin de gérer les exceptions relatives à certaines situations prévisibles. Dans certains cas, les exceptions ne sont pas lancées directement car certaines méthodes sont asynchrones et fournissent leurs résultats à l'aide d'un Future ou d'un rappel. Consultez un exemple de code dans GitHub qui présente des exemples complets.

Traitez la liste d'exceptions suivante dans votre code:

org.apache.kafka.common.errors.WakeupException

Émis par Consumer.poll(...) à la suite de l'appel de Consumer.wakeup(). C'est le moyen standard d'interrompre la boucle d'interrogation du consommateur. La boucle d'interrogation se ferme et Consumer.close() est appelé pour se déconnecter correctement.

org.apache.kafka.common.errors.NotLeaderForPartitionException

Émis en tant que résultat de Producer.send(...) lorsque le leadership d'une partition change. Le client actualise automatiquement ses métadonnées afin de trouver les information à jour concernant le responsable. Relancez l'opération qui aboutit avec les métadonnées mises à jour.

org.apache.kafka.common.errors.CommitFailedException

Émis en tant que résultat de Consumer.commitSync(...) lorsqu'une erreur irrémédiable se produit. Dans certains cas, il n'est pas possible de répéter l'opération parce que l'affectation des partitions a changé et que le consommateur n'est plus en mesure d'engager ses décalages. Étant donné que Consumer.commitSync(...) peut réussir partiellement lorsqu'il est utilisé avec plusieurs partitions dans un seul appel, la récupération d'erreurs peut être simplifiée à l'aide d'un appel Consumer.commitSync(...) distinct pour chaque partition.

org.apache.kafka.common.errors.TimeoutException

Émis par Producer.send(...), Consumer.listTopics() si les métadonnées ne peuvent pas être récupérées. Cette exception est également visible dans le rappel d'envoi (ou le Future renvoyé) lorsque l'accusé de réception demandé n'est pas retourné dans request.timeout.ms. Le client peut relancer l'opération, mais l'effet d'une opération répétée dépend de l'opération proprement dite. Par exemple, si l'envoi d'un message est retenté, le message peut être dupliqué.