Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to get metrics out of nexus. #2

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
129 changes: 129 additions & 0 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
package metrics

import (
"net/http"
"strconv"
"sync/atomic"

"github.com/cornelk/hashmap"
"github.com/gammazero/nexus/stdlog"
"github.com/ugorji/go/codec"
)

var (
logger stdlog.StdLog
)

type DisplayGeneral map[string]interface{}

// MetricMap is intended to be used as an quick acccess way to increase and decrease simple values such as `in/outMessageCount` and `..Authorization`
type MetricMap struct {
mp *hashmap.HashMap
}

// MetricGlobal is the global instance of the metric hashmap
var MetricGlobal = &MetricMap{mp: hashmap.New(64)}
var handler codec.JsonHandle
var h = &handler

func startAPI(port uint16) {
http.HandleFunc("/metrics", metricToJSON)
http.ListenAndServe(":"+strconv.Itoa(int(port)), nil)
}

// metricToJSON creates raw view of current data of MetricGlobal
func metricToJSON(w http.ResponseWriter, r *http.Request) {
disMtr, err := MetricGlobal.MetricMapToGoMap()
if err != nil {
return
}
buffer := make([]byte, 128)
encoder := codec.NewEncoderBytes(&buffer, h)
encoder.Encode(disMtr)
if err != nil {
return
}
w.Write(buffer)
}

// IncrementAtomicUint64Key executes an atomic increment on the specified key of the map
func (mp *MetricMap) IncrementAtomicUint64Key(key string) {
IncrementAtomicUint64KeyOf(mp.mp, key)
}

// IncrementAtomicUint64KeyOf executes an atomic increment on the specified key of any given map
func IncrementAtomicUint64KeyOf(hmp *hashmap.HashMap, key string) {
var amt uint64
curamt, _ := hmp.GetOrInsert(key, &amt)
count := (curamt).(*uint64)
atomic.AddUint64(count, 1)
}

// IncreaseAtomicUint64Key executes an atomic increase of diff on the specified key of the map
func (mp *MetricMap) IncreaseAtomicUint64Key(key string, diff uint64) {
IncreaseAtomicUint64KeyOf(mp.mp, key, diff)
}

// IncreaseAtomicUint64KeyOf executes an atomic increase of diff on the specified key of any given map
func IncreaseAtomicUint64KeyOf(hmp *hashmap.HashMap, key string, diff uint64) {
var amt uint64
curamt, _ := hmp.GetOrInsert(key, &amt)
count := (curamt).(*uint64)
atomic.AddUint64(count, diff)
}

// GetSubMapOf returns the saved MetricMap of the given key of any given map. Creates one if none exists
func GetSubMapOf(hmp *hashmap.HashMap, key string) (mp *MetricMap) {
var m MetricMap
val, loaded := hmp.GetOrInsert(key, &m)
mp = (val).(*MetricMap)
if !loaded {
mp.mp = hashmap.New(128)
}
hmp.Set(key, mp)
return
}

// GetSubMap returns the saved MetricMap of the given key of the map. Creates one if none exists
func (mp *MetricMap) GetSubMap(key string) (dmp *MetricMap) {
dmp = GetSubMapOf(mp.mp, key)
return
}

// SendMsgCountHandler increments SendMessageCount
func SendMsgCountHandler() {
IncrementAtomicUint64KeyOf(MetricGlobal.mp, "SendMessageCount")
}

// RecvMsgCountHandler increments RecvMesssageCount
func RecvMsgCountHandler() {
IncrementAtomicUint64KeyOf(MetricGlobal.mp, "RecvMesssageCount")
}

// RecvMsgLenHandler increses RecvTrafficBytesTotal by the length of the received message
func RecvMsgLenHandler(len uint64) {
IncreaseAtomicUint64KeyOf(MetricGlobal.mp, "RecvTrafficBytesTotal", len)
}

// SendMsgLenHandler increases SendTrafficBytesTotal by the length of the send message
func SendMsgLenHandler(len uint64) {
IncreaseAtomicUint64KeyOf(MetricGlobal.mp, "SendTrafficBytesTotal", len)
}

