diff --git a/go.mod b/go.mod index 6a03e486..1ea11cee 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module github.com/gammazero/nexus/v3 -go 1.20 +go 1.21 require ( + github.com/gammazero/deque v0.2.1 github.com/gorilla/websocket v1.5.0 github.com/stretchr/testify v1.8.4 github.com/ugorji/go/codec v1.2.11 diff --git a/go.sum b/go.sum index 470c4ea1..2fef8843 100644 --- a/go.sum +++ b/go.sum @@ -1,9 +1,12 @@ github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/gammazero/deque v0.2.1 h1:qSdsbG6pgp6nL7A0+K/B7s12mcCY/5l5SIUpMOl+dC0= +github.com/gammazero/deque v0.2.1/go.mod h1:LFroj8x4cMYCukHJDbxFCkT+r9AndaJnFMuZDV34tuU= github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -18,5 +21,6 @@ golang.org/x/crypto v0.17.0 h1:r8bRNjWL3GshPW3gkd+RpvzWrZAwPS49OmTGZ/uhM4k= golang.org/x/crypto v0.17.0/go.mod h1:gCAAfMLgwOJRpTjQ2zCCt2OcSfYMTeZVSRtQlPC7Nq4= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/router/broker.go b/router/broker.go index 6ada3038..2ca916e3 100644 --- a/router/broker.go +++ b/router/broker.go @@ -4,6 +4,7 @@ import ( "fmt" "time" + "github.com/gammazero/deque" "github.com/gammazero/nexus/v3/stdlog" "github.com/gammazero/nexus/v3/wamp" ) @@ -52,10 +53,13 @@ type historyEntry struct { } type historyStore struct { - entries []historyEntry - matchPolicy string - limit int - isLimitReached bool + entries deque.Deque[historyEntry] + matchPolicy string + limit int +} + +func (h *historyStore) atLimit() bool { + return h.entries.Len() >= h.limit } type broker struct { @@ -149,10 +153,8 @@ func (b *broker) PreInitEventHistoryTopics(evntCfgs []*TopicEventHistoryConfig) sub, _ := b.syncInitSubscription(topicCfg.Topic, topicCfg.MatchPolicy, nil) b.eventHistoryStore[sub] = &historyStore{ - entries: []historyEntry{}, - matchPolicy: topicCfg.MatchPolicy, - limit: topicCfg.Limit, - isLimitReached: false, + matchPolicy: topicCfg.MatchPolicy, + limit: topicCfg.Limit, } } @@ -383,12 +385,8 @@ func newSubscription(id wamp.ID, subscriber *wamp.Session, topic wamp.URI, match } func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, event *wamp.Event) { - - if eventStore.isLimitReached { - eventStore.entries = eventStore.entries[1:] - } else if len(eventStore.entries) >= eventStore.limit { - eventStore.isLimitReached = true - eventStore.entries = eventStore.entries[1:] + if eventStore.atLimit() { + eventStore.entries.PopFront() } item := historyEntry{ @@ -403,7 +401,7 @@ func (b *broker) syncSaveEvent(eventStore *historyStore, pub *wamp.Publish, even }, } - eventStore.entries = append(eventStore.entries, item) + eventStore.entries.PushBack(item) } func (b *broker) syncInitSubscription(topic wamp.URI, match string, subscriber *wamp.Session) (sub *subscription, existingSub bool) { @@ -1138,7 +1136,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { fromPubOp, ok := msg.ArgumentsKw["from_publication"] if ok { - fromPub = fromPubOp.(wamp.ID) + fromPub, ok = fromPubOp.(wamp.ID) if !ok || fromPub < 1 { return &wamp.Error{ Type: msg.MessageType(), @@ -1152,7 +1150,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { afterPubOp, ok := msg.ArgumentsKw["after_publication"] if ok { - afterPub = afterPubOp.(wamp.ID) + afterPub, ok = afterPubOp.(wamp.ID) if !ok || afterPub < 1 { return &wamp.Error{ Type: msg.MessageType(), @@ -1165,7 +1163,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { beforePubOp, ok := msg.ArgumentsKw["before_publication"] if ok { - beforePub = beforePubOp.(wamp.ID) + beforePub, ok = beforePubOp.(wamp.ID) if !ok || beforePub < 1 { return &wamp.Error{ Type: msg.MessageType(), @@ -1178,7 +1176,7 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { untilPubOp, ok := msg.ArgumentsKw["until_publication"] if ok { - untilPub = untilPubOp.(wamp.ID) + untilPub, ok = untilPubOp.(wamp.ID) if !ok || untilPub < 1 { return &wamp.Error{ Type: msg.MessageType(), @@ -1189,19 +1187,20 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { } } - ch := make(chan struct{}) + done := make(chan struct{}) b.actionChan <- func() { + defer close(done) + var filteredEvents []storedEvent if subscription, ok := b.subscriptions[subId]; ok { if storeItem, ok := b.eventHistoryStore[subscription]; ok { - isLimitReached = storeItem.isLimitReached + isLimitReached = storeItem.atLimit() - fromPubReached := false - afterPubReached := false - untilPubReached := false + var untilPubReached bool - for _, entry := range storeItem.entries { + for i := 0; i < storeItem.entries.Len(); i++ { + entry := storeItem.entries.At(i) if !fromDate.IsZero() && entry.event.timestamp.Before(fromDate) { continue } @@ -1214,20 +1213,17 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { if !untilDate.IsZero() && entry.event.timestamp.After(untilDate) { continue } - if fromPub > 0 && !fromPubReached { + if fromPub != 0 { if entry.event.Publication != fromPub { continue - } else { - fromPubReached = true } + fromPub = 0 } - if afterPub > 0 && !afterPubReached { - if entry.event.Publication != afterPub { - continue - } else { - afterPubReached = true - continue + if afterPub != 0 { + if entry.event.Publication == afterPub { + afterPub = 0 } + continue } if beforePub > 0 && entry.event.Publication == beforePub { break @@ -1270,9 +1266,8 @@ func (b *broker) subEventHistory(msg *wamp.Invocation) wamp.Message { } events, _ = wamp.AsList(filteredEvents) - close(ch) } - <-ch + <-done return &wamp.Yield{ Request: msg.Request, diff --git a/router/broker_test.go b/router/broker_test.go index 58c28d2f..7ddfa8d9 100644 --- a/router/broker_test.go +++ b/router/broker_test.go @@ -699,48 +699,48 @@ func TestEventHistory(t *testing.T) { topic := wamp.URI("nexus.test.exact.topic") subscription := broker.topicSubscription[topic] subEvents := broker.eventHistoryStore[subscription].entries - require.Equalf(t, 3, len(subEvents), "Store for topic %s should hold 3 records", topic) - require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic) - require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25509, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.exact.topic", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25513, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.exact.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25517, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 3, subEvents.Len(), "Store for topic %s should hold 3 records", topic) + require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic) + require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25509, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.exact.topic", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25513, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.exact.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25517, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic) topic = wamp.URI("nexus.test") subscription = broker.pfxTopicSubscription[topic] subEvents = broker.eventHistoryStore[subscription].entries - require.Equalf(t, 4, len(subEvents), "Store for topic %s should hold 3 records", topic) - require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic) - require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25517, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.prefix.catch", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25518, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.wildcard.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25519, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.wildcard.miss", subEvents[3].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25520, subEvents[3].event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 4, subEvents.Len(), "Store for topic %s should hold 3 records", topic) + require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic) + require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25517, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.prefix.catch", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25518, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25519, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.wildcard.miss", subEvents.At(3).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25520, subEvents.At(3).event.Arguments[0], "Event store for topic %s holds invalid event", topic) topic = wamp.URI("nexus.test..topic") subscription = broker.wcTopicSubscription[topic] subEvents = broker.eventHistoryStore[subscription].entries - require.Equalf(t, 4, len(subEvents), "Store for topic %s should hold 3 records", topic) - require.Truef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should be reached", topic) - require.Equalf(t, "nexus.test.exact.topic", subEvents[0].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25513, subEvents[0].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.wildcard.topic", subEvents[1].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25515, subEvents[1].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.exact.topic", subEvents[2].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25517, subEvents[2].event.Arguments[0], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, "nexus.test.wildcard.topic", subEvents[3].event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) - require.Equalf(t, 25519, subEvents[3].event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 4, subEvents.Len(), "Store for topic %s should hold 3 records", topic) + require.Truef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should be reached", topic) + require.Equalf(t, "nexus.test.exact.topic", subEvents.At(0).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25513, subEvents.At(0).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(1).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25515, subEvents.At(1).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.exact.topic", subEvents.At(2).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25517, subEvents.At(2).event.Arguments[0], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, "nexus.test.wildcard.topic", subEvents.At(3).event.ArgumentsKw["topic"], "Event store for topic %s holds invalid event", topic) + require.Equalf(t, 25519, subEvents.At(3).event.Arguments[0], "Event store for topic %s holds invalid event", topic) topic = wamp.URI("nexus") subscription = broker.pfxTopicSubscription[topic] subEvents = broker.eventHistoryStore[subscription].entries - require.Equalf(t, 20, len(subEvents), "Store for topic %s should hold 20 records", topic) - require.Falsef(t, broker.eventHistoryStore[subscription].isLimitReached, "Limit for the store for topic %s should not be reached", topic) + require.Equalf(t, 20, subEvents.Len(), "Store for topic %s should hold 20 records", topic) + require.Falsef(t, broker.eventHistoryStore[subscription].atLimit(), "Limit for the store for topic %s should not be reached", topic) //Now let's test Event History MetaRPCs topic = wamp.URI("nexus.test.exact.topic") @@ -885,7 +885,7 @@ func TestEventHistory(t *testing.T) { // Let's test filtering based on publication ID topic = wamp.URI("nexus") subscription = broker.pfxTopicSubscription[topic] - pubId := broker.eventHistoryStore[subscription].entries[4].event.Publication + pubId := broker.eventHistoryStore[subscription].entries.At(4).event.Publication inv = wamp.Invocation{ Request: wamp.ID(reqId), Registration: 0, diff --git a/router/realm.go b/router/realm.go index 4520b770..62e83ee1 100644 --- a/router/realm.go +++ b/router/realm.go @@ -472,21 +472,20 @@ func (r *realm) handleInboundMessages(sess *wamp.Session) (bool, bool, error) { switch msg := msg.(type) { case *wamp.Publish: r.broker.publish(sess, msg) + case *wamp.Yield: + r.dealer.yield(sess, msg) + case *wamp.Call: + r.dealer.call(sess, msg) + case *wamp.Cancel: + r.dealer.cancel(sess, msg) case *wamp.Subscribe: r.broker.subscribe(sess, msg) - case *wamp.Unsubscribe: - r.broker.unsubscribe(sess, msg) - case *wamp.Register: r.dealer.register(sess, msg) + case *wamp.Unsubscribe: + r.broker.unsubscribe(sess, msg) case *wamp.Unregister: r.dealer.unregister(sess, msg) - case *wamp.Call: - r.dealer.call(sess, msg) - case *wamp.Yield: - r.dealer.yield(sess, msg) - case *wamp.Cancel: - r.dealer.cancel(sess, msg) case *wamp.Error: // An INVOCATION error is the only type of ERROR message the diff --git a/wamp/session.go b/wamp/session.go index ae271b1b..84e2c832 100644 --- a/wamp/session.go +++ b/wamp/session.go @@ -71,12 +71,12 @@ func (s *Session) HasFeature(role, feature string) bool { // by calling EndRecv. func (s *Session) RecvDone() <-chan struct{} { s.mu.Lock() + defer s.mu.Unlock() + if s.done == nil { s.done = make(chan struct{}) } - d := s.done - s.mu.Unlock() - return d + return s.done } // If RecvDone is not yet closed, Goodbye returns nil. @@ -84,9 +84,8 @@ func (s *Session) RecvDone() <-chan struct{} { // when RecvEnd was called. func (s *Session) Goodbye() *Goodbye { s.mu.Lock() - g := s.goodbye - s.mu.Unlock() - return g + defer s.mu.Unlock() + return s.goodbye } // EndRecv tells the session to signal messages handlers to stop receiving @@ -96,8 +95,9 @@ func (s *Session) Goodbye() *Goodbye { // with exiting the message handler for other reasons. func (s *Session) EndRecv(goodbye *Goodbye) bool { s.mu.Lock() + defer s.mu.Unlock() + if s.goodbye != nil { - s.mu.Unlock() return false // already ended } @@ -112,7 +112,6 @@ func (s *Session) EndRecv(goodbye *Goodbye) bool { } close(s.done) - s.mu.Unlock() return true }