Flussi RabbitMQ
RabbitMQ Streams è una struttura di dati replicati persistenti che funziona in modo simile alle code, memorizzando nel buffer i messaggi dei produttori per l'utilizzo da parte dei consumer. Tuttavia, gli stream differiscono in due modi:
- Come i produttori scrivono loro i messaggi.
- Modalità di lettura dei messaggi da parte dei consumatori.
I flussi modellano un log di solo accodamento di messaggi che possono essere letti ripetutamente fino alla scadenza. Gli stream sono sempre persistenti e replicati. Una descrizione più tecnica di questo comportamento del flusso è "semantica del consumatore non distruttiva".
Per leggere i messaggi da un flusso in RabbitMQ, uno o più consumer eseguono la sottoscrizione e leggono lo stesso messaggio tutte le volte che lo desiderano. I consumatori parlano con un flusso attraverso i client basati su AMQP e utilizzano il protocollo AMQP.
Casi di utilizzo
I casi di utilizzo per gli stream includono:
- Architetture fan - out: dove molti utenti devono leggere lo stesso messaggio.
- Ripetizione e viaggio nel tempo: dove i consumatori devono leggere e rileggere lo stesso messaggio o iniziare a leggere da qualsiasi punto del flusso.
- Backlog di grandi dimensioni: i flussi sono progettati per archiviare in modo efficiente grandi quantità di dati con un sovraccarico minimo in memoria.
- Velocità di trasmissione elevata: RabbitMQ Streams elabora volumi di messaggi relativamente più elevati al secondo.
Per ulteriori informazioni, vedi RabbitMQ Streams.
Come utilizzare RabbitMQ Streams
Una libreria client AMQP 0.9.1 che può specificare argomenti consumer e coda facoltativi è in grado di utilizzare i flussi come code AMQP 0.9.1 regolari.
Utilizzo con una libreria client AMQP
Ci sono tre passi per utilizzare RabbitMQ Streams attraverso una libreria client AMQP:
- Dichiarare / Istanziare un flusso
- Pubblicare(scrivere)messaggi nel flusso
- Utilizza(leggi)messaggi dal flusso
Dichiarazione di un flusso RabbitMQ
È possibile creare un flusso utilizzando l'interfaccia di gestione RabbitMQ.
- Innanzitutto, selezionare Aggiungi una nuova coda.
- Successivamente, nell'elenco a discesa Tipo, selezionare Flusso.
In alternativa, creare un flusso utilizzando l'interfaccia di gestione di RabbitMQ creando una coda con tipo “stream”
. Se una coda è già presente, non verrà creata. Dichiarare un flusso con un comando come:
import pika, os
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<port>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare a Stream, named test_stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
Pubblicazione di un flusso RabbitMQ
Il seguente script dichiara un flusso RabbitMQ (test_stream
), quindi pubblica un messaggio attraverso la funzione basic_publish
:
import pika, os
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<port>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare a Stream, named test_stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
# Publish a message to the test_stream
channel.basic_publish(
exchange='',
routing_key='test_stream',
body='Welcome email message'
)
Utilizzo di uno stream RabbitMQ
I messaggi possono essere utilizzati da un flusso nello stesso modo in cui le code eseguono questa attività, con due differenze principali:
- L'utilizzo dei messaggi in RabbitMQ Streams richiede l'impostazione della ricercaQoS.
- Specificare un offset per iniziare la lettura / utilizzo da qualsiasi punto del flusso di log. Se non specificato, il consumer inizia la lettura dall'offset più recente scritto nel flusso di log dopo l'avvio.
Il seguente script dichiara nuovamente test_stream
, impostando la ricerca QoS su 100
utilizzando la funzione basic_qos
. Il consumer attiva la funzione di callback quando elabora un nuovo messaggio.
La funzione di callback richiama quindi la funzione send_welcome_email
che simula l'invio di un'email a un utente.
import pika, os, time
def send_welcome_email(msg):
print("Welcome Email task processing")
print(" [x] Received " + str(msg))
time.sleep(5) # simulate sending email to a user --delays for 5 seconds
print("Email successfully sent!")
return
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<password>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare our stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
# create a function which is called on incoming messages
def callback(ch, method, properties, body):
send_welcome_email(body)
# Set the consumer QoS prefetch
channel.basic_qos(
prefetch_count=100
)
# Consume messages published to the stream
channel.basic_consume(
'test_stream',
callback,
)
# start consuming (blocks)
channel.start_consuming()
connection.close()
Notare come un offset non è specificato in basic_consume
: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback)
. Di conseguenza, il consumer inizia a leggere dall'offset più
recente scritto in test_stream
dopo l'avvio del consumer. Dopo è stato deliberatamente enfatizzato qui per consentire l'esame incrociato di un comportamento interessante dei flussi.
Come impostare un offset
Poiché i flussi non eliminano mai alcun messaggio, qualsiasi utente può iniziare a leggere / consumare da qualsiasi punto del log. Questo è controllato dall'argomento consumer x-stream-offset
. Se non è specificato, il consumer
inizierà la lettura dall'offset successivo scritto nel log dopo l'avvio del consumer. Sono supportati i seguenti valori:
- first - inizia dal primo messaggio disponibile nella registrazione
- ultimo - inizia la lettura dall'ultima "porzione" scritta dei messaggi
- next - equivale a non specificare alcun offset
- Offset - un valore numerico che specifica un offset esatto da collegare al log.
- Registrazione data/ora - un valore di registrazione data / ora che specifica il momento in cui collegarsi al log. Si blocca all'offset più vicino, se la data / ora non è compresa nell'intervallo per il flusso, si blocca l'inizio o la fine del log, ad esempio: 00:00:00 UTC, 1970-01-01. Tenere presente che i consumer possono ricevere messaggi pubblicati un po' prima della data / ora specificata.
- Intervallo - un valore di stringa che specifica l'intervallo di tempo relativo all'ora corrente a cui collegare il log. Utilizza la stessa specifica di
x-max-age
.
Il seguente esempio di codice mostra come utilizzare la prima specifica di offset:
# Grabbing the first message
channel.basic_consume(
'test_stream',
callback,
arguments={"x-stream-offset": "first"}
)
Questo codice mostra come specificare un offset specifico da utilizzare:
channel.basic_consume(
'test_stream',
callback,
arguments={"x-stream-offset": 5000}
)
Altre operazioni stream
Le seguenti operazioni possono essere utilizzate in modo simile alle code classiche e quorum, ma alcune hanno un comportamento specifico per la coda:
- Dichiarazione
- Eliminazione coda
- Conferma pubblicazione
- Consumo (sottoscrizione): l'utilizzo richiede l'impostazione della ricerca QoS. L'ack funziona come un meccanismo di credito per far avanzare la compensazione corrente del consumatore.
- Impostazione QoS prefetch per i consumer
- Riconoscimenti utente (tenere presente QoS Limitazioni di ricerca )
- Cancellazione dei consumatori