IBM Cloud Docs
RabbitMQ 流

RabbitMQ 流

RabbitMQ Streams 是一种持久复制的数据结构,其功能类似于队列,用于缓冲来自生产者的消息以供使用者使用。 但是,流在两种方面有所不同:

  • 生产者如何向其写入消息。
  • 使用者如何从中读取消息。

Streams 对可重复读取的消息的仅追加日志进行建模,直到这些消息到期为止。 流始终是持久的和复制的。 对此流行为的更技术性描述是“无损消费者语义”。

要从 RabbitMQ中的流读取消息,一个或多个使用者预订该消息并根据需要多次读取同一消息。 使用者通过基于 AMQP 的客户机与流通信,并使用 AMQP 协议。

用例

流的用例包括:

  • 扇出式架构: 许多消费者需要阅读相同的消息。
  • 重放和时间旅行: 消费者需要阅读和重新阅读相同的消息,或者从流中的任何位置开始阅读。
  • 大型积压: Streams 旨在以最小的内存开销高效地存储更多数据。
  • 高吞吐量: RabbitMQ Streams 每秒处理的消息量相对较高。

有关更多信息,请参阅 RabbitMQ Streams

如何使用 RabbitMQ Streams

可指定 可选队列和使用者参数 的 AMQP 0.9.1 客户机库能够将流用作常规 AMQP 0.9.1 队列。

与 AMQP 客户机库配合使用

通过 AMQP 客户机库使用 RabbitMQ 流有三个步骤:

  1. 声明/实例化流
  2. 将消息发布(写入)到流
  3. 使用(读取)来自流的消息

声明 RabbitMQ 流

您可以使用 RabbitMQ 管理界面来创建流。

  • 首先,选择 添加新队列
  • 接下来,在 类型 下拉列表中,选择

或者,通过使用 RabbitMQ 管理界面创建类型为 “stream” 的队列来创建流。 如果队列已存在,那么将不会创建该队列。 使用类似如下的命令声明流:

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

发布 RabbitMQ 流

以下脚本声明 RabbitMQ 流 (test_stream),然后通过 basic_publish 函数向其发布消息:

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

# Publish a message to the test_stream
channel.basic_publish(
  exchange='',
  routing_key='test_stream',
  body='Welcome email message'
)

使用 RabbitMQ 流

可以通过队列完成此任务的相同方式从流中使用消息,这有两个主要差异:

  1. 在 RabbitMQ Streams 中使用消息需要设置 QoS 预取
  2. 指定从日志流中的任何点开始读取/使用的偏移量。 如果未指定,那么使用者将在日志流启动后开始从写入日志流的最新偏移量中进行读取。

以下脚本再次声明 test_stream,并使用 basic_qos 函数将 QoS 预取设置为 100。 使用者在处理新消息时触发回调函数。 然后,回调函数将调用模拟向用户发送电子邮件的 send_welcome_email 函数。

import pika, os, time

def send_welcome_email(msg):
  print("Welcome Email task processing")
  print(" [x] Received " + str(msg))
  time.sleep(5) # simulate sending email to a user --delays for 5 seconds
  print("Email successfully sent!")
  return

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<password>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare our stream
channel.queue_declare(
  queue='test_stream',
  durable=True,
  arguments={"x-queue-type": "stream"}
)

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  send_welcome_email(body)

# Set the consumer QoS prefetch
channel.basic_qos(
  prefetch_count=100
)

# Consume messages published to the stream
channel.basic_consume(
  'test_stream',
  callback,
)

# start consuming (blocks)
channel.start_consuming()
connection.close()

请注意,如何在 basic_consume 中未指定偏移量: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback)。 因此,使用者会在使用者启动后从写入 test_stream 的最新偏移量开始读取。 此处特意强调了 之后,以允许对流的有趣行为进行交叉检查。

如何设置偏移量

由于流从不删除任何消息,因此任何使用者都可以从日志中的任何点开始读取/使用消息。 这由 x-stream-offset 使用者参数控制。 如果未指定,那么使用者将从使用者启动后写入日志的下一个偏移量开始读取。 支持以下值:

  • first-从日志中的第一条可用消息开始
  • last-这将从上次写入的“消息块”开始读取
  • next-与不指定任何偏移量相同
  • Offset-一个数字值,用于指定要附加到日志的精确偏移量。
  • Timestamp-一个时间戳记值,用于指定要附加到日志的时间点。 它夹到最接近的偏移量,如果时间戳记超出流的范围,它夹到日志的开始或结束,例如: 00:00:00 UTC,1970-01-01。 请注意,使用者可以接收在指定时间戳记之前发布的消息。
  • Interval-一个字符串值,用于指定相对于连接日志的当前时间的时间间隔。 使用与 x-max-age 相同的规范。

以下代码示例显示如何使用第一个偏移量规范:

# Grabbing the first message
channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": "first"}
)

此代码显示如何指定要使用的特定偏移量:

channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": 5000}
)

其他 Streams 操作

可以采用类似于经典队列和定额队列的方式来使用以下操作,但某些操作具有特定于队列的行为: