Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Location Series example #181

Closed
wants to merge 13 commits into from
17 changes: 17 additions & 0 deletions Equinox.sln
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,12 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.SqlStreamStore.Post
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Core", "src\Equinox.Core\Equinox.Core.fsproj", "{3021659A-5CA4-4E06-AF00-2457ED3F105B}"
EndProject
Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Fc", "Fc", "{63634A65-F668-4054-AAF5-AFD81C278F50}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "samples\Fc\Domain\Domain.fsproj", "{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}"
EndProject
Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "samples\Fc\Domain.Tests\Domain.Tests.fsproj", "{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -181,6 +187,14 @@ Global
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU
{3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU
{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Debug|Any CPU.Build.0 = Debug|Any CPU
{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Release|Any CPU.ActiveCfg = Release|Any CPU
{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Release|Any CPU.Build.0 = Release|Any CPU
{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -196,6 +210,9 @@ Global
{EC2EC658-3D85-44F3-AD2F-52AFCAFF8871} = {8CDE1CC3-8619-44DE-8B4D-4102CE476C35}
{8CDE1CC3-8619-44DE-8B4D-4102CE476C35} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9}
{D82AAB2E-7264-421A-A893-63A37E5F08B6} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9}
{63634A65-F668-4054-AAF5-AFD81C278F50} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9}
{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E} = {63634A65-F668-4054-AAF5-AFD81C278F50}
{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7} = {63634A65-F668-4054-AAF5-AFD81C278F50}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {177E1E7B-E275-4FC6-AE3C-2C651ECCF71E}
Expand Down
31 changes: 31 additions & 0 deletions samples/Fc/Domain.Tests/Domain.Tests.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.1</TargetFramework>
<WarningLevel>5</WarningLevel>
<RootNamespace>Fc.Domain.Tests</RootNamespace>
<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="LocationSeriesTests.fs" />
<Compile Include="LocationEpochTests.fs" />
<Compile Include="LocationTests.fs" />
</ItemGroup>

<ItemGroup>
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00033" />
<PackageReference Include="FsCheck.xUnit" Version="2.14.0" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="16.3.0" />
<PackageReference Include="unquote" Version="4.0" />
<PackageReference Include="xunit" Version="2.4.1" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\..\..\src\Equinox.MemoryStore\Equinox.MemoryStore.fsproj" />
<ProjectReference Include="..\Domain\Domain.fsproj" />
</ItemGroup>

</Project>
49 changes: 49 additions & 0 deletions samples/Fc/Domain.Tests/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
[<AutoOpen>]
module Infrastructure

open Serilog
open System

let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag
let inline mkId () = Guid.NewGuid() |> (|Id|)
let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|)
let (|IdsAtLeastOne|) (Id x, Ids xs) = Seq.append xs (Seq.singleton x) |> Seq.toArray

module EnvVar =

let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj

module Cosmos =

let connect () =
match EnvVar.tryGet "EQUINOX_COSMOS_CONNECTION", EnvVar.tryGet "EQUINOX_COSMOS_DATABASE", EnvVar.tryGet "EQUINOX_COSMOS_CONTAINER" with
| Some s,Some d,Some c ->
let appName = "Domain.Tests"
let discovery = Equinox.Cosmos.Discovery.FromConnectionString s
let connector = Equinox.Cosmos.Connector(TimeSpan.FromSeconds 5., 1, TimeSpan.FromSeconds 5., Serilog.Log.Logger)
let connection = connector.Connect(appName,discovery) |> Async.RunSynchronously
let context = Equinox.Cosmos.Context(connection,d,c)
let cache = Equinox.Cache (appName, 10)
context,cache
| s,d,c ->
failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)"
(Option.isSome s) (Option.isSome d) (Option.isSome c)

/// Adapts the XUnit ITestOutputHelper to be a Serilog Sink
type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) =
let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:HH:mm:ss.fff zzz} [{Level:u3}] {Message}{Properties}{NewLine}{Exception}", null);
let writeSerilogEvent logEvent =
use writer = new System.IO.StringWriter()
formatter.Format(logEvent, writer)
let messageLine = string writer
testOutput.WriteLine messageLine
System.Diagnostics.Debug.Write messageLine
interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent

/// Creates a Serilog Log chain emitting to the cited Sink (only)
let createLogger sink =
Serilog.LoggerConfiguration()
// .MinimumLevel.Debug()
.Destructure.FSharpTypes()
.WriteTo.Sink(sink)
.CreateLogger()
54 changes: 54 additions & 0 deletions samples/Fc/Domain.Tests/LocationEpochTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
module LocationEpochTests

open FsCheck.Xunit
open Location.Epoch
open Swensen.Unquote

let interpret delta _balance =
match delta with
| 0 -> (),[]
| delta -> (),[Events.Delta { value = delta }]

let validateAndInterpret expectedBalance delta balance =
test <@ expectedBalance = balance @>
interpret delta balance

let verifyDeltaEvent delta events =
let dEvents = events |> List.filter (function Events.Delta _ -> true | _ -> false)
test <@ interpret delta (Unchecked.defaultof<_>) = ((),dEvents) @>

let [<Property>] properties carriedForward delta1 closeImmediately delta2 close =

(* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *)

let initialShouldClose _state = closeImmediately
let res,events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Folds.initial
let cfEvents events = events |> List.filter (function Events.CarriedForward _ -> true | _ -> false)
let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false)
let state1 = Folds.fold Folds.initial events
let expectedBalance = carriedForward + delta1
// Only expect closing if it was requested
let expectImmediateClose = closeImmediately
test <@ Option.isSome res.result
&& expectedBalance = res.balance @>
test <@ [Events.CarriedForward { initial = carriedForward }] = cfEvents events
&& (not expectImmediateClose || 1 = Seq.length (closeEvents events)) @>
verifyDeltaEvent delta1 events

(* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *)

let shouldClose _state = close
let { isOpen = isOpen; result = worked; balance = bal },events = sync None (validateAndInterpret expectedBalance delta2) shouldClose state1
let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2
test <@ [] = cfEvents events
&& (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @>
test <@ (expectImmediateClose || close || isOpen)
&& expectedBalance = bal @>
if not expectImmediateClose then
test <@ Option.isSome worked @>
verifyDeltaEvent delta2 events

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
44 changes: 44 additions & 0 deletions samples/Fc/Domain.Tests/LocationSeriesTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
module LocationSeriesTests

open FsCheck.Xunit
open FSharp.UMX
open Swensen.Unquote
open Location.Series

let [<Property>] properties c1 c2 =
let events = interpretActivateEpoch c1 Folds.initial
let state1 = Folds.fold Folds.initial events
let epoch0 = %0
match c1, events, toActiveEpoch state1 with
// Started events are not written for < 0
| n, [], activeEpoch when n < epoch0 ->
test <@ None = activeEpoch @>
// Any >=0 value should trigger a Started event, initially
| n, [Events.Started { epochId = ee }], Some activatedEpoch ->
test <@ n >= epoch0 && n = ee && n = activatedEpoch @>
// Nothing else should yield events
| _, l, _ ->
test <@ List.isEmpty l @>

let events = interpretActivateEpoch c2 state1
let state2 = Folds.fold state1 events
match toActiveEpoch state1, c2, events, toActiveEpoch state2 with
// Started events are not written for < 0
| None, n, [], activeEpoch when n < epoch0 ->
test <@ None = activeEpoch @>
// Any >= 0 epochId should trigger a Started event if first command didnt do anything
| None, n, [Events.Started { epochId = ee }], Some activatedEpoch ->
let eEpoch = %ee
test <@ n >= epoch0 && n = eEpoch && n = activatedEpoch @>
// Any higher epochId should trigger a Started event (gaps are fine - we are only tying to reduce walks)
| Some s1, n, [Events.Started { epochId = ee }], Some activatedEpoch ->
let eEpoch = %ee
test <@ n > s1 && n = eEpoch && n > epoch0 && n = activatedEpoch @>
// Nothing else should yield events
| _, _, l, _ ->
test <@ List.isEmpty l @>

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
69 changes: 69 additions & 0 deletions samples/Fc/Domain.Tests/LocationTests.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
module LocationTests

open FsCheck.Xunit
open FSharp.UMX
open Location
open Swensen.Unquote
open System

module Location =

open Equinox.MemoryStore

module Series =

let resolve store = Resolver(store, Series.Events.codec, Series.Folds.fold, Series.Folds.initial).Resolve

module Epoch =

let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Folds.fold, Epoch.Folds.initial).Resolve

module MemoryStore =

let createService (zeroBalance, shouldClose) store =
let maxAttempts = Int32.MaxValue
let series = Series.create (Series.resolve store) maxAttempts
let epochs = Epoch.create (Epoch.resolve store) maxAttempts
create (zeroBalance, shouldClose) (series, epochs)

let run (service : LocationService) (IdsAtLeastOne locations, deltas : _[]) = Async.RunSynchronously <| async {
let runId = mkId () // Need to make making state in store unique when replaying or shrinking
let locations = locations |> Array.map (fun x -> % (sprintf "%O_%O" runId x))

let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache

(* Apply random deltas *)

let adjust delta (bal : Epoch.Folds.Balance) =
let value = max -bal delta
if value = 0 then 0, []
else value, [Location.Epoch.Events.Delta { value = value }]
let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel
let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq

(* Verify loading yields identical state *)

let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel
test <@ expectedBalances = Set.ofSeq balances @> }

let [<Property>] ``MemoryStore properties`` maxEvents args =
let store = Equinox.MemoryStore.VolatileStore()
let zeroBalance = 0
let maxEvents = max 1 maxEvents
let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents
let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store
run service args

type Cosmos(testOutput) =

let context,cache = Cosmos.connect ()

let log = testOutput |> TestOutputAdapter |> createLogger
do Serilog.Log.Logger <- log

let [<Property>] properties maxEvents args =
let zeroBalance = 0
let maxEvents = max 1 maxEvents
let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents
let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue)
run service args
21 changes: 21 additions & 0 deletions samples/Fc/Domain/Domain.fsproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFrameworks>netstandard2.0</TargetFrameworks>
<WarningLevel>5</WarningLevel>
<RootNamespace>Fc.Domain</RootNamespace>
</PropertyGroup>

<ItemGroup>
<Compile Include="Infrastructure.fs" />
<Compile Include="LocationSeries.fs" />
<Compile Include="LocationEpoch.fs" />
<Compile Include="Location.fs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="FSharp.UMX" Version="1.0.0" />
<ProjectReference Include="..\..\..\src\Equinox.Cosmos\Equinox.Cosmos.fsproj" />
<ProjectReference Include="..\..\..\src\Equinox.EventStore\Equinox.EventStore.fsproj" />
</ItemGroup>

</Project>
16 changes: 16 additions & 0 deletions samples/Fc/Domain/Infrastructure.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
namespace global

open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings

type LocationId = string<locationId>
and [<Measure>] locationId
module LocationId =
let parse (value : string) : LocationId = %value
let toString (value : LocationId) : string = %value

type LocationEpochId = int<locationEpochId>
and [<Measure>] locationEpochId
module LocationEpochId =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any reason these parse/next/tostrings are not just member functions of the type?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The 'type', being a type alias, only exists at compile time, so there's no type to augment as such

let parse (value : int) : LocationEpochId = %value
let next (value : LocationEpochId) : LocationEpochId = % (%value + 1)
let toString (value : LocationEpochId) : string = string %value
42 changes: 42 additions & 0 deletions samples/Fc/Domain/Location.fs
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
namespace Location

type Wip<'R> = Pending of decide : (Epoch.Folds.Balance -> 'R*Epoch.Events.Event list) | Complete of 'R

/// Manages a Series of Epochs, with a running total being carried forward to the next Epoch when it's Closed
type LocationService internal (zeroBalance, shouldClose, series : Series.Service, epochs : Epoch.Service) =

let rec execute locationId originEpochId =
let rec aux epochId balanceToCarryForward wip = async {
let decide state = match wip with Complete r -> r,[] | Pending decide -> decide state
match! epochs.Sync(locationId, epochId, balanceToCarryForward, decide, shouldClose) with
| { balance = bal; result = Some res; isOpen = true } ->
if originEpochId <> epochId then
do! series.ActivateEpoch(locationId, epochId)
return bal, res
| { balance = bal; result = Some res } ->
let successorEpochId = LocationEpochId.next epochId
return! aux successorEpochId (Some bal) (Wip.Complete res)
| { balance = bal } ->
let successorEpochId = LocationEpochId.next epochId
return! aux successorEpochId (Some bal) wip }
aux

member __.Execute(locationId, decide) = async {
let! activeEpoch = series.Read locationId
let originEpochId, epochId, balanceCarriedForward =
match activeEpoch with
| None -> LocationEpochId.parse -1, LocationEpochId.parse 0, Some zeroBalance
| Some activeEpochId -> activeEpochId, activeEpochId, None
return! execute locationId originEpochId epochId balanceCarriedForward (Wip.Pending decide)}

[<AutoOpen>]
module Helpers =
let create (zeroBalance, shouldClose) (series, epochs) =
LocationService(zeroBalance, shouldClose, series, epochs)

module Cosmos =

let createService (zeroBalance, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.createService (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts)
create (zeroBalance, shouldClose) (series, epochs)
Loading