Skip to content

Commit

Permalink
Aggregate layout/naming consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 28, 2020
1 parent 83757a0 commit a7c44df
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 52 deletions.
4 changes: 2 additions & 2 deletions equinox-fc/Domain.Tests/LocationEpochTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@ let [<Property>] properties carriedForward delta1 closeImmediately delta2 close
(* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *)

let initialShouldClose _state = closeImmediately
let res,events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Folds.initial
let res,events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Fold.initial
let cfEvents events = events |> List.filter (function Events.CarriedForward _ -> true | _ -> false)
let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false)
let state1 = Folds.fold Folds.initial events
let state1 = Fold.fold Fold.initial events
let expectedBalance = carriedForward + delta1
// Only expect closing if it was requested
let expectImmediateClose = closeImmediately
Expand Down
6 changes: 3 additions & 3 deletions equinox-fc/Domain.Tests/LocationSeriesTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ open Swensen.Unquote
open Location.Series

let [<Property>] properties c1 c2 =
let events = interpretActivateEpoch c1 Folds.initial
let state1 = Folds.fold Folds.initial events
let events = interpretActivateEpoch c1 Fold.initial
let state1 = Fold.fold Fold.initial events
let epoch0 = %0
match c1, events, toActiveEpoch state1 with
// Started events are not written for < 0
Expand All @@ -21,7 +21,7 @@ let [<Property>] properties c1 c2 =
test <@ List.isEmpty l @>

let events = interpretActivateEpoch c2 state1
let state2 = Folds.fold state1 events
let state2 = Fold.fold state1 events
match toActiveEpoch state1, c2, events, toActiveEpoch state2 with
// Started events are not written for < 0
| None, n, [], activeEpoch when n < epoch0 ->
Expand Down
18 changes: 9 additions & 9 deletions equinox-fc/Domain.Tests/LocationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ open System
/// Helpers to match `module Cosmos` wrapping inside the impl
module Location =

open Equinox.MemoryStore
module MemoryStore =

module Series =
open Equinox.MemoryStore

let resolve store = Resolver(store, Series.Events.codec, Series.Folds.fold, Series.Folds.initial).Resolve
module Series =

module Epoch =
let resolve store = Resolver(store, Series.Events.codec, Series.Fold.fold, Series.Fold.initial).Resolve

let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Folds.fold, Epoch.Folds.initial).Resolve
module Epoch =

module MemoryStore =
let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Fold.fold, Epoch.Fold.initial).Resolve

let createService (zeroBalance, shouldClose) store =
let maxAttempts = Int32.MaxValue
Expand All @@ -35,7 +35,7 @@ let run (service : LocationService) (IdsAtLeastOne locations, deltas : _[]) = As

(* Apply random deltas *)

let adjust delta (bal : Epoch.Folds.Balance) =
let adjust delta (bal : Epoch.Fold.Balance) =
let value = max -bal delta
if value = 0 then 0, []
else value, [Location.Epoch.Events.Delta { value = value }]
Expand All @@ -51,7 +51,7 @@ let [<Property>] ``MemoryStore properties`` maxEvents args =
let store = Equinox.MemoryStore.VolatileStore()
let zeroBalance = 0
let maxEvents = max 1 maxEvents
let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents
let shouldClose (state : Epoch.Fold.OpenState) = state.count > maxEvents
let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store
run service args

Expand All @@ -65,6 +65,6 @@ type Cosmos(testOutput) =
let [<Property(MaxTest=10)>] properties maxEvents args =
let zeroBalance = 0
let maxEvents = max 1 maxEvents
let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents
let shouldClose (state : Epoch.Fold.OpenState) = state.count > maxEvents
let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue)
run service args
2 changes: 1 addition & 1 deletion equinox-fc/Domain/Location.fs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ namespace Location

[<NoComparison; NoEquality>]
type Wip<'R> =
| Pending of decide : (Epoch.Folds.Balance -> 'R*Epoch.Events.Event list)
| Pending of decide : (Epoch.Fold.Balance -> 'R*Epoch.Events.Event list)
| Complete of 'R

/// Manages a Series of Epochs, with a running total being carried forward to the next Epoch when it's Closed
Expand Down
47 changes: 23 additions & 24 deletions equinox-fc/Domain/LocationEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ module Events =
| Delta of Delta
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let [<Literal>] categoryId = "LocationEpoch"
let (|AggregateId|) (locationId, epochId) =
let [<Literal>] category = "LocationEpoch"
let (|For|) (locationId, epochId) =
let id = sprintf "%s_%s" (LocationId.toString locationId) (LocationEpochId.toString epochId)
Equinox.AggregateId(categoryId, id)
Equinox.AggregateId(category, id)

module Folds =
module Fold =

type Balance = int
type OpenState = { count : int; value : Balance }
Expand All @@ -36,55 +36,54 @@ module Folds =
/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events
type private Accumulator() =
let acc = ResizeArray()
member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function
member __.Ingest state : 'res * Events.Event list -> 'res * Fold.State = function
| res, [] -> res, state
| res, [e] -> acc.Add e; res, Folds.evolve state e
| res, xs -> acc.AddRange xs; res, Folds.fold state (Seq.ofList xs)
| res, [e] -> acc.Add e; res, Fold.evolve state e
| res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs)
member __.Accumulated = List.ofSeq acc

type Result<'t> = { balance : Folds.Balance; result : 't option; isOpen : bool }
type Result<'t> = { balance : Fold.Balance; result : 't option; isOpen : bool }

let sync (balanceCarriedForward : Folds.Balance option) (decide : (Folds.Balance -> 't*Events.Event list)) shouldClose state : Result<'t>*Events.Event list =
let sync (balanceCarriedForward : Fold.Balance option) (decide : (Fold.Balance -> 't*Events.Event list)) shouldClose state : Result<'t>*Events.Event list =
let acc = Accumulator()
// We always want to have a CarriedForward event at the start of any Epoch's event stream
let (), state =
acc.Ingest state <|
match state with
| Folds.Initial -> (), [Events.CarriedForward { initial = Option.get balanceCarriedForward }]
| Folds.Open _ | Folds.Closed _ -> (), []
| Fold.Initial -> (), [Events.CarriedForward { initial = Option.get balanceCarriedForward }]
| Fold.Open _ | Fold.Closed _ -> (), []
// Run, unless we determine we're in Closed state
let result, state =
acc.Ingest state <|
match state with
| Folds.Initial -> failwith "We've just guaranteed not Initial"
| Folds.Open { value = bal } -> let r,es = decide bal in Some r,es
| Folds.Closed _ -> None, []
| Fold.Initial -> failwith "We've just guaranteed not Initial"
| Fold.Open { value = bal } -> let r,es = decide bal in Some r,es
| Fold.Closed _ -> None, []
// Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event
let (balance, isOpen), _ =
acc.Ingest state <|
match state with
| Folds.Initial -> failwith "Can't be Initial"
| Folds.Open ({ value = bal } as openState) when shouldClose openState -> (bal, false), [Events.Closed]
| Folds.Open { value = bal } -> (bal, true), []
| Folds.Closed bal -> (bal, false), []
| Fold.Initial -> failwith "Can't be Initial"
| Fold.Open ({ value = bal } as openState) when shouldClose openState -> (bal, false), [Events.Closed]
| Fold.Open { value = bal } -> (bal, true), []
| Fold.Closed bal -> (bal, false), []
{ balance = balance; result = result; isOpen = isOpen }, acc.Accumulated

type Service internal (resolve, ?maxAttempts) =
type Service internal (log, resolve, maxAttempts) =

let log = Serilog.Log.ForContext<Service>()
let (|Stream|) (Events.AggregateId id) = Equinox.Stream<Events.Event,Folds.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 2)
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)

member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async<Result<'R>> =
let (Stream stream) = (locationId, epochId)
let stream = resolve (locationId, epochId)
stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose)

let create resolve maxAttempts = Service(resolve, maxAttempts)
let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = maxAttempts)

module Cosmos =

open Equinox.Cosmos
let resolve (context,cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.Unoptimized).Resolve
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.Unoptimized).Resolve
let createService (context,cache,maxAttempts) =
create (resolve (context,cache)) maxAttempts
25 changes: 12 additions & 13 deletions equinox-fc/Domain/LocationSeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -9,44 +9,43 @@ module Events =
| Started of Started
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let [<Literal>] categoryId = "LocationSeries"
let (|AggregateId|) id = Equinox.AggregateId(categoryId, LocationId.toString id)
let [<Literal>] category = "LocationSeries"
let (|For|) id = Equinox.AggregateId(category, LocationId.toString id)

module Folds =
module Fold =

type State = LocationEpochId
let initial = LocationEpochId.parse -1
let evolve _state = function
| Events.Started e -> e.epochId
let fold = Seq.fold evolve

let interpretActivateEpoch epochId (state : Folds.State) =
let interpretActivateEpoch epochId (state : Fold.State) =
[if state < epochId then yield Events.Started { epochId = epochId }]

let toActiveEpoch state =
if state = Folds.initial then None else Some state
if state = Fold.initial then None else Some state

type Service internal (resolve, ?maxAttempts) =
type Service internal (log, resolve, maxAttempts) =

let log = Serilog.Log.ForContext<Service>()
let (|Stream|) (Events.AggregateId id) = Equinox.Stream<Events.Event,Folds.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 2)
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)

member __.Read(locationId) : Async<LocationEpochId option> =
let (Stream stream) = locationId
let stream = resolve locationId
stream.Query toActiveEpoch

member __.ActivateEpoch(locationId,epochId) : Async<unit> =
let (Stream stream) = locationId
member __.ActivateEpoch(locationId, epochId) : Async<unit> =
let stream = resolve locationId
stream.Transact(interpretActivateEpoch epochId)

let create resolve maxAttempts = Service(resolve, maxAttempts)
let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts)

module Cosmos =

open Equinox.Cosmos
let resolve (context,cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let opt = Equinox.ResolveOption.AllowStale
fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt)
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt)
let createService (context, cache, maxAttempts) =
create (resolve (context,cache)) maxAttempts

0 comments on commit a7c44df

Please sign in to comment.