IBM Cloud Docs
将 Kafka 控制台工具与 Event Streams 配合使用

将 Kafka 控制台工具与 Event Streams 配合使用

Apache Kafka 随附各种控制台工具,用于简单的管理和消息传递操作。 虽然 Event Streams 不允许连接到其 ZooKeeper 集群,但是您仍可以将其中的许多工具与 Event Streams 配合使用。 随着 Kafka 的开发,先前需要连接到 ZooKeeper 的许多工具不再具有该需求。

这些控制台工具位于 Kafka 下载的 bin 目录中。 您可以从 Apache Kafka 下载下载客户机。

要向这些工具提供 SASL 凭证,请基于以下示例创建属性文件:

    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="USER" password="PASSWORD";
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    ssl.protocol=TLSv1.2
    ssl.enabled.protocols=TLSv1.2
    ssl.endpoint.identification.algorithm=HTTPS

将 USER 和 PASSWORD 替换为 Event Streams 控制台的 IBM Cloud 服务凭证选项卡中的值。

控制台生产者

您可以将 Kafka 控制台生产者工具与 Event Streams 配合使用。 您必须提供代理程序和 SASL 凭证的列表。

创建属性文件后,可以在终端中运行控制台生产者,如下所示:

   kafka-console-producer.sh --broker-list BOOTSTRAP_ENDPOINTS --producer.config CONFIG_FILE --topic TOPIC_NAME

将示例中的以下变量替换为您自己的值:

  • 具有来自 IBM Cloud 控制台中 Event Streams 服务凭证 选项卡的值的 BOOTSTRAP_ENDPOINTS。
  • 将 CONFIG_FILE 替换为配置文件的路径。

您可以使用此工具的许多其他选项,但需要访问 ZooKeeper的选项除外。

控制台使用者

您可以将 Kafka 控制台使用者工具与 Event Streams 配合使用。 您必须提供引导程序服务器和 SASL 凭证。

如前所述创建属性文件后,在终端中运行控制台使用者,如下所示:

   kafka-console-consumer.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --consumer.config CONFIG_FILE --topic TOPIC_NAME

将示例中的以下变量替换为您自己的值:

  • 具有来自 IBM Cloud 控制台中 Event Streams 服务凭证 选项卡的值的 BOOTSTRAP_ENDPOINTS。
  • 将 CONFIG_FILE 替换为配置文件的路径。

您可以使用此工具的许多其他选项,但需要访问 ZooKeeper的选项除外。

使用者组

您可以将 Kafka 使用者组工具与 Event Streams 配合使用。 由于 Event Streams 不允许连接到其 ZooKeeper 集群,所以有一些选项不可用。

如前所述创建属性文件后,在终端中运行使用者组工具。 例如,您可以列出使用者组,如下所示:

   kafka-consumer-groups.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --command-config CONFIG_FILE --list --timeout 60000

将示例中的以下变量替换为您自己的值:

  • 具有来自 IBM Cloud 控制台中 Event Streams 服务凭证 选项卡的值的 BOOTSTRAP_ENDPOINTS。
  • 将 CONFIG_FILE 替换为配置文件的路径。

使用此工具,您还可以显示详细信息,如使用者的当前位置、其滞后和组的每个分区的客户机标识。 例如:

   kafka-consumer-groups.sh --bootstrap-server BOOTSTRAP_ENDPOINTS --command-config CONFIG_FILE --describe --group GROUP --timeout 60000

将示例中的 GROUP 替换为要检索其详细信息的组名。

请参阅运行 kafka-consumer-groups 工具的以下样本输出:

GROUP              TOPIC    PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG         CONSUMER-ID      HOST          CLIENT-ID
consumer-group-1   foo        0          264             267            3          client-1-abc    example.com    client-1
consumer-group-1   foo        1          124             124            0          client-1-abc    example.com    client-1
consumer-group-1   foo        2          212             212            0          client-2-def    example.com    client-2

从该示例中,您可以看到使用者组 consumer-group-1 具有两个使用者成员,这些成员使用来自主题 foo 的具有三个分区的消息。 它还显示从分区 0 使用的使用者 client-1-abc 有三条消息延迟,因为使用者的当前偏移量为 264,但分区 0 上最后一条消息的偏移量为 267

主题

您可以将 kafka-topics 工具与 Event Streams 配合使用。 请确保使用该工具的 V2.8,因为它不需要 Zookeeper 访问权。

您可能希望使用 kafka-主题 的场景是在现有集群中查找有关主题及其配置的信息,以便可以在新集群中重新创建这些主题。 您可以使用从 kafka-topics 输出的信息,在新集群中创建名称相同的主题。 有关如何创建主题的更多信息,请参阅使用管理 Kafka Java 客户机 APIibmcloud es topic-create 命令。 或者,也可以使用 IBM Event Streams 控制台。

请参阅运行 kafka-topics 工具的以下样本输出:

   bin/kafka-topics.sh --bootstrap-server kafka03-prod01.messagehub.services.us-south.bluemix.net:9093 --command-config vcurr_dal06.properties --describe

Topic:sample-topic	PartitionCount:3	ReplicationFactor:3	 Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
    Topic: sample-topic    Partition: 0    Leader: 0    Replicas: 0,2,1    Isr: 0,2,1
    Topic: sample-topic    Partition: 1    Leader: 1    Replicas: 1,2,0	   Isr: 0,2,1
    Topic: sample-topic    Partition: 2    Leader: 2    Replicas: 2,1,0	   Isr: 0,2,1
Topic:testtopic	 PartitionCount:1	 ReplicationFactor:3	Configs:min.insync.replicas=2,unclean.leader.election.enable=true,retention.bytes=1073741824,segment.bytes=536870912,retention.ms=86400000
    Topic: testtopic    Partition: 0    Leader: 0    Replicas: 0,2,1   Isr: 0,2

从样本中,您可以看到主题 sample-topic 具有三个分区和三个复制因子。 此示例还显示了每个分区的引导者所在的代理程序以及同步的副本 (Isr)。例如,分区 0 的引导者位于代理 0 上,追随者位于代理 21 上,并且所有三个副本都同步。 如果查看第二个主题 testtopic,那么它只有一个在代理 021 上复制的分区,但同步副本列表仅显示 02。 这意味着代理 1 上的跟随者落后,因此不在 Isr 列表中。

Kafka Streams 重置

您可以将此工具与 Event Streams 配合使用,来重置 Kafka Streams 应用程序的处理状态,以便您可以从头开始重新处理其输入。 在运行此工具之前,请确保 Streams 应用程序已完全停止。

例如:

   kafka-streams-application-reset.sh --bootstrap-servers BOOTSTRAP_ENDPOINTS --config-file CONFIG_FILE --application-id APP_ID

将示例中的以下变量替换为您自己的值:

  • 具有来自 IBM Cloud 控制台中 Event Streams 服务凭证 选项卡的值的 BOOTSTRAP_ENDPOINTS。
  • 将 CONFIG_FILE 替换为配置文件的路径。
  • 将 APP_ID 替换为 Streams 应用程序标识。