ファイナンス、情報通信技術のスキル・アグリゲーション・サイト
MQTT (Message Queue Telemetry Transport) は、Publisher から発信したメッセージを Broker を通じて Subscriber へ配信するプロトコルです。
Paho (MQTT Client Library)を利用して、 Mostuitto (MQTT Broker)に対してメッセージを Publishing、Subscribing するクライアントプログラム例です。
Mostuitto インストールについては、「Mosquitto(MQTT Broker)を Windows と Ubuntu にインストール」も参照してください。
SSL 証明書は、プライベート証明書を作成しました。手順については、「プライベート認証局でプライベート SSL/TLS 証明書を発行する」も参照してください。
Ubuntu 16.04 LTS で実行確認しました。なお、ブラウザでの確認は、Ubuntu 上の Firefox を利用しました。SSL の利用時に CA 証明書のインポートが必要です。
Python による Paho サンプルプログラムです。
http://www.eclipse.org/
Paho の Python ライブラリモジュールをインストールします。
$ sudo pip install paho-mqtt
mosquitto.conf の設定内容です。
# Default listener
listener 1883
# Certificate based SSL/TLS support
listener 8883
cafile   /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile  /etc/mosquitto/certs/server.key
Paho の Subscribe プログラム例です。
import paho.mqtt.client as mqtt
import ssl
host = 'localhost'
port = 8883
ca_cert = '/etc/mosquitto/certs/ca.crt'
certfile = '/etc/mosquitto/certs/client.crt'
keyfile = '/etc/mosquitto/certs/client.key'
topic = 'test'
def on_connect(client, userdata, flags, reason_code, properties):
  print('status {0}'.format(reason_code))
  client.subscribe(topic)
def on_message(client, userdata, msg):
  print('[' + msg.topic + ']' + str(msg.payload))
if __name__ == '__main__':
  client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
  client.on_connect = on_connect
  client.on_message = on_message
  client.tls_set(ca_cert, certfile, keyfile,
    cert_reqs = mqtt.ssl.CERT_REQUIRED,
    tls_version = mqtt.ssl.PROTOCOL_TLSv1_2,
    ciphers = None)
  client.tls_insecure_set(True)
  client.connect(host, port=port, keepalive=60)
  client.loop_forever()
Paho の Publish プログラム例です。
import paho.mqtt.client as mqtt
import ssl
host = 'localhost'
port = 8883
ca_cert = '/etc/mosquitto/certs/ca.crt'
certfile = '/etc/mosquitto/certs/client.crt'
keyfile = '/etc/mosquitto/certs/client.key'
topic = 'test'
client = mqtt.Client(mqtt.CallbackAPIVersion.VERSION2)
client.tls_set(ca_cert, certfile, keyfile,
  cert_reqs = mqtt.ssl.CERT_REQUIRED,
  tls_version = mqtt.ssl.PROTOCOL_TLSv1_2,
  ciphers = None)
client.tls_insecure_set(True)
client.connect(host, port=port, keepalive=60)
client.publish(topic, 'Message')
client.disconnect()
JavaScript による Paho サンプルプログラムです。
http://www.eclipse.org/
CDN により Paho ライブラリを取り込みます。
<script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
mosquitto.conf の設定内容です。WebSocket を使用します。
# WebSocket listener
listener 11883
protocol websockets
# Certificate based SSL/TLS support
listener 18883
protocol websockets
cafile   /etc/mosquitto/certs/ca.crt
certfile /etc/mosquitto/certs/server.crt
keyfile  /etc/mosquitto/certs/server.key
HTML5 & JQuery のチャットプログラム風のサンプルです。
<!DOCTYPE html>
<html>
 <head>
  <meta charset="UTF-8">
  <title>MQTT Client Demo</title>
  <meta name="viewport" content="width=device-width, initial-scale=1">
  <script src="https://code.jquery.com/jquery-3.2.1.min.js"></script>
  <script src="https://cdnjs.cloudflare.com/ajax/libs/paho-mqtt/1.0.1/mqttws31.min.js" type="text/javascript"></script>
  <script type="text/javascript">
function ws_mqtt() {
  // Create a client instance
  // client = new Paho.MQTT.Client("host", port,"client_id"); 
  client = new Paho.MQTT.Client("localhost", 18883, "id_" + parseInt(Math.random() * 100, 10));
  // set callback handlers
  client.onConnectionLost = onConnectionLost;
  client.onMessageArrived = onMessageArrived;
  var options = {
    useSSL: true,
    onSuccess:onConnect,
    onFailure:doFail
  }
  // connect the client
  client.connect(options);
  // called when the client connects
  function onConnect() {
    // Once a connection has been made, make a subscription and send a message.
    console.log("onConnect");
    client.subscribe("test");
  }
  function doFail(e){
    console.log(e);
  }
  // called when the client loses its connection
  function onConnectionLost(responseObject) {
    if (responseObject.errorCode !== 0) {
      $("#chat").prepend("<p>onConnectionLost:"+responseObject.errorMessage+"</p>");
    }
  }
  // called when a message arrives
  function onMessageArrived(message) {
    $("#chat").prepend("<p>onMessageArrived:"+message.payloadString+"</p>");
  }
  $("#chatform").submit(function (evt) {
    var line = $("#chatform [type=text]").val()
    $("#chatform [type=text]").val("")
    message = new Paho.MQTT.Message(line);
    message.destinationName = "test";
    client.send(message);
    return false;
  });
}
  </script>
 </head>
 <body>
  <div id="chat" style="width: 60em; height: 20em; overflow:auto; border: 1px solid black">
  </div>
  <form id="chatform">
  <p>Message</p>
  <input type="text" />
  <input type="submit" value="送信" />
  </form>
  <script type="text/javascript">
window.onload = function() {
  $("#chat").prepend("<p>ONLOAD</p>");
  ws_mqtt();
}
  </script>
 </body>
</html>