// MetricMapToGoMap transform Hashmap to normal Go Map
func (hmp *MetricMap) MetricMapToGoMap() (disMtr DisplayGeneral, err error) {
disMtr = make(map[string]interface{}, 32)
for k := range hmp.mp.Iter() {
if m, ok := (k.Value).(*MetricMap); ok {
dmp, e := m.MetricMapToGoMap()
if err != nil {
err = e
return
}
disMtr[(k.Key).(string)] = dmp
} else {
disMtr[(k.Key).(string)] = k.Value
}
}
return
}
38 changes: 38 additions & 0 deletions metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package metrics

import (
"net/http/httptest"
"testing"
)

func TestServer(t *testing.T) {
go startAPI(3453)
mp := GetSubMapOf(MetricGlobal.mp, "tester")
IncrementAtomicUint64KeyOf(mp.mp, "test")
IncreaseAtomicUint64KeyOf(mp.mp, "test", 54)
r := httptest.NewRequest("", "localhost:3444", nil)
w := httptest.NewRecorder()
metricToJSON(w, r)
}

func TestFlat(t *testing.T) {
IncrementAtomicUint64KeyOf(MetricGlobal.mp, "testing")
IncreaseAtomicUint64KeyOf(MetricGlobal.mp, "testing", 54)
}

func TestMap(t *testing.T) {
mp := GetSubMapOf(MetricGlobal.mp, "tester")
IncrementAtomicUint64KeyOf(mp.mp, "test")
IncreaseAtomicUint64KeyOf(mp.mp, "test", 54)
}

func TestBuiltinHandlers(t *testing.T) {
RecvMsgCountHandler()
SendMsgCountHandler()
SendMsgLenHandler(54)
RecvMsgLenHandler(54)
}

func TestConvert(t *testing.T) {
MetricGlobal.MetricMapToGoMap()
}
42 changes: 31 additions & 11 deletions router/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@ type subscription struct {
subscribers map[*session]struct{}
}

// FilterFactory is a function which creates a PublishFilter from a publication
type FilterFactory func(msg *wamp.Publish) PublishFilter

type Broker struct {
// topic -> subscription
topicSubscription map[wamp.URI]*subscription
Expand All @@ -61,15 +64,19 @@ type Broker struct {
strictURI bool
allowDisclose bool

log stdlog.StdLog
debug bool
log stdlog.StdLog
debug bool
filterFactory FilterFactory
}

// NewBroker returns a new default broker implementation instance.
func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Broker {
func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool, publishFilter FilterFactory) *Broker {
if logger == nil {
panic("logger is nil")
}
if publishFilter == nil {
publishFilter = NewSimplePublishFilter
}
b := &Broker{
topicSubscription: map[wamp.URI]*subscription{},
pfxTopicSubscription: map[wamp.URI]*subscription{},
Expand All @@ -88,8 +95,9 @@ func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Brok
strictURI: strictURI,
allowDisclose: allowDisclose,

log: logger,
debug: debug,
log: logger,
debug: debug,
filterFactory: publishFilter,
}
go b.run()
return b
Expand Down Expand Up @@ -151,13 +159,14 @@ func (b *Broker) Publish(pub *session, msg *wamp.Publish) {
Details: wamp.Dict{},
Error: wamp.ErrOptionDisallowedDiscloseMe,
})
return
}
disclose = true
}
pubID := wamp.GlobalID()

// Get blacklists and whitelists, if any, from publish message.
filter := newPublishFilter(msg)
filter := b.filterFactory(msg)

b.actionChan <- func() {
b.publish(pub, msg, pubID, excludePub, disclose, filter)
Expand Down Expand Up @@ -243,7 +252,7 @@ func (b *Broker) run() {
}
}

