diff --git a/metrics/metrics.go b/metrics/metrics.go new file mode 100644 index 00000000..8072d1c6 --- /dev/null +++ b/metrics/metrics.go @@ -0,0 +1,129 @@ +package metrics + +import ( + "net/http" + "strconv" + "sync/atomic" + + "github.com/cornelk/hashmap" + "github.com/gammazero/nexus/stdlog" + "github.com/ugorji/go/codec" +) + +var ( + logger stdlog.StdLog +) + +type DisplayGeneral map[string]interface{} + +// MetricMap is intended to be used as an quick acccess way to increase and decrease simple values such as `in/outMessageCount` and `..Authorization` +type MetricMap struct { + mp *hashmap.HashMap +} + +// MetricGlobal is the global instance of the metric hashmap +var MetricGlobal = &MetricMap{mp: hashmap.New(64)} +var handler codec.JsonHandle +var h = &handler + +func startAPI(port uint16) { + http.HandleFunc("/metrics", metricToJSON) + http.ListenAndServe(":"+strconv.Itoa(int(port)), nil) +} + +// metricToJSON creates raw view of current data of MetricGlobal +func metricToJSON(w http.ResponseWriter, r *http.Request) { + disMtr, err := MetricGlobal.MetricMapToGoMap() + if err != nil { + return + } + buffer := make([]byte, 128) + encoder := codec.NewEncoderBytes(&buffer, h) + encoder.Encode(disMtr) + if err != nil { + return + } + w.Write(buffer) +} + +// IncrementAtomicUint64Key executes an atomic increment on the specified key of the map +func (mp *MetricMap) IncrementAtomicUint64Key(key string) { + IncrementAtomicUint64KeyOf(mp.mp, key) +} + +// IncrementAtomicUint64KeyOf executes an atomic increment on the specified key of any given map +func IncrementAtomicUint64KeyOf(hmp *hashmap.HashMap, key string) { + var amt uint64 + curamt, _ := hmp.GetOrInsert(key, &amt) + count := (curamt).(*uint64) + atomic.AddUint64(count, 1) +} + +// IncreaseAtomicUint64Key executes an atomic increase of diff on the specified key of the map +func (mp *MetricMap) IncreaseAtomicUint64Key(key string, diff uint64) { + IncreaseAtomicUint64KeyOf(mp.mp, key, diff) +} + +// IncreaseAtomicUint64KeyOf executes an atomic increase of diff on the specified key of any given map +func IncreaseAtomicUint64KeyOf(hmp *hashmap.HashMap, key string, diff uint64) { + var amt uint64 + curamt, _ := hmp.GetOrInsert(key, &amt) + count := (curamt).(*uint64) + atomic.AddUint64(count, diff) +} + +// GetSubMapOf returns the saved MetricMap of the given key of any given map. Creates one if none exists +func GetSubMapOf(hmp *hashmap.HashMap, key string) (mp *MetricMap) { + var m MetricMap + val, loaded := hmp.GetOrInsert(key, &m) + mp = (val).(*MetricMap) + if !loaded { + mp.mp = hashmap.New(128) + } + hmp.Set(key, mp) + return +} + +// GetSubMap returns the saved MetricMap of the given key of the map. Creates one if none exists +func (mp *MetricMap) GetSubMap(key string) (dmp *MetricMap) { + dmp = GetSubMapOf(mp.mp, key) + return +} + +// SendMsgCountHandler increments SendMessageCount +func SendMsgCountHandler() { + IncrementAtomicUint64KeyOf(MetricGlobal.mp, "SendMessageCount") +} + +// RecvMsgCountHandler increments RecvMesssageCount +func RecvMsgCountHandler() { + IncrementAtomicUint64KeyOf(MetricGlobal.mp, "RecvMesssageCount") +} + +// RecvMsgLenHandler increses RecvTrafficBytesTotal by the length of the received message +func RecvMsgLenHandler(len uint64) { + IncreaseAtomicUint64KeyOf(MetricGlobal.mp, "RecvTrafficBytesTotal", len) +} + +// SendMsgLenHandler increases SendTrafficBytesTotal by the length of the send message +func SendMsgLenHandler(len uint64) { + IncreaseAtomicUint64KeyOf(MetricGlobal.mp, "SendTrafficBytesTotal", len) +} + +// MetricMapToGoMap transform Hashmap to normal Go Map +func (hmp *MetricMap) MetricMapToGoMap() (disMtr DisplayGeneral, err error) { + disMtr = make(map[string]interface{}, 32) + for k := range hmp.mp.Iter() { + if m, ok := (k.Value).(*MetricMap); ok { + dmp, e := m.MetricMapToGoMap() + if err != nil { + err = e + return + } + disMtr[(k.Key).(string)] = dmp + } else { + disMtr[(k.Key).(string)] = k.Value + } + } + return +} diff --git a/metrics/metrics_test.go b/metrics/metrics_test.go new file mode 100644 index 00000000..ef36b924 --- /dev/null +++ b/metrics/metrics_test.go @@ -0,0 +1,38 @@ +package metrics + +import ( + "net/http/httptest" + "testing" +) + +func TestServer(t *testing.T) { + go startAPI(3453) + mp := GetSubMapOf(MetricGlobal.mp, "tester") + IncrementAtomicUint64KeyOf(mp.mp, "test") + IncreaseAtomicUint64KeyOf(mp.mp, "test", 54) + r := httptest.NewRequest("", "localhost:3444", nil) + w := httptest.NewRecorder() + metricToJSON(w, r) +} + +func TestFlat(t *testing.T) { + IncrementAtomicUint64KeyOf(MetricGlobal.mp, "testing") + IncreaseAtomicUint64KeyOf(MetricGlobal.mp, "testing", 54) +} + +func TestMap(t *testing.T) { + mp := GetSubMapOf(MetricGlobal.mp, "tester") + IncrementAtomicUint64KeyOf(mp.mp, "test") + IncreaseAtomicUint64KeyOf(mp.mp, "test", 54) +} + +func TestBuiltinHandlers(t *testing.T) { + RecvMsgCountHandler() + SendMsgCountHandler() + SendMsgLenHandler(54) + RecvMsgLenHandler(54) +} + +func TestConvert(t *testing.T) { + MetricGlobal.MetricMapToGoMap() +} diff --git a/router/broker.go b/router/broker.go index e776c63f..81383047 100644 --- a/router/broker.go +++ b/router/broker.go @@ -41,6 +41,9 @@ type subscription struct { subscribers map[*session]struct{} } +// FilterFactory is a function which creates a PublishFilter from a publication +type FilterFactory func(msg *wamp.Publish) PublishFilter + type Broker struct { // topic -> subscription topicSubscription map[wamp.URI]*subscription @@ -61,15 +64,19 @@ type Broker struct { strictURI bool allowDisclose bool - log stdlog.StdLog - debug bool + log stdlog.StdLog + debug bool + filterFactory FilterFactory } // NewBroker returns a new default broker implementation instance. -func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Broker { +func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool, publishFilter FilterFactory) *Broker { if logger == nil { panic("logger is nil") } + if publishFilter == nil { + publishFilter = NewSimplePublishFilter + } b := &Broker{ topicSubscription: map[wamp.URI]*subscription{}, pfxTopicSubscription: map[wamp.URI]*subscription{}, @@ -88,8 +95,9 @@ func NewBroker(logger stdlog.StdLog, strictURI, allowDisclose, debug bool) *Brok strictURI: strictURI, allowDisclose: allowDisclose, - log: logger, - debug: debug, + log: logger, + debug: debug, + filterFactory: publishFilter, } go b.run() return b @@ -151,13 +159,14 @@ func (b *Broker) Publish(pub *session, msg *wamp.Publish) { Details: wamp.Dict{}, Error: wamp.ErrOptionDisallowedDiscloseMe, }) + return } disclose = true } pubID := wamp.GlobalID() // Get blacklists and whitelists, if any, from publish message. - filter := newPublishFilter(msg) + filter := b.filterFactory(msg) b.actionChan <- func() { b.publish(pub, msg, pubID, excludePub, disclose, filter) @@ -243,7 +252,7 @@ func (b *Broker) run() { } } -func (b *Broker) publish(pub *session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter *publishFilter) { +func (b *Broker) publish(pub *session, msg *wamp.Publish, pubID wamp.ID, excludePub, disclose bool, filter PublishFilter) { // Publish to subscribers with exact match. if sub, ok := b.topicSubscription[msg.Topic]; ok { b.pubEvent(pub, msg, pubID, sub, excludePub, false, disclose, filter) @@ -440,9 +449,20 @@ func (b *Broker) removeSession(subscriber *session) { } } +func allowPublish(sub *session, filter PublishFilter) bool { + if filter == nil { + return true + } + if filter.LockRequired() { + sub.RLock() + defer sub.RUnlock() + } + return filter.PublishAllowed(&sub.Session) +} + // pubEvent sends an event to all subscribers that are not excluded from // receiving the event. -func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter *publishFilter) { +func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *subscription, excludePublisher, sendTopic, disclose bool, filter PublishFilter) { for subscriber, _ := range sub.subscribers { // Do not send event to publisher. if subscriber == pub && excludePublisher { @@ -450,7 +470,7 @@ func (b *Broker) pubEvent(pub *session, msg *wamp.Publish, pubID wamp.ID, sub *s } // Check if receiver is restricted. - if filter != nil && !filter.publishAllowed(subscriber) { + if !allowPublish(subscriber, filter) { continue } @@ -582,13 +602,13 @@ func disclosePublisher(pub *session, details wamp.Dict) { details[rolePub] = pub.ID // These values are not required by the specification, but are here for // compatibility with Crossbar. - pub.rLock() + pub.RLock() for _, f := range []string{"authid", "authrole"} { if val, ok := pub.Details[f]; ok { details[fmt.Sprintf("%s_%s", rolePub, f)] = val } } - pub.rUnlock() + pub.RUnlock() } // ----- Subscription Meta Procedure Handlers ----- diff --git a/router/broker_test.go b/router/broker_test.go index a96572d0..325b705c 100644 --- a/router/broker_test.go +++ b/router/broker_test.go @@ -36,7 +36,7 @@ func (p *testPeer) Close() { return } func TestBasicSubscribe(t *testing.T) { // Test subscribing to a topic. - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := newSession(subscriber, 0, nil) testTopic := wamp.URI("nexus.test.topic") @@ -142,7 +142,7 @@ func TestBasicSubscribe(t *testing.T) { } func TestUnsubscribe(t *testing.T) { - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) testTopic := wamp.URI("nexus.test.topic") // Subscribe session1 to topic @@ -250,7 +250,7 @@ func TestUnsubscribe(t *testing.T) { func TestRemove(t *testing.T) { // Subscribe to topic - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := newSession(subscriber, 0, nil) testTopic := wamp.URI("nexus.test.topic") @@ -292,7 +292,7 @@ func TestRemove(t *testing.T) { } func TestBasicPubSub(t *testing.T) { - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := newSession(subscriber, 0, nil) testTopic := wamp.URI("nexus.test.topic") @@ -331,7 +331,7 @@ func TestBasicPubSub(t *testing.T) { func TestPrefxPatternBasedSubscription(t *testing.T) { // Test match=prefix - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := newSession(subscriber, 0, nil) testTopic := wamp.URI("nexus.test.topic") @@ -394,7 +394,7 @@ func TestPrefxPatternBasedSubscription(t *testing.T) { func TestWildcardPatternBasedSubscription(t *testing.T) { // Test match=prefix - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := newSession(subscriber, 0, nil) testTopic := wamp.URI("nexus.test.topic") @@ -465,7 +465,7 @@ func TestWildcardPatternBasedSubscription(t *testing.T) { } func TestSubscriberBlackwhiteListing(t *testing.T) { - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() details := wamp.Dict{ "authid": "jdoe", @@ -572,7 +572,7 @@ func TestSubscriberBlackwhiteListing(t *testing.T) { } func TestPublisherExclusion(t *testing.T) { - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() sess := newSession(subscriber, 0, nil) testTopic := wamp.URI("nexus.test.topic") @@ -646,7 +646,7 @@ func TestPublisherExclusion(t *testing.T) { } func TestPublisherIdentification(t *testing.T) { - broker := NewBroker(logger, false, true, debug) + broker := NewBroker(logger, false, true, debug, nil) subscriber := newTestPeer() details := wamp.Dict{ diff --git a/router/dealer.go b/router/dealer.go index 965d9fe9..e1c615ac 100644 --- a/router/dealer.go +++ b/router/dealer.go @@ -204,9 +204,9 @@ func (d *Dealer) Register(callee *session, msg *wamp.Register) { disclose, _ := msg.Options[wamp.OptDiscloseCaller].(bool) // allow disclose for trusted clients if !d.allowDisclose && disclose { - callee.rLock() + callee.RLock() authrole, _ := wamp.AsString(callee.Details["authrole"]) - callee.rUnlock() + callee.RUnlock() if authrole != "trusted" { d.trySend(callee, &wamp.Error{ Type: msg.MessageType(), @@ -516,7 +516,7 @@ func (d *Dealer) matchProcedure(procedure wamp.URI) (*registration, bool) { // No exact match was found. So, search for a prefix or wildcard // match, and prefer the most specific math (longest matched pattern). // If there is a tie, then prefer the first longest prefix. - var matchCount int + matchCount := -1 // initialize matchCount to -1 to catch an empty registration. for pfxProc, pfxReg := range d.pfxProcRegMap { if procedure.PrefixMatch(pfxProc) { if len(pfxProc) > matchCount { @@ -526,6 +526,12 @@ func (d *Dealer) matchProcedure(procedure wamp.URI) (*registration, bool) { } } } + // according to the spec, we have to prefer prefix match over wildcard match: + // https://wamp-proto.org/static/rfc/draft-oberstet-hybi-crossbar-wamp.html#rfc.section.14.3.8.1.4.2 + if ok { + return reg, ok + } + for wcProc, wcReg := range d.wcProcRegMap { if procedure.WildcardMatch(wcProc) { if len(wcProc) > matchCount { @@ -621,6 +627,8 @@ func (d *Dealer) call(caller *session, msg *wamp.Call) { Details: wamp.Dict{}, Error: wamp.ErrOptionDisallowedDiscloseMe, }) + // don't continue a call when discloseMe was disallowed. + return } if callee.HasFeature(roleCallee, featureCallerIdent) { discloseCaller(caller, details) @@ -639,6 +647,11 @@ func (d *Dealer) call(caller *session, msg *wamp.Call) { } } + if reg.match != wamp.MatchExact { + // according to the spec, a router has to provide the actual procedure to the client. + details[wamp.OptProcedure] = msg.Procedure + } + d.calls[msg.Request] = caller invocationID := d.idGen.Next() d.invocations[invocationID] = &invocation{ @@ -1157,11 +1170,11 @@ func discloseCaller(caller *session, details wamp.Dict) { details[roleCaller] = caller.ID // These values are not required by the specification, but are here for // compatibility with Crossbar. - caller.rLock() + caller.RLock() for _, f := range []string{"authid", "authrole"} { if val, ok := caller.Details[f]; ok { details[fmt.Sprintf("%s_%s", roleCaller, f)] = val } } - caller.rUnlock() + caller.RUnlock() } diff --git a/router/dealer_test.go b/router/dealer_test.go index 4db6eef2..6fbb3e9d 100644 --- a/router/dealer_test.go +++ b/router/dealer_test.go @@ -962,7 +962,9 @@ func TestPatternBasedRegistration(t *testing.T) { &wamp.Register{ Request: 123, Procedure: testProcedureWC, - Options: wamp.Dict{"match": "wildcard"}, + Options: wamp.Dict{ + wamp.OptMatch: wamp.MatchWildcard, + }, }) rsp := <-callee.Recv() _, ok := rsp.(*wamp.Registered) diff --git a/router/publishfilter.go b/router/publishfilter.go index b8545056..c13d1409 100644 --- a/router/publishfilter.go +++ b/router/publishfilter.go @@ -6,17 +6,25 @@ import ( "github.com/gammazero/nexus/wamp" ) -type publishFilter struct { - blIDs []wamp.ID - wlIDs []wamp.ID - blMap map[string][]string - wlMap map[string][]string +// PublishFilter is an interface to check whether a publication should be sent +// to a specific session +type PublishFilter interface { + LockRequired() bool + PublishAllowed(sess *wamp.Session) bool } -// newPublishFilter gets any blacklists and whitelists included in a PUBLISH +type simplePublishFilter struct { + blIDs []wamp.ID + wlIDs []wamp.ID + blMap map[string][]string + wlMap map[string][]string + lockRequired bool +} + +// NewSimplePublishFilter gets any blacklists and whitelists included in a PUBLISH // message. If there are no filters defined by the PUBLISH message, then nil // is returned. -func newPublishFilter(msg *wamp.Publish) *publishFilter { +func NewSimplePublishFilter(msg *wamp.Publish) PublishFilter { const ( blacklistPrefix = "exclude_" whitelistPrefix = "eligible_" @@ -79,17 +87,23 @@ func newPublishFilter(msg *wamp.Publish) *publishFilter { if blIDs == nil && wlIDs == nil && blMap == nil && wlMap == nil { return nil } - return &publishFilter{blIDs, wlIDs, blMap, wlMap} + return &simplePublishFilter{blIDs, wlIDs, blMap, wlMap, len(blMap) != 0 || len(wlMap) != 0} +} + +// LockRequired determines whether a consistent state of the subscriber sessions is +// required while running the filter +func (f *simplePublishFilter) LockRequired() bool { + return f.lockRequired } -// publishAllowed determines if a message is allowed to be published to a +// PublishAllowed determines if a message is allowed to be published to a // subscriber, by looking at any blacklists and whitelists provided with the // publish message. // // To receive a published event, the subscriber session must not have any // values that appear in a blacklist, and must have a value from each // whitelist. -func (f *publishFilter) publishAllowed(sub *session) bool { +func (f *simplePublishFilter) PublishAllowed(sub *wamp.Session) bool { // Check each blacklisted ID to see if session ID is blacklisted. for i := range f.blIDs { if f.blIDs[i] == sub.ID { @@ -111,11 +125,6 @@ func (f *publishFilter) publishAllowed(sub *session) bool { } } - if len(f.blMap) != 0 || len(f.wlMap) != 0 { - sub.rLock() - defer sub.rUnlock() - } - // Check blacklists to see if session has a value in any blacklist. for attr, vals := range f.blMap { // Get the session attribute value to compare with blacklist. diff --git a/router/publishfilter_test.go b/router/publishfilter_test.go index 308b868d..32f4e5a4 100644 --- a/router/publishfilter_test.go +++ b/router/publishfilter_test.go @@ -30,7 +30,7 @@ func TestFilterBlacklist(t *testing.T) { Topic: wamp.URI("blacklist.test"), } - pf := newPublishFilter(pub) + pf := NewSimplePublishFilter(pub) details := wamp.Dict{ "authid": allowedAuthid, @@ -38,33 +38,33 @@ func TestFilterBlacklist(t *testing.T) { "misc": "other", } sess := newSession(nil, allowedID, details) - if !pf.publishAllowed(sess) { + if !pf.PublishAllowed(&sess.Session) { t.Error(shouldAllowMsg) } sess = newSession(nil, blacklistID, details) // Check that session is denied by ID. - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } sess = newSession(nil, allowedID, details) // Check that session is denied by authid. sess.Details["authid"] = blacklistAuthid - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } // Check that session is denied by authrole. sess.Details["authid"] = allowedAuthid sess.Details["authrole"] = blacklistAuthrole - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } // Check that session is allowed by not having value in blacklist. delete(sess.Details, "authrole") - if !pf.publishAllowed(sess) { + if !pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } } @@ -93,7 +93,7 @@ func TestFilterWhitelist(t *testing.T) { Topic: wamp.URI("whitelist.test"), } - pf := newPublishFilter(pub) + pf := NewSimplePublishFilter(pub) details := wamp.Dict{ "authid": allowedAuthid, @@ -101,33 +101,33 @@ func TestFilterWhitelist(t *testing.T) { "misc": "other", } sess := newSession(nil, allowedID, details) - if !pf.publishAllowed(sess) { + if !pf.PublishAllowed(&sess.Session) { t.Error(shouldAllowMsg) } sess = newSession(nil, deniedID, details) // Check that session is denied by ID. - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } sess = newSession(nil, allowedID, details) // Check that session is denied by authid. sess.Details["authid"] = deniedAuthid - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } // Check that session is denied by authrole. sess.Details["authid"] = allowedAuthid sess.Details["authrole"] = deniedAuthrole - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } // Check that session is denied by not having value in whitelise. delete(sess.Details, "authrole") - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } } @@ -159,7 +159,7 @@ func TestFilterBlackWhitelistPrecedence(t *testing.T) { Topic: wamp.URI("whitelist.test"), } - pf := newPublishFilter(pub) + pf := NewSimplePublishFilter(pub) details := wamp.Dict{ "authid": allowedAuthid, @@ -168,20 +168,20 @@ func TestFilterBlackWhitelistPrecedence(t *testing.T) { } sess := newSession(nil, allowedID, details) - if !pf.publishAllowed(sess) { + if !pf.PublishAllowed(&sess.Session) { t.Error(shouldAllowMsg) } sess = newSession(nil, blacklistID, details) // Check that session is denied by ID even thought ID is also in whitelist. - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } sess = newSession(nil, allowedID, details) // Check that session is denied by authid even though also whitelisted. sess.Details["authid"] = blacklistAuthid - if pf.publishAllowed(sess) { + if pf.PublishAllowed(&sess.Session) { t.Error(shouldDenyMsg) } } diff --git a/router/realm.go b/router/realm.go index 6246e5f0..7feeb582 100644 --- a/router/realm.go +++ b/router/realm.go @@ -54,6 +54,13 @@ type RealmConfig struct { // procedure. This is disabled by default to avoid requiring Authorizer // logic when it may not be needed otherwise. EnableMetaModify bool `json:"enable_meta_modify"` + + // PublishFilterFactory is a function used to create a PublishFilter to check + // which sessions a publication should be sent to. + // Since it is a function pointer, it should not be set via json config, but + // being configured when embedding nexus. + // A value of nil means, the default filtering is used. + PublishFilterFactory FilterFactory } // Special ID for meta session. @@ -232,7 +239,7 @@ func (r *realm) close() { // the meta client receives GOODBYE from the meta session, the meta // session is done and will not try to publish anything more to the // broker, and it is finally safe to exit and close the broker. - r.metaSess.kill(nil) + r.metaSess.Kill(nil) <-r.metaDone // handleInboundMessages() and metaProcedureHandler() are the only things @@ -336,9 +343,9 @@ func (r *realm) onJoin(sess *session) { // WAMP spec only specifies publishing "session", "authid", "authrole", // "authmethod", "authprovider", "transport". This implementation // publishes all details except transport.auth. - sess.rLock() + sess.RLock() output := r.cleanSessionDetails(sess.Details) - sess.rUnlock() + sess.RUnlock() r.metaPeer.Send(&wamp.Publish{ Request: wamp.GlobalID(), Topic: wamp.MetaEventSessionOnJoin, @@ -571,9 +578,9 @@ func (r *realm) authzMessage(sess *session, msg wamp.Message) bool { } // Write-lock the session, becuase there is no telling what the Authorizer // will do to the session details. - sess.lock() + sess.Lock() isAuthz, err := r.authorizer.Authorize(&safeSession, msg) - sess.unlock() + sess.Unlock() if !isAuthz { errRsp := &wamp.Error{Type: msg.MessageType()} @@ -788,9 +795,9 @@ func (r *realm) sessionCount(msg *wamp.Invocation) wamp.Message { r.actionChan <- func() { var nclients int for _, sess := range r.clients { - sess.rLock() + sess.RLock() authrole, _ := wamp.AsString(sess.Details["authrole"]) - sess.rUnlock() + sess.RUnlock() for j := range filter { if filter[j] == authrole { nclients++ @@ -831,9 +838,9 @@ func (r *realm) sessionList(msg *wamp.Invocation) wamp.Message { r.actionChan <- func() { var ids []wamp.ID for sid, sess := range r.clients { - sess.rLock() + sess.RLock() authrole, _ := wamp.AsString(sess.Details["authrole"]) - sess.rUnlock() + sess.RUnlock() for j := range filter { if filter[j] == authrole { ids = append(ids, sid) @@ -874,9 +881,9 @@ func (r *realm) sessionGet(msg *wamp.Invocation) wamp.Message { // "authmethod", "authprovider", and "transport". All details are returned // in this implementation, except transport.auth, unless Config.MetaStrict // is set to true. - sess.rLock() + sess.RLock() output := r.cleanSessionDetails(sess.Details) - sess.rUnlock() + sess.RUnlock() return &wamp.Yield{ Request: msg.Request, @@ -1233,7 +1240,7 @@ func (r *realm) killSession(sid wamp.ID, reason wamp.URI, message string) error errChan <- errors.New("no such session") return } - sess.kill(goodbye) + sess.Kill(goodbye) close(errChan) } return <-errChan @@ -1252,14 +1259,14 @@ func (r *realm) killSessionsByDetail(key, value string, reason wamp.URI, message continue } - sess.rLock() + sess.RLock() val, ok := wamp.AsString(sess.Details[key]) - sess.rUnlock() + sess.RUnlock() if !ok || val != value { continue } - if sess.kill(goodbye) { + if sess.Kill(goodbye) { kills++ } } @@ -1283,7 +1290,7 @@ func (r *realm) killAllSessions(reason wamp.URI, message string, exclude wamp.ID if sid == exclude { continue } - if sess.kill(goodbye) { + if sess.Kill(goodbye) { kills++ } } @@ -1299,8 +1306,8 @@ func (r *realm) killAllSessions(reason wamp.URI, message string, exclude wamp.ID // updating that item in the session details. An item with a nil value in the // delta wamp.Dict specifies deleting that item from the session details. func (r *realm) modifySessionDetails(sess *session, delta wamp.Dict) { - sess.lock() - defer sess.unlock() + sess.Lock() + defer sess.Unlock() for k, v := range delta { if v == nil { diff --git a/router/router.go b/router/router.go index 08c09870..4454cb9b 100644 --- a/router/router.go +++ b/router/router.go @@ -366,7 +366,7 @@ func (r *router) addRealm(config *RealmConfig) (*realm, error) { realm, err := newRealm( config, - NewBroker(r.log, config.StrictURI, config.AllowDisclose, r.debug), + NewBroker(r.log, config.StrictURI, config.AllowDisclose, r.debug, config.PublishFilterFactory), NewDealer(r.log, config.StrictURI, config.AllowDisclose, r.debug), r.log, r.debug) if err != nil { diff --git a/router/session.go b/router/session.go index bc4d1c72..e76b4658 100644 --- a/router/session.go +++ b/router/session.go @@ -6,7 +6,7 @@ import ( "github.com/gammazero/nexus/wamp" ) -// session is a wrapper around a wamp.Session to provide the router with a +// session is a wrapper around a wamp.session to provide the router with a // lockable killable session. type session struct { wamp.Session @@ -27,12 +27,12 @@ func newSession(peer wamp.Peer, sid wamp.ID, details wamp.Dict) *session { } } -func (s *session) rLock() { s.rwlock.RLock() } -func (s *session) rUnlock() { s.rwlock.RUnlock() } -func (s *session) lock() { s.rwlock.Lock() } -func (s *session) unlock() { s.rwlock.Unlock() } +func (s *session) RLock() { s.rwlock.RLock() } +func (s *session) RUnlock() { s.rwlock.RUnlock() } +func (s *session) Lock() { s.rwlock.Lock() } +func (s *session) Unlock() { s.rwlock.Unlock() } -func (s *session) kill(goodbye *wamp.Goodbye) bool { +func (s *session) Kill(goodbye *wamp.Goodbye) bool { if s.killChan == nil { return false } diff --git a/router/websocketserver.go b/router/websocketserver.go index bd655ce8..fe1239d6 100644 --- a/router/websocketserver.go +++ b/router/websocketserver.go @@ -11,6 +11,7 @@ import ( "net/http" "time" + "github.com/gammazero/nexus/metrics" "github.com/gammazero/nexus/stdlog" "github.com/gammazero/nexus/transport" "github.com/gammazero/nexus/transport/serialize" @@ -107,7 +108,7 @@ type WebsocketServer struct { // Addr: address, // } // server.ListenAndServe() -func NewWebsocketServer(r Router) *WebsocketServer { +func NewWebsocketServer(r Router) (*WebsocketServer, *metrics.MetricMap) { s := &WebsocketServer{ router: r, protocols: map[string]protocol{}, @@ -121,7 +122,7 @@ func NewWebsocketServer(r Router) *WebsocketServer { s.addProtocol(cborWebsocketProtocol, websocket.BinaryMessage, &serialize.CBORSerializer{}) - return s + return s, metrics.MetricGlobal } // Deprecated: Set WebsocketServer.Upgrader and WebsockServer.Xxx members diff --git a/router/websocketserver_test.go b/router/websocketserver_test.go index eb2a1c6f..d600ef47 100644 --- a/router/websocketserver_test.go +++ b/router/websocketserver_test.go @@ -35,7 +35,7 @@ func TestWSHandshakeJSON(t *testing.T) { } defer r.Close() - s := NewWebsocketServer(r) + s, _ := NewWebsocketServer(r) s.Upgrader.EnableCompression = true closer, err := s.ListenAndServe(wsAddr) if err != nil { @@ -73,7 +73,9 @@ func TestWSHandshakeMsgpack(t *testing.T) { } defer r.Close() - closer, err := NewWebsocketServer(r).ListenAndServe(wsAddr) + s, _ := NewWebsocketServer(r) + + closer, err := s.ListenAndServe(wsAddr) if err != nil { t.Fatal(err) } diff --git a/transport/serialize/cborserializer.go b/transport/serialize/cborserializer.go index ff6f0608..04285611 100644 --- a/transport/serialize/cborserializer.go +++ b/transport/serialize/cborserializer.go @@ -4,6 +4,7 @@ import ( "errors" "reflect" + "github.com/gammazero/nexus/metrics" "github.com/gammazero/nexus/wamp" "github.com/ugorji/go/codec" ) @@ -17,17 +18,26 @@ func init() { // CBORSerializer is an implementation of Serializer that handles // serializing and deserializing cbor encoded payloads. -type CBORSerializer struct{} +type CBORSerializer struct { +} // Serialize encodes a Message into a cbor payload. func (s *CBORSerializer) Serialize(msg wamp.Message) ([]byte, error) { var b []byte - return b, codec.NewEncoderBytes(&b, ch).Encode(msgToList(msg)) + err := codec.NewEncoderBytes(&b, ch).Encode(msgToList(msg)) + metrics.SendMsgLenHandler(uint64(len(b))) + metrics.SendMsgCountHandler() + return b, err } // Deserialize decodes a cbor payload into a Message. func (s *CBORSerializer) Deserialize(data []byte) (wamp.Message, error) { var v []interface{} + + // report msg size back to metrics + metrics.RecvMsgLenHandler(uint64(len(data))) + metrics.RecvMsgCountHandler() + err := codec.NewDecoderBytes(data, ch).Decode(&v) if err != nil { return nil, err diff --git a/transport/serialize/jsonserializer.go b/transport/serialize/jsonserializer.go index 7ac68d6a..19f5eca7 100644 --- a/transport/serialize/jsonserializer.go +++ b/transport/serialize/jsonserializer.go @@ -5,6 +5,7 @@ import ( "errors" "reflect" + "github.com/gammazero/nexus/metrics" "github.com/gammazero/nexus/wamp" "github.com/ugorji/go/codec" ) @@ -18,17 +19,26 @@ func init() { // JSONSerializer is an implementation of Serializer that handles // serializing and deserializing json encoded payloads. -type JSONSerializer struct{} +type JSONSerializer struct { +} // Serialize encodes a Message into a json payload. func (s *JSONSerializer) Serialize(msg wamp.Message) ([]byte, error) { var b []byte - return b, codec.NewEncoderBytes(&b, jh).Encode(msgToList(msg)) + err := codec.NewEncoderBytes(&b, jh).Encode(msgToList(msg)) + metrics.SendMsgLenHandler(uint64(len(b))) + metrics.SendMsgCountHandler() + return b, err } // Deserialize decodes a json payload into a Message. func (s *JSONSerializer) Deserialize(data []byte) (wamp.Message, error) { var v []interface{} + + // report msg size back to metrics + metrics.RecvMsgLenHandler(uint64(len(data))) + metrics.RecvMsgCountHandler() + err := codec.NewDecoderBytes(data, jh).Decode(&v) if err != nil { return nil, err diff --git a/transport/serialize/msgpackserializer.go b/transport/serialize/msgpackserializer.go index be32c004..893d6638 100644 --- a/transport/serialize/msgpackserializer.go +++ b/transport/serialize/msgpackserializer.go @@ -4,6 +4,7 @@ import ( "errors" "reflect" + "github.com/gammazero/nexus/metrics" "github.com/gammazero/nexus/wamp" "github.com/ugorji/go/codec" ) @@ -25,18 +26,27 @@ func MsgpackRegisterExtension(t reflect.Type, ext byte, encode func(reflect.Valu // MessagePackSerializer is an implementation of Serializer that handles // serializing and deserializing msgpack encoded payloads. -type MessagePackSerializer struct{} +type MessagePackSerializer struct { +} // Serialize encodes a Message into a msgpack payload. func (s *MessagePackSerializer) Serialize(msg wamp.Message) ([]byte, error) { var b []byte - return b, codec.NewEncoderBytes(&b, mh).Encode( + err := codec.NewEncoderBytes(&b, mh).Encode( msgToList(msg)) + metrics.RecvMsgLenHandler(uint64(len(b))) + metrics.RecvMsgCountHandler() + return b, err } // Deserialize decodes a msgpack payload into a Message. func (s *MessagePackSerializer) Deserialize(data []byte) (wamp.Message, error) { var v []interface{} + + // report msg size back to metrics + metrics.RecvMsgLenHandler(uint64(len(data))) + metrics.RecvMsgCountHandler() + err := codec.NewDecoderBytes(data, mh).Decode(&v) if err != nil { return nil, err diff --git a/transport/websocketpeer.go b/transport/websocketpeer.go index e1d7a79b..017bae75 100644 --- a/transport/websocketpeer.go +++ b/transport/websocketpeer.go @@ -154,7 +154,10 @@ func NewWebsocketPeer(conn *websocket.Conn, serializer serialize.Serializer, pay return w } -func (w *websocketPeer) Recv() <-chan wamp.Message { return w.rd } +func (w *websocketPeer) Recv() <-chan wamp.Message { + + return w.rd +} func (w *websocketPeer) TrySend(msg wamp.Message) error { select { diff --git a/wamp/options.go b/wamp/options.go index b8a9ec43..65c0f893 100644 --- a/wamp/options.go +++ b/wamp/options.go @@ -11,6 +11,7 @@ const ( OptInvoke = "invoke" OptMatch = "match" OptMode = "mode" + OptProcedure = "procedure" OptProgress = "progress" OptReceiveProgress = "receive_progress" OptTimeout = "timeout"