Skip to content

Commit

Permalink
feat: game events
Browse files Browse the repository at this point in the history
  • Loading branch information
igor-sirotin committed Jul 14, 2024
1 parent ae7ea50 commit 06f5bfa
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 58 deletions.
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ build-all: generate
@go build -v ./...

run: generate
@go run -buildvcs=true ./cmd/2sp
@go run ./cmd/2sp

generate:
@go generate ./...
Expand All @@ -20,3 +20,6 @@ lint:

lint-fix:
golangci-lint run --fix ./...

demo:
@go run ./cmd/2sp --anonymous --demo --name=Alice
34 changes: 19 additions & 15 deletions cmd/2sp/demo/demo.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,19 +35,19 @@ var votes = [][]string{
type Demo struct {
ctx context.Context
dealer *game.Game
state game.StateSubscription
events *game.Subscription
program *tea.Program
logger *zap.Logger

players []*game.Game
playerSubs []game.StateSubscription
playerSubs []*game.Subscription
}

func New(ctx context.Context, dealer *game.Game, program *tea.Program) *Demo {
return &Demo{
ctx: ctx,
dealer: dealer,
state: dealer.SubscribeToStateChanges(),
events: dealer.Subscribe(),
program: program,
logger: config.Logger.Named("demo"),
}
Expand All @@ -64,7 +64,7 @@ func (d *Demo) Stop() {
func (d *Demo) initializePlayers() error {
names := []string{"Alice", "Bob", "Charlie"}
d.players = make([]*game.Game, 0, len(names))
d.playerSubs = make([]game.StateSubscription, 0, len(names))
d.playerSubs = make([]*game.Subscription, 0, len(names))

wg := sync.WaitGroup{}
errChan := make(chan error, len(names))
Expand All @@ -79,7 +79,7 @@ func (d *Demo) initializePlayers() error {
return
}
d.players = append(d.players, player)
d.playerSubs = append(d.playerSubs, player.SubscribeToStateChanges())
d.playerSubs = append(d.playerSubs, player.Subscribe())
}(i, name)
}

Expand Down Expand Up @@ -176,7 +176,7 @@ func (d *Demo) issueSubRoutine(issueID protocol.IssueID, votes []string) error {
}(i, vote)
}

// Deal first issue (expect all players to be subscribed to state changes
// Deal first issue (expect all players to be subscribed to events changes
d.sendKey(tea.KeyEnter, 1*time.Second)
d.logger.Info("deal issue")

Expand Down Expand Up @@ -204,7 +204,7 @@ func (d *Demo) issueSubRoutine(issueID protocol.IssueID, votes []string) error {
// Reveal votes
d.sendShortcut(commands.DefaultKeyMap.RevealVotes)
d.logger.Info("votes revealed")
err = d.waitForStateCondition(d.state, func(state *protocol.State) bool {
err = d.waitForStateCondition(d.events, func(state *protocol.State) bool {
return state.VotesRevealed
})
if err != nil {
Expand Down Expand Up @@ -232,7 +232,7 @@ func (d *Demo) playerVoteSubroutine(index int, issueID protocol.IssueID, vote pr
delay := time.Duration(rand.Intn(4000)) * time.Millisecond
time.Sleep(delay)

// Publish vote
// Send vote
err = player.PublishVote(vote)
if err != nil {
return errors.Wrap(err, fmt.Sprintf("%s: failed to publish vote", playerName))
Expand Down Expand Up @@ -300,36 +300,40 @@ func (d *Demo) createPlayer(name string) (*game.Game, error) {
return player, nil
}

func (d *Demo) waitForStateCondition(sub game.StateSubscription, condition func(state *protocol.State) bool) error {
func (d *Demo) waitForStateCondition(sub *game.Subscription, condition func(state *protocol.State) bool) error {
timeout := time.After(10 * time.Second)
for {
select {
case state := <-sub:
case event := <-sub.Events:
if event.Tag != game.EventStateChanged {
continue
}
state := event.Data.(*protocol.State)
if condition(state) {
time.Sleep(500 * time.Millisecond)
return nil
}
case <-timeout:
return errors.New("timeout waiting for state condition")
return errors.New("timeout waiting for events condition")
case <-d.ctx.Done():
}
}
}

func (d *Demo) waitForPlayers(players []*game.Game) error {
return d.waitForStateCondition(d.state, func(state *protocol.State) bool {
return d.waitForStateCondition(d.events, func(state *protocol.State) bool {
return len(state.Players) == len(players)
})
}

func (d *Demo) waitForIssues(count int) error {
return d.waitForStateCondition(d.state, func(state *protocol.State) bool {
return d.waitForStateCondition(d.events, func(state *protocol.State) bool {
return len(state.Issues) == count
})
}

func (d *Demo) waitForVotes(votes []string) error {
return d.waitForStateCondition(d.state, func(state *protocol.State) bool {
return d.waitForStateCondition(d.events, func(state *protocol.State) bool {
issue := state.GetActiveIssue()
if issue == nil {
return false
Expand All @@ -338,7 +342,7 @@ func (d *Demo) waitForVotes(votes []string) error {
})
}

func (d *Demo) waitForIssueDealt(sub game.StateSubscription, issueID protocol.IssueID) error {
func (d *Demo) waitForIssueDealt(sub *game.Subscription, issueID protocol.IssueID) error {
return d.waitForStateCondition(sub, func(state *protocol.State) bool {
return state.ActiveIssue == issueID
})
Expand Down
3 changes: 0 additions & 3 deletions internal/transport/waku.go
Original file line number Diff line number Diff line change
Expand Up @@ -481,9 +481,6 @@ func (n *Node) SubscribeToMessages(room *pp.Room) (*MessagesSubscription, error)
case <-leaveRoom:
return
case value := <-in:
n.logger.Info("waku message received (relay)",
zap.String("payload", string(value.Message().Payload)),
)
payload, err := decryptMessage(room, value.Message())
if err != nil {
n.logger.Warn("failed to decrypt message payload")
Expand Down
58 changes: 39 additions & 19 deletions internal/view/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type model struct {
deckView deckview.Model
issueView issueview.Model
issuesListView issuesview.Model
gameEventHandler eventhandler.Model[*protocol.State, messages.GameStateMessage]
gameEventHandler eventhandler.Model[game.Event, interface{}]
transportEventHandler eventhandler.Model[transport.ConnectionStatus, messages.ConnectionStatus]

// Workaround: Used to allow pasting multiline text (list of issues)
Expand Down Expand Up @@ -144,7 +144,7 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
case messages.AppStateFinishedMessage:
switch msg.State {
case states.Initializing:
// Notify playerID generated
// Send playerID generated
cmds.AppendMessage(messages.PlayerIDMessage{
PlayerID: m.game.Player().ID,
})
Expand All @@ -156,23 +156,7 @@ func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
}

// Subscribe to states when app initialized
convert := func(status transport.ConnectionStatus) messages.ConnectionStatus {
return messages.ConnectionStatus{Status: status}
}
m.transportEventHandler = eventhandler.New[transport.ConnectionStatus, messages.ConnectionStatus](convert)
cmds.AppendCommand(m.transportEventHandler.Init(
m.transport.SubscribeToConnectionStatus(),
m.transport.ConnectionStatus(),
))

convert2 := func(status *protocol.State) messages.GameStateMessage {
return messages.GameStateMessage{State: status}
}
m.gameEventHandler = eventhandler.New[*protocol.State, messages.GameStateMessage](convert2)
cmds.AppendCommand(m.gameEventHandler.Init(
m.game.SubscribeToStateChanges(),
m.game.CurrentState(),
))
cmds.AppendCommand(m.initializeEventHandlers())

case states.InputPlayerName:
switchToState(states.WaitingForPeers)
Expand Down Expand Up @@ -406,6 +390,42 @@ func (m *model) handlePastedText(text string) (tea.Msg, tea.Cmd) {
return nil, tea.Batch(cmds...)
}

func connectionStatusToMessage(status transport.ConnectionStatus) messages.ConnectionStatus {
return messages.ConnectionStatus{Status: status}
}

func gameEventToMessage(event game.Event) interface{} {
switch event.Tag {
case game.EventStateChanged:
if state, ok := event.Data.(*protocol.State); ok {
return messages.GameStateMessage{State: state}
}
default:
return nil
}
config.Logger.Warn("unexpected event data type", zap.Any("event", event))
return nil
}

func (m *model) initializeEventHandlers() tea.Cmd {
m.transportEventHandler = eventhandler.New[transport.ConnectionStatus, messages.ConnectionStatus](connectionStatusToMessage)
cmd1 := m.transportEventHandler.Init(
m.transport.SubscribeToConnectionStatus(),
m.transport.ConnectionStatus(),
)

m.gameEventHandler = eventhandler.New[game.Event, interface{}](gameEventToMessage)
cmd2 := m.gameEventHandler.Init(
m.game.Subscribe().Events,
game.Event{
Tag: game.EventStateChanged,
Data: m.game.CurrentState(),
},
)

return tea.Batch(cmd1, cmd2)
}

// Ensure that model fulfils the tea.Model interface at compile time.
// ref: https://www.inngest.com/blog/interactive-clis-with-bubbletea
var _ tea.Model = (*model)(nil)
59 changes: 59 additions & 0 deletions pkg/game/events.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package game

type EventTag int

const (
EventStateChanged EventTag = iota
)

type Event struct {
Tag EventTag
Data interface{}
}

type Subscription struct {
Events chan Event
}

type EventPublisher interface {
Publish(tag EventTag, data interface{})
}

type EventSubscriber interface {
Subscribe(tag EventTag) *Subscription
}

type EventManager struct {
subscriptions []*Subscription
}

func NewEventManager() *EventManager {
return &EventManager{
subscriptions: make([]*Subscription, 0, 1),
}
}

func (m *EventManager) Send(event Event) {
for _, sub := range m.subscriptions {
sub.Events <- event
}
}

func (m *EventManager) Subscribe() *Subscription {
subscription := &Subscription{
Events: make(chan Event, 10),
}
m.subscriptions = append(m.subscriptions, subscription)
return subscription
}

func (m *EventManager) Count() int {
return len(m.subscriptions)
}

func (m *EventManager) Close() {
for _, sub := range m.subscriptions {
close(sub.Events)
}
m.subscriptions = nil
}
40 changes: 40 additions & 0 deletions pkg/game/events_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package game

import (
"testing"

"github.com/brianvoe/gofakeit/v6"
"github.com/stretchr/testify/require"
)

func TestEventManager(t *testing.T) {
const subscribersCount = 3
manager := NewEventManager()

subs := make([]Subscription, subscribersCount)
for i := range subs {
subs[i] = *manager.Subscribe()
}

require.Equal(t, subscribersCount, manager.Count())

event := Event{}
err := gofakeit.Struct(&event)
require.NoError(t, err)

manager.Send(event)

for i := range subs {
require.Equal(t, event, <-subs[i].Events)
}

manager.Close()
require.Empty(t, manager.subscriptions)

for i := range subs {
_, ok := <-subs[i].Events
require.False(t, ok)
}

require.Zero(t, manager.Count())
}
Loading

0 comments on commit 06f5bfa

Please sign in to comment.