IBM Cloud Docs
RabbitMQ-Datenströme

RabbitMQ-Datenströme

RabbitMQ Streams ist eine persistente replizierte Datenstruktur, die ähnlich wie Warteschlangen funktioniert und Nachrichten von Produzenten für den Konsum durch Konsumenten puffert. Datenströme unterscheiden sich jedoch auf zwei Arten:

  • Wie Erzeuger Nachrichten an sie schreiben.
  • Wie Konsumenten Nachrichten von ihnen lesen.

Datenströme modellieren ein Protokoll des Typs 'Nur anhängen' von Nachrichten, die wiederholt gelesen werden können, bis sie ablaufen. Datenströme sind immer persistent und repliziert. Eine technische Beschreibung dieses Datenstromverhaltens ist die "zerstörungsfreie Konsumentensemantik".

Zum Lesen von Nachrichten aus einem Datenstrom in RabbitMQsubskribieren ein oder mehrere Konsumenten diesen Datenstrom und lesen dieselbe Nachricht beliebig oft. Konsumenten kommunizieren über AMQP-basierte Clients mit einem Datenstrom und verwenden das AMQP-Protokoll.

Anwendungsfälle

Zu den Anwendungsfällen für Datenströme gehören:

  • Fan-out-Architekturen: Wo viele Konsumenten dieselbe Nachricht lesen müssen.
  • Wiedergabe und Zeitreise: Hier müssen Konsumenten dieselbe Nachricht lesen und erneut lesen oder an einem beliebigen Punkt im Datenstrom lesen.
  • Große Rückstände: Streams sind so konzipiert, dass größere Datenmengen effizient und mit minimalem speicherinternen Aufwand gespeichert werden.
  • Hoher Durchsatz: RabbitMQ Streams verarbeitet relativ mehr Nachrichten pro Sekunde.

Weitere Informationen finden Sie unter RabbitMQ Streams.

Verwendung von RabbitMQ Streams

Eine AMQP-Clientbibliothek 0.9.1, die optionale Warteschlangen-und Konsumentenargumente angeben kann, kann Datenströme als reguläre AMQP-Warteschlangen 0.9.1 verwenden.

Mit AMQP-Clientbibliothek verwenden

Es gibt drei Schritte zum Arbeiten mit RabbitMQ Streams über eine AMQP-Clientbibliothek:

  1. Datenstrom deklarieren/instanziieren
  2. Nachrichten im Datenstrom veröffentlichen(schreiben)
  3. Nachrichten aus dem Datenstrom konsumieren(lesen)

RabbitMQ-Datenstrom deklarieren

Sie können einen Datenstrom mithilfe der RabbitMQ-Managementschnittstelle erstellen.

  • Wählen Sie zunächst Neue Warteschlange hinzufügen aus.
  • Wählen Sie als Nächstes in der Dropdown-Liste Typ die Option Datenstromaus.

Alternativ können Sie einen Stream erstellen, indem Sie die RabbitMQ-Managementschnittstelle verwenden, indem Sie eine Warteschlange mit dem Typ “stream” erstellen. Wenn bereits eine Warteschlange vorhanden ist, wird sie nicht erstellt. Deklarieren Sie einen Datenstrom mit einem Befehl wie dem folgenden:

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

RabbitMQ-Datenstrom veröffentlichen

Das folgende Script deklariert einen RabbitMQ-Stream (test_stream) und veröffentlicht anschließend über die Funktion basic_publish eine Nachricht:

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

# Publish a message to the test_stream
channel.basic_publish(
  exchange='',
  routing_key='test_stream',
  body='Welcome email message'
)

RabbitMQ-Datenstrom verarbeiten

