RabbitMQ Streams
RabbitMQ Streams es una estructura de datos replicada persistente que funciona de forma similar a las colas, almacenando en el almacenamiento intermedio los mensajes de los productores para que los consuman los consumidores. Sin embargo, las corrientes difieren de dos maneras:
- Cómo los productores les escriben mensajes.
- Cómo leen los consumidores los mensajes de ellos.
Las secuencias modelan un registro de sólo adición de mensajes que se pueden leer repetidamente hasta que caducan. Las secuencias siempre son persistentes y se replican. Una descripción más técnica de este comportamiento de corriente es "semántica de consumidor no destructiva".
Para leer mensajes de una corriente en RabbitMQ, uno o varios consumidores se suscriben a ella y leen el mismo mensaje tantas veces como deseen. Los consumidores hablan con una secuencia a través de clientes basados en AMQP y utilizan el protocolo AMQP.
Casos de uso
Los casos de uso para corrientes incluyen:
- Arquitecturas de abanico de salida: donde muchos consumidores necesitan leer el mismo mensaje.
- Reproducción y viaje en el tiempo: Donde los consumidores necesitan leer y volver a leer el mismo mensaje o empezar a leer desde cualquier punto de la secuencia.
- Retrasos grandes: las secuencias están diseñadas para almacenar grandes cantidades de datos de forma eficiente con una sobrecarga mínima en memoria.
- Alto rendimiento: RabbitMQ Streams procesa volúmenes relativamente más altos de mensajes por segundo.
Para obtener más información, consulte RabbitMQ Streams.
Cómo utilizar RabbitMQ Streams
Una biblioteca de cliente AMQP 0.9.1 que puede especificar argumentos de consumidor y cola opcionales puede utilizar corrientes de datos como colas AMQP 0.9.1 normales.
Utilización con una biblioteca de cliente AMQP
Hay tres pasos para trabajar con RabbitMQ Streams a través de una biblioteca de cliente AMQP:
- Declarar/crear instancia de una secuencia
- Publicar(escribir)mensajes en la corriente
- Consumir(leer)mensajes de la corriente
Declaración de una corriente de RabbitMQ
Puede crear una corriente utilizando la interfaz de gestión de RabbitMQ.
- En primer lugar, seleccione Añadir una cola nueva.
- A continuación, en el desplegable Tipo, seleccione Corriente.
De forma alternativa, cree una corriente utilizando la interfaz de gestión de RabbitMQ creando una cola con el tipo “stream”
. Si una cola ya está presente, no se creará. Declare una corriente con un mandato como:
import pika, os
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<port>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare a Stream, named test_stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
Publicación de una corriente de RabbitMQ
El script siguiente declara una ruta de RabbitMQ (test_stream
) y, a continuación, publica un mensaje en ella a través de la función basic_publish
:
import pika, os
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<port>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare a Stream, named test_stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
# Publish a message to the test_stream
channel.basic_publish(
exchange='',
routing_key='test_stream',
body='Welcome email message'
)
Consumo de una corriente de RabbitMQ
Los mensajes se pueden consumir desde una secuencia de la misma forma que las colas realizan esta tarea, con dos diferencias principales:
- El consumo de mensajes en RabbitMQ Streams requiere establecer la captación previa deQoS.
- Especifique un desplazamiento para empezar a leer/consumir desde cualquier punto de la secuencia de registro. Si no se especifica, el consumidor empieza a leer el desplazamiento más reciente grabado en la secuencia de registro después de que se inicie.
El script siguiente declara test_stream
de nuevo, estableciendo la captación previa QoS en 100
utilizando la función basic_qos
. El consumidor desencadena la función de devolución de llamada cuando procesa
un mensaje nuevo. A continuación, la función de devolución de llamada invoca la función send_welcome_email
que simula el envío de un correo electrónico a un usuario.
import pika, os, time
def send_welcome_email(msg):
print("Welcome Email task processing")
print(" [x] Received " + str(msg))
time.sleep(5) # simulate sending email to a user --delays for 5 seconds
print("Email successfully sent!")
return
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<password>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare our stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
# create a function which is called on incoming messages
def callback(ch, method, properties, body):
send_welcome_email(body)
# Set the consumer QoS prefetch
channel.basic_qos(
prefetch_count=100
)
# Consume messages published to the stream
channel.basic_consume(
'test_stream',
callback,
)
# start consuming (blocks)
channel.start_consuming()
connection.close()
Tenga en cuenta cómo no se especifica un desplazamiento en nuestro basic_consume
: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback)
. Como resultado, el consumidor empieza
a leer desde el desplazamiento más reciente escrito en test_stream
después de que se inicie el consumidor. Después de se ha enfatizado deliberadamente aquí para permitir el examen cruzado de un comportamiento interesante
de las rutas.
Cómo establecer un desplazamiento
Como las secuencias nunca suprimen ningún mensaje, cualquier consumidor puede empezar a leer/consumir desde cualquier punto del registro. Esto lo controla el argumento de consumidor x-stream-offset
. Si no se especifica, el consumidor
empezará a leer desde el siguiente desplazamiento grabado en el registro después de que se inicie el consumidor. Se admiten los valores siguientes:
- primero-empezar desde el primer mensaje disponible en el registro
- last-esto comienza a leer desde el último "fragmento" escrito de los mensajes
- siguiente-igual que no especificar ningún desplazamiento
- Desplazamiento-un valor numérico que especifica un desplazamiento exacto para adjuntar al registro en.
- Indicación de fecha y hora: valor de indicación de fecha y hora que especifica el punto en el tiempo que se adjuntará al registro. Se sujeta al desplazamiento más cercano, si la indicación de fecha y hora está fuera de rango para la secuencia, se sujeta el inicio o el final del registro, por ejemplo: 00:00:00 UTC, 1970-01-01. Tenga en cuenta que los consumidores pueden recibir mensajes publicados un poco antes de la indicación de fecha y hora especificada.
- Intervalo-un valor de serie que especifica el intervalo de tiempo relativo a la hora actual en la que adjuntar el registro. Utiliza la misma especificación que
x-max-age
.
El siguiente ejemplo de código muestra cómo utilizar la primera especificación de desplazamiento:
# Grabbing the first message
channel.basic_consume(
'test_stream',
callback,
arguments={"x-stream-offset": "first"}
)
Este código muestra cómo especificar un desplazamiento específico del que consumir:
channel.basic_consume(
'test_stream',
callback,
arguments={"x-stream-offset": 5000}
)
Otras operaciones de Streams
Las operaciones siguientes se pueden utilizar de forma similar a las colas clásicas y de quórum, pero algunas tienen algún comportamiento específico de cola:
- Declaración
- Supresión de cola
- Confirmaciones de publicador
- Consumo (suscripción): el consumo requiere que se establezca la captación previa de QoS. Los acks funcionan como un mecanismo de crédito para avanzar en la actual compensación del consumidor.
- Establecimiento de QoS captación previa para consumidores
- Confirmaciones del consumidor (tenga en cuenta QoS Limitaciones de captación previa )
- Cancelación de consumidores