Using Kafka console tools with Event Streams

Gen 2

Apache Kafka comes with various console tools for simple administration and messaging operations. You can use many of them with Event Streams, although Event Streams does not permit connection to its KRaft cluster.

Download the console tools

The console tools are distributed as part of the Kafka binary download and can be downloaded from the Apache Kafka downloads. The most recent supported release is recommended.

For example, to download version 4.1.1 to a Linux machine:

$ wget https://downloads.apache.org/kafka/4.1.1/kafka_2.13-4.1.1.tgz

Extract what you’ve downloaded and change directory:

$ tar -xzf kafka_2.13-4.1.1.tgz
$ cd kafka_2.13-4.1.1/

These console tools are in the bin directory.

Install Java

Install java with the following commands:

$ sudo apt update
$ sudo apt install openjdk-17-jdk

Next, you can verify the installation:

$ java -version

Create a client.properties file

Create a properties file with the information needed to connect to your instance.

$ nano client.properties

Copy the following snippet into the client.properties file and edit with your details.

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="token" password="<API_KEY>";
security.protocol=SASL_SSL
sasl.mechanism=PLAIN

Replace the API_KEY variable in the snippet with the value from your Event Streams Service credentials tab in the IBM Cloud console.

Save and then exit nano.

Topics

To create a topic:

$ ./bin/kafka-topics.sh --create --topic <TOPIC_NAME> --bootstrap-server <BOOTSTRAP_SERVERS> --command-config client.properties --partitions 1 --replication-factor 3 --config retention.ms=604800000

Replace the variables in the example with your own values:

  • BOOTSTRAP_SERVERS with the value from your Event Streams Service credentials tab in the IBM Cloud console.
  • TOPIC_NAME with the name of the topic to be created.

The topics tool can also be used to find out information about your topics and their configuration in an existing service instance.

./bin/kafka-topics.sh --bootstrap-server <BOOTSTRAP_SERVERS> --command-config client.properties --describe

Topic:sample-topic	PartitionCount:3	ReplicationFactor:3	 Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
    Topic: sample-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: sample-topic    Partition: 1    Leader: 1    Replicas: 1,2,0	   Isr: 0,2,1
    Topic: sample-topic    Partition: 2    Leader: 2    Replicas: 2,1,0	   Isr: 0,2,1
Topic:testtopic	 PartitionCount:1	 ReplicationFactor:3	Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
    Topic: testtopic    Partition: 0    Leader: 0    Replicas: 0,2,1   Isr: 0,2

From the sample, you can see that topic sample-topic has three partitions and a replication factor of three. The example also shows which broker the leader of each partitions is on and which replicas are in sync (Isr). For example, the leader of partition 0 is on broker 0, the followers are on brokers 2 and 1 and all three replicas are in sync. If you look at the second topic testtopic, it has only one partition, which is replicated on brokers 0, 2, and 1 but the in-sync replica list shows only 0 and 2. This means that the follower on broker 1 is falling behind and is therefore not in the Isr list.

Console producer

To produce a message to a topic, run the following command. A prompt will be shown where some text can be entered and sent by pressing return. To quit, type + C.

$ ./bin/kafka-console-producer.sh --topic <TOPIC_NAME> --bootstrap-server <BOOTSTRAP_SERVERS> --producer.config client.properties

Replace the variables in the example with your own values:

  • BOOTSTRAP_SERVERS with the value from your Event Streams Service credentials tab in the IBM Cloud console.
  • TOPIC_NAME with the name of the topic to be created.

Console consumer

To consume a message from a topic, run the following. To quit, type + C. Note that the --from-beginning option will consume all messages on the topic. If this option is omitted, only messages produced after the consumer was run will be retrieved.

$ ./bin/kafka-console-consumer.sh --topic <TOPIC_NAME> --bootstrap-server <BOOTSTRAP_SERVERS> --consumer.config client.properties --from-beginning

Replace the variables in the example with your own values:

  • BOOTSTRAP_SERVERS with the value from your Event Streams Service credentials tab in the IBM Cloud console.
  • TOPIC_NAME with the name of the topic to be created.

Consumer groups

You can use the Kafka consumer groups tool with Event Streams.

$ ./bin/kafka-consumer-groups.sh --bootstrap-server <BOOTSTRAP_SERVERS> --command-config client.properties --list --timeout 60000

Replace the BOOTSTRAP_SERVERS variable in the example with the value from your Event Streams Service credentials tab in the IBM Cloud console.

Using this tool, you can also display details, such as the current positions of the consumers, their lag, and client-id for each partition for a group. For example:

$ kafka-consumer-groups.sh --bootstrap-server <BOOTSTRAP_SERVERS> --command-config client.properties --describe --group <GROUP> --timeout 60000

Replace GROUP in the example with the group name that you want to retrieve details for.

See the following sample output from running the kafka-consumer-groups tool:

GROUP              TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID      HOST          CLIENT-ID
consumer-group-1   foo        0          264             267            3          client-1-abc    example.com    client-1
consumer-group-1   foo        1          124             124            0          client-1-abc    example.com    client-1
consumer-group-1   foo        2          212             212            0          client-2-def    example.com    client-2

From the example, you can see that consumer group consumer-group-1 has two consumer members that consume messages from topic foo with three partitions. It also shows that the consumer client-1-abc that is consuming from partition 0 is three messages behind because the current offset of the consumer is 264 but the offset of the last message on partition 0 is 267.

Kafka Streams reset

You can use this tool with Event Streams to reset the processing state of a Kafka Streams application, so you can reprocess its input from scratch. Before you run this tool, ensure that your Streams application is fully stopped.

For example:

./bin/kafka-streams-application-reset.sh --bootstrap-servers <BOOTSTRAP_SERVERS> --config-file client.properties --application-id <APP_ID>

Replace the following variables in the example with your own values:

  • BOOTSTRAP_SERVERS with the value from your Event Streams Service credentials tab in the IBM Cloud console.
  • APP_ID with your Streams application ID.