From e5ecd12d117a79b0f154f059f2c866330891fa39 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Sun, 23 Feb 2020 21:28:12 +0000 Subject: [PATCH] Add CosmosDB Support for proAllProjector (#47) --- CHANGELOG.md | 4 + README.md | 9 +- .../.template.config/template.json | 10 +- propulsion-all-projector/AllProjector.fsproj | 4 +- propulsion-all-projector/Handler.fs | 2 + propulsion-all-projector/Program.fs | 254 ++++++++++++++---- propulsion-all-projector/README.md | 63 ++++- propulsion-all-projector/Todo.fs | 14 +- propulsion-all-projector/TodoSummary.fs | 11 + 9 files changed, 306 insertions(+), 65 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 6af4aca82..7a0b9095e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,10 @@ The `Unreleased` section name is replaced by the expected version of next releas ## [Unreleased] ### Added + +- Add `Propulsion.Cosmos` support to `proAllProjector` [#47](https://github.com/jet/dotnet-templates/pulls/47) +- Add `-noEventStore` flag to `proAllProjector` [#47](https://github.com/jet/dotnet-templates/pulls/47) + ### Changed ### Removed ### Fixed diff --git a/README.md b/README.md index 783985d87..0dde4d31c 100644 --- a/README.md +++ b/README.md @@ -26,13 +26,14 @@ This repo hosts the source for Jet's [`dotnet new`](https://docs.microsoft.com/e - [`trackingConsumer`](propulsion-tracking-consumer/README.md) - Boilerplate for an Apache Kafka Consumer using [`Propulsion.Kafka`](https://github.com/jet/propulsion) to ingest accumulating changes in an `Equinox.Cosmos` store idempotently. -- [`proAllProjector`](propulsion-all-projector/README.md) - Boilerplate for an EventStore `$all` stream projector (projecting from an EventStore using `Propulsion.EventStore`.EventStore +- [`proAllProjector`](propulsion-all-projector/README.md) - Boilerplate for a dual mode CosmosDB ChangeFeed Processor and/or EventStore `$all` stream projector using `Propulsion.Cosmos`/`Propulsion.EventStore` - **NOTE At present, checkpoint storage is only implemented for Azure CosmosDB - help wanted ;)** + **NOTE At present, checkpoint storage when projecting from EventStore uses Azure CosmosDB - help wanted ;)** - Standard processing shows importing (in summary form) from `EventStore` to `Cosmos` (use `-b` to remove, yielding a minimal projector) + Standard processing shows importing (in summary form) from an aggregate in `EventStore` or `Cosmos` to a Summary form in `Cosmos` (use `-b`(`lank`) to remove, yielding a minimal projector) - `-k` adds Optional projection to Apache Kafka using [`Propulsion.Kafka`](https://github.com/jet/propulsion). + `-k` adds Optional projection to Apache Kafka using [`Propulsion.Kafka`](https://github.com/jet/propulsion) (instead of ingesting into a local `Cosmos` store). + `-noEventStore` removes support for projecting from EventStore from the emitted code - [`proSync`](propulsion-sync/README.md) - Boilerplate for a console app that that syncs events between [`Equinox.Cosmos` and `Equinox.EventStore` stores](https://github.com/jet/equinox) using the [relevant `Propulsion`.* libraries](https://github.com/jet/propulsion), filtering/enriching/mapping Events as necessary. diff --git a/propulsion-all-projector/.template.config/template.json b/propulsion-all-projector/.template.config/template.json index 94b292e30..6a43616b1 100644 --- a/propulsion-all-projector/.template.config/template.json +++ b/propulsion-all-projector/.template.config/template.json @@ -2,6 +2,7 @@ "$schema": "http://json.schemastore.org/template", "author": "@jet @bartelink", "classifications": [ + "Cosmos", "Event Sourcing", "Equinox", "Propulsion", @@ -12,11 +13,18 @@ "language": "F#" }, "identity": "Propulsion.Template.AllProjector", - "name": "Propulsion EventStore $all Projector", + "name": "Propulsion EventStore and/or CosmosDB Projector", "shortName": "proAllProjector", "sourceName": "AllTemplate", "preferNameDirectory": true, "symbols": { + "noEventStore": { + "type": "parameter", + "datatype": "bool", + "isRequired": false, + "defaultValue": "false", + "description": "Disable projecting from EventStore." + }, "kafka": { "type": "parameter", "datatype": "bool", diff --git a/propulsion-all-projector/AllProjector.fsproj b/propulsion-all-projector/AllProjector.fsproj index c8bd8715f..560fcb8e2 100644 --- a/propulsion-all-projector/AllProjector.fsproj +++ b/propulsion-all-projector/AllProjector.fsproj @@ -24,8 +24,10 @@ - + + + diff --git a/propulsion-all-projector/Handler.fs b/propulsion-all-projector/Handler.fs index cc2006d62..3869e03bb 100644 --- a/propulsion-all-projector/Handler.fs +++ b/propulsion-all-projector/Handler.fs @@ -1,5 +1,6 @@ module AllTemplate.Handler +//#if (!noEventStore) open Propulsion.EventStore /// Responsible for inspecting and then either dropping or tweaking events coming from EventStore @@ -8,6 +9,7 @@ let tryMapEvent filterByStreamName (x : EventStore.ClientAPI.ResolvedEvent) = match x.Event with | e when not e.IsJson || e.EventStreamId.StartsWith "$" || not (filterByStreamName e.EventStreamId) -> None | PropulsionStreamEvent e -> Some e +//#endif //#if kafka /// Responsible for wrapping a span of events for a specific stream into an envelope (we use the well-known Propulsion.Codec form) diff --git a/propulsion-all-projector/Program.fs b/propulsion-all-projector/Program.fs index ac00863d3..2018bbb45 100644 --- a/propulsion-all-projector/Program.fs +++ b/propulsion-all-projector/Program.fs @@ -1,6 +1,9 @@ module AllTemplate.Program +open Propulsion.Cosmos +//#if (!noEventStore) open Propulsion.EventStore +//#endif open Serilog open System @@ -32,7 +35,9 @@ module CmdParser = | Some x -> x open Argu open Equinox.Cosmos +//#if (!noEventStore) open Equinox.EventStore +//#endif [] type Parameters = | [] ConsumerGroupName of string @@ -44,7 +49,10 @@ module CmdParser = | [] CategoryBlacklist of string | [] CategoryWhitelist of string +//#if (!noEventStore) | [] Es of ParseResults +//#endif + | [] SrcCosmos of ParseResults interface IArgParserTemplate with member a.Usage = match a with @@ -55,7 +63,10 @@ module CmdParser = | VerboseConsole -> "request Verbose Console Logging. Default: off." | CategoryBlacklist _ -> "category whitelist" | CategoryWhitelist _ -> "category blacklist" +//#if (!noEventStore) | Es _ -> "specify EventStore input parameters." +//#endif + | SrcCosmos _ -> "specify CosmosDB input parameters." and Arguments(a : ParseResults) = member __.ConsumerGroupName = a.GetResult ConsumerGroupName member __.Verbose = a.Contains Parameters.Verbose @@ -89,21 +100,110 @@ module CmdParser = | [], good -> let white = Set.ofList good in Log.Warning("Only copying categories: {cats}", white); fun x -> white.Contains x | _, _ -> raise (MissingArg "BlackList and Whitelist are mutually exclusive; inclusions and exclusions cannot be mixed") - member val Source : EsSourceArguments = + +#if (!noEventStore) + member val Source : Choice = + match a.TryGetSubCommand() with + | Some (Es es) -> Choice1Of2 (EsSourceArguments es) + | Some (SrcCosmos cosmos) -> Choice2Of2 (CosmosSourceArguments cosmos) + | _ -> raise (MissingArg "Must specify one of cosmos or es for Src") + member x.SourceParams() : Choice = +#else + member val Source : CosmosSourceArguments = match a.TryGetSubCommand() with - | Some (Es es) -> EsSourceArguments es - | _ -> raise (MissingArg "Must specify es for Src") - member x.SourceParams() : EsSourceArguments*CosmosArguments*ReaderSpec = - let srcE = x.Source - let startPos, cosmos = srcE.StartPos, srcE.CheckpointStore - Log.Information("Processing Consumer Group {groupName} from {startPos} (force: {forceRestart}) in Database {db} Container {container}", - x.ConsumerGroupName, startPos, srcE.ForceRestart, cosmos.Database, cosmos.Container) - Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}], reading up to {maxReadAhead} uncommitted batches ahead", - srcE.MinBatchSize, srcE.StartingBatchSize, x.MaxReadAhead) - srcE, cosmos, - { groupName = x.ConsumerGroupName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval - forceRestart = srcE.ForceRestart - batchSize = srcE.StartingBatchSize; minBatchSize = srcE.MinBatchSize; gorge = srcE.Gorge; streamReaders = 0 } + | Some (SrcCosmos cosmos) -> CosmosSourceArguments cosmos + | _ -> raise (MissingArg "Must specify cosmos for Src") + member x.SourceParams() = +#endif +#if noEventStore + let srcC = x.Source +#else + match x.Source with + | Choice1Of2 srcE -> + let startPos, cosmos = srcE.StartPos, srcE.Cosmos + Log.Information("Processing Consumer Group {groupName} from {startPos} (force: {forceRestart}) in Database {db} Container {container}", + x.ConsumerGroupName, startPos, srcE.ForceRestart, cosmos.Database, cosmos.Container) + Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}], reading up to {maxReadAhead} uncommitted batches ahead", + srcE.MinBatchSize, srcE.StartingBatchSize, x.MaxReadAhead) + Choice1Of2 (srcE, cosmos, + { groupName = x.ConsumerGroupName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval + forceRestart = srcE.ForceRestart + batchSize = srcE.StartingBatchSize; minBatchSize = srcE.MinBatchSize; gorge = srcE.Gorge; streamReaders = 0 }) + | Choice2Of2 srcC -> +#endif + let disco, auxColl = + match srcC.LeaseContainer with + | None -> srcC.Discovery, { database = srcC.Database; container = srcC.Container + "-aux" } + | Some sc -> srcC.Discovery, { database = srcC.Database; container = sc } + Log.Information("Max read backlog: {maxReadAhead}", x.MaxReadAhead) + Log.Information("Processing Lease {leaseId} in Database {db} Container {container} with maximum document count limited to {maxDocuments}", + x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments) + if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.") + srcC.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds)) +#if (noEventStore) + (srcC, (disco, auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)) +#else + Choice2Of2 (srcC, (disco, auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)) +#endif + and [] CosmosSourceParameters = + | [] FromTail + | [] MaxDocuments of int + | [] LagFreqM of float + | [] LeaseContainer of string + + | [] ConnectionMode of Equinox.Cosmos.ConnectionMode + | [] Connection of string + | [] Database of string + | [] Container of string // Actually Mandatory, but stating that is not supported + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member a.Usage = a |> function + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | MaxDocuments _ -> "maximum item count to request from feed. Default: unlimited" + | LagFreqM _ -> "frequency (in minutes) to dump lag stats. Default: off" + | LeaseContainer _ -> "specify Container Name for Leases container. Default: `sourceContainer` + `-aux`." + + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> "specify a connection string for a Cosmos account. (optional if environment variable EQUINOX_COSMOS_CONNECTION specified)" + | Database _ -> "specify a database name for Cosmos account. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" + | Container _ -> "specify a container name within `Database`" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + + | Cosmos _ -> "CosmosDb Sink parameters." + and CosmosSourceArguments(a : ParseResults) = + member __.FromTail = a.Contains CosmosSourceParameters.FromTail + member __.MaxDocuments = a.TryGetResult MaxDocuments + member __.LagFrequency = a.TryGetResult LagFreqM |> Option.map TimeSpan.FromMinutes + member __.LeaseContainer = a.TryGetResult CosmosSourceParameters.LeaseContainer + + member __.Mode = a.GetResult(CosmosSourceParameters.ConnectionMode, Equinox.Cosmos.ConnectionMode.Direct) + member __.Discovery = Discovery.FromConnectionString __.Connection + member __.Connection = a.TryGetResult CosmosSourceParameters.Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection" + member __.Database = a.TryGetResult CosmosSourceParameters.Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database" + member __.Container = a.GetResult CosmosSourceParameters.Container + member __.Timeout = a.GetResult(CosmosSourceParameters.Timeout, 5.) |> TimeSpan.FromSeconds + member __.Retries = a.GetResult(CosmosSourceParameters.Retries, 1) + member __.MaxRetryWaitTime = a.GetResult(CosmosSourceParameters.RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + member x.BuildConnectionDetails() = + let (Discovery.UriAndKey (endpointUri, _)) as discovery = x.Discovery + Log.Information("Source CosmosDb {mode} {endpointUri} Database {database} Container {container}", + x.Mode, endpointUri, x.Database, x.Container) + Log.Information("Source CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s", + (let t = x.Timeout in t.TotalSeconds), x.Retries, (let t = x.MaxRetryWaitTime in t.TotalSeconds)) + let c = Equinox.Cosmos.Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode) + discovery, { database = x.Database; container = x.Container }, c + + member val Cosmos = + match a.TryGetSubCommand() with + | Some (CosmosSourceParameters.Cosmos cosmos) -> CosmosArguments cosmos + | _ -> raise (MissingArg "Must specify cosmos details") +//#if (!noEventStore) and [] EsSourceParameters = | [] FromTail | [] Gorge of int @@ -148,7 +248,7 @@ module CmdParser = | Retries _ -> "specify operation retries. Default: 3." | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds. Default: 1.5." - | Cosmos _ -> "CosmosDb Checkpoint Store parameters." + | Cosmos _ -> "CosmosDB (Checkpoint) Store parameters." and EsSourceArguments(a : ParseResults) = member __.Gorge = a.TryGetResult Gorge member __.TailInterval = a.GetResult(Tail, 1.) |> TimeSpan.FromSeconds @@ -191,10 +291,11 @@ module CmdParser = .Establish(appName, discovery, connectionStrategy) |> Async.RunSynchronously member __.CheckpointInterval = TimeSpan.FromHours 1. - member val CheckpointStore : CosmosArguments = + member val Cosmos : CosmosArguments = match a.TryGetSubCommand() with | Some (EsSourceParameters.Cosmos cosmos) -> CosmosArguments cosmos - | _ -> raise (MissingArg "Must specify `cosmos` checkpoint store source is `es`") + | _ -> raise (MissingArg "Must specify `cosmos` checkpoint store when source is `es`") +//#endif and [] CosmosParameters = | [] Connection of string | [] ConnectionMode of ConnectionMode @@ -271,56 +372,103 @@ module Logging = c.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) |> fun c -> c.CreateLogger() -let [] AppName = "ProjectorTemplate" +let [] AppName = "AllTemplate" +//#if (!noEventStore) module EventStoreContext = let cache = Equinox.Cache(AppName, sizeMb = 10) let create connection = Equinox.EventStore.Context(connection, Equinox.EventStore.BatchingPolicy(maxBatchSize=500)) +//#endif let build (args : CmdParser.Arguments) = - let (srcE, cosmos, spec) = args.SourceParams() - let connectEs () = srcE.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.ConnectionStrategy.ClusterSingle Equinox.EventStore.NodePreference.PreferSlave) - let filterByStreamName = args.FilterFunction() - let (discovery, database, container, connector) = cosmos.BuildConnectionDetails() +//#if (!noEventStore) + match args.SourceParams() with + | Choice1Of2 (srcE, cosmos, spec) -> + let connectEs () = srcE.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.ConnectionStrategy.ClusterSingle Equinox.EventStore.NodePreference.PreferSlave) + let (discovery, database, container, connector) = cosmos.BuildConnectionDetails() - let cache = Equinox.Cache(AppName, sizeMb = 10) - let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously - let context = Equinox.Cosmos.Context(connection, database, container) - - let resolveCheckpointStream = - let codec = FsCodec.NewtonsoftJson.Codec.Create() - let caching = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) - let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute) - fun target -> Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access).Resolve(target, Equinox.AllowStale) - let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, Log.ForContext(), resolveCheckpointStream) + let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously + let cache = Equinox.Cache(AppName, sizeMb = 10) + let context = Equinox.Cosmos.Context(connection, database, container) + let resolveCheckpointStream = + let codec = FsCodec.NewtonsoftJson.Codec.Create() + let caching = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.) + let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute) + fun target -> Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access).Resolve(target, Equinox.AllowStale) + let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, Log.ForContext(), resolveCheckpointStream) #if kafka - let (broker, topic) = cosmos.Sink.BuildTargetParams() - let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic) - let sink = - Propulsion.Kafka.StreamsProducerSink.Start( - Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, Handler.render, producer, - statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 2.) + let (broker, topic) = cosmos.Sink.BuildTargetParams() + let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic) + let sink = + Propulsion.Kafka.StreamsProducerSink.Start( + Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, Handler.render, producer, + statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 2.) #else #if blank - // TODO: establish any relevant inputs, or re-run without `-blank` for example wiring code - let handle = Ingester.handleStreamEvents Ingester.tryHandle + // TODO: establish any relevant inputs, or re-run without `-blank` for example wiring code + let handle = Ingester.handleStreamEvents Ingester.tryHandle #else - let esConn = connectEs () - let dstCache = Equinox.Cache(AppName, sizeMb = 10) - let srcService = Todo.EventStore.create (EventStoreContext.create esConn,dstCache) - let dstService = TodoSummary.Cosmos.create (context, cache) - let handle = Ingester.handleStreamEvents (Ingester.tryHandle srcService dstService) + let esConn = connectEs () + let dstCache = Equinox.Cache(AppName, sizeMb = 10) + let srcService = Todo.EventStore.create (EventStoreContext.create esConn,dstCache) + let dstService = TodoSummary.Cosmos.create (context, cache) + let handle = Ingester.handleStreamEvents (Ingester.tryHandle srcService dstService) #endif - let sink = - Propulsion.Streams.StreamsProjector.Start( - Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Ingester.Stats(Log.Logger)) + let sink = + Propulsion.Streams.StreamsProjector.Start( + Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Ingester.Stats(Log.Logger)) #endif - - let connect () = let c = connectEs () in c.ReadConnection - sink, EventStoreSource.Run( - Log.Logger, sink, checkpoints, connect, spec, Handler.tryMapEvent filterByStreamName, - args.MaxReadAhead, args.StatsInterval) + let connect () = let c = connectEs () in c.ReadConnection + let filterByStreamName = args.FilterFunction() + let runPipeline = + EventStoreSource.Run( + Log.Logger, sink, checkpoints, connect, spec, Handler.tryMapEvent filterByStreamName, + args.MaxReadAhead, args.StatsInterval) + sink, runPipeline +//#endif +#if (!noEventStore) + | Choice2Of2 (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) -> +#else + let (srcCosmos, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) = args.SourceParams() +#endif + let (discovery, database, container, connector) = srcCosmos.Cosmos.BuildConnectionDetails() +#if kafka + let (broker, topic) = srcCosmos.Cosmos.Sink.BuildTargetParams() + let producer = Propulsion.Kafka.Producer(Log.Logger, AppName, broker, topic) + let sink = + Propulsion.Kafka.StreamsProducerSink.Start( + Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, Handler.render, producer, + statsInterval=TimeSpan.FromMinutes 1., stateInterval=TimeSpan.FromMinutes 2.) +#else +#if blank + // TODO: establish any relevant inputs, or re-run without `-blank` for example wiring code + let handle = Ingester.handleStreamEvents Ingester.tryHandle +#else + let connection = connector.Connect(AppName, discovery) |> Async.RunSynchronously + let cosmos = Equinox.Cosmos.Context (connection, database, container) + let cache = Equinox.Cache(AppName, sizeMb = 10) + let srcService = Todo.Cosmos.create (cosmos, cache) + let dstService = TodoSummary.Cosmos.create (cosmos, cache) + let handle = Ingester.handleStreamEvents (Ingester.tryHandle srcService dstService) +#endif + let sink = + Propulsion.Streams.StreamsProjector.Start( + Log.Logger, args.MaxReadAhead, args.MaxConcurrentStreams, handle, stats = Ingester.Stats(Log.Logger)) +#endif + let filterByStreamName = args.FilterFunction() + let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq = + // TODO: customize parsing to events if source is not an Equinox Container + docs + |> Seq.collect EquinoxCosmosParser.enumStreamEvents + |> Seq.filter (fun e -> e.stream |> FsCodec.StreamName.toString |> filterByStreamName) + let source = { database = database; container = container } + let createObserver () = CosmosSource.CreateObserver(Log.Logger, sink.StartIngester, mapToStreamItems) + let runPipeline = + CosmosSource.Run(Log.Logger, connector.CreateClient(AppName, discovery), source, aux, + leaseId, startFromTail, createObserver, + ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency, auxClient=connector.CreateClient(AppName, auxDiscovery)) + sink, runPipeline let run argv = try let args = CmdParser.parse argv diff --git a/propulsion-all-projector/README.md b/propulsion-all-projector/README.md index 1dd055bc0..5cdb02201 100644 --- a/propulsion-all-projector/README.md +++ b/propulsion-all-projector/README.md @@ -1,10 +1,30 @@ //#if kafka -# Propulsion EventStore $all -> Kafka Projector +//#if noEventStore +# Propulsion CosmosDb ChangeFeedProcessor -> Kafka Projector //#else -# Propulsion EventStore $all Projector (without Kafka emission) +# Propulsion EventStore $all/CosmosDb ChangeFeedProcessor -> Kafka Projector +//#endif +//#else +//#if noEventStore +# Propulsion CosmosDb ChangeFeedProcessor Projector (without Kafka emission) +//#else +# Propulsion EventStore $all/CosmosDb ChangeFeedProcessor Projector (without Kafka emission) +//#endif //#endif This project was generated using: +//#if noEventStore +//#if kafka + + dotnet new -i Equinox.Templates # just once, to install/update in the local templates store + dotnet new proAllProjector -noEventStore -k # -k => include Kafka projection logic +//#else + + dotnet new -i Equinox.Templates # just once, to install/update in the local templates store + # add -k to add Kafka Projection logic + dotnet new proAllProjector -noEventStore # use --help to see options +//#endif +//#else //#if kafka dotnet new -i Equinox.Templates # just once, to install/update in the local templates store @@ -15,21 +35,54 @@ This project was generated using: # add -k to add Kafka Projection logic dotnet new proAllProjector # use --help to see options //#endif +//#endif ## Usage instructions -0. establish connection strings etc. for the checkpoint store in CosmosDB per https://github.com/jet/equinox README +0. establish connection strings etc. per https://github.com/jet/equinox README $env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" # or use -s $env:EQUINOX_COSMOS_DATABASE="equinox-test" # or use -d $env:EQUINOX_COSMOS_CONTAINER="equinox-test" # or use -c +1. Use the `eqx` tool to initialize a CosmosDb container + dotnet tool install -g Equinox.Tool # only needed once + + # (either add environment variables as per step 0 or use -s/-d/-c to specify them) + # generate a cosmos container to store events in eqx init -ru 400 cosmos +2. We'll be operating a ChangeFeedProcessor, so use `propulsion init` to make a `-aux` container (unless there already is one) + + # (either add environment variables as per step 0 or use -s/-d/-c to specify them) + # default name is "($EQUINOX_COSMOS_CONTAINER)-aux" + propulsion init -ru 400 cosmos + +//#if (!noEventStore) + NOTE when projecting from EventStore, the current implementation stores the checkpoints within the CosmosDB store in order to remove feedback effects. + (Yes, someone should do a PR to store the checkpoints in EventStore itself; this is extracted from working code, which can assume there's always a CosmosDB around) +//#endif + +3. To run an instance of the Projector from a CosmosDb ChangeFeed + +//#if kafka + $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b + + # `-g default` defines the Projector Group identity - each id has separated state in the checkpoints store (`Sync-default` in the cited `cosmos` store) + # `-c $env:EQUINOX_COSMOS_CONTAINER ` specifies the source (if you have specified 2x EQUINOX_COSMOS_* environment vars, no connection/database arguments are needed, but the monitored (source) container must be specified explicitly) + # the second `cosmos` specifies the target store for the reactions (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed) + # `-t topic0` identifies the Kafka topic to which the Projector should write + dotnet run -- -g default cosmos -c $env:EQUINOX_COSMOS_CONTAINER cosmos kafka -t topic0 +//#else + # `-g default` defines the Projector Group identity - each id has separated state in the checkpoints store (`Sync-default` in the cited `cosmos` store) + # `-c $env:EQUINOX_COSMOS_CONTAINER ` specifies the source (if you have specified EQUINOX_COSMOS_* environment vars, no connection/database arguments are needed, but the monitored (source) container must be specified explicitly) + # `cosmos` specifies the target store for the reactions (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed) + dotnet run -- -g default cosmos -c $env:EQUINOX_COSMOS_CONTAINER cosmos +//#endif -1. To run an instance of the Projector from EventStore +4. To run an instance of the Projector from EventStore # (either add environment variables as per step 0 or use -s/-d/-c to specify them after the `cosmos` argument token) @@ -54,4 +107,4 @@ This project was generated using: # NB running more than one projector will cause them to duel, and is hence not advised -2. To create a Consumer, use `dotnet new proConsumer` (see README therein for details) \ No newline at end of file +5. To create a Consumer, use `dotnet new proConsumer` (see README therein for details) \ No newline at end of file diff --git a/propulsion-all-projector/Todo.fs b/propulsion-all-projector/Todo.fs index 9f283ac35..c61db317a 100644 --- a/propulsion-all-projector/Todo.fs +++ b/propulsion-all-projector/Todo.fs @@ -63,6 +63,7 @@ type Service internal (log, resolve, maxAttempts) = let create resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 3) +//#if (!noEventStore) module EventStore = open Equinox.EventStore // Everything until now is independent of a concrete store @@ -70,4 +71,15 @@ module EventStore = let private resolve (context, cache) = let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve - let create (context, cache) = resolve (context, cache) |> create \ No newline at end of file + let create (context, cache) = resolve (context, cache) |> create + +//#endif +module Cosmos = + + open Equinox.Cosmos // Everything until now is independent of a concrete store + + let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot) + let private resolve (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve + let create (context, cache) = resolve (context, cache) |> create diff --git a/propulsion-all-projector/TodoSummary.fs b/propulsion-all-projector/TodoSummary.fs index 5bbe4e3b5..15201a36d 100644 --- a/propulsion-all-projector/TodoSummary.fs +++ b/propulsion-all-projector/TodoSummary.fs @@ -58,6 +58,17 @@ type Service internal (log, resolve, maxAttempts) = let create resolve = Service(Serilog.Log.ForContext(), resolve, maxAttempts = 3) +//#if (!noEventStore) +module EventStore = + + open Equinox.EventStore // Everything until now is independent of a concrete store + + let private resolve (context, cache) = + let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.) + Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve + let create (context, cache) = resolve (context, cache) |> create + +//#endif module Cosmos = open Equinox.Cosmos // Everything until now is independent of a concrete store