From 45ba6088ebd12c4c64ed4be3f8f859ea928d2e3a Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Sun, 2 Jun 2024 21:42:32 +0100 Subject: [PATCH 1/9] fix: player online state processing --- pkg/game/game.go | 30 +++++++++++++++++++++++++----- pkg/protocol/messages.go | 2 +- 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/pkg/game/game.go b/pkg/game/game.go index 863de9b..899407c 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -17,6 +17,8 @@ import ( var ( ErrNoRoom = errors.New("no room") + + playerOnlineTimeout = 20 * time.Second ) type StateSubscription chan *protocol.State @@ -191,12 +193,13 @@ func (g *Game) processMessage(payload []byte) { playerChanged := !g.state.Players[index].Online || g.state.Players[index].Name != playerOnline.Player.Name + g.state.Players[index].OnlineTimestamp = time.Now() + if !playerChanged { return } g.state.Players[index].Online = true - g.state.Players[index].OnlineTimestamp = time.Now() g.state.Players[index].Name = playerOnline.Player.Name g.notifyChangedState(true) @@ -352,7 +355,7 @@ func (g *Game) publishStateLoop() { } } -func (g *Game) checkPlayersStateLoop() { +func (g *Game) watchPlayersStateLoop() { g.logger.Debug("check users state loop") ticker := time.NewTicker(1 * time.Second) defer ticker.Stop() @@ -368,8 +371,16 @@ func (g *Game) checkPlayersStateLoop() { } now := time.Now() for i, player := range g.state.Players { + if !player.Online { + continue + } diff := now.Sub(player.OnlineTimestamp) - if diff > 20*time.Second { + if diff > playerOnlineTimeout { + g.logger.Info("marking user as offline", + zap.Any("name", player.Name), + zap.Any("lastSeenAt", player.OnlineTimestamp), + zap.Any("now", now), + ) g.state.Players[i].Online = false } } @@ -573,7 +584,16 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { if state == nil && g.HasStorage() { state, err = g.storage.LoadRoomState(roomID) - g.logger.Info("room not found in storage", zap.Error(err)) + if err != nil { + g.logger.Info("room not found in storage", zap.Error(err)) + } else { + g.logger.Info("loaded room from storage", zap.Any("roomID", roomID)) + now := time.Now() + for i := range state.Players { + online := now.Sub(state.Players[i].OnlineTimestamp) < playerOnlineTimeout + state.Players[i].Online = online + } + } } g.exitRoom = make(chan struct{}) @@ -597,7 +617,7 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { go g.publishOnlineState() if g.isDealer { go g.publishStateLoop() - go g.checkPlayersStateLoop() + go g.watchPlayersStateLoop() } g.notifyChangedState(g.isDealer) diff --git a/pkg/protocol/messages.go b/pkg/protocol/messages.go index 98bcbab..537053b 100644 --- a/pkg/protocol/messages.go +++ b/pkg/protocol/messages.go @@ -15,7 +15,7 @@ type Player struct { Name string `json:"name"` Online bool `json:"online"` - OnlineTimestamp time.Time `json:"-"` + OnlineTimestamp time.Time `json:"onlineTimestamp"` } type Issue struct { From a82677c756df199a177acf3332160a8ff11abfe3 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Sun, 2 Jun 2024 21:44:23 +0100 Subject: [PATCH 2/9] chore: implement game options instead of using config directly --- cmd/2sp/main.go | 3 ++ pkg/game/config.go | 8 +++++ pkg/game/game.go | 13 ++++----- pkg/game/game_test.go | 63 ++++++++++++++++++++++------------------ pkg/game/options.go | 19 ++++++++++++ pkg/game/options_test.go | 39 +++++++++++++++++++++++-- 6 files changed, 106 insertions(+), 39 deletions(-) diff --git a/cmd/2sp/main.go b/cmd/2sp/main.go index be0a338..6e5684b 100644 --- a/cmd/2sp/main.go +++ b/cmd/2sp/main.go @@ -25,6 +25,9 @@ func main() { game.WithTransport(waku), game.WithStorage(createStorage()), game.WithLogger(config.Logger.Named("game")), + game.WithPlayerName(config.PlayerName()), + game.WithOnlineMessagePeriod(config.OnlineMessagePeriod), + game.WithStateMessagePeriod(config.StateMessagePeriod), game.WithEnableSymmetricEncryption(config.EnableSymmetricEncryption), } diff --git a/pkg/game/config.go b/pkg/game/config.go index ef8f15b..b46a100 100644 --- a/pkg/game/config.go +++ b/pkg/game/config.go @@ -1,9 +1,17 @@ package game +import "time" + type gameConfig struct { + PlayerName string EnableSymmetricEncryption bool + OnlineMessagePeriod time.Duration + StateMessagePeriod time.Duration } var defaultConfig = gameConfig{ + PlayerName: "", EnableSymmetricEncryption: true, + OnlineMessagePeriod: 5 * time.Second, + StateMessagePeriod: 30 * time.Second, } diff --git a/pkg/game/game.go b/pkg/game/game.go index 899407c..a70208f 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -1,7 +1,6 @@ package game import ( - "2sp/internal/config" "2sp/internal/transport" "2sp/pkg/protocol" "2sp/pkg/storage" @@ -86,7 +85,7 @@ func (g *Game) Initialize() error { } } - player, err := loadPlayer(g.storage) + player, err := g.loadPlayer(g.storage) if err != nil { return err } @@ -327,7 +326,7 @@ func (g *Game) publishOnlineState() { g.publishUserOnline(true) for { select { - case <-time.After(config.OnlineMessagePeriod): + case <-time.After(g.config.OnlineMessagePeriod): g.publishUserOnline(true) case <-g.exitRoom: return @@ -342,7 +341,7 @@ func (g *Game) publishStateLoop() { logger.Debug("started") for { select { - case <-time.After(config.StateMessagePeriod): + case <-time.After(g.config.StateMessagePeriod): logger.Debug("tick") g.notifyChangedState(true) case <-g.exitRoom: @@ -821,7 +820,7 @@ func (g *Game) playerIndex(playerID protocol.PlayerID) int { }) } -func loadPlayer(s storage.Service) (*protocol.Player, error) { +func (g *Game) loadPlayer(s storage.Service) (*protocol.Player, error) { var err error var player protocol.Player @@ -845,8 +844,8 @@ func loadPlayer(s storage.Service) (*protocol.Player, error) { } // Load Name - if config.PlayerName() != "" { - player.Name = config.PlayerName() + if g.config.PlayerName != "" { + player.Name = g.config.PlayerName } else if !nilStorage(s) { player.Name = s.PlayerName() } diff --git a/pkg/game/game_test.go b/pkg/game/game_test.go index 8478287..6bb7d4e 100644 --- a/pkg/game/game_test.go +++ b/pkg/game/game_test.go @@ -25,9 +25,23 @@ type Suite struct { ctx context.Context cancel context.CancelFunc transport *mocktransport.MockService + game *Game +} + +func (s *Suite) newGame(extraOptions []Option) *Game { + options := []Option{ + WithContext(s.ctx), + WithTransport(s.transport), + } + options = append(options, extraOptions...) + + g := NewGame(options) + s.Require().NotNil(g) - game *Game - stateSub StateSubscription + err := g.Initialize() + s.Require().NoError(err) + + return g } func (s *Suite) SetupTest() { @@ -40,9 +54,6 @@ func (s *Suite) SetupTest() { WithEnableSymmetricEncryption(true), }) - s.stateSub = s.game.SubscribeToStateChanges() - s.Require().NotNil(s.stateSub) - err := s.game.Initialize() s.Require().NoError(err) } @@ -51,6 +62,23 @@ func (s *Suite) TearDownTest() { s.cancel() } +func (s *Suite) expectSubscribeToMessages(room *protocol.Room) func(room *protocol.Room, payload []byte) { + roomMatcher := matchers.NewRoomMatcher(room) + + subscription := &transport.MessagesSubscription{ + Ch: make(chan []byte), + Unsubscribe: func() {}, + } + + s.transport.EXPECT().SubscribeToMessages(roomMatcher). + Return(subscription, nil). + Times(1) + + return func(room *protocol.Room, payload []byte) { + subscription.Ch <- payload + } +} + func (s *Suite) TestStateSize() { const playersCount = 20 const issuesCount = 30 @@ -104,7 +132,6 @@ func (s *Suite) TestSimpleGame() { s.Require().NotNil(room) roomID := room.ToRoomID() - roomMatcher := matchers.NewRoomMatcher(room) onlineMatcher := matchers.NewOnlineMatcher() @@ -113,16 +140,7 @@ func (s *Suite) TestSimpleGame() { // We need to loop the subscription to mock waku behaviour // We should probably check the published messages instead of received ones, but it's fine for now. - subscription := &transport.MessagesSubscription{ - Ch: make(chan []byte), - Unsubscribe: func() {}, - } - loop := func(room *protocol.Room, payload []byte) { - subscription.Ch <- payload - } - s.transport.EXPECT().SubscribeToMessages(roomMatcher). - Return(subscription, nil). - Times(1) + loop := s.expectSubscribeToMessages(room) // Join room stateMatcher := matchers.NewStateMatcher(nil) @@ -342,16 +360,3 @@ func (s *Suite) TestPublishMessage() { }) } } - -func (s *Suite) newGame(extraOptions []Option) *Game { - options := []Option{ - WithContext(s.ctx), - WithTransport(s.transport), - } - options = append(options, extraOptions...) - - g := NewGame(options) - s.Require().NotNil(g) - - return g -} diff --git a/pkg/game/options.go b/pkg/game/options.go index b7ac601..6cf9d45 100644 --- a/pkg/game/options.go +++ b/pkg/game/options.go @@ -5,6 +5,7 @@ import ( "2sp/pkg/storage" "context" "go.uber.org/zap" + "time" ) type Option func(*Game) @@ -38,3 +39,21 @@ func WithEnableSymmetricEncryption(b bool) Option { g.config.EnableSymmetricEncryption = b } } + +func WithPlayerName(name string) Option { + return func(g *Game) { + g.config.PlayerName = name + } +} + +func WithOnlineMessagePeriod(d time.Duration) Option { + return func(g *Game) { + g.config.OnlineMessagePeriod = d + } +} + +func WithStateMessagePeriod(d time.Duration) Option { + return func(g *Game) { + g.config.StateMessagePeriod = d + } +} diff --git a/pkg/game/options_test.go b/pkg/game/options_test.go index e630dc3..98477a4 100644 --- a/pkg/game/options_test.go +++ b/pkg/game/options_test.go @@ -4,24 +4,33 @@ import ( mocktransport "2sp/internal/transport/mock" mockstorage "2sp/pkg/storage/mock" "context" + "github.com/brianvoe/gofakeit/v6" "github.com/stretchr/testify/require" "go.uber.org/zap" + "go.uber.org/zap/zapcore" "testing" + "time" ) func TestOptions(t *testing.T) { - ctx := context.Background() + ctx := context.WithValue(context.Background(), gofakeit.LetterN(3), gofakeit.LetterN(3)) transport := &mocktransport.MockService{} storage := &mockstorage.MockService{} logger := zap.NewNop() - const enableSymmetricEncryption = false + enableSymmetricEncryption := gofakeit.Bool() + playerName := gofakeit.Username() + onlineMessagePeriod := time.Duration(gofakeit.Int64()) + stateMessagePeriod := time.Duration(gofakeit.Int64()) options := []Option{ WithContext(ctx), WithTransport(transport), WithStorage(storage), WithLogger(logger), - WithEnableSymmetricEncryption(false), + WithEnableSymmetricEncryption(enableSymmetricEncryption), + WithPlayerName(playerName), + WithOnlineMessagePeriod(onlineMessagePeriod), + WithStateMessagePeriod(stateMessagePeriod), } game := NewGame(options) @@ -31,6 +40,9 @@ func TestOptions(t *testing.T) { require.Equal(t, storage, game.storage) require.Equal(t, logger, game.logger) require.Equal(t, enableSymmetricEncryption, game.config.EnableSymmetricEncryption) + require.Equal(t, playerName, game.config.PlayerName) + require.Equal(t, onlineMessagePeriod, game.config.OnlineMessagePeriod) + require.Equal(t, stateMessagePeriod, game.config.StateMessagePeriod) } func TestNoTransport(t *testing.T) { @@ -40,3 +52,24 @@ func TestNoTransport(t *testing.T) { game := NewGame(options) require.Nil(t, game) } + +func TestNoContext(t *testing.T) { + options := []Option{ + WithContext(nil), + WithTransport(&mocktransport.MockService{}), + } + game := NewGame(options) + require.NotNil(t, game) + require.Equal(t, context.Background(), game.ctx) +} + +func TestNotLogger(t *testing.T) { + options := []Option{ + WithLogger(nil), + WithTransport(&mocktransport.MockService{}), + } + game := NewGame(options) + require.NotNil(t, game) + require.NotNil(t, game.logger) + require.Equal(t, zapcore.InvalidLevel, game.logger.Level()) +} From 4482ac45663b73cc4b439625bf98f3460de2a525 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Sun, 2 Jun 2024 22:09:37 +0100 Subject: [PATCH 3/9] chore: simplify JoinRoom method --- pkg/game/game.go | 37 +++++++++++++++++++++++-------------- 1 file changed, 23 insertions(+), 14 deletions(-) diff --git a/pkg/game/game.go b/pkg/game/game.go index a70208f..6626355 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -582,17 +582,7 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { } if state == nil && g.HasStorage() { - state, err = g.storage.LoadRoomState(roomID) - if err != nil { - g.logger.Info("room not found in storage", zap.Error(err)) - } else { - g.logger.Info("loaded room from storage", zap.Any("roomID", roomID)) - now := time.Now() - for i := range state.Players { - online := now.Sub(state.Players[i].OnlineTimestamp) < playerOnlineTimeout - state.Players[i].Online = online - } - } + state = g.loadStateFromStorage(roomID) } g.exitRoom = make(chan struct{}) @@ -625,9 +615,7 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { } else { g.stateTimestamp = g.timestamp() - g.logger.Info("loaded room", - zap.Any("roomID", roomID), - zap.Bool("isDealer", g.isDealer)) + g.logger.Info("loaded room", zap.Any("roomID", roomID), zap.Bool("isDealer", g.isDealer)) } return nil @@ -860,3 +848,24 @@ func nilStorage(s storage.Service) bool { func (g *Game) HasStorage() bool { return !nilStorage(g.storage) } + +func (g *Game) loadStateFromStorage(roomID protocol.RoomID) *protocol.State { + if !g.HasStorage() { + return nil + } + state, err := g.storage.LoadRoomState(roomID) + if err != nil { + g.logger.Info("room not found in storage", zap.Error(err)) + return nil + } + g.logger.Info("loaded room from storage", zap.Any("roomID", roomID)) + + // Mark players as offline if they haven't been seen for a while + now := time.Now() + for i := range state.Players { + online := now.Sub(state.Players[i].OnlineTimestamp) < playerOnlineTimeout + state.Players[i].Online = online + } + + return state +} From dae7e6ec8f48de6e5a856ee5316a9bc2caaa95ec Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Thu, 6 Jun 2024 22:48:42 +0100 Subject: [PATCH 4/9] fix_: proper player online message timestamp type --- pkg/protocol/messages.go | 9 --------- pkg/protocol/player.go | 21 +++++++++++++++++++++ pkg/protocol/protocol_test.go | 28 ++++++++++++++++++++++++++++ 3 files changed, 49 insertions(+), 9 deletions(-) create mode 100644 pkg/protocol/player.go diff --git a/pkg/protocol/messages.go b/pkg/protocol/messages.go index 537053b..d8d6b04 100644 --- a/pkg/protocol/messages.go +++ b/pkg/protocol/messages.go @@ -2,7 +2,6 @@ package protocol import ( "2sp/internal/config" - "time" ) const Version byte = 1 @@ -10,14 +9,6 @@ const Version byte = 1 type PlayerID string type IssueID string -type Player struct { - ID PlayerID `json:"id"` - Name string `json:"name"` - Online bool `json:"online"` - - OnlineTimestamp time.Time `json:"onlineTimestamp"` -} - type Issue struct { ID IssueID `json:"id"` TitleOrURL string `json:"titleOrUrl"` diff --git a/pkg/protocol/player.go b/pkg/protocol/player.go new file mode 100644 index 0000000..0d0e4e8 --- /dev/null +++ b/pkg/protocol/player.go @@ -0,0 +1,21 @@ +package protocol + +import ( + "time" +) + +type Player struct { + ID PlayerID `json:"id"` + Name string `json:"name"` + Online bool `json:"online"` + + // Deprecated: use OnlineTimestamp instead + OnlineTimestamp time.Time `json:"onlineTimestamp"` + OnlineTimestampMilliseconds int64 `json:"onlineTimestampMilliseconds"` +} + +func (p *Player) ApplyDeprecatedPatch() { + if p.OnlineTimestampMilliseconds == 0 { + p.OnlineTimestampMilliseconds = p.OnlineTimestamp.UnixMilli() + } +} diff --git a/pkg/protocol/protocol_test.go b/pkg/protocol/protocol_test.go index fb99edb..0d03474 100644 --- a/pkg/protocol/protocol_test.go +++ b/pkg/protocol/protocol_test.go @@ -1,9 +1,12 @@ package protocol import ( + "encoding/json" + "github.com/brianvoe/gofakeit/v6" "github.com/stretchr/testify/require" "reflect" "testing" + "time" ) func TestRoomID(t *testing.T) { @@ -22,3 +25,28 @@ func TestRoomID(t *testing.T) { require.Equal(t, sent.Version, received.Version) require.Equal(t, sent.SymmetricKey, received.SymmetricKey) } + +func TestOnlineTimestampMigration(t *testing.T) { + now := time.Now() + + player := Player{ + ID: PlayerID(gofakeit.LetterN(5)), + Name: gofakeit.Username(), + Online: true, + OnlineTimestamp: now, + } + + payload, err := json.Marshal(player) + require.NoError(t, err) + + var playerReceived Player + err = json.Unmarshal(payload, &playerReceived) + require.NoError(t, err) + + playerReceived.ApplyDeprecatedPatch() + + require.Equal(t, player.ID, playerReceived.ID) + require.Equal(t, player.Name, playerReceived.Name) + require.Equal(t, player.OnlineTimestamp.UnixMilli(), playerReceived.OnlineTimestamp.UnixMilli()) + require.Equal(t, player.OnlineTimestamp.UnixMilli(), playerReceived.OnlineTimestampMilliseconds) +} From 41b0ccf407fdf533308f8ca7138d3b6d82b1f33a Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Thu, 6 Jun 2024 23:55:27 +0100 Subject: [PATCH 5/9] chore: player online tests --- cmd/2sp/main.go | 2 + go.mod | 1 + go.sum | 2 + internal/testcommon/matchers/matcher.go | 29 +++ .../testcommon/matchers/message_matcher.go | 8 + .../testcommon/matchers/online_matcher.go | 34 +++- internal/testcommon/matchers/room_matcher.go | 8 +- internal/testcommon/matchers/state_matcher.go | 35 ++-- internal/testcommon/matchers/vote_matcher.go | 8 +- pkg/game/code_control_flags_test.go | 7 + pkg/game/config.go | 2 + pkg/game/features.go | 10 ++ pkg/game/game.go | 155 ++++++++-------- pkg/game/game_handle.go | 49 +++++ pkg/game/game_test.go | 167 ++++++++++++++---- pkg/game/options.go | 13 ++ pkg/game/options_test.go | 9 + pkg/protocol/messages.go | 12 +- pkg/protocol/player.go | 21 ++- pkg/protocol/players_list.go | 12 ++ pkg/protocol/protocol_test.go | 32 +++- pkg/storage/local_storage_test.go | 4 +- 22 files changed, 466 insertions(+), 154 deletions(-) create mode 100644 internal/testcommon/matchers/matcher.go create mode 100644 pkg/game/code_control_flags_test.go create mode 100644 pkg/game/game_handle.go create mode 100644 pkg/protocol/players_list.go diff --git a/cmd/2sp/main.go b/cmd/2sp/main.go index 6e5684b..e74dcec 100644 --- a/cmd/2sp/main.go +++ b/cmd/2sp/main.go @@ -7,6 +7,7 @@ import ( "2sp/pkg/game" "2sp/pkg/storage" "context" + "github.com/jonboulle/clockwork" "os" ) @@ -29,6 +30,7 @@ func main() { game.WithOnlineMessagePeriod(config.OnlineMessagePeriod), game.WithStateMessagePeriod(config.StateMessagePeriod), game.WithEnableSymmetricEncryption(config.EnableSymmetricEncryption), + game.WithClock(clockwork.NewRealClock()), } game := game.NewGame(options) diff --git a/go.mod b/go.mod index e0a13ee..30c070b 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/ethereum/go-ethereum v1.10.26 github.com/google/go-github/v61 v61.0.0 github.com/google/uuid v1.3.0 + github.com/jonboulle/clockwork v0.4.0 github.com/mr-tron/base58 v1.2.0 github.com/muesli/termenv v0.15.2 github.com/multiformats/go-multiaddr v0.10.1 diff --git a/go.sum b/go.sum index 743f2b1..120d5d8 100644 --- a/go.sum +++ b/go.sum @@ -345,6 +345,8 @@ github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0 github.com/jessevdk/go-flags v0.0.0-20141203071132-1679536dcc89/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI= github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo= github.com/jmespath/go-jmespath/internal/testify v1.5.1/go.mod h1:L3OGu8Wl2/fWfCI6z80xFu9LTZmf1ZRjMHUOPmWr69U= +github.com/jonboulle/clockwork v0.4.0 h1:p4Cf1aMWXnXAUh8lVfewRBx1zaTSYKrKMF2g3ST4RZ4= +github.com/jonboulle/clockwork v0.4.0/go.mod h1:xgRqUGwRcjKCO1vbZUEtSLrqKoPSsUpK7fnezOII0kc= github.com/jrick/logrotate v1.0.0/go.mod h1:LNinyqDIJnpAur+b8yyulnQw/wDuN1+BYKlTRt3OuAQ= github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU= diff --git a/internal/testcommon/matchers/matcher.go b/internal/testcommon/matchers/matcher.go new file mode 100644 index 0000000..93e4be1 --- /dev/null +++ b/internal/testcommon/matchers/matcher.go @@ -0,0 +1,29 @@ +package matchers + +import ( + "github.com/stretchr/testify/require" + "testing" + "time" +) + +type Matcher struct { + t *testing.T + triggered chan interface{} +} + +func NewMatcher(t *testing.T) *Matcher { + return &Matcher{ + t: t, + triggered: make(chan interface{}, 42), + } +} + +func (m *Matcher) Wait() interface{} { + select { + case <-time.After(1 * time.Second): + require.Fail(m.t, "timeout waiting for matched call") + case result := <-m.triggered: + return result + } + return nil +} diff --git a/internal/testcommon/matchers/message_matcher.go b/internal/testcommon/matchers/message_matcher.go index ed30c89..e85db06 100644 --- a/internal/testcommon/matchers/message_matcher.go +++ b/internal/testcommon/matchers/message_matcher.go @@ -3,13 +3,21 @@ package matchers import ( "2sp/pkg/protocol" "fmt" + "testing" ) type MessageMatcher struct { + Matcher payload []byte message *protocol.Message } +func NewMessageMatcher(t *testing.T) *MessageMatcher { + return &MessageMatcher{ + Matcher: *NewMatcher(t), + } +} + func (m *MessageMatcher) Matches(x interface{}) bool { m.message = nil m.payload = x.([]byte) diff --git a/internal/testcommon/matchers/online_matcher.go b/internal/testcommon/matchers/online_matcher.go index 6fb77b1..e1a53b9 100644 --- a/internal/testcommon/matchers/online_matcher.go +++ b/internal/testcommon/matchers/online_matcher.go @@ -1,19 +1,27 @@ package matchers import ( + "2sp/internal/config" "2sp/pkg/protocol" + "encoding/json" "fmt" + "go.uber.org/zap" + "testing" ) type OnlineMatcher struct { MessageMatcher + playerID protocol.PlayerID } -func NewOnlineMatcher() OnlineMatcher { - return OnlineMatcher{} +func NewOnlineMatcher(t *testing.T, playerID protocol.PlayerID) *OnlineMatcher { + return &OnlineMatcher{ + MessageMatcher: *NewMessageMatcher(t), + playerID: playerID, + } } -func (m OnlineMatcher) Matches(x interface{}) bool { +func (m *OnlineMatcher) Matches(x interface{}) bool { if !m.MessageMatcher.Matches(x) { return false } @@ -22,9 +30,27 @@ func (m OnlineMatcher) Matches(x interface{}) bool { return false } + var onlineMessage protocol.PlayerOnlineMessage + err := json.Unmarshal(m.payload, &onlineMessage) + if err != nil { + return false + } + + if onlineMessage.Player.ID != m.playerID { + return false + } + + config.Logger.Debug("<<< OnlineMatcher.Matches", + zap.Any("onlineMessage", onlineMessage), + ) + m.triggered <- onlineMessage return true } -func (m OnlineMatcher) String() string { +func (m *OnlineMatcher) String() string { return fmt.Sprintf("is user online protocol message") } + +func (m *OnlineMatcher) Wait() protocol.PlayerOnlineMessage { + return m.MessageMatcher.Wait().(protocol.PlayerOnlineMessage) +} diff --git a/internal/testcommon/matchers/room_matcher.go b/internal/testcommon/matchers/room_matcher.go index cad6907..bbcbdb7 100644 --- a/internal/testcommon/matchers/room_matcher.go +++ b/internal/testcommon/matchers/room_matcher.go @@ -9,17 +9,17 @@ type RoomMatcher struct { room *protocol.Room } -func NewRoomMatcher(room *protocol.Room) RoomMatcher { - return RoomMatcher{ +func NewRoomMatcher(room *protocol.Room) *RoomMatcher { + return &RoomMatcher{ room: room, } } -func (m RoomMatcher) Matches(x interface{}) bool { +func (m *RoomMatcher) Matches(x interface{}) bool { room, ok := x.(*protocol.Room) return ok && m.room.ToRoomID() == room.ToRoomID() } -func (m RoomMatcher) String() string { +func (m *RoomMatcher) String() string { return fmt.Sprintf("is equal to room %s", m.room.ToRoomID()) } diff --git a/internal/testcommon/matchers/state_matcher.go b/internal/testcommon/matchers/state_matcher.go index 7ca75f0..9955c3c 100644 --- a/internal/testcommon/matchers/state_matcher.go +++ b/internal/testcommon/matchers/state_matcher.go @@ -5,31 +5,25 @@ import ( "encoding/json" "fmt" "testing" - "time" ) -type Matcher func(state *protocol.State) +type Callback func(state *protocol.State) bool type StateMatcher struct { + Matcher MessageMatcher - matcher Matcher - state protocol.State - - triggered chan protocol.State + cb Callback + state protocol.State } -func NewStateMatcher(matcher Matcher) *StateMatcher { +func NewStateMatcher(t *testing.T, cb Callback) *StateMatcher { return &StateMatcher{ - matcher: matcher, - triggered: make(chan protocol.State, 1), + Matcher: *NewMatcher(t), + cb: cb, } } func (m *StateMatcher) Matches(x interface{}) bool { - defer func() { - m.triggered <- m.state - }() - if !m.MessageMatcher.Matches(x) { return false } @@ -45,13 +39,13 @@ func (m *StateMatcher) Matches(x interface{}) bool { } m.state = stateMessage.State + m.triggered <- stateMessage.State - if m.matcher == nil { + if m.cb == nil { return true } - m.matcher(&stateMessage.State) - return true + return m.cb(&stateMessage.State) } func (m *StateMatcher) String() string { @@ -62,11 +56,6 @@ func (m *StateMatcher) State() protocol.State { return m.state } -func (m *StateMatcher) Wait(t *testing.T) protocol.State { - select { - case <-time.After(1 * time.Second): - t.Fatal("timeout waiting for state message") - case <-m.triggered: - } - return m.state +func (m *StateMatcher) Wait() protocol.State { + return m.Matcher.Wait().(protocol.State) } diff --git a/internal/testcommon/matchers/vote_matcher.go b/internal/testcommon/matchers/vote_matcher.go index 0bd3f82..1a18313 100644 --- a/internal/testcommon/matchers/vote_matcher.go +++ b/internal/testcommon/matchers/vote_matcher.go @@ -12,15 +12,15 @@ type VoteMatcher struct { voteValue protocol.VoteValue } -func NewVoteMatcher(playerID protocol.PlayerID, issue protocol.IssueID, value protocol.VoteValue) VoteMatcher { - return VoteMatcher{ +func NewVoteMatcher(playerID protocol.PlayerID, issue protocol.IssueID, value protocol.VoteValue) *VoteMatcher { + return &VoteMatcher{ playerID: playerID, issueID: issue, voteValue: value, } } -func (m VoteMatcher) Matches(x interface{}) bool { +func (m *VoteMatcher) Matches(x interface{}) bool { if !m.MessageMatcher.Matches(x) { return false } @@ -40,6 +40,6 @@ func (m VoteMatcher) Matches(x interface{}) bool { vote.Timestamp > 0 } -func (m VoteMatcher) String() string { +func (m *VoteMatcher) String() string { return fmt.Sprintf("is any state message") } diff --git a/pkg/game/code_control_flags_test.go b/pkg/game/code_control_flags_test.go new file mode 100644 index 0000000..a40ee87 --- /dev/null +++ b/pkg/game/code_control_flags_test.go @@ -0,0 +1,7 @@ +package game + +func WithEnablePublishOnlineState(enable bool) Option { + return func(game *Game) { + game.codeControls.EnablePublishOnlineState = enable + } +} diff --git a/pkg/game/config.go b/pkg/game/config.go index b46a100..b028eae 100644 --- a/pkg/game/config.go +++ b/pkg/game/config.go @@ -7,6 +7,7 @@ type gameConfig struct { EnableSymmetricEncryption bool OnlineMessagePeriod time.Duration StateMessagePeriod time.Duration + PublishStateLoopEnabled bool } var defaultConfig = gameConfig{ @@ -14,4 +15,5 @@ var defaultConfig = gameConfig{ EnableSymmetricEncryption: true, OnlineMessagePeriod: 5 * time.Second, StateMessagePeriod: 30 * time.Second, + PublishStateLoopEnabled: true, } diff --git a/pkg/game/features.go b/pkg/game/features.go index 8dc5231..82148dd 100644 --- a/pkg/game/features.go +++ b/pkg/game/features.go @@ -9,3 +9,13 @@ func defaultFeatureFlags() FeatureFlags { EnableDeckSelection: false, } } + +type codeControlFlags struct { + EnablePublishOnlineState bool +} + +func defaultCodeControlFlags() codeControlFlags { + return codeControlFlags{ + EnablePublishOnlineState: true, + } +} diff --git a/pkg/game/game.go b/pkg/game/game.go index 6626355..ce975a6 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -7,6 +7,7 @@ import ( "context" "encoding/json" "fmt" + "github.com/jonboulle/clockwork" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/exp/slices" @@ -23,12 +24,15 @@ var ( type StateSubscription chan *protocol.State type Game struct { - logger *zap.Logger - ctx context.Context - transport transport.Service - storage storage.Service - exitRoom chan struct{} - features FeatureFlags + logger *zap.Logger + ctx context.Context + transport transport.Service + storage storage.Service + clock clockwork.Clock + exitRoom chan struct{} + messages chan []byte + features FeatureFlags + codeControls codeControlFlags isDealer bool player *protocol.Player @@ -44,10 +48,12 @@ type Game struct { func NewGame(opts []Option) *Game { game := &Game{ - exitRoom: nil, - features: defaultFeatureFlags(), - isDealer: false, - player: nil, + exitRoom: nil, + messages: make(chan []byte, 42), + features: defaultFeatureFlags(), + codeControls: defaultCodeControlFlags(), + isDealer: false, + player: nil, myVote: protocol.VoteResult{ Value: "", Timestamp: 0, @@ -74,6 +80,11 @@ func NewGame(opts []Option) *Game { return nil } + if game.clock == nil { + game.logger.Error("clock is required") + return nil + } + return game } @@ -128,8 +139,8 @@ func (g *Game) Stop() { // WARNING: wait for all routines to finish } -func (g *Game) processMessage(payload []byte) { - g.logger.Debug("processing message", zap.String("payload", string(payload))) +func (g *Game) handleMessage(payload []byte) { + g.logger.Debug("handling message", zap.String("payload", string(payload))) message := protocol.Message{} err := json.Unmarshal(payload, &message) @@ -176,31 +187,7 @@ func (g *Game) processMessage(payload []byte) { logger.Error("failed to unmarshal message", zap.Error(err)) return } - g.logger.Info("player online message received", zap.Any("player", playerOnline.Player)) - - // TODO: Store player pointers in a map - - index := g.playerIndex(playerOnline.Player.ID) - if index < 0 { - playerOnline.Player.Online = true - playerOnline.Player.OnlineTimestamp = time.Now() - g.state.Players = append(g.state.Players, playerOnline.Player) - g.notifyChangedState(true) - return - } - - playerChanged := !g.state.Players[index].Online || - g.state.Players[index].Name != playerOnline.Player.Name - - g.state.Players[index].OnlineTimestamp = time.Now() - - if !playerChanged { - return - } - - g.state.Players[index].Online = true - g.state.Players[index].Name = playerOnline.Player.Name - g.notifyChangedState(true) + g.handlePlayerOnlineMessage(&playerOnline) case protocol.MessageTypePlayerOffline: if !g.isDealer { @@ -212,15 +199,7 @@ func (g *Game) processMessage(payload []byte) { logger.Error("failed to unmarshal message", zap.Error(err)) return } - g.logger.Info("player is offline", zap.Any("player", playerOffline.Player)) - - index := g.playerIndex(playerOffline.Player.ID) - if index < 0 { - return - } - - g.state.Players[index].Online = false - g.notifyChangedState(true) + g.handlePlayerOfflineMessage(&playerOffline) case protocol.MessageTypePlayerVote: if !g.isDealer { @@ -326,7 +305,7 @@ func (g *Game) publishOnlineState() { g.publishUserOnline(true) for { select { - case <-time.After(g.config.OnlineMessagePeriod): + case <-g.clock.After(g.config.OnlineMessagePeriod): g.publishUserOnline(true) case <-g.exitRoom: return @@ -341,7 +320,7 @@ func (g *Game) publishStateLoop() { logger.Debug("started") for { select { - case <-time.After(g.config.StateMessagePeriod): + case <-g.clock.After(g.config.StateMessagePeriod): logger.Debug("tick") g.notifyChangedState(true) case <-g.exitRoom: @@ -356,7 +335,7 @@ func (g *Game) publishStateLoop() { func (g *Game) watchPlayersStateLoop() { g.logger.Debug("check users state loop") - ticker := time.NewTicker(1 * time.Second) + ticker := g.clock.NewTicker(1 * time.Second) defer ticker.Stop() for { select { @@ -364,24 +343,29 @@ func (g *Game) watchPlayersStateLoop() { return case <-g.ctx.Done(): return - case <-ticker.C: + case <-ticker.Chan(): if g.state == nil { continue } - now := time.Now() + stateChanged := false + now := g.clock.Now() for i, player := range g.state.Players { if !player.Online { continue } - diff := now.Sub(player.OnlineTimestamp) - if diff > playerOnlineTimeout { - g.logger.Info("marking user as offline", - zap.Any("name", player.Name), - zap.Any("lastSeenAt", player.OnlineTimestamp), - zap.Any("now", now), - ) - g.state.Players[i].Online = false + if now.Sub(player.OnlineTime()) <= playerOnlineTimeout { + continue } + g.logger.Info("marking user as offline", + zap.Any("name", player.Name), + zap.Any("lastSeenAt", player.OnlineTimestampMilliseconds), + zap.Any("now", now), + ) + g.state.Players[i].Online = false + stateChanged = true + } + if stateChanged { + g.notifyChangedState(true) } } } @@ -397,11 +381,24 @@ func (g *Game) processIncomingMessages(sub *transport.MessagesSubscription) { if !more { return } - g.processMessage(payload) + g.handleMessage(payload) + case <-g.exitRoom: + return + case <-g.ctx.Done(): + return + } + } +} + +func (g *Game) loopPublishedMessages() { + for { + select { case <-g.exitRoom: return case <-g.ctx.Done(): return + case payload := <-g.messages: + g.handleMessage(payload) } } } @@ -422,28 +419,41 @@ func (g *Game) publishMessage(message any) error { err = g.transport.PublishUnencryptedMessage(g.room, payload) } + // Loop message to ourselves + if g.isDealer { + g.messages <- payload + } + return err } func (g *Game) publishUserOnline(online bool) { - g.logger.Debug("publishing online state", zap.Bool("online", online)) + timestamp := g.timestamp() + + g.logger.Debug("publishing online state", + zap.Bool("online", online), + zap.Int64("timestamp", timestamp), + ) var message interface{} + player := *g.player + player.ApplyDeprecatedPatchOnSend() + if online { message = protocol.PlayerOnlineMessage{ - Player: *g.player, + Player: player, Message: protocol.Message{ Type: protocol.MessageTypePlayerOnline, - Timestamp: g.timestamp(), + Timestamp: timestamp, }, } } else { message = protocol.PlayerOfflineMessage{ - Player: *g.player, + Player: player, Message: protocol.Message{ Type: protocol.MessageTypePlayerOffline, - Timestamp: g.timestamp(), + Timestamp: timestamp, }, } } @@ -515,7 +525,7 @@ func (g *Game) publishState(state *protocol.State) { } func (g *Game) timestamp() int64 { - return time.Now().UnixMilli() + return g.clock.Now().UnixMilli() } func (g *Game) Deal(input string) (protocol.IssueID, error) { @@ -602,10 +612,15 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { } g.resetMyVote() + go g.loopPublishedMessages() go g.processIncomingMessages(sub) - go g.publishOnlineState() + if g.codeControls.EnablePublishOnlineState { + go g.publishOnlineState() + } if g.isDealer { - go g.publishStateLoop() + if g.config.PublishStateLoopEnabled { + go g.publishStateLoop() + } go g.watchPlayersStateLoop() } g.notifyChangedState(g.isDealer) @@ -861,9 +876,9 @@ func (g *Game) loadStateFromStorage(roomID protocol.RoomID) *protocol.State { g.logger.Info("loaded room from storage", zap.Any("roomID", roomID)) // Mark players as offline if they haven't been seen for a while - now := time.Now() + now := g.clock.Now() for i := range state.Players { - online := now.Sub(state.Players[i].OnlineTimestamp) < playerOnlineTimeout + online := now.Sub(state.Players[i].OnlineTime()) < playerOnlineTimeout state.Players[i].Online = online } diff --git a/pkg/game/game_handle.go b/pkg/game/game_handle.go new file mode 100644 index 0000000..967927c --- /dev/null +++ b/pkg/game/game_handle.go @@ -0,0 +1,49 @@ +package game + +import ( + "2sp/pkg/protocol" + "go.uber.org/zap" +) + +func (g *Game) handlePlayerOnlineMessage(message *protocol.PlayerOnlineMessage) { + g.logger.Info("player online message received", zap.Any("player", message.Player)) + + message.Player.ApplyDeprecatedPatchOnReceive() + + // TODO: Store player pointers in a map + + index := g.playerIndex(message.Player.ID) + if index < 0 { + message.Player.Online = true + message.Player.OnlineTimestampMilliseconds = g.timestamp() + g.state.Players = append(g.state.Players, message.Player) + g.notifyChangedState(true) + g.logger.Info("player joined", zap.Any("player", message.Player)) + return + } + + playerChanged := !g.state.Players[index].Online || + g.state.Players[index].Name != message.Player.Name + + g.state.Players[index].OnlineTimestampMilliseconds = g.timestamp() + + if !playerChanged { + return + } + + g.state.Players[index].Online = true + g.state.Players[index].Name = message.Player.Name + g.notifyChangedState(true) +} + +func (g *Game) handlePlayerOfflineMessage(message *protocol.PlayerOfflineMessage) { + g.logger.Info("player is offline", zap.Any("player", message.Player)) + + index := g.playerIndex(message.Player.ID) + if index < 0 { + return + } + + g.state.Players[index].Online = false + g.notifyChangedState(true) +} diff --git a/pkg/game/game_test.go b/pkg/game/game_test.go index 6bb7d4e..8eb1abe 100644 --- a/pkg/game/game_test.go +++ b/pkg/game/game_test.go @@ -10,8 +10,10 @@ import ( "encoding/json" "fmt" "github.com/brianvoe/gofakeit/v6" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/suite" "go.uber.org/mock/gomock" + "go.uber.org/zap" "testing" ) @@ -25,13 +27,18 @@ type Suite struct { ctx context.Context cancel context.CancelFunc transport *mocktransport.MockService - game *Game + clock clockwork.FakeClock + dealer *Game } func (s *Suite) newGame(extraOptions []Option) *Game { options := []Option{ WithContext(s.ctx), WithTransport(s.transport), + WithClock(s.clock), + WithLogger(s.Logger), + WithPlayerName(gofakeit.Username()), + WithPublishStateLoop(false), } options = append(options, extraOptions...) @@ -49,12 +56,13 @@ func (s *Suite) SetupTest() { ctrl := gomock.NewController(s.T()) s.transport = mocktransport.NewMockService(ctrl) + s.clock = clockwork.NewFakeClock() - s.game = s.newGame([]Option{ + s.dealer = s.newGame([]Option{ WithEnableSymmetricEncryption(true), }) - err := s.game.Initialize() + err := s.dealer.Initialize() s.Require().NoError(err) } @@ -62,6 +70,10 @@ func (s *Suite) TearDownTest() { s.cancel() } +func (s *Suite) newStateMatcher() *matchers.StateMatcher { + return matchers.NewStateMatcher(s.T(), nil) +} + func (s *Suite) expectSubscribeToMessages(room *protocol.Room) func(room *protocol.Room, payload []byte) { roomMatcher := matchers.NewRoomMatcher(room) @@ -127,30 +139,28 @@ func (s *Suite) TestStateSize() { } func (s *Suite) TestSimpleGame() { - room, initialState, err := s.game.CreateNewRoom() + room, initialState, err := s.dealer.CreateNewRoom() s.Require().NoError(err) s.Require().NotNil(room) roomID := room.ToRoomID() roomMatcher := matchers.NewRoomMatcher(room) - onlineMatcher := matchers.NewOnlineMatcher() + onlineMatcher := matchers.NewOnlineMatcher(s.T(), s.dealer.Player().ID) // Online state is sent periodically s.transport.EXPECT().PublishPublicMessage(roomMatcher, onlineMatcher).AnyTimes() - // We need to loop the subscription to mock waku behaviour - // We should probably check the published messages instead of received ones, but it's fine for now. - loop := s.expectSubscribeToMessages(room) + s.expectSubscribeToMessages(room) // Join room - stateMatcher := matchers.NewStateMatcher(nil) + stateMatcher := s.newStateMatcher() s.transport.EXPECT().PublishPublicMessage(roomMatcher, stateMatcher). Times(1) - err = s.game.JoinRoom(roomID, initialState) + err = s.dealer.JoinRoom(roomID, initialState) s.Require().NoError(err) - state := stateMatcher.Wait(s.T()) + state := stateMatcher.Wait() s.Require().False(state.VotesRevealed) s.Require().Empty(state.ActiveIssue) s.Require().Len(state.Players, 1) @@ -172,68 +182,67 @@ func (s *Suite) TestSimpleGame() { } { // Deal first vote item - stateMatcher = matchers.NewStateMatcher(nil) + stateMatcher = s.newStateMatcher() s.transport.EXPECT(). PublishPublicMessage(roomMatcher, stateMatcher). Times(1) - firstIssueID, err = s.game.Deal(firstItemText) + firstIssueID, err = s.dealer.Deal(firstItemText) s.Require().NoError(err) - state = stateMatcher.Wait(s.T()) + state = stateMatcher.Wait() item := checkIssues(state.Issues) s.Require().Nil(item.Result) s.Require().Len(item.Votes, 0) s.Logger.Info("match on deal first item") } - currentIssue := s.game.CurrentState().Issues.Get(s.game.CurrentState().ActiveIssue) + currentIssue := s.dealer.CurrentState().Issues.Get(s.dealer.CurrentState().ActiveIssue) s.Require().NotNil(currentIssue) s.Require().Equal(firstItemText, currentIssue.TitleOrURL) { // Publish dealer vote - voteMatcher := matchers.NewVoteMatcher(s.game.Player().ID, currentIssue.ID, dealerVote) + voteMatcher := matchers.NewVoteMatcher(s.dealer.Player().ID, currentIssue.ID, dealerVote) s.transport.EXPECT(). PublishPublicMessage(roomMatcher, voteMatcher). - Do(loop). // FIXME: Game should not depend on transport loop/no-loop behaviour Times(1) - stateMatcher = matchers.NewStateMatcher(nil) + stateMatcher = s.newStateMatcher() s.transport.EXPECT(). PublishPublicMessage(roomMatcher, stateMatcher). Times(1) - err = s.game.PublishVote(dealerVote) + err = s.dealer.PublishVote(dealerVote) s.Require().NoError(err) - state = stateMatcher.Wait(s.T()) + state = stateMatcher.Wait() item := checkIssues(state.Issues) s.Require().NotNil(item) s.Require().Nil(item.Result) s.Require().Len(item.Votes, 1) - vote, ok := item.Votes[s.game.Player().ID] + vote, ok := item.Votes[s.dealer.Player().ID] s.Require().True(ok) s.Require().Empty(vote.Value) s.Require().Greater(vote.Timestamp, int64(0)) } { // Reveal votes - stateMatcher = matchers.NewStateMatcher(nil) + stateMatcher = s.newStateMatcher() s.transport.EXPECT(). PublishPublicMessage(roomMatcher, stateMatcher). Times(1) - err = s.game.Reveal() + err = s.dealer.Reveal() s.Require().NoError(err) - state = stateMatcher.Wait(s.T()) + state = stateMatcher.Wait() item := checkIssues(state.Issues) s.Require().Nil(item.Result) s.Require().Len(item.Votes, 1) - vote, ok := item.Votes[s.game.Player().ID] + vote, ok := item.Votes[s.dealer.Player().ID] s.Require().True(ok) s.Require().NotNil(vote) s.Require().Equal(dealerVote, vote.Value) @@ -243,21 +252,21 @@ func (s *Suite) TestSimpleGame() { const votingResult = protocol.VoteValue("1") { // Finish voting - stateMatcher = matchers.NewStateMatcher(nil) + stateMatcher = s.newStateMatcher() s.transport.EXPECT(). PublishPublicMessage(roomMatcher, stateMatcher). Times(1) - err = s.game.Finish(votingResult) + err = s.dealer.Finish(votingResult) s.Require().NoError(err) - state = stateMatcher.Wait(s.T()) + state = stateMatcher.Wait() item := checkIssues(state.Issues) s.Require().NotNil(item.Result) s.Require().Equal(*item.Result, votingResult) s.Require().Len(item.Votes, 1) - vote, ok := item.Votes[s.game.Player().ID] + vote, ok := item.Votes[s.dealer.Player().ID] s.Require().True(ok) s.Require().Equal(dealerVote, vote.Value) s.Require().Greater(vote.Timestamp, int64(0)) @@ -281,15 +290,15 @@ func (s *Suite) TestSimpleGame() { } { // Deal another issue - stateMatcher = matchers.NewStateMatcher(nil) + stateMatcher = s.newStateMatcher() s.transport.EXPECT(). PublishPublicMessage(roomMatcher, stateMatcher). Times(1) - secondIssueID, err = s.game.Deal(secondItemText) + secondIssueID, err = s.dealer.Deal(secondItemText) s.Require().NoError(err) - state = stateMatcher.Wait(s.T()) + state = stateMatcher.Wait() item := checkIssues(state.Issues) s.Require().Nil(item.Result) s.Require().Len(item.Votes, 0) @@ -360,3 +369,97 @@ func (s *Suite) TestPublishMessage() { }) } } + +func (s *Suite) TestOnlineState() { + /* + 2. Create and join room + 3. mock time = 0 + 4. Alice sends online message + 5. Dealer updates online timestamp + 6. mock time = 20 + 7. Dealer checks online initialState, mark as offline + */ + + playerID, err := GeneratePlayerID() + s.Require().NoError(err) + + player := protocol.Player{ + ID: playerID, + Name: gofakeit.Username(), + } + + s.dealer = s.newGame([]Option{ + WithPlayerName("dealer"), + WithEnablePublishOnlineState(false), // FIXME: Add a separate test for self publishing + }) + + s.Logger.Debug("<<< test info", + zap.Any("player", player), + zap.Any("dealer", s.dealer.Player()), + ) + + room, initialState, err := s.dealer.CreateNewRoom() + s.Require().NoError(err) + s.Require().NotNil(room) + + roomID := room.ToRoomID() + roomMatcher := matchers.NewRoomMatcher(room) + + s.expectSubscribeToMessages(room) + + //s.transport.EXPECT(). + // PublishPublicMessage(roomMatcher, + // matchers.NewOnlineMatcher(s.T(), s.dealer.Player().ID)). + // AnyTimes() + + stateMatcher := s.newStateMatcher() + s.transport.EXPECT(). + PublishPublicMessage(roomMatcher, stateMatcher). + Times(1) + + err = s.dealer.JoinRoom(roomID, initialState) + s.Require().NoError(err) + + _ = stateMatcher.Wait() + + // Player joins the room + playerOnlineMessage := &protocol.PlayerOnlineMessage{ + Message: protocol.Message{ + Type: protocol.MessageTypePlayerOnline, + Timestamp: s.clock.Now().UnixMilli(), + }, + Player: player, + } + + stateMatcher = s.newStateMatcher() + s.transport.EXPECT(). + PublishPublicMessage(roomMatcher, stateMatcher). + Times(1) + + s.dealer.handlePlayerOnlineMessage(playerOnlineMessage) + + // Ensure new player joined + state := stateMatcher.Wait() + s.Require().Len(state.Players, 2) + + p, ok := state.Players.Get(player.ID) + s.Require().True(ok) + s.Require().True(p.Online) + s.Require().Equal(s.clock.Now().UnixMilli(), p.OnlineTimestampMilliseconds) + + s.transport.EXPECT(). + PublishPublicMessage(roomMatcher, stateMatcher). + Times(1) + + // Advance time, make sure player is marked as offline + lastSeenAt := p.OnlineTimestampMilliseconds + s.clock.Advance(playerOnlineTimeout) + + state = stateMatcher.Wait() + s.Require().Len(state.Players, 2) + + p, ok = state.Players.Get(player.ID) + s.Require().True(ok) + s.Require().False(p.Online) + s.Require().Equal(lastSeenAt, p.OnlineTimestampMilliseconds) +} diff --git a/pkg/game/options.go b/pkg/game/options.go index 6cf9d45..78a8198 100644 --- a/pkg/game/options.go +++ b/pkg/game/options.go @@ -4,6 +4,7 @@ import ( "2sp/internal/transport" "2sp/pkg/storage" "context" + "github.com/jonboulle/clockwork" "go.uber.org/zap" "time" ) @@ -34,6 +35,12 @@ func WithStorage(s storage.Service) Option { } } +func WithClock(c clockwork.Clock) Option { + return func(g *Game) { + g.clock = c + } +} + func WithEnableSymmetricEncryption(b bool) Option { return func(g *Game) { g.config.EnableSymmetricEncryption = b @@ -57,3 +64,9 @@ func WithStateMessagePeriod(d time.Duration) Option { g.config.StateMessagePeriod = d } } + +func WithPublishStateLoop(enabled bool) Option { + return func(g *Game) { + g.config.PublishStateLoopEnabled = enabled + } +} diff --git a/pkg/game/options_test.go b/pkg/game/options_test.go index 98477a4..770da73 100644 --- a/pkg/game/options_test.go +++ b/pkg/game/options_test.go @@ -5,6 +5,7 @@ import ( mockstorage "2sp/pkg/storage/mock" "context" "github.com/brianvoe/gofakeit/v6" + "github.com/jonboulle/clockwork" "github.com/stretchr/testify/require" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -17,20 +18,24 @@ func TestOptions(t *testing.T) { transport := &mocktransport.MockService{} storage := &mockstorage.MockService{} logger := zap.NewNop() + clock := clockwork.NewFakeClock() enableSymmetricEncryption := gofakeit.Bool() playerName := gofakeit.Username() onlineMessagePeriod := time.Duration(gofakeit.Int64()) stateMessagePeriod := time.Duration(gofakeit.Int64()) + publishStateLoop := gofakeit.Bool() options := []Option{ WithContext(ctx), WithTransport(transport), WithStorage(storage), WithLogger(logger), + WithClock(clock), WithEnableSymmetricEncryption(enableSymmetricEncryption), WithPlayerName(playerName), WithOnlineMessagePeriod(onlineMessagePeriod), WithStateMessagePeriod(stateMessagePeriod), + WithPublishStateLoop(publishStateLoop), } game := NewGame(options) @@ -39,10 +44,12 @@ func TestOptions(t *testing.T) { require.Equal(t, transport, game.transport) require.Equal(t, storage, game.storage) require.Equal(t, logger, game.logger) + require.Equal(t, clock, game.clock) require.Equal(t, enableSymmetricEncryption, game.config.EnableSymmetricEncryption) require.Equal(t, playerName, game.config.PlayerName) require.Equal(t, onlineMessagePeriod, game.config.OnlineMessagePeriod) require.Equal(t, stateMessagePeriod, game.config.StateMessagePeriod) + require.Equal(t, publishStateLoop, game.config.PublishStateLoopEnabled) } func TestNoTransport(t *testing.T) { @@ -57,6 +64,7 @@ func TestNoContext(t *testing.T) { options := []Option{ WithContext(nil), WithTransport(&mocktransport.MockService{}), + WithClock(clockwork.NewFakeClock()), } game := NewGame(options) require.NotNil(t, game) @@ -67,6 +75,7 @@ func TestNotLogger(t *testing.T) { options := []Option{ WithLogger(nil), WithTransport(&mocktransport.MockService{}), + WithClock(clockwork.NewFakeClock()), } game := NewGame(options) require.NotNil(t, game) diff --git a/pkg/protocol/messages.go b/pkg/protocol/messages.go index d8d6b04..c8ac7f7 100644 --- a/pkg/protocol/messages.go +++ b/pkg/protocol/messages.go @@ -26,12 +26,12 @@ const ( ) type State struct { - Players []Player `json:"players"` - Issues IssuesList `json:"issues"` - ActiveIssue IssueID `json:"activeIssue"` - VotesRevealed bool `json:"votesRevealed"` - Timestamp int64 `json:"-"` // TODO: Fix conflict with Message.Timestamp. Change type to time.Time. - Deck Deck `json:"-"` + Players PlayersList `json:"players"` + Issues IssuesList `json:"issues"` + ActiveIssue IssueID `json:"activeIssue"` + VotesRevealed bool `json:"votesRevealed"` + Timestamp int64 `json:"-"` // TODO: Fix conflict with Message.Timestamp. Change type to time.Time. + Deck Deck `json:"-"` } func (s *State) VoteState() VoteState { diff --git a/pkg/protocol/player.go b/pkg/protocol/player.go index 0d0e4e8..8199786 100644 --- a/pkg/protocol/player.go +++ b/pkg/protocol/player.go @@ -10,12 +10,25 @@ type Player struct { Online bool `json:"online"` // Deprecated: use OnlineTimestamp instead + // TODO: Those fields should be removed from json. They shouldn't be part of the protocol. + // It should only be used by the dealer to keep the state of the player. + // This becomes quickly inconsistent in the protocol, because when the dealer receives a new + // player online message, he doesn't publish a new state with updated OnlineTimestamp. Because + // that would increase the network usage unnecessarily. + // Simple keeping this field in the struct, but removing it from json, works, but is not enough + // for the dealer, because dealer wants to save users OnlineTimestamp in the storage. OnlineTimestamp time.Time `json:"onlineTimestamp"` OnlineTimestampMilliseconds int64 `json:"onlineTimestampMilliseconds"` } -func (p *Player) ApplyDeprecatedPatch() { - if p.OnlineTimestampMilliseconds == 0 { - p.OnlineTimestampMilliseconds = p.OnlineTimestamp.UnixMilli() - } +func (p *Player) ApplyDeprecatedPatchOnReceive() { + p.OnlineTimestampMilliseconds = p.OnlineTimestamp.UnixMilli() +} + +func (p *Player) ApplyDeprecatedPatchOnSend() { + p.OnlineTimestamp = time.UnixMilli(p.OnlineTimestampMilliseconds) +} + +func (p *Player) OnlineTime() time.Time { + return time.UnixMilli(p.OnlineTimestampMilliseconds) } diff --git a/pkg/protocol/players_list.go b/pkg/protocol/players_list.go new file mode 100644 index 0000000..f9876ca --- /dev/null +++ b/pkg/protocol/players_list.go @@ -0,0 +1,12 @@ +package protocol + +type PlayersList []Player + +func (l PlayersList) Get(id PlayerID) (Player, bool) { + for _, player := range l { + if player.ID == id { + return player, true + } + } + return Player{}, false +} diff --git a/pkg/protocol/protocol_test.go b/pkg/protocol/protocol_test.go index 0d03474..150fae1 100644 --- a/pkg/protocol/protocol_test.go +++ b/pkg/protocol/protocol_test.go @@ -26,7 +26,7 @@ func TestRoomID(t *testing.T) { require.Equal(t, sent.SymmetricKey, received.SymmetricKey) } -func TestOnlineTimestampMigration(t *testing.T) { +func TestOnlineTimestampMigrationBackward(t *testing.T) { now := time.Now() player := Player{ @@ -43,10 +43,34 @@ func TestOnlineTimestampMigration(t *testing.T) { err = json.Unmarshal(payload, &playerReceived) require.NoError(t, err) - playerReceived.ApplyDeprecatedPatch() + playerReceived.ApplyDeprecatedPatchOnReceive() require.Equal(t, player.ID, playerReceived.ID) require.Equal(t, player.Name, playerReceived.Name) - require.Equal(t, player.OnlineTimestamp.UnixMilli(), playerReceived.OnlineTimestamp.UnixMilli()) - require.Equal(t, player.OnlineTimestamp.UnixMilli(), playerReceived.OnlineTimestampMilliseconds) + require.Equal(t, now.UnixMilli(), playerReceived.OnlineTimestamp.UnixMilli()) + require.Equal(t, now.UnixMilli(), playerReceived.OnlineTimestampMilliseconds) +} + +func TestOnlineTimestampMigrationForward(t *testing.T) { + now := time.Now() + + player := Player{ + ID: PlayerID(gofakeit.LetterN(5)), + Name: gofakeit.Username(), + Online: true, + OnlineTimestampMilliseconds: now.UnixMilli(), + } + + player.ApplyDeprecatedPatchOnSend() + + payload, err := json.Marshal(player) + require.NoError(t, err) + + var playerReceived Player + err = json.Unmarshal(payload, &playerReceived) + require.NoError(t, err) + + require.Equal(t, player.ID, playerReceived.ID) + require.Equal(t, player.Name, playerReceived.Name) + require.Equal(t, now.UnixMilli(), playerReceived.OnlineTimestamp.UnixMilli()) } diff --git a/pkg/storage/local_storage_test.go b/pkg/storage/local_storage_test.go index 4c35616..e4eefe1 100644 --- a/pkg/storage/local_storage_test.go +++ b/pkg/storage/local_storage_test.go @@ -8,7 +8,6 @@ import ( "github.com/shibukawa/configdir" "github.com/stretchr/testify/suite" "testing" - "time" ) func TestLocalStorage(t *testing.T) { @@ -91,9 +90,8 @@ func (s *Suite) TestRoomStorage() { s.Require().NoError(err) resetPlayersTimestamps := func(state *protocol.State) { - t := time.UnixMilli(0) for i := range state.Players { - state.Players[i].OnlineTimestamp = t + state.Players[i].OnlineTimestampMilliseconds = 0 } } From 5c2803612b985b25fd7d7bbb1adda5a1f8dcd58f Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 7 Jun 2024 00:21:02 +0100 Subject: [PATCH 6/9] chore: added TestNoClock --- pkg/game/options_test.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/pkg/game/options_test.go b/pkg/game/options_test.go index 770da73..c623e78 100644 --- a/pkg/game/options_test.go +++ b/pkg/game/options_test.go @@ -55,6 +55,16 @@ func TestOptions(t *testing.T) { func TestNoTransport(t *testing.T) { options := []Option{ WithTransport(nil), + WithClock(clockwork.NewFakeClock()), + } + game := NewGame(options) + require.Nil(t, game) +} + +func TestNoClock(t *testing.T) { + options := []Option{ + WithClock(nil), + WithTransport(&mocktransport.MockService{}), } game := NewGame(options) require.Nil(t, game) From c75ae2444c8c1af469d56382638cdd5af406cbdd Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 7 Jun 2024 00:21:21 +0100 Subject: [PATCH 7/9] chore: extracted handlers from Game.handleMessage --- pkg/game/game.go | 113 +++------------------------------------- pkg/game/game_handle.go | 109 ++++++++++++++++++++++++++++++++++++-- pkg/game/game_test.go | 5 +- 3 files changed, 116 insertions(+), 111 deletions(-) diff --git a/pkg/game/game.go b/pkg/game/game.go index ce975a6..38d897d 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -152,122 +152,25 @@ func (g *Game) handleMessage(payload []byte) { switch message.Type { case protocol.MessageTypeState: - if g.isDealer { - return - } - g.logger.Debug("state message received", - zap.Int64("timestamp", message.Timestamp), - zap.Int64("localTimestamp", g.stateTimestamp), - ) - if message.Timestamp < g.stateTimestamp { - logger.Warn("ignoring outdated state message") - return - } - var stateMessage protocol.GameStateMessage - err := json.Unmarshal(payload, &stateMessage) - if err != nil { - logger.Error("failed to unmarshal message", zap.Error(err)) - return - } - if g.state != nil && stateMessage.State.ActiveIssue != g.state.ActiveIssue { - // Voting finished or new issue dealt. Reset our vote. - g.resetMyVote() + if !g.isDealer { + g.handleStateMessage(payload) } - g.state = &stateMessage.State - g.state.Deck, _ = GetDeck(Fibonacci) // FIXME: remove hardcoded deck - g.notifyChangedState(false) case protocol.MessageTypePlayerOnline: - if !g.isDealer { - return - } - var playerOnline protocol.PlayerOnlineMessage - err := json.Unmarshal(payload, &playerOnline) - if err != nil { - logger.Error("failed to unmarshal message", zap.Error(err)) - return + if g.isDealer { + g.handlePlayerOnlineMessage(payload) } - g.handlePlayerOnlineMessage(&playerOnline) case protocol.MessageTypePlayerOffline: - if !g.isDealer { - return - } - var playerOffline protocol.PlayerOfflineMessage - err := json.Unmarshal(payload, &playerOffline) - if err != nil { - logger.Error("failed to unmarshal message", zap.Error(err)) - return + if g.isDealer { + g.handlePlayerOfflineMessage(payload) } - g.handlePlayerOfflineMessage(&playerOffline) case protocol.MessageTypePlayerVote: - if !g.isDealer { - return - } - var playerVote protocol.PlayerVoteMessage - err := json.Unmarshal(payload, &playerVote) - - if err != nil { - logger.Error("failed to unmarshal message", zap.Error(err)) - return - } - - if g.state.VoteState() != protocol.VotingState { - g.logger.Warn("player vote ignored as not in voting state", - zap.Any("playerID", playerVote.PlayerID), - ) - return - } - - if playerVote.VoteResult.Value != "" && !slices.Contains(g.state.Deck, playerVote.VoteResult.Value) { - logger.Warn("player vote ignored as not found in deck", - zap.Any("playerID", playerVote.PlayerID), - zap.Any("vote", playerVote.VoteResult), - zap.Any("deck", g.state.Deck)) - return - } - - if g.state.ActiveIssue != playerVote.Issue { - g.logger.Warn("player vote ignored as not for the current vote item", - zap.Any("playerID", playerVote.PlayerID), - zap.Any("voteFor", playerVote.Issue), - zap.Any("currentVoteItemID", g.state.ActiveIssue), - ) - return - } - - item := g.state.Issues.Get(playerVote.Issue) - if item == nil { - logger.Error("vote item not found", zap.Any("voteFor", playerVote.Issue)) - return - } - - currentVote, voteExist := item.Votes[playerVote.PlayerID] - if voteExist && currentVote.Timestamp >= playerVote.Timestamp { - logger.Warn("player vote ignored as outdated", - zap.Any("playerID", playerVote.PlayerID), - zap.Any("currentVote", currentVote), - zap.Any("receivedVote", playerVote.VoteResult), - ) - return - } - - g.logger.Info("player vote accepted", - zap.String("name", string(playerVote.PlayerID)), - zap.String("voteFor", string(playerVote.Issue)), - zap.String("voteResult", string(playerVote.VoteResult.Value)), - zap.Any("timestamp", playerVote.Timestamp), - ) - - if playerVote.VoteResult.Value == "" { - delete(item.Votes, playerVote.PlayerID) - } else { - item.Votes[playerVote.PlayerID] = playerVote.VoteResult + if g.isDealer { + g.handlePlayerVoteMessage(payload) } - g.notifyChangedState(true) - default: logger.Warn("unsupported message type") } diff --git a/pkg/game/game_handle.go b/pkg/game/game_handle.go index 967927c..32b1bd7 100644 --- a/pkg/game/game_handle.go +++ b/pkg/game/game_handle.go @@ -2,12 +2,40 @@ package game import ( "2sp/pkg/protocol" + "encoding/json" "go.uber.org/zap" + "golang.org/x/exp/slices" ) -func (g *Game) handlePlayerOnlineMessage(message *protocol.PlayerOnlineMessage) { - g.logger.Info("player online message received", zap.Any("player", message.Player)) +func (g *Game) handleStateMessage(payload []byte) { + var message protocol.GameStateMessage + err := json.Unmarshal(payload, &message) + if err != nil { + g.logger.Error("failed to unmarshal message", zap.Error(err)) + return + } + + g.logger.Info("state message received", zap.Any("state", message.State)) + + if g.state != nil && message.State.ActiveIssue != g.state.ActiveIssue { + // Voting finished or new issue dealt. Reset our vote. + g.resetMyVote() + } + + g.state = &message.State + g.state.Deck, _ = GetDeck(Fibonacci) // FIXME: remove hardcoded deck + g.notifyChangedState(false) +} + +func (g *Game) handlePlayerOnlineMessage(payload []byte) { + var message protocol.PlayerOnlineMessage + err := json.Unmarshal(payload, &message) + if err != nil { + g.logger.Error("failed to unmarshal message", zap.Error(err)) + return + } + g.logger.Info("player online message received", zap.Any("player", message.Player)) message.Player.ApplyDeprecatedPatchOnReceive() // TODO: Store player pointers in a map @@ -36,9 +64,15 @@ func (g *Game) handlePlayerOnlineMessage(message *protocol.PlayerOnlineMessage) g.notifyChangedState(true) } -func (g *Game) handlePlayerOfflineMessage(message *protocol.PlayerOfflineMessage) { - g.logger.Info("player is offline", zap.Any("player", message.Player)) +func (g *Game) handlePlayerOfflineMessage(payload []byte) { + var message protocol.PlayerOfflineMessage + err := json.Unmarshal(payload, &message) + if err != nil { + g.logger.Error("failed to unmarshal message", zap.Error(err)) + return + } + g.logger.Info("player is offline", zap.Any("player", message.Player)) index := g.playerIndex(message.Player.ID) if index < 0 { return @@ -47,3 +81,70 @@ func (g *Game) handlePlayerOfflineMessage(message *protocol.PlayerOfflineMessage g.state.Players[index].Online = false g.notifyChangedState(true) } + +func (g *Game) handlePlayerVoteMessage(payload []byte) { + var message protocol.PlayerVoteMessage + err := json.Unmarshal(payload, &message) + + if err != nil { + g.logger.Error("failed to unmarshal message", zap.Error(err)) + return + } + + g.logger.Info("player vote message received", zap.Any("player", message.PlayerID)) + + if g.state.VoteState() != protocol.VotingState { + g.logger.Warn("player vote ignored as not in voting state", + zap.Any("playerID", message.PlayerID), + ) + return + } + + if message.VoteResult.Value != "" && !slices.Contains(g.state.Deck, message.VoteResult.Value) { + g.logger.Warn("player vote ignored as not found in deck", + zap.Any("playerID", message.PlayerID), + zap.Any("vote", message.VoteResult), + zap.Any("deck", g.state.Deck)) + return + } + + if g.state.ActiveIssue != message.Issue { + g.logger.Warn("player vote ignored as not for the current vote item", + zap.Any("playerID", message.PlayerID), + zap.Any("voteFor", message.Issue), + zap.Any("currentVoteItemID", g.state.ActiveIssue), + ) + return + } + + item := g.state.Issues.Get(message.Issue) + if item == nil { + g.logger.Error("vote item not found", zap.Any("voteFor", message.Issue)) + return + } + + currentVote, voteExist := item.Votes[message.PlayerID] + if voteExist && currentVote.Timestamp >= message.Timestamp { + g.logger.Warn("player vote ignored as outdated", + zap.Any("playerID", message.PlayerID), + zap.Any("currentVote", currentVote), + zap.Any("receivedVote", message.VoteResult), + ) + return + } + + g.logger.Info("player vote accepted", + zap.String("name", string(message.PlayerID)), + zap.String("voteFor", string(message.Issue)), + zap.String("voteResult", string(message.VoteResult.Value)), + zap.Any("timestamp", message.Timestamp), + ) + + if message.VoteResult.Value == "" { + delete(item.Votes, message.PlayerID) + } else { + item.Votes[message.PlayerID] = message.VoteResult + } + + g.notifyChangedState(true) +} diff --git a/pkg/game/game_test.go b/pkg/game/game_test.go index 8eb1abe..fd2fe47 100644 --- a/pkg/game/game_test.go +++ b/pkg/game/game_test.go @@ -423,13 +423,14 @@ func (s *Suite) TestOnlineState() { _ = stateMatcher.Wait() // Player joins the room - playerOnlineMessage := &protocol.PlayerOnlineMessage{ + playerOnlineMessage, err := json.Marshal(&protocol.PlayerOnlineMessage{ Message: protocol.Message{ Type: protocol.MessageTypePlayerOnline, Timestamp: s.clock.Now().UnixMilli(), }, Player: player, - } + }) + s.Require().NoError(err) stateMatcher = s.newStateMatcher() s.transport.EXPECT(). From 24e7a461e9be9e66d375d8b560b7c1a44a3834e4 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 7 Jun 2024 00:27:28 +0100 Subject: [PATCH 8/9] chore: simplify handlePlayerVoteMessage --- pkg/game/game.go | 3 --- pkg/game/game_handle.go | 23 +++++++++-------------- 2 files changed, 9 insertions(+), 17 deletions(-) diff --git a/pkg/game/game.go b/pkg/game/game.go index 38d897d..ccb0496 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -477,11 +477,9 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { if g.RoomID() == roomID { return errors.New("already in this room") } - if g.room != nil { return errors.New("exit current room to join another one") } - if roomID.Empty() { return errors.New("empty room ID") } @@ -530,7 +528,6 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { if state == nil { g.logger.Info("joined room", zap.Any("roomID", roomID)) - } else { g.stateTimestamp = g.timestamp() g.logger.Info("loaded room", zap.Any("roomID", roomID), zap.Bool("isDealer", g.isDealer)) diff --git a/pkg/game/game_handle.go b/pkg/game/game_handle.go index 32b1bd7..5302653 100644 --- a/pkg/game/game_handle.go +++ b/pkg/game/game_handle.go @@ -91,26 +91,23 @@ func (g *Game) handlePlayerVoteMessage(payload []byte) { return } - g.logger.Info("player vote message received", zap.Any("player", message.PlayerID)) + logger := g.logger.With(zap.Any("playerID", message.PlayerID)) + logger.Info("player vote message received") if g.state.VoteState() != protocol.VotingState { - g.logger.Warn("player vote ignored as not in voting state", - zap.Any("playerID", message.PlayerID), - ) + g.logger.Warn("player vote ignored as not in voting state") return } if message.VoteResult.Value != "" && !slices.Contains(g.state.Deck, message.VoteResult.Value) { - g.logger.Warn("player vote ignored as not found in deck", - zap.Any("playerID", message.PlayerID), + logger.Warn("player vote ignored as not found in deck", zap.Any("vote", message.VoteResult), zap.Any("deck", g.state.Deck)) return } if g.state.ActiveIssue != message.Issue { - g.logger.Warn("player vote ignored as not for the current vote item", - zap.Any("playerID", message.PlayerID), + logger.Warn("player vote ignored as not for the current vote item", zap.Any("voteFor", message.Issue), zap.Any("currentVoteItemID", g.state.ActiveIssue), ) @@ -125,18 +122,16 @@ func (g *Game) handlePlayerVoteMessage(payload []byte) { currentVote, voteExist := item.Votes[message.PlayerID] if voteExist && currentVote.Timestamp >= message.Timestamp { - g.logger.Warn("player vote ignored as outdated", - zap.Any("playerID", message.PlayerID), + logger.Warn("player vote ignored as outdated", zap.Any("currentVote", currentVote), zap.Any("receivedVote", message.VoteResult), ) return } - g.logger.Info("player vote accepted", - zap.String("name", string(message.PlayerID)), - zap.String("voteFor", string(message.Issue)), - zap.String("voteResult", string(message.VoteResult.Value)), + logger.Info("player vote accepted", + zap.Any("voteFor", message.Issue), + zap.Any("voteResult", message.VoteResult.Value), zap.Any("timestamp", message.Timestamp), ) From 8ac9232296a89f74fb47cbeef9f1d8bf54a32031 Mon Sep 17 00:00:00 2001 From: Igor Sirotin Date: Fri, 7 Jun 2024 00:32:25 +0100 Subject: [PATCH 9/9] chore: extract startRoutines from JoinRoom --- pkg/game/game.go | 46 ++++++++++++++++++++++++++++++---------------- 1 file changed, 30 insertions(+), 16 deletions(-) diff --git a/pkg/game/game.go b/pkg/game/game.go index ccb0496..af94dec 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -497,12 +497,6 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { } g.exitRoom = make(chan struct{}) - - sub, err := g.transport.SubscribeToMessages(room) - if err != nil { - return errors.Wrap(err, "failed to subscribe to messages") - } - g.isDealer = state != nil g.room = room g.roomID = roomID @@ -511,19 +505,14 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { if g.isDealer { g.state.Deck, _ = GetDeck(Fibonacci) // FIXME: remove hardcoded deck } + g.resetMyVote() - go g.loopPublishedMessages() - go g.processIncomingMessages(sub) - if g.codeControls.EnablePublishOnlineState { - go g.publishOnlineState() - } - if g.isDealer { - if g.config.PublishStateLoopEnabled { - go g.publishStateLoop() - } - go g.watchPlayersStateLoop() + err = g.startRoutines() + if err != nil { + return errors.Wrap(err, "failed to start routines") } + g.notifyChangedState(g.isDealer) if state == nil { @@ -536,6 +525,31 @@ func (g *Game) JoinRoom(roomID protocol.RoomID, state *protocol.State) error { return nil } +func (g *Game) startRoutines() error { + sub, err := g.transport.SubscribeToMessages(g.room) + if err != nil { + return errors.Wrap(err, "failed to subscribe to messages") + } + + go g.loopPublishedMessages() + go g.processIncomingMessages(sub) + + if g.codeControls.EnablePublishOnlineState { + go g.publishOnlineState() + } + + if !g.isDealer { + return nil + } + + if g.config.PublishStateLoopEnabled { + go g.publishStateLoop() + } + go g.watchPlayersStateLoop() + + return nil +} + func (g *Game) IsDealer() bool { return g.isDealer }