Skip to content

Commit

Permalink
Complete Process Manager Apply
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 18, 2020
1 parent d383647 commit 65c0b3c
Showing 1 changed file with 36 additions and 44 deletions.
80 changes: 36 additions & 44 deletions equinox-fc/Domain/InventoryTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ module Events =
| Added of Added

(* Successful completion *)
| Completed // terminal
| Logged
| Completed
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

Expand All @@ -42,6 +43,7 @@ module Fold =
type State =
| Initial
| Running of RunningState
| Logging of TerminalState
| Completed of TerminalState
and RunningState =
| Adjust of Events.AdjustmentRequested
Expand All @@ -61,10 +63,10 @@ module Fold =
let evolve state event =
match state, event with
(* Adjustment Process *)
| Initial, Events.AdjustmentRequested e ->
Running (Adjust e)
| Running (Adjust s), Events.Adjusted ->
Completed (Adjusted s)
| Initial, Events.AdjustmentRequested r ->
Running (Adjust r)
| Running (Adjust r), Events.Adjusted ->
Logging (Adjusted r)

(* Transfer Process *)
| Initial, Events.TransferRequested e ->
Expand All @@ -73,59 +75,49 @@ module Fold =
| Running (Transfer (Requested s)), Events.Failed ->
Completed (TransferFailed s)

| Running (Transfer (Requested s)), Events.Removed e as ee ->
| Running (Transfer (Requested s)), Events.Removed e ->
Running (Transfer (Adding { request = s; removed = e }))
| Running (Transfer (Adding s)), Events.Added e as ee ->
| Running (Transfer (Adding s)), Events.Added e ->
Running (Transfer (Added { request = s.request; removed = s.removed; added = e }))
| Running (Transfer (Added s)), Events.Completed as ee ->
Completed (Transferred s)
| Running (Transfer (Added s)), Events.Completed ->
Logging (Transferred s)

(* Log result *)
| Logging s, Events.Logged ->
Completed s

(* Any disallowed state changes represent gaps in the model, so we fail fast *)
| state, event -> failwithf "Unexpected %A when %A" event state
let fold : State -> Events.Event seq -> State = Seq.fold evolve

type Command =
| Adjust of Events.AdjustmentRequested
| Transfer of Events.TransferRequested

type Result = { complete : bool; }

/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events
type private Accumulator() =
let acc = ResizeArray()
member __.Ingest(state) : 'res * Events.Event list -> 'res * Fold.State = function
| res, [] -> res, state
| res, [e] -> acc.Add e; res, Fold.evolve state e
| res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs)
member __.Accumulated = List.ofSeq acc

type Update =
| AdjustmentCompleted

let decide command updates (state : Fold.State) : Result * Events.Event list =

let acc = Accumulator()
let started, state =
acc.Ingest state <|
match state, command with
| Fold.Initial, Adjust e -> true, [ Events.AdjustmentRequested e ]
| Fold.Initial, Transfer e -> true, [ Events.TransferRequested e ]

// TOCONSIDER validate conflicts

| _ -> false, []

{ complete = false }, acc.Accumulated
/// Validates an event actually represents an acceptable, non-redundant state transition
let filter event state =
match state, event with
| Initial, Events.AdjustmentRequested _
| Initial, Events.TransferRequested _
| Running (Adjust _), Events.Adjusted
| Running (Transfer (Requested _)), Events.Failed
| Running (Transfer (Requested _)), Events.Removed _
| Running (Transfer (Adding _)), Events.Added _
| Logging _, Events.Logged ->
[event]
| _ -> []

/// Given an event from the Process's timeline, yields the State, in order that it can be completed
let decide update (state : Fold.State) : Fold.State * Events.Event list =
let events = Fold.filter update state
let state' = Fold.fold state events
state', events

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.For streamId) = Equinox.Stream<Events.Event, Fold.State>(log, resolve streamId, maxAttempts)

member __.IngestShipped(inventoryId, transactionId, command, updates) : Async<Result> =
member __.Apply(transactionId, update) : Async<Fold.State> =
let stream = resolve transactionId
stream.Transact(decide command updates)
stream.Transact(decide update)

let createService resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 2)
let createService resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 3)

module Cosmos =

Expand Down

0 comments on commit 65c0b3c

Please sign in to comment.