Nachrichten können aus einem Stream auf dieselbe Weise konsumiert werden, wie Warteschlangen diese Task ausführen, mit zwei Hauptunterschieden:

  1. Das Verarbeiten von Nachrichten in RabbitMQ Streams erfordert die Einstellung des QoS Vorablesezugriffs.
  2. Geben Sie einen Offset an, um das Lesen/Konsumieren von einem beliebigen Punkt im Protokolldatenstrom zu starten. Wenn nicht angegeben, beginnt der Konsument mit dem Lesen des zuletzt in den Protokolldatenstrom geschriebenen Offsets, nachdem er gestartet wurde.

Das folgende Script deklariert test_stream erneut und setzt den QoS-Vorablesezugriff mit der Funktion basic_qos auf 100. Der Konsument löst die Callback-Funktion aus, wenn er eine neue Nachricht verarbeitet. Die Callback-Funktion ruft dann die Funktion send_welcome_email auf, die das Senden einer E-Mail an einen Benutzer simuliert.

import pika, os, time

def send_welcome_email(msg):
  print("Welcome Email task processing")
  print(" [x] Received " + str(msg))
  time.sleep(5) # simulate sending email to a user --delays for 5 seconds
  print("Email successfully sent!")
  return

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<password>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare our stream
channel.queue_declare(
  queue='test_stream',
  durable=True,
  arguments={"x-queue-type": "stream"}
)

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  send_welcome_email(body)

# Set the consumer QoS prefetch
channel.basic_qos(
  prefetch_count=100
)

# Consume messages published to the stream
channel.basic_consume(
  'test_stream',
  callback,
)

# start consuming (blocks)
channel.start_consuming()
connection.close()

Beachten Sie, dass in basic_consume kein Offset angegeben ist: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback). Daher beginnt der Konsument mit dem Lesen des zuletzt in test_stream geschriebenen Offsets, nachdem der Konsument gestartet wurde. Nach wurde hier bewusst betont, um die Queruntersuchung eines interessanten Verhaltens von Streams zu ermöglichen.

So legen Sie einen Offset fest

Da Datenströme keine Nachrichten löschen, kann jeder Konsument an jedem Punkt im Protokoll mit dem Lesen/Konsumieren beginnen. Dies wird durch das Konsumentenargument x-stream-offset gesteuert. Wenn er nicht angegeben ist, beginnt der Konsument mit dem Lesen ab dem nächsten Offset, der nach dem Start des Konsumenten in das Protokoll geschrieben wird. Die folgenden Werte werden unterstützt:

  • first-Start bei der ersten verfügbaren Nachricht im Protokoll
  • last-Startet das Lesen aus dem zuletzt geschriebenen "Block" von Nachrichten.
  • next-wie ohne Angabe eines Offsets
  • Offset-Ein numerischer Wert, der einen exakten Offset angibt, der an das Protokoll angehängt werden soll.
  • Timestamp-Ein Zeitmarkenwert, der den Zeitpunkt angibt, zu dem eine Verbindung zum Protokoll hergestellt wird. Sie wird an den nächsten Offset geklemmt. Wenn die Zeitmarke außerhalb des gültigen Bereichs für den Datenstrom liegt, wird entweder der Anfang oder das Ende des Protokolls geklemmt. Beispiel: 00:00:00 UTC, 1970-01-01. Beachten Sie, dass Konsumenten Nachrichten empfangen können, die ein Bit vor der angegebenen Zeitmarke veröffentlicht wurden.
  • Intervall-Ein Zeichenfolgewert, der das Zeitintervall relativ zur aktuellen Zeit angibt, an dem das Protokoll angehängt werden soll. Verwendet dieselbe Spezifikation wie x-max-age.

Das folgende Codebeispiel zeigt, wie die erste Offsetspezifikation verwendet wird:

# Grabbing the first message
channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": "first"}
)

Dieser Code zeigt, wie ein bestimmter Offset für die Verarbeitung angegeben wird:

channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": 5000}
)

Andere Datenstromoperationen

Die folgenden Operationen können auf ähnliche Weise wie klassische Warteschlangen und Quorumwarteschlangen verwendet werden, aber einige haben ein warteschlangenspezifisches Verhalten: