Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] OTEL Observability #297

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions clients.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ func (cl *Client) ReadFixedHeader(fh *packets.FixedHeader) error {
func (cl *Client) ReadPacket(fh *packets.FixedHeader) (pk packets.Packet, err error) {
atomic.AddInt64(&cl.ops.info.PacketsReceived, 1)

pk.Ctx, pk.Cancel = context.WithCancel(cl.State.open)

pk.ProtocolVersion = cl.Properties.ProtocolVersion // inherit client protocol version for decoding
pk.FixedHeader = *fh
p := make([]byte, pk.FixedHeader.Remaining)
Expand Down
140 changes: 140 additions & 0 deletions examples/otel/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// SPDX-License-Identifier: MIT
// SPDX-FileCopyrightText: 2022 mochi-mqtt, mochi-co
// SPDX-FileContributor: ishan-daga

package main

import (
"bytes"
"context"
"flag"
"fmt"
"log"
"os"
"os/signal"
"syscall"

mqtt "github.com/mochi-mqtt/server/v2"
"github.com/mochi-mqtt/server/v2/hooks/auth"
"github.com/mochi-mqtt/server/v2/listeners"
"github.com/mochi-mqtt/server/v2/packets"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
)

func main() {
shut := initTracer()
defer shut(context.Background())

tcpAddr := flag.String("tcp", ":1883", "network address for TCP listener")
flag.Parse()

sigs := make(chan os.Signal, 1)
done := make(chan bool, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigs
done <- true
}()

server := mqtt.New(&mqtt.Options{
OTELTracing: true,
OTELTracerName: "mochi-mqtt",
})
server.Options.Capabilities.MaximumClientWritesPending = 16 * 1024
_ = server.AddHook(new(auth.AllowHook), nil)
_ = server.AddHook(new(ExampleHook), nil)

tcp := listeners.NewTCP("t1", *tcpAddr, nil)
err := server.AddListener(tcp)
if err != nil {
log.Fatal(err)
}

go func() {
err := server.Serve()
if err != nil {
log.Fatal(err)
}
}()

<-done
server.Log.Warn("caught signal, stopping...")
_ = server.Close()
server.Log.Info("main.go finished")
}

type ExampleHook struct {
mqtt.HookBase
}

func (h *ExampleHook) ID() string {
return "events-example"
}

func (h *ExampleHook) Provides(b byte) bool {
return bytes.Contains([]byte{
mqtt.OnConnect,
mqtt.OnDisconnect,
mqtt.OnSubscribed,
mqtt.OnUnsubscribed,
mqtt.OnPublished,
mqtt.OnPublish,
}, []byte{b})
}

func (h *ExampleHook) Init(config any) error {
h.Log.Info("initialised")
return nil
}

func (h *ExampleHook) OnConnect(cl *mqtt.Client, pk packets.Packet) error {
h.Log.InfoContext(pk.Ctx, "client connected", "client", cl.ID)
return nil
}

func (h *ExampleHook) OnDisconnect(cl *mqtt.Client, err error, expire bool) {
if err != nil {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire, "error", err)
} else {
h.Log.Info("client disconnected", "client", cl.ID, "expire", expire)
}

}

func (h *ExampleHook) OnSubscribed(cl *mqtt.Client, pk packets.Packet, reasonCodes []byte) {
h.Log.InfoContext(pk.Ctx, fmt.Sprintf("subscribed qos=%v", reasonCodes), "client", cl.ID, "filters", pk.Filters)
}

func (h *ExampleHook) OnUnsubscribed(cl *mqtt.Client, pk packets.Packet) {
h.Log.InfoContext(pk.Ctx, "unsubscribed", "client", cl.ID, "filters", pk.Filters)
}

func (h *ExampleHook) OnPublish(cl *mqtt.Client, pk packets.Packet) (packets.Packet, error) {
h.Log.InfoContext(pk.Ctx, "received from client", "client", cl.ID, "payload", string(pk.Payload))

pkx := pk
if string(pk.Payload) == "hello" {
pkx.Payload = []byte("hello world")
h.Log.InfoContext(pk.Ctx, "received modified packet from client", "client", cl.ID, "payload", string(pkx.Payload))
}

if pk.Ctx == nil {
h.Log.Warn("nil context")
return pkx, nil
}

span := trace.SpanFromContext(pk.Ctx)
span.AddEvent("published", trace.WithAttributes(
attribute.String("client", cl.ID),
attribute.String("payload", string(pk.Payload)),
))

h.Log.DebugContext(pk.Ctx, "published to client", "traceid", span.SpanContext().TraceID().String())

return pkx, nil
}

func (h *ExampleHook) OnPublished(cl *mqtt.Client, pk packets.Packet) {
h.Log.InfoContext(pk.Ctx, "published to client", "client", cl.ID, "payload", string(pk.Payload))
}
61 changes: 61 additions & 0 deletions examples/otel/otel-setup.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package main

import (
"context"
"log"
"strings"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"

"go.opentelemetry.io/otel/sdk/resource"
sdktrace "go.opentelemetry.io/otel/sdk/trace"
"google.golang.org/grpc/credentials"
)

func initTracer() func(context.Context) error {
serviceName := "mochi-mqtt"
insecure := "true"
collectorURL := "localhost:4317"

var secureOption otlptracegrpc.Option

if strings.ToLower(insecure) == "false" || insecure == "0" || strings.ToLower(insecure) == "f" {
secureOption = otlptracegrpc.WithTLSCredentials(credentials.NewClientTLSFromCert(nil, ""))
} else {
secureOption = otlptracegrpc.WithInsecure()
}

exporter, err := otlptrace.New(
context.Background(),
otlptracegrpc.NewClient(
secureOption,
otlptracegrpc.WithEndpoint(collectorURL),
),
)

if err != nil {
log.Fatalf("Failed to create exporter: %v", err)
}
resources, err := resource.New(
context.Background(),
resource.WithAttributes(
attribute.String("service.name", serviceName),
attribute.String("library.language", "go"),
),
)
if err != nil {
log.Fatalf("Could not set resources: %v", err)
}

otel.SetTracerProvider(
sdktrace.NewTracerProvider(
sdktrace.WithSampler(sdktrace.AlwaysSample()),
sdktrace.WithBatcher(exporter),
sdktrace.WithResource(resources),
),
)
return exporter.Shutdown
}
2 changes: 1 addition & 1 deletion examples/paho.testing/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func (h *pahoAuthHook) OnConnectAuthenticate(cl *mqtt.Client, pk packets.Packet)
return true
}

func (h *pahoAuthHook) OnACLCheck(cl *mqtt.Client, topic string, write bool) bool {
func (h *pahoAuthHook) OnACLCheck(cl *mqtt.Client, pk packets.Packet, topic string, write bool) bool {
return topic != "test/nosubscribe"
}

Expand Down
27 changes: 23 additions & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,24 +10,43 @@ require (
github.com/gorilla/websocket v1.5.0
github.com/jinzhu/copier v0.3.5
github.com/rs/xid v1.4.0
github.com/stretchr/testify v1.7.1
github.com/stretchr/testify v1.8.4
github.com/timshannon/badgerhold v1.0.0
go.etcd.io/bbolt v1.3.5
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.40.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.17.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.17.0
go.opentelemetry.io/otel/sdk v1.17.0
go.opentelemetry.io/otel/sdk/metric v0.40.0
google.golang.org/grpc v1.58.0
gopkg.in/yaml.v3 v3.0.1
)

require (
github.com/cenkalti/backoff/v4 v4.2.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.40.0 // indirect
go.opentelemetry.io/otel/metric v1.17.0 // indirect
go.opentelemetry.io/otel/trace v1.17.0 // indirect
go.opentelemetry.io/proto/otlp v1.0.0 // indirect
golang.org/x/text v0.11.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
)

require (
github.com/AndreasBriese/bbloom v0.0.0-20190825152654-46b345b51c96 // indirect
github.com/alicebob/gopher-json v0.0.0-20200520072559-a9ecdc9d1d3a // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgraph-io/badger v1.6.0 // indirect
github.com/dgryski/go-farm v0.0.0-20190423205320-6a90982ecee2 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/golang/protobuf v1.5.0 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/yuin/gopher-lua v0.0.0-20210529063254-f4c35e4016d9 // indirect
Expand Down
Loading
Loading