RabbitMQ ストリーム
RabbitMQ Streams は、コンシューマーによるコンシュームのためにプロデューサーからのメッセージをバッファリングする、キューと同様に機能する永続的な複製データ構造です。 ただし、ストリームには以下の 2 つの違いがあります。
- プロデューサーがメッセージを書き込む方法。
- コンシューマーがメッセージを読み取る方法。
Streams は、有効期限が切れるまで繰り返し読み取ることができるメッセージの付加専用ログをモデル化します。 ストリームは常に永続的で複製されます。 このストリームの振る舞いのより技術的な説明は、「非破壊的な消費者セマンティクス」です。
RabbitMQでストリームからメッセージを読み取るには、1 つ以上のコンシューマーがそのストリームにサブスクライブし、必要な回数だけ同じメッセージを読み取ります。 コンシューマーは、AMQP ベースのクライアントを介してストリームと対話し、AMQP プロトコルを使用します。
ユース・ケース
ストリームのユース・ケースには、以下のものがあります。
- ファンアウト・アーキテクチャー: 多くのコンシューマーが同じメッセージを読み取る必要がある場合。
- 再生およびタイム・トラベル: 消費者が同じメッセージを読んで再読み取りする必要がある場合、またはストリーム内の任意のポイントから読み取りを開始する必要がある場合。
- 大容量バックログ: ストリームは、メモリー内のオーバーヘッドを最小限に抑えながら、大量のデータを効率的に保管するように設計されています。
- 高スループット: RabbitMQ Streams は、1 秒当たりのメッセージ数を比較的多く処理します。
詳しくは、 RabbitMQ Streamsを参照してください。
RabbitMQ ストリームの使用方法
オプションのキューおよびコンシューマー引数 を指定できる AMQP 0.9.1 クライアント・ライブラリーは、ストリームを通常の AMQP 0.9.1 キューとして使用できます。
AMQP クライアント・ライブラリーでの使用
AMQP クライアント・ライブラリーを介して RabbitMQ ストリームを操作するには、以下の 3 つのステップを実行します。
RabbitMQ ストリームの宣言
RabbitMQ 管理インターフェースを使用してストリームを作成できます。
- 最初に、 「新規キューの追加」 を選択します。
- 次に、 「タイプ」 ドロップダウンで *「ストリーム」*を選択します。
あるいは、タイプ “stream”
のキューを作成して、 RabbitMQ 管理インターフェースを使用してストリームを作成します。 キューが既に存在する場合、そのキューは作成されません。 以下のようなコマンドを使用してストリームを宣言します。
import pika, os
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<port>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare a Stream, named test_stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
RabbitMQ ストリームのパブリッシュ
以下のスクリプトは、 RabbitMQ ストリーム (test_stream
) を宣言し、 basic_publish
関数を介してそのストリームにメッセージをパブリッシュします。
import pika, os
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<port>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare a Stream, named test_stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
# Publish a message to the test_stream
channel.basic_publish(
exchange='',
routing_key='test_stream',
body='Welcome email message'
)
RabbitMQ ストリームのコンシューム
メッセージは、キューがこのタスクを実行するのと同じ方法でストリームから取り込むことができますが、以下の 2 つの大きな違いがあります。
- RabbitMQ Streams でメッセージをコンシュームするには、 QoS プリフェッチを設定する必要があります。
- ログ・ストリーム内の任意のポイントから読み取り/消費を開始するオフセットを指定します。 指定されていない場合、コンシューマーは、開始後にログ・ストリームに書き込まれた最新のオフセットから読み取りを開始します。
以下のスクリプトは、 test_stream
を再度宣言し、 basic_qos
関数を使用して QoS プリフェッチを 100
に設定します。 コンシューマーは、新規メッセージの処理時にコールバック関数をトリガーします。 次に、コールバック関数は、ユーザーへの E メールの送信をシミュレートする send_welcome_email
関数を呼び出します。
import pika, os, time
def send_welcome_email(msg):
print("Welcome Email task processing")
print(" [x] Received " + str(msg))
time.sleep(5) # simulate sending email to a user --delays for 5 seconds
print("Email successfully sent!")
return
credentials = pika.PlainCredentials('username', 'password')
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.verify_mode = ssl.CERT_NONE
ssl_options = pika.SSLOptions(context)
# Connection
connection = pika.BlockingConnection(
pika.ConnectionParameters(
host='<hostname>',
port='<password>',
credentials=credentials,
virtual_host="/",
ssl_options=ssl_options))
channel = connection.channel() # start a channel
# Declare our stream
channel.queue_declare(
queue='test_stream',
durable=True,
arguments={"x-queue-type": "stream"}
)
# create a function which is called on incoming messages
def callback(ch, method, properties, body):
send_welcome_email(body)
# Set the consumer QoS prefetch
channel.basic_qos(
prefetch_count=100
)
# Consume messages published to the stream
channel.basic_consume(
'test_stream',
callback,
)
# start consuming (blocks)
channel.start_consuming()
connection.close()
basic_consume
でオフセットがどのように指定されていないかに注意してください: # Consume messages published to the stream channel.basic_consume( 'test_stream', callback)
。 その結果、コンシューマーは、開始後に test_stream
に書き込まれた最新のオフセットから読み取りを開始します。
ここでは、 後 を意図的に強調して、ストリームの関心のある振る舞いを相互に調査できるようにしています。
オフセットの設定方法
ストリームはメッセージを削除しないため、どのコンシューマーもログ内の任意のポイントから読み取り/コンシュームを開始できます。 これは、 x-stream-offset
コンシューマー引数によって制御されます。 指定されていない場合、コンシューマーは、開始後にログに書き込まれた次のオフセットから読み取りを開始します。 以下の値がサポートされます。
- first-ログ内の最初の使用可能なメッセージから開始します。
- last-最後に書き込まれたメッセージの「チャンク」から読み取りを開始します。
- next-オフセットを指定しない場合と同じ
- Offset-ログに付加される正確なオフセットを指定する数値。
- タイム・スタンプ-ログに付加する時点を指定するタイム・スタンプ値。 これは、最も近いオフセットにクランプします。タイム・スタンプがストリームの範囲外の場合、ログの開始または終了のいずれかをクランプします (例: 00:00:00 UTC, 1970-01-01)。 コンシューマーは、指定されたタイム・スタンプより少し前にパブリッシュされたメッセージを受信する可能性があることに注意してください。
- Interval-ログを接続する現在時刻を基準にした時間間隔を指定するストリング値。
x-max-age
と同じ指定を使用します。
以下のコード例は、最初のオフセット指定の使用方法を示しています。
# Grabbing the first message
channel.basic_consume(
'test_stream',
callback,
arguments={"x-stream-offset": "first"}
)
このコードは、消費元の特定のオフセットを指定する方法を示しています。
channel.basic_consume(
'test_stream',
callback,
arguments={"x-stream-offset": 5000}
)
その他のストリーム操作
以下の操作は、クラシック・キューおよびクォーラム・キューと同様の方法で使用できますが、キュー固有の動作を持つものもあります。
- 宣言 (declaration)
- キューの削除
- パブリッシャーの確認
- コンシューム (サブスクリプション): コンシュームには、 QoS プリフェッチを設定する必要があります。 これらのパックは、消費者の現在のオフセットを進めるためのクレジット・メカニズムとして機能します。
- コンシューマー用の QoS プリフェッチ の設定
- コンシューマー確認応答 ( QoS プリフェッチ制限 に留意してください)
- コンシューマーの取り消し