IBM Cloud Docs
Flux RabbitMQ

Flux RabbitMQ

RabbitMQ Streams est une structure de données répliquées persistantes qui fonctionne de la même manière que les files d'attente, en mettant en mémoire tampon les messages des producteurs pour qu'ils soient consommés par les consommateurs. Toutefois, les flux diffèrent de deux manières:

  • Comment les producteurs leur écrivent des messages.
  • Comment les consommateurs lisent les messages qu'ils leur envoient.

Les flux modélisent un journal d'ajout uniquement des messages qui peuvent être lus à plusieurs reprises jusqu'à leur expiration. Les flux sont toujours persistants et répliqués. Une description plus technique de ce comportement de flux est la "sémantique de consommateur non destructive".

Pour lire des messages à partir d'un flux dans RabbitMQ, un ou plusieurs consommateurs s'y abonnent et lisent le même message autant de fois qu'ils le souhaitent. Les consommateurs parlent à un flux via des clients AMQP et utilisent le protocole AMQP.

Cas d'utilisation

Les cas d'utilisation des flux sont les suivants:

  • Architectures de sortance: où de nombreux consommateurs ont besoin de lire le même message.
  • Relecture et voyage dans le temps: où les consommateurs doivent lire et relire le même message ou commencer à lire à partir de n'importe quel point du flux.
  • Gros retards: les flux sont conçus pour stocker de plus grandes quantités de données de manière efficace avec un minimum de temps système en mémoire.
  • Débit élevé: RabbitMQ Streams traite des volumes de messages par seconde relativement plus élevés.

Pour plus d'informations, voir RabbitMQ Streams.

Comment utiliser RabbitMQ Streams

Une bibliothèque client AMQP 0.9.1 qui peut spécifier des arguments de file d'attente et de consommateur facultatifs peut utiliser des flux en tant que files d'attente AMQP 0.9.1 standard.

Utilisation avec une bibliothèque client AMQP

L'utilisation de RabbitMQ Streams via une bibliothèque client AMQP s'effectue en trois étapes:

  1. Déclarer / Instancier un flux
  2. Publier(écrire)des messages dans le flux
  3. Consommer(lire)des messages à partir du flux

Déclaration d'un flux RabbitMQ

Vous pouvez créer un flux à l'aide de l'interface de gestion RabbitMQ.

  • Sélectionnez d'abord Ajouter une nouvelle file d'attente.
  • Ensuite, dans la liste déroulante Type, sélectionnez Flux.

Vous pouvez également créer un flux à l'aide de l'interface de gestion RabbitMQ en créant une file d'attente de type “stream”. Si une file d'attente est déjà présente, elle ne sera pas créée. Déclarez un flux avec une commande telle que:

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

Publication d'un flux RabbitMQ

Le script suivant déclare un flux RabbitMQ (test_stream), puis y publie un message via la fonction basic_publish :

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

# Publish a message to the test_stream
channel.basic_publish(
  exchange='',
  routing_key='test_stream',
  body='Welcome email message'
)

Consommation d'un flux RabbitMQ

Les messages peuvent être consommés à partir d'un flux de la même manière que les files d'attente effectuent cette tâche, avec deux différences majeures:

  1. La consommation de messages dans RabbitMQ Streams nécessite la définition de la préextractionQoS.
  2. Indiquez un décalage pour commencer à lire / consommer à partir de n'importe quel point du flux de journalisation. S'il n'est pas spécifié, le consommateur commence la lecture à partir du décalage le plus récent écrit dans le flux de journalisation après son démarrage.

Le script suivant déclare à nouveau test_stream, en définissant la préextraction QoS sur 100 à l'aide de la fonction basic_qos. Le consommateur déclenche la fonction de rappel lorsqu'il traite un nouveau message. La fonction de rappel appelle ensuite la fonction send_welcome_email qui simule l'envoi d'un courrier électronique à un utilisateur.

import pika, os, time

def send_welcome_email(msg):
  print("Welcome Email task processing")
  print(" [x] Received " + str(msg))
  time.sleep(5) # simulate sending email to a user --delays for 5 seconds
  print("Email successfully sent!")
  return

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<password>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare our stream
channel.queue_declare(
  queue='test_stream',
  durable=True,
  arguments={"x-queue-type": "stream"}
)

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  send_welcome_email(body)

# Set the consumer QoS prefetch
channel.basic_qos(
  prefetch_count=100
)

# Consume messages published to the stream
channel.basic_consume(
  'test_stream',
  callback,
)

# start consuming (blocks)
channel.start_consuming()
connection.close()

Notez qu'un décalage n'est pas spécifié dans notre basic_consume: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback). Par conséquent, le consommateur commence à lire à partir du décalage le plus récent écrit dans test_stream après le démarrage du consommateur. Après a été délibérément souligné ici pour permettre le contre-interrogatoire d'un comportement intéressant des flux.

Comment définir un décalage

Comme les flux ne suppriment jamais de messages, tout consommateur peut commencer à lire / consommer à partir de n'importe quel point du journal. Cette opération est contrôlée par l'argument de consommateur x-stream-offset. S'il n'est pas spécifié, le consommateur commence la lecture à partir du décalage suivant consigné dans le journal après le démarrage du consommateur. Les valeurs suivantes sont prises en charge :

  • first-démarre à partir du premier message disponible dans le journal
  • last-Début de la lecture à partir du dernier "bloc" de messages écrit
  • next-identique à l'absence de décalage
  • Décalage-Valeur numérique spécifiant un décalage exact à associer au journal.
  • Horodatage-Valeur d'horodatage indiquant le point de cohérence à associer au journal. Il s'applique au décalage le plus proche, si l'horodatage est hors plage pour le flux, il s'applique au début ou à la fin du journal, par exemple: 00:00:00 UTC, 1970-01-01. Sachez que les consommateurs peuvent recevoir des messages publiés un peu avant l'horodatage spécifié.
  • Intervalle-valeur de chaîne indiquant l'intervalle de temps relatif à l'heure actuelle à laquelle le journal doit être joint. Utilise la même spécification que x-max-age.

L'exemple de code suivant montre comment utiliser la première spécification de décalage:

# Grabbing the first message
channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": "first"}
)

Ce code montre comment spécifier un décalage spécifique à partir duquel consommer:

channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": 5000}
)

Autres opérations Streams

Les opérations suivantes peuvent être utilisées de la même manière que les files d'attente classiques et quorum, mais certaines ont un comportement spécifique à la file d'attente: