Utilisation des outils de la console Kafka avec Event Streams
Apache Kafka est fourni avec divers outils de console pour des opérations d'administration et de messagerie simples. Vous pouvez utiliser plusieurs d'entre eux avec Event Streams, bien que Event Streams ne permette pas la connexion à son cluster ZooKeeper. Au fur et à mesure du développement de Kafka, la plupart des outils qui nécessitaient auparavant une connexion à ZooKeeper n'ont plus cette exigence.
Ces outils de console se trouvent dans le répertoire bin
de votre téléchargement Kafka. Vous pouvez télécharger un client à partir des téléchargementsApache Kafka.
Pour fournir les données d'identification SASL à ces outils, créez un fichier de propriétés à l'image de l'exemple suivant :
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS
Remplacez USER et PASSWORD par les valeurs issues de l'onglet Données d'identification pour le service de Event Streams dans la console IBM Cloud.
Fournisseur de console
Vous pouvez utiliser l'outil Fournisseur de console Kafka avec Event Streams. Vous devez fournir une liste des données d'identification SASL et des courtiers.
Après avoir créé le fichier de propriétés, vous pouvez exécuter le fournisseur de console dans un terminal comme suit:
kafka-console-producer.sh --broker-list BOOTSTRAP_ENDPOINTS --producer.config CONFIG_FILE --topic TOPIC_NAME
Remplacez les variables de l'exemple par vos propres valeurs :
- BOOTSTRAP_ENDPOINTS avec la valeur de votre onglet Event Streams Données d'identification de service dans la console IBM Cloud.
- CONFIG_FILE par le chemin d'accès au fichier de configuration.
Vous pouvez utiliser de nombreuses autres options de cet outil, à l'exception de celles qui requièrent un accès à ZooKeeper.
Consommateur de console
Vous pouvez utiliser l'outil Consommateur de console Kafka avec Event Streams. Vous devez fournir un serveur d'amorce et des données d'identification SASL.
Après avoir créé le fichier de propriétés comme indiqué précédemment, exécutez le consommateur de console dans un terminal comme suit :
kafka-console-consumer.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --consumer.config CONFIG_FILE --topic TOPIC_NAME
Remplacez les variables de l'exemple par vos propres valeurs :
- BOOTSTRAP_ENDPOINTS avec la valeur de votre onglet Event Streams Données d'identification de service dans la console IBM Cloud.
- CONFIG_FILE par le chemin d'accès au fichier de configuration.
Vous pouvez utiliser de nombreuses autres options de cet outil, à l'exception de celles qui requièrent un accès à ZooKeeper.
Groupes de consommateurs stables
Vous pouvez utiliser l'outil Groupes de consommateurs Kafka avec Event Streams. Etant donné que Event Streams ne permet pas la connexion à son cluster ZooKeeper, certaines options ne sont pas disponibles.
Après avoir créé le fichier de propriétés comme indiqué précédemment, exécutez les outils de groupes de consommateurs dans un terminal. Par exemple, répertorier les groupes de consommateurs comme suit :
kafka-consumer-groups.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --command-config CONFIG_FILE --list --timeout 60000
Remplacez les variables de l'exemple par vos propres valeurs :
- BOOTSTRAP_ENDPOINTS avec la valeur de votre onglet Event Streams Données d'identification de service dans la console IBM Cloud.
- CONFIG_FILE par le chemin d'accès au fichier de configuration.
L'utilisation de cet outil permet également d'afficher des détails tels que les positions actuelles des consommateurs, leur décalage et l'identificateur de client de chaque partition d'un groupe. Exemple :
kafka-consumer-groups.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --command-config CONFIG_FILE --describe --group GROUP --timeout 60000
Remplacez GROUP de l'exemple par le nom du groupe dont vous voulez extraire les détails.
Voici un exemple de sortie de l'exécution de l'outil kafka-consumer-groups:
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
consumer-group-1 foo 0 264 267 3 client-1-abc example.com client-1
consumer-group-1 foo 1 124 124 0 client-1-abc example.com client-1
consumer-group-1 foo 2 212 212 0 client-2-def example.com client-2
Dans l'exemple, vous pouvez voir que le groupe de consommateurs consumer-group-1
comporte deux membres de consommateur qui consomment des messages de la rubrique foo
avec trois partitions. Il indique également que le
consommateur client-1-abc
qui consomme à partir de la partition 0
est en retard de trois messages car le décalage actuel du consommateur est 264
mais le décalage du dernier message sur la partition 0
est 267
.
Rubriques
Vous pouvez utiliser l'outil kafka-topics avec Event Streams. Assurez-vous d'utiliser la V2.8 de l'outil, car celle-ci ne requiert pas l'accès à Zookeeper.
Un scénario dans lequel vous souhaiterez peut-être utiliser kafka-topics consiste à trouver des informations sur vos sujets et leur configuration dans un cluster existant afin de pouvoir les recréer dans un nouveau cluster. Vous pouvez utiliser les informations générées à partir de kafka-topics pour créer les mêmes rubriques nommées dans le nouveau cluster. Pour plus d'informations sur la création de rubriques, voir Utilisation de l'API Kafka d'administration du client Java ou la rubrique sur la commande ibmcloud es topic-create. Vous pouvez aussi utiliser la console IBM Event Streams.
Consultez l'exemple de sortie suivant de l'exécution de l'outil kafka-topics:
bin/kafka-topics.sh --bootstrap-server kafka03-prod01.messagehub.services.us-south.bluemix.net:9093 --command-config vcurr_dal06.properties --describe
Topic:sample-topic PartitionCount:3 ReplicationFactor:3 Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
Topic: sample-topic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
Topic: sample-topic Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 0,2,1
Topic: sample-topic Partition: 2 Leader: 2 Replicas: 2,1,0 Isr: 0,2,1
Topic:testtopic PartitionCount:1 ReplicationFactor:3 Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
Topic: testtopic Partition: 0 Leader: 0 Replicas: 0,2,1 Isr: 0,2
Dans l'exemple, vous pouvez voir que la rubrique sample-topic
comporte trois partitions et un facteur de réplication de trois. L'exemple montre également le courtier sur lequel se trouve le leader de chaque partition et les répliques
qui sont synchronisées (Isr
). Par exemple, le leader de la partition 0
se trouve sur le courtier 0
, les suiveurs se trouvent sur les courtiers 2
et 1
et les trois répliques sont
synchronisées. Si vous consultez la deuxième rubrique testtopic
, elle ne comporte qu'une seule partition, qui est répliquée sur les courtiers 0
, 2
et 1
, mais la liste des répliques synchronisées
affiche uniquement 0
et 2
. Cela signifie que le suiveur du courtier 1
est en retard, et n'est donc pas dans la liste Isr
.
Réinitialisation de Kafka Streams
Vous pouvez utiliser cet outil avec Event Streams pour réinitialiser l'état de traitement d'une application Kafka Streams, afin de traiter de nouveau son entrée à partir de zéro. Avant d'exécuter cet outil, vérifiez que votre application Streams est totalement arrêtée.
Exemple :
kafka-streams-application-reset.sh --bootstrap-servers BOOTSTRAP_ENDPOINTS --config-file CONFIG_FILE --application-id APP_ID
Remplacez les variables de l'exemple par vos propres valeurs :
- BOOTSTRAP_ENDPOINTS avec la valeur de votre onglet Event Streams Données d'identification de service dans la console IBM Cloud.
- CONFIG_FILE par le chemin d'accès au fichier de configuration.
- APP_ID par l'ID de votre application Streams.