diff --git a/CHANGELOG.md b/CHANGELOG.md index fbd48b16b..8e2418faf 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `eqx dump`/`Equinox.Tool`: Show payload statistics [#323](https://github.com/jet/equinox/pull/323) - `eqx dump`/`Equinox.Tool`: Add `-B` option to prevent assuming UTF-8 bodies [#323](https://github.com/jet/equinox/pull/323) - `Equinox`: `Decider.Transact(interpret : 'state -> Async<'event list>)` [#314](https://github.com/jet/equinox/pull/314) +- `EventStoreDb`: As per `EventStore` module, but using the modern `EventStore.Client.Grpc.Streams` client [#196](https://github.com/jet/equinox/pull/196) ### Changed @@ -26,6 +27,7 @@ The `Unreleased` section name is replaced by the expected version of next releas - `CosmosStore`: Switch to natively using `System.Text.Json` for serialization of all `Microsoft.Azure.Cosmos` round-trips [#305](https://github.com/jet/equinox/pull/305) :pray: [@ylibrach](https://github.com/ylibrach) - `CosmosStore`: Only log `bytes` when log level is `Debug` [#305](https://github.com/jet/equinox/pull/305) - `EventStore`: Target `EventStore.Client` v `22.0.0-preview`; rename `Connector` -> `EventStoreConnector` [#317](https://github.com/jet/equinox/pull/317) +- `Equinox.Tool`/`samples/`: switched to use `Equinox.EventStoreDb` [#196](https://github.com/jet/equinox/pull/196) - Update all non-Client dependencies except `FSharp.Core`, `FSharp.Control.AsyncSeq` [#310](https://github.com/jet/equinox/pull/310) - Update all Stores to use `FsCodec` v `3.0.0`, with [`EventBody` types switching from `byte[]` to `ReadOnlyMemory`, see FsCodec#75](https://github.com/jet/FsCodec/pull/75) [#323](https://github.com/jet/equinox/pull/323) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 8ac9cf6c9..7d847163a 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -2230,10 +2230,9 @@ raise Issues first though ;) ). 
EventStore, and it's Store adapter is the most proven and is pretty feature rich relative to the need of consumers to date. Some things remain though: -- __Get impl in `master` ported to the modern gRPC client, currently parked in [#196](https://github.com/jet/equinox/pull/196). See also [#232](https://github.com/jet/equinox/issues/232)__ - Provide a low level walking events in F# API akin to `Equinox.CosmosStore.Core.Events`; this would allow consumers to jump from direct - use of `EventStore.ClientAPI` -> `Equinox.EventStore.Core.Events` -> + use of `EventStore.Client` -> `Equinox.EventStore.Core.Events` -> `Equinox.Decider` (with the potential to swap stores once one gets to using `Equinox.Decider`) - Get conflict handling as efficient and predictable as for `Equinox.CosmosStore` diff --git a/Equinox.sln b/Equinox.sln index d88f176aa..2a91415bf 100644 --- a/Equinox.sln +++ b/Equinox.sln @@ -91,6 +91,10 @@ EndProjectSection EndProject Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.CosmosStore.Prometheus", "src\Equinox.CosmosStore.Prometheus\Equinox.CosmosStore.Prometheus.fsproj", "{3107BBC1-2BCB-4750-AED0-42B1F4CD1909}" EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.EventStoreDb", "src\Equinox.EventStoreDb\Equinox.EventStoreDb.fsproj", "{C828E360-6FE8-47F9-9415-0A21869D7A93}" +EndProject +Project("{F2A71F9B-5D33-465A-A702-920D77279786}") = "Equinox.EventStoreDb.Integration", "tests\Equinox.EventStoreDb.Integration\Equinox.EventStoreDb.Integration.fsproj", "{BA63048B-3CA3-448D-A4CD-0C772D57B6F8}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -197,6 +201,14 @@ Global {3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Debug|Any CPU.Build.0 = Debug|Any CPU {3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Release|Any CPU.ActiveCfg = Release|Any CPU {3107BBC1-2BCB-4750-AED0-42B1F4CD1909}.Release|Any CPU.Build.0 = Release|Any CPU + {C828E360-6FE8-47F9-9415-0A21869D7A93}.Debug|Any 
CPU.ActiveCfg = Debug|Any CPU + {C828E360-6FE8-47F9-9415-0A21869D7A93}.Debug|Any CPU.Build.0 = Debug|Any CPU + {C828E360-6FE8-47F9-9415-0A21869D7A93}.Release|Any CPU.ActiveCfg = Release|Any CPU + {C828E360-6FE8-47F9-9415-0A21869D7A93}.Release|Any CPU.Build.0 = Release|Any CPU + {BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Debug|Any CPU.Build.0 = Debug|Any CPU + {BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Release|Any CPU.ActiveCfg = Release|Any CPU + {BA63048B-3CA3-448D-A4CD-0C772D57B6F8}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE diff --git a/README.md b/README.md index 44e49d6fb..221db657a 100644 --- a/README.md +++ b/README.md @@ -105,7 +105,7 @@ _If you're looking to learn more about and/or discuss Event Sourcing and it's my # Currently Supported Data Stores - [Azure Cosmos DB](https://docs.microsoft.com/en-us/azure/cosmos-db): contains some fragments of code dating back to 2016, however [the storage model](DOCUMENTATION.md#Cosmos-Storage-Model) was arrived at based on intensive benchmarking (squash-merged in [#42](https://github.com/jet/equinox/pull/42)). The V2 and V3 release lines are being used in production systems. (The V3 release provides support for significantly more efficient packing of events ([storing events in the 'Tip'](https://github.com/jet/equinox/pull/251))). -- [EventStoreDB](https://eventstore.org/): this codebase itself has been in production since 2017 (see commit history), with key elements dating back to approx 2016. +- [EventStoreDB](https://eventstore.org/): this codebase itself has been in production since 2017 (see commit history), with key elements dating back to approx 2016. Current versions require EventStoreDB Server editions `21.10` or later, and communicate over the modern gRPC interface. 
- [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore): bindings for the powerful and widely used SQL-backed Event Storage system, derived from the EventStoreDB adapter. [See SqlStreamStore docs](https://sqlstreamstore.readthedocs.io/en/latest/#introduction). :pray: [@rajivhost](https://github.com/rajivhost) - `MemoryStore`: In-memory store (volatile, for unit or integration test purposes). Fulfils the full contract Equinox imposes on a store, but without I/O costs [(it's ~100 LOC wrapping a `ConcurrentDictionary`)](https://github.com/jet/equinox/blob/master/src/Equinox.MemoryStore/MemoryStore.fs). Also enables [take serialization/deserialization out of the picture](https://github.com/jet/FsCodec#boxcodec) in tests. @@ -137,6 +137,7 @@ The components within this repository are delivered as multi-targeted Nuget pack - `Equinox.CosmosStore` [![CosmosStore NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.svg)](https://www.nuget.org/packages/Equinox.CosmosStore/): Azure CosmosDB Adapter with integrated 'unfolds' feature, facilitating optimal read performance in terms of latency and RU costs, instrumented to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.CosmosStore) on `Equinox.Core`, `Microsoft.Azure.Cosmos >= 3.25`, `FsCodec`, `System.Text.Json`, `FSharp.Control.AsyncSeq >= 2.0.23`) - `Equinox.CosmosStore.Prometheus` [![CosmosStore.Prometheus NuGet](https://img.shields.io/nuget/v/Equinox.CosmosStore.Prometheus.svg)](https://www.nuget.org/packages/Equinox.CosmosStore.Prometheus/): Integration package providing a `Serilog.Core.ILogEventSink` that extracts detailed metrics information attached to the `LogEvent`s and feeds them to the `prometheus-net`'s `Prometheus.Metrics` static instance. 
([depends](https://www.fuget.org/packages/Equinox.CosmosStore.Prometheus) on `Equinox.CosmosStore`, `prometheus-net >= 3.6.0`) - `Equinox.EventStore` [![EventStore NuGet](https://img.shields.io/nuget/v/Equinox.EventStore.svg)](https://www.nuget.org/packages/Equinox.EventStore/): [EventStoreDB](https://eventstore.org/) Adapter designed to meet Jet's production monitoring requirements. ([depends](https://www.fuget.org/packages/Equinox.EventStore) on `Equinox.Core`, `EventStore.Client >= 22.0.0-preview`, `FSharp.Control.AsyncSeq >= 2.0.23`), EventStore Server version `21.10` or later) +- `Equinox.EventStoreDb` [![EventStoreDb NuGet](https://img.shields.io/nuget/v/Equinox.EventStoreDb.svg)](https://www.nuget.org/packages/Equinox.EventStoreDb/): Production-strength [EventStoreDB](https://eventstore.org/) Adapter. ([depends](https://www.fuget.org/packages/Equinox.EventStoreDb) on `Equinox.Core`, `EventStore.Client.Grpc.Streams` >= `22.0.0`, `FSharp.Control.AsyncSeq` v `2.0.23`, EventStore Server version `21.10` or later) - `Equinox.SqlStreamStore` [![SqlStreamStore NuGet](https://img.shields.io/nuget/v/Equinox.SqlStreamStore.svg)](https://www.nuget.org/packages/Equinox.SqlStreamStore/): [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore) Adapter derived from `Equinox.EventStore` - provides core facilities (but does not connect to a specific database; see sibling `SqlStreamStore`.* packages). ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore) on `Equinox.Core`, `FsCodec`, `SqlStreamStore >= 1.2.0-beta.8`, `FSharp.Control.AsyncSeq`) - `Equinox.SqlStreamStore.MsSql` [![MsSql NuGet](https://img.shields.io/nuget/v/Equinox.SqlStreamStore.MsSql.svg)](https://www.nuget.org/packages/Equinox.SqlStreamStore.MsSql/): [SqlStreamStore.MsSql](https://sqlstreamstore.readthedocs.io/en/latest/sqlserver) Sql Server `Connector` implementation for `Equinox.SqlStreamStore` package). 
([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore.MsSql) on `Equinox.SqlStreamStore`, `SqlStreamStore.MsSql >= 1.2.0-beta.8`) - `Equinox.SqlStreamStore.MySql` [![MySql NuGet](https://img.shields.io/nuget/v/Equinox.SqlStreamStore.MySql.svg)](https://www.nuget.org/packages/Equinox.SqlStreamStore.MySql/): `SqlStreamStore.MySql` MySQL `Connector` implementation for `Equinox.SqlStreamStore` package). ([depends](https://www.fuget.org/packages/Equinox.SqlStreamStore.MySql) on `Equinox.SqlStreamStore`, `SqlStreamStore.MySql >= 1.2.0-beta.8`) @@ -151,6 +152,7 @@ Equinox does not focus on projection logic - each store brings its own strengths - `Propulsion.Cosmos` [![Propulsion.Cosmos NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/): Wraps the [Microsoft .NET `ChangeFeedProcessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet) providing a [processor loop](DOCUMENTATION.md#change-feed-processors) that maintains a continuous query loop per CosmosDB Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by `Equinox.CosmosStore` for processing or forwarding). ([depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDb.ChangeFeedProcessor >= 2.2.5`) - `Propulsion.CosmosStore` [![Propulsion.CosmosStore NuGet](https://img.shields.io/nuget/v/Propulsion.CosmosStore.svg)](https://www.nuget.org/packages/Propulsion.CosmosStore/): Wraps the CosmosDB V3 SDK's Change Feed API, providing a [processor loop](DOCUMENTATION.md#change-feed-processors) that maintains a continuous query loop per CosmosDB Physical Partition (Range) yielding new or updated documents (optionally unrolling events written by `Equinox.CosmosStore` for processing or forwarding). 
Used in the [`propulsion project stats cosmos`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proProjector` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.CosmosStore) on `Equinox.CosmosStore`) - `Propulsion.EventStore` [![Propulsion.EventStore NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/) Used in the [`propulsion project es`](dotnet-tool-provisioning--benchmarking-tool) tool command; see [`dotnet new proSync` to generate a sample app](#quickstart) using it. ([depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore`) +- `Propulsion.EventStoreDb` [![Propulsion.EventStoreDb NuGet](https://img.shields.io/nuget/v/Propulsion.EventStoreDb.svg)](https://www.nuget.org/packages/Propulsion.EventStoreDb/) Consumes from `EventStoreDB` v `21.10` or later using the gRPC interface. ([depends](https://www.fuget.org/packages/Propulsion.EventStoreDb) on `Equinox.EventStoreDb`) - `Propulsion.Kafka` [![Propulsion.Kafka NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/): Provides a canonical `RenderedSpan` that can be used as a default format when projecting events via e.g. the Producer/Consumer pair in `dotnet new proProjector -k; dotnet new proConsumer`. 
([depends](https://www.fuget.org/packages/Propulsion.Kafka) on `Newtonsoft.Json >= 11.0.2`, `Propulsion`, `FsKafka`) ## `dotnet tool` provisioning / benchmarking tool @@ -537,7 +539,7 @@ The CLI can drive the Store and TodoBackend samples in the `samples/Web` ASP.NET ### Provisioning EventStore (when not using -s or -se) -There's a `docker-compose.yml` file in the root, so installing `docker-compose` and then running `docker-compose up` rigs a local 3-node cluster, which is assumed to be configured for `Equinox.EventStore.Integration` +There's a `docker-compose.yml` file in the root, so installing `docker-compose` and then running `docker-compose up` rigs a local 3-node cluster, which is assumed to be configured for `Equinox.EventStore.Integration` and `Equinox.EventStoreDb.Integration` For more complete instructions, follow https://developers.eventstore.com/server/v21.10/installation.html#use-docker-compose @@ -597,7 +599,6 @@ All non-alpha releases derive from tagged commits on `master`. 
The tag defines t - :cry: the Azure Pipelines script does not run the integration tests, so these need to be run manually via the following steps: - [Provision](#provisioning): - - Start Local EventStore running in simulated cluster mode - Set environment variables x 4 for a CosmosDB database and container (you might need to `eqx init`) - Add a `EQUINOX_COSMOS_CONTAINER_ARCHIVE` environment variable referencing a separate (`eqx init` initialized) CosmosDB Container that will be used to house fallback events in the [Fallback mechanism's tests](https://github.com/jet/equinox/pull/247) - `docker-compose up` to start diff --git a/build.proj b/build.proj index 15f52d847..b08c27936 100644 --- a/build.proj +++ b/build.proj @@ -19,6 +19,7 @@ + diff --git a/diagrams/EventStore.puml b/diagrams/EventStore.puml index 8583064de..ae743c61e 100644 --- a/diagrams/EventStore.puml +++ b/diagrams/EventStore.puml @@ -21,9 +21,9 @@ rectangle "Application Consistent Processing" <> { interface IStream <> } -rectangle "Equinox.EventStore" <> { +rectangle "Equinox.EventStoreDb" <> { rectangle eqxes <> [ - Equinox.EventStore OR + Equinox.EventStoreDb OR Equinox.SqlStreamStore ] database esstore <> [ diff --git a/diagrams/container.puml b/diagrams/container.puml index b622b184e..761d3c4b6 100644 --- a/diagrams/container.puml +++ b/diagrams/container.puml @@ -28,9 +28,9 @@ frame "Consistent Event Stores" as stores <> { rectangle "Azure.Cosmos" <> as cc } frame "EventStore" <> { - rectangle "Equinox.EventStore" <> as es + rectangle "Equinox.EventStoreDb" <> as es rectangle "Propulsion.EventStore" <> as er - rectangle "EventStore.ClientAPI" <> as esc + rectangle "EventStore.Client.Grpc.Streams" <> as esc } frame "Integration Test Support" <> { rectangle "Equinox.MemoryStore" <> as ms diff --git a/samples/Infrastructure/Infrastructure.fsproj b/samples/Infrastructure/Infrastructure.fsproj index 7fa2cb6c6..85f8101d7 100644 --- a/samples/Infrastructure/Infrastructure.fsproj +++ 
b/samples/Infrastructure/Infrastructure.fsproj @@ -16,7 +16,7 @@ - + diff --git a/samples/Infrastructure/Services.fs b/samples/Infrastructure/Services.fs index 5f02c077f..b0b40ae28 100644 --- a/samples/Infrastructure/Services.fs +++ b/samples/Infrastructure/Services.fs @@ -18,8 +18,8 @@ type StreamResolver(storage) = let accessStrategy = if unfolds then Equinox.CosmosStore.AccessStrategy.Snapshot snapshot else Equinox.CosmosStore.AccessStrategy.Unoptimized Equinox.CosmosStore.CosmosStoreCategory<'event,'state,_>(store, codec.ToJsonElementCodec(), fold, initial, caching, accessStrategy).Resolve | Storage.StorageConfig.Es (context, caching, unfolds) -> - let accessStrategy = if unfolds then Equinox.EventStore.AccessStrategy.RollingSnapshots snapshot |> Some else None - Equinox.EventStore.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve + let accessStrategy = if unfolds then Equinox.EventStoreDb.AccessStrategy.RollingSnapshots snapshot |> Some else None + Equinox.EventStoreDb.EventStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve | Storage.StorageConfig.Sql (context, caching, unfolds) -> let accessStrategy = if unfolds then Equinox.SqlStreamStore.AccessStrategy.RollingSnapshots snapshot |> Some else None Equinox.SqlStreamStore.SqlStreamStoreCategory<'event,'state,_>(context, codec, fold, initial, ?caching = caching, ?access = accessStrategy).Resolve diff --git a/samples/Infrastructure/Storage.fs b/samples/Infrastructure/Storage.fs index 6e28c5bc7..7d88135c1 100644 --- a/samples/Infrastructure/Storage.fs +++ b/samples/Infrastructure/Storage.fs @@ -9,7 +9,7 @@ type StorageConfig = // For MemoryStore, we keep the events as UTF8 arrays - we could use FsCodec.Codec.Box to remove the JSON encoding, which would improve perf but can conceal problems | Memory of Equinox.MemoryStore.VolatileStore> | Cosmos of 
Equinox.CosmosStore.CosmosStoreContext * Equinox.CosmosStore.CachingStrategy * unfolds: bool - | Es of Equinox.EventStore.EventStoreContext * Equinox.EventStore.CachingStrategy option * unfolds: bool + | Es of Equinox.EventStoreDb.EventStoreContext * Equinox.EventStoreDb.CachingStrategy option * unfolds: bool | Sql of Equinox.SqlStreamStore.SqlStreamStoreContext * Equinox.SqlStreamStore.CachingStrategy option * unfolds: bool module MemoryStore = @@ -128,13 +128,15 @@ module Cosmos = /// To establish a local node to run the tests against, follow https://developers.eventstore.com/server/v21.10/installation.html#use-docker-compose /// and/or do `docker compose up` in github.com/jet/equinox module EventStore = + + open Equinox.EventStoreDb + type [] Arguments = | [] VerboseStore | [] Timeout of float | [] Retries of int - | [] Host of string - | [] Username of string - | [] Password of string + | [] ConnectionString of string + | [] Credentials of string | [] ConcurrentOperationsLimit of int | [] HeartbeatTimeout of float | [] MaxEvents of int @@ -143,31 +145,27 @@ module EventStore = | VerboseStore -> "include low level Store logging." | Timeout _ -> "specify operation timeout in seconds (default: 5)." | Retries _ -> "specify operation retries (default: 1)." - | Host _ -> "specify a DNS query, using Gossip-driven discovery against all A records returned (default: localhost)." - | Username _ -> "specify a username (default: admin)." - | Password _ -> "specify a Password (default: changeit)." + | ConnectionString _ -> "Portion of connection string that's safe to write to console or log. default: esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false" + | Credentials _ -> "specify a sensitive portion of the connection string that should not be logged. Default: none" | ConcurrentOperationsLimit _ -> "max concurrent operations in flight (default: 5000)." | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds (default: 1.5)." 
| MaxEvents _ -> "Maximum number of Events to request per batch. Default 500." - open Equinox.EventStore - type Info(args : ParseResults) = - member _.Host = args.GetResult(Host,"localhost") - member _.Credentials = args.GetResult(Username,"admin"), args.GetResult(Password,"changeit") + member _.Host = args.GetResult(ConnectionString, "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false") + member _.Credentials = args.GetResult(Credentials, null) + member _.Timeout = args.GetResult(Timeout,5.) |> TimeSpan.FromSeconds member _.Retries = args.GetResult(Retries, 1) member _.HeartbeatTimeout = args.GetResult(HeartbeatTimeout,1.5) |> float |> TimeSpan.FromSeconds member _.ConcurrentOperationsLimit = args.GetResult(ConcurrentOperationsLimit,5000) member _.MaxEvents = args.GetResult(MaxEvents, 500) - open Serilog - - let private connect (log: ILogger) (dnsQuery, heartbeatTimeout, col) (username, password) (operationTimeout, operationRetries) = - EventStoreConnector(username, password, reqTimeout=operationTimeout, reqRetries=operationRetries, - heartbeatTimeout=heartbeatTimeout, concurrentOperationsLimit = col, - log=(if log.IsEnabled(Serilog.Events.LogEventLevel.Debug) then Logger.SerilogVerbose log else Logger.SerilogNormal log), + let private connect (log: ILogger) (connectionString, heartbeatTimeout, col) credentialsString (operationTimeout, operationRetries) = + EventStoreConnector(reqTimeout=operationTimeout, reqRetries=operationRetries, + // TODO heartbeatTimeout=heartbeatTimeout, concurrentOperationsLimit = col, + // TODO log=(if log.IsEnabled(Events.LogEventLevel.Debug) then Logger.SerilogVerbose log else Logger.SerilogNormal log), tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string]) - .Establish(appName, Discovery.GossipDns dnsQuery, ConnectionStrategy.ClusterTwinPreferSlaveReads) + .Establish(appName, Discovery.ConnectionString (String.Join(";", connectionString, credentialsString)), 
ConnectionStrategy.ClusterTwinPreferSlaveReads) let private createContext connection batchSize = EventStoreContext(connection, BatchingPolicy(maxBatchSize = batchSize)) let config (log: ILogger, storeLog) (cache, unfolds) (args : ParseResults) = let a = Info(args) @@ -176,7 +174,7 @@ module EventStore = let concurrentOperationsLimit = a.ConcurrentOperationsLimit log.Information("EventStoreDB {host} heartbeat: {heartbeat}s timeout: {timeout}s concurrent reqs: {concurrency} retries {retries}", a.Host, heartbeatTimeout.TotalSeconds, timeout.TotalSeconds, concurrentOperationsLimit, retries) - let connection = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling |> Async.RunSynchronously + let connection = connect storeLog (a.Host, heartbeatTimeout, concurrentOperationsLimit) a.Credentials operationThrottling let cacheStrategy = cache |> Option.map (fun c -> CachingStrategy.SlidingWindow (c, TimeSpan.FromMinutes 20.)) StorageConfig.Es ((createContext connection a.MaxEvents), cacheStrategy, unfolds) diff --git a/samples/Store/Integration/CartIntegration.fs b/samples/Store/Integration/CartIntegration.fs index e81b9bb1f..53236d7db 100644 --- a/samples/Store/Integration/CartIntegration.fs +++ b/samples/Store/Integration/CartIntegration.fs @@ -17,9 +17,9 @@ let codec = Cart.Events.codec let codecJe = Cart.Events.codecJe let resolveGesStreamWithRollingSnapshots context = - EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot).Resolve + EventStoreDb.EventStoreCategory(context, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot).Resolve let resolveGesStreamWithoutCustomAccessStrategy context = - EventStore.EventStoreCategory(context, codec, fold, initial).Resolve + EventStoreDb.EventStoreCategory(context, codec, fold, initial).Resolve let resolveCosmosStreamWithSnapshotStrategy context = 
CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.Snapshot snapshot).Resolve @@ -52,7 +52,7 @@ type Tests(testOutputHelper) = } let arrangeEs connect choose resolveStream = async { - let! client = connect log + let client = connect log let context = choose client defaultBatchSize return Cart.create log (resolveStream context) } diff --git a/samples/Store/Integration/ContactPreferencesIntegration.fs b/samples/Store/Integration/ContactPreferencesIntegration.fs index 6a1ae27ea..31c1c5762 100644 --- a/samples/Store/Integration/ContactPreferencesIntegration.fs +++ b/samples/Store/Integration/ContactPreferencesIntegration.fs @@ -14,9 +14,9 @@ let createServiceMemory log store = let codec = ContactPreferences.Events.codec let codecJe = ContactPreferences.Events.codecJe let resolveStreamGesWithOptimizedStorageSemantics context = - EventStore.EventStoreCategory(context 1, codec, fold, initial, access = EventStore.AccessStrategy.LatestKnownEvent).Resolve + EventStoreDb.EventStoreCategory(context 1, codec, fold, initial, access = EventStoreDb.AccessStrategy.LatestKnownEvent).Resolve let resolveStreamGesWithoutAccessStrategy context = - EventStore.EventStoreCategory(context defaultBatchSize, codec, fold, initial).Resolve + EventStoreDb.EventStoreCategory(context defaultBatchSize, codec, fold, initial).Resolve let resolveStreamCosmosWithLatestKnownEventSemantics context = CosmosStore.CosmosStoreCategory(context, codecJe, fold, initial, CosmosStore.CachingStrategy.NoCaching, CosmosStore.AccessStrategy.LatestKnownEvent).Resolve @@ -44,7 +44,7 @@ type Tests(testOutputHelper) = } let arrangeEs connect choose resolveStream = async { - let! 
client = connect log + let client = connect log let context = choose client return ContactPreferences.create log (resolveStream context) } diff --git a/samples/Store/Integration/EventStoreIntegration.fs b/samples/Store/Integration/EventStoreIntegration.fs index 04554792c..394976da6 100644 --- a/samples/Store/Integration/EventStoreIntegration.fs +++ b/samples/Store/Integration/EventStoreIntegration.fs @@ -1,14 +1,13 @@ [] module Samples.Store.Integration.EventStoreIntegration -open Equinox.EventStore +open Equinox.EventStoreDb open System -let connectToLocalEventStoreNode log = - // NOTE: disable cert validation for this test suite. ABSOLUTELY DO NOT DO THIS FOR ANY CODE THAT WILL EVER HIT A STAGING OR PROD SERVER - EventStoreConnector("admin", "changeit", custom = (fun c -> c.DisableServerCertificateValidation()), - reqTimeout=TimeSpan.FromSeconds 3., reqRetries=3, log=Logger.SerilogVerbose log, tags=["I",Guid.NewGuid() |> string] - // Connect directly to the locally running EventStore Node without using Gossip-driven discovery - ).Establish("Equinox-sample", Discovery.Uri(Uri "tcp://localhost:1113"), ConnectionStrategy.ClusterSingle NodePreference.Master) +// NOTE: use `docker compose up` to establish the standard 3 node config at ports 2111/2112/2113 +let connectToLocalEventStoreNode (_log : Serilog.ILogger) = + let c = EventStoreConnector(reqTimeout=TimeSpan.FromSeconds 3., reqRetries=3, tags=["I",Guid.NewGuid() |> string]) + // Connect to the locally running EventStore Node using Gossip-driven discovery + c.Establish("Equinox-sample", Discovery.ConnectionString "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false", ConnectionStrategy.ClusterSingle EventStore.Client.NodePreference.Leader) let defaultBatchSize = 500 let createContext connection batchSize = EventStoreContext(connection, BatchingPolicy(maxBatchSize = batchSize)) diff --git a/samples/Store/Integration/FavoritesIntegration.fs b/samples/Store/Integration/FavoritesIntegration.fs 
index e95dbe395..4b1e4dd01 100644 --- a/samples/Store/Integration/FavoritesIntegration.fs +++ b/samples/Store/Integration/FavoritesIntegration.fs @@ -15,7 +15,7 @@ let createServiceMemory log store = let codec = Favorites.Events.codec let codecJe = Favorites.Events.codecJe let createServiceGes log context = - let cat = EventStore.EventStoreCategory(context, codec, fold, initial, access = EventStore.AccessStrategy.RollingSnapshots snapshot) + let cat = EventStoreDb.EventStoreCategory(context, codec, fold, initial, access = EventStoreDb.AccessStrategy.RollingSnapshots snapshot) Favorites.create log cat.Resolve let createServiceCosmosSnapshotsUncached log context = @@ -68,7 +68,7 @@ type Tests(testOutputHelper) = [] let ``Can roundtrip against EventStore, correctly folding the events`` args = Async.RunSynchronously <| async { - let! client = connectToLocalEventStoreNode log + let client = connectToLocalEventStoreNode log let context = createContext client defaultBatchSize let service = createServiceGes log context let! 
version, items = act service args diff --git a/samples/Store/Integration/Integration.fsproj b/samples/Store/Integration/Integration.fsproj index 6efc9526d..f9d46cb4c 100644 --- a/samples/Store/Integration/Integration.fsproj +++ b/samples/Store/Integration/Integration.fsproj @@ -20,7 +20,7 @@ - + diff --git a/samples/Store/Integration/LogIntegration.fs b/samples/Store/Integration/LogIntegration.fs index 393c097c8..e350caec0 100644 --- a/samples/Store/Integration/LogIntegration.fs +++ b/samples/Store/Integration/LogIntegration.fs @@ -9,7 +9,7 @@ open System open System.Collections.Concurrent module EquinoxEsInterop = - open Equinox.EventStore + open Equinox.EventStoreDb [] type FlatMetric = { action: string; stream : string; interval: StopwatchInterval; bytes: int; count: int; batches: int option } with override x.ToString() = $"%s{x.action}-Stream=%s{x.stream} %s{x.action}-Elapsed={x.interval.Elapsed}" @@ -18,8 +18,9 @@ module EquinoxEsInterop = match evt with | Log.WriteSuccess m -> "AppendToStreamAsync", m, None | Log.WriteConflict m -> "AppendToStreamAsync", m, None - | Log.Slice (Direction.Forward,m) -> "ReadStreamEventsForwardAsync", m, None - | Log.Slice (Direction.Backward,m) -> "ReadStreamEventsBackwardAsync", m, None +// For the gRPC edition, no slice information is available +// | Log.Slice (Direction.Forward,m) -> "ReadStreamEventsForwardAsync", m, None +// | Log.Slice (Direction.Backward,m) -> "ReadStreamEventsBackwardAsync", m, None | Log.Batch (Direction.Forward,c,m) -> "ReadStreamAsyncF", m, Some c | Log.Batch (Direction.Backward,c,m) -> "ReadStreamAsyncB", m, Some c { action = action; stream = metric.stream; interval = metric.interval; bytes = metric.bytes; count = metric.count; batches = batches } @@ -60,7 +61,7 @@ type SerilogMetricsExtractor(emit : string -> unit) = let (|EsMetric|CosmosMetric|GenericMessage|) (logEvent : Serilog.Events.LogEvent) = logEvent.Properties |> Seq.tryPick (function - | KeyValue (k, SerilogScalar (:? 
Equinox.EventStore.Log.Metric as m)) -> Some <| Choice1Of3 (k,m) + | KeyValue (k, SerilogScalar (:? Equinox.EventStoreDb.Log.Metric as m)) -> Some <| Choice1Of3 (k,m) | KeyValue (k, SerilogScalar (:? Equinox.CosmosStore.Core.Log.Metric as m)) -> Some <| Choice2Of3 (k,m) | _ -> None) |> Option.defaultValue (Choice3Of3 ()) @@ -106,7 +107,7 @@ type Tests(testOutputHelper) = let batchSize = defaultBatchSize let buffer = ConcurrentQueue() let log = createLoggerWithMetricsExtraction buffer.Enqueue - let! client = connectToLocalEventStoreNode log + let client = connectToLocalEventStoreNode log let context = createContext client batchSize let service = Cart.create log (CartIntegration.resolveGesStreamWithRollingSnapshots context) let itemCount = batchSize / 2 + 1 diff --git a/samples/Tutorial/AsAt.fsx b/samples/Tutorial/AsAt.fsx index d43e180ff..3aa9c33c2 100644 --- a/samples/Tutorial/AsAt.fsx +++ b/samples/Tutorial/AsAt.fsx @@ -11,7 +11,7 @@ // - the same general point applies to over-using querying of streams for read purposes as we do here; // applying CQRS principles can often lead to a better model regardless of raw necessity -#if LOCAL +#if !LOCAL // Compile Tutorial.fsproj by either a) right-clicking or b) typing // dotnet build samples/Tutorial before attempting to send this to FSI with Alt-Enter #if VISUALSTUDIO @@ -29,19 +29,18 @@ #r "Equinox.dll" #r "TypeShape.dll" #r "FsCodec.SystemTextJson.dll" -#r "FSharp.Control.AsyncSeq.dll" -#r "System.Net.Http" -#r "Serilog.Sinks.Seq.dll" -#r "EventStore.ClientAPI.dll" -#r "Equinox.EventStore.dll" -#r "Microsoft.Azure.Cosmos.Direct.dll" -#r "Microsoft.Azure.Cosmos.Client.dll" +//#r "FSharp.Control.AsyncSeq.dll" +//#r "System.Net.Http" +//#r "EventStore.Client.dll" +//#r "EventStore.Client.Streams.dll" +#r "Equinox.EventStoreDb.dll" +//#r "Microsoft.Azure.Cosmos.Client.dll" #r "Equinox.CosmosStore.dll" #else #r "nuget:Serilog.Sinks.Console" #r "nuget:Serilog.Sinks.Seq" #r "nuget:Equinox.CosmosStore, *-*" -#r 
"nuget:Equinox.EventStore, *-*" +#r "nuget:Equinox.EventStoreDb, *-*" #r "nuget:FsCodec.SystemTextJson, *-*" #endif open System @@ -134,28 +133,26 @@ module Log = let log = let c = LoggerConfiguration() let c = if verbose then c.MinimumLevel.Debug() else c - let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump + let c = c.WriteTo.Sink(Equinox.EventStoreDb.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump let c = c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) // to power Log.InternalMetrics.dump let c = c.WriteTo.Seq("http://localhost:5341") // https://getseq.net let c = c.WriteTo.Console(if verbose then LogEventLevel.Debug else LogEventLevel.Information) c.CreateLogger() let dumpMetrics () = Equinox.CosmosStore.Core.Log.InternalMetrics.dump log - Equinox.EventStore.Log.InternalMetrics.dump log + Equinox.EventStoreDb.Log.InternalMetrics.dump log let [] AppName = "equinox-tutorial" let cache = Equinox.Cache(AppName, 20) module EventStore = - open Equinox.EventStore + open Equinox.EventStoreDb let snapshotWindow = 500 - // see QuickStart for how to run a local instance in a mode that emulates the behavior of a cluster - let host, username, password = "localhost", "admin", "changeit" - let connector = Connector(username,password,TimeSpan.FromSeconds 5., reqRetries=3, log=Logger.SerilogNormal Log.log) - let esc = connector.Connect(AppName, Discovery.GossipDns host) |> Async.RunSynchronously - let log = Logger.SerilogNormal Log.log + // NOTE: use `docker compose up` to establish the standard 3 node config at ports 1113/2113 + let connector = EventStoreConnector(reqTimeout = TimeSpan.FromSeconds 5., reqRetries = 3) + let esc = connector.Connect(AppName, Discovery.ConnectionString "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false") let connection = EventStoreConnection(esc) let context = EventStoreContext(connection, 
BatchingPolicy(maxBatchSize=snapshotWindow)) // cache so normal read pattern is to read from whatever we've built in memory @@ -179,8 +176,8 @@ module Cosmos = let category = CosmosStoreCategory(context, Events.codecJe, Fold.fold, Fold.initial, cacheStrategy, accessStrategy) let resolve id = Equinox.Decider(Log.log, category.Resolve(streamName id), maxAttempts = 3) -//let serviceES = Service(EventStore.resolve) -let service= Service(Cosmos.resolve) +let service = Service(EventStore.resolve) +//let service= Service(Cosmos.resolve) let client = "ClientA" service.Add(client, 1) |> Async.RunSynchronously diff --git a/samples/Tutorial/FulfilmentCenter.fsx b/samples/Tutorial/FulfilmentCenter.fsx index ea9f6a566..556ec4c5a 100644 --- a/samples/Tutorial/FulfilmentCenter.fsx +++ b/samples/Tutorial/FulfilmentCenter.fsx @@ -1,4 +1,4 @@ -#if LOCAL +#if !LOCAL #I "bin/Debug/net6.0/" #r "Serilog.dll" #r "Serilog.Sinks.Console.dll" diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj index 88d504219..081060c18 100644 --- a/samples/Tutorial/Tutorial.fsproj +++ b/samples/Tutorial/Tutorial.fsproj @@ -22,7 +22,7 @@ - + diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index caab35472..202b44e06 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -67,7 +67,7 @@ module Cosmos = create category.Resolve module EventStore = - open Equinox.EventStore + open Equinox.EventStoreDb let create context = let cat = EventStoreCategory(context, Events.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent) create cat.Resolve diff --git a/samples/Web/Program.fs b/samples/Web/Program.fs index fab9549b7..5a740ded8 100644 --- a/samples/Web/Program.fs +++ b/samples/Web/Program.fs @@ -30,7 +30,7 @@ module Program = .WriteTo.Console() // TOCONSIDER log and reset every minute or something ? 
.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) - .WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink()) + .WriteTo.Sink(Equinox.EventStoreDb.Log.InternalMetrics.Stats.LogSink()) .WriteTo.Sink(Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink()) let c = let maybeSeq = if args.Contains LocalSeq then Some "http://localhost:5341" else None diff --git a/src/Equinox.Core/Equinox.Core.fsproj b/src/Equinox.Core/Equinox.Core.fsproj index ea822571d..c5aad3c35 100644 --- a/src/Equinox.Core/Equinox.Core.fsproj +++ b/src/Equinox.Core/Equinox.Core.fsproj @@ -14,7 +14,6 @@ - diff --git a/src/Equinox.EventStore/Equinox.EventStore.fsproj b/src/Equinox.EventStore/Equinox.EventStore.fsproj index 16ce1fbc7..e32cb83f9 100644 --- a/src/Equinox.EventStore/Equinox.EventStore.fsproj +++ b/src/Equinox.EventStore/Equinox.EventStore.fsproj @@ -8,6 +8,7 @@ + diff --git a/src/Equinox.Core/Caching.fs b/src/Equinox.EventStoreDb/Caching.fs similarity index 100% rename from src/Equinox.Core/Caching.fs rename to src/Equinox.EventStoreDb/Caching.fs diff --git a/src/Equinox.EventStoreDb/Equinox.EventStoreDb.fsproj b/src/Equinox.EventStoreDb/Equinox.EventStoreDb.fsproj new file mode 100644 index 000000000..48347e3ad --- /dev/null +++ b/src/Equinox.EventStoreDb/Equinox.EventStoreDb.fsproj @@ -0,0 +1,30 @@ + + + + net6.0 + true + true + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/Equinox.EventStoreDb/EventStoreDb.fs b/src/Equinox.EventStoreDb/EventStoreDb.fs new file mode 100755 index 000000000..00156fd7c --- /dev/null +++ b/src/Equinox.EventStoreDb/EventStoreDb.fs @@ -0,0 +1,643 @@ +namespace Equinox.EventStoreDb + +open Equinox.Core +open EventStore.Client +open Serilog +open System + +type EventBody = ReadOnlyMemory + +[] +type Direction = Forward | Backward with + override this.ToString() = match this with Forward -> "Forward" | Backward -> "Backward" + +module Log = + + /// Name of Property used for Metric in LogEvents. 
+ let [] PropertyTag = "esdbEvt" + + [] + type Measurement = { stream: string; interval: StopwatchInterval; bytes: int; count: int } + + [] + type Metric = + | WriteSuccess of Measurement + | WriteConflict of Measurement + | Batch of Direction * slices: int * Measurement + + let prop name value (log : ILogger) = log.ForContext(name, value) + + let propEvents name (kvps : System.Collections.Generic.KeyValuePair seq) (log : ILogger) = + let items = seq { for kv in kvps do yield sprintf "{\"%s\": %s}" kv.Key kv.Value } + log.ForContext(name, sprintf "[%s]" (String.concat ",\n\r" items)) + + let propEventData name (events : EventData[]) (log : ILogger) = + log |> propEvents name (seq { + for x in events do + if x.ContentType = "application/json" then + yield let d = x.Data in System.Collections.Generic.KeyValuePair<_,_>(x.Type, System.Text.Encoding.UTF8.GetString d.Span) }) + + let propResolvedEvents name (events : ResolvedEvent[]) (log : ILogger) = + log |> propEvents name (seq { + for x in events do + let e = x.Event + if e.ContentType = "application/json" then + yield let d = e.Data in System.Collections.Generic.KeyValuePair<_,_>(e.EventType, System.Text.Encoding.UTF8.GetString d.Span) }) + + open Serilog.Events + + /// Attach a property to the log context to hold the metrics + // Sidestep Log.ForContext converting to a string; see https://github.com/serilog/serilog/issues/1124 + let event (value : Metric) (log : ILogger) = + let enrich (e : LogEvent) = e.AddPropertyIfAbsent(LogEventProperty(PropertyTag, ScalarValue(value))) + log.ForContext({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt, _) = enrich evt }) + + let withLoggedRetries<'t> retryPolicy (contextLabel : string) (f : ILogger -> Async<'t>) log : Async<'t> = + match retryPolicy with + | None -> f log + | Some retryPolicy -> + let withLoggingContextWrapping count = + let log = if count = 1 then log else log |> prop contextLabel count + f log + retryPolicy withLoggingContextWrapping + + let 
(|BlobLen|) (x : ReadOnlyMemory) = x.Length + + /// NB Caveat emptor; this is subject to unlimited change without the major version changing - while the `dotnet-templates` repo will be kept in step, and + /// the ChangeLog will mention changes, it's critical to not assume that the presence or nature of these helpers be considered stable + module InternalMetrics = + + module Stats = + let inline (|Stats|) ({ interval = i } : Measurement) = let e = i.Elapsed in int64 e.TotalMilliseconds + + let (|Read|Write|Resync|) = function + | WriteSuccess (Stats s) -> Write s + | WriteConflict (Stats s) -> Resync s + // slices are rolled up into batches in other stores, but we don't log slices in this impl + | Batch (_, _, Stats s) -> Read s + + let (|SerilogScalar|_|) : LogEventPropertyValue -> obj option = function + | :? ScalarValue as x -> Some x.Value + | _ -> None + + let (|EsMetric|_|) (logEvent : LogEvent) : Metric option = + match logEvent.Properties.TryGetValue("esEvt") with + | true, SerilogScalar (:? 
Metric as e) -> Some e + | _ -> None + + type Counter = + { mutable count : int64; mutable ms : int64 } + static member Create() = { count = 0L; ms = 0L } + member x.Ingest(ms) = + System.Threading.Interlocked.Increment(&x.count) |> ignore + System.Threading.Interlocked.Add(&x.ms, ms) |> ignore + + type LogSink() = + static let epoch = System.Diagnostics.Stopwatch.StartNew() + static member val Read = Counter.Create() with get, set + static member val Write = Counter.Create() with get, set + static member val Resync = Counter.Create() with get, set + static member Restart() = + LogSink.Read <- Counter.Create() + LogSink.Write <- Counter.Create() + LogSink.Resync <- Counter.Create() + let span = epoch.Elapsed + epoch.Restart() + span + interface Serilog.Core.ILogEventSink with + member _.Emit logEvent = logEvent |> function + | EsMetric (Read stats) -> LogSink.Read.Ingest stats + | EsMetric (Write stats) -> LogSink.Write.Ingest stats + | EsMetric (Resync stats) -> LogSink.Resync.Ingest stats + | _ -> () + + /// Relies on feeding of metrics from Log through to Stats.LogSink + /// Use Stats.LogSink.Restart() to reset the start point (and stats) where relevant + let dump (log : ILogger) = + let stats = + [ "Read", Stats.LogSink.Read + "Write", Stats.LogSink.Write + "Resync", Stats.LogSink.Resync ] + let logActivity name count lat = + log.Information("{name}: {count:n0} requests; Average latency: {lat:n0}ms", + name, count, (if count = 0L then Double.NaN else float lat/float count)) + let mutable rows, totalCount, totalMs = 0, 0L, 0L + for name, stat in stats do + if stat.count <> 0L then + totalCount <- totalCount + stat.count + totalMs <- totalMs + stat.ms + logActivity name stat.count stat.ms + rows <- rows + 1 + // Yes, there's a minor race here between the use of the values and the reset + let duration = Stats.LogSink.Restart() + if rows > 1 then logActivity "TOTAL" totalCount totalMs + let measures : (string * (TimeSpan -> float)) list = [ "s", fun x -> 
x.TotalSeconds(*; "m", fun x -> x.TotalMinutes; "h", fun x -> x.TotalHours*) ] + let logPeriodicRate name count = log.Information("rp{name} {count:n0}", name, count) + for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) + +[] +type EsSyncResult = Written of ConditionalWriteResult | Conflict of actualVersion: int64 + +module private Write = + + let private writeEventsAsync (log : ILogger) (conn : EventStoreClient) (streamName : string) version (events : EventData[]) + : Async = async { + let! ct = Async.CancellationToken + let! wr = conn.ConditionalAppendToStreamAsync(streamName, StreamRevision.FromInt64 version, events, cancellationToken = ct) |> Async.AwaitTaskCorrect + if wr.Status = ConditionalWriteStatus.VersionMismatch then + log.Information("Esdb TrySync VersionMismatch writing {EventTypes}, actual {ActualVersion}", + [| for x in events -> x.Type |], wr.NextExpectedVersion) + return EsSyncResult.Conflict wr.NextExpectedVersion + elif wr.Status = ConditionalWriteStatus.StreamDeleted then return failwithf "Unexpected write to deleted stream %s" streamName + elif wr.Status = ConditionalWriteStatus.Succeeded then return EsSyncResult.Written wr + else return failwithf "Unexpected write response code %O" wr.Status } + + let eventDataBytes events = + let eventDataLen (x : EventData) = match x.Data, x.Metadata with Log.BlobLen bytes, Log.BlobLen metaBytes -> bytes + metaBytes + events |> Array.sumBy eventDataLen + + let private writeEventsLogged (conn : EventStoreClient) (streamName : string) (version : int64) (events : EventData[]) (log : ILogger) + : Async = async { + let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propEventData "Json" events + let bytes, count = eventDataBytes events, events.Length + let log = log |> Log.prop "bytes" bytes |> Log.prop "expectedVersion" version + let writeLog = log |> Log.prop "stream" streamName |> Log.prop "count" count + let! 
t, result = writeEventsAsync writeLog conn streamName version events |> Stopwatch.Time + let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} + let resultLog, evt = + match result, reqMetric with + | EsSyncResult.Conflict actualVersion, m -> + log |> Log.prop "actualVersion" actualVersion, Log.WriteConflict m + | EsSyncResult.Written x, m -> + log |> Log.prop "nextExpectedVersion" x.NextExpectedVersion |> Log.prop "logPosition" x.LogPosition, Log.WriteSuccess m + (resultLog |> Log.event evt).Information("Esdb{action:l} count={count} conflict={conflict}", + "Write", events.Length, match evt with Log.WriteConflict _ -> true | _ -> false) + return result } + + let writeEvents (log : ILogger) retryPolicy (conn : EventStoreClient) (streamName : string) (version : int64) (events : EventData[]) + : Async = + let call = writeEventsLogged conn streamName version events + Log.withLoggedRetries retryPolicy "writeAttempt" call log + +module private Read = + open FSharp.Control + let resolvedEventBytes (x : ResolvedEvent) = let Log.BlobLen bytes, Log.BlobLen metaBytes = x.Event.Data, x.Event.Metadata in bytes + metaBytes + let resolvedEventsBytes events = events |> Array.sumBy resolvedEventBytes + let logBatchRead direction streamName t events batchSize version (log : ILogger) = + let bytes, count = resolvedEventsBytes events, events.Length + let reqMetric : Log.Measurement = { stream = streamName; interval = t; bytes = bytes; count = count} + let batches = match batchSize with Some batchSize -> (events.Length - 1) / batchSize + 1 | None -> -1 + let action = if direction = Direction.Forward then "LoadF" else "LoadB" + let log = if (not << log.IsEnabled) Events.LogEventLevel.Debug then log else log |> Log.propResolvedEvents "Json" events + let evt = Log.Metric.Batch (direction, batches, reqMetric) + (log |> Log.prop "bytes" bytes |> Log.event evt).Information( + "Esdb{action:l} stream={stream} count={count}/{batches} 
version={version}", + action, streamName, count, batches, version) + let loadBackwardsUntilOrigin (log : ILogger) (conn : EventStoreClient) batchSize streamName (tryDecode, isOrigin) + : Async = async { + let! ct = Async.CancellationToken + let res = conn.ReadStreamAsync(Direction.Backwards, streamName, StreamPosition.End, int64 batchSize, resolveLinkTos = false, cancellationToken = ct) + try let! events = + AsyncSeq.ofAsyncEnum res + |> AsyncSeq.map (fun x -> x, tryDecode x) + |> AsyncSeq.takeWhileInclusive (function + | x, Some e when isOrigin e -> + log.Information("EsdbStop stream={stream} at={eventNumber}", streamName, let en = x.Event.EventNumber in en.ToInt64()) + false + | _ -> true) + |> AsyncSeq.toArrayAsync + let v = match Seq.tryHead events with Some (r, _) -> let en = r.Event.EventNumber in en.ToInt64() | None -> -1 + Array.Reverse events + return v, events + with :? AggregateException as e when (e.InnerExceptions.Count = 1 && e.InnerExceptions[0] :? StreamNotFoundException) -> + return -1L, [||] } + let loadBackwards (log : ILogger) (conn : EventStoreClient) batchSize streamName (tryDecode, isOrigin) + : Async = async { + let! t, (version, events) = loadBackwardsUntilOrigin log conn batchSize streamName (tryDecode, isOrigin) |> Stopwatch.Time + let log = log |> Log.prop "batchSize" batchSize |> Log.prop "stream" streamName + log |> logBatchRead Direction.Backward streamName t (Array.map fst events) (Some batchSize) version + return version, events } + + let loadForward (conn : EventStoreClient) streamName startPosition + : Async = async { + let! ct = Async.CancellationToken + let res = conn.ReadStreamAsync(Direction.Forwards, streamName, startPosition, Int64.MaxValue, resolveLinkTos = false, cancellationToken = ct) + try let! events = AsyncSeq.ofAsyncEnum res |> AsyncSeq.toArrayAsync + let v = match Seq.tryLast events with Some r -> let en = r.Event.EventNumber in en.ToInt64() | None -> startPosition.ToInt64() - 1L + return v, events + with :? 
AggregateException as e when (e.InnerExceptions.Count = 1 && e.InnerExceptions[0] :? StreamNotFoundException) -> + return -1L, [||] } + let loadForwards log conn streamName startPosition + : Async = async { + let direction = Direction.Forward + let! t, (version, events) = loadForward conn streamName startPosition |> Stopwatch.Time + let log = log |> Log.prop "startPos" startPosition |> Log.prop "direction" direction |> Log.prop "stream" streamName + log |> logBatchRead direction streamName t events None version + return version, events } + +module UnionEncoderAdapters = + let encodedEventOfResolvedEvent (x : ResolvedEvent) : FsCodec.ITimelineEvent = + let e = x.Event + // TOCONSIDER wire e.Metadata["$correlationId"] and ["$causationId"] into correlationId and causationId + // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata + let n, eu, ts = e.EventNumber, e.EventId, DateTimeOffset e.Created + FsCodec.Core.TimelineEvent.Create(n.ToInt64(), e.EventType, e.Data, e.Metadata, eu.ToGuid(), correlationId = null, causationId = null, timestamp = ts) + + let eventDataOfEncodedEvent (x : FsCodec.IEventData) = + // TOCONSIDER wire x.CorrelationId, x.CausationId into x.Meta.["$correlationId"] and .["$causationId"] + // https://eventstore.org/docs/server/metadata-and-reserved-names/index.html#event-metadata + EventData(Uuid.FromGuid x.EventId, x.EventType, contentType = "application/json", data = x.Data, metadata = x.Meta) + +type Position = { streamVersion : int64; compactionEventNumber : int64 option; batchCapacityLimit : int option } +type Token = { pos : Position } + +module Token = + let private create compactionEventNumber batchCapacityLimit streamVersion : StreamToken = + { value = box { pos = { streamVersion = streamVersion; compactionEventNumber = compactionEventNumber; batchCapacityLimit = batchCapacityLimit } } + // In this impl, the StreamVersion matches the EventStore StreamVersion in being -1-based + // Version however is 
the representation that needs to align with ISyncContext.Version + version = streamVersion + 1L } + + /// No batching / compaction; we only need to retain the StreamVersion + let ofNonCompacting streamVersion : StreamToken = + create None None streamVersion + + // headroom before compaction is necessary given the stated knowledge of the last (if known) `compactionEventNumberOption` + let private batchCapacityLimit compactedEventNumberOption unstoredEventsPending (batchSize : int) (streamVersion : int64) : int = + match compactedEventNumberOption with + | Some (compactionEventNumber : int64) -> (batchSize - unstoredEventsPending) - int (streamVersion - compactionEventNumber + 1L) |> max 0 + | None -> (batchSize - unstoredEventsPending) - (int streamVersion + 1) - 1 |> max 0 + + let (*private*) ofCompactionEventNumber compactedEventNumberOption unstoredEventsPending batchSize streamVersion : StreamToken = + let batchCapacityLimit = batchCapacityLimit compactedEventNumberOption unstoredEventsPending batchSize streamVersion + create compactedEventNumberOption (Some batchCapacityLimit) streamVersion + + /// Assume we have not seen any compaction events; use the batchSize and version to infer headroom + let ofUncompactedVersion batchSize streamVersion : StreamToken = + ofCompactionEventNumber None 0 batchSize streamVersion + + let (|Unpack|) (x : StreamToken) : Position = let t = unbox x.value in t.pos + + /// Use previousToken plus the data we are adding and the position we are adding it to infer a headroom + let ofPreviousTokenAndEventsLength (Unpack previousToken) eventsLength batchSize streamVersion : StreamToken = + let compactedEventNumber = previousToken.compactionEventNumber + ofCompactionEventNumber compactedEventNumber eventsLength batchSize streamVersion + + /// Use an event just read from the stream to infer headroom + let ofCompactionResolvedEventAndVersion (compactionEvent : ResolvedEvent) batchSize streamVersion : StreamToken = + let e = 
compactionEvent.Event.EventNumber in ofCompactionEventNumber (Some (e.ToInt64())) 0 batchSize streamVersion + + /// Use an event we are about to write to the stream to infer headroom + let ofPreviousStreamVersionAndCompactionEventDataIndex (Unpack token) compactionEventDataIndex eventsLength batchSize streamVersion' : StreamToken = + ofCompactionEventNumber (Some (token.streamVersion + 1L + int64 compactionEventDataIndex)) eventsLength batchSize streamVersion' + + let supersedes (Unpack current) (Unpack x) = + let currentVersion, newVersion = current.streamVersion, x.streamVersion + newVersion > currentVersion + +type EventStoreConnection(readConnection, [] ?writeConnection, [] ?readRetryPolicy, [] ?writeRetryPolicy) = + member _.ReadConnection = readConnection + member _.ReadRetryPolicy = readRetryPolicy + member _.WriteConnection = defaultArg writeConnection readConnection + member _.WriteRetryPolicy = writeRetryPolicy + +type BatchingPolicy(getMaxBatchSize : unit -> int) = + new (maxBatchSize) = BatchingPolicy(fun () -> maxBatchSize) + // TOCONSIDER remove if Client does not start to expose it + member _.BatchSize = getMaxBatchSize() + +[] +type GatewaySyncResult = Written of StreamToken | ConflictUnknown of StreamToken + +type EventStoreContext(conn : EventStoreConnection, batching : BatchingPolicy) = + let isResolvedEventEventType (tryDecode, predicate) (x : ResolvedEvent) = predicate (tryDecode x.Event.Data) + let tryIsResolvedEventEventType predicateOption = predicateOption |> Option.map isResolvedEventEventType + + member _.TokenEmpty = Token.ofUncompactedVersion batching.BatchSize -1L + member _.LoadBatched(streamName, log, tryDecode, isCompactionEventType) : Async = async { + let! 
version, events = Read.loadForwards log conn.ReadConnection streamName StreamPosition.Start + match tryIsResolvedEventEventType isCompactionEventType with + | None -> return Token.ofNonCompacting version, Array.choose tryDecode events + | Some isCompactionEvent -> + match events |> Array.tryFindBack isCompactionEvent with + | None -> return Token.ofUncompactedVersion batching.BatchSize version, Array.choose tryDecode events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } + + member _.LoadBackwardsStoppingAtCompactionEvent(streamName, log, limit, (tryDecode, isOrigin)) : Async = async { + let! version, events = Read.loadBackwards log conn.ReadConnection (defaultArg limit Int32.MaxValue) streamName (tryDecode, isOrigin) + match Array.tryHead events |> Option.filter (function _, Some e -> isOrigin e | _ -> false) with + | None -> return Token.ofUncompactedVersion batching.BatchSize version, Array.choose snd events + | Some (resolvedEvent, _) -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose snd events } + + member _.LoadFromToken(useWriteConn, streamName, log, (Token.Unpack token as streamToken), tryDecode, isCompactionEventType) + : Async = async { + let streamPosition = StreamPosition.FromInt64(token.streamVersion + 1L) + let connToUse = if useWriteConn then conn.WriteConnection else conn.ReadConnection + let! 
version, events = Read.loadForwards log connToUse streamName streamPosition + match isCompactionEventType with + | None -> return Token.ofNonCompacting version, Array.choose tryDecode events + | Some isCompactionEvent -> + match events |> Array.tryFindBack (fun re -> match tryDecode re with Some e -> isCompactionEvent e | _ -> false) with + | None -> return Token.ofPreviousTokenAndEventsLength streamToken events.Length batching.BatchSize version, Array.choose tryDecode events + | Some resolvedEvent -> return Token.ofCompactionResolvedEventAndVersion resolvedEvent batching.BatchSize version, Array.choose tryDecode events } + + member _.TrySync(log, streamName, (Token.Unpack token as streamToken), events, encodedEvents : EventData array, isCompactionEventType): Async = async { + let streamVersion = token.streamVersion + let! wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents + match wr with + | EsSyncResult.Conflict actualVersion -> + return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) + | EsSyncResult.Written wr -> + let version' = wr.NextExpectedVersion + let token = + match isCompactionEventType with + | None -> Token.ofNonCompacting version' + | Some isCompactionEvent -> + match events |> Array.ofList |> Array.tryFindIndexBack isCompactionEvent with + | None -> Token.ofPreviousTokenAndEventsLength streamToken encodedEvents.Length batching.BatchSize version' + | Some compactionEventIndex -> + Token.ofPreviousStreamVersionAndCompactionEventDataIndex streamToken compactionEventIndex encodedEvents.Length batching.BatchSize version' + return GatewaySyncResult.Written token } + + member _.Sync(log, streamName, streamVersion, events : FsCodec.IEventData[]) : Async = async { + let encodedEvents : EventData[] = events |> Array.map UnionEncoderAdapters.eventDataOfEncodedEvent + let! 
wr = Write.writeEvents log conn.WriteRetryPolicy conn.WriteConnection streamName streamVersion encodedEvents + match wr with + | EsSyncResult.Conflict actualVersion -> + return GatewaySyncResult.ConflictUnknown (Token.ofNonCompacting actualVersion) + | EsSyncResult.Written wr -> + let version' = wr.NextExpectedVersion + let token = Token.ofNonCompacting version' + return GatewaySyncResult.Written token } + +[] +type AccessStrategy<'event, 'state> = + /// Load only the single most recent event defined in 'event and trust that doing a fold from any such event + /// will yield a correct and complete state + /// In other words, the fold function should not need to consider either the preceding 'state or 'events. + | LatestKnownEvent + /// Ensures a snapshot/compaction event from which the state can be reconstituted upon decoding is always present + /// (embedded in the stream as an event), generated every batchSize events using the supplied toSnapshot function + /// Scanning for events concludes when any event passes the isOrigin test. 
+ /// See https://eventstore.org/docs/event-sourcing-basics/rolling-snapshots/index.html + | RollingSnapshots of isOrigin : ('event -> bool) * toSnapshot : ('state -> 'event) + +type private CompactionContext(eventsLen : int, capacityBeforeCompaction : int) = + /// Determines whether writing a Compaction event is warranted (based on the existing state and the current accumulated changes) + member _.IsCompactionDue = eventsLen > capacityBeforeCompaction + +type private Category<'event, 'state, 'context>(context : EventStoreContext, codec : FsCodec.IEventCodec<_, _, 'context>, ?access : AccessStrategy<'event, 'state>) = + let tryDecode (e : ResolvedEvent) = e |> UnionEncoderAdapters.encodedEventOfResolvedEvent |> codec.TryDecode + + let compactionPredicate = + match access with + | None -> None + | Some AccessStrategy.LatestKnownEvent -> Some (fun _ -> true) + | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> Some isValid + + let isOrigin = + match access with + | None | Some AccessStrategy.LatestKnownEvent -> fun _ -> true + | Some (AccessStrategy.RollingSnapshots (isValid, _)) -> isValid + + let loadAlgorithm load streamName initial log = + let batched = load initial (context.LoadBatched(streamName, log, tryDecode, None)) + let compacted limit = load initial (context.LoadBackwardsStoppingAtCompactionEvent(streamName, log, limit, (tryDecode, isOrigin))) + match access with + | None -> batched + | Some AccessStrategy.LatestKnownEvent -> compacted (Some 1) + | Some (AccessStrategy.RollingSnapshots _) -> compacted None + + let load (fold : 'state -> 'event seq -> 'state) initial f = async { + let! 
token, events = f + return token, fold initial events } + + member _.Load(fold : 'state -> 'event seq -> 'state, initial : 'state, streamName : string, log : ILogger) : Async = + loadAlgorithm (load fold) streamName initial log + + member _.LoadFromToken(fold : 'state -> 'event seq -> 'state, state : 'state, streamName : string, token, log : ILogger) : Async = + (load fold) state (context.LoadFromToken(false, streamName, log, token, tryDecode, compactionPredicate)) + + member _.TrySync<'context> + ( log : ILogger, fold : 'state -> 'event seq -> 'state, + streamName, (Token.Unpack token as streamToken), state : 'state, events : 'event list, ctx : 'context option) : Async> = async { + let encode e = codec.Encode(ctx, e) + let events = + match access with + | None | Some AccessStrategy.LatestKnownEvent -> events + | Some (AccessStrategy.RollingSnapshots (_, compact)) -> + let cc = CompactionContext(List.length events, token.batchCapacityLimit.Value) + if cc.IsCompactionDue then events @ [fold state events |> compact] else events + + let encodedEvents : EventData[] = events |> Seq.map (encode >> UnionEncoderAdapters.eventDataOfEncodedEvent) |> Array.ofSeq + let! 
syncRes = context.TrySync(log, streamName, streamToken, events, encodedEvents, compactionPredicate) + match syncRes with + | GatewaySyncResult.ConflictUnknown _ -> + return SyncResult.Conflict (load fold state (context.LoadFromToken(true, streamName, log, streamToken, tryDecode, compactionPredicate))) + | GatewaySyncResult.Written token' -> + return SyncResult.Written (token', fold state (Seq.ofList events)) } + +type private Folder<'event, 'state, 'context>(category : Category<'event, 'state, 'context>, fold : 'state -> 'event seq -> 'state, initial : 'state, ?readCache) = + let batched log streamName = category.Load(fold, initial, streamName, log) + interface ICategory<'event, 'state, string, 'context> with + member _.Load(log, streamName, allowStale) : Async = + match readCache with + | None -> batched log streamName + | Some (cache : ICache, prefix : string) -> async { + match! cache.TryGet(prefix + streamName) with + | None -> return! batched log streamName + | Some tokenAndState when allowStale -> return tokenAndState + | Some (token, state) -> return! category.LoadFromToken(fold, state, streamName, token, log) } + + member _.TrySync(log : ILogger, streamName, token, initialState, events : 'event list, context) : Async> = async { + let! syncRes = category.TrySync(log, fold, streamName, token, initialState, events, context) + match syncRes with + | SyncResult.Conflict resync -> return SyncResult.Conflict resync + | SyncResult.Written (token', state') -> return SyncResult.Written (token', state') } + +/// For EventStoreDB, caching is less critical than it is for e.g. CosmosDB +/// As such, it can often be omitted, particularly if streams are short or there are snapshots being maintained +[] +type CachingStrategy = + /// Retain a single 'state per streamName. + /// Each cache hit for a stream renews the retention period for the defined window. 
+ /// Upon expiration of the defined window from the point at which the cache entry was last used, a full reload is triggered. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. + | SlidingWindow of ICache * window : TimeSpan + /// Retain a single 'state per streamName. + /// Upon expiration of the defined period, a full reload is triggered. + /// Unless LoadOption.AllowStale is used, each cache hit still incurs a roundtrip to load any subsequently-added events. + | FixedTimeSpan of ICache * period : TimeSpan + /// Prefix is used to segregate multiple folds per stream when they are stored in the cache. + /// Semantics are identical to SlidingWindow. + | SlidingWindowPrefixed of ICache * window : TimeSpan * prefix : string + +type EventStoreCategory<'event, 'state, 'context> + ( context : EventStoreContext, codec : FsCodec.IEventCodec<_, _, 'context>, fold, initial, + // Caching can be overkill for EventStore esp considering the degree to which its intrinsic caching is a first class feature + // e.g., A key benefit is that reads of streams more than a few pages long get completed in constant time after the initial load + [] ?caching, + [] ?access) = + + do match access with + | Some AccessStrategy.LatestKnownEvent when Option.isSome caching -> + "Equinox.EventStoreDb does not support (and it would make things _less_ efficient even if it did)" + + "mixing AccessStrategy.LatestKnownEvent with Caching at present."
+ |> invalidOp + | _ -> () + let inner = Category<'event, 'state, 'context>(context, codec, ?access = access) + let readCacheOption = + match caching with + | None -> None + | Some (CachingStrategy.SlidingWindow (cache, _)) + | Some (CachingStrategy.FixedTimeSpan (cache, _)) -> Some (cache, null) + | Some (CachingStrategy.SlidingWindowPrefixed (cache, _, prefix)) -> Some (cache, prefix) + let folder = Folder<'event, 'state, 'context>(inner, fold, initial, ?readCache = readCacheOption) + let category : ICategory<_, _, _, 'context> = + match caching with + | None -> folder :> _ + | Some (CachingStrategy.SlidingWindow (cache, window)) -> + Caching.applyCacheUpdatesWithSlidingExpiration cache null window folder Token.supersedes + | Some (CachingStrategy.FixedTimeSpan (cache, period)) -> + Caching.applyCacheUpdatesWithFixedTimeSpan cache null period folder Token.supersedes + | Some (CachingStrategy.SlidingWindowPrefixed (cache, window, prefix)) -> + Caching.applyCacheUpdatesWithSlidingExpiration cache prefix window folder Token.supersedes + let resolve streamName = category, FsCodec.StreamName.toString streamName, None + let empty = context.TokenEmpty, initial + let storeCategory = StoreCategory(resolve, empty) + member _.Resolve(streamName : FsCodec.StreamName, [] ?context) = storeCategory.Resolve(streamName, ?context = context) + +(* TODO +type private SerilogAdapter(log : ILogger) = + interface EventStore.ClientAPI.ILogger with + member _.Debug(format : string, args : obj []) = log.Debug(format, args) + member _.Debug(ex : exn, format : string, args : obj []) = log.Debug(ex, format, args) + member _.Info(format : string, args : obj []) = log.Information(format, args) + member _.Info(ex : exn, format : string, args : obj []) = log.Information(ex, format, args) + member _.Error(format : string, args : obj []) = log.Error(format, args) + member _.Error(ex : exn, format : string, args : obj []) = log.Error(ex, format, args) + +[] +type Logger = + | SerilogVerbose of 
ILogger + | SerilogNormal of ILogger + | CustomVerbose of EventStore.ClientAPI.ILogger + | CustomNormal of EventStore.ClientAPI.ILogger + member log.Configure(b : ConnectionSettingsBuilder) = + match log with + | SerilogVerbose logger -> b.EnableVerboseLogging().UseCustomLogger(SerilogAdapter(logger)) + | SerilogNormal logger -> b.UseCustomLogger(SerilogAdapter(logger)) + | CustomVerbose logger -> b.EnableVerboseLogging().UseCustomLogger(logger) + | CustomNormal logger -> b.UseCustomLogger(logger) +*) + +[] +type Discovery = + // Allow Uri-based connection definition (esdb://, etc) + | ConnectionString of string +(* TODO + /// Supply a set of pre-resolved EndPoints instead of letting Gossip resolution derive from the DNS outcome + | GossipSeeded of seedManagerEndpoints : System.Net.IPEndPoint [] + // Standard Gossip-based discovery based on Dns query and standard manager port + | GossipDns of clusterDns : string + // Standard Gossip-based discovery based on Dns query (with manager port overriding default 2113) + | GossipDnsCustomPort of clusterDns : string * managerPortOverride : int + +module private Discovery = + let buildDns np (f : DnsClusterSettingsBuilder -> DnsClusterSettingsBuilder) = + ClusterSettings.Create().DiscoverClusterViaDns().KeepDiscovering() + |> fun s -> match np with NodePreference.Random -> s.PreferRandomNode() | NodePreference.PreferSlave -> s.PreferSlaveNode() | _ -> s + |> f |> fun s -> s.Build() + + let buildSeeded np (f : GossipSeedClusterSettingsBuilder -> GossipSeedClusterSettingsBuilder) = + ClusterSettings.Create().DiscoverClusterViaGossipSeeds().KeepDiscovering() + |> fun s -> match np with NodePreference.Random -> s.PreferRandomNode() | NodePreference.PreferSlave -> s.PreferSlaveNode() | _ -> s + |> f |> fun s -> s.Build() + + let configureDns clusterDns maybeManagerPort (x : DnsClusterSettingsBuilder) = + x.SetClusterDns(clusterDns) + |> fun s -> match maybeManagerPort with Some port -> s.SetClusterGossipPort(port) | None -> s + + 
let inline configureSeeded (seedEndpoints : System.Net.IPEndPoint []) (x : GossipSeedClusterSettingsBuilder) = + x.SetGossipSeedEndPoints(seedEndpoints) + + // converts a Discovery mode to a ClusterSettings or a Uri as appropriate + let (|DiscoverViaUri|DiscoverViaGossip|) : Discovery * NodePreference -> Choice = function + | (Discovery.Uri uri), _ -> DiscoverViaUri uri + | (Discovery.GossipSeeded seedEndpoints), np -> DiscoverViaGossip (buildSeeded np (configureSeeded seedEndpoints)) + | (Discovery.GossipDns clusterDns), np -> DiscoverViaGossip (buildDns np (configureDns clusterDns None)) + | (Discovery.GossipDnsCustomPort (dns, port)), np ->DiscoverViaGossip (buildDns np (configureDns dns (Some port))) +*) + +// see https://github.com/EventStore/EventStore/issues/1652 +[] +type ConnectionStrategy = + /// Pair of master and slave connections, writes direct, often can't read writes, resync without backoff (kind to master, writes+resyncs optimal) + | ClusterTwinPreferSlaveReads + /// Single connection, with resync backoffs appropriate to the NodePreference + | ClusterSingle of NodePreference + +type EventStoreConnector + ( reqTimeout : TimeSpan, reqRetries : int, + ?readRetryPolicy, ?writeRetryPolicy, ?tags, + ?customize : EventStoreClientSettings -> unit) = +(* TODO port + let connSettings node = + ConnectionSettings.Create().SetDefaultUserCredentials(SystemData.UserCredentials(username, password)) + .KeepReconnecting() // ES default: .LimitReconnectionsTo(10) + .SetQueueTimeoutTo(reqTimeout) // ES default: Zero/unlimited + .FailOnNoServerResponse() // ES default: DoNotFailOnNoServerResponse() => wait forever; retry and/or log + .SetOperationTimeoutTo(reqTimeout) // ES default: 7s + .LimitRetriesForOperationTo(reqRetries) // ES default: 10 + |> fun s -> + match node with + | NodePreference.Master -> s.PerformOnMasterOnly() // explicitly use ES default of requiring master, use default Node preference of Master + | NodePreference.PreferMaster -> s.PerformOnAnyNode() 
// override default [implied] PerformOnMasterOnly(), use default Node preference of Master + // NB .PreferSlaveNode/.PreferRandomNode setting is ignored if using EventStoreConneciton.Create(ConnectionSettings, ClusterSettings) overload but + // this code is necessary for cases where people are using the discover :// and related URI schemes + | NodePreference.PreferSlave -> s.PerformOnAnyNode().PreferSlaveNode() // override default PerformOnMasterOnly(), override Master Node preference + | NodePreference.Random -> s.PerformOnAnyNode().PreferRandomNode() // override default PerformOnMasterOnly(), override Master Node preference + |> fun s -> match concurrentOperationsLimit with Some col -> s.LimitConcurrentOperationsTo(col) | None -> s // ES default: 5000 + |> fun s -> match heartbeatTimeout with Some v -> s.SetHeartbeatTimeout v | None -> s // default: 1500 ms + |> fun s -> match gossipTimeout with Some v -> s.SetGossipTimeout v | None -> s // default: 1000 ms + |> fun s -> match clientConnectionTimeout with Some v -> s.WithConnectionTimeoutOf v | None -> s // default: 1000 ms + |> fun s -> match log with Some log -> log.Configure s | None -> s + |> fun s -> s.Build() +*) + member _.Connect + ( // Name should be sufficient to uniquely identify this connection within a single app instance's logs + name, discovery : Discovery, ?clusterNodePreference) : EventStoreClient = + let settings = + match discovery with + | Discovery.ConnectionString s -> EventStoreClientSettings.Create(s) +(* TODO + | Discovery.DiscoverViaGossip clusterSettings -> + // NB This overload's implementation ignores the calls to ConnectionSettingsBuilder.PreferSlaveNode/.PreferRandomNode and + // requires equivalent ones on the GossipSeedClusterSettingsBuilder or ClusterSettingsBuilder + EventStoreConnection.Create(connSettings clusterNodePreference, clusterSettings, sanitizedName) *) + if name = null then nullArg "name" + let name = String.concat ";" <| seq { + name + string clusterNodePreference + 
match tags with None -> () | Some tags -> for key, value in tags do sprintf "%s=%s" key value } + let sanitizedName = name.Replace('\'','_').Replace(':','_') // ES internally uses `:` and `'` as separators in log messages and ... people regex logs + settings.ConnectionName <- sanitizedName + match clusterNodePreference with None -> () | Some np -> settings.ConnectivitySettings.NodePreference <- np + match customize with None -> () | Some f -> f settings + settings.DefaultDeadline <- reqTimeout + // TODO implement reqRetries + new EventStoreClient(settings) + + /// Yields a Connection (which may internally be twin connections) configured per the specified strategy + member x.Establish + ( // Name should be sufficient to uniquely identify this (aggregate) connection within a single app instance's logs + name, + discovery : Discovery, strategy : ConnectionStrategy) : EventStoreConnection = + match strategy with + | ConnectionStrategy.ClusterSingle nodePreference -> + let client = x.Connect(name, discovery, nodePreference) + EventStoreConnection(client, ?readRetryPolicy = readRetryPolicy, ?writeRetryPolicy = writeRetryPolicy) + | ConnectionStrategy.ClusterTwinPreferSlaveReads -> + let leader = x.Connect(name + "-TwinW", discovery, NodePreference.Leader) + let follower = x.Connect(name + "-TwinR", discovery, NodePreference.Follower) + EventStoreConnection(readConnection = follower, writeConnection = leader, ?readRetryPolicy = readRetryPolicy, ?writeRetryPolicy = writeRetryPolicy) diff --git a/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj b/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj index a85dadf82..48631f220 100644 --- a/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj +++ b/src/Equinox.SqlStreamStore/Equinox.SqlStreamStore.fsproj @@ -8,6 +8,7 @@ + diff --git a/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj b/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj index 07b4ee52d..be82aaf12 
100644 --- a/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj +++ b/tests/Equinox.EventStore.Integration/Equinox.EventStore.Integration.fsproj @@ -2,8 +2,7 @@ net6.0 - false - 5 + $(DefineConstants);STORE_EVENTSTORE_LEGACY diff --git a/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs b/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs index b75a4f47d..e77b4637c 100644 --- a/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs +++ b/tests/Equinox.EventStore.Integration/EventStoreTokenTests.fs @@ -1,7 +1,11 @@ module Equinox.EventStore.Tests.EventStoreTokenTests open Equinox.Core +#if !STORE_EVENTSTORE_LEGACY +open Equinox.EventStoreDb +#else open Equinox.EventStore +#endif open FsCheck.Xunit open Swensen.Unquote.Assertions open Xunit diff --git a/tests/Equinox.EventStore.Integration/Infrastructure.fs b/tests/Equinox.EventStore.Integration/Infrastructure.fs index 75e33fd20..06da36049 100644 --- a/tests/Equinox.EventStore.Integration/Infrastructure.fs +++ b/tests/Equinox.EventStore.Integration/Infrastructure.fs @@ -15,8 +15,12 @@ type FsCheckGenerators = #if STORE_POSTGRES || STORE_MSSQL || STORE_MYSQL open Equinox.SqlStreamStore #else +#if !STORE_EVENTSTORE_LEGACY +open Equinox.EventStoreDb +#else open Equinox.EventStore #endif +#endif [] module SerilogHelpers = diff --git a/tests/Equinox.EventStore.Integration/StoreIntegration.fs b/tests/Equinox.EventStore.Integration/StoreIntegration.fs index b1abad481..6b814d6a9 100644 --- a/tests/Equinox.EventStore.Integration/StoreIntegration.fs +++ b/tests/Equinox.EventStore.Integration/StoreIntegration.fs @@ -46,7 +46,16 @@ let connectToLocalStore (_ : ILogger) = type Context = SqlStreamStoreContext type Category<'event, 'state, 'context> = SqlStreamStoreCategory<'event, 'state, 'context> -#else // STORE_EVENTSTORE +#else +#if !STORE_EVENTSTORE_LEGACY +open Equinox.EventStoreDb + +/// Connect directly to a locally running EventStoreDB Node using gRPC, without using 
Gossip-driven discovery +let connectToLocalStore (_log : ILogger) = async { + let c = EventStoreConnector(reqTimeout=TimeSpan.FromSeconds 3., reqRetries=3, (*, log=Logger.SerilogVerbose log,*) tags=["I",Guid.NewGuid() |> string]) + let conn = c.Establish("Equinox-integration", Discovery.ConnectionString "esdb://localhost:2111,localhost:2112,localhost:2113?tls=true&tlsVerifyCert=false", ConnectionStrategy.ClusterSingle EventStore.Client.NodePreference.Leader) + return conn } +#else // STORE_EVENTSTORE_LEGACY open Equinox.EventStore // NOTE: use `docker compose up` to establish the standard 3 node config at ports 1113/2113 @@ -61,7 +70,7 @@ let connectToLocalStore log = // Connect directly to the locally running EventStore Node using Gossip-driven discovery ).Establish("Equinox-integration", Discovery.GossipDns "localhost", ConnectionStrategy.ClusterTwinPreferSlaveReads) #endif - +#endif type Context = EventStoreContext type Category<'event, 'state, 'context> = EventStoreCategory<'event, 'state, 'context> #endif @@ -114,7 +123,11 @@ type Tests(testOutputHelper) = let addAndThenRemoveItemsOptimisticManyTimesExceptTheLastOne context cartId skuId service count = addAndThenRemoveItems true true context cartId skuId service count +#if STORE_EVENTSTOREDB // gRPC does not expose slice metrics + let sliceForward = [] +#else let sliceForward = [EsAct.SliceForward] +#endif let singleBatchForward = sliceForward @ [EsAct.BatchForward] let batchForwardAndAppend = singleBatchForward @ [EsAct.Append] @@ -223,7 +236,11 @@ type Tests(testOutputHelper) = test <@ [1; 1] = [for c in [capture1; capture2] -> c.ChooseCalls hadConflict |> List.length] @> } +#if STORE_EVENTSTOREDB // gRPC does not expose slice metrics + let sliceBackward = [] +#else let sliceBackward = [EsAct.SliceBackward] +#endif let singleBatchBackwards = sliceBackward @ [EsAct.BatchBackward] let batchBackwardsAndAppend = singleBatchBackwards @ [EsAct.Append] diff --git 
a/tests/Equinox.EventStoreDb.Integration/Equinox.EventStoreDb.Integration.fsproj b/tests/Equinox.EventStoreDb.Integration/Equinox.EventStoreDb.Integration.fsproj new file mode 100644 index 000000000..d64eca76c --- /dev/null +++ b/tests/Equinox.EventStoreDb.Integration/Equinox.EventStoreDb.Integration.fsproj @@ -0,0 +1,34 @@ + + + + net6.0 + $(DefineConstants);STORE_EVENTSTOREDB + + + + + + + + + + + + + + + + + + + + + + + + all + runtime; build; native; contentfiles; analyzers + + + + diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 21824bc10..ec97c082c 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -226,7 +226,7 @@ let createStoreLog verbose verboseConsole maybeSeqEndpoint = let c = LoggerConfiguration().Destructure.FSharpTypes() let c = if verbose then c.MinimumLevel.Debug() else c let c = c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) - let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink()) + let c = c.WriteTo.Sink(Equinox.EventStoreDb.Log.InternalMetrics.Stats.LogSink()) let c = c.WriteTo.Sink(Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink()) let level = match verbose, verboseConsole with @@ -243,7 +243,7 @@ let dumpStats storeConfig log = | Some (Storage.StorageConfig.Cosmos _) -> Equinox.CosmosStore.Core.Log.InternalMetrics.dump log | Some (Storage.StorageConfig.Es _) -> - Equinox.EventStore.Log.InternalMetrics.dump log + Equinox.EventStoreDb.Log.InternalMetrics.dump log | Some (Storage.StorageConfig.Sql _) -> Equinox.SqlStreamStore.Log.InternalMetrics.dump log | _ -> () @@ -301,7 +301,7 @@ module LoadTest = test, a.Duration, a.TestsPerSecond, clients.Length, a.ErrorCutoff, a.ReportingIntervals, reportFilename) // Reset the start time based on which the shared global metrics will be computed let _ = Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink.Restart() - let _ = 
Equinox.EventStore.Log.InternalMetrics.Stats.LogSink.Restart() + let _ = Equinox.EventStoreDb.Log.InternalMetrics.Stats.LogSink.Restart() let _ = Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink.Restart() let results = runLoadTest log a.TestsPerSecond (duration.Add(TimeSpan.FromSeconds 5.)) a.ErrorCutoff a.ReportingIntervals clients runSingleTest |> Async.RunSynchronously @@ -315,7 +315,7 @@ let createDomainLog verbose verboseConsole maybeSeqEndpoint = let c = LoggerConfiguration().Destructure.FSharpTypes().Enrich.FromLogContext() let c = if verbose then c.MinimumLevel.Debug() else c let c = c.WriteTo.Sink(Equinox.CosmosStore.Core.Log.InternalMetrics.Stats.LogSink()) - let c = c.WriteTo.Sink(Equinox.EventStore.Log.InternalMetrics.Stats.LogSink()) + let c = c.WriteTo.Sink(Equinox.EventStoreDb.Log.InternalMetrics.Stats.LogSink()) let c = c.WriteTo.Sink(Equinox.SqlStreamStore.Log.InternalMetrics.Stats.LogSink()) let outputTemplate = "{Timestamp:T} {Level:u1} {Message:l} {Properties}{NewLine}{Exception}" let c = c.WriteTo.Console((if verboseConsole then LogEventLevel.Debug else LogEventLevel.Information), outputTemplate, theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code)