ファイナンス、情報通信技術のスキル・アグリゲーション・サイト

' . iseeit.jp 情報通信技術 . '
 

RabbitMQとPython / Node.js / PHPによる非同期メッセージング入門

1. RabbitMQの概要とUbuntu 24.04 LTSへのインストール手順

1-1. RabbitMQの概要

RabbitMQはオープンソースのメッセージブローカーで、アプリケーション間の非同期通信を仲介する役割を持ちます。 プロデューサー(送信側)がメッセージをキューに送信し、コンシューマー(受信側)がキューからメッセージを取得して処理します。 これにより、送信側と受信側を疎結合にし、負荷分散・再試行・バッファリングなどを柔軟に実現できます。

1-2. Ubuntu 24.04 LTSへのRabbitMQインストール(標準リポジトリ)

もっともシンプルな方法は、Ubuntu標準リポジトリからインストールする方法です。

# パッケージインデックス更新
sudo apt update

# RabbitMQサーバーのインストール
sudo apt install -y rabbitmq-server

# サービス状態の確認
sudo systemctl status rabbitmq-server

# 自動起動の確認
sudo systemctl is-enabled rabbitmq-server

デフォルトではAMQPのポート 5672 が使用されます。 ローカル開発用途であれば、このまま利用して問題ありません。

1-3. 管理コンソール(Management Plugin)の有効化(任意)

Webブラウザからキューやメッセージを確認したい場合は、管理プラグインを有効化します。

# 管理プラグイン有効化
sudo rabbitmq-plugins enable rabbitmq_management

# サービス再起動
sudo systemctl restart rabbitmq-server

デフォルトでは http://localhost:15672 で管理画面にアクセスできます。 初期ユーザーは guest / guest ですが、セキュリティのため本番環境では専用ユーザーを作成してください。

2. Python: pikaモジュールのインストールとProducer/Consumer非同期通信プログラム例

2-1. Python用RabbitMQクライアントのインストール

Pythonでは一般的に pika を利用します。

# 仮想環境は任意(推奨)
# python -m venv venv
# source venv/bin/activate

pip install pika

2-2. Producer(メッセージ送信側)の例

シンプルなキュー hello にメッセージを送信する例です。

# producer.py
import pika

# RabbitMQサーバーへ接続(ローカルホスト想定)
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()

# キュー宣言(存在しない場合は作成される)
channel.queue_declare(queue="hello", durable=True)

# メッセージ送信
message = "Hello RabbitMQ from Python!"
channel.basic_publish(
    exchange="",
    routing_key="hello",
    body=message.encode("utf-8"),
    properties=pika.BasicProperties(
        delivery_mode=2  # メッセージを永続化
    ),
)

print(f"[x] Sent: {message}")

connection.close()

2-3. Consumer(メッセージ受信側)の例

コンシューマーはキュー hello を監視し、メッセージを受信するとコールバック関数で処理します。 RabbitMQとの通信は非同期に行われ、メッセージ到着時にコールバックが呼ばれます。

# consumer.py
import pika
import time

def callback(ch, method, properties, body):
    message = body.decode("utf-8")
    print(f"[x] Received: {message}")
    # 処理のシミュレーション
    time.sleep(1)
    print("[x] Done")
    # 明示的にACKを返すことで、メッセージをキューから削除
    ch.basic_ack(delivery_tag=method.delivery_tag)

# 接続
connection = pika.BlockingConnection(
    pika.ConnectionParameters(host="localhost")
)
channel = connection.channel()

# キュー宣言(Producer側と同じ定義)
channel.queue_declare(queue="hello", durable=True)

# 公平分配のための設定(1つずつ処理)
channel.basic_qos(prefetch_count=1)

# コンシューム開始
channel.basic_consume(
    queue="hello",
    on_message_callback=callback
)

print("[*] Waiting for messages. To exit press CTRL+C")
channel.start_consuming()

別々のターミナルで consumer.py を起動しておき、もう一方で producer.py を実行すると、 メッセージが非同期にキュー経由で渡される様子を確認できます。

3. Python: aio-pika のインストール、Producer/Consumer例、pika との違い

3-1. aio-pika の概要

aio-pika は Python の非同期フレームワーク asyncio に対応した RabbitMQ クライアントです。 pika の BlockingConnection とは異なり、イベントループ上で非同期にメッセージ送受信を行えるため、 高スループットなアプリケーションや多数の同時接続を扱う場面で有利です。

3-2. aio-pika のインストール

pip install aio-pika

3-3. aio-pika による Producer(送信側)例

# aio_producer.py
import asyncio
import aio_pika

async def main():
    queue_name = "hello_async"
    message = "Hello RabbitMQ from aio-pika!"

    # 接続
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()

        # キュー宣言
        queue = await channel.declare_queue(queue_name, durable=True)

        # メッセージ送信
        await channel.default_exchange.publish(
            aio_pika.Message(
                body=message.encode("utf-8"),
                delivery_mode=aio_pika.DeliveryMode.PERSISTENT
            ),
            routing_key=queue.name
        )

        print("[x] Sent:", message)

asyncio.run(main())

3-4. aio-pika による Consumer(受信側)例

# aio_consumer.py
import asyncio
import aio_pika

async def main():
    queue_name = "hello_async"

    # 接続
    connection = await aio_pika.connect_robust("amqp://guest:guest@localhost/")
    async with connection:
        channel = await connection.channel()

        # 公平分配
        await channel.set_qos(prefetch_count=1)

        # キュー宣言
        queue = await channel.declare_queue(queue_name, durable=True)

        print("[*] Waiting for messages. To exit press CTRL+C")

        # メッセージ受信
        async with queue.iterator() as queue_iter:
            async for message in queue_iter:
                async with message.process():
                    print("[x] Received:", message.body.decode("utf-8"))
                    await asyncio.sleep(1)
                    print("[x] Done")

asyncio.run(main())

別ターミナルで python aio_consumer.py を起動し、もう一方で python aio_producer.py を実行すると、 非同期イベントループ上でメッセージが処理される様子を確認できます。

3-5. pika と aio-pika の違い

項目 pika aio-pika
実行モデル 同期(BlockingConnection) 非同期(asyncioベース)
適した用途 単純なスクリプト、小規模処理 高スループット、同時接続多数、Webアプリ
コードスタイル 通常の関数で記述 async/await を使用
パフォーマンス 大量メッセージ処理では制約あり イベントループにより効率的に処理
学習コスト 低い(同期処理) やや高い(asyncioの理解が必要)

4. Node.jsライブラリのインストールとProducer/Consumer非同期通信プログラム例

4-1. Node.js用RabbitMQクライアントのインストール

Node.jsでは amqplib がよく利用されます。

# プロジェクト作成
mkdir node-rabbitmq-example
cd node-rabbitmq-example
npm init -y

# RabbitMQクライアントライブラリ
npm install amqplib

4-2. Producer(メッセージ送信側)の例

// producer.js
const amqp = require("amqplib");

async function run() {
  const queue = "hello";
  const message = "Hello RabbitMQ from Node.js!";

  // 接続
  const connection = await amqp.connect("amqp://localhost");
  const channel = await connection.createChannel();

  // キュー宣言
  await channel.assertQueue(queue, { durable: true });

  // メッセージ送信
  channel.sendToQueue(queue, Buffer.from(message), {
    persistent: true,
  });

  console.log("[x] Sent:", message);

  await channel.close();
  await connection.close();
}

run().catch(console.error);

4-3. Consumer(メッセージ受信側)の例

// consumer.js
const amqp = require("amqplib");

async function run() {
  const queue = "hello";

  // 接続
  const connection = await amqp.connect("amqp://localhost");
  const channel = await connection.createChannel();

  // キュー宣言
  await channel.assertQueue(queue, { durable: true });

  // 公平分配
  await channel.prefetch(1);

  console.log("[*] Waiting for messages. To exit press CTRL+C");

  // メッセージ受信
  channel.consume(
    queue,
    async (msg) => {
      if (msg !== null) {
        const content = msg.content.toString();
        console.log("[x] Received:", content);

        // 処理のシミュレーション
        await new Promise((resolve) => setTimeout(resolve, 1000));
        console.log("[x] Done");

        // ACK
        channel.ack(msg);
      }
    },
    { noAck: false }
  );
}

run().catch(console.error);

別ターミナルで node consumer.js を起動し、もう一方で node producer.js を実行すると、 非同期にメッセージが処理される様子が確認できます。

5.PHPライブラリのインストールとProducer/Consumer非同期通信プログラム例

5-1. PHP用RabbitMQクライアントのインストール

PHPでは php-amqplib/php-amqplib をComposerでインストールします。

# プロジェクト作成
mkdir php-rabbitmq-example
cd php-rabbitmq-example

# Composer初期化(対話形式)
composer init

# RabbitMQクライアントライブラリ
composer require php-amqplib/php-amqplib

5-2. Producer(メッセージ送信側)の例


<?php
// producer.php
require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$queue = 'hello';
$messageBody = 'Hello RabbitMQ from PHP!';

// 接続
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// キュー宣言
$channel->queue_declare($queue, false, true, false, false);

// メッセージ作成(永続化)
$msg = new AMQPMessage(
    $messageBody,
    ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
);

// 送信
$channel->basic_publish($msg, '', $queue);

echo "[x] Sent: {$messageBody}\n";

$channel->close();
$connection->close();
?>

5-3. Consumer(メッセージ受信側)の例


<?php
// consumer.php
require __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;

$queue = 'hello';

// 接続
$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
$channel = $connection->channel();

// キュー宣言
$channel->queue_declare($queue, false, true, false, false);

// 公平分配
$channel->basic_qos(null, 1, null);

echo "[*] Waiting for messages. To exit press CTRL+C\n";

$callback = function ($msg) {
    echo "[x] Received: ", $msg->body, "\n";
    // 処理のシミュレーション
    sleep(1);
    echo "[x] Done\n";
    // ACK
    $msg->ack();
};

// メッセージ受信開始
$channel->basic_consume($queue, '', false, false, false, false, $callback);

// イベントループ
while ($channel->is_consuming()) {
    $channel->wait();
}

$channel->close();
$connection->close();
?>

別ターミナルで php consumer.php を起動し、もう一方で php producer.php を実行すると、 キューを介した非同期メッセージ処理を確認できます。

ファイナンシャル・プランニング
6つの係数

終価係数 : 元本を一定期間一定利率で複利運用したとき、将来いくら になるかを計算するときに利用します。

現価係数 : 将来の一定期間後に目標のお金を得るために、現在いくら の元本で複利運用を開始すればよいかを計算するときに利用します。

年金終価係数 : 一定期間一定利率で毎年一定金額を複利運用で 積み立て たとき、将来いくら になるかを計算するときに利用します。

年金現価係数 : 元本を一定利率で複利運用しながら、毎年一定金額を一定期間 取り崩し ていくとき、現在いくら の元本で複利運用を開始すればよいかを計算するときに利用します。

減債基金係数 : 将来の一定期間後に目標のお金を得るために、一定利率で一定金額を複利運用で 積み立て るとき、毎年いくら ずつ積み立てればよいかを計算するときに利用します。

資本回収係数 : 元本を一定利率で複利運用しながら、毎年一定金額を一定期間 取り崩し ていくとき、毎年いくら ずつ受け取りができるかを計算するときに利用します。

積み立て&取り崩しモデルプラン

積立金額→年金額の計算 : 年金終価係数、終価係数、資本回収係数を利用して、複利運用で積み立てた資金から、将来取り崩すことのできる年金額を計算します。

年金額→積立金額の計算 : 年金現価係数、現価係数、減債基金係数を利用して、複利運用で将来の年金プランに必要な資金の積立金額を計算します。


住宅ローン計算ツール