Skip to content

Commit

Permalink
Merge pull request #667 from sputn1ck/expand_fsm
Browse files Browse the repository at this point in the history
fsm: expand fsm
  • Loading branch information
sputn1ck authored Dec 1, 2023
2 parents 1308e91 + 5b6f847 commit 3743a49
Show file tree
Hide file tree
Showing 3 changed files with 97 additions and 22 deletions.
4 changes: 2 additions & 2 deletions fsm/example_fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func NewExampleFSMContext(service ExampleService,
service: service,
store: store,
}
exampleFSM.StateMachine = NewStateMachine(exampleFSM.GetStates())
exampleFSM.StateMachine = NewStateMachine(exampleFSM.GetStates(), 10)

return exampleFSM
}
Expand All @@ -55,7 +55,7 @@ var (
// GetStates returns the states for the example FSM.
func (e *ExampleFSM) GetStates() States {
return States{
Default: State{
EmptyState: State{
Transitions: Transitions{
OnRequestStuff: InitFSM,
},
Expand Down
62 changes: 43 additions & 19 deletions fsm/fsm.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ var (
)

const (
// Default represents the default state of the system.
Default StateType = ""
// EmptyState represents the default state of the system.
EmptyState StateType = ""

// NoOp represents a no-op event.
NoOp EventType = "NoOp"
Expand Down Expand Up @@ -88,19 +88,19 @@ type StateMachine struct {

// ActionEntryFunc is a function that is called before an action is
// executed.
ActionEntryFunc func()
ActionEntryFunc func(Notification)

// ActionExitFunc is a function that is called after an action is
// executed.
ActionExitFunc func()

// mutex ensures that only 1 event is processed by the state machine at
// any given time.
mutex sync.Mutex
// executed, it is called with the EventType returned by the action.
ActionExitFunc func(NextEvent EventType)

// LastActionError is an error set by the last action executed.
LastActionError error

// DefaultObserver is the default observer that is notified when the
// state machine transitions between states.
DefaultObserver *CachedObserver

// previous represents the previous state.
previous StateType

Expand All @@ -114,13 +114,35 @@ type StateMachine struct {
// observerMutex ensures that observers are only added or removed
// safely.
observerMutex sync.Mutex

// mutex ensures that only 1 event is processed by the state machine at
// any given time.
mutex sync.Mutex
}

// NewStateMachine creates a new state machine.
func NewStateMachine(states States) *StateMachine {
func NewStateMachine(states States, observerSize int) *StateMachine {
return NewStateMachineWithState(states, EmptyState, observerSize)
}

// NewStateMachineWithState creates a new state machine and sets the initial
// state.
func NewStateMachineWithState(states States, current StateType,
observerSize int) *StateMachine {

observers := []Observer{}
var defaultObserver *CachedObserver

if observerSize > 0 {
defaultObserver = NewCachedObserver(observerSize)
observers = append(observers, defaultObserver)
}

return &StateMachine{
States: states,
observers: make([]Observer, 0),
States: states,
current: current,
DefaultObserver: defaultObserver,
observers: observers,
}
}

Expand Down Expand Up @@ -189,18 +211,20 @@ func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error {

// Notify the state machine's observers.
s.observerMutex.Lock()
notification := Notification{
PreviousState: s.previous,
NextState: s.current,
Event: event,
}

for _, observer := range s.observers {
observer.Notify(Notification{
PreviousState: s.previous,
NextState: s.current,
Event: event,
})
observer.Notify(notification)
}
s.observerMutex.Unlock()

// Execute the state machines ActionEntryFunc.
if s.ActionEntryFunc != nil {
s.ActionEntryFunc()
s.ActionEntryFunc(notification)
}

// Execute the current state's entry function
Expand All @@ -219,7 +243,7 @@ func (s *StateMachine) SendEvent(event EventType, eventCtx EventContext) error {

// Execute the state machines ActionExitFunc.
if s.ActionExitFunc != nil {
s.ActionExitFunc()
s.ActionExitFunc(nextEvent)
}

// If the next event is a no-op, we're done.
Expand Down
53 changes: 52 additions & 1 deletion fsm/observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,60 @@ func (c *CachedObserver) GetCachedNotifications() []Notification {
return c.cachedNotifications.Get()
}

// WaitForStateOption is an option that can be passed to the WaitForState
// function.
type WaitForStateOption interface {
apply(*fsmOptions)
}

// fsmOptions is a struct that holds all options that can be passed to the
// WaitForState function.
type fsmOptions struct {
initialWait time.Duration
}

// InitialWaitOption is an option that can be passed to the WaitForState
// function to wait for a given duration before checking the state.
type InitialWaitOption struct {
initialWait time.Duration
}

// WithWaitForStateOption creates a new InitialWaitOption.
func WithWaitForStateOption(initialWait time.Duration) WaitForStateOption {
return &InitialWaitOption{
initialWait,
}
}

// apply implements the WaitForStateOption interface.
func (w *InitialWaitOption) apply(o *fsmOptions) {
o.initialWait = w.initialWait
}

// WaitForState waits for the state machine to reach the given state.
// If the optional initialWait parameter is set, the function will wait for
// the given duration before checking the state. This is useful if the
// function is called immediately after sending an event to the state machine
// and the state machine needs some time to process the event.
func (s *CachedObserver) WaitForState(ctx context.Context,
timeout time.Duration, state StateType) error {
timeout time.Duration, state StateType,
opts ...InitialWaitOption) error {

var options fsmOptions

for _, opt := range opts {
opt.apply(&options)
}

// Wait for the initial wait duration if set.
if options.initialWait > 0 {
select {
case <-time.After(options.initialWait):

case <-ctx.Done():
return ctx.Err()
}
}

timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
Expand Down

0 comments on commit 3743a49

Please sign in to comment.