IBM Cloud Docs
使用消息

使用消息

使用者是使用 Kafka 主题中消息流的应用程序。 使用者可以预订一个或多个主题或分区。 本信息主要介绍作为Apache Kafka项目一部分的Java™编程接口。 这些概念也适用于其他语言,但名称有时会略有不同。

使用者连接到 Kafka 时,会建立初始引导程序连接。 此连接可以是与集群中任一服务器的连接。 使用者要使用主题中的消息时,会请求有关该主题的分区和领导权信息。 然后,消费者与分区领导者建立另一个连接,并可以消费信息。 使用者连接到 Kafka 集群时,这些操作会在内部自动执行。

使用者通常是一个长时间运行的应用程序。 消费者通过定期调用 Consumer.poll(...) 从Kafka请求消息。 使用者调用 poll(),接收一批消息,对其及时进行处理,然后再次调用 poll()

使用者处理消息后,不会从其主题中除去该消息。 相反,消费者可以选择多种方式通知Kafka哪些消息已被处理。 此过程称为落实偏移量。

在程序界面中,一条信息被称为一条记录。 例如,对于使用者 API,Java 类 org.apache.kafka.clients.consumer.ConsumerRecord 用于表示消息。 _记录_和_信息_这两个术语可以互换使用,但记录基本上是用来表示信息的。

您可能会发现在 Event Streams中阅读此信息以及 生成消息 很有用。

配置使用者属性

对于控制其行为方面的使用者,存在许多配置设置。 以下设置是一些最重要的设置。

配置使用者属性
名称 描述 有效值 缺省
key.deserializer 用于对键进行反序列化的类。 实现 Deserializer 接口的Java类,如org.apache.kafka.common.serialization.StringDeserializer。 无缺省值 - 必须指定值
value.deserializer 用于对值进行反序列化的类。 实现 Deserializer 接口的Java类,如org.apache.kafka.common.serialization.StringDeserializer。 无默认值 - 您必须指定一个值。
group.id 使用者所属的使用者组的标识。 字符串 无缺省值
auto.offset.reset 使用者没有初始偏移量或当前偏移量在集群中不再可用时的行为。 latest, earliest, none 最新
enable.auto.commit 确定是否在后台自动落实使用者的偏移量。 true,false True
auto.commit.interval.ms 定期落实偏移量的间隔毫秒数。 0,... 5000(5 秒)
max.poll.records 调用 poll() 时返回的最大记录数。 1,... 500
session.timeout.ms 毫秒数,在此时间内必须接收到使用者脉动信号才能保持使用者在使用者组中的成员资格。 6000-300000 10000(10 秒)
max.poll.interval.ms 使用者离开组之前轮询的最大时间间隔。 1,... 300000(5 分钟)

提供了许多其他配置设置,但请先阅读 Apache Kafka 文档,然后再开始对其进行试验。

使用者组

_消费者群组_是指合作消费一个或多个主题信息的消费者群组。 组中的使用者全部使用相同的 group.id 配置值。 如果您需要多个使用者来处理工作负载,可以在同一使用者组中运行多个使用者。 即使只需要一个消费者,通常也要为 group.id 指定一个值。

每个消费者群组都有一台服务器,称为_协调器_,负责为群组中的消费者分配分区。 这项责任会分布在集群中的各个服务器上,以均衡负载。 每次进行组重新均衡时,为使用者分配的分区都会更改。

使用者加入使用者组时,会发现该组的协调者。 然后,消费者告诉协调器它想加入组,协调器开始重新平衡整个组的分区,将新成员纳入其中。

来自单个分区的消息仅由每个组中的一个使用者进行处理。 这种机制可确保每个分区上的报文按顺序处理。 请参阅下图以获取一个示例,其中主题包含三个分区,而使用该主题的使用者组包含两个使用者。 为组中的一个使用者分配了两个分区,为另一个使用者分配了一个分区。

使用者组图。
Consumer group example

当用户组发生以下变化之一时,该组会通过向组成员分配分区来重新平衡,以适应变化:

  • 一名消费者加入小组。
  • 消费者离开小组。
  • 消费者被协调员视为不再存活。
  • 在现有主题中添加新分区。

如果你有一个重新平衡的消费者群组,请注意,任何离开群组的消费者在重新加入群组之前,其提交都会被拒绝。 在这种情况下,使用者需要重新加入该组,在该组中为其分配的分区可能会与先前使用的不同。

使用者活动性

Kafka 会自动检测失败的使用者,以便可以将分区重新分配给有效的使用者。 它使用两种机制: 轮询和脉动信号。

如果从 Consumer.poll(...) 返回的消息批处理较大或处理耗时,那么再次调用 poll() 之前的延迟可能很重要或不可预测。 在某些情况下,有必要配置较长的最大轮询时间间隔,这样就不会因为消息处理需要一段时间而将使用者从组中除去。 如果此机制是唯一可用的机制,那么检测失败的使用者所需的时间也很长。

为了更轻松地处理使用者活动性问题,Kafka 0.10.1 中添加了后台脉动信号发送功能。 组协调者期望组成员定期向其发送脉动信号,以指明自己仍然处于活动状态。 消费者中运行一个后台心跳线程,定期向协调者发送心跳。 如果协调器在_会话超时时间_内没有收到群成员的心跳,协调器就会将该成员从群中删除,并开始重新平衡群。 会话超时可以比最大轮询间隔短得多,这样,即使报文处理需要很长时间,检测到消费者失败所需的时间也可以很短。

您可以使用 max.poll.interval.ms 属性配置最大轮询间隔,使用 session.timeout.ms 属性配置会话超时。 除非处理一批邮件需要 5 分钟以上,否则不需要使用这些设置。

管理偏移量

对于每个消费者组,Kafka会为每个被消费的分区维护已提交偏移量。 使用者处理消息后,不会从分区中除去该消息。 相反,它只是通过一个称为提交偏移量的过程来更新当前偏移量。

Event Streams 会将已落实偏移量信息保留 7 天。

如果不存在现有已落实的偏移量,该怎么办?

当消费者启动并被分配到一个分区进行消费时,它会从其组的承诺偏移量开始。 如果不存在已提交的偏移量,消费者可根据以下 * 属性的设置,选择从最早的还是最新的可用报文开始处理 属性的设置 auto.offset.reset 选择从最早的还是最新的可用信息开始,如下所示:

  • latest (缺省值): 使用者仅接收和使用预订后到达的消息。 您的消费者不知道订阅之前发送的信息,因此不要指望所有信息都从主题中消费。
  • earliest: 使用者从头开始使用所有消息。

如果消费者在处理报文后但在提交偏移量前发生故障,则提交的偏移量信息不会反映报文的处理情况。 这意味着信息将由该组中下一个被分配分区的消费者再次处理。

已落实偏移量保存在 Kafka 中,并且使用者重新启动后,使用者会从其上次停止的点恢复。 如果存在承诺偏移,则不使用 auto.offset.reset 属性。

自动落实偏移量

提交偏移量的最简单方法是让Kafka消费者自动执行。 这很简单,但与手动输入相比,它的控制能力确实较弱。 缺省情况下,使用者每 5 秒自动落实一次偏移量。 此缺省落实每 5 秒钟执行一次,与使用者处理消息的进度无关。 此外,当消费者调用 poll() 时,也会导致上一次调用 poll() 所返回的最新偏移量被提交(因为假定之前的信息已全部处理完毕)。

如果已提交的偏移量超过了报文的处理量,而消费者又出现故障,就有可能导致某些报文未被处理。 这是因为处理会在已落实偏移量处重新开始,此点晚于失败之前处理的最后一个消息。 为此,如果可靠性比简单更重要,那么通常最好是手动落实偏移量。

手动落实偏移量

如果 enable.auto.commit 设置为 false,那么使用者会手动落实其偏移量。 使用者可以手动同步或异步落实其偏移量。 常用模式是基于定期计时器落实最新处理的消息的偏移量。 此模式意味着每个消息至少处理一次,但是已落实偏移量绝不会超过当前正在处理的消息进度。 定期计时器的频率可控制使用者失败后可以重新处理的消息数量。 应用程序重新启动或组重新均衡时,会再次从上次保存的已落实偏移量开始检索消息。

已落实偏移量是指从此位置恢复处理的消息的偏移量。 此偏移量通常为最近处理的消息加一

使用者滞后

分区的使用者滞后是指最近发布的消息的偏移量与使用者的已落实偏移量之间的差异。 换句话说,就是已产生的记录数与已消耗的记录数之差。 虽然通常生产速率和使用速率中会有自然的变化,但是在较长的时间段内,使用速率不应该比生产速率慢。

如果你发现消费者正在成功处理信息,但偶尔会出现跳过一组信息的情况,这可能是消费者跟不上的迹象。 对于不使用日志压缩的主题,可通过定期删除旧日志段来管理日志空间量。 如果消耗者落后太多,以至于正在消耗已删除日志段中的信息,它就会突然跳转到下一个日志段的起点。 如果使用者有必要处理所有消息,那么从这个使用者的角度来说,此行为指示消息丢失。

您可以使用 kafka-consumer-groups 工具查看消费者滞后情况。 还可以使用使用者 API 和使用者度量值来达到相同目的。

控制消息使用速度

如果由于信息泛滥导致信息处理出现问题,可以设置消费者选项来控制信息消耗的速度。 使用 fetch.max.bytesmax.poll.records 可控制调用一次 poll() 可以返回多少数据。

处理使用者重新均衡

当消费者添加到群组或从群组中删除时,群组会重新平衡,消费者无法消费信息。 这将导致消费者群体中的所有消费者在短时间内无法使用。

如果收到“分区已撤销”回调通知,请使用ConsumerRebalanceListener手动提交偏移量(如果不使用自动提交),并暂停进一步处理,直到收到“分区已分配”回调的成功重新平衡通知。

代码片段

这些代码片段都是高层次的,用于说明相关概念。 有关完整示例,请参阅 GitHub中的 Event Streams 样本。

要将使用者连接到 Event Streams,您需要创建服务凭证。 有关如何获取这些凭证的信息,请参阅 连接到 Event Streams

在使用者代码中,首先需要构建配置属性集。 Event Streams的所有连接都使用 TLS 和用户密码验证,因此至少需要这些属性。 将 BOOTSTRAP_ENDPOINTS、USER 和 PASSWORD 替换为您自己服务证书中的内容:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

要使用消息,还需要为键和值指定反序列化器,如以下示例中所示。

 props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
 props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

这些反序列化器必须与生产者使用的序列化器匹配。

然后,通过 KafkaConsumer 来使用消息,其中每条消息由一个 ConsumerRecord 表示。 使用消息最常用的方法是通过设置组标识,将使用者放入使用者组中,然后调用 subscribe() 来获取主题列表。 消费者会被分配到一些分区进行消费,但如果组中的消费者多于主题中的分区,消费者可能不会被分配到任何分区。 接下来,在一个循环中调用 poll(),接收一批要处理的消息,其中每条消息由一个 ConsumerRecord 表示。

props.put("group.id", "G1");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));  // T1 is the topic name
while (true) {
   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
   for (ConsumerRecord<String, String> record : records)
     System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

这个消费者循环将永远运行,但可以通过调用 Consumer.wakeup() 从另一个线程中断,从而实现整洁的关闭。

要手动落实偏移量,首先必须将 enable.auto.commit 配置设置为 false。 然后,使用 Consumer.commmitSync()Consumer.commitAsync() 定期更新使用者的已落实偏移量。 为了简单起见,此示例处理每个分区的记录并分别落实最后一个偏移量。

props.put("group.id", "G1");
props.put("enable.auto.commit", "false");
Consumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("T1"));
try {
  while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
    for (TopicPartition tp : records.partitions()) {
      List<ConsumerRecord<String, String>> partRecords = records.records(tp);
      long lastOffset = 0;
      for (ConsumerRecord<String, String> record : partRecords) {
        System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
        lastOffset = record.offset();
      }
      // having processed all the records in the above loop, we commit the partition's offset to 1 more than the last offset
      consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
  }
}
finally {
  consumer.close();
}

异常处理

任何使用 Kafka 客户机的稳健应用程序都需要处理某些预期情境的异常。 在某些情况下,异常不会被直接抛出,因为有些方法是异步的,会通过 Future 或回调来传递结果。 请查看 GitHub 中显示完整示例的示例代码。

处理代码中的以下异常列表:

org.apache.kafka.common.errors.WakeupException

由于调用了 Consumer.wakeup() 而由 Consumer.poll(...) 抛出。 这是中断消费者轮询循环的标准方法。 轮询循环退出,并调用 Consumer.close() 以完全断开连接。

org.apache.kafka.common.errors.NotLeaderForPartitionException

当分区的领导权发生更改时作为 Producer.send(...) 的结果抛出。 客户机会自动刷新其元数据,以查找最新的领导者信息。 使用更新后的元数据重试成功的操作。

org.apache.kafka.common.errors.CommitFailedException

发生不可恢复错误时由于 Consumer.commitSync(...) 而抛出。 在某些情况下,无法重复操作,因为分区分配已发生变化,消费者无法再提交其偏移量。 由于在一次调用中对多个分区使用 Consumer.commitSync(...) 时可能会部分成功,因此对每个分区使用单独的 Consumer.commitSync(...) 调用可以简化错误恢复。

org.apache.kafka.common.errors.TimeoutException

如果无法检索元数据,那么由 Producer.send(...), Consumer.listTopics() 抛出。 请求的确认未在 request.timeout.ms 内返回时,在 send 回调(或返回的 Future)中也会看到此异常。 客户机可以重试操作,但重复操作的结果取决于具体操作。 例如,如果重试发送信息,则信息可能被重复发送。