diff --git a/Makefile b/Makefile index 5daceee..c229e8d 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ build-all: generate @go build -v ./... run: generate - @go run -buildvcs=true ./cmd/2sp + @go run ./cmd/2sp generate: @go generate ./... @@ -20,3 +20,6 @@ lint: lint-fix: golangci-lint run --fix ./... + +demo: + @go run ./cmd/2sp --anonymous --demo --name=Alice diff --git a/cmd/2sp/demo/demo.go b/cmd/2sp/demo/demo.go index d4c6169..aaaebe8 100644 --- a/cmd/2sp/demo/demo.go +++ b/cmd/2sp/demo/demo.go @@ -35,19 +35,19 @@ var votes = [][]string{ type Demo struct { ctx context.Context dealer *game.Game - state game.StateSubscription + events *game.Subscription program *tea.Program logger *zap.Logger players []*game.Game - playerSubs []game.StateSubscription + playerSubs []*game.Subscription } func New(ctx context.Context, dealer *game.Game, program *tea.Program) *Demo { return &Demo{ ctx: ctx, dealer: dealer, - state: dealer.SubscribeToStateChanges(), + events: dealer.Subscribe(), program: program, logger: config.Logger.Named("demo"), } @@ -64,7 +64,7 @@ func (d *Demo) Stop() { func (d *Demo) initializePlayers() error { names := []string{"Alice", "Bob", "Charlie"} d.players = make([]*game.Game, 0, len(names)) - d.playerSubs = make([]game.StateSubscription, 0, len(names)) + d.playerSubs = make([]*game.Subscription, 0, len(names)) wg := sync.WaitGroup{} errChan := make(chan error, len(names)) @@ -79,7 +79,7 @@ func (d *Demo) initializePlayers() error { return } d.players = append(d.players, player) - d.playerSubs = append(d.playerSubs, player.SubscribeToStateChanges()) + d.playerSubs = append(d.playerSubs, player.Subscribe()) }(i, name) } @@ -176,7 +176,7 @@ func (d *Demo) issueSubRoutine(issueID protocol.IssueID, votes []string) error { }(i, vote) } - // Deal first issue (expect all players to be subscribed to state changes + // Deal first issue (expect all players to be subscribed to events changes d.sendKey(tea.KeyEnter, 1*time.Second) d.logger.Info("deal issue") @@ -204,7 +204,7 @@ func (d *Demo) issueSubRoutine(issueID protocol.IssueID, votes []string) error { // Reveal votes d.sendShortcut(commands.DefaultKeyMap.RevealVotes) d.logger.Info("votes revealed") - err = d.waitForStateCondition(d.state, func(state *protocol.State) bool { + err = d.waitForStateCondition(d.events, func(state *protocol.State) bool { return state.VotesRevealed }) if err != nil { @@ -232,7 +232,7 @@ func (d *Demo) playerVoteSubroutine(index int, issueID protocol.IssueID, vote pr delay := time.Duration(rand.Intn(4000)) * time.Millisecond time.Sleep(delay) - // Publish vote + // Send vote err = player.PublishVote(vote) if err != nil { return errors.Wrap(err, fmt.Sprintf("%s: failed to publish vote", playerName)) @@ -300,36 +300,40 @@ func (d *Demo) createPlayer(name string) (*game.Game, error) { return player, nil } -func (d *Demo) waitForStateCondition(sub game.StateSubscription, condition func(state *protocol.State) bool) error { +func (d *Demo) waitForStateCondition(sub *game.Subscription, condition func(state *protocol.State) bool) error { timeout := time.After(10 * time.Second) for { select { - case state := <-sub: + case event := <-sub.Events: + if event.Tag != game.EventStateChanged { + continue + } + state := event.Data.(*protocol.State) if condition(state) { time.Sleep(500 * time.Millisecond) return nil } case <-timeout: - return errors.New("timeout waiting for state condition") + return errors.New("timeout waiting for events condition") case <-d.ctx.Done(): } } } func (d *Demo) waitForPlayers(players []*game.Game) error { - return d.waitForStateCondition(d.state, func(state *protocol.State) bool { + return d.waitForStateCondition(d.events, func(state *protocol.State) bool { return len(state.Players) == len(players) }) } func (d *Demo) waitForIssues(count int) error { - return d.waitForStateCondition(d.state, func(state *protocol.State) bool { + return d.waitForStateCondition(d.events, func(state *protocol.State) bool { return len(state.Issues) == count }) } func (d *Demo) waitForVotes(votes []string) error { - return d.waitForStateCondition(d.state, func(state *protocol.State) bool { + return d.waitForStateCondition(d.events, func(state *protocol.State) bool { issue := state.GetActiveIssue() if issue == nil { return false @@ -338,7 +342,7 @@ func (d *Demo) waitForVotes(votes []string) error { }) } -func (d *Demo) waitForIssueDealt(sub game.StateSubscription, issueID protocol.IssueID) error { +func (d *Demo) waitForIssueDealt(sub *game.Subscription, issueID protocol.IssueID) error { return d.waitForStateCondition(sub, func(state *protocol.State) bool { return state.ActiveIssue == issueID }) diff --git a/internal/transport/waku.go b/internal/transport/waku.go index 6bc4502..a1392b7 100644 --- a/internal/transport/waku.go +++ b/internal/transport/waku.go @@ -481,9 +481,6 @@ func (n *Node) SubscribeToMessages(room *pp.Room) (*MessagesSubscription, error) case <-leaveRoom: return case value := <-in: - n.logger.Info("waku message received (relay)", - zap.String("payload", string(value.Message().Payload)), - ) payload, err := decryptMessage(room, value.Message()) if err != nil { n.logger.Warn("failed to decrypt message payload") diff --git a/internal/view/model.go b/internal/view/model.go index d952fd3..b4f09e1 100644 --- a/internal/view/model.go +++ b/internal/view/model.go @@ -56,7 +56,7 @@ type model struct { deckView deckview.Model issueView issueview.Model issuesListView issuesview.Model - gameEventHandler eventhandler.Model[*protocol.State, messages.GameStateMessage] + gameEventHandler eventhandler.Model[game.Event, interface{}] transportEventHandler eventhandler.Model[transport.ConnectionStatus, messages.ConnectionStatus] // Workaround: Used to allow pasting multiline text (list of issues) @@ -144,7 +144,7 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { case messages.AppStateFinishedMessage: switch msg.State { case states.Initializing: - // Notify playerID generated + // Send playerID generated cmds.AppendMessage(messages.PlayerIDMessage{ PlayerID: m.game.Player().ID, }) @@ -156,23 +156,7 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { } // Subscribe to states when app initialized - convert := func(status transport.ConnectionStatus) messages.ConnectionStatus { - return messages.ConnectionStatus{Status: status} - } - m.transportEventHandler = eventhandler.New[transport.ConnectionStatus, messages.ConnectionStatus](convert) - cmds.AppendCommand(m.transportEventHandler.Init( - m.transport.SubscribeToConnectionStatus(), - m.transport.ConnectionStatus(), - )) - - convert2 := func(status *protocol.State) messages.GameStateMessage { - return messages.GameStateMessage{State: status} - } - m.gameEventHandler = eventhandler.New[*protocol.State, messages.GameStateMessage](convert2) - cmds.AppendCommand(m.gameEventHandler.Init( - m.game.SubscribeToStateChanges(), - m.game.CurrentState(), - )) + cmds.AppendCommand(m.initializeEventHandlers()) case states.InputPlayerName: switchToState(states.WaitingForPeers) @@ -406,6 +390,42 @@ func (m *model) handlePastedText(text string) (tea.Msg, tea.Cmd) { return nil, tea.Batch(cmds...) } +func connectionStatusToMessage(status transport.ConnectionStatus) messages.ConnectionStatus { + return messages.ConnectionStatus{Status: status} +} + +func gameEventToMessage(event game.Event) interface{} { + switch event.Tag { + case game.EventStateChanged: + if state, ok := event.Data.(*protocol.State); ok { + return messages.GameStateMessage{State: state} + } + default: + return nil + } + config.Logger.Warn("unexpected event data type", zap.Any("event", event)) + return nil +} + +func (m *model) initializeEventHandlers() tea.Cmd { + m.transportEventHandler = eventhandler.New[transport.ConnectionStatus, messages.ConnectionStatus](connectionStatusToMessage) + cmd1 := m.transportEventHandler.Init( + m.transport.SubscribeToConnectionStatus(), + m.transport.ConnectionStatus(), + ) + + m.gameEventHandler = eventhandler.New[game.Event, interface{}](gameEventToMessage) + cmd2 := m.gameEventHandler.Init( + m.game.Subscribe().Events, + game.Event{ + Tag: game.EventStateChanged, + Data: m.game.CurrentState(), + }, + ) + + return tea.Batch(cmd1, cmd2) +} + // Ensure that model fulfils the tea.Model interface at compile time. // ref: https://www.inngest.com/blog/interactive-clis-with-bubbletea var _ tea.Model = (*model)(nil) diff --git a/pkg/game/config.go b/pkg/game/config.go index b028eae..d61b225 100644 --- a/pkg/game/config.go +++ b/pkg/game/config.go @@ -2,7 +2,7 @@ package game import "time" -type gameConfig struct { +type configuration struct { PlayerName string EnableSymmetricEncryption bool OnlineMessagePeriod time.Duration @@ -10,7 +10,7 @@ type gameConfig struct { PublishStateLoopEnabled bool } -var defaultConfig = gameConfig{ +var defaultConfig = configuration{ PlayerName: "", EnableSymmetricEncryption: true, OnlineMessagePeriod: 5 * time.Second, diff --git a/pkg/game/events.go b/pkg/game/events.go new file mode 100644 index 0000000..b43ac4c --- /dev/null +++ b/pkg/game/events.go @@ -0,0 +1,59 @@ +package game + +type EventTag int + +const ( + EventStateChanged EventTag = iota +) + +type Event struct { + Tag EventTag + Data interface{} +} + +type Subscription struct { + Events chan Event +} + +type EventPublisher interface { + Publish(tag EventTag, data interface{}) +} + +type EventSubscriber interface { + Subscribe(tag EventTag) *Subscription +} + +type EventManager struct { + subscriptions []*Subscription +} + +func NewEventManager() *EventManager { + return &EventManager{ + subscriptions: make([]*Subscription, 0, 1), + } +} + +func (m *EventManager) Send(event Event) { + for _, sub := range m.subscriptions { + sub.Events <- event + } +} + +func (m *EventManager) Subscribe() *Subscription { + subscription := &Subscription{ + Events: make(chan Event, 10), + } + m.subscriptions = append(m.subscriptions, subscription) + return subscription +} + +func (m *EventManager) Count() int { + return len(m.subscriptions) +} + +func (m *EventManager) Close() { + for _, sub := range m.subscriptions { + close(sub.Events) + } + m.subscriptions = nil +} diff --git a/pkg/game/events_test.go b/pkg/game/events_test.go new file mode 100644 index 0000000..6c255e9 --- /dev/null +++ b/pkg/game/events_test.go @@ -0,0 +1,40 @@ +package game + +import ( + "testing" + + "github.com/brianvoe/gofakeit/v6" + "github.com/stretchr/testify/require" +) + +func TestEventManager(t *testing.T) { + const subscribersCount = 3 + manager := NewEventManager() + + subs := make([]Subscription, subscribersCount) + for i := range subs { + subs[i] = *manager.Subscribe() + } + + require.Equal(t, subscribersCount, manager.Count()) + + event := Event{} + err := gofakeit.Struct(&event) + require.NoError(t, err) + + manager.Send(event) + + for i := range subs { + require.Equal(t, event, <-subs[i].Events) + } + + manager.Close() + require.Empty(t, manager.subscriptions) + + for i := range subs { + _, ok := <-subs[i].Events + require.False(t, ok) + } + + require.Zero(t, manager.Count()) +} diff --git a/pkg/game/game.go b/pkg/game/game.go index 4d7dd23..dea392c 100644 --- a/pkg/game/game.go +++ b/pkg/game/game.go @@ -24,8 +24,6 @@ var ( playerOnlineTimeout = 20 * time.Second ) -type StateSubscription chan *protocol.State - type Game struct { logger *zap.Logger ctx context.Context @@ -34,6 +32,7 @@ type Game struct { clock clockwork.Clock exitRoom chan struct{} messages chan []byte + config configuration features FeatureFlags codeControls codeControlFlags initialized bool @@ -42,12 +41,11 @@ type Game struct { player *protocol.Player myVote protocol.VoteResult // We save our vote to show it in UI - room *protocol.Room - roomID protocol.RoomID - state *protocol.State - stateTimestamp int64 - stateSubscribers []StateSubscription - config gameConfig + room *protocol.Room + roomID protocol.RoomID + state *protocol.State + stateTimestamp int64 + events EventManager } func NewGame(opts []Option) *Game { @@ -137,10 +135,7 @@ func (g *Game) LeaveRoom() { } func (g *Game) Stop() { - for _, subscriber := range g.stateSubscribers { - close(subscriber) - } - g.stateSubscribers = nil + g.events.Close() g.LeaveRoom() // WARNING: wait for all routines to finish } @@ -182,10 +177,8 @@ func (g *Game) handleMessage(payload []byte) { } } -func (g *Game) SubscribeToStateChanges() StateSubscription { - channel := make(StateSubscription, 10) - g.stateSubscribers = append(g.stateSubscribers, channel) - return channel +func (g *Game) Subscribe() *Subscription { + return g.events.Subscribe() } func (g *Game) CurrentState() *protocol.State { @@ -201,13 +194,14 @@ func (g *Game) notifyChangedState(publish bool) { g.logger.Debug("notifying state change", zap.Bool("publish", publish), - zap.Int("subscribers", len(g.stateSubscribers)), + zap.Int("subscribers", g.events.Count()), zap.Any("state", state), ) - for _, subscriber := range g.stateSubscribers { - subscriber <- state - } + g.events.Send(Event{ + Tag: EventStateChanged, + Data: state, + }) if publish { go g.publishState(state)