技術メモ

技術メモ

ラフなメモ

GoでMQTTのpub/subを試す

MQTT Broker

MQTT Brokerはmosquitoを使うことにする。Dockerで立ち上げる。

> docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto
1577610628: mosquitto version 1.6.8 starting
1577610628: Config loaded from /mosquitto/config/mosquitto.conf.
1577610628: Opening ipv4 listen socket on port 1883.
1577610628: Opening ipv6 listen socket on port 1883.

MQTT Subscriberの実装

サンプル実装として go-mqtt/sample のトピックをQOS = 0でsubscribeすることにする。mqtt clientにはpahoを使うのが良さそう。ブリッジアプリとして機能するときはselectしているチャネル経由でsubscribeしたメッセージを後続に渡すのが良いだろう。

package main

import (
    "fmt"
    "log"
    "os"
    "os/signal"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    msgCh := make(chan mqtt.Message)
    var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
        msgCh <- msg
    }
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://localhost:1883")
    c := mqtt.NewClient(opts)

    if token := c.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf("Mqtt error: %s", token.Error())
    }

    if subscribeToken := c.Subscribe("go-mqtt/sample", 0, f); subscribeToken.Wait() && subscribeToken.Error() != nil {
        log.Fatal(subscribeToken.Error())
    }

    signalCh := make(chan os.Signal, 1)
    signal.Notify(signalCh, os.Interrupt)

    for {
        select {
        case m := <-msgCh:
            fmt.Printf("topic: %v, payload: %v\n", m.Topic(), string(m.Payload()))
        case <-signalCh:
            fmt.Printf("Interrupt detected.\n")
            c.Disconnect(1000)
            return
        }
    }
}

テストにpublishする実装。go buildなどをして、上記のgoアプリとは別のターミナルで起動させる。

package main

import (
    "fmt"
    "log"

    mqtt "github.com/eclipse/paho.mqtt.golang"
)

func main() {
    opts := mqtt.NewClientOptions()
    opts.AddBroker("tcp://localhost:1883")
    c := mqtt.NewClient(opts)

    if token := c.Connect(); token.Wait() && token.Error() != nil {
        log.Fatalf("Mqtt error: %s", token.Error())
    }

    for i := 0; i < 5; i++ {
        text := fmt.Sprintf("this is msg #%d!", i)
        token := c.Publish("go-mqtt/sample", 0, false, text)
        token.Wait()
    }

    c.Disconnect(250)

    fmt.Println("Complete publish")
}

subscriberを起動させたまま、publisherを実行すると以下のようになる。

  • MQTT Subscriber が接続したときのMQTT Brokerのログ
1577610760: New connection from 172.17.0.1 on port 1883.
1577610760: New client connected from 172.17.0.1 as auto-0D5C578F-3020-BB4E-DD27-175C728719A3 (p2, c1, k30).
  • MQTT Publisher が接続したときのMQTT Brokerのログ
1577610793: New connection from 172.17.0.1 on port 1883.
1577610793: New client connected from 172.17.0.1 as auto-1CD1DCAB-9437-1182-33EB-21392BCF739C (p2, c1, k30).
1577610793: Client auto-1CD1DCAB-9437-1182-33EB-21392BCF739C disconnected.
  • MQTT Subscriber
go run main.go
topic: go-mqtt/sample, payload: this is msg #0!
topic: go-mqtt/sample, payload: this is msg #1!
topic: go-mqtt/sample, payload: this is msg #2!
topic: go-mqtt/sample, payload: this is msg #3!
topic: go-mqtt/sample, payload: this is msg #4!

簡易的だけどこんな感じで。

その他参考

このあたりの実装は参考になりそう。