GoでMQTTのpub/subを試す
MQTT Broker
MQTT Brokerはmosquitoを使うことにする。Dockerで立ち上げる。
> docker run -it -p 1883:1883 -p 9001:9001 eclipse-mosquitto 1577610628: mosquitto version 1.6.8 starting 1577610628: Config loaded from /mosquitto/config/mosquitto.conf. 1577610628: Opening ipv4 listen socket on port 1883. 1577610628: Opening ipv6 listen socket on port 1883.
MQTT Subscriberの実装
サンプル実装として go-mqtt/sample
のトピックをQOS = 0でsubscribeすることにする。mqtt clientにはpahoを使うのが良さそう。ブリッジアプリとして機能するときはselectしているチャネル経由でsubscribeしたメッセージを後続に渡すのが良いだろう。
package main import ( "fmt" "log" "os" "os/signal" mqtt "github.com/eclipse/paho.mqtt.golang" ) func main() { msgCh := make(chan mqtt.Message) var f mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { msgCh <- msg } opts := mqtt.NewClientOptions() opts.AddBroker("tcp://localhost:1883") c := mqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Mqtt error: %s", token.Error()) } if subscribeToken := c.Subscribe("go-mqtt/sample", 0, f); subscribeToken.Wait() && subscribeToken.Error() != nil { log.Fatal(subscribeToken.Error()) } signalCh := make(chan os.Signal, 1) signal.Notify(signalCh, os.Interrupt) for { select { case m := <-msgCh: fmt.Printf("topic: %v, payload: %v\n", m.Topic(), string(m.Payload())) case <-signalCh: fmt.Printf("Interrupt detected.\n") c.Disconnect(1000) return } } }
テストにpublishする実装。go buildなどをして、上記のgoアプリとは別のターミナルで起動させる。
package main import ( "fmt" "log" mqtt "github.com/eclipse/paho.mqtt.golang" ) func main() { opts := mqtt.NewClientOptions() opts.AddBroker("tcp://localhost:1883") c := mqtt.NewClient(opts) if token := c.Connect(); token.Wait() && token.Error() != nil { log.Fatalf("Mqtt error: %s", token.Error()) } for i := 0; i < 5; i++ { text := fmt.Sprintf("this is msg #%d!", i) token := c.Publish("go-mqtt/sample", 0, false, text) token.Wait() } c.Disconnect(250) fmt.Println("Complete publish") }
subscriberを起動させたまま、publisherを実行すると以下のようになる。
- MQTT Subscriber が接続したときのMQTT Brokerのログ
1577610760: New connection from 172.17.0.1 on port 1883. 1577610760: New client connected from 172.17.0.1 as auto-0D5C578F-3020-BB4E-DD27-175C728719A3 (p2, c1, k30).
- MQTT Publisher が接続したときのMQTT Brokerのログ
1577610793: New connection from 172.17.0.1 on port 1883. 1577610793: New client connected from 172.17.0.1 as auto-1CD1DCAB-9437-1182-33EB-21392BCF739C (p2, c1, k30). 1577610793: Client auto-1CD1DCAB-9437-1182-33EB-21392BCF739C disconnected.
- MQTT Subscriber
go run main.go topic: go-mqtt/sample, payload: this is msg #0! topic: go-mqtt/sample, payload: this is msg #1! topic: go-mqtt/sample, payload: this is msg #2! topic: go-mqtt/sample, payload: this is msg #3! topic: go-mqtt/sample, payload: this is msg #4!
簡易的だけどこんな感じで。
その他参考
このあたりの実装は参考になりそう。