-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathclient_mqtt.go
120 lines (92 loc) · 2.34 KB
/
client_mqtt.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package raptor
import (
"net/url"
"strings"
mqtt "github.com/eclipse/paho.mqtt.golang"
"github.com/raptorbox/raptor-sdk-go/models"
)
// BrokerConnection track connection status to the broker
type BrokerConnection struct {
connected bool
client mqtt.Client
}
func (c *DefaultClient) connectToBroker() (mqtt.Client, error) {
mqttClient, err := c.GetBrokerClient()
if err != nil {
return nil, err
}
if !mqttClient.IsConnected() {
if token := mqttClient.Connect(); token.Wait() && token.Error() != nil {
return nil, token.Error()
}
}
return mqttClient, nil
}
// GetBrokerClient return a MQTT client
func (c *DefaultClient) GetBrokerClient() (mqtt.Client, error) {
if c.brokerConnection.client != nil {
return c.brokerConnection.client, nil
}
u, err := url.Parse(c.GetConfig().GetURL())
if err != nil {
return c.brokerConnection.client, err
}
mqttURI := "tcp"
mqttPort := "1883"
if u.Scheme == "https" {
mqttURI += "s"
mqttPort = "8883"
}
mqttURI += "://" + u.Hostname() + ":" + mqttPort
log.Debugf("MQTT uri %s", mqttURI)
opts := mqtt.NewClientOptions().AddBroker(mqttURI)
client := mqtt.NewClient(opts)
c.brokerConnection.client = client
return c.brokerConnection.client, nil
}
//Subscribe to topic
func (c *DefaultClient) Subscribe(topic string, cb func(event models.Payload)) error {
mqttClient, err := c.connectToBroker()
if err != nil {
return err
}
if token := mqttClient.Subscribe(topic, 0, func(client mqtt.Client, msg mqtt.Message) {
parts := strings.Split(topic, "/")
var err error
var p models.Payload
switch parts[0] {
case "tree":
p = &models.TreeNodePayload{}
return
case "stream":
p = &models.StreamPayload{}
case "inventory":
p = &models.DevicePayload{}
default:
//noop
}
err = FromJSON(msg.Payload(), p)
if err == nil {
cb(p)
return
}
log.Errorf("Error handling message for `%s`: %s", parts[0], string(msg.Payload()))
}); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}
//Unsubscribe from topic
func (c *DefaultClient) Unsubscribe(topic string, cb func(event models.Payload)) error {
mqttClient, err := c.GetBrokerClient()
if err != nil {
return err
}
if !mqttClient.IsConnected() {
return nil
}
if token := mqttClient.Unsubscribe(topic); token.Wait() && token.Error() != nil {
return token.Error()
}
return nil
}