IBM Cloud Docs
生成消息

生成消息

生产者是用于将消息流发布到 Kafka 主题的应用程序。 此处的信息主要针对 Java 编程接口;Java 编程接口是 Apache Kafka 项目的一部分。 这些概念也适用于其他语言,但名称有时会略有不同。

在程序界面中,一条信息被称为一条记录。 例如,从生产者 API 的角度来看,Java 类 org.apache.kafka.clients.producer.ProducerRecord 用于表示消息。 _记录_和_信息_这两个术语可以互换使用,但记录基本上是用来表示信息的。

生产者连接到 Kafka 时,会建立初始引导程序连接。 此连接可以是与集群中任一服务器的连接。 生产者要将消息发布到主题时,会请求该主题的分区和领导权信息。 然后,生产者与分区领导者建立另一个连接,并发布信息。 生产者连接到 Kafka 集群时,这些操作会在内部自动执行。

为确保可用性,Kafka 代理程序会复制消息,以便在一个代理程序不可用时,其他代理程序仍可接收来自生产者的消息并将其发送给使用者。Event Streams 使用复制因子 3,这意味着每条消息都存储在三个代理上。 消息发送到分区领导者时,该消息不会立即可供使用者使用。 领导者会将消息的记录附加到分区,并为其分配该分区的下一个偏移量数字。 在所有同步复制的跟随者复制了记录并确认已将记录写入其复制后,该记录就已提交,可供消费者使用。

每条信息都以记录的形式表示,记录由两部分组成:键和值。 键通常用于表示消息相关数据,值是消息的主体。 由于Kafka生态系统中的许多工具(如连接其他系统的连接器)只使用值而忽略键,因此最好将所有消息数据放在值中,而将键用于分区或日志压缩。 不要依赖从Kafka读取的所有内容来使用密钥。

其他许多消息传递系统也可以随消息一起传递其他信息。 为此,Kafka 0.11版引入了记录头。

您可能会发现在 Event Streams中随 使用消息 一起阅读此信息很有用。

配置设置

生产者存在许多配置设置。 您可以控制生产者的各个方面,包括批处理、重试和消息确认。 下面是最重要的方面:

生产者配置设置
名称 描述 有效值 缺省
key.serializer 用于对键进行序列化的类。 实现 Serializer 接口的Java类,如org.apache.kafka.common.serialization.StringSerializer。 无默认值 - 您必须指定一个值。
value.serializer 用于对值进行序列化的类。 实现 Serializer 接口的Java类,如org.apache.kafka.common.serialization.StringSerializer。 无默认值 - 您必须指定一个值。
acks 确认发布的每条消息所需的服务器数。 这可控制生产者需要的耐久性保证。 0,1,all(或 -1) all (Kafka 3.0 和更高版本) 1 (低于 Kafka 3.0)
retries 发送遇到错误时,客户机重新发送消息的次数。 0,... 0
max.block.ms 发送或元数据请求可以阻止等待的毫秒数。 0,... 60000(1 分钟)
max.in.flight.requests.per.connection 客户端在阻止进一步请求之前在连接上发送的未确认请求的最大数量。 1,... 5
request.timeout.ms 生产者等待对请求的响应的最长时间量。 如果在超时前未收到响应,则重试请求;如果重试次数已用尽,则请求失败。 0,... 30000(30 秒)

还有更多配置设置可供选择,但请确保在尝试使用之前仔细阅读 Apache Kafka文档

分区

在 Kafka 中,分区是可伸缩性单元。 因此,分区是提高吞吐量的有效方法,因为它允许主题数据以多个并行流的形式流动。

生产者在主题上发布消息时,可以选择使用哪个分区。 如果排序很重要,请注意分区是记录的有序序列,而主题则由一个或多个分区组成。 如果要按顺序传递一组消息,请确保这些消息全部传递到同一分区上。 实现这一操作的最直接方法是为所有这些消息提供相同的键。

生产者发布消息时,可以明确指定分区号。 这样可以直接控制,但也会使生产者代码更加复杂,因为它将承担管理分区选择的责任。 有关更多信息,请参阅方法调用 Producer.partitionsFor。 例如,该调用适用于 Kafka版本2.2.0

如果生产者未指定分区号,那么分区的选择由分区程序执行。 Kafka 生产者中内置缺省分区程序的工作方式如下:

  • 如果记录没有键,将以循环方式选择分区。
  • 如果记录有键,将通过计算键的散列值来选择分区。 这相当于为具有相同键的所有消息选择同一分区。

您还可以编写自己的定制分区程序。 定制分区程序可以选择用于将记录分配给分区的任何方案。 例如,仅使用键中的信息子集或特定于应用程序的标识。

消息排序

Kafka 通常会按生产者发送消息的顺序写入消息。 不过,在某些情况下,重试可能会导致报文重复或重新排序。 如果要按顺序发送一连串报文,就必须确保所有报文都写入同一个分区,因为这是保证报文顺序的唯一方法。

制作者还能自动重试发送信息。 启用重试功能是个好主意,因为如果不启用重试功能,您的应用程序代码就必须自己执行重试。 将 Kafka 中的批处理和自动重试结合使用可能会导致消息的重复和重新排序。

例如,如果您就一个主题发布了一连串三条信息<M1、M2、M3>。 这些记录可能都属于同一批次,因此会一起发送给分区负责人。 然后领导者将这些记录写入分区,并将其复制为单独的记录。 如果发生故障,那么可能会将 M1 和 M2 添加到分区,但不会将 M3 添加到分区。 保险销售人未收到应答,因此它重试发送 <M1,M2和 M3>。新引导者将 M1,M2 和 M3 写入分区,该分区现在包含 <M1,M2,M1,M2和 M3>,其中重复的 M1 遵循原始 M2。 如果将每个代理程序中的动态请求数限制为只有一个,就可以防止这种重新排序操作。 您可能仍会发现单个记录重复 (例如 <M1,M2,M2和 M3>),但不会出现顺序混乱的情况。 在Kafka 0.11或更高版本中,还可以使用惰性生产者(idempotent producer)功能来防止M2 的重复。

Kafka的通常做法是编写应用程序来处理偶尔出现的重复消息,因为只有一个请求在飞行中对性能的影响非常大。

消息确认

发布信息时,您可以使用 acks 生产者配置来选择所需的确认级别。 该选择代表吞吐量和可靠性之间的平衡。 存在以下三个级别。

acks=0(最不可靠)
信息一经写入网络,即被视为已发送。 无需分区领导者的确认。 因此,如果分区领导权更改,消息可能会丢失。 这种级别的确认速度很快,但在某些情况下可能会丢失信息。
acks=1(缺省值)
一旦分区领导者成功地将其记录写入分区,消息就会被确认给生产者。 由于确认发生在记录到达同步副本之前,因此如果领导者发生故障,但跟随者尚未收到信息,信息就可能丢失。 如果分区领导发生变化,旧的领导会通知生产者,生产者可以处理错误并重新尝试向新的领导发送信息。 由于信息在所有副本确认收到之前就已被确认,因此如果分区领导层发生变化,已被确认但尚未完全复制的信息可能会丢失。
acks=all(最可靠)
当分区领导者成功写入其记录,且所有同步副本也写入其记录时,生产者就会收到该消息。 如果分区领导者更改,那么只要至少有一个同步副本可用,消息就不会丢失。

即便不等待向生产者确认消息,消息也仍然在落实后才可使用,这意味着复制到同步副本完成。 换句话说,从生产者角度来看,发送消息的等待时间短于从生产者发送消息到使用者接收消息之间所测得的端到端等待时间。

如果可能,在发布下一条信息之前,请避免等待信息确认。 因为这种等待会让生产者无法将消息一起批处理,并且还会使消息发布速率降低到低于网络的来回延迟时间。

批处理、调速和压缩

为了提高效率,生产者会将成批的记录收集在一起发送到服务器。 如果启用压缩,生产者会压缩每个批次,这可以减少需要通过网络传输的数据,从而提高性能。

如果尝试使用的消息发布速度比向服务器发送消息的速度快,那么生产者会自动将消息缓存到批处理请求中。 生产者会为每个分区维护一个缓冲区,用于缓存未发送的记录。 在某些情况下,即使是批处理也无法达到您想要的速度。

另外还有一个影响因素。 为了防止单个生产者或使用者大量涌入集群而使集群应接不暇,Event Streams 应用了吞吐量配额。 程序会计算每个生产者发送数据的速率,并对试图超过其配额的任何生产者进行调速。 应用调速的方式是略微延迟向生产者发送响应。 通常,此行为就像是一个天然制动器。

有关吞吐量指导的更多信息,请参阅 限制和配额

总之,发布消息时,其记录会首先写入生产者中的缓冲区。 生产者在后台对记录分批后,将记录发送到服务器。 然后服务器会响应生产者,如果生产者发布速度太快,可能会应用调速延迟。 如果生产者的缓冲区满了,生产者的发送调用就会延迟,但最终可能会出现异常而失败。

传递语义

Kafka 提供了以下多个不同的消息传递语义:

  • 最多一次:信息可能会丢失,无法重新发送。
  • 至少一次:信息不会丢失,但可能会有重复。
  • 仅此一次:信息不会丢失,也不会重复。

传递语义取决于以下设置:

  • acks
  • retries
  • enable.idempotence

默认情况下,Kafka使用至少一次语义。

要启用精确一次语义,必须使用幂等生产者或事务生产者。 幂等生产者是通过将 enable.idempotence 设为 true 来启用的,可保证每个消息只有一个副本写入 Kafka(即使重试也是如此)。 事务生产者允许将数据发送到多个分区,要么成功传递所有消息,要么不传递任何消息。 也就是说,事务要么完全落实,要么完全废弃。 您还可以在事务中包含偏移量,以构建向Kafka 读取、处理和写入消息的应用程序。

代码片段

这些代码片段都是高层次的,用于说明相关概念。 有关完整示例,请参阅 GitHub中的 Event Streams 样本。

要将使用者连接到 Event Streams,必须创建服务凭证。 有关更多信息,请参阅连接 Event Streams

在生产者代码中,您首先需要构建配置属性集。 Event Streams的所有连接都使用 TLS 以及用户和密码验证,因此至少需要这些属性。 将 BOOTSTRAP_ENDPOINTS、USER 和 PASSWORD 替换为您自己服务证书中的内容:

Properties props = new Properties();
 props.put("bootstrap.servers", BOOTSTRAP_ENDPOINTS);
 props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"USER\" password=\"PASSWORD\";");
 props.put("security.protocol", "SASL_SSL");
 props.put("sasl.mechanism", "PLAIN");
 props.put("ssl.protocol", "TLSv1.2");
 props.put("ssl.enabled.protocols", "TLSv1.2");
 props.put("ssl.endpoint.identification.algorithm", "HTTPS");

例如,要发送信息,还需要为键和值指定序列化器:

 props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

这些序列化器必须与使用者所使用的反序列化器相匹配。

然后,使用KafkaProducer发送消息,其中每条消息都由ProducerRecord 表示。 完成后别忘了关闭 KafkaProducer。 以下代码仅发送消息,而不会等待以确认发送是否成功。 消息将发送到主题 T1,键为字符串 key,值为字符串 value

 Producer<String, String> producer = new KafkaProducer<>(props);
 producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
 producer.close();

send() 方法是异步的,并会返回 Future,可用于检查其完成情况:

 Future<RecordMetadata> f = producer.send(new ProducerRecord<String, String>("T1", "key", "value"));
// Do some other stuff
// Now wait for the result of the send
 RecordMetadata rm = f.get();
 long offset = rm.offset;

或者,您也可以在发送信息时提供一个回调:

producer.send(new ProducerRecord<String,String>("T1","key","value", new Callback() {
          public void onCompletion(RecordMetadata metadata, Exception exception) {
                     // This is called when the send completes, either successfully or with an exception
          }
});

有关更多信息,请参阅 Kafka 客户机的 Javadoc