From ca8b14e5367a2932b4acb95897e30e70cb1e2572 Mon Sep 17 00:00:00 2001 From: David Thorpe Date: Sun, 23 Feb 2020 11:58:43 +1030 Subject: [PATCH] Updated code --- cmd/mqttsub/main.go | 6 +-- mosquitto.go | 28 ++++++++++++-- unit/mosquitto/mosquitto.go | 58 ++++++++++++++++++++++++++++ unit/mosquitto/mosquitto_test.go | 66 ++++++++++++++++++++++++++++++++ 4 files changed, 151 insertions(+), 7 deletions(-) diff --git a/cmd/mqttsub/main.go b/cmd/mqttsub/main.go index 6da0268..4053ba8 100644 --- a/cmd/mqttsub/main.go +++ b/cmd/mqttsub/main.go @@ -14,7 +14,7 @@ import ( ) var ( - Format = "%-10v %-40v %-28v\n" + Format = "%-10v %-40v %-40v\n" Header sync.Once ) @@ -56,13 +56,13 @@ func EventHandler(_ context.Context, app gopi.App, evt_ gopi.Event) { Header.Do(func() { fmt.Printf(Format, "TYPE", "TOPIC", "DATA") - fmt.Printf(Format, strings.Repeat("-", 10), strings.Repeat("-", 40), strings.Repeat("-", 28)) + fmt.Printf(Format, strings.Repeat("-", 10), strings.Repeat("-", 40), strings.Repeat("-", 40)) }) message := strings.TrimPrefix(fmt.Sprint(evt.Type()), "MOSQ_FLAG_EVENT_") topic := TruncateString(evt.Topic(), 40) data := "" if len(evt.Data()) > 0 { - data = TruncateString(strconv.Quote(string(evt.Data())), 28) + data = TruncateString(strconv.Quote(string(evt.Data())), 40) } fmt.Printf(Format, message, topic, data) } diff --git a/mosquitto.go b/mosquitto.go index 36f41ce..2fd73fa 100644 --- a/mosquitto.go +++ b/mosquitto.go @@ -2,7 +2,9 @@ package mosquitto import ( "context" + "fmt" "strings" + "time" // Frameworks "github.com/djthorpe/gopi/v2" @@ -36,6 +38,12 @@ type Client interface { // Publish []byte data to topic and return request-id Publish(string, []byte, ...Opt) (int, error) + // Publish JSON data to topic and return request-id + PublishJSON(string, interface{}, ...Opt) (int, error) + + // Publish measurements in influxdata line protocol format and return request-id + PublishInflux(string, string, map[string]interface{}, ...Opt) (int, error) + // Wait for a specific request-id or 0 for a connect or disconnect event // with context (for timeout) WaitFor(context.Context, int) (Event, error) @@ -60,10 +68,12 @@ type Event interface { // Function options type Opt struct { - Type Option - Int int - Bool bool - Flags Flags + Type Option + Int int + Bool bool + Flags Flags + String string + Timestamp time.Time } //////////////////////////////////////////////////////////////////////////////// @@ -73,6 +83,10 @@ func OptQOS(value int) Opt { return Opt{Type: MOSQ_OPTION_QOS, Int: va func OptRetain(value bool) Opt { return Opt{Type: MOSQ_OPTION_RETAIN, Bool: value} } func OptFlags(value Flags) Opt { return Opt{Type: MOSQ_OPTION_FLAGS, Flags: value} } func OptKeepaliveSecs(value int) Opt { return Opt{Type: MOSQ_OPTION_KEEPALIVE, Int: value} } +func OptTag(name, value string) Opt { + return Opt{Type: MOSQ_OPTION_TAG, String: fmt.Sprintf("%s=%s", strings.TrimSpace(name), strings.TrimSpace(value))} +} +func OptTimestamp(value time.Time) Opt { return Opt{Type: MOSQ_OPTION_TIMESTAMP, Timestamp: value} } //////////////////////////////////////////////////////////////////////////////// // CONSTANTS @@ -97,6 +111,8 @@ const ( MOSQ_OPTION_RETAIN // BoolValue MOSQ_OPTION_FLAGS // FlagsValue MOSQ_OPTION_KEEPALIVE // IntValue + MOSQ_OPTION_TAG // StringValue + MOSQ_OPTION_TIMESTAMP // TimeValue ) //////////////////////////////////////////////////////////////////////////////// @@ -150,6 +166,10 @@ func (o Option) String() string { return "MOSQ_OPTION_FLAGS" case MOSQ_OPTION_KEEPALIVE: return "MOSQ_OPTION_KEEPALIVE" + case MOSQ_OPTION_TAG: + return "MOSQ_OPTION_TAG" + case MOSQ_OPTION_TIMESTAMP: + return "MOSQ_OPTION_TIMESTAMP" default: return "[?? Invalid Option value]" } diff --git a/unit/mosquitto/mosquitto.go b/unit/mosquitto/mosquitto.go index 78c09ac..a582d83 100644 --- a/unit/mosquitto/mosquitto.go +++ b/unit/mosquitto/mosquitto.go @@ -2,10 +2,12 @@ package mosquitto import ( "context" + "encoding/json" "fmt" "net" "regexp" "strconv" + "strings" "sync" // Frameworks @@ -339,6 +341,62 @@ func (this *mosquitto) Publish(topic string, data []byte, opts ...iface.Opt) (in } } +//////////////////////////////////////////////////////////////////////////////// +// PUBLISH JSON & INFLUX FORMATS + +func (this *mosquitto) PublishJSON(topic string, data interface{}, opts ...iface.Opt) (int, error) { + if json, err := json.Marshal(data); err != nil { + return 0, err + } else { + return this.Publish(topic, json, opts...) + } +} + +// Influx line protocol +// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/ +// Include one or more OptTag(name,value) for tags +// Include one OptTimestamp(time.Time) to set timestamp +func (this *mosquitto) PublishInflux(topic string, measurement string, fields map[string]interface{}, opts ...iface.Opt) (int, error) { + // Check parameters + if len(fields) == 0 { + return 0, gopi.ErrBadParameter.WithPrefix("fields") + } + if measurement == "" { + return 0, gopi.ErrBadParameter.WithPrefix("measurement") + } + + // Process options + str := strings.TrimSpace(measurement) + ts := "" + other := make([]iface.Opt, 0, len(opts)) + for _, opt := range opts { + switch opt.Type { + case iface.MOSQ_OPTION_TAG: + str += "," + opt.String + case iface.MOSQ_OPTION_TIMESTAMP: + ts = " " + fmt.Sprint(opt.Timestamp.UnixNano()) + default: + other = append(other, opt) + } + } + + // Process fields + delim := " " + for k, v := range fields { + switch v.(type) { + case float32, float64, bool, int, uint, int8, uint8, int16, uint16, int32, uint32, int64, uint64: + str += delim + fmt.Sprintf("%v=%v", strings.TrimSpace(k), v) + case string: + str += delim + fmt.Sprintf("%v=%v", strings.TrimSpace(k), strconv.Quote(v.(string))) + default: + return 0, gopi.ErrBadParameter.WithPrefix(k) + } + delim = "," + } + + return this.Publish(topic, []byte(str+ts), other...) +} + //////////////////////////////////////////////////////////////////////////////// // WAIT FOR diff --git a/unit/mosquitto/mosquitto_test.go b/unit/mosquitto/mosquitto_test.go index 0e19fa8..9b61087 100644 --- a/unit/mosquitto/mosquitto_test.go +++ b/unit/mosquitto/mosquitto_test.go @@ -84,3 +84,69 @@ func Main_Test_Mosquitto_002(app gopi.App, t *testing.T) { } } } + +func Test_Mosquitto_003(t *testing.T) { + args := []string{"-mqtt.broker", TEST_SERVER} + if app, err := app.NewTestTool(t, Main_Test_Mosquitto_003, args, "mosquitto"); err != nil { + t.Error(err) + } else { + app.Run() + } +} + +func Main_Test_Mosquitto_003(app gopi.App, t *testing.T) { + mosquitto := app.UnitInstance("mosquitto").(mosq.Client) + bus := app.Bus() + bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) { + t.Log(evt) + }) + if err := mosquitto.Connect(); err != nil { + t.Error(err) + } else { + time.Sleep(2 * time.Second) + for i := 0; i < 10; i++ { + if _, err := mosquitto.PublishJSON("test", 100.8); err != nil { + t.Error(err) + } + time.Sleep(100 * time.Millisecond) + } + if err := mosquitto.Disconnect(); err != nil { + t.Error(err) + } + } +} + +func Test_Mosquitto_004(t *testing.T) { + args := []string{"-mqtt.broker", TEST_SERVER} + if app, err := app.NewTestTool(t, Main_Test_Mosquitto_004, args, "mosquitto"); err != nil { + t.Error(err) + } else { + app.Run() + } +} + +func Main_Test_Mosquitto_004(app gopi.App, t *testing.T) { + mosquitto := app.UnitInstance("mosquitto").(mosq.Client) + bus := app.Bus() + bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) { + t.Log(evt) + }) + if err := mosquitto.Connect(); err != nil { + t.Error(err) + } else { + time.Sleep(2 * time.Second) + for i := 0; i < 10; i++ { + fields := map[string]interface{}{ + "v1": i, + "v2": float64(i) / 2, + } + if _, err := mosquitto.PublishInflux("test", "test", fields, mosq.OptTag("host", "rpi4.lan"), mosq.OptTimestamp(time.Now())); err != nil { + t.Error(err) + } + time.Sleep(100 * time.Millisecond) + } + if err := mosquitto.Disconnect(); err != nil { + t.Error(err) + } + } +}