Skip to content

Commit

Permalink
Updated code
Browse files Browse the repository at this point in the history
  • Loading branch information
djthorpe committed Feb 23, 2020
1 parent ac98bb2 commit ca8b14e
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 7 deletions.
6 changes: 3 additions & 3 deletions cmd/mqttsub/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

var (
Format = "%-10v %-40v %-28v\n"
Format = "%-10v %-40v %-40v\n"
Header sync.Once
)

Expand Down Expand Up @@ -56,13 +56,13 @@ func EventHandler(_ context.Context, app gopi.App, evt_ gopi.Event) {

Header.Do(func() {
fmt.Printf(Format, "TYPE", "TOPIC", "DATA")
fmt.Printf(Format, strings.Repeat("-", 10), strings.Repeat("-", 40), strings.Repeat("-", 28))
fmt.Printf(Format, strings.Repeat("-", 10), strings.Repeat("-", 40), strings.Repeat("-", 40))
})
message := strings.TrimPrefix(fmt.Sprint(evt.Type()), "MOSQ_FLAG_EVENT_")
topic := TruncateString(evt.Topic(), 40)
data := "<nil>"
if len(evt.Data()) > 0 {
data = TruncateString(strconv.Quote(string(evt.Data())), 28)
data = TruncateString(strconv.Quote(string(evt.Data())), 40)
}
fmt.Printf(Format, message, topic, data)
}
Expand Down
28 changes: 24 additions & 4 deletions mosquitto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package mosquitto

import (
"context"
"fmt"
"strings"
"time"

// Frameworks
"github.com/djthorpe/gopi/v2"
Expand Down Expand Up @@ -36,6 +38,12 @@ type Client interface {
// Publish []byte data to topic and return request-id
Publish(string, []byte, ...Opt) (int, error)

// Publish JSON data to topic and return request-id
PublishJSON(string, interface{}, ...Opt) (int, error)

// Publish measurements in influxdata line protocol format and return request-id
PublishInflux(string, string, map[string]interface{}, ...Opt) (int, error)

// Wait for a specific request-id or 0 for a connect or disconnect event
// with context (for timeout)
WaitFor(context.Context, int) (Event, error)
Expand All @@ -60,10 +68,12 @@ type Event interface {

// Function options
type Opt struct {
Type Option
Int int
Bool bool
Flags Flags
Type Option
Int int
Bool bool
Flags Flags
String string
Timestamp time.Time
}

////////////////////////////////////////////////////////////////////////////////
Expand All @@ -73,6 +83,10 @@ func OptQOS(value int) Opt { return Opt{Type: MOSQ_OPTION_QOS, Int: va
func OptRetain(value bool) Opt { return Opt{Type: MOSQ_OPTION_RETAIN, Bool: value} }
func OptFlags(value Flags) Opt { return Opt{Type: MOSQ_OPTION_FLAGS, Flags: value} }
func OptKeepaliveSecs(value int) Opt { return Opt{Type: MOSQ_OPTION_KEEPALIVE, Int: value} }
func OptTag(name, value string) Opt {
return Opt{Type: MOSQ_OPTION_TAG, String: fmt.Sprintf("%s=%s", strings.TrimSpace(name), strings.TrimSpace(value))}
}
func OptTimestamp(value time.Time) Opt { return Opt{Type: MOSQ_OPTION_TIMESTAMP, Timestamp: value} }

////////////////////////////////////////////////////////////////////////////////
// CONSTANTS
Expand All @@ -97,6 +111,8 @@ const (
MOSQ_OPTION_RETAIN // BoolValue
MOSQ_OPTION_FLAGS // FlagsValue
MOSQ_OPTION_KEEPALIVE // IntValue
MOSQ_OPTION_TAG // StringValue
MOSQ_OPTION_TIMESTAMP // TimeValue
)

////////////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -150,6 +166,10 @@ func (o Option) String() string {
return "MOSQ_OPTION_FLAGS"
case MOSQ_OPTION_KEEPALIVE:
return "MOSQ_OPTION_KEEPALIVE"
case MOSQ_OPTION_TAG:
return "MOSQ_OPTION_TAG"
case MOSQ_OPTION_TIMESTAMP:
return "MOSQ_OPTION_TIMESTAMP"
default:
return "[?? Invalid Option value]"
}
Expand Down
58 changes: 58 additions & 0 deletions unit/mosquitto/mosquitto.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package mosquitto

import (
"context"
"encoding/json"
"fmt"
"net"
"regexp"
"strconv"
"strings"
"sync"

// Frameworks
Expand Down Expand Up @@ -339,6 +341,62 @@ func (this *mosquitto) Publish(topic string, data []byte, opts ...iface.Opt) (in
}
}

////////////////////////////////////////////////////////////////////////////////
// PUBLISH JSON & INFLUX FORMATS

func (this *mosquitto) PublishJSON(topic string, data interface{}, opts ...iface.Opt) (int, error) {
if json, err := json.Marshal(data); err != nil {
return 0, err
} else {
return this.Publish(topic, json, opts...)
}
}

// Influx line protocol
// https://docs.influxdata.com/influxdb/v1.7/write_protocols/line_protocol_tutorial/
// Include one or more OptTag(name,value) for tags
// Include one OptTimestamp(time.Time) to set timestamp
func (this *mosquitto) PublishInflux(topic string, measurement string, fields map[string]interface{}, opts ...iface.Opt) (int, error) {
// Check parameters
if len(fields) == 0 {
return 0, gopi.ErrBadParameter.WithPrefix("fields")
}
if measurement == "" {
return 0, gopi.ErrBadParameter.WithPrefix("measurement")
}

// Process options
str := strings.TrimSpace(measurement)
ts := ""
other := make([]iface.Opt, 0, len(opts))
for _, opt := range opts {
switch opt.Type {
case iface.MOSQ_OPTION_TAG:
str += "," + opt.String
case iface.MOSQ_OPTION_TIMESTAMP:
ts = " " + fmt.Sprint(opt.Timestamp.UnixNano())
default:
other = append(other, opt)
}
}

// Process fields
delim := " "
for k, v := range fields {
switch v.(type) {
case float32, float64, bool, int, uint, int8, uint8, int16, uint16, int32, uint32, int64, uint64:
str += delim + fmt.Sprintf("%v=%v", strings.TrimSpace(k), v)
case string:
str += delim + fmt.Sprintf("%v=%v", strings.TrimSpace(k), strconv.Quote(v.(string)))
default:
return 0, gopi.ErrBadParameter.WithPrefix(k)
}
delim = ","
}

return this.Publish(topic, []byte(str+ts), other...)
}

////////////////////////////////////////////////////////////////////////////////
// WAIT FOR

Expand Down
66 changes: 66 additions & 0 deletions unit/mosquitto/mosquitto_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,3 +84,69 @@ func Main_Test_Mosquitto_002(app gopi.App, t *testing.T) {
}
}
}

func Test_Mosquitto_003(t *testing.T) {
args := []string{"-mqtt.broker", TEST_SERVER}
if app, err := app.NewTestTool(t, Main_Test_Mosquitto_003, args, "mosquitto"); err != nil {
t.Error(err)
} else {
app.Run()
}
}

func Main_Test_Mosquitto_003(app gopi.App, t *testing.T) {
mosquitto := app.UnitInstance("mosquitto").(mosq.Client)
bus := app.Bus()
bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) {
t.Log(evt)
})
if err := mosquitto.Connect(); err != nil {
t.Error(err)
} else {
time.Sleep(2 * time.Second)
for i := 0; i < 10; i++ {
if _, err := mosquitto.PublishJSON("test", 100.8); err != nil {
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
if err := mosquitto.Disconnect(); err != nil {
t.Error(err)
}
}
}

func Test_Mosquitto_004(t *testing.T) {
args := []string{"-mqtt.broker", TEST_SERVER}
if app, err := app.NewTestTool(t, Main_Test_Mosquitto_004, args, "mosquitto"); err != nil {
t.Error(err)
} else {
app.Run()
}
}

func Main_Test_Mosquitto_004(app gopi.App, t *testing.T) {
mosquitto := app.UnitInstance("mosquitto").(mosq.Client)
bus := app.Bus()
bus.DefaultHandler(gopi.EVENT_NS_DEFAULT, func(_ context.Context, _ gopi.App, evt gopi.Event) {
t.Log(evt)
})
if err := mosquitto.Connect(); err != nil {
t.Error(err)
} else {
time.Sleep(2 * time.Second)
for i := 0; i < 10; i++ {
fields := map[string]interface{}{
"v1": i,
"v2": float64(i) / 2,
}
if _, err := mosquitto.PublishInflux("test", "test", fields, mosq.OptTag("host", "rpi4.lan"), mosq.OptTimestamp(time.Now())); err != nil {
t.Error(err)
}
time.Sleep(100 * time.Millisecond)
}
if err := mosquitto.Disconnect(); err != nil {
t.Error(err)
}
}
}

0 comments on commit ca8b14e

Please sign in to comment.