From 7efeab3cbd2dff27cf478abe474e6d3c7947a37a Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 20 Nov 2019 19:32:07 +0000 Subject: [PATCH 01/13] Location initial impl --- Equinox.sln | 17 +++++ samples/Fc/Domain.Tests/Domain.Tests.fsproj | 24 ++++++ samples/Fc/Domain.Tests/Program.fs | 5 ++ samples/Fc/Domain/Domain.fsproj | 21 +++++ samples/Fc/Domain/Infrastructure.fs | 18 +++++ samples/Fc/Domain/Location.fs | 24 ++++++ samples/Fc/Domain/LocationEpoch.fs | 85 +++++++++++++++++++++ samples/Fc/Domain/LocationSeries.fs | 49 ++++++++++++ 8 files changed, 243 insertions(+) create mode 100644 samples/Fc/Domain.Tests/Domain.Tests.fsproj create mode 100644 samples/Fc/Domain.Tests/Program.fs create mode 100644 samples/Fc/Domain/Domain.fsproj create mode 100644 samples/Fc/Domain/Infrastructure.fs create mode 100644 samples/Fc/Domain/Location.fs create mode 100644 samples/Fc/Domain/LocationEpoch.fs create mode 100644 samples/Fc/Domain/LocationSeries.fs diff --git a/Equinox.sln b/Equinox.sln index 2dc517ded..35ec02b97 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -75,6 +75,12 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.SqlStreamStore.Post EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Core", "src\Equinox.Core\Equinox.Core.fsproj", "{3021659A-5CA4-4E06-AF00-2457ED3F105B}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Fc", "Fc", "{63634A65-F668-4054-AAF5-AFD81C278F50}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "samples\Fc\Domain\Domain.fsproj", "{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "samples\Fc\Domain.Tests\Domain.Tests.fsproj", "{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -181,6 +187,14 @@ Global {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Release|Any CPU.Build.0 = Release|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -196,6 +210,9 @@ Global {EC2EC658-3D85-44F3-AD2F-52AFCAFF8871} = {8CDE1CC3-8619-44DE-8B4D-4102CE476C35} {8CDE1CC3-8619-44DE-8B4D-4102CE476C35} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9} {D82AAB2E-7264-421A-A893-63A37E5F08B6} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9} + {63634A65-F668-4054-AAF5-AFD81C278F50} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9} + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E} = {63634A65-F668-4054-AAF5-AFD81C278F50} + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7} = {63634A65-F668-4054-AAF5-AFD81C278F50} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {177E1E7B-E275-4FC6-AE3C-2C651ECCF71E} diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj new file mode 100644 index 000000000..517346b5e --- /dev/null +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -0,0 +1,24 @@ + + + + Fc.Domain.Tests + netcoreapp2.1 + 5 + false + + + + + + + + + + + + + + + + + diff --git a/samples/Fc/Domain.Tests/Program.fs b/samples/Fc/Domain.Tests/Program.fs new file mode 100644 index 000000000..53bca0c52 --- /dev/null +++ b/samples/Fc/Domain.Tests/Program.fs @@ -0,0 +1,5 @@ +module Program + +let [] dummy () = () + +let [] main _ = 0 \ No newline at end of file diff --git a/samples/Fc/Domain/Domain.fsproj b/samples/Fc/Domain/Domain.fsproj new file mode 100644 index 000000000..bb4316193 --- /dev/null +++ b/samples/Fc/Domain/Domain.fsproj @@ -0,0 +1,21 @@ + + + + netstandard2.0 + 5 + Fc.Domain + + + + + + + + + + + + + + + diff --git a/samples/Fc/Domain/Infrastructure.fs b/samples/Fc/Domain/Infrastructure.fs new file mode 100644 index 000000000..e56c01a06 --- /dev/null +++ b/samples/Fc/Domain/Infrastructure.fs @@ -0,0 +1,18 @@ +namespace global + +open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings + +type LocationId = string +and [] locationId +module LocationId = + let parse (value : string) : LocationId = %value + let toString (value : LocationId) : string = %value + +type LocationEpochId = int +and [] locationEpochId +module LocationEpochId = + let uninitialized : LocationEpochId = % -1 + let zero : LocationEpochId = %0 + let next (value : LocationEpochId) : LocationEpochId = % (%value + 1) + let parse (value : int) : LocationEpochId = %value + let toString (value : LocationEpochId) : string = string %value \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs new file mode 100644 index 000000000..affddad93 --- /dev/null +++ b/samples/Fc/Domain/Location.fs @@ -0,0 +1,24 @@ +namespace Location + +/// Manages a Series of Epochs, with a running total being carried forward to the next Epoch when it's Closed +type LocationService(zeroBalance, shouldClose, series : Location.Series.Service, epoch : Location.Epoch.Service) = + + let rec execute locationId originEpochId interpret = + let rec aux epochId balanceToCarryForward = async { + match! epoch.Sync(locationId,epochId,balanceToCarryForward,interpret,shouldClose) with + | { balance = bal; isComplete = true } -> + if originEpochId <> epochId then + do! series.ActivateEpoch(locationId, epochId) + return bal + | { balance = bal } -> + let successorEpochId = LocationEpochId.next epochId + return! aux successorEpochId (Some bal) } + aux + + member __.Execute(locationId, interpret) = async { + let! activeEpoch = series.Read(locationId) + let originEpochId,epochId,balanceCarriedForward = + match activeEpoch with + | None -> LocationEpochId.uninitialized,LocationEpochId.zero,Some zeroBalance + | Some activeEpochId -> activeEpochId,activeEpochId,None + return! execute locationId originEpochId interpret epochId balanceCarriedForward } \ No newline at end of file diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs new file mode 100644 index 000000000..c9c2bd4dd --- /dev/null +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -0,0 +1,85 @@ +module Location.Epoch + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type CarriedForward = { initial : int } + type Delta = { value : int } + type Event = + | CarriedForward of CarriedForward + | Closed + | Delta of Delta + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "LocationEpoch" + +module Folds = + + type Balance = int + type State = Initial | Open of Balance | Closed of Balance + let initial = Initial + let evolve state event = + match event,state with + | Events.CarriedForward e,Initial -> Open e.initial + | Events.Delta e,Open bal -> Open (bal + e.value) + | Events.Closed,Open bal -> Closed bal + | Events.CarriedForward _,(Open _|Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x + | Events.Delta _,(Initial|Closed _ as x) -> failwithf "Delta : Unexpected %A" x + | Events.Closed,(Initial|Closed _ as x) -> failwithf "Closed : Unexpected %A" x + let fold = Seq.fold evolve + +type Result = { balance : Folds.Balance; isComplete : bool } + +let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Balance -> Events.Event list) option) shouldClose state : Result*Events.Event list = + let acc = ResizeArray() + let stashEventsAndRollState state = function + | res,[] -> res,state + | res,[e] -> acc.Add e; res,Folds.evolve state e + | res,xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs) + // We always want to have a CarriedForward event at the start of any Epoch's event stream + let initialized,state = + match state with + | Folds.Initial -> true,[Events.CarriedForward { initial = Option.get balanceCarriedForward }] + | Folds.Open _ | Folds.Closed _ -> false,[] + |> stashEventsAndRollState state + // If an `interpret` is supplied, we run that (unless we determine we're in Closed state) + let worked,state = + match state, interpret with + | Folds.Initial,_ -> failwith "We've just guaranteed not Initial" + | Folds.Open _,None -> false,[] + | Folds.Open bal,Some interpret -> true,interpret bal + | Folds.Closed _,_ -> false,[] + |> stashEventsAndRollState state + /// Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event + let (balance,isOpen),_ = + match state with + | Folds.Initial -> failwith "Can't be Initial" + | Folds.Open bal as state when worked && shouldClose state -> (bal,false),[Events.Closed] + | Folds.Open bal -> (bal,true),[] + | Folds.Closed bal -> (bal,false),[] + |> stashEventsAndRollState state + { balance = balance; isComplete = initialized || worked || isOpen },Seq.toList acc + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) (locationId,epochId) = + let id = sprintf "%s_%s" (LocationId.toString locationId) (LocationEpochId.toString epochId) + Equinox.AggregateId(Events.categoryId, id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) + let decide (Stream stream) : (Folds.State -> 'r * Events.Event list) -> Async<'r> = stream.Transact + + member __.Sync(locationId,epochId,prevEpochBalanceCarriedForward,interpret,shouldClose) : Async = + decide (locationId,epochId) (sync prevEpochBalanceCarriedForward interpret shouldClose) + +let createService resolve = Service(resolve) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let createService (context,cache) = + createService (resolve (context,cache)) \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs new file mode 100644 index 000000000..d98084989 --- /dev/null +++ b/samples/Fc/Domain/LocationSeries.fs @@ -0,0 +1,49 @@ +module Location.Series + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type Started = { epochId : LocationEpochId } + type Event = + | Started of Started + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "LocationSeries" + +module Folds = + + type State = LocationEpochId + let initial = LocationEpochId.uninitialized + let evolve _state = function + | Events.Started e -> e.epochId + let fold = Seq.fold evolve + +let interpretActivateEpoch epochId (state : Folds.State) = + [if state < epochId then yield Events.Started { epochId = epochId }] + +let toActiveEpoch state = + if state = Folds.initial then None else Some state + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, LocationId.toString id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) + let query (Stream stream) = stream.Query + let execute (Stream stream) = stream.Transact + + member __.Read(locationId) : Async = query locationId toActiveEpoch + member __.ActivateEpoch(locationId,epochId) : Async = execute locationId (interpretActivateEpoch epochId) + +let createService resolve = Service(resolve) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let opt = Equinox.ResolveOption.AllowStale + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) + let createService (context,cache) = + createService (resolve (context,cache)) \ No newline at end of file From 623b2f2072b20149c93e1391b46cd20bf1e79035 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 20 Nov 2019 23:38:25 +0000 Subject: [PATCH 02/13] Location cleanup --- samples/Fc/Domain/Infrastructure.fs | 4 +--- samples/Fc/Domain/Location.fs | 4 ++-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/samples/Fc/Domain/Infrastructure.fs b/samples/Fc/Domain/Infrastructure.fs index e56c01a06..7a1bfa843 100644 --- a/samples/Fc/Domain/Infrastructure.fs +++ b/samples/Fc/Domain/Infrastructure.fs @@ -11,8 +11,6 @@ module LocationId = type LocationEpochId = int and [] locationEpochId module LocationEpochId = - let uninitialized : LocationEpochId = % -1 - let zero : LocationEpochId = %0 - let next (value : LocationEpochId) : LocationEpochId = % (%value + 1) let parse (value : int) : LocationEpochId = %value + let next (value : LocationEpochId) : LocationEpochId = % (%value + 1) let toString (value : LocationEpochId) : string = string %value \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs index affddad93..22250b2ad 100644 --- a/samples/Fc/Domain/Location.fs +++ b/samples/Fc/Domain/Location.fs @@ -8,7 +8,7 @@ type LocationService(zeroBalance, shouldClose, series : Location.Series.Service, match! epoch.Sync(locationId,epochId,balanceToCarryForward,interpret,shouldClose) with | { balance = bal; isComplete = true } -> if originEpochId <> epochId then - do! series.ActivateEpoch(locationId, epochId) + do! series.ActivateEpoch(locationId,epochId) return bal | { balance = bal } -> let successorEpochId = LocationEpochId.next epochId @@ -19,6 +19,6 @@ type LocationService(zeroBalance, shouldClose, series : Location.Series.Service, let! activeEpoch = series.Read(locationId) let originEpochId,epochId,balanceCarriedForward = match activeEpoch with - | None -> LocationEpochId.uninitialized,LocationEpochId.zero,Some zeroBalance + | None -> LocationEpochId.parse -1,LocationEpochId.parse 0,Some zeroBalance | Some activeEpochId -> activeEpochId,activeEpochId,None return! execute locationId originEpochId interpret epochId balanceCarriedForward } \ No newline at end of file From d65fc6ae3db792952c0283ad6952c23bd4a80d44 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 20 Nov 2019 23:38:25 +0000 Subject: [PATCH 03/13] Location cleanup --- samples/Fc/Domain/LocationEpoch.fs | 28 +++++++++++++++------------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index c9c2bd4dd..ba6cc5c35 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -29,37 +29,39 @@ module Folds = | Events.Closed,(Initial|Closed _ as x) -> failwithf "Closed : Unexpected %A" x let fold = Seq.fold evolve +/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events +type private Accumulator() = + let acc = ResizeArray() + member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function + | res,[] -> res,state + | res,[e] -> acc.Add e; res,Folds.evolve state e + | res,xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs) + member __.Accumulated = List.ofSeq acc + type Result = { balance : Folds.Balance; isComplete : bool } let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Balance -> Events.Event list) option) shouldClose state : Result*Events.Event list = - let acc = ResizeArray() - let stashEventsAndRollState state = function - | res,[] -> res,state - | res,[e] -> acc.Add e; res,Folds.evolve state e - | res,xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs) + let acc = Accumulator() // We always want to have a CarriedForward event at the start of any Epoch's event stream - let initialized,state = + let initialized,state = acc.Ingest state <| match state with | Folds.Initial -> true,[Events.CarriedForward { initial = Option.get balanceCarriedForward }] | Folds.Open _ | Folds.Closed _ -> false,[] - |> stashEventsAndRollState state // If an `interpret` is supplied, we run that (unless we determine we're in Closed state) - let worked,state = + let worked,state = acc.Ingest state <| match state, interpret with | Folds.Initial,_ -> failwith "We've just guaranteed not Initial" | Folds.Open _,None -> false,[] | Folds.Open bal,Some interpret -> true,interpret bal | Folds.Closed _,_ -> false,[] - |> stashEventsAndRollState state - /// Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event - let (balance,isOpen),_ = + // Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event + let (balance,isOpen),_ = acc.Ingest state <| match state with | Folds.Initial -> failwith "Can't be Initial" | Folds.Open bal as state when worked && shouldClose state -> (bal,false),[Events.Closed] | Folds.Open bal -> (bal,true),[] | Folds.Closed bal -> (bal,false),[] - |> stashEventsAndRollState state - { balance = balance; isComplete = initialized || worked || isOpen },Seq.toList acc + { balance = balance; isComplete = initialized || worked || isOpen },acc.Accumulated type Service internal (resolve, ?maxAttempts) = From b3a193afd05f9346c88ed9210c07e5d40a9784aa Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 25 Nov 2019 13:05:28 +0000 Subject: [PATCH 04/13] Fix test dummy --- samples/Fc/Domain.Tests/Domain.Tests.fsproj | 10 +++++----- samples/Fc/Domain.Tests/LocationTests.fs | 3 +++ samples/Fc/Domain.Tests/Program.fs | 5 ----- samples/Fc/Domain/LocationSeries.fs | 2 +- 4 files changed, 9 insertions(+), 11 deletions(-) create mode 100644 samples/Fc/Domain.Tests/LocationTests.fs delete mode 100644 samples/Fc/Domain.Tests/Program.fs diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj index 517346b5e..1bbbf7d8c 100644 --- a/samples/Fc/Domain.Tests/Domain.Tests.fsproj +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -1,22 +1,22 @@ - Fc.Domain.Tests netcoreapp2.1 5 + Fc.Domain.Tests false - - - - + + + + diff --git a/samples/Fc/Domain.Tests/LocationTests.fs b/samples/Fc/Domain.Tests/LocationTests.fs new file mode 100644 index 000000000..c984300ab --- /dev/null +++ b/samples/Fc/Domain.Tests/LocationTests.fs @@ -0,0 +1,3 @@ +module LocationTests + +let [] dummy () = () \ No newline at end of file diff --git a/samples/Fc/Domain.Tests/Program.fs b/samples/Fc/Domain.Tests/Program.fs deleted file mode 100644 index 53bca0c52..000000000 --- a/samples/Fc/Domain.Tests/Program.fs +++ /dev/null @@ -1,5 +0,0 @@ -module Program - -let [] dummy () = () - -let [] main _ = 0 \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs index d98084989..248f5d846 100644 --- a/samples/Fc/Domain/LocationSeries.fs +++ b/samples/Fc/Domain/LocationSeries.fs @@ -14,7 +14,7 @@ module Events = module Folds = type State = LocationEpochId - let initial = LocationEpochId.uninitialized + let initial = LocationEpochId.parse -1 let evolve _state = function | Events.Started e -> e.epochId let fold = Seq.fold evolve From d7edd88a7ad9a1f8e2472432037f52ceac1929fc Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 25 Nov 2019 13:35:55 +0000 Subject: [PATCH 05/13] Fix indentation --- samples/Fc/Domain/LocationEpoch.fs | 35 ++++++++++++++++-------------- 1 file changed, 19 insertions(+), 16 deletions(-) diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index ba6cc5c35..f8cf0cbe3 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -43,24 +43,27 @@ type Result = { balance : Folds.Balance; isComplete : bool } let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Balance -> Events.Event list) option) shouldClose state : Result*Events.Event list = let acc = Accumulator() // We always want to have a CarriedForward event at the start of any Epoch's event stream - let initialized,state = acc.Ingest state <| - match state with - | Folds.Initial -> true,[Events.CarriedForward { initial = Option.get balanceCarriedForward }] - | Folds.Open _ | Folds.Closed _ -> false,[] + let initialized,state = + acc.Ingest state <| + match state with + | Folds.Initial -> true,[Events.CarriedForward { initial = Option.get balanceCarriedForward }] + | Folds.Open _ | Folds.Closed _ -> false,[] // If an `interpret` is supplied, we run that (unless we determine we're in Closed state) - let worked,state = acc.Ingest state <| - match state, interpret with - | Folds.Initial,_ -> failwith "We've just guaranteed not Initial" - | Folds.Open _,None -> false,[] - | Folds.Open bal,Some interpret -> true,interpret bal - | Folds.Closed _,_ -> false,[] + let worked,state = + acc.Ingest state <| + match state, interpret with + | Folds.Initial,_ -> failwith "We've just guaranteed not Initial" + | Folds.Open _,None -> false,[] + | Folds.Open bal,Some interpret -> true,interpret bal + | Folds.Closed _,_ -> false,[] // Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event - let (balance,isOpen),_ = acc.Ingest state <| - match state with - | Folds.Initial -> failwith "Can't be Initial" - | Folds.Open bal as state when worked && shouldClose state -> (bal,false),[Events.Closed] - | Folds.Open bal -> (bal,true),[] - | Folds.Closed bal -> (bal,false),[] + let (balance,isOpen),_ = + acc.Ingest state <| + match state with + | Folds.Initial -> failwith "Can't be Initial" + | Folds.Open bal as state when worked && shouldClose state -> (bal,false),[Events.Closed] + | Folds.Open bal -> (bal,true),[] + | Folds.Closed bal -> (bal,false),[] { balance = balance; isComplete = initialized || worked || isOpen },acc.Accumulated type Service internal (resolve, ?maxAttempts) = From 6600e656193cacef8eb237517fc5f11f64b83b1e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 27 Nov 2019 11:04:11 +0000 Subject: [PATCH 06/13] Fix LocationTests ordering --- samples/Fc/Domain.Tests/Domain.Tests.fsproj | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj index 1bbbf7d8c..bf282fc3d 100644 --- a/samples/Fc/Domain.Tests/Domain.Tests.fsproj +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -8,15 +8,17 @@ + + + + + + - - - - From f659d4fd36e462a171bf704153a2fd3a4bfd3a9e Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Wed, 27 Nov 2019 14:39:50 +0000 Subject: [PATCH 07/13] Add LocationSeriesTests --- samples/Fc/Domain.Tests/Domain.Tests.fsproj | 1 + .../Fc/Domain.Tests/LocationSeriesTests.fs | 44 +++++++++++++++++++ samples/Fc/Domain/LocationSeries.fs | 11 +++-- 3 files changed, 52 insertions(+), 4 deletions(-) create mode 100644 samples/Fc/Domain.Tests/LocationSeriesTests.fs diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj index bf282fc3d..87c125692 100644 --- a/samples/Fc/Domain.Tests/Domain.Tests.fsproj +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -8,6 +8,7 @@ + diff --git a/samples/Fc/Domain.Tests/LocationSeriesTests.fs b/samples/Fc/Domain.Tests/LocationSeriesTests.fs new file mode 100644 index 000000000..1bd09a28d --- /dev/null +++ b/samples/Fc/Domain.Tests/LocationSeriesTests.fs @@ -0,0 +1,44 @@ +module LocationSeriesTests + +open FsCheck.Xunit +open FSharp.UMX +open Swensen.Unquote +open Location.Series + +let [] properties c1 c2 = + let events = interpretActivateEpoch c1 Folds.initial + let state1 = Folds.fold Folds.initial events + let epoch0 = %0 + match c1, events, toActiveEpoch state1 with + // Started events are not written for < 0 + | n, [], activeEpoch when n < epoch0 -> + test <@ None = activeEpoch @> + // Any >=0 value should trigger a Started event, initially + | n, [Events.Started { epochId = ee }], Some activatedEpoch -> + test <@ n >= epoch0 && n = ee && n = activatedEpoch @> + // Nothing else should yield events + | _, l, _ -> + test <@ List.isEmpty l @> + + let events = interpretActivateEpoch c2 state1 + let state2 = Folds.fold state1 events + match toActiveEpoch state1, c2, events, toActiveEpoch state2 with + // Started events are not written for < 0 + | None, n, [], activeEpoch when n < epoch0 -> + test <@ None = activeEpoch @> + // Any >= 0 epochId should trigger a Started event if first command didnt do anything + | None, n, [Events.Started { epochId = ee }], Some activatedEpoch -> + let eEpoch = %ee + test <@ n >= epoch0 && n = eEpoch && n = activatedEpoch @> + // Any higher epochId should trigger a Started event (gaps are fine - we are only tying to reduce walks) + | Some s1, n, [Events.Started { epochId = ee }], Some activatedEpoch -> + let eEpoch = %ee + test <@ n > s1 && n = eEpoch && n > epoch0 && n = activatedEpoch @> + // Nothing else should yield events + | _, _, l, _ -> + test <@ List.isEmpty l @> + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs index 248f5d846..37f05dd0a 100644 --- a/samples/Fc/Domain/LocationSeries.fs +++ b/samples/Fc/Domain/LocationSeries.fs @@ -30,11 +30,14 @@ type Service internal (resolve, ?maxAttempts) = let log = Serilog.Log.ForContext() let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, LocationId.toString id) let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) - let query (Stream stream) = stream.Query - let execute (Stream stream) = stream.Transact - member __.Read(locationId) : Async = query locationId toActiveEpoch - member __.ActivateEpoch(locationId,epochId) : Async = execute locationId (interpretActivateEpoch epochId) + member __.Read(locationId) : Async = + let (Stream agg) = locationId + agg.Query toActiveEpoch + + member __.ActivateEpoch(locationId,epochId) : Async = + let (Stream agg) = locationId + agg.Transact(interpretActivateEpoch epochId) let createService resolve = Service(resolve) From 8919b0cbf2cea6859a7a14d4fb2dee8987508810 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 28 Nov 2019 10:03:29 +0000 Subject: [PATCH 08/13] Location tidy --- samples/Fc/Domain/LocationEpoch.fs | 58 ++++++++++++++--------------- samples/Fc/Domain/LocationSeries.fs | 12 +++--- 2 files changed, 35 insertions(+), 35 deletions(-) diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index f8cf0cbe3..3d608cbca 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -20,22 +20,22 @@ module Folds = type State = Initial | Open of Balance | Closed of Balance let initial = Initial let evolve state event = - match event,state with - | Events.CarriedForward e,Initial -> Open e.initial - | Events.Delta e,Open bal -> Open (bal + e.value) - | Events.Closed,Open bal -> Closed bal - | Events.CarriedForward _,(Open _|Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x - | Events.Delta _,(Initial|Closed _ as x) -> failwithf "Delta : Unexpected %A" x - | Events.Closed,(Initial|Closed _ as x) -> failwithf "Closed : Unexpected %A" x + match event, state with + | Events.CarriedForward e, Initial -> Open e.initial + | Events.Delta e, Open bal -> Open (bal + e.value) + | Events.Closed, Open bal -> Closed bal + | Events.CarriedForward _, (Open _|Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x + | Events.Delta _, (Initial|Closed _ as x) -> failwithf "Delta : Unexpected %A" x + | Events.Closed, (Initial|Closed _ as x) -> failwithf "Closed : Unexpected %A" x let fold = Seq.fold evolve /// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events type private Accumulator() = let acc = ResizeArray() member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function - | res,[] -> res,state - | res,[e] -> acc.Add e; res,Folds.evolve state e - | res,xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs) + | res, [] -> res, state + | res, [e] -> acc.Add e; res, Folds.evolve state e + | res, xs -> acc.AddRange xs; res, Folds.fold state (Seq.ofList xs) member __.Accumulated = List.ofSeq acc type Result = { balance : Folds.Balance; isComplete : bool } @@ -43,42 +43,42 @@ type Result = { balance : Folds.Balance; isComplete : bool } let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Balance -> Events.Event list) option) shouldClose state : Result*Events.Event list = let acc = Accumulator() // We always want to have a CarriedForward event at the start of any Epoch's event stream - let initialized,state = + let initialized, state = acc.Ingest state <| match state with - | Folds.Initial -> true,[Events.CarriedForward { initial = Option.get balanceCarriedForward }] - | Folds.Open _ | Folds.Closed _ -> false,[] + | Folds.Initial -> true, [Events.CarriedForward { initial = Option.get balanceCarriedForward }] + | Folds.Open _ | Folds.Closed _ -> false, [] // If an `interpret` is supplied, we run that (unless we determine we're in Closed state) - let worked,state = + let worked, state = acc.Ingest state <| match state, interpret with - | Folds.Initial,_ -> failwith "We've just guaranteed not Initial" - | Folds.Open _,None -> false,[] - | Folds.Open bal,Some interpret -> true,interpret bal - | Folds.Closed _,_ -> false,[] + | Folds.Initial, _ -> failwith "We've just guaranteed not Initial" + | Folds.Open _, None -> false, [] + | Folds.Open bal, Some interpret -> true, interpret bal + | Folds.Closed _, _ -> false, [] // Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event - let (balance,isOpen),_ = + let (balance, isOpen), _ = acc.Ingest state <| match state with | Folds.Initial -> failwith "Can't be Initial" - | Folds.Open bal as state when worked && shouldClose state -> (bal,false),[Events.Closed] - | Folds.Open bal -> (bal,true),[] - | Folds.Closed bal -> (bal,false),[] - { balance = balance; isComplete = initialized || worked || isOpen },acc.Accumulated + | Folds.Open bal as state when worked && shouldClose state -> (bal, false), [Events.Closed] + | Folds.Open bal -> (bal, true), [] + | Folds.Closed bal -> (bal, false), [] + { balance = balance; isComplete = initialized || worked || isOpen }, acc.Accumulated type Service internal (resolve, ?maxAttempts) = let log = Serilog.Log.ForContext() - let (|AggregateId|) (locationId,epochId) = + let (|AggregateId|) (locationId, epochId) = let id = sprintf "%s_%s" (LocationId.toString locationId) (LocationEpochId.toString epochId) Equinox.AggregateId(Events.categoryId, id) let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) - let decide (Stream stream) : (Folds.State -> 'r * Events.Event list) -> Async<'r> = stream.Transact - member __.Sync(locationId,epochId,prevEpochBalanceCarriedForward,interpret,shouldClose) : Async = - decide (locationId,epochId) (sync prevEpochBalanceCarriedForward interpret shouldClose) + member __.Sync(locationId, epochId, prevEpochBalanceCarriedForward, interpret, shouldClose) : Async = + let (Stream stream) = (locationId, epochId) + stream.Transact(sync prevEpochBalanceCarriedForward interpret shouldClose) -let createService resolve = Service(resolve) +let create resolve = Service(resolve) module Cosmos = @@ -87,4 +87,4 @@ module Cosmos = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve let createService (context,cache) = - createService (resolve (context,cache)) \ No newline at end of file + create (resolve (context,cache)) \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs index 37f05dd0a..53ae54b4f 100644 --- a/samples/Fc/Domain/LocationSeries.fs +++ b/samples/Fc/Domain/LocationSeries.fs @@ -32,14 +32,14 @@ type Service internal (resolve, ?maxAttempts) = let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) member __.Read(locationId) : Async = - let (Stream agg) = locationId - agg.Query toActiveEpoch + let (Stream stream) = locationId + stream.Query toActiveEpoch member __.ActivateEpoch(locationId,epochId) : Async = - let (Stream agg) = locationId - agg.Transact(interpretActivateEpoch epochId) + let (Stream stream) = locationId + stream.Transact(interpretActivateEpoch epochId) -let createService resolve = Service(resolve) +let create resolve = Service(resolve) module Cosmos = @@ -49,4 +49,4 @@ module Cosmos = let opt = Equinox.ResolveOption.AllowStale fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) let createService (context,cache) = - createService (resolve (context,cache)) \ No newline at end of file + create (resolve (context,cache)) \ No newline at end of file From 35969092c2bc33461fd51a10590cc4f07500b3cd Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Thu, 28 Nov 2019 10:16:01 +0000 Subject: [PATCH 09/13] Add LocationEpoch tests --- samples/Fc/Domain.Tests/Domain.Tests.fsproj | 1 + samples/Fc/Domain.Tests/LocationEpochTests.fs | 54 +++++++++++++++++++ samples/Fc/Domain/Location.fs | 15 +++--- samples/Fc/Domain/LocationEpoch.fs | 6 +-- 4 files changed, 67 insertions(+), 9 deletions(-) create mode 100644 samples/Fc/Domain.Tests/LocationEpochTests.fs diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj index 87c125692..5fccc91bd 100644 --- a/samples/Fc/Domain.Tests/Domain.Tests.fsproj +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -9,6 +9,7 @@ + diff --git a/samples/Fc/Domain.Tests/LocationEpochTests.fs b/samples/Fc/Domain.Tests/LocationEpochTests.fs new file mode 100644 index 000000000..72392c1f4 --- /dev/null +++ b/samples/Fc/Domain.Tests/LocationEpochTests.fs @@ -0,0 +1,54 @@ +module LocationEpochTests + +open FsCheck.Xunit +open Location.Epoch +open Swensen.Unquote + +let interpret delta _balance = + match delta with + | 0 -> [] + | delta -> [Events.Delta { value = delta }] + +let validateAndInterpret expectedBalance delta balance = + test <@ expectedBalance = balance @> + interpret delta balance + +let verifyDeltaEvent delta events = + let dEvents = events |> List.filter (function Events.Delta _ -> true | _ -> false) + test <@ interpret delta (Unchecked.defaultof<_>) = dEvents @> + +let [] properties carriedForward delta1 closeImmediately delta2 close = + + (* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *) + + let initialShouldClose _state = closeImmediately + let res,events = sync (Some carriedForward) (Some (validateAndInterpret carriedForward delta1)) initialShouldClose Folds.initial + let cfEvents events = events |> List.filter (function Events.CarriedForward _ -> true | _ -> false) + let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false) + let state1 = Folds.fold Folds.initial events + let expectedBalance = carriedForward + delta1 + // Only expect closing if it was requested + let expectImmediateClose = closeImmediately + test <@ res.worked + && expectedBalance = res.balance @> + test <@ [Events.CarriedForward { initial = carriedForward }] = cfEvents events + && (not expectImmediateClose || 1 = Seq.length (closeEvents events)) @> + verifyDeltaEvent delta1 events + + (* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *) + + let shouldClose _state = close + let { isOpen = isOpen; worked = worked; balance = bal },events = sync None (Some (validateAndInterpret expectedBalance delta2)) shouldClose state1 + let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2 + test <@ [] = cfEvents events + && (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @> + test <@ (expectImmediateClose || close || isOpen) + && expectedBalance = bal @> + if not expectImmediateClose then + test <@ worked @> + verifyDeltaEvent delta2 events + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs index 22250b2ad..f9060b5f4 100644 --- a/samples/Fc/Domain/Location.fs +++ b/samples/Fc/Domain/Location.fs @@ -3,16 +3,19 @@ namespace Location /// Manages a Series of Epochs, with a running total being carried forward to the next Epoch when it's Closed type LocationService(zeroBalance, shouldClose, series : Location.Series.Service, epoch : Location.Epoch.Service) = - let rec execute locationId originEpochId interpret = - let rec aux epochId balanceToCarryForward = async { + let rec execute locationId originEpochId = + let rec aux epochId balanceToCarryForward interpret = async { match! epoch.Sync(locationId,epochId,balanceToCarryForward,interpret,shouldClose) with - | { balance = bal; isComplete = true } -> + | { balance = bal; isOpen = true } -> if originEpochId <> epochId then - do! series.ActivateEpoch(locationId,epochId) + do! series.ActivateEpoch(locationId, epochId) return bal + | { balance = bal; worked = true } -> + let successorEpochId = LocationEpochId.next epochId + return! aux successorEpochId (Some bal) None | { balance = bal } -> let successorEpochId = LocationEpochId.next epochId - return! aux successorEpochId (Some bal) } + return! aux successorEpochId (Some bal) interpret } aux member __.Execute(locationId, interpret) = async { @@ -21,4 +24,4 @@ type LocationService(zeroBalance, shouldClose, series : Location.Series.Service, match activeEpoch with | None -> LocationEpochId.parse -1,LocationEpochId.parse 0,Some zeroBalance | Some activeEpochId -> activeEpochId,activeEpochId,None - return! execute locationId originEpochId interpret epochId balanceCarriedForward } \ No newline at end of file + return! execute locationId originEpochId epochId interpret balanceCarriedForward } \ No newline at end of file diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index 3d608cbca..3153be1f8 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -38,7 +38,7 @@ type private Accumulator() = | res, xs -> acc.AddRange xs; res, Folds.fold state (Seq.ofList xs) member __.Accumulated = List.ofSeq acc -type Result = { balance : Folds.Balance; isComplete : bool } +type Result = { balance : Folds.Balance; worked : bool; isOpen : bool } let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Balance -> Events.Event list) option) shouldClose state : Result*Events.Event list = let acc = Accumulator() @@ -53,7 +53,7 @@ let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Bala acc.Ingest state <| match state, interpret with | Folds.Initial, _ -> failwith "We've just guaranteed not Initial" - | Folds.Open _, None -> false, [] + | Folds.Open _, None -> true, [] | Folds.Open bal, Some interpret -> true, interpret bal | Folds.Closed _, _ -> false, [] // Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event @@ -64,7 +64,7 @@ let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Bala | Folds.Open bal as state when worked && shouldClose state -> (bal, false), [Events.Closed] | Folds.Open bal -> (bal, true), [] | Folds.Closed bal -> (bal, false), [] - { balance = balance; isComplete = initialized || worked || isOpen }, acc.Accumulated + { balance = balance; worked = worked; isOpen = isOpen }, acc.Accumulated type Service internal (resolve, ?maxAttempts) = From 8b133f861f10374e793ccf3dfb8fcffb3edabc38 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 29 Nov 2019 12:42:17 +0000 Subject: [PATCH 10/13] Add Location integration test --- samples/Fc/Domain.Tests/Domain.Tests.fsproj | 1 + samples/Fc/Domain.Tests/LocationEpochTests.fs | 14 +++--- samples/Fc/Domain.Tests/LocationTests.fs | 50 ++++++++++++++++++- samples/Fc/Domain/Location.fs | 41 ++++++++++----- samples/Fc/Domain/LocationEpoch.fs | 48 +++++++++--------- samples/Fc/Domain/LocationSeries.fs | 4 +- 6 files changed, 111 insertions(+), 47 deletions(-) diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj index 5fccc91bd..a1b3c3c0a 100644 --- a/samples/Fc/Domain.Tests/Domain.Tests.fsproj +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -22,6 +22,7 @@ + diff --git a/samples/Fc/Domain.Tests/LocationEpochTests.fs b/samples/Fc/Domain.Tests/LocationEpochTests.fs index 72392c1f4..8a4e14a4a 100644 --- a/samples/Fc/Domain.Tests/LocationEpochTests.fs +++ b/samples/Fc/Domain.Tests/LocationEpochTests.fs @@ -6,8 +6,8 @@ open Swensen.Unquote let interpret delta _balance = match delta with - | 0 -> [] - | delta -> [Events.Delta { value = delta }] + | 0 -> (),[] + | delta -> (),[Events.Delta { value = delta }] let validateAndInterpret expectedBalance delta balance = test <@ expectedBalance = balance @> @@ -15,21 +15,21 @@ let validateAndInterpret expectedBalance delta balance = let verifyDeltaEvent delta events = let dEvents = events |> List.filter (function Events.Delta _ -> true | _ -> false) - test <@ interpret delta (Unchecked.defaultof<_>) = dEvents @> + test <@ interpret delta (Unchecked.defaultof<_>) = ((),dEvents) @> let [] properties carriedForward delta1 closeImmediately delta2 close = (* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *) let initialShouldClose _state = closeImmediately - let res,events = sync (Some carriedForward) (Some (validateAndInterpret carriedForward delta1)) initialShouldClose Folds.initial + let res,events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Folds.initial let cfEvents events = events |> List.filter (function Events.CarriedForward _ -> true | _ -> false) let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false) let state1 = Folds.fold Folds.initial events let expectedBalance = carriedForward + delta1 // Only expect closing if it was requested let expectImmediateClose = closeImmediately - test <@ res.worked + test <@ Option.isSome res.result && expectedBalance = res.balance @> test <@ [Events.CarriedForward { initial = carriedForward }] = cfEvents events && (not expectImmediateClose || 1 = Seq.length (closeEvents events)) @> @@ -38,14 +38,14 @@ let [] properties carriedForward delta1 closeImmediately delta2 close (* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *) let shouldClose _state = close - let { isOpen = isOpen; worked = worked; balance = bal },events = sync None (Some (validateAndInterpret expectedBalance delta2)) shouldClose state1 + let { isOpen = isOpen; result = worked; balance = bal },events = sync None (validateAndInterpret expectedBalance delta2) shouldClose state1 let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2 test <@ [] = cfEvents events && (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @> test <@ (expectImmediateClose || close || isOpen) && expectedBalance = bal @> if not expectImmediateClose then - test <@ worked @> + test <@ Option.isSome worked @> verifyDeltaEvent delta2 events let [] ``codec can roundtrip`` event = diff --git a/samples/Fc/Domain.Tests/LocationTests.fs b/samples/Fc/Domain.Tests/LocationTests.fs index c984300ab..75f4e5e7e 100644 --- a/samples/Fc/Domain.Tests/LocationTests.fs +++ b/samples/Fc/Domain.Tests/LocationTests.fs @@ -1,3 +1,51 @@ module LocationTests -let [] dummy () = () \ No newline at end of file +open Location +open Swensen.Unquote +open System + +module Location = + + open Equinox.MemoryStore + + module Series = + + let resolve store = Resolver(store, Series.Events.codec, Series.Folds.fold, Series.Folds.initial).Resolve + + module Epoch = + + let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Folds.fold, Epoch.Folds.initial).Resolve + + module MemoryStore = + + let createService (zeroBalance, shouldClose) store = + let maxAttempts = Int32.MaxValue + let series = Series.create (Series.resolve store) maxAttempts + let epochs = Epoch.create (Epoch.resolve store) maxAttempts + create (zeroBalance, shouldClose) (series, epochs) + +open FsCheck + +type FsCheckGenerators = + static member NonNullStrings = Arb.Default.String() |> Arb.filter (fun s -> s <> null) + +type NonNullStringsPropertyAttribute() = + inherit FsCheck.Xunit.PropertyAttribute(QuietOnSuccess = true, Arbitrary=[| typeof |]) + +let [] ``parallel properties`` loc1 (locations : _[]) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { + let store = Equinox.MemoryStore.VolatileStore() + let zeroBalance = 0 + let maxEvents = max 1 maxEvents + let locations = Seq.append locations (Seq.singleton loc1) |> Seq.toArray + let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents + let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store + let adjust delta (bal : Epoch.Folds.Balance) = + let value = max -bal delta + if value = 0 then 0, [] + else value, [Location.Epoch.Events.Delta { value = value }] + let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache + + let! applied = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel + let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel + let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) applied |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq + test <@ expectedBalances = Set.ofSeq balances @> } \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs index f9060b5f4..52380f134 100644 --- a/samples/Fc/Domain/Location.fs +++ b/samples/Fc/Domain/Location.fs @@ -1,27 +1,42 @@ namespace Location +type Wip<'R> = Pending of decide : (Epoch.Folds.Balance -> 'R*Epoch.Events.Event list) | Complete of 'R + /// Manages a Series of Epochs, with a running total being carried forward to the next Epoch when it's Closed -type LocationService(zeroBalance, shouldClose, series : Location.Series.Service, epoch : Location.Epoch.Service) = +type LocationService internal (zeroBalance, shouldClose, series : Series.Service, epochs : Epoch.Service) = let rec execute locationId originEpochId = - let rec aux epochId balanceToCarryForward interpret = async { - match! epoch.Sync(locationId,epochId,balanceToCarryForward,interpret,shouldClose) with - | { balance = bal; isOpen = true } -> + let rec aux epochId balanceToCarryForward wip = async { + let decide state = match wip with Complete r -> r,[] | Pending decide -> decide state + match! epochs.Sync(locationId, epochId, balanceToCarryForward, decide, shouldClose) with + | { balance = bal; result = Some res; isOpen = true } -> if originEpochId <> epochId then do! series.ActivateEpoch(locationId, epochId) - return bal - | { balance = bal; worked = true } -> + return bal, res + | { balance = bal; result = Some res } -> let successorEpochId = LocationEpochId.next epochId - return! aux successorEpochId (Some bal) None + return! aux successorEpochId (Some bal) (Wip.Complete res) | { balance = bal } -> let successorEpochId = LocationEpochId.next epochId - return! aux successorEpochId (Some bal) interpret } + return! aux successorEpochId (Some bal) wip } aux - member __.Execute(locationId, interpret) = async { + member __.Execute(locationId, decide) = async { let! activeEpoch = series.Read(locationId) - let originEpochId,epochId,balanceCarriedForward = + let originEpochId, epochId, balanceCarriedForward = match activeEpoch with - | None -> LocationEpochId.parse -1,LocationEpochId.parse 0,Some zeroBalance - | Some activeEpochId -> activeEpochId,activeEpochId,None - return! execute locationId originEpochId epochId interpret balanceCarriedForward } \ No newline at end of file + | None -> LocationEpochId.parse -1, LocationEpochId.parse 0, Some zeroBalance + | Some activeEpochId -> activeEpochId, activeEpochId, None + return! execute locationId originEpochId epochId balanceCarriedForward (Wip.Pending decide)} + +[] +module Helpers = + let create (zeroBalance, shouldClose) (series,epochs) = + LocationService(zeroBalance, shouldClose, series, epochs) + +module Cosmos = + + let createService (zeroBalance, shouldClose) (context,cache) = + let series = Series.Cosmos.createService (context, cache) + let epochs = Epoch.Cosmos.createService (context, cache) + create (zeroBalance, shouldClose) (series, epochs) \ No newline at end of file diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index 3153be1f8..c7c8f5475 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -17,13 +17,14 @@ module Events = module Folds = type Balance = int - type State = Initial | Open of Balance | Closed of Balance + type OpenState = { count : int; value : Balance } + type State = Initial | Open of OpenState | Closed of Balance let initial = Initial let evolve state event = match event, state with - | Events.CarriedForward e, Initial -> Open e.initial - | Events.Delta e, Open bal -> Open (bal + e.value) - | Events.Closed, Open bal -> Closed bal + | Events.CarriedForward e, Initial -> Open { count = 0; value = e.initial } + | Events.Delta e, Open bal -> Open { count = bal.count + 1; value = bal.value + e.value } + | Events.Closed, Open { value = bal } -> Closed bal | Events.CarriedForward _, (Open _|Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x | Events.Delta _, (Initial|Closed _ as x) -> failwithf "Delta : Unexpected %A" x | Events.Closed, (Initial|Closed _ as x) -> failwithf "Closed : Unexpected %A" x @@ -38,33 +39,32 @@ type private Accumulator() = | res, xs -> acc.AddRange xs; res, Folds.fold state (Seq.ofList xs) member __.Accumulated = List.ofSeq acc -type Result = { balance : Folds.Balance; worked : bool; isOpen : bool } +type Result<'t> = { balance : Folds.Balance; result : 't option; isOpen : bool } -let sync (balanceCarriedForward : Folds.Balance option) (interpret : (Folds.Balance -> Events.Event list) option) shouldClose state : Result*Events.Event list = +let sync (balanceCarriedForward : Folds.Balance option) (decide : (Folds.Balance -> 't*Events.Event list)) shouldClose state : Result<'t>*Events.Event list = let acc = Accumulator() // We always want to have a CarriedForward event at the start of any Epoch's event stream - let initialized, state = + let (), state = acc.Ingest state <| match state with - | Folds.Initial -> true, [Events.CarriedForward { initial = Option.get balanceCarriedForward }] - | Folds.Open _ | Folds.Closed _ -> false, [] - // If an `interpret` is supplied, we run that (unless we determine we're in Closed state) - let worked, state = + | Folds.Initial -> (), [Events.CarriedForward { initial = Option.get balanceCarriedForward }] + | Folds.Open _ | Folds.Closed _ -> (), [] + // Run, unless we determine we're in Closed state + let result, state = acc.Ingest state <| - match state, interpret with - | Folds.Initial, _ -> failwith "We've just guaranteed not Initial" - | Folds.Open _, None -> true, [] - | Folds.Open bal, Some interpret -> true, interpret bal - | Folds.Closed _, _ -> false, [] - // Finally (iff we're `Open`, have `worked`, and `shouldClose`), we generate a Closed event + match state with + | Folds.Initial -> failwith "We've just guaranteed not Initial" + | Folds.Open { value = bal } -> let r,es = decide bal in Some r,es + | Folds.Closed _ -> None, [] + // Finally (iff we're `Open`, have worked, and `shouldClose`), we generate a Closed event let (balance, isOpen), _ = acc.Ingest state <| match state with | Folds.Initial -> failwith "Can't be Initial" - | Folds.Open bal as state when worked && shouldClose state -> (bal, false), [Events.Closed] - | Folds.Open bal -> (bal, true), [] + | Folds.Open ({ value = bal } as openState) when Option.isSome result && shouldClose openState -> (bal, false), [Events.Closed] + | Folds.Open { value = bal } -> (bal, true), [] | Folds.Closed bal -> (bal, false), [] - { balance = balance; worked = worked; isOpen = isOpen }, acc.Accumulated + { balance = balance; result = result; isOpen = isOpen }, acc.Accumulated type Service internal (resolve, ?maxAttempts) = @@ -74,11 +74,11 @@ type Service internal (resolve, ?maxAttempts) = Equinox.AggregateId(Events.categoryId, id) let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) - member __.Sync(locationId, epochId, prevEpochBalanceCarriedForward, interpret, shouldClose) : Async = + member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async> = let (Stream stream) = (locationId, epochId) - stream.Transact(sync prevEpochBalanceCarriedForward interpret shouldClose) + stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose) -let create resolve = Service(resolve) +let create resolve maxAttempts = Service(resolve, maxAttempts) module Cosmos = @@ -87,4 +87,4 @@ module Cosmos = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve let createService (context,cache) = - create (resolve (context,cache)) \ No newline at end of file + create (resolve (context,cache)) 3 \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs index 53ae54b4f..8b4573bcb 100644 --- a/samples/Fc/Domain/LocationSeries.fs +++ b/samples/Fc/Domain/LocationSeries.fs @@ -39,7 +39,7 @@ type Service internal (resolve, ?maxAttempts) = let (Stream stream) = locationId stream.Transact(interpretActivateEpoch epochId) -let create resolve = Service(resolve) +let create resolve maxAttempts = Service(resolve, maxAttempts) module Cosmos = @@ -49,4 +49,4 @@ module Cosmos = let opt = Equinox.ResolveOption.AllowStale fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) let createService (context,cache) = - create (resolve (context,cache)) \ No newline at end of file + create (resolve (context,cache)) 3 \ No newline at end of file From c8c220abdcd4b81a5cb04f3607001aa1bd569ccb Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 29 Nov 2019 13:26:12 +0000 Subject: [PATCH 11/13] Cosmos tests wip --- samples/Fc/Domain.Tests/Domain.Tests.fsproj | 2 + samples/Fc/Domain.Tests/Infrastructure.fs | 43 +++++++++++++++++++++ samples/Fc/Domain.Tests/LocationTests.fs | 36 +++++++++++++++-- samples/Fc/Domain/Location.fs | 6 +-- samples/Fc/Domain/LocationEpoch.fs | 4 +- samples/Fc/Domain/LocationSeries.fs | 4 +- 6 files changed, 85 insertions(+), 10 deletions(-) create mode 100644 samples/Fc/Domain.Tests/Infrastructure.fs diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj index a1b3c3c0a..afc036013 100644 --- a/samples/Fc/Domain.Tests/Domain.Tests.fsproj +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -8,12 +8,14 @@ + + diff --git a/samples/Fc/Domain.Tests/Infrastructure.fs b/samples/Fc/Domain.Tests/Infrastructure.fs new file mode 100644 index 000000000..5094e3593 --- /dev/null +++ b/samples/Fc/Domain.Tests/Infrastructure.fs @@ -0,0 +1,43 @@ +[] +module Infrastructure + +open Serilog +open System + +module EnvVar = + let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj + +module Cosmos = + + let connect () = + match EnvVar.tryGet "EQUINOX_COSMOS_CONNECTION", EnvVar.tryGet "EQUINOX_COSMOS_DATABASE", EnvVar.tryGet "EQUINOX_COSMOS_CONTAINER" with + | Some s,Some d,Some c -> + let appName = "Domain.Tests" + let discovery = Equinox.Cosmos.Discovery.FromConnectionString s + let connector = Equinox.Cosmos.Connector(TimeSpan.FromSeconds 5., 1, TimeSpan.FromSeconds 5., Serilog.Log.Logger) + let connection = connector.Connect(appName,discovery) |> Async.RunSynchronously + let context = Equinox.Cosmos.Context(connection,d,c) + let cache = Equinox.Cache (appName, 10) + context,cache + | s,d,c -> + failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)" + (Option.isSome s) (Option.isSome d) (Option.isSome c) + +/// Adapts the XUnit ITestOutputHelper to be a Serilog Sink +type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) = + let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:HH:mm:ss.fff zzz} [{Level:u3}] {Message}{Properties}{NewLine}{Exception}", null); + let writeSerilogEvent logEvent = + use writer = new System.IO.StringWriter() + formatter.Format(logEvent, writer) + let messageLine = string writer + testOutput.WriteLine messageLine + System.Diagnostics.Debug.Write messageLine + interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent + +/// Creates a Serilog Log chain emitting to the cited Sink (only) +let createLogger sink = + Serilog.LoggerConfiguration() +// .MinimumLevel.Debug() + .Destructure.FSharpTypes() + .WriteTo.Sink(sink) + .CreateLogger() diff --git a/samples/Fc/Domain.Tests/LocationTests.fs b/samples/Fc/Domain.Tests/LocationTests.fs index 75f4e5e7e..536441101 100644 --- a/samples/Fc/Domain.Tests/LocationTests.fs +++ b/samples/Fc/Domain.Tests/LocationTests.fs @@ -45,7 +45,37 @@ let [] ``parallel properties`` loc1 (locations : _[]) (d else value, [Location.Epoch.Events.Delta { value = value }] let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache - let! applied = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel + let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel - let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) applied |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq - test <@ expectedBalances = Set.ofSeq balances @> } \ No newline at end of file + let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq + test <@ expectedBalances = Set.ofSeq balances @> } + + +let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag +let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) + +type Cosmos(testOutput) = + + let context,cache = Cosmos.connect () + + // NOTE this works very well for as long as we can guarantee we have a single instance of the Daemon in play + // i.e. we'll need to remove this e.g. if the Cosmos ones can run at the same time as this suite + let log = testOutput |> TestOutputAdapter |> createLogger + do Serilog.Log.Logger <- log + + let [] properties (Id loc1) (Ids locations) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { + let zeroBalance = 0 + let maxEvents = max 1 maxEvents + let locations = Seq.append locations (Seq.singleton loc1) |> Seq.toArray + let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents + let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue) + let adjust delta (bal : Epoch.Folds.Balance) = + let value = max -bal delta + if value = 0 then 0, [] + else value, [Location.Epoch.Events.Delta { value = value }] + let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache + + let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel + let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel + let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq + test <@ expectedBalances = Set.ofSeq balances @> } \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs index 52380f134..d09c87925 100644 --- a/samples/Fc/Domain/Location.fs +++ b/samples/Fc/Domain/Location.fs @@ -36,7 +36,7 @@ module Helpers = module Cosmos = - let createService (zeroBalance, shouldClose) (context,cache) = - let series = Series.Cosmos.createService (context, cache) - let epochs = Epoch.Cosmos.createService (context, cache) + let createService (zeroBalance, shouldClose) (context,cache,maxAttempts) = + let series = Series.Cosmos.createService (context, cache, maxAttempts) + let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts) create (zeroBalance, shouldClose) (series, epochs) \ No newline at end of file diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index c7c8f5475..c26d81e9a 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -86,5 +86,5 @@ module Cosmos = let resolve (context,cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve - let createService (context,cache) = - create (resolve (context,cache)) 3 \ No newline at end of file + let createService (context,cache,maxAttempts) = + create (resolve (context,cache)) maxAttempts \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs index 8b4573bcb..e75ccb9b4 100644 --- a/samples/Fc/Domain/LocationSeries.fs +++ b/samples/Fc/Domain/LocationSeries.fs @@ -48,5 +48,5 @@ module Cosmos = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) let opt = Equinox.ResolveOption.AllowStale fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) - let createService (context,cache) = - create (resolve (context,cache)) 3 \ No newline at end of file + let createService (context, cache, maxAttempts) = + create (resolve (context,cache)) maxAttempts \ No newline at end of file From 191712a49cd73763b314ad013bf25336b4cf6044 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 29 Nov 2019 15:56:04 +0000 Subject: [PATCH 12/13] Clean Location test --- samples/Fc/Domain.Tests/Infrastructure.fs | 5 +++++ samples/Fc/Domain.Tests/LocationTests.fs | 19 +++---------------- samples/Fc/Domain/Location.fs | 2 +- samples/Fc/Domain/LocationEpoch.fs | 4 ++-- 4 files changed, 11 insertions(+), 19 deletions(-) diff --git a/samples/Fc/Domain.Tests/Infrastructure.fs b/samples/Fc/Domain.Tests/Infrastructure.fs index 5094e3593..3b7caffe6 100644 --- a/samples/Fc/Domain.Tests/Infrastructure.fs +++ b/samples/Fc/Domain.Tests/Infrastructure.fs @@ -4,7 +4,12 @@ module Infrastructure open Serilog open System +let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag +let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) +let (|IdsAtLeastOne|) (Id x, Ids xs) = Seq.append xs (Seq.singleton x) |> Seq.toArray + module EnvVar = + let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj module Cosmos = diff --git a/samples/Fc/Domain.Tests/LocationTests.fs b/samples/Fc/Domain.Tests/LocationTests.fs index 536441101..54a4f4c6e 100644 --- a/samples/Fc/Domain.Tests/LocationTests.fs +++ b/samples/Fc/Domain.Tests/LocationTests.fs @@ -1,5 +1,6 @@ module LocationTests +open FsCheck.Xunit open Location open Swensen.Unquote open System @@ -24,19 +25,10 @@ module Location = let epochs = Epoch.create (Epoch.resolve store) maxAttempts create (zeroBalance, shouldClose) (series, epochs) -open FsCheck - -type FsCheckGenerators = - static member NonNullStrings = Arb.Default.String() |> Arb.filter (fun s -> s <> null) - -type NonNullStringsPropertyAttribute() = - inherit FsCheck.Xunit.PropertyAttribute(QuietOnSuccess = true, Arbitrary=[| typeof |]) - -let [] ``parallel properties`` loc1 (locations : _[]) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { +let [] ``parallel properties`` (IdsAtLeastOne locations) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { let store = Equinox.MemoryStore.VolatileStore() let zeroBalance = 0 let maxEvents = max 1 maxEvents - let locations = Seq.append locations (Seq.singleton loc1) |> Seq.toArray let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store let adjust delta (bal : Epoch.Folds.Balance) = @@ -50,10 +42,6 @@ let [] ``parallel properties`` loc1 (locations : _[]) (d let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq test <@ expectedBalances = Set.ofSeq balances @> } - -let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag -let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) - type Cosmos(testOutput) = let context,cache = Cosmos.connect () @@ -63,10 +51,9 @@ type Cosmos(testOutput) = let log = testOutput |> TestOutputAdapter |> createLogger do Serilog.Log.Logger <- log - let [] properties (Id loc1) (Ids locations) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { + let [] properties (IdsAtLeastOne locations) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { let zeroBalance = 0 let maxEvents = max 1 maxEvents - let locations = Seq.append locations (Seq.singleton loc1) |> Seq.toArray let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue) let adjust delta (bal : Epoch.Folds.Balance) = diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs index d09c87925..404be4dfb 100644 --- a/samples/Fc/Domain/Location.fs +++ b/samples/Fc/Domain/Location.fs @@ -22,7 +22,7 @@ type LocationService internal (zeroBalance, shouldClose, series : Series.Service aux member __.Execute(locationId, decide) = async { - let! activeEpoch = series.Read(locationId) + let! activeEpoch = series.Read locationId let originEpochId, epochId, balanceCarriedForward = match activeEpoch with | None -> LocationEpochId.parse -1, LocationEpochId.parse 0, Some zeroBalance diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs index c26d81e9a..a76b7807d 100644 --- a/samples/Fc/Domain/LocationEpoch.fs +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -56,12 +56,12 @@ let sync (balanceCarriedForward : Folds.Balance option) (decide : (Folds.Balance | Folds.Initial -> failwith "We've just guaranteed not Initial" | Folds.Open { value = bal } -> let r,es = decide bal in Some r,es | Folds.Closed _ -> None, [] - // Finally (iff we're `Open`, have worked, and `shouldClose`), we generate a Closed event + // Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event let (balance, isOpen), _ = acc.Ingest state <| match state with | Folds.Initial -> failwith "Can't be Initial" - | Folds.Open ({ value = bal } as openState) when Option.isSome result && shouldClose openState -> (bal, false), [Events.Closed] + | Folds.Open ({ value = bal } as openState) when shouldClose openState -> (bal, false), [Events.Closed] | Folds.Open { value = bal } -> (bal, true), [] | Folds.Closed bal -> (bal, false), [] { balance = balance; result = result; isOpen = isOpen }, acc.Accumulated From b38335b2ac7214aa895f942bceac2f5bffa0a7c3 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 29 Nov 2019 16:27:50 +0000 Subject: [PATCH 13/13] DRY Location test --- samples/Fc/Domain.Tests/Infrastructure.fs | 1 + samples/Fc/Domain.Tests/LocationTests.fs | 45 ++++++++++++----------- samples/Fc/Domain/Location.fs | 4 +- 3 files changed, 26 insertions(+), 24 deletions(-) diff --git a/samples/Fc/Domain.Tests/Infrastructure.fs b/samples/Fc/Domain.Tests/Infrastructure.fs index 3b7caffe6..1c50bd69e 100644 --- a/samples/Fc/Domain.Tests/Infrastructure.fs +++ b/samples/Fc/Domain.Tests/Infrastructure.fs @@ -5,6 +5,7 @@ open Serilog open System let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag +let inline mkId () = Guid.NewGuid() |> (|Id|) let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) let (|IdsAtLeastOne|) (Id x, Ids xs) = Seq.append xs (Seq.singleton x) |> Seq.toArray diff --git a/samples/Fc/Domain.Tests/LocationTests.fs b/samples/Fc/Domain.Tests/LocationTests.fs index 54a4f4c6e..736cdf08f 100644 --- a/samples/Fc/Domain.Tests/LocationTests.fs +++ b/samples/Fc/Domain.Tests/LocationTests.fs @@ -1,6 +1,7 @@ module LocationTests open FsCheck.Xunit +open FSharp.UMX open Location open Swensen.Unquote open System @@ -25,44 +26,44 @@ module Location = let epochs = Epoch.create (Epoch.resolve store) maxAttempts create (zeroBalance, shouldClose) (series, epochs) -let [] ``parallel properties`` (IdsAtLeastOne locations) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { - let store = Equinox.MemoryStore.VolatileStore() - let zeroBalance = 0 - let maxEvents = max 1 maxEvents - let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents - let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store +let run (service : LocationService) (IdsAtLeastOne locations, deltas : _[]) = Async.RunSynchronously <| async { + let runId = mkId () // Need to make making state in store unique when replaying or shrinking + let locations = locations |> Array.map (fun x -> % (sprintf "%O_%O" runId x)) + + let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache + + (* Apply random deltas *) + let adjust delta (bal : Epoch.Folds.Balance) = let value = max -bal delta if value = 0 then 0, [] else value, [Location.Epoch.Events.Delta { value = value }] - let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache - let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel - let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq + + (* Verify loading yields identical state *) + + let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel test <@ expectedBalances = Set.ofSeq balances @> } +let [] ``MemoryStore properties`` maxEvents args = + let store = Equinox.MemoryStore.VolatileStore() + let zeroBalance = 0 + let maxEvents = max 1 maxEvents + let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents + let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store + run service args + type Cosmos(testOutput) = let context,cache = Cosmos.connect () - // NOTE this works very well for as long as we can guarantee we have a single instance of the Daemon in play - // i.e. we'll need to remove this e.g. if the Cosmos ones can run at the same time as this suite let log = testOutput |> TestOutputAdapter |> createLogger do Serilog.Log.Logger <- log - let [] properties (IdsAtLeastOne locations) (deltas : _[]) maxEvents = Async.RunSynchronously <| async { + let [] properties maxEvents args = let zeroBalance = 0 let maxEvents = max 1 maxEvents let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue) - let adjust delta (bal : Epoch.Folds.Balance) = - let value = max -bal delta - if value = 0 then 0, [] - else value, [Location.Epoch.Events.Delta { value = value }] - let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache - - let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel - let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel - let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq - test <@ expectedBalances = Set.ofSeq balances @> } \ No newline at end of file + run service args \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs index 404be4dfb..30518cae5 100644 --- a/samples/Fc/Domain/Location.fs +++ b/samples/Fc/Domain/Location.fs @@ -31,12 +31,12 @@ type LocationService internal (zeroBalance, shouldClose, series : Series.Service [] module Helpers = - let create (zeroBalance, shouldClose) (series,epochs) = + let create (zeroBalance, shouldClose) (series, epochs) = LocationService(zeroBalance, shouldClose, series, epochs) module Cosmos = - let createService (zeroBalance, shouldClose) (context,cache,maxAttempts) = + let createService (zeroBalance, shouldClose) (context, cache, maxAttempts) = let series = Series.Cosmos.createService (context, cache, maxAttempts) let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts) create (zeroBalance, shouldClose) (series, epochs) \ No newline at end of file