ファイナンス、情報通信技術のスキル・アグリゲーション・サイト
RabbitMQはオープンソースのメッセージブローカーで、アプリケーション間の非同期通信を仲介する役割を持ちます。 プロデューサー(送信側)がメッセージをキューに送信し、コンシューマー(受信側)がキューからメッセージを取得して処理します。 これにより、送信側と受信側を疎結合にし、負荷分散・再試行・バッファリングなどを柔軟に実現できます。
もっともシンプルな方法は、Ubuntu標準リポジトリからインストールする方法です。
# パッケージインデックス更新
sudo apt update
# RabbitMQサーバーのインストール
sudo apt install -y rabbitmq-server
# サービス状態の確認
sudo systemctl status rabbitmq-server
# 自動起動の確認
sudo systemctl is-enabled rabbitmq-server
デフォルトではAMQPのポート 5672 が使用されます。
ローカル開発用途であれば、このまま利用して問題ありません。
Webブラウザからキューやメッセージを確認したい場合は、管理プラグインを有効化します。
# 管理プラグイン有効化
sudo rabbitmq-plugins enable rabbitmq_management
# サービス再起動
sudo systemctl restart rabbitmq-server
デフォルトでは http://localhost:15672 で管理画面にアクセスできます。
初期ユーザーは guest / guest ですが、セキュリティのため本番環境では専用ユーザーを作成してください。
Pythonでは一般的に pika を利用します。
# 仮想環境は任意(推奨)
# python -m venv venv
# source venv/bin/activate
pip install pika
シンプルなキュー hello にメッセージを送信する例です。
# producer.py
import pika
# RabbitMQサーバーへ接続(ローカルホスト想定)
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
# キュー宣言(存在しない場合は作成される)
channel.queue_declare(queue="hello", durable=True)
# メッセージ送信
message = "Hello RabbitMQ from Python!"
channel.basic_publish(
exchange="",
routing_key="hello",
body=message.encode("utf-8"),
properties=pika.BasicProperties(
delivery_mode=2 # メッセージを永続化
),
)
print(f"[x] Sent: {message}")
connection.close()
コンシューマーはキュー hello を監視し、メッセージを受信するとコールバック関数で処理します。
RabbitMQとの通信は非同期に行われ、メッセージ到着時にコールバックが呼ばれます。
# consumer.py
import pika
import time
def callback(ch, method, properties, body):
message = body.decode("utf-8")
print(f"[x] Received: {message}")
# 処理のシミュレーション
time.sleep(1)
print("[x] Done")
# 明示的にACKを返すことで、メッセージをキューから削除
ch.basic_ack(delivery_tag=method.delivery_tag)
# 接続
connection = pika.BlockingConnection(
pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()
# キュー宣言(Producer側と同じ定義)
channel.queue_declare(queue="hello", durable=True)
# 公平分配のための設定(1つずつ処理)
channel.basic_qos(prefetch_count=1)
# コンシューム開始
channel.basic_consume(
queue="hello",
on_message_callback=callback
)
print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()
別々のターミナルで consumer.py を起動しておき、もう一方で producer.py を実行すると、
メッセージが非同期にキュー経由で渡される様子を確認できます。
aio-pika は Python の非同期フレームワーク asyncio に対応した RabbitMQ クライアントです。
pika の BlockingConnection とは異なり、イベントループ上で非同期にメッセージ送受信を行えるため、
高スループットなアプリケーションや多数の同時接続を扱う場面で有利です。
pip install aio-pika
# aio_producer.py
import asyncio
import aio_pika
async def main():
queue_name = "hello_async"
message = "Hello RabbitMQ from aio-pika!"
# 接続
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# キュー宣言
queue = await channel.declare_queue(queue_name, durable=True)
# メッセージ送信
await channel.default_exchange.publish(
aio_pika.Message(
body=message.encode("utf-8"),
delivery_mode=aio_pika.DeliveryMode.PERSISTENT
),
routing_key=queue.name
)
print("[x] Sent:", message)
asyncio.run(main())
# aio_consumer.py
import asyncio
import aio_pika
async def main():
queue_name = "hello_async"
# 接続
connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
async with connection:
channel = await connection.channel()
# 公平分配
await channel.set_qos(prefetch_count=1)
# キュー宣言
queue = await channel.declare_queue(queue_name, durable=True)
print("[*] Waiting for messages. To exit press CTRL+C")
# メッセージ受信
async with queue.iterator() as queue_iter:
async for message in queue_iter:
async with message.process():
print("[x] Received:", message.body.decode("utf-8"))
await asyncio.sleep(1)
print("[x] Done")
asyncio.run(main())
別ターミナルで python aio_consumer.py を起動し、もう一方で python aio_producer.py を実行すると、
非同期イベントループ上でメッセージが処理される様子を確認できます。
| 項目 | pika | aio-pika |
|---|---|---|
| 実行モデル | 同期(BlockingConnection) | 非同期(asyncioベース) |
| 適した用途 | 単純なスクリプト、小規模処理 | 高スループット、同時接続多数、Webアプリ |
| コードスタイル | 通常の関数で記述 | async/await を使用 |
| パフォーマンス | 大量メッセージ処理では制約あり | イベントループにより効率的に処理 |
| 学習コスト | 低い(同期処理) | やや高い(asyncioの理解が必要) |
Node.jsでは amqplib がよく利用されます。
# プロジェクト作成
mkdir node-rabbitmq-example
cd node-rabbitmq-example
npm init -y
# RabbitMQクライアントライブラリ
npm install amqplib
// producer.js
const amqp = require("amqplib");
async function run() {
const queue = "hello";
const message = "Hello RabbitMQ from Node.js!";
// 接続
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
// キュー宣言
await channel.assertQueue(queue, { durable: true });
// メッセージ送信
channel.sendToQueue(queue, Buffer.from(message), {
persistent: true,
});
console.log("[x] Sent:", message);
await channel.close();
await connection.close();
}
run().catch(console.error);
// consumer.js
const amqp = require("amqplib");
async function run() {
const queue = "hello";
// 接続
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
// キュー宣言
await channel.assertQueue(queue, { durable: true });
// 公平分配
await channel.prefetch(1);
console.log("[*] Waiting for messages. To exit press CTRL+C");
// メッセージ受信
channel.consume(
queue,
async (msg) => {
if (msg !== null) {
const content = msg.content.toString();
console.log("[x] Received:", content);
// 処理のシミュレーション
await new Promise((resolve) => setTimeout(resolve, 1000));
console.log("[x] Done");
// ACK
channel.ack(msg);
}
},
{ noAck: false }
);
}
run().catch(console.error);
別ターミナルで node consumer.js を起動し、もう一方で node producer.js を実行すると、
非同期にメッセージが処理される様子が確認できます。
PHPでは php-amqplib/php-amqplib をComposerでインストールします。
# プロジェクト作成
mkdir php-rabbitmq-example
cd php-rabbitmq-example
# Composer初期化(対話形式)
composer init
# RabbitMQクライアントライブラリ
composer require php-amqplib/php-amqplib
<?php
// producer.php
require __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$queue = 'hello';
$messageBody = 'Hello RabbitMQ from PHP!';
// 接続
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// キュー宣言
$channel->queue_declare($queue, false, true, false, false);
// メッセージ作成(永続化)
$msg = new AMQPMessage(
$messageBody,
['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);
// 送信
$channel->basic_publish($msg, '', $queue);
echo "[x] Sent: {$messageBody}\n";
$channel->close();
$connection->close();
?>
<?php
// consumer.php
require __DIR__ . '/vendor/autoload.php';
use PhpAmqpLib\Connection\AMQPStreamConnection;
$queue = 'hello';
// 接続
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();
// キュー宣言
$channel->queue_declare($queue, false, true, false, false);
// 公平分配
$channel->basic_qos(null, 1, null);
echo "[*] Waiting for messages. To exit press CTRL+C\n";
$callback = function ($msg) {
echo "[x] Received: ", $msg->body, "\n";
// 処理のシミュレーション
sleep(1);
echo "[x] Done\n";
// ACK
$msg->ack();
};
// メッセージ受信開始
$channel->basic_consume($queue, '', false, false, false, false, $callback);
// イベントループ
while ($channel->is_consuming()) {
$channel->wait();
}
$channel->close();
$connection->close();
?>
別ターミナルで php consumer.php を起動し、もう一方で php producer.php を実行すると、
キューを介した非同期メッセージ処理を確認できます。