func (b *Broker) publish(pub *session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter *publishFilter) {
func (b *Broker) publish(pub *session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter PublishFilter) {
// Publish to subscribers with exact match.
if sub, ok := b.topicSubscription[msg.Topic]; ok {
b.pubEvent(pub, msg, pubID, sub, excludePub, false, disclose, filter)
Expand Down Expand Up @@ -440,17 +449,28 @@ func (b *Broker) removeSession(subscriber *session) {
}
}

func allowPublish(sub *session, filter PublishFilter) bool {
if filter == nil {
return true
}
if filter.LockRequired() {
sub.RLock()
defer sub.RUnlock()
}
return filter.PublishAllowed(&sub.Session)
}

// pubEvent sends an event to all subscribers that are not excluded from
// receiving the event.
func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter *publishFilter) {
func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter PublishFilter) {
for subscriber, _ := range sub.subscribers {
// Do not send event to publisher.
if subscriber == pub && excludePublisher {
continue
}

// Check if receiver is restricted.
if filter != nil && !filter.publishAllowed(subscriber) {
if !allowPublish(subscriber, filter) {
continue
}

Expand Down Expand Up @@ -582,13 +602,13 @@ func disclosePublisher(pub *session, details wamp.Dict) {
details[rolePub] = pub.ID
// These values are not required by the specification, but are here for
// compatibility with Crossbar.
pub.rLock()
pub.RLock()
for _, f := range []string{"authid", "authrole"} {
if val, ok := pub.Details[f]; ok {
details[fmt.Sprintf("%s_%s", rolePub, f)] = val
}
}
pub.rUnlock()
pub.RUnlock()
}

// ----- Subscription Meta Procedure Handlers -----
Expand Down
18 changes: 9 additions & 9 deletions router/broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (p *testPeer) Close() { return }

func TestBasicSubscribe(t *testing.T) {
// Test subscribing to a topic.
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
sess := newSession(subscriber, 0, nil)
testTopic := wamp.URI("nexus.test.topic")
Expand Down Expand Up @@ -142,7 +142,7 @@ func TestBasicSubscribe(t *testing.T) {
}

func TestUnsubscribe(t *testing.T) {
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
testTopic := wamp.URI("nexus.test.topic")

// Subscribe session1 to topic
Expand Down Expand Up @@ -250,7 +250,7 @@ func TestUnsubscribe(t *testing.T) {

func TestRemove(t *testing.T) {
// Subscribe to topic
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
sess := newSession(subscriber, 0, nil)
testTopic := wamp.URI("nexus.test.topic")
Expand Down Expand Up @@ -292,7 +292,7 @@ func TestRemove(t *testing.T) {
}

func TestBasicPubSub(t *testing.T) {
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
sess := newSession(subscriber, 0, nil)
testTopic := wamp.URI("nexus.test.topic")
Expand Down Expand Up @@ -331,7 +331,7 @@ func TestBasicPubSub(t *testing.T) {

func TestPrefxPatternBasedSubscription(t *testing.T) {
// Test match=prefix
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
sess := newSession(subscriber, 0, nil)
testTopic := wamp.URI("nexus.test.topic")
Expand Down Expand Up @@ -394,7 +394,7 @@ func TestPrefxPatternBasedSubscription(t *testing.T) {

func TestWildcardPatternBasedSubscription(t *testing.T) {
// Test match=prefix
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
sess := newSession(subscriber, 0, nil)
testTopic := wamp.URI("nexus.test.topic")
Expand Down Expand Up @@ -465,7 +465,7 @@ func TestWildcardPatternBasedSubscription(t *testing.T) {
}

func TestSubscriberBlackwhiteListing(t *testing.T) {
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
details := wamp.Dict{
"authid": "jdoe",
Expand Down Expand Up @@ -572,7 +572,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) {
}

func TestPublisherExclusion(t *testing.T) {
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()
sess := newSession(subscriber, 0, nil)
testTopic := wamp.URI("nexus.test.topic")
Expand Down Expand Up @@ -646,7 +646,7 @@ func TestPublisherExclusion(t *testing.T) {
}

func TestPublisherIdentification(t *testing.T) {
broker := NewBroker(logger, false, true, debug)
broker := NewBroker(logger, false, true, debug, nil)
subscriber := newTestPeer()

details := wamp.Dict{
Expand Down
Loading