IBM Cloud Docs
RabbitMQ Streams

RabbitMQ Streams

RabbitMQ Streams es una estructura de datos replicada persistente que funciona de forma similar a las colas, almacenando en el almacenamiento intermedio los mensajes de los productores para que los consuman los consumidores. Sin embargo, las corrientes difieren de dos maneras:

  • Cómo los productores les escriben mensajes.
  • Cómo leen los consumidores los mensajes de ellos.

Las secuencias modelan un registro de sólo adición de mensajes que se pueden leer repetidamente hasta que caducan. Las secuencias siempre son persistentes y se replican. Una descripción más técnica de este comportamiento de corriente es "semántica de consumidor no destructiva".

Para leer mensajes de una corriente en RabbitMQ, uno o varios consumidores se suscriben a ella y leen el mismo mensaje tantas veces como deseen. Los consumidores hablan con una secuencia a través de clientes basados en AMQP y utilizan el protocolo AMQP.

Casos de uso

Los casos de uso para corrientes incluyen:

  • Arquitecturas de abanico de salida: donde muchos consumidores necesitan leer el mismo mensaje.
  • Reproducción y viaje en el tiempo: Donde los consumidores necesitan leer y volver a leer el mismo mensaje o empezar a leer desde cualquier punto de la secuencia.
  • Retrasos grandes: las secuencias están diseñadas para almacenar grandes cantidades de datos de forma eficiente con una sobrecarga mínima en memoria.
  • Alto rendimiento: RabbitMQ Streams procesa volúmenes relativamente más altos de mensajes por segundo.

Para obtener más información, consulte RabbitMQ Streams.

Cómo utilizar RabbitMQ Streams

Una biblioteca de cliente AMQP 0.9.1 que puede especificar argumentos de consumidor y cola opcionales puede utilizar corrientes de datos como colas AMQP 0.9.1 normales.

Utilización con una biblioteca de cliente AMQP

Hay tres pasos para trabajar con RabbitMQ Streams a través de una biblioteca de cliente AMQP:

  1. Declarar/crear instancia de una secuencia
  2. Publicar(escribir)mensajes en la corriente
  3. Consumir(leer)mensajes de la corriente

Declaración de una corriente de RabbitMQ

Puede crear una corriente utilizando la interfaz de gestión de RabbitMQ.

  • En primer lugar, seleccione Añadir una cola nueva.
  • A continuación, en el desplegable Tipo, seleccione Corriente.

De forma alternativa, cree una corriente utilizando la interfaz de gestión de RabbitMQ creando una cola con el tipo “stream”. Si una cola ya está presente, no se creará. Declare una corriente con un mandato como:

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

Publicación de una corriente de RabbitMQ

El script siguiente declara una ruta de RabbitMQ (test_stream) y, a continuación, publica un mensaje en ella a través de la función basic_publish :

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

# Publish a message to the test_stream
channel.basic_publish(
  exchange='',
  routing_key='test_stream',
  body='Welcome email message'
)

Consumo de una corriente de RabbitMQ

Los mensajes se pueden consumir desde una secuencia de la misma forma que las colas realizan esta tarea, con dos diferencias principales:

  1. El consumo de mensajes en RabbitMQ Streams requiere establecer la captación previa deQoS.
  2. Especifique un desplazamiento para empezar a leer/consumir desde cualquier punto de la secuencia de registro. Si no se especifica, el consumidor empieza a leer el desplazamiento más reciente grabado en la secuencia de registro después de que se inicie.

El script siguiente declara test_stream de nuevo, estableciendo la captación previa QoS en 100 utilizando la función basic_qos. El consumidor desencadena la función de devolución de llamada cuando procesa un mensaje nuevo. A continuación, la función de devolución de llamada invoca la función send_welcome_email que simula el envío de un correo electrónico a un usuario.

import pika, os, time

def send_welcome_email(msg):
  print("Welcome Email task processing")
  print(" [x] Received " + str(msg))
  time.sleep(5) # simulate sending email to a user --delays for 5 seconds
  print("Email successfully sent!")
  return

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<password>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare our stream
channel.queue_declare(
  queue='test_stream',
  durable=True,
  arguments={"x-queue-type": "stream"}
)

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  send_welcome_email(body)

# Set the consumer QoS prefetch
channel.basic_qos(
  prefetch_count=100
)

# Consume messages published to the stream
channel.basic_consume(
  'test_stream',
  callback,
)

# start consuming (blocks)
channel.start_consuming()
connection.close()

Tenga en cuenta cómo no se especifica un desplazamiento en nuestro basic_consume: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback). Como resultado, el consumidor empieza a leer desde el desplazamiento más reciente escrito en test_stream después de que se inicie el consumidor. Después de se ha enfatizado deliberadamente aquí para permitir el examen cruzado de un comportamiento interesante de las rutas.

Cómo establecer un desplazamiento

Como las secuencias nunca suprimen ningún mensaje, cualquier consumidor puede empezar a leer/consumir desde cualquier punto del registro. Esto lo controla el argumento de consumidor x-stream-offset. Si no se especifica, el consumidor empezará a leer desde el siguiente desplazamiento grabado en el registro después de que se inicie el consumidor. Se admiten los valores siguientes:

  • primero-empezar desde el primer mensaje disponible en el registro
  • last-esto comienza a leer desde el último "fragmento" escrito de los mensajes
  • siguiente-igual que no especificar ningún desplazamiento
  • Desplazamiento-un valor numérico que especifica un desplazamiento exacto para adjuntar al registro en.
  • Indicación de fecha y hora: valor de indicación de fecha y hora que especifica el punto en el tiempo que se adjuntará al registro. Se sujeta al desplazamiento más cercano, si la indicación de fecha y hora está fuera de rango para la secuencia, se sujeta el inicio o el final del registro, por ejemplo: 00:00:00 UTC, 1970-01-01. Tenga en cuenta que los consumidores pueden recibir mensajes publicados un poco antes de la indicación de fecha y hora especificada.
  • Intervalo-un valor de serie que especifica el intervalo de tiempo relativo a la hora actual en la que adjuntar el registro. Utiliza la misma especificación que x-max-age.

El siguiente ejemplo de código muestra cómo utilizar la primera especificación de desplazamiento:

# Grabbing the first message
channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": "first"}
)

Este código muestra cómo especificar un desplazamiento específico del que consumir:

channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": 5000}
)

Otras operaciones de Streams

Las operaciones siguientes se pueden utilizar de forma similar a las colas clásicas y de quórum, pero algunas tienen algún comportamiento específico de cola: