diff --git a/Equinox.sln b/Equinox.sln index 2dc517ded..35ec02b97 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -75,6 +75,12 @@ Project("{6EC3EE1D-3C4E-46DD-8F32-0CC8E7565705}") = "Equinox.SqlStreamStore.Post EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.Core", "src\Equinox.Core\Equinox.Core.fsproj", "{3021659A-5CA4-4E06-AF00-2457ED3F105B}" EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Fc", "Fc", "{63634A65-F668-4054-AAF5-AFD81C278F50}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain", "samples\Fc\Domain\Domain.fsproj", "{6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Domain.Tests", "samples\Fc\Domain.Tests\Domain.Tests.fsproj", "{C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -181,6 +187,14 @@ Global {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Debug|Any CPU.Build.0 = Debug|Any CPU {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.ActiveCfg = Release|Any CPU {3021659A-5CA4-4E06-AF00-2457ED3F105B}.Release|Any CPU.Build.0 = Release|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E}.Release|Any CPU.Build.0 = Release|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -196,6 +210,9 @@ Global {EC2EC658-3D85-44F3-AD2F-52AFCAFF8871} = {8CDE1CC3-8619-44DE-8B4D-4102CE476C35} {8CDE1CC3-8619-44DE-8B4D-4102CE476C35} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9} {D82AAB2E-7264-421A-A893-63A37E5F08B6} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9} + {63634A65-F668-4054-AAF5-AFD81C278F50} = {8F3EB30C-8BA3-4CC0-8361-0EA47C19ABB9} + {6BD9ACAC-A3E2-42B5-9502-1A4BCCEE422E} = {63634A65-F668-4054-AAF5-AFD81C278F50} + {C4BAAAA6-4EF5-4F2A-A2A8-1FE8BDAF95E7} = {63634A65-F668-4054-AAF5-AFD81C278F50} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {177E1E7B-E275-4FC6-AE3C-2C651ECCF71E} diff --git a/samples/Fc/Domain.Tests/Domain.Tests.fsproj b/samples/Fc/Domain.Tests/Domain.Tests.fsproj new file mode 100644 index 000000000..afc036013 --- /dev/null +++ b/samples/Fc/Domain.Tests/Domain.Tests.fsproj @@ -0,0 +1,31 @@ + + + + netcoreapp2.1 + 5 + Fc.Domain.Tests + false + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/samples/Fc/Domain.Tests/Infrastructure.fs b/samples/Fc/Domain.Tests/Infrastructure.fs new file mode 100644 index 000000000..1c50bd69e --- /dev/null +++ b/samples/Fc/Domain.Tests/Infrastructure.fs @@ -0,0 +1,49 @@ +[] +module Infrastructure + +open Serilog +open System + +let (|Id|) (x : Guid) = x.ToString "N" |> FSharp.UMX.UMX.tag +let inline mkId () = Guid.NewGuid() |> (|Id|) +let (|Ids|) (xs : Guid[]) = xs |> Array.map (|Id|) +let (|IdsAtLeastOne|) (Id x, Ids xs) = Seq.append xs (Seq.singleton x) |> Seq.toArray + +module EnvVar = + + let tryGet k = Environment.GetEnvironmentVariable k |> Option.ofObj + +module Cosmos = + + let connect () = + match EnvVar.tryGet "EQUINOX_COSMOS_CONNECTION", EnvVar.tryGet "EQUINOX_COSMOS_DATABASE", EnvVar.tryGet "EQUINOX_COSMOS_CONTAINER" with + | Some s,Some d,Some c -> + let appName = "Domain.Tests" + let discovery = Equinox.Cosmos.Discovery.FromConnectionString s + let connector = Equinox.Cosmos.Connector(TimeSpan.FromSeconds 5., 1, TimeSpan.FromSeconds 5., Serilog.Log.Logger) + let connection = connector.Connect(appName,discovery) |> Async.RunSynchronously + let context = Equinox.Cosmos.Context(connection,d,c) + let cache = Equinox.Cache (appName, 10) + context,cache + | s,d,c -> + failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)" + (Option.isSome s) (Option.isSome d) (Option.isSome c) + +/// Adapts the XUnit ITestOutputHelper to be a Serilog Sink +type TestOutputAdapter(testOutput : Xunit.Abstractions.ITestOutputHelper) = + let formatter = Serilog.Formatting.Display.MessageTemplateTextFormatter("{Timestamp:HH:mm:ss.fff zzz} [{Level:u3}] {Message}{Properties}{NewLine}{Exception}", null); + let writeSerilogEvent logEvent = + use writer = new System.IO.StringWriter() + formatter.Format(logEvent, writer) + let messageLine = string writer + testOutput.WriteLine messageLine + System.Diagnostics.Debug.Write messageLine + interface Serilog.Core.ILogEventSink with member __.Emit logEvent = writeSerilogEvent logEvent + +/// Creates a Serilog Log chain emitting to the cited Sink (only) +let createLogger sink = + Serilog.LoggerConfiguration() +// .MinimumLevel.Debug() + .Destructure.FSharpTypes() + .WriteTo.Sink(sink) + .CreateLogger() diff --git a/samples/Fc/Domain.Tests/LocationEpochTests.fs b/samples/Fc/Domain.Tests/LocationEpochTests.fs new file mode 100644 index 000000000..8a4e14a4a --- /dev/null +++ b/samples/Fc/Domain.Tests/LocationEpochTests.fs @@ -0,0 +1,54 @@ +module LocationEpochTests + +open FsCheck.Xunit +open Location.Epoch +open Swensen.Unquote + +let interpret delta _balance = + match delta with + | 0 -> (),[] + | delta -> (),[Events.Delta { value = delta }] + +let validateAndInterpret expectedBalance delta balance = + test <@ expectedBalance = balance @> + interpret delta balance + +let verifyDeltaEvent delta events = + let dEvents = events |> List.filter (function Events.Delta _ -> true | _ -> false) + test <@ interpret delta (Unchecked.defaultof<_>) = ((),dEvents) @> + +let [] properties carriedForward delta1 closeImmediately delta2 close = + + (* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *) + + let initialShouldClose _state = closeImmediately + let res,events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Folds.initial + let cfEvents events = events |> List.filter (function Events.CarriedForward _ -> true | _ -> false) + let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false) + let state1 = Folds.fold Folds.initial events + let expectedBalance = carriedForward + delta1 + // Only expect closing if it was requested + let expectImmediateClose = closeImmediately + test <@ Option.isSome res.result + && expectedBalance = res.balance @> + test <@ [Events.CarriedForward { initial = carriedForward }] = cfEvents events + && (not expectImmediateClose || 1 = Seq.length (closeEvents events)) @> + verifyDeltaEvent delta1 events + + (* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *) + + let shouldClose _state = close + let { isOpen = isOpen; result = worked; balance = bal },events = sync None (validateAndInterpret expectedBalance delta2) shouldClose state1 + let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2 + test <@ [] = cfEvents events + && (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @> + test <@ (expectImmediateClose || close || isOpen) + && expectedBalance = bal @> + if not expectImmediateClose then + test <@ Option.isSome worked @> + verifyDeltaEvent delta2 events + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/samples/Fc/Domain.Tests/LocationSeriesTests.fs b/samples/Fc/Domain.Tests/LocationSeriesTests.fs new file mode 100644 index 000000000..1bd09a28d --- /dev/null +++ b/samples/Fc/Domain.Tests/LocationSeriesTests.fs @@ -0,0 +1,44 @@ +module LocationSeriesTests + +open FsCheck.Xunit +open FSharp.UMX +open Swensen.Unquote +open Location.Series + +let [] properties c1 c2 = + let events = interpretActivateEpoch c1 Folds.initial + let state1 = Folds.fold Folds.initial events + let epoch0 = %0 + match c1, events, toActiveEpoch state1 with + // Started events are not written for < 0 + | n, [], activeEpoch when n < epoch0 -> + test <@ None = activeEpoch @> + // Any >=0 value should trigger a Started event, initially + | n, [Events.Started { epochId = ee }], Some activatedEpoch -> + test <@ n >= epoch0 && n = ee && n = activatedEpoch @> + // Nothing else should yield events + | _, l, _ -> + test <@ List.isEmpty l @> + + let events = interpretActivateEpoch c2 state1 + let state2 = Folds.fold state1 events + match toActiveEpoch state1, c2, events, toActiveEpoch state2 with + // Started events are not written for < 0 + | None, n, [], activeEpoch when n < epoch0 -> + test <@ None = activeEpoch @> + // Any >= 0 epochId should trigger a Started event if first command didnt do anything + | None, n, [Events.Started { epochId = ee }], Some activatedEpoch -> + let eEpoch = %ee + test <@ n >= epoch0 && n = eEpoch && n = activatedEpoch @> + // Any higher epochId should trigger a Started event (gaps are fine - we are only tying to reduce walks) + | Some s1, n, [Events.Started { epochId = ee }], Some activatedEpoch -> + let eEpoch = %ee + test <@ n > s1 && n = eEpoch && n > epoch0 && n = activatedEpoch @> + // Nothing else should yield events + | _, _, l, _ -> + test <@ List.isEmpty l @> + +let [] ``codec can roundtrip`` event = + let ee = Events.codec.Encode(None,event) + let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data) + test <@ Some event = Events.codec.TryDecode ie @> \ No newline at end of file diff --git a/samples/Fc/Domain.Tests/LocationTests.fs b/samples/Fc/Domain.Tests/LocationTests.fs new file mode 100644 index 000000000..736cdf08f --- /dev/null +++ b/samples/Fc/Domain.Tests/LocationTests.fs @@ -0,0 +1,69 @@ +module LocationTests + +open FsCheck.Xunit +open FSharp.UMX +open Location +open Swensen.Unquote +open System + +module Location = + + open Equinox.MemoryStore + + module Series = + + let resolve store = Resolver(store, Series.Events.codec, Series.Folds.fold, Series.Folds.initial).Resolve + + module Epoch = + + let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Folds.fold, Epoch.Folds.initial).Resolve + + module MemoryStore = + + let createService (zeroBalance, shouldClose) store = + let maxAttempts = Int32.MaxValue + let series = Series.create (Series.resolve store) maxAttempts + let epochs = Epoch.create (Epoch.resolve store) maxAttempts + create (zeroBalance, shouldClose) (series, epochs) + +let run (service : LocationService) (IdsAtLeastOne locations, deltas : _[]) = Async.RunSynchronously <| async { + let runId = mkId () // Need to make making state in store unique when replaying or shrinking + let locations = locations |> Array.map (fun x -> % (sprintf "%O_%O" runId x)) + + let updates = deltas |> Seq.mapi (fun i x -> locations.[i % locations.Length], x) |> Seq.cache + + (* Apply random deltas *) + + let adjust delta (bal : Epoch.Folds.Balance) = + let value = max -bal delta + if value = 0 then 0, [] + else value, [Location.Epoch.Events.Delta { value = value }] + let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel + let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq + + (* Verify loading yields identical state *) + + let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel + test <@ expectedBalances = Set.ofSeq balances @> } + +let [] ``MemoryStore properties`` maxEvents args = + let store = Equinox.MemoryStore.VolatileStore() + let zeroBalance = 0 + let maxEvents = max 1 maxEvents + let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents + let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store + run service args + +type Cosmos(testOutput) = + + let context,cache = Cosmos.connect () + + let log = testOutput |> TestOutputAdapter |> createLogger + do Serilog.Log.Logger <- log + + let [] properties maxEvents args = + let zeroBalance = 0 + let maxEvents = max 1 maxEvents + let shouldClose (state : Epoch.Folds.OpenState) = state.count > maxEvents + let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue) + run service args \ No newline at end of file diff --git a/samples/Fc/Domain/Domain.fsproj b/samples/Fc/Domain/Domain.fsproj new file mode 100644 index 000000000..bb4316193 --- /dev/null +++ b/samples/Fc/Domain/Domain.fsproj @@ -0,0 +1,21 @@ + + + + netstandard2.0 + 5 + Fc.Domain + + + + + + + + + + + + + + + diff --git a/samples/Fc/Domain/Infrastructure.fs b/samples/Fc/Domain/Infrastructure.fs new file mode 100644 index 000000000..7a1bfa843 --- /dev/null +++ b/samples/Fc/Domain/Infrastructure.fs @@ -0,0 +1,16 @@ +namespace global + +open FSharp.UMX // see https://github.com/fsprojects/FSharp.UMX - % operator and ability to apply units of measure to Guids+strings + +type LocationId = string +and [] locationId +module LocationId = + let parse (value : string) : LocationId = %value + let toString (value : LocationId) : string = %value + +type LocationEpochId = int +and [] locationEpochId +module LocationEpochId = + let parse (value : int) : LocationEpochId = %value + let next (value : LocationEpochId) : LocationEpochId = % (%value + 1) + let toString (value : LocationEpochId) : string = string %value \ No newline at end of file diff --git a/samples/Fc/Domain/Location.fs b/samples/Fc/Domain/Location.fs new file mode 100644 index 000000000..30518cae5 --- /dev/null +++ b/samples/Fc/Domain/Location.fs @@ -0,0 +1,42 @@ +namespace Location + +type Wip<'R> = Pending of decide : (Epoch.Folds.Balance -> 'R*Epoch.Events.Event list) | Complete of 'R + +/// Manages a Series of Epochs, with a running total being carried forward to the next Epoch when it's Closed +type LocationService internal (zeroBalance, shouldClose, series : Series.Service, epochs : Epoch.Service) = + + let rec execute locationId originEpochId = + let rec aux epochId balanceToCarryForward wip = async { + let decide state = match wip with Complete r -> r,[] | Pending decide -> decide state + match! epochs.Sync(locationId, epochId, balanceToCarryForward, decide, shouldClose) with + | { balance = bal; result = Some res; isOpen = true } -> + if originEpochId <> epochId then + do! series.ActivateEpoch(locationId, epochId) + return bal, res + | { balance = bal; result = Some res } -> + let successorEpochId = LocationEpochId.next epochId + return! aux successorEpochId (Some bal) (Wip.Complete res) + | { balance = bal } -> + let successorEpochId = LocationEpochId.next epochId + return! aux successorEpochId (Some bal) wip } + aux + + member __.Execute(locationId, decide) = async { + let! activeEpoch = series.Read locationId + let originEpochId, epochId, balanceCarriedForward = + match activeEpoch with + | None -> LocationEpochId.parse -1, LocationEpochId.parse 0, Some zeroBalance + | Some activeEpochId -> activeEpochId, activeEpochId, None + return! execute locationId originEpochId epochId balanceCarriedForward (Wip.Pending decide)} + +[] +module Helpers = + let create (zeroBalance, shouldClose) (series, epochs) = + LocationService(zeroBalance, shouldClose, series, epochs) + +module Cosmos = + + let createService (zeroBalance, shouldClose) (context, cache, maxAttempts) = + let series = Series.Cosmos.createService (context, cache, maxAttempts) + let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts) + create (zeroBalance, shouldClose) (series, epochs) \ No newline at end of file diff --git a/samples/Fc/Domain/LocationEpoch.fs b/samples/Fc/Domain/LocationEpoch.fs new file mode 100644 index 000000000..a76b7807d --- /dev/null +++ b/samples/Fc/Domain/LocationEpoch.fs @@ -0,0 +1,90 @@ +module Location.Epoch + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type CarriedForward = { initial : int } + type Delta = { value : int } + type Event = + | CarriedForward of CarriedForward + | Closed + | Delta of Delta + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "LocationEpoch" + +module Folds = + + type Balance = int + type OpenState = { count : int; value : Balance } + type State = Initial | Open of OpenState | Closed of Balance + let initial = Initial + let evolve state event = + match event, state with + | Events.CarriedForward e, Initial -> Open { count = 0; value = e.initial } + | Events.Delta e, Open bal -> Open { count = bal.count + 1; value = bal.value + e.value } + | Events.Closed, Open { value = bal } -> Closed bal + | Events.CarriedForward _, (Open _|Closed _ as x) -> failwithf "CarriedForward : Unexpected %A" x + | Events.Delta _, (Initial|Closed _ as x) -> failwithf "Delta : Unexpected %A" x + | Events.Closed, (Initial|Closed _ as x) -> failwithf "Closed : Unexpected %A" x + let fold = Seq.fold evolve + +/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events +type private Accumulator() = + let acc = ResizeArray() + member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function + | res, [] -> res, state + | res, [e] -> acc.Add e; res, Folds.evolve state e + | res, xs -> acc.AddRange xs; res, Folds.fold state (Seq.ofList xs) + member __.Accumulated = List.ofSeq acc + +type Result<'t> = { balance : Folds.Balance; result : 't option; isOpen : bool } + +let sync (balanceCarriedForward : Folds.Balance option) (decide : (Folds.Balance -> 't*Events.Event list)) shouldClose state : Result<'t>*Events.Event list = + let acc = Accumulator() + // We always want to have a CarriedForward event at the start of any Epoch's event stream + let (), state = + acc.Ingest state <| + match state with + | Folds.Initial -> (), [Events.CarriedForward { initial = Option.get balanceCarriedForward }] + | Folds.Open _ | Folds.Closed _ -> (), [] + // Run, unless we determine we're in Closed state + let result, state = + acc.Ingest state <| + match state with + | Folds.Initial -> failwith "We've just guaranteed not Initial" + | Folds.Open { value = bal } -> let r,es = decide bal in Some r,es + | Folds.Closed _ -> None, [] + // Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event + let (balance, isOpen), _ = + acc.Ingest state <| + match state with + | Folds.Initial -> failwith "Can't be Initial" + | Folds.Open ({ value = bal } as openState) when shouldClose openState -> (bal, false), [Events.Closed] + | Folds.Open { value = bal } -> (bal, true), [] + | Folds.Closed bal -> (bal, false), [] + { balance = balance; result = result; isOpen = isOpen }, acc.Accumulated + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) (locationId, epochId) = + let id = sprintf "%s_%s" (LocationId.toString locationId) (LocationEpochId.toString epochId) + Equinox.AggregateId(Events.categoryId, id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) + + member __.Sync<'R>(locationId, epochId, prevEpochBalanceCarriedForward, decide, shouldClose) : Async> = + let (Stream stream) = (locationId, epochId) + stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose) + +let create resolve maxAttempts = Service(resolve, maxAttempts) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve + let createService (context,cache,maxAttempts) = + create (resolve (context,cache)) maxAttempts \ No newline at end of file diff --git a/samples/Fc/Domain/LocationSeries.fs b/samples/Fc/Domain/LocationSeries.fs new file mode 100644 index 000000000..e75ccb9b4 --- /dev/null +++ b/samples/Fc/Domain/LocationSeries.fs @@ -0,0 +1,52 @@ +module Location.Series + +// NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care +[] +module Events = + + type Started = { epochId : LocationEpochId } + type Event = + | Started of Started + interface TypeShape.UnionContract.IUnionContract + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let [] categoryId = "LocationSeries" + +module Folds = + + type State = LocationEpochId + let initial = LocationEpochId.parse -1 + let evolve _state = function + | Events.Started e -> e.epochId + let fold = Seq.fold evolve + +let interpretActivateEpoch epochId (state : Folds.State) = + [if state < epochId then yield Events.Started { epochId = epochId }] + +let toActiveEpoch state = + if state = Folds.initial then None else Some state + +type Service internal (resolve, ?maxAttempts) = + + let log = Serilog.Log.ForContext() + let (|AggregateId|) id = Equinox.AggregateId(Events.categoryId, LocationId.toString id) + let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolve id, maxAttempts = defaultArg maxAttempts 2) + + member __.Read(locationId) : Async = + let (Stream stream) = locationId + stream.Query toActiveEpoch + + member __.ActivateEpoch(locationId,epochId) : Async = + let (Stream stream) = locationId + stream.Transact(interpretActivateEpoch epochId) + +let create resolve maxAttempts = Service(resolve, maxAttempts) + +module Cosmos = + + open Equinox.Cosmos + let resolve (context,cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + let opt = Equinox.ResolveOption.AllowStale + fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt) + let createService (context, cache, maxAttempts) = + create (resolve (context,cache)) maxAttempts \ No newline at end of file