IBM Cloud Docs
RabbitMQ ストリーム

RabbitMQ ストリーム

RabbitMQ Streams は、コンシューマーによるコンシュームのためにプロデューサーからのメッセージをバッファリングする、キューと同様に機能する永続的な複製データ構造です。 ただし、ストリームには以下の 2 つの違いがあります。

  • プロデューサーがメッセージを書き込む方法。
  • コンシューマーがメッセージを読み取る方法。

Streams は、有効期限が切れるまで繰り返し読み取ることができるメッセージの付加専用ログをモデル化します。 ストリームは常に永続的で複製されます。 このストリームの振る舞いのより技術的な説明は、「非破壊的な消費者セマンティクス」です。

RabbitMQでストリームからメッセージを読み取るには、1 つ以上のコンシューマーがそのストリームにサブスクライブし、必要な回数だけ同じメッセージを読み取ります。 コンシューマーは、AMQP ベースのクライアントを介してストリームと対話し、AMQP プロトコルを使用します。

ユース・ケース

ストリームのユース・ケースには、以下のものがあります。

  • ファンアウト・アーキテクチャー: 多くのコンシューマーが同じメッセージを読み取る必要がある場合。
  • 再生およびタイム・トラベル: 消費者が同じメッセージを読んで再読み取りする必要がある場合、またはストリーム内の任意のポイントから読み取りを開始する必要がある場合。
  • 大容量バックログ: ストリームは、メモリー内のオーバーヘッドを最小限に抑えながら、大量のデータを効率的に保管するように設計されています。
  • 高スループット: RabbitMQ Streams は、1 秒当たりのメッセージ数を比較的多く処理します。

詳しくは、 RabbitMQ Streamsを参照してください。

RabbitMQ ストリームの使用方法

オプションのキューおよびコンシューマー引数 を指定できる AMQP 0.9.1 クライアント・ライブラリーは、ストリームを通常の AMQP 0.9.1 キューとして使用できます。

AMQP クライアント・ライブラリーでの使用

AMQP クライアント・ライブラリーを介して RabbitMQ ストリームを操作するには、以下の 3 つのステップを実行します。

  1. ストリームの宣言/インスタンス化
  2. ストリームへのメッセージのパブリッシュ(書き込み)
  3. ストリームからのメッセージのコンシューム(読み取り)

RabbitMQ ストリームの宣言

RabbitMQ 管理インターフェースを使用してストリームを作成できます。

  • 最初に、 「新規キューの追加」 を選択します。
  • 次に、 「タイプ」 ドロップダウンで *「ストリーム」*を選択します。

あるいは、タイプ “stream” のキューを作成して、 RabbitMQ 管理インターフェースを使用してストリームを作成します。 キューが既に存在する場合、そのキューは作成されません。 以下のようなコマンドを使用してストリームを宣言します。

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

RabbitMQ ストリームのパブリッシュ

以下のスクリプトは、 RabbitMQ ストリーム (test_stream) を宣言し、 basic_publish 関数を介してそのストリームにメッセージをパブリッシュします。

import pika, os

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<port>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare a Stream, named test_stream
channel.queue_declare(
  queue='test_stream',
      durable=True,
  arguments={"x-queue-type": "stream"}
)

# Publish a message to the test_stream
channel.basic_publish(
  exchange='',
  routing_key='test_stream',
  body='Welcome email message'
)

RabbitMQ ストリームのコンシューム

メッセージは、キューがこのタスクを実行するのと同じ方法でストリームから取り込むことができますが、以下の 2 つの大きな違いがあります。

  1. RabbitMQ Streams でメッセージをコンシュームするには、 QoS プリフェッチを設定する必要があります。
  2. ログ・ストリーム内の任意のポイントから読み取り/消費を開始するオフセットを指定します。 指定されていない場合、コンシューマーは、開始後にログ・ストリームに書き込まれた最新のオフセットから読み取りを開始します。

以下のスクリプトは、 test_stream を再度宣言し、 basic_qos 関数を使用して QoS プリフェッチを 100 に設定します。 コンシューマーは、新規メッセージの処理時にコールバック関数をトリガーします。 次に、コールバック関数は、ユーザーへの E メールの送信をシミュレートする send_welcome_email 関数を呼び出します。

import pika, os, time

def send_welcome_email(msg):
  print("Welcome Email task processing")
  print(" [x] Received " + str(msg))
  time.sleep(5) # simulate sending email to a user --delays for 5 seconds
  print("Email successfully sent!")
  return

credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)

# Connection
connection = pika.BlockingConnection(
  pika.ConnectionParameters(
    host='<hostname>',
    port='<password>',
    credentials=credentials,
    virtual_host="/",
    ssl_options=ssl_options))
channel = connection.channel() # start a channel

# Declare our stream
channel.queue_declare(
  queue='test_stream',
  durable=True,
  arguments={"x-queue-type": "stream"}
)

# create a function which is called on incoming messages
def callback(ch, method, properties, body):
  send_welcome_email(body)

# Set the consumer QoS prefetch
channel.basic_qos(
  prefetch_count=100
)

# Consume messages published to the stream
channel.basic_consume(
  'test_stream',
  callback,
)

# start consuming (blocks)
channel.start_consuming()
connection.close()

basic_consume でオフセットがどのように指定されていないかに注意してください: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback)。 その結果、コンシューマーは、開始後に test_stream に書き込まれた最新のオフセットから読み取りを開始します。 ここでは、 を意図的に強調して、ストリームの関心のある振る舞いを相互に調査できるようにしています。

オフセットの設定方法

ストリームはメッセージを削除しないため、どのコンシューマーもログ内の任意のポイントから読み取り/コンシュームを開始できます。 これは、 x-stream-offset コンシューマー引数によって制御されます。 指定されていない場合、コンシューマーは、開始後にログに書き込まれた次のオフセットから読み取りを開始します。 以下の値がサポートされます。

  • first-ログ内の最初の使用可能なメッセージから開始します。
  • last-最後に書き込まれたメッセージの「チャンク」から読み取りを開始します。
  • next-オフセットを指定しない場合と同じ
  • Offset-ログに付加される正確なオフセットを指定する数値。
  • タイム・スタンプ-ログに付加する時点を指定するタイム・スタンプ値。 これは、最も近いオフセットにクランプします。タイム・スタンプがストリームの範囲外の場合、ログの開始または終了のいずれかをクランプします (例: 00:00:00 UTC, 1970-01-01)。 コンシューマーは、指定されたタイム・スタンプより少し前にパブリッシュされたメッセージを受信する可能性があることに注意してください。
  • Interval-ログを接続する現在時刻を基準にした時間間隔を指定するストリング値。 x-max-age と同じ指定を使用します。

以下のコード例は、最初のオフセット指定の使用方法を示しています。

# Grabbing the first message
channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": "first"}
)

このコードは、消費元の特定のオフセットを指定する方法を示しています。

channel.basic_consume(
  'test_stream',
  callback,
  arguments={"x-stream-offset": 5000}
)

その他のストリーム操作

以下の操作は、クラシック・キューおよびクォーラム・キューと同様の方法で使用できますが、キュー固有の動作を持つものもあります。