diff --git a/CHANGELOG.md b/CHANGELOG.md index 22adc6ef3..f2f8954d5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,7 +14,7 @@ The `Unreleased` section name is replaced by the expected version of next releas ### Changed -- Target `Equinox` v `4.0.0-rc.14.5`, `Propulsion` v `3.0.0-rc.9.11`, `FsCodec` v `3.0.0-rc.14.1` [#131](https://github.com/jet/dotnet-templates/pull/131) +- Target `Equinox` v `4.0.0-rc.16`, `Propulsion` v `3.0.0-rc.10`, `FsCodec` v `3.0.0-rc.15` [#131](https://github.com/jet/dotnet-templates/pull/131) - Target `Argu` v `6.0.14` [#135](https://github.com/jet/dotnet-templates/pull/135) ### Removed diff --git a/README.md b/README.md index 46af67062..2bb3e0b60 100644 --- a/README.md +++ b/README.md @@ -243,24 +243,694 @@ One can also do it manually: $ dotnet new -u Equinox.Templates + # PATTERNS / GUIDANCE -## Use Strongly typed ids + +## TL;DR -Wherever possible, the samples strongly type identifiers, particularly ones that might naturally be represented as primitives, i.e. `string` etc. +1. ✅ DO define [strongly typed ids](#do-id-type) and a `type Store.Config` in `namespace Domain` +2. ❌ DONT have global `module Types`. AVOID per Aggregate `module Types` or top level `type` definitions +3. ✅ DO group stuff predictably per `module Aggregate`: `Stream, Events, Reactions, Fold, Decisions, Service, Factory`. Keep grouping within that. +4. ❌ DONT [`open `](#dont-open-aggregate), [`open .Events`](#dont-open-events) or [`open .Fold`](#dont-open-fold) +5. ✅ DO design for idempotency everywhere. ❌ DONT [return TMI](#dont-return-tmi) that the world should not be taking a dependency on. +6. ❌ DONT [use `Result`](#dont-result) or a per-Aggregate `type Error`. ✅ [DO use minimal result types per decision function](#do-simplest-result) +7. ❌ DONT [expose your `Fold.State`](#dont-expose-state) outside your Aggregate. +8. ❌ DONT be a slave to CQRS for all read paths. ✅ [DO `AllowStale`](#do-allowstale) 🤔 [CONSIDER `QueryCurrent`](#consider-querycurrent) +9. ❌ [DONT be a slave to the Command pattern](#dont-commands) or Mediatr +10. ✅ DO maintain common wiring in [an `App` project, as per `propulsion-indexer`](https://github.com/jet/dotnet-templates/tree/master/propulsion-indexer/App) -- [`FSharp.UMX`](https://github.com/fsprojects/FSharp.UMX) is useful to transparently pin types in a message contract cheaply - it works well for a number of contexts: +## High level - - Coding/decoding events using [FsCodec](https://github.com/jet/fscodec). (because Events are things that **have happened**, validating them is not a central concern as we load and fold these incontrovertible Facts) - - Model binding in ASP.NET (because the types de-sugar to the primitives, no special support is required). _Unlike events, there are more considerations in play in this context though; often you'll want to apply validation to the inputs (representing Commands) as you map them to [Value Objects](https://martinfowler.com/bliki/ValueObject.html), [Making Illegal States Unrepresentable](https://fsharpforfunandprofit.com/posts/designing-with-types-making-illegal-states-unrepresentable/). Often, Single Case Discriminated Unions can be a better tool inb that context_ +### ❌ DONT have shared types in `Types.fs` -## Managing Projections and Reactions with Equinox, Propulsion and FsKafka +F# excels at succinctly expressing a high level design for a system; see [_Designing with types_ by Scott Wlaschin](https://fsharpforfunandprofit.com/series/designing-with-types/) for many examples. + +For an event sourced system, it gets even better: it's not uncommon to be able to, using only a screen or two of types, convey a system's significant events in a manner that's legible for both technical and non-technical stakeholders. + +It's important not to take this too far though; ultimately, as a system grows, the need for Events to be grouped into Categories must become the organizing constraint. + +That means letting go of something that feels _almost_ perfect... + + +### ❌ DONT share types across Aggregates / Categories + +In some cases, Aggregates have overlapping concerns that can mean soe aspects of Event Contracts are common. It can be very tempting to keep this [DRY](https://en.wikipedia.org/wiki/Don%27t_repeat_yourself) as shared types in a central place. These benefits must unfortunately be relinquished. Instead: + +```fs +❌ BAD shared types +// +module Domain.Types + +type EntityContext = { name: string; area: string } + +.. + +// .fs +module Aggregate + +open Domain.Types + +module Events = + + type Event = + // ❌ BAD defines a contract that can be changed by someone adding or renaming a field in a shared type + | Created of {| creator: UserId; context: EntityContext |} + .. + +// .fs +module Aggregate2 + +module Events = + + type Event = + | Copied of {| by: UserId; context: Types.EntityContext |} + .. +``` + +Instead, let each `module ` maintain its own version of each type that will be used in an event _within its `module Events`_. + +The `decide` function can map from an input type if desired. The important thing is that the Aggregate will need to be able to roundtrip its types in perpetuity, and having to disentangle the overlaps between types shared across multiple Aggregates is simply never worth it. + + +### ✅ DO have global strongly typed ids + +While [sharing the actual types is a no-no](#global-dont-share-types), having common id types, and using those for references across streams is valid. + +It's extremely valuable for these to be strongly typed. + +```fsharp +module Domain.Types + +type UserId = .. +type TenantId = .. + +.. + +module Domain.User + +module Events = + + type Joined = { tenant: TenantId; authorizedBy: UserId } + +``` + + +### ✅ DO Have a helper `module` per id type + +Per [strongly-typed id `type`](#do-id-type), having an associated `module` with the same name alongside works well. +This enables one to quickly identify and/or navigate the various ways in which such ids are generated/parsed and/or validated. + +```fsharp +namespace Domain + +type UserId = Guid +and [] userId + +module UserId = + let private ofGuid (id: Guid): UserId = %id + let private toGuid (id: UserId): Guid = %id + + let parse (input: string): UserId = input |> Guid.Parse |> ofGuid + let toString (x: UserId): string = (toGuid x).ToString "N" +``` + +### CONSIDER UMX for ids not used in storage contracts + +Wherever possible, the templates use use strongly type identifiers, particularly ones that might naturally be represented as primitives, i.e. `string` etc. + +[`FSharp.UMX`](https://github.com/fsprojects/FSharp.UMX) is useful to transparently pin types in a message contract cheaply - it works well for a number of contexts: + +- Coding/decoding events using [FsCodec](https://github.com/jet/fscodec). (because Events are things that **have happened**, validating them is not a central concern as we load and fold these incontrovertible Facts) +- Model binding in ASP.NET; because the types de-sugar to the primitives, no special support is required. + + _Unlike events, there are more considerations in play in this context though; often you'll want to apply validation to the inputs (representing Commands) as you map them to [Value Objects](https://martinfowler.com/bliki/ValueObject.html), [Making Illegal States Unrepresentable](https://fsharpforfunandprofit.com/posts/designing-with-types-making-illegal-states-unrepresentable/). + +### CONSIDER UMX `strings` for serialized ids + +TODO write up the fact that while UMX is a good default, there are nuances wrt string ones + +- case sensitive +- works transparently for most serializers, also works for model binding +- how/do you validate nulls/lengths, xss protection, rejecting massive ones +- if you use UMX, when/how will you validate + +### CONSIDER UMX `Guid`s for serialized ids + +TODO write up the fact that while UMX is a good default, there are nuances wrt Guid ones + +- parsing needs to not be sensitive to case +- rendering with or without dashes/braces - can be messy with configuring json serializers +- not actually part of JSON +- provides some XSS/null protection but is that worth it + +### ❌ DONT use SCDUs for ids + +TODO write something in more depth + +- https://paul.blasuc.ci/posts/really-scu.html +- https://paul.blasuc.ci/posts/even-more-scu.html + + +#### ✅ DO define a `Store.Config` type, and wire it up in the aggregate's `module Factory` + +It's correct to say that few systems actually switch databases in real life. Defining a `type` that holds only a `*StoreContext` and a `Cache` can feel like pointless abstraction. + +In `populsion-hotel`, we have: + +```fsharp +[] +type Config = + | Memory of Equinox.MemoryStore.VolatileStore)> + | Dynamo of Equinox.DynamoStore.DynamoStoreContext * Equinox.Cache + | Mdb of Equinox.MessageDb.MessageDbContext * Equinox.Cache +``` + +Clearly, not many systems are deployed that arbitrarily target MessageDB or DynamoDB + +More common is the configuration in: `propulsion-cosmos-reactor`: + +```fsharp +[] +type Config = + | Cosmos of Equinox.CosmosStore.CosmosStoreContext * Equinox.Cache +``` + +The advantage of still having a `type Config` in place is to be able to step in and generalize things. + +For instance, [when such a system expands from having a single store to also having a separated views store](https://github.com/jet/dotnet-templates/pull/132), it can become: + +```fsharp +[] +type Config = + | Cosmos of contexts: CosmosContexts * cache: Equinox.Cache +and [] CosmosContexts = + { main: Equinox.CosmosStore.CosmosStoreContext + views: Equinox.CosmosStore.CosmosStoreContext + /// Variant of `main` that's configured such that `module Snapshotter` updates will never trigger a calve + snapshotUpdate: Equinox.CosmosStore.CosmosStoreContext } +``` + +:bulb: This does mean that the `Domain` project will need to reference the concrete store packages (i.e., `Equinox.CosmosStore`, `Equinox.MemoryStore` etc). +:bulb: the wiring that actually establishes the `Context`s should be external to the `Domain` project in [an `App` project, as `propulsion-indexer` does](https://github.com/jet/dotnet-templates/tree/master/propulsion-indexer/App), and should only be triggered within a Host application's Composition root + +## Code structure + +### 1. `module Aggregate` -## Aggregate module conventions +#### ✅ DO stick to the `module ` conventions There are established conventions documented in [Equinox's `module Aggregate` overview](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#aggregate-module) +#### ❌ DONT split the `module ` +Having the Event Contracts, State and Decision logic in a single module can feel wrong when you get over e.g. 1000 lines of code; instincts to split the file on some basis will kick in. Don't do it; splitting the file is hiding complexity under the carpet. + +#### ❌ DONT move the `module Events` out + +The Event Contracts are the most important contract that an Aggregate has - decision logic will churn endlessly. You might even implement logic against it in other languages. But the Event Contracts you define are permanent. As a developer fresh to a project, the event contracts are often the best starting point as you try to understand what a given aggregate is responsible for. + +#### ❌ DONT move the `module State`, or `evolve` logic out + +The State type and the associated `evolve` and `fold` functions are intimately tied to the Event Contracts. Over time, ugliness and upconversion can lead to noise, and temptation to move it out. Don't do it; being able to understand the full coupling is critical to understanding how things work, and equally critical to being able to change or add functions. + + +#### ❌ DONT move the decision logic out + +Decision logic bridges between the two worlds of State and Events. +The State being held exists only to serve the Decision logic. +The only reason for Event Contracts is to record Decisions. +Trying to pretend that some of the Decisions are less important and hence should live elsewhere is rarely a good idea. +How decisions are made, and how those decisions are encoded as Events should be encapsulated within the Aggregate. + +In some cases, it can make sense for a decision function to be a skeleton function that passes out to some helper functions that it's passed to assist in the decision making and/or composing some details that go into the event body. +Sometimes these functions are best passed as arguments to the Service Method that will call the decision function. +In other cases, the relevant helper functions can be passed to the `type Service` as arguments when it's being constructed in the `Factory`. + +The critical bit is that the bits that need to touch the State and/or generate Events should not leave the `module Aggregate`, as there is not better place in the system for that to live. + +This is akin to the maxim (from [the GOOS book](http://www.growing-object-oriented-software.com) of _Listen to your Tests_: If a given Aggregate has too many responsibilities, that's feedback you should be using to your advantage, not lamenting or ignoring: + +- if an aggregate consumes or produces an extraordinary number of event types, maybe there's an axis on which they can be split? +- if there are multiple splittable pieces of state in the overall State, maybe you need two aggregates over the same stream? Or two sibling categories that share an id? +- should some of the logic and/or events be part of an adjacent aggregate? (why should a Cart have Checkout flow elements in it?) +- if there are many decision functions, is that a sign that there's a missing workflow or process manager that should be delegating some (cohesive) responsibolities to this aggregate? +- if a decision function is 300 lines, but only 5 lines touch the state and only 4 lines produce an event, can you extract just that logic to a single boring module that can be unit tested independent of how the State and Events are ultimately maintained? + +### 2. `module Events` + +Having the Event Contracts be their own `module` is a critical forcing function for good aggregate design. Having all types and all cases live in one place and being able to quickly determine where each Event is produced is key to being able to understand the moving parts of a system. + + +#### ❌ AVOID including egregious identity information + +When modelling, it's common to include primary identifiers (e.g. a user id), or contextual identifiers (e.g. a tenant id) in an Event in order to convey the relationships between events in the systems as a whole; you want the correlations to stand out. In the implementation however, repeating the identity information in every event is a major liability: +1. the State needs to contain the values - that's more noise +2. Event versioning gets messier - imagine extending a system to make it multi-tenant, you'd need to be able to handle all the historic events that predated the concept + +The alternative is for a workflow to react to the events in the context of a stream - if some logic needs to know the userid let the `User` reactor handling the `User` event on a `User` Stream pass that context forward if relevant in that context. + +#### ❌ DONT `open Events` in an aggregate module + +Having to prefix types and/or Event Type names with `Events.` is a feature, not a bug. + +### 4. `module Reactions` + +✅ DO encapsulate inferences from events and `Stream` names in a `module Reactions` facade + +`module Stream` should be always be `private`. +Any classification of events, parsing of stream names, should be via helpers within the `module Reactions`, e.g.: + +```fsharp +// ❌ BAD Stream module is `public` +module Stream = + + let [] Category = "tenant" + +// ❌ BAD +module TenantNotifications + +let categories = [ Tenant.Stream.Category] + +let handle (stream, events) = async { + if StreamName.category stream = Tenant.Stream.Category then + let tenantId = FsCodec.StreamName.Split stream |> snd |> TenantId.parse + +// ❌ BAD +module Tenant.Tests + +let [] ``generated correct events` () = + let id = TenantId.generate() + // ❌ BAD boilerplate, referencing multipple modules + let streamName = FsCodec.StreamName.create Tenant.Stream.Category id +``` + +Instead, keep the `module Streams` private: + +```fsharp +module private Stream = + + let [] Category = "tenant" + let id (id: TenantId) = FsCodec.StreamId.gen TenantId.toString id + let decodeId = FsCodec.StreamId.dec TenantId.parse + let name = id >> FsCodec.StreamName.create Category + let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId +``` + +selectively expose a relevant interface via a `module Reactions` facade: + +```fsharp +// ✅ GOOD expose all reactions and test integration helpers via a Reactions facade +module Reactions = + + // ✅ GOOD - F12 can show us all reaction logic + let categoryName = Stream.Category + // ✅ GOOD - if a unit test needs to generate a stream name, it can supply the tenant id + let streamName = Stream.name + let [] (|For|_|) = Stream.tryDecode + // ✅ OK generic decoding function (but next ones are better...) + let dec = Streams.Codec.dec + let [] (|Decode|_|) = function + | struct (For id, _) & Streams.Decode dec events -> ValueSome struct (id, events) + | _ -> ValueNone + let deletionNamePrefix tenantIdStr = $"%s{Stream.Category}-%s{tenantIdStr}" +``` + +in some cases, the filtering and/or classification functions can be more than just simple forwarding functions: + +```fsharp + // ✅ GOOD - better than sprinkling `nameof(Aggregate..Events.Completed)` in an adjacent `module` + /// Used by the Watchdog to infer whether a given event signifies that the processing has reached a terminal state + let isTerminalEvent (encoded: FsCodec.ITimelineEvent<_>) = + encoded.EventType = nameof(Events.Completed) + let private impliesStateChange = function Events.Snapshotted _ -> false | _ -> true + + // ✅ BETTER specific pattern that extracts relevant items, keeping it close to the Event definitiosn + let (|ImpliesStateChange|NoStateChange|NotApplicable|) = function + | Parse (tenantId, events) -> + if events |> Array.exists impliesStateChange then ImpliesStateChange (tenantId, events.Length) + else NoStateChange events.Length + | _, events -> NotApplicable events.Length +``` + +Ultimately, the consumption logic becomes clearer, and is less intimately intertwined with the implementation: + +```fsharp +// ✅ GOOD +module TenantNotifications + +let categories = [ Tenant.Reactions.categoryName ] + +let handle (stream, events) = async { + match stream, events with + | Tenant.Reactions.Decode (tenantId, events) -> + // ... +``` + +or: + +```fsharp +// ✅ BETTER - intention revealing names, classification encapsulated close to the events +module TenantNotifications + +let categories = [ Tenant.Reactions.categoryName ] + +let handle (stream, events) = async { + match struct (stream, events) with + | Todo.Reactions.ImpliesStateChange (clientId, eventCount) -> + let! version', summary = service.QueryWithVersion(clientId, Contract.ofState) + let wrapped = generate stream version' (Contract.Summary summary) + let! _ = produceSummary wrapped + return Propulsion.Sinks.StreamResult.OverrideNextIndex version', Outcome.Ok (1, eventCount - 1) + | Todo.Reactions.NoStateChange eventCount -> + return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.Skipped eventCount + | Todo.Reactions.NotApplicable eventCount -> + return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.NotApplicable eventCount } +``` + +The helpers can make tests terser, and make it easier to : + +```fsharp +// ✅ BETTER - intention revealing names, classification encapslated close to the events +module Tenant.Tests + +let [] ``generated correct events` () = + let id = TenantId.generate() + let streamName = Tenant.Reactions.streamName id +``` + +### 5. `module Fold` + + +#### ❌ DONT log + +If your `Fold` logic is anything but incredibly boring, that's a design smell. +If you must, unit test it to satisfy yourself things can't go wrong, but logging is never the answer. +Fold logic should not be deciding anything - just summarizing facts. +If anything needs to be massaged prior to making a decision, do that explicitly; don't pollute the `Fold` logic. +In general, you want to [make illegal States unrepresentable](https://fsharpforfunandprofit.com/posts/designing-with-types-making-illegal-states-unrepresentable/). + +#### ❌ DONT maintain identifiers and other information not required for decisions + +See [Events: AVOID including egregious identity information](#events-no-ids). + +### 6. `module Decisions` + + +#### ✅ DO use the simplest result type possible + +[Railway Oriented programming](https://fsharpforfunandprofit.com/rop) is a fantastic thinking tool. [Designing with types](https://fsharpforfunandprofit.com/series/designing-with-types/) is an excellent implementation strategy. [_Domain Modelling Made Functional_](https://fsharpforfunandprofit.com/books/) is a must read book. But it's critical to also consider the other side of the coin to avoid a lot of mess: +- [_Against Railway Oriented Programming_ by Scott Wlaschin](https://fsharpforfunandprofit.com/posts/against-railway-oriented-programming/). Scott absolutely understands the tradeoffs, but it's easy to forget them when reading the series +- [_you're better off using Exceptions_ by Eirik Tsarpalis](https://eiriktsarpalis.wordpress.com/2017/02/19/youre-better-off-using-exceptions). + +Each Decision function should have as specific a result contract as possible. In order of preference: +- `unit`: A function that idempotently maps the intent or request to internal Events based solely on the State is the ideal. Telling the world about what you did is not better. Logging what it did is not better than being able to trust it to do it's job. Unit tests should assert based on the produced Events as much as possible rather than relying on a return value. +- `throw`: if something can go wrong, but it's not an anticipated first class part of the workflow, there's no point returning an `Error` result; [_you're better off using Exceptions_](https://eiriktsarpalis.wordpress.com/2017/02/19/youre-better-off-using-exceptions). +- `bool`: in some cases, an external system may need to know whether something is permitted or necessary. If that's all that's needed, don't return identifiers or messages give away extra information +- _simple discriminated union_: the next step after a `true`/`false` is to make a simple discriminated union - you get a chance to name it, and the cases involved. +- record, anonymous record, tuple: returning multiple items is normally best accomplished via a named record type. + - the caller gets to use a clear name per field + - how it's encoded in the State type can vary over time without consumption sites needing to be revisited + - extra fields can be added later, without each layer through which the response travels needing to be adjusted + - the caller gets to pin the exact result type via a type annotation (either in the `Service`'s `member` return type, or at the call site) - this is not possible if it's an anonymous record + :bulb: in some cases it a tuple can be a better encoding if it's important that each call site explicitly consume each part of the result +- `string`: A string can be anything in any language. It can be `null`. It should not be used to convey a decision outcome. +- `Result`: A result can be a success or a failure. both sides are generic. Its the very definition of a lowest common denominator. + - if it's required in a response transmission, map it out there; don't make the implementation logic messier and harder to test in order to facilitate that need. + - if it's because you want to convey some extra information that the event cannot convey, use a tuple, a record or a Discriminated Union + +#### ❌ DONT Log + +It's always sufficient to return a `bool` or `enum` to convey an outcome (but try to avoid even that). See also [Fold: DONT log](#fold-dont-log) + + +#### ❌ DONT use a `Result` type + +Combining success and failures into one type because something will need to know suggests that there is a workflow. It's better to model that explicitly. + +If your API has a common set of result codes that it can return, map to those later - the job here is to model the decisions. + +See [use the simplest result possible](#decide-results-simple). + + +#### ❌ DONT return more status than necessary + +A corollary of designing for idempotency is that we don't want to have the caller care about whether a request triggered a change. If we need to test that, we can call the decision function and simply assert against the events it produced. + +```fsharp +// ❌ DONT DO THIS! +module Decisions = + + let create name state = + if state <> Initial then AlreadyCreated, [||] + else Ok, [| Created { name = name } |] +``` +The world does not need to know that you correctly handled at least once delivery of a request that was retried when the wifi reconnected. + +Instead: +```fsharp +let create name = function + | Fold.Initial -> [| Events.Created { name = name } |] + | Fold.Running _ -> [||] + +... + +module ThingTests + +let [] ``create generates Created`` () = + let state = Fold.Initial + let events = Decisions.create "tim" state + events =! [| Events.Created { name = "tim" } |] + +let [] ``create is idempotent`` () = + let state = Fold.Running () + let events = Decisions.create "tim" state + events =! [||] +``` + + +#### ❌ DONT share a common result type across multiple decision functions + +If you have three outcomes for one decision, don't borrow that result type for a separate decision that only needs two. Just give it it's own type. See [use the simplest result possible](#decide-results-simple). + +#### ✅ DO partition decision logic + +Most systems will have a significant number of Aggregates with low numbers of Events and Decisions. Having the Decision functions at the top level of the Aggregate Module can work well for those. Many people like to group such logic within a `module Decisions`, as it gives a good outline (`module Stream`, `module Events`, `module Reactions`, `module Fold`, `type Service`, `module Factory`) that allows one to quickly locate relevant artifacts and orient oneself in a less familiar area of the code. A key part of managing the complexity is to start looking for ways to group them into clumps of 3-10 related decision functions in a `module` within the overall `module Decisions` (or at top level in the file) as early as possible. + + +#### ❌ DONT be a slave to the Command pattern + +The bulk of introductory material on the Decider pattern, and event sourcing in general uses the Command pattern as if it's a central part of the architecture. That's not unreasonable; it's a proven pattern that's useful in a variety of contexts. + +Some positives of the pattern are: +- one can route any number of commands through any number of layers without having to change anything to add a new command +- it can be enable applying cross-cutting logic uniformly +- when implemented as Discriminated Unions in F#, the code can be very terse, and you can lean on total matching etc. +- In some cases it can work well with property based testing; the entirety of an Aggregate's Command Handling can be covered via Property Based Testing etc + +However, it's also just a pattern. It has negatives; some: +- if you have a single command handler, the result type is forced to be a lowest common denominator +- the code can actually end up longer and harder to read, but still anaemic in terms of modelling the domain properly + + ```fsharp + module Decisions = + type Command = Increment | Decrement + let decide command state = + match command with + | Increment by -> if state = 10 then [||] else [| Events.Incremented |] + | Decrement -> if state = 0 then [|] else [| Events.Decremented |] + | Reset -> if state = 0 then [||] else [| Events.Reset |] + type Service(resolve: ...) = + member _.Execute(id, c) = + let decider = resolve id + decider.Transact(Decisions.decide c) + type App(service: Service, otherService: ...) = + member _.Execute(id, cmd) = + if otherService.Handle(id, cmd) then + service.Execute(id, cmd) + type Controller(app: App) = + member _.Reset(id) = + app.Execute(id, Aggregate.Command.Reset) + ``` + + If you instead use methods with argument lists to convey the same information, there's more opportunity to let the intention be conveyed in the code. + + ```fsharp + module Decisions = + let increment state = [| if state < 10 then Events.Incremented |] + let reset _state = [| if state <> 0 then Events.Reset |] + type Service(resolve: ...) = + member _.Reset id = + let decider = resolve id + decider.Transact Decisions.reset + member _.Increment(id, ?by) = + let decider = resolve id + decider.Transact Decisions.increment + type App(service: Service, otherService: ...) = + member _.HandleFrob(id) = + if otherService.AttemptFrob() then + service.Increment(id) + member _.Reset(id) = + service.Reset(id) + type Controller(app: App) = + member _.Frob() = + app.HandleFrob id + member _.Reset() = + app.Reset id + ``` + + +### 7. `module Queries` + +The primary purpose of an Aggregate is to gather State and produce Events to facilitate making and recording of Decisions. There is no Law Of Event Sourcing that says you must at all times use CQRS to split all reads out to some secondary derived read model. + +In fact, in the the context of Equinox, the `AccessStrategy.RollingState`, `LoadOption.AllowStale` and `LoadOption.AnyCachedState` features each encourage borrowing the Decision State to facilitate rendering that state to users of the system directly. + +However, making pragmatic choices can also become unfettered hacking very quickly. As such the following apply. + +#### ✅ DO use a `module Queries` + +Unless there is a single obvious boring rendition for a boring aggregate, you should have a type per Queyr + +#### ✅ DO use view DTOs + +As with the guidance on [not using Lowest Common Denominator representations for results](#decide-results-simple), you want to avoid directly exposing the State + + +##### ❌ DONT having a public generic `Read` function that exposes the `Fold.State` + +The purpose of the Fold State is to facilitate making decisions correctly. It often has other concerns such as: +- being able to store and reload from a snapshot +- being able to validate inferences being made based on events are being made correctly in the context of tests + +Having it also be a read model DTO is a bridge too far: + +```fs +// ❌ DONT DO THIS! +member service.Read(tenantId) = + let decider = resolve tenantId + decider.Query(fun state -> state) +``` + + +#### CONSIDER `ReadCached*` methods delegating to an internal generic `Query` with a `maxAge`: + +`LoadOption.AllowStale` is the preferred default strategy for all queries. This is for two reasons: +1. if a cached version of the state fresher than the `maxAge` tolerance is available, you produce a result immediately and your store does less work +2. even if a sufficiently fresh state is not available, all such reads are coalesced into a single store roundtrip. This means that the impact of read traffic on the workload hitting the store itself is limited to one read round trip per `maxAge` interval. + +```fsharp +module Queries = + + let infoCachingPeriod = TimeSpan.FromSeconds 10. + type NameInfo = { name: string; contact: ContactInfo } + let renderName (state: Fold.State) = { name = state.originalName; contact = state.contactDetails } + let renderPendingApprovals (state: Fold.State) = Fold.calculatePendingApprovals state + +type Service(resolve: ...) + + // NOTE: Query should remain private; expose each relevant projection as a `Read*` method + member private service.Query(maxAge: TimeSpan, tenantId, render: Fold.State -> 'r): Async<'r> = + let decider = resolve tenantId + decider.Query(render, load = Equinox.LoadOption.AllowStale maxAge) + + member service.ReadCachedName(tenantId): Async = + service.Query(Queries.infoCachingPeriod, Queries.renderName) + member service.ReadPending(tenantId): Async = + service.Query(Queries.infoCachingPeriod, Queries.renderPendingApprovals) +``` + + +#### CONSIDER `QueryCurrent*` methods delegating to a `QueryRaw` helper + +While the `ReadCached*` pattern above is preferred, as it protect the store from unconstrained read traffic, there are cases where it's deemed necessary to be able to [Read Your Writes](https://www.allthingsdistributed.com/2007/12/eventually_consistent.html) 'as much as possible' at all costs. + +_TL;DR quite often you should really be doing the [`ReadCached` pattern](#do-allowstale)_ + +The first thing to note is that you need to be sure you're actually meeting that requirement. For instance, if you are using EventStoreDB, DynamoDB or MessageDB, you will want to use `Equinox.LoadOption.RequireLeader` for it to be meaningful (otherwise a read, (yes, even one served from the same application instance) might be read from a replica that has yet to see the latest state). For [CosmosDB in `Session` consistency mode, similar concerns apply](https://github.com/jet/equinox/issues/192). + +It's also important to consider the fact that any read, no matter how consistent it is at the point of reading, is also instantly stale data the instant it's been performed. + +:warning: If each and every query that is processed results in a store roundtrip, and you don't have any natural limiting of the request traffic, you open yourself up to overloading the store with read traffic (which is a primary reason the CQRS pattern is considered a good default). [`AllowStale` mode](#do-allowstale) is less prone to this issue, as store read round trips are limited to one per `maxAge` interval. + +:warning: `QueryRaw` should stay `private` - you want to avoid having read logic spread across your application doing arbitrary reads that are not appropriately encapsulated within the Aggregate. + +```fs +// NOTE: the QueryRaw helper absolutely needs to stay private. Expose queries only as specific `QueryCurrent*` methods +member private service.QueryRaw(tenantId, render) = + let decider = resolve tenantId + decider.Query(render, Equinox.LoadOption.RequireLeader) + +member service.QueryCurrentState(tenantId) = + service.QueryRaw(Queries.renderState) +``` + +## Outside `module ` + + +### ❌ DONT `open ` + +Ideally use the full name. If you can't help it, [use `module` aliases as outlined below](#dont-open-events) instead. If you are opening it because you also need to touch the Fold State, [don't do that either](#dont-open-fold). + +Exception: for the unit tests associated with a single Aggregate, `open Aggregate` may make sense. As long as it's exactly that one `Aggregate` + + +### ❌ DONT `open .Events` + +If you have logic in another module that is coupled to an event contract, you want that to stick out. +1. If the module is concerned with exactly one Aggregate, you can alias it via: `module Events = Aggregate.Events` +2. If the module is concerned with more than one Aggregate and there are less than 10 usages, prefix the consumption with `Aggregate.Events.` +3. If the module is concerned with more than one Aggregate and there are many usages, or the name is long, alias it via `module AggEvents = AggregateWithLongName.Events.` + +Exception: In some cases, an `open Events`, _inside_ `module Fold` might be reasonable: + +```fsharp +module Events = + + ... + + module Fold = + open Events + let evolve state = function + | Increment -> state + 1 + | Decrement -> state - 1 +``` + +BUT, how much worse is it to have to read or type: + +```fsharp +module Events = + + ... + +module Fold = + + let evolve state = function + | Events.Increment -> state + 1 + | Events.Decrement -> state - 1 +``` + +Within `module Decisions`, it's normally best not to open it. i.e. whenever producing Events, simply prefix it: + +```fsharp +module Events = + + ... + + module Decisions = + + module Counting = + + let increment state = [| if state < 10 then Events.Incremented |] +``` + + +### ❌ DONT `open .Fold` + +If you have external logic that is coupled to the State of an Aggregate and/or the related types, be explicit about that coupling; refer to `Aggregate.Fold.State` to make it clear. Or use the `ReadCached*` or `QueryCurrent*` patterns, which by definition return a specific type that is not the full `State` (and is not in the `Fold` namespace/module). + +# Managing Projections and Reactions with Equinox, Propulsion and FsKafka + ## Microservice Program.fs conventions diff --git a/equinox-patterns/Domain/Domain.fsproj b/equinox-patterns/Domain/Domain.fsproj index ee67d35ba..79ca690fd 100644 --- a/equinox-patterns/Domain/Domain.fsproj +++ b/equinox-patterns/Domain/Domain.fsproj @@ -17,10 +17,10 @@ - - - - + + + + diff --git a/equinox-shipping/Domain.Tests/Domain.Tests.fsproj b/equinox-shipping/Domain.Tests/Domain.Tests.fsproj index 6ad470aa5..023876f11 100644 --- a/equinox-shipping/Domain.Tests/Domain.Tests.fsproj +++ b/equinox-shipping/Domain.Tests/Domain.Tests.fsproj @@ -20,7 +20,7 @@ - + diff --git a/equinox-shipping/Domain/Domain.fsproj b/equinox-shipping/Domain/Domain.fsproj index 5b679032c..f3003cce6 100644 --- a/equinox-shipping/Domain/Domain.fsproj +++ b/equinox-shipping/Domain/Domain.fsproj @@ -20,11 +20,11 @@ - - - - - + + + + + diff --git a/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj b/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj index 41100b04a..d7e67bdfc 100644 --- a/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj +++ b/equinox-shipping/Watchdog.Lambda.Cdk/Watchdog.Lambda.Cdk.fsproj @@ -20,7 +20,7 @@ - + diff --git a/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj b/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj index 91527b6ab..3bb6ae4e9 100644 --- a/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj +++ b/equinox-shipping/Watchdog.Lambda/Watchdog.Lambda.fsproj @@ -16,7 +16,7 @@ - + diff --git a/equinox-shipping/Watchdog/Infrastructure.fs b/equinox-shipping/Watchdog/Infrastructure.fs index 30a176ed7..6cbfdb57c 100644 --- a/equinox-shipping/Watchdog/Infrastructure.fs +++ b/equinox-shipping/Watchdog/Infrastructure.fs @@ -95,9 +95,10 @@ module EventStoreContext = module OutcomeKind = - let [] (|StoreExceptions|_|) exn = + let [] (|StoreExceptions|_|) (exn: exn) = match exn with | Equinox.DynamoStore.Exceptions.ProvisionedThroughputExceeded | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome - | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome + | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome | _ -> ValueNone diff --git a/equinox-shipping/Watchdog/Watchdog.fsproj b/equinox-shipping/Watchdog/Watchdog.fsproj index 9183ddae6..5e29e58f1 100644 --- a/equinox-shipping/Watchdog/Watchdog.fsproj +++ b/equinox-shipping/Watchdog/Watchdog.fsproj @@ -18,10 +18,10 @@ - - - - + + + + diff --git a/equinox-testbed/Testbed.fsproj b/equinox-testbed/Testbed.fsproj index 5833c989f..618bb7bb3 100644 --- a/equinox-testbed/Testbed.fsproj +++ b/equinox-testbed/Testbed.fsproj @@ -17,11 +17,11 @@ - - - - - + + + + + diff --git a/equinox-web-csharp/Domain/Domain.csproj b/equinox-web-csharp/Domain/Domain.csproj index 47173e8f1..e629abc6e 100755 --- a/equinox-web-csharp/Domain/Domain.csproj +++ b/equinox-web-csharp/Domain/Domain.csproj @@ -5,8 +5,8 @@ - - + + diff --git a/equinox-web-csharp/Web/CosmosContext.cs b/equinox-web-csharp/Web/CosmosContext.cs index 79c9e134d..ead44ff29 100644 --- a/equinox-web-csharp/Web/CosmosContext.cs +++ b/equinox-web-csharp/Web/CosmosContext.cs @@ -56,7 +56,7 @@ public override Func> Resolve(_store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, cacheStrategy, compressUnfolds:FSharpOption.None); + var cat = new CosmosStoreCategory(_store, name, codec.ToJsonElementCodec(), fold, initial, accessStrategy, cacheStrategy); return cat.Resolve(handlerLog); } } \ No newline at end of file diff --git a/equinox-web-csharp/Web/Web.csproj b/equinox-web-csharp/Web/Web.csproj index 9be8b7486..c888985d5 100755 --- a/equinox-web-csharp/Web/Web.csproj +++ b/equinox-web-csharp/Web/Web.csproj @@ -5,10 +5,10 @@ - - - - + + + + diff --git a/equinox-web/Domain/Domain.fsproj b/equinox-web/Domain/Domain.fsproj index bd6d1f9c3..7fbc291e4 100644 --- a/equinox-web/Domain/Domain.fsproj +++ b/equinox-web/Domain/Domain.fsproj @@ -17,11 +17,11 @@ - - - - - + + + + + diff --git a/equinox-web/Web/Startup.fs b/equinox-web/Web/Startup.fs index e2d61c312..7a2ce56f3 100644 --- a/equinox-web/Web/Startup.fs +++ b/equinox-web/Web/Startup.fs @@ -190,7 +190,7 @@ type Startup() = .UseSerilogRequestLogging() // see https://nblumhardt.com/2019/10/serilog-in-aspnetcore-3/ #if todos // NB Jet does now own, control or audit https://todobackend.com; it is a third party site; please satisfy yourself that this is a safe thing use in your environment before using it._ - .UseCors(fun x -> x.WithOrigins([|"https://www.todobackend.com"|]).AllowAnyHeader().AllowAnyMethod() |> ignore) + .UseCors(_.WithOrigins([|"https://www.todobackend.com"|]).AllowAnyHeader().AllowAnyMethod() |> ignore) #endif .UseEndpoints(fun endpoints -> endpoints.MapMetrics() |> ignore // Host /metrics for Prometheus diff --git a/equinox-web/Web/Web.fsproj b/equinox-web/Web/Web.fsproj index 3619ad14b..fa720b173 100644 --- a/equinox-web/Web/Web.fsproj +++ b/equinox-web/Web/Web.fsproj @@ -11,12 +11,12 @@ - - + + - + diff --git a/feed-consumer/FeedConsumer.fsproj b/feed-consumer/FeedConsumer.fsproj index 1d1a079ee..12dc47ef0 100644 --- a/feed-consumer/FeedConsumer.fsproj +++ b/feed-consumer/FeedConsumer.fsproj @@ -16,11 +16,11 @@ - - - - - + + + + + diff --git a/feed-source/Domain/Domain.fsproj b/feed-source/Domain/Domain.fsproj index df4735163..9825743fb 100644 --- a/feed-source/Domain/Domain.fsproj +++ b/feed-source/Domain/Domain.fsproj @@ -15,10 +15,10 @@ - - - - + + + + diff --git a/feed-source/FeedApi/FeedApi.fsproj b/feed-source/FeedApi/FeedApi.fsproj index 357b2f78f..94de20d0f 100644 --- a/feed-source/FeedApi/FeedApi.fsproj +++ b/feed-source/FeedApi/FeedApi.fsproj @@ -19,7 +19,7 @@ - + diff --git a/global.json b/global.json index 0f050ba53..d07970ac2 100644 --- a/global.json +++ b/global.json @@ -1,6 +1,6 @@ { "sdk": { - "version": "6.0.300", + "version": "8.0.100", "rollForward": "latestMajor" } } diff --git a/periodic-ingester/PeriodicIngester.fsproj b/periodic-ingester/PeriodicIngester.fsproj index 78e467fd7..514c32f13 100644 --- a/periodic-ingester/PeriodicIngester.fsproj +++ b/periodic-ingester/PeriodicIngester.fsproj @@ -17,11 +17,11 @@ - - + + - - + + diff --git a/propulsion-archiver/Archiver.fsproj b/propulsion-archiver/Archiver.fsproj index 4967e9a88..59727f2af 100644 --- a/propulsion-archiver/Archiver.fsproj +++ b/propulsion-archiver/Archiver.fsproj @@ -14,8 +14,8 @@ - - + + diff --git a/propulsion-consumer/Consumer.fsproj b/propulsion-consumer/Consumer.fsproj index 3a4f9fb59..dbe7d85ae 100644 --- a/propulsion-consumer/Consumer.fsproj +++ b/propulsion-consumer/Consumer.fsproj @@ -15,8 +15,8 @@ - - + + diff --git a/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj b/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj index 86315b83b..1af75e6c9 100644 --- a/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj +++ b/propulsion-dynamostore-cdk/DynamoStore.Cdk.fsproj @@ -21,9 +21,9 @@ - - - + + + diff --git a/propulsion-hotel/Domain/Domain.fsproj b/propulsion-hotel/Domain/Domain.fsproj index 2dce5221b..2e102af51 100644 --- a/propulsion-hotel/Domain/Domain.fsproj +++ b/propulsion-hotel/Domain/Domain.fsproj @@ -9,10 +9,10 @@ - - - - + + + + diff --git a/propulsion-hotel/Reactor/Reactor.fsproj b/propulsion-hotel/Reactor/Reactor.fsproj index 2c0389c57..e4794595c 100644 --- a/propulsion-hotel/Reactor/Reactor.fsproj +++ b/propulsion-hotel/Reactor/Reactor.fsproj @@ -18,11 +18,11 @@ - + - - - + + + diff --git a/propulsion-indexer/App/App.fsproj b/propulsion-indexer/App/App.fsproj index d58eee7bd..40eff004a 100644 --- a/propulsion-indexer/App/App.fsproj +++ b/propulsion-indexer/App/App.fsproj @@ -8,13 +8,17 @@ - + + - + + - + + + diff --git a/propulsion-indexer/App/Args.fs b/propulsion-indexer/App/Args.fs new file mode 100644 index 000000000..cb2d9956d --- /dev/null +++ b/propulsion-indexer/App/Args.fs @@ -0,0 +1,116 @@ +module App.Args + +open Argu +open System + +let [] CONNECTION = "EQUINOX_COSMOS_CONNECTION" +let [] DATABASE = "EQUINOX_COSMOS_DATABASE" +let [] CONTAINER = "EQUINOX_COSMOS_CONTAINER" +let [] VIEWS = "EQUINOX_COSMOS_VIEWS" + +type Configuration(tryGet: string -> string option) = + + let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" + + member x.CosmosConnection = get CONNECTION + member x.CosmosDatabase = get DATABASE + member x.CosmosContainer = get CONTAINER + member x.CosmosViews = get VIEWS + +type [] CosmosParameters = + | [] Verbose + | [] Connection of string + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode + | [] Database of string + | [] Container of string + | [] Views of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + interface IArgParserTemplate with + member p.Usage = p |> function + | Verbose -> "request Verbose Logging from Store. Default: off" + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable ${CONNECTION} specified)" + | Database _ -> $"specify a database name for store. (optional if environment variable ${DATABASE} specified)" + | Container _ -> $"specify a container name for store. (optional if environment variable ${CONTAINER} specified)" + | Views _ -> $"specify a views Container name for Cosmos views. (optional if environment variable ${VIEWS} specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." +and CosmosArguments(c: Configuration, p: ParseResults) = + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) + let connector = + let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + let mode = p.TryGetResult ConnectionMode + Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime, ?mode = mode) + member val Verbose = p.Contains Verbose + member val Connection = connection + member val Database = p.GetResult(Database, fun () -> c.CosmosDatabase) + member val Container = p.GetResult(Container, fun () -> c.CosmosContainer) + member val private Views = p.GetResult(Views, fun () -> c.CosmosViews) + member x.Connect() = connector.Connect(x.Database, x.Container, x.Views) + +type [] CosmosSourceParameters = + | [] Verbose + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode + | [] Connection of string + | [] Database of string + | [] Container of string + | [] Views of string + | [] Timeout of float + | [] Retries of int + | [] RetriesWaitTime of float + + | [] LeaseContainerSuffix of string + | [] LeaseContainer of string + | [] FromTail + | [] MaxItems of int + | [] LagFreqM of float + interface IArgParserTemplate with + member p.Usage = p |> function + | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" + | ConnectionMode _ -> "override the connection mode. Default: Direct." + | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {CONNECTION} specified)" + | Database _ -> $"specify a database name for store. (optional if environment variable {DATABASE} specified)" + | Container _ -> $"specify a container name for store. (optional if environment variable {CONTAINER} specified)" + | Views _ -> $"specify a container name for views container. (optional if environment variable {VIEWS} specified)" + | Timeout _ -> "specify operation timeout in seconds. Default: 5." + | Retries _ -> "specify operation retries. Default: 1." + | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." + + | LeaseContainerSuffix _ -> "specify Container Name suffix for Leases container. Default: `-aux`." + | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `` + `-aux`." + | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." + | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." + | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" +and CosmosSourceArguments(c: Configuration, p: ParseResults) = + let connection = p.GetResult(Connection, fun () -> c.CosmosConnection) + let connector = + let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + let mode = p.TryGetResult ConnectionMode + Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime, ?mode = mode) + let database = p.GetResult(Database, fun () -> c.CosmosDatabase) + let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) + let viewsContainerId = p.GetResult(Views, fun () -> c.CosmosViews) + + let suffix = p.GetResult(LeaseContainerSuffix, "-aux") + let leaseContainerId = p.GetResult(LeaseContainer, containerId + suffix) + + let fromTail = p.Contains FromTail + let maxItems = p.TryGetResult MaxItems + let tailSleepInterval = TimeSpan.FromMilliseconds 500. + let lagFrequency = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes + + member val IsLagFreqSpecified = p.Contains LagFreqM + member val Verbose = p.Contains Verbose + member val Connection = connection + member val Database = database + member _.ConnectWithFeed(?lsc) = connector.ConnectWithFeed(database, containerId, viewsContainerId, leaseContainerId, ?logSnapshotConfig = lsc) + member _.ConnectWithFeedReadOnly(auxClient, auxDatabase, auxContainerId) = + connector.ConnectWithFeedReadOnly(database, containerId, viewsContainerId, auxClient, auxDatabase, auxContainerId) + member val MonitoringParams = fromTail, maxItems, tailSleepInterval, lagFrequency diff --git a/propulsion-indexer/App/Configuration.fs b/propulsion-indexer/App/Configuration.fs deleted file mode 100644 index 568a54671..000000000 --- a/propulsion-indexer/App/Configuration.fs +++ /dev/null @@ -1,16 +0,0 @@ -module App.Args - -let [] CONNECTION = "EQUINOX_COSMOS_CONNECTION" -let [] DATABASE = "EQUINOX_COSMOS_DATABASE" -let [] CONTAINER = "EQUINOX_COSMOS_CONTAINER" -let [] VIEWS = "EQUINOX_COSMOS_VIEWS" - -type Configuration(tryGet: string -> string option) = - - let get key = match tryGet key with Some value -> value | None -> failwith $"Missing Argument/Environment Variable %s{key}" - - member _.CosmosConnection = get CONNECTION - member _.CosmosDatabase = get DATABASE - member _.CosmosContainer = get CONTAINER - member _.CosmosViews = get VIEWS - diff --git a/propulsion-indexer/App/CosmosDumpSource.fs b/propulsion-indexer/App/CosmosDumpSource.fs new file mode 100644 index 000000000..5b8e00c57 --- /dev/null +++ b/propulsion-indexer/App/CosmosDumpSource.fs @@ -0,0 +1,31 @@ +namespace App + +open FSharp.Control +open Propulsion.Feed +open System + +/// Parses CR separated file with items dumped from a Cosmos Container containing Equinox Items +/// One way to generate one of those is via the cosmic tool at https://github.com/creyke/Cosmic +/// dotnet tool install -g cosmic +/// # then connect/select db per https://github.com/creyke/Cosmic#basic-usage +/// cosmic query 'select * from c order by c._ts' > file.out +type [] CosmosDumpSource private () = + + static member Start(log, statsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo) = + let isNonCommentLine line = System.Text.RegularExpressions.Regex.IsMatch(line, "^\s*#") |> not + let truncate = match truncateTo with Some count -> Seq.truncate count | None -> id + let lines = Seq.append (System.IO.File.ReadLines filePath |> truncate) (Seq.singleton null) // Add a trailing EOF sentinel so checkpoint positions can be line numbers even when finished reading + let crawl _ _ _ = taskSeq { + for i, line in lines |> Seq.indexed do + let isEof = line = null + if isEof || (i >= skip && isNonCommentLine line) then + let lineNo = int64 i + 1L + try let items = if isEof then Array.empty + else System.Text.Json.JsonDocument.Parse line |> parseFeedDoc |> Seq.toArray + struct (TimeSpan.Zero, ({ items = items; isTail = isEof; checkpoint = Position.parse lineNo }: Core.Batch<_>)) + with e -> raise <| exn($"File Parse error on L{lineNo}: '{line.Substring(0, 200)}'", e) } + let source = + let checkpointStore = Equinox.MemoryStore.VolatileStore() + let checkpoints = ReaderCheckpoint.MemoryStore.create log ("consumerGroup", TimeSpan.FromMinutes 1) checkpointStore + Propulsion.Feed.Core.SinglePassFeedSource(log, statsInterval, SourceId.parse filePath, crawl, checkpoints, sink, string) + source.Start(fun _ct -> task { return [| TrancheId.parse "0" |] }) diff --git a/propulsion-indexer/App/Infrastructure.fs b/propulsion-indexer/App/Infrastructure.fs index d27526389..7037b1b56 100644 --- a/propulsion-indexer/App/Infrastructure.fs +++ b/propulsion-indexer/App/Infrastructure.fs @@ -10,8 +10,9 @@ module EnvVar = module Log = + let [] PropertyTag = "isMetric" /// Allow logging to filter out emission of log messages whose information is also surfaced as metrics - let isStoreMetrics e = Filters.Matching.WithProperty("isMetric").Invoke e + let logEventIsMetric e = Serilog.Filters.Matching.WithProperty(PropertyTag).Invoke e /// Equinox and Propulsion provide metrics as properties in log emissions /// These helpers wire those to pass through virtual Log Sinks that expose them as Prometheus metrics. @@ -25,15 +26,22 @@ module Sinks = let equinoxAndPropulsionConsumerMetrics tags group (l: LoggerConfiguration) = l |> equinoxMetricsOnly tags - |> fun l -> l.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags, group)) + |> _.WriteTo.Sink(Propulsion.Prometheus.LogSink(tags, group)) + |> _.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags)) - let equinoxAndPropulsionCosmosConsumerMetrics tags group (l: LoggerConfiguration) = - l |> equinoxAndPropulsionConsumerMetrics tags group - |> fun l -> l.WriteTo.Sink(Propulsion.CosmosStore.Prometheus.LogSink(tags)) + let private removeMetrics (e: Serilog.Events.LogEvent) = + e.RemovePropertyIfPresent Equinox.CosmosStore.Core.Log.PropertyTag + e.RemovePropertyIfPresent Propulsion.CosmosStore.Log.PropertyTag + e.RemovePropertyIfPresent Propulsion.Feed.Core.Log.PropertyTag + e.RemovePropertyIfPresent Propulsion.Streams.Log.PropertyTag + e.RemovePropertyIfPresent Log.PropertyTag let console (configuration: LoggerConfiguration) = - let t = "[{Timestamp:HH:mm:ss} {Level:u1}] {Message:lj} {Properties:j}{NewLine}{Exception}" - configuration.WriteTo.Console(theme=Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code, outputTemplate=t) + let t = "{Timestamp:HH:mm:ss} {Level:u1} {Message:lj} {Properties:j}{NewLine}{Exception}" + configuration + .WriteTo.Logger(fun l -> + l.Enrich.With({ new Serilog.Core.ILogEventEnricher with member _.Enrich(evt, _) = removeMetrics evt }) + .WriteTo.Console(outputTemplate = t, theme = Sinks.SystemConsole.Themes.AnsiConsoleTheme.Code) |> ignore) [] type Logging() = @@ -43,6 +51,8 @@ type Logging() = configuration .Enrich.FromLogContext() |> fun c -> if verbose = Some true then c.MinimumLevel.Debug() else c + |> fun c -> let generalLevel = if verbose = Some true then Events.LogEventLevel.Information else Events.LogEventLevel.Warning + c.MinimumLevel.Override(typeof.FullName, generalLevel) [] static member private Sinks(configuration: LoggerConfiguration, configureMetricsSinks, configureConsoleSink, ?isMetric) = @@ -50,13 +60,14 @@ type Logging() = a.Logger(configureMetricsSinks >> ignore) |> ignore // unconditionally feed all log events to the metrics sinks a.Logger(fun l -> // but filter what gets emitted to the console sink let l = match isMetric with None -> l | Some predicate -> l.Filter.ByExcluding(Func predicate) + let l = l.Filter.ByExcluding(fun e -> match e.Properties.TryGetValue "SourceContext" with true, (:? Serilog.Events.ScalarValue as v) -> string v.Value = "LeMans.Common.CosmosRepository" | _ -> false) configureConsoleSink l |> ignore) |> ignore - configuration.WriteTo.Async(bufferSize=65536, blockWhenFull=true, configure=System.Action<_> configure) + configuration.WriteTo.Async(bufferSize = 65536, blockWhenFull = true, configure = System.Action<_> configure) [] static member Sinks(configuration: LoggerConfiguration, configureMetricsSinks, verboseStore) = - configuration.Sinks(configureMetricsSinks, Sinks.console, ?isMetric = if verboseStore then None else Some Log.isStoreMetrics) + configuration.Sinks(configureMetricsSinks, Sinks.console, ?isMetric = if verboseStore then None else Some Log.logEventIsMetric) module CosmosStoreConnector = @@ -99,19 +110,27 @@ type Equinox.CosmosStore.CosmosStoreConnector with let client = Equinox.CosmosStore.CosmosStoreClient(cosmosClient) let contexts = client.CreateContext(role, databaseId, containerId, tipMaxEvents = 256, queryMaxItems = 500), - client.CreateContext(role, databaseId, viewsContainerId, tipMaxEvents = 256, queryMaxItems = 500), + // In general, the views container won't write events. We also know we generally won't attach a CFP, so we keep events in tip + client.CreateContext($"{role}(Views)", databaseId, viewsContainerId, tipMaxEvents = 128), // NOTE the tip limits for this connection are set to be effectively infinite in order to ensure that writes never trigger calving from the tip client.CreateContext("snapshotUpdater", databaseId, containerId, tipMaxEvents = 1024, tipMaxJsonLength = 1024 * 1024, skipLog = not (logSnapshotConfig = Some true)) return cosmosClient, contexts } + + /// Connect to the database (including verifying and warming up relevant containers), establish relevant CosmosStoreContexts required by Domain + member x.Connect(databaseId, containerId, viewsContainerId) = async { + let! _client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId) + return contexts } + + /// Indexer: Connects to a Store as both a CosmosStoreClient and a ChangeFeedProcessor Monitored Container member x.ConnectWithFeed(databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig) = async { - let! cosmosClient, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig = logSnapshotConfig) - let source, leases = CosmosStoreConnector.getSourceAndLeases cosmosClient databaseId containerId auxContainerId + let! client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId, auxContainerId, ?logSnapshotConfig = logSnapshotConfig) + let source, leases = CosmosStoreConnector.getSourceAndLeases client databaseId containerId auxContainerId return contexts, source, leases } /// Indexer Sync mode: When using a ReadOnly connection string, the leases need to be maintained alongside the target - member x.ConnectWithFeedReadOnly(databaseId, containerId: string, viewsContainerId, auxClient, auxDatabaseId, auxContainerId) = async { - let! client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId = viewsContainerId) + member x.ConnectWithFeedReadOnly(databaseId, containerId, viewsContainerId, auxClient, auxDatabaseId, auxContainerId) = async { + let! client, contexts = x.Connect("Main", databaseId, containerId, viewsContainerId) let source = CosmosStoreConnector.getSource client databaseId containerId let leases = CosmosStoreConnector.getLeases auxClient auxDatabaseId auxContainerId return contexts, source, leases } @@ -123,7 +142,7 @@ type Equinox.CosmosStore.CosmosStoreConnector with type Factory private () = - static member StartSink(log, stats, maxConcurrentStreams, handle, maxReadAhead) = + static member StartStreamsSink(log, stats, maxConcurrentStreams, handle, maxReadAhead) = Propulsion.Sinks.Factory.StartConcurrent(log, maxReadAhead, maxConcurrentStreams, handle, stats) module OutcomeKind = @@ -131,8 +150,9 @@ module OutcomeKind = let [] (|StoreExceptions|_|) (exn: exn) = match exn with | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome - | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome - | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | Equinox.CosmosStore.Exceptions.CosmosStatus System.Net.HttpStatusCode.RequestEntityTooLarge -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTooLarge" |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome + | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome | _ -> ValueNone // A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly diff --git a/propulsion-indexer/Domain/Domain.fsproj b/propulsion-indexer/Domain/Domain.fsproj index 77b64dfd3..3eeacb638 100644 --- a/propulsion-indexer/Domain/Domain.fsproj +++ b/propulsion-indexer/Domain/Domain.fsproj @@ -7,9 +7,9 @@ - - - + + + diff --git a/propulsion-indexer/Domain/Store.fs b/propulsion-indexer/Domain/Store.fs index 40369fe84..bbaca8711 100644 --- a/propulsion-indexer/Domain/Store.fs +++ b/propulsion-indexer/Domain/Store.fs @@ -25,28 +25,68 @@ module Codec = /// - The Sync Stored procedure then processes the ensuing request, replacing the current (missing or outdated) `'u`nfolds with the fresh snapshot module Snapshotter = + type Result = + | Valid // No-op case: no update required as the stream already has a correct snapshot + | Invalid // Update skipped due to running in dryRun mode; we avoided running the update + | Updated // Update required: yield a tentative event (which transmuteAllEventsToUnfolds will flip to being an unfold) + let decide generate dryRun (hasSnapshot, state) = + if hasSnapshot then Valid, Array.empty + elif dryRun then Invalid, Array.empty + // Note Updated is a synthetic/tentative event, which transmuteAllEventsToUnfolds will use as a signal to a) update the unfolds b) drop the event + else Updated, generate state type private StateWithSnapshottedFlag<'s> = bool * 's - type Service<'id, 'e, 's> internal (resolve: 'id -> Equinox.Decider<'e, StateWithSnapshottedFlag<'s>>, generate: 's -> 'e) = - - member _.TryUpdate(id): Async = + type Service<'id, 'e, 's> internal (resolve: 'id -> Equinox.Decider<'e, StateWithSnapshottedFlag<'s>>, generate: 's -> 'e[]) = + member _.TryUpdate(id, dryRun): Async = let decider = resolve id - let decide (hasSnapshot, state) = - if hasSnapshot then false, Array.empty // case 1: no update required as the stream already has a correct snapshot - else true, generate state |> Array.singleton // case 2: yield a tentative event (which transmuteAllEventsToUnfolds will flip to being an unfold) - decider.TransactWithPostVersion(decide) - + decider.TransactWithPostVersion(decide generate dryRun) + module Service = + let tryUpdate dryRun (x: Service<_, _, _>) id = x.TryUpdate(id, dryRun) let internal createService streamId generate cat = let resolve = streamId >> createDecider cat Service(resolve, generate) let internal initial'<'s> initial: StateWithSnapshottedFlag<'s> = false, initial - let internal fold' isCurrentSnapshot fold (_wasOrigin, s) xs: StateWithSnapshottedFlag<'s> = + let internal fold' isValidUnfolds fold (_wasOrigin, s) xs: StateWithSnapshottedFlag<'s> = // NOTE ITimelineEvent.IsUnfold and/or a generic isOrigin event would be insufficient for our needs // The tail event encountered by the fold could either be: // - an 'out of date' snapshot (which the normal load process would be able to upconvert from, but is not what we desire) // - another event (if there is no snapshot of any kind) - isCurrentSnapshot (Array.last xs), fold s xs - + isValidUnfolds xs, fold s xs + +module Ingester = + + open FsCodec + type internal Event<'e, 'f> = (struct (ITimelineEvent<'f> * 'e)) + let internal createCodec<'e, 'f, 'c> (target: IEventCodec<'e, 'f, 'c>): IEventCodec, 'f, 'c> = + let encode (c: 'c) ((input, upped): Event<'e, 'f>) : struct (string * 'f * 'f * System.Guid * string * string * System.DateTimeOffset) = + let e = target.Encode(c, upped) + e.EventType, e.Data, input.Meta, input.EventId, input.CorrelationId, input.CausationId, input.Timestamp + let decode (e: ITimelineEvent<'f>): Event<'e, 'f> voption = match target.Decode e with ValueNone -> ValueNone | ValueSome d -> ValueSome (e, d) + Codec.Create, 'f, 'c>(encode, decode) + + type private State = unit + let internal initial: State = () + let internal fold () = ignore + + let private decide (inputCodec: IEventCodec<'e, 'f, unit>) (inputs: ITimelineEvent<'f>[]) (c: Equinox.ISyncContext): Event<'e, 'f>[] = [| + for x in inputs do + if x.Index >= c.Version then // NOTE source and target need to have 1:1 matching event indexes, or things would be much more complex + match inputCodec.Decode x with + | ValueNone -> failwith $"Unknown EventType {x.EventType} at index {x.Index}" + | ValueSome d -> struct (x, d) |] // So we require all source events to exactly one event in the target + + type Service<'id, 'e, 's, 'f> internal (codec: IEventCodec<'e, 'f, unit>, resolve: 'id -> Equinox.Decider, State>) = + member _.Ingest(id, sourceEvents: ITimelineEvent<'f>[]): Async = + let decider = resolve id + decider.TransactEx(decide codec sourceEvents, fun (x: Equinox.ISyncContext) -> x.Version) + let internal createService<'id, 'e, 'f> streamId inputCodec cat = + let resolve = streamId >> createDecider cat + Service<'id, 'e, unit, 'f>(inputCodec, resolve) + module Service = + let ingest (svc: Service<'id, 'e, 's, System.Text.Json.JsonElement>) id (events: ITimelineEvent>[]) = + let events = events |> Array.map (FsCodec.Core.TimelineEvent.Map (System.Func<_, _> FsCodec.SystemTextJson.Interop.InteropHelpers.Utf8ToJsonElement)) + svc.Ingest(id, events) + let private defaultCacheDuration = System.TimeSpan.FromMinutes 20 let private cacheStrategy cache = Equinox.CachingStrategy.SlidingWindow (cache, defaultCacheDuration) @@ -63,16 +103,16 @@ module Cosmos = open Equinox.CosmosStore - let private createCached name codec initial fold accessStrategy (context, cache) = - CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache) + let private createCached name codec initial fold accessStrategy shouldCompress (context, cache) = + CosmosStoreCategory(context, name, codec, fold, initial, accessStrategy, cacheStrategy cache, ?shouldCompress = shouldCompress) let createSnapshotted name codec initial fold (isOrigin, toSnapshot) (context, cache) = let accessStrategy = AccessStrategy.Snapshot (isOrigin, toSnapshot) - createCached name codec initial fold accessStrategy (context.main, cache) + createCached name codec initial fold accessStrategy None (context.main, cache) let createRollingState name codec initial fold toSnapshot (context, cache) = let accessStrategy = AccessStrategy.RollingState toSnapshot - createCached name codec initial fold accessStrategy (context.views, cache) + createCached name codec initial fold accessStrategy None (context.views, cache) let createConfig (main, views, snapshotUpdate) cache = Config.Cosmos ({ main = main; views = views; snapshotUpdate = snapshotUpdate }, cache) @@ -82,10 +122,48 @@ module Cosmos = let private accessStrategy isOrigin = let transmuteAllEventsToUnfolds events _state = [||], events AccessStrategy.Custom (isOrigin, transmuteAllEventsToUnfolds) - let private createCategory name codec initial fold isCurrent (contexts, cache) = - createCached name codec (Snapshotter.initial' initial) (Snapshotter.fold' isCurrent fold) (accessStrategy isCurrent) (contexts.snapshotUpdate, cache) - - let create codec initial fold (isCurrentSnapshot, generate) streamId categoryName config = + let private createCategory name codec initial fold isValidUnfolds isOrigin (contexts, cache) = + createCached name codec (Snapshotter.initial' initial) (Snapshotter.fold' isValidUnfolds fold) (accessStrategy isOrigin) (Some isOrigin) (contexts.snapshotUpdate, cache) + /// Equinox allows any number of unfold events to be stored: + /// - the `isOrigin` predicate identifies the "main snapshot" - if it returns `true`, we don't need to load and fold based on events + /// - `isValidUnfolds` inspects the full set of unfolds in order to determine whether they are complete + /// - where Index Unfolds are required for application functionality, we can trigger regeneration where they are missing + let withIndexing codec initial fold (isOrigin, isValidUnfolds, generateUnfolds) streamId categoryName config = let cat = config |> function - | Config.Cosmos (context, cache) -> createCategory categoryName codec initial fold isCurrentSnapshot (context, cache) - Snapshotter.createService streamId generate cat + | Config.Cosmos (context, cache) -> createCategory categoryName codec initial fold isValidUnfolds isOrigin (context, cache) + Snapshotter.createService streamId generateUnfolds cat + /// For the common case where we don't use any indexing - we only have a single relevant unfold to detect, and a function to generate it + let single codec initial fold (isCurrentSnapshot, generate) streamId categoryName config = + withIndexing codec initial fold (isCurrentSnapshot, Array.tryExactlyOne >> Option.exists isCurrentSnapshot, generate >> Array.singleton) streamId categoryName config + + module Ingester = + + let private slice eventSize struct (maxEvents, maxBytes) span = + let mutable countBudget, bytesBudget = maxEvents, maxBytes + let withinLimits y = + countBudget <- countBudget - 1 + bytesBudget <- bytesBudget - eventSize y + // always send at least one event in order to surface the problem and have the stream marked malformed + countBudget = maxEvents - 1 || (countBudget >= 0 && bytesBudget >= 0) + span |> Array.takeWhile withinLimits + // We gauge the likely output size from the input size + // (to be 100% correct, we should encode it as the Sync in Equinox would do for the real converted data) + // (or, to completely cover/gold plate it, we could have an opt-in on the Category to do slicing internally) + let eventSize ((x, _e): Ingester.Event<_, _>) = x.Size + let private accessStrategy = + let isOriginIgnoreEvents _ = true // we only need to know the Version to manage the ingestion process + let transmuteTrimsToStoredProcInputLimitAndDoesNotGenerateUnfolds events () = + let maxEvents, maxBytes = 16384, 256 * 1024 + let trimmed = slice eventSize (maxEvents, maxBytes) events + trimmed, Array.empty + AccessStrategy.Custom (isOriginIgnoreEvents, transmuteTrimsToStoredProcInputLimitAndDoesNotGenerateUnfolds) + let private createCategory name codec (context, cache) = + createCached name codec Ingester.initial Ingester.fold accessStrategy None (context, cache) + + type TargetCodec<'e> = FsCodec.IEventCodec<'e, Core.EventBody, unit> + open FsCodec.SystemTextJson.Interop + let create<'id, 'e> struct (inputStreamCodec: FsCodec.IEventCodec<'e, System.ReadOnlyMemory, unit>, targetCodec: TargetCodec<'e>) streamId categoryName struct (context, cache) = + let rewriteEventBodiesCodec = Ingester.createCodec<'e, System.Text.Json.JsonElement, unit> targetCodec + let cat = createCategory categoryName rewriteEventBodiesCodec (context, cache) + let inputStreamToJsonElement = inputStreamCodec.ToJsonElementCodec() + Ingester.createService<'id, 'e, System.Text.Json.JsonElement> streamId inputStreamToJsonElement cat diff --git a/propulsion-indexer/Domain/Todo.fs b/propulsion-indexer/Domain/Todo.fs index a5980db95..a0d87fddc 100644 --- a/propulsion-indexer/Domain/Todo.fs +++ b/propulsion-indexer/Domain/Todo.fs @@ -1,10 +1,9 @@ module IndexerTemplate.Domain.Todo -module private Stream = - let [] Category = "Todos" - let id = FsCodec.StreamId.gen ClientId.toString - let decodeId = FsCodec.StreamId.dec ClientId.parse - let tryDecode = FsCodec.StreamName.tryFind Category >> ValueOption.map decodeId +let [] CategoryName = "Todos" +let private streamId = FsCodec.StreamId.gen ClientId.toString +let private decodeId = FsCodec.StreamId.dec ClientId.parse +let private tryDecode = FsCodec.StreamName.tryFind CategoryName >> ValueOption.map decodeId // NB - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -24,13 +23,13 @@ module Events = module Reactions = - let categories = [| Stream.Category |] + let categories = [| CategoryName |] /// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output let private impliesStateChange = function Events.Snapshotted _ -> false | _ -> true let dec = Streams.Codec.gen - let [] (|For|_|) = Stream.tryDecode + let [] (|For|_|) = tryDecode let [] (|ImpliesStateChange|_|) = function | struct (For clientId, _) & Streams.Decode dec events when Array.exists impliesStateChange events -> ValueSome clientId | _ -> ValueNone @@ -73,7 +72,8 @@ type Service internal (resolve: ClientId -> Equinox.Decider Store.Cosmos.createSnapshotted Stream.Category Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, cache) - let create (Category cat) = Service(Stream.id >> Store.createDecider cat) + | Store.Config.Cosmos (context, cache) -> Store.Cosmos.createSnapshotted CategoryName Events.codec Fold.initial Fold.fold Fold.Snapshot.config (context, cache) + let create (Category cat) = Service(streamId >> Store.createDecider cat) diff --git a/propulsion-indexer/Indexer/Indexer.fs b/propulsion-indexer/Indexer/Indexer.fs index d82e97819..6b2724213 100644 --- a/propulsion-indexer/Indexer/Indexer.fs +++ b/propulsion-indexer/Indexer/Indexer.fs @@ -3,8 +3,8 @@ module IndexerTemplate.Indexer.Indexer type Outcome = Metrics.Outcome /// Gathers stats based on the Outcome of each Span as it's processed, for periodic emission via DumpStats() -type Stats(log, statsInterval, stateInterval, verboseStore) = - inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval, abendThreshold = abendThreshold) let mutable ok, skipped, na = 0, 0, 0 override _.HandleOk res = @@ -48,7 +48,7 @@ let handle (sourceService: Todo.Service) (summaryService: TodoIndex.Service) str module Factory = - let createHandler store = + let create store = let srcService = Todo.Factory.create store let dstService = TodoIndex.Factory.create store - handle srcService dstService + Some sourceCategories, handle srcService dstService diff --git a/propulsion-indexer/Indexer/Indexer.fsproj b/propulsion-indexer/Indexer/Indexer.fsproj index e25a0afb1..ae12f24b2 100644 --- a/propulsion-indexer/Indexer/Indexer.fsproj +++ b/propulsion-indexer/Indexer/Indexer.fsproj @@ -10,15 +10,13 @@ + + - - - - diff --git a/propulsion-indexer/Indexer/Ingester.fs b/propulsion-indexer/Indexer/Ingester.fs new file mode 100644 index 000000000..008731578 --- /dev/null +++ b/propulsion-indexer/Indexer/Ingester.fs @@ -0,0 +1,27 @@ +module IndexerTemplate.Indexer.Ingester + +open IndexerTemplate.Domain +open Visitor + +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit StatsBase(log, statsInterval, stateInterval, verboseStore, abendThreshold = abendThreshold) + override _.HandleOk(()) = () + +let handle todo + stream (events: Propulsion.Sinks.Event[]): Async<_ * unit> = async { + let handle = + match stream with + | Todo.Reactions.For id -> todo id + | sn -> failwith $"Unexpected category %A{sn}" + let! pos' = handle events + return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', () } + +module Factory = + + let createHandler store = + + let todo = Todo.Factory.createIngester store + + let h svc = Store.Ingester.Service.ingest svc + handle + (h todo) diff --git a/propulsion-indexer/Indexer/Program.fs b/propulsion-indexer/Indexer/Program.fs index f5b2b59a2..32dc828d4 100644 --- a/propulsion-indexer/Indexer/Program.fs +++ b/propulsion-indexer/Indexer/Program.fs @@ -14,43 +14,153 @@ module Args = | [] ProcessorName of string | [] MaxReadAhead of int | [] MaxWriters of int - | [] Index of ParseResults - | [] Snapshot of ParseResults - | [] Sync of ParseResults + | [] AbendTimeoutM of float + + | [] DryRun + | [] Follow + | [] IncIdx + | [] IncCat of regex: string + | [] ExcCat of regex: string + | [] IncStream of regex: string + | [] ExcStream of regex: string + | [] IncEvent of regex: string + | [] ExcEvent of regex: string + + | [] Stats of ParseResults + | [] StatsFile of ParseResults + | [] Index of ParseResults + | [] Snapshot of ParseResults + | [] Sync of ParseResults + | [] Export of ParseResults interface IArgParserTemplate with member p.Usage = p |> function | Verbose -> "request Verbose Logging. Default: off." | PrometheusPort _ -> "port from which to expose a Prometheus /metrics endpoint. Default: off." | ProcessorName _ -> "Projector consumer group name." - | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: 2." - | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8." - | Index _ -> "Process indexing into the Views Container for the specified Cosmos feed" + | MaxReadAhead _ -> "maximum number of batches to let processing get ahead of completion. Default: File: 32768 Cosmos: 2." + | MaxWriters _ -> "maximum number of concurrent streams on which to process at any time. Default: 8 (Sync, Index: 16)." + | AbendTimeoutM _ -> "maximum number of minutes to wait before existing where processing enters a (non-transient) perpetual exception state. Default: 2" + + | DryRun -> "For Snapshot subcommand, skip actually updating" + | Follow -> "Continue waiting for more input when complete (like unix `tail -f`). Default: the Snapshot and Stats operations exit when the Tail of the feed has been reached" + + | IncIdx -> "Include Index streams. Default: Exclude Index Streams, identified by a $ prefix." + | IncCat _ -> "Allow Stream Category. Multiple values are combined with OR. Default: include all, subject to Category Deny and Stream Deny rules." + | ExcCat _ -> "Deny Stream Category. Specified values/regexes are applied after the Category Allow rule(s)." + | IncStream _ -> "Allow Stream Name. Multiple values are combined with OR. Default: Allow all streams that pass the category Allow test, Fail the Category and Stream deny tests." + | ExcStream _ -> "Deny Stream Name. Specified values/regexes are applied after the IncCat, ExcCat and IncStream filters." + + | IncEvent _ -> "Allow Event Type Name. Multiple values are combined with OR. Applied only after Category and Stream filters. Default: include all." + | ExcEvent _ -> "Deny Event Type Name. Specified values/regexes are applied after the Event Type Name Allow rule(s)." + + | Stats _ -> "Gather stats from the input data only; No indexing or writes performed." + | StatsFile _ -> "Same as stats, but replacing normal input with a File source" + | Index _ -> "Process indexing into the Views Container for the specified feed" | Snapshot _ -> "Process updating of snapshots for all traversed streams in the specified Cosmos feed" - | Sync _ -> "Sync into a specified Store for the specified Cosmos feed" + | Sync _ -> "Sync into a specified Store from the specified Cosmos feed" + | Export _ -> "Sync into a specified Store from the application's store, rewriting the events" + and StreamFilterArguments(p: ParseResults) = + let allowCats, denyCats = p.GetResults IncCat, p.GetResults ExcCat + let allowSns, denySns = p.GetResults IncStream, p.GetResults ExcStream + let incIndexes = p.Contains IncIdx + let allowEts, denyEts = p.GetResults IncEvent, p.GetResults ExcEvent + let isPlain = Seq.forall (fun x -> Char.IsLetterOrDigit x || x = '_') + let asRe = Seq.map (fun x -> if isPlain x then $"^{x}$" else x) + let (|Filter|) exprs = + let values, pats = List.partition isPlain exprs + let valuesContains = let set = System.Collections.Generic.HashSet(values) in set.Contains + let aPatternMatches x = pats |> List.exists (fun p -> System.Text.RegularExpressions.Regex.IsMatch(x, p)) + fun cat -> valuesContains cat || aPatternMatches cat + let filter map (allow, deny) = + match allow, deny with + | [], [] -> fun _ -> true + | Filter includes, Filter excludes -> fun x -> let x = map x in (List.isEmpty allow || includes x) && not (excludes x) + let validStream = filter FsCodec.StreamName.toString (allowSns, denySns) + let isTransactionalStream (sn: FsCodec.StreamName) = let sn = FsCodec.StreamName.toString sn in not (sn.StartsWith('$')) + member _.CreateStreamFilter(maybeCategories) = + let handlerCats = match maybeCategories with Some xs -> List.ofArray xs | None -> List.empty + let allowCats = handlerCats @ allowCats + let validCat = filter FsCodec.StreamName.Category.ofStreamName (allowCats, denyCats) + let allowCats = match allowCats with [] -> [ ".*" ] | xs -> xs + let denyCats = denyCats @ [ if not incIndexes then "^\$" ] + let allowSns, denySns = match allowSns, denySns with [], [] -> [".*"], [] | x -> x + let allowEts, denyEts = match allowEts, denyEts with [], [] -> [".*"], [] | x -> x + Log.Information("Categories ☑️ {@allowCats} 🚫{@denyCats} Streams ☑️ {@allowStreams} 🚫{denyStreams} Events ☑️ {allowEts} 🚫{@denyEts}", + asRe allowCats, asRe denyCats, asRe allowSns, asRe denySns, asRe allowEts, asRe denyEts) + fun sn -> + validCat sn + && validStream sn + && (incIndexes || isTransactionalStream sn) + member val EventFilter = filter (fun (x: Propulsion.Sinks.Event) -> x.EventType) (allowEts, denyEts) + and [] Action = + | SummarizeFile of FileArguments + | Summarize of Args.CosmosSourceArguments + | Index of IndexArguments + | Snapshot of Args.CosmosSourceArguments + | Sync of SyncArguments + | Export of SyncArguments and Arguments(c: Args.Configuration, p: ParseResults) = - let maxReadAhead = p.GetResult(MaxReadAhead, 2) - let maxConcurrentProcessors = p.GetResult(MaxWriters, 8) - member val Verbose = p.Contains Parameters.Verbose + let action = match p.GetSubCommand() with + | Parameters.Stats p -> Summarize <| Args.CosmosSourceArguments(c, p) + | Parameters.StatsFile p -> SummarizeFile <| FileArguments(c, p) + | Parameters.Index p -> Index <| IndexArguments(c, p) + | Parameters.Snapshot p -> Snapshot <| Args.CosmosSourceArguments(c, p) + | Parameters.Sync p -> Sync <| SyncArguments(c, p) + | Parameters.Export p -> Export <| SyncArguments(c, p) + | _ -> p.Raise "Must specify a subcommand" + let source = match action with + | Summarize c | Snapshot c -> Choice1Of2 c + | Index a -> match a.Source with + | Choice1Of2 c -> Choice1Of2 c + | Choice2Of2 f -> Choice2Of2 f + | Sync s | Export s -> match s.Source with + | Choice1Of2 c -> Choice1Of2 c + | Choice2Of2 f -> Choice2Of2 f + | SummarizeFile f -> Choice2Of2 f + let dryRun = match action, p.Contains DryRun with + | Snapshot _, value -> value + | _, true -> p.Raise "dryRun is not applicable to any subcommand other than Snapshot" + | _, false -> false + let actionLabel = match action with + | Snapshot _ when dryRun -> "DryRun Snapshot inspect" + | Snapshot _ -> "Snapshot updat" + | Summarize _ | SummarizeFile _ -> "Summariz" + | Index _ -> "Index" + | Sync _ -> "Synchroniz" + | Export _ -> "Export" + let isFileSource = match source with Choice1Of2 _ -> false | Choice2Of2 _ -> true + let maxReadAhead = p.GetResult(MaxReadAhead, if isFileSource then 32768 else 2) + member val Action = action + member val DryRun = dryRun + member val Source = source + member val Verbose = p.Contains Verbose member val PrometheusPort = p.TryGetResult PrometheusPort member val ProcessorName = p.GetResult ProcessorName member val StatsInterval = TimeSpan.FromMinutes 1. member val StateInterval = TimeSpan.FromMinutes 5. - member x.Cosmos = match x.Action with Action.Index c | Action.Snapshot c -> c | Action.Sync s -> s.Source - member x.ConnectWithFeed(?lsc) = match x.Action with - | Action.Index c | Action.Snapshot c -> c.ConnectWithFeed(?lsc = lsc) - | Action.Sync s -> s.ConnectWithFeed() - member val Action = match p.GetSubCommand() with - | Parameters.Index p -> CosmosArguments(c, p) |> Index - | Parameters.Snapshot p -> CosmosArguments(c, p) |> Snapshot - | Parameters.Sync p -> SyncArguments(c, p) |> Sync - | _ -> p.Raise "Must specify a subcommand" - member x.ActionLabel = match x.Action with Action.Index _ -> "Indexing" | Action.Snapshot _ -> "Snapshotting" | Action.Sync _ -> "Exporting" - member x.IsSnapshotting = match x.Action with Action.Snapshot _ -> true | _ -> false - member x.ProcessorParams() = Log.Information("{action}... {processorName}, reading {maxReadAhead} ahead, {dop} writers", - x.ActionLabel, x.ProcessorName, maxReadAhead, maxConcurrentProcessors) - (x.ProcessorName, maxReadAhead, maxConcurrentProcessors) - and [] Action = Index of CosmosArguments | Snapshot of CosmosArguments | Sync of SyncArguments - and [] SyncParameters = + member val AbendTimeout = p.GetResult(AbendTimeoutM, 2.) |> TimeSpan.FromMinutes + member val Filters = StreamFilterArguments(p) + member val MaxConcurrentProcessors =p.GetResult(MaxWriters, match action with Sync _ | Index _ -> 16 | _ -> 8) + member val CosmosVerbose = match source with Choice1Of2 c -> c.Verbose | Choice2Of2 f -> f.CosmosVerbose + member x.WaitForTail = if isFileSource || p.Contains Follow then None + else Some (x.StatsInterval * 2.) + member x.LagEstimationInterval = x.WaitForTail |> Option.map (fun _ -> TimeSpan.seconds 5) + member x.ProcessorParams() = Log.Information("{action}ing... {processorName}, reading {maxReadAhead} ahead, {dop} writers", + actionLabel, x.ProcessorName, maxReadAhead, x.MaxConcurrentProcessors) + (x.ProcessorName, maxReadAhead, x.MaxConcurrentProcessors) + member _.Connect appName = async { let store contexts = (contexts, Equinox.Cache(appName, sizeMb = 10)) ||> Store.Cosmos.createConfig + match source, action with + | Choice2Of2 f, (SummarizeFile _ | Sync _ | Export _) -> + return Choice1Of3 (f.Filepath, (f.Skip, f.Trunc)) + | Choice2Of2 f, Index _ -> + let! contexts = f.Connect() + return Choice2Of3 (f.Filepath, (f.Skip, f.Trunc), store contexts) + | Choice2Of2 _, (Summarize _ | Snapshot _ as x) -> return x |> failwithf "unexpected %A" + | Choice1Of2 c, action -> + let lsc = match action with Snapshot _ -> true | _ -> false + let! contexts, monitored, leases = c.ConnectWithFeed(lsc = lsc) + return Choice3Of3 (monitored, leases, c.MonitoringParams, store contexts) } + and [] SyncParameters = | [] Connection of string | [] Database of string | [] Container of string @@ -59,11 +169,12 @@ module Args = | [] Retries of int | [] RetriesWaitTime of float | [] MaxKiB of int - | [] Source of ParseResults + | [] Source of ParseResults + | [] SourceFile of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | Connection _ -> "specify a connection string for the destination Cosmos account. Default: Same as Source" - | Database _ -> "specify a database name for store. Default: Same as Source" + | Connection _ -> "specify a connection string for the destination Cosmos account. Default (if Cosmos): Same as Source" + | Database _ -> "specify a database name for store. Default (if Cosmos): Same as Source" | Container _ -> "specify a container name for store." | LeaseContainerId _ -> "store leases in Sync target DB (default: use `-aux` adjacent to the Source Container). Enables the Source to be read via a ReadOnly connection string." | Timeout _ -> "specify operation timeout in seconds. Default: 5." @@ -71,133 +182,177 @@ module Args = | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." | MaxKiB _ -> "specify maximum size in KiB to pass to the Sync stored proc (reduce if Malformed Streams due to 413 RequestTooLarge responses). Default: 128." | Source _ -> "Source store from which events are to be consumed via the feed" + | SourceFile _ -> "Source File from which events are to be consumed" and SyncArguments(c: Args.Configuration, p: ParseResults) = - let source = CosmosArguments(c, p.GetResult Source) - let discovery = p.TryGetResult SyncParameters.Connection - |> Option.map Equinox.CosmosStore.Discovery.ConnectionString - |> Option.defaultWith (fun () -> source.Discovery) - let timeout = p.GetResult(SyncParameters.Timeout, 5) |> TimeSpan.FromSeconds - let retries = p.GetResult(SyncParameters.Retries, 1) - let maxRetryWaitTime = p.GetResult(SyncParameters.RetriesWaitTime, 5) |> TimeSpan.FromSeconds - let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime) - let database = p.GetResult(SyncParameters.Database, fun () -> source.Database) - let container = p.GetResult SyncParameters.Container - member val MaxBytes = p.GetResult(MaxKiB, 128) * 1024 + let source = match p.GetSubCommand() with + | Source p -> Choice1Of2 (Args.CosmosSourceArguments(c, p)) + | SourceFile f -> Choice2Of2 (FileArguments(c, f)) + | x -> p.Raise $"Unexpected Subcommand %A{x}" + let connection = match source with + | Choice1Of2 c -> p.GetResult(Connection, fun () -> c.Connection) + | Choice2Of2 _ -> p.GetResult Connection + let connector = + let timeout = p.GetResult(Timeout, 5.) |> TimeSpan.FromSeconds + let retries = p.GetResult(Retries, 1) + let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5.) |> TimeSpan.FromSeconds + Equinox.CosmosStore.CosmosStoreConnector(Equinox.CosmosStore.Discovery.ConnectionString connection, timeout, retries, maxRetryWaitTime) + let database = match source with + | Choice1Of2 c -> p.GetResult(Database, fun () -> c.Database) + | Choice2Of2 _ -> p.GetResult Database + let container = p.GetResult Container member val Source = source - member _.ConnectWithFeed() = match p.TryGetResult LeaseContainerId with + member val MaxBytes = p.GetResult(MaxKiB, 128) * 1024 + member x.Connect() = connector.ConnectExternal("Destination", database, container) + member x.ConnectEvents() = async { let! context = x.Connect() + return Equinox.CosmosStore.Core.EventsContext(context, Store.Metrics.log) } + member x.ConnectWithFeed() = let source = match source with Choice1Of2 c -> c | Choice2Of2 _file -> p.Raise "unexpected" + match p.TryGetResult LeaseContainerId with | Some localAuxContainerId -> source.ConnectWithFeedReadOnly(connector.CreateUninitialized(), database, localAuxContainerId) | None -> source.ConnectWithFeed() - member _.Connect() = async { let! context = connector.ConnectExternal("Destination", database, container) - return Equinox.CosmosStore.Core.EventsContext(context, Store.Metrics.log) } - and [] CosmosParameters = - | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode - | [] Connection of string - | [] Database of string - | [] Container of string - | [] Views of string - | [] Timeout of float - | [] Retries of int - | [] RetriesWaitTime of float - - | [] Verbose - | [] LeaseContainer of string - | [] FromTail - | [] MaxItems of int - | [] LagFreqM of float + and [] IndexParameters = + | [] Cosmos of ParseResults + | [] File of ParseResults interface IArgParserTemplate with member p.Usage = p |> function - | ConnectionMode _ -> "override the connection mode. Default: Direct." - | Connection _ -> $"specify a connection string for a Cosmos account. (optional if environment variable {Args.CONNECTION} specified)" - | Database _ -> $"specify a database name for store. (optional if environment variable {Args.DATABASE} specified)" - | Container _ -> $"specify a container name for store. (optional if environment variable {Args.CONTAINER} specified)" - | Views _ -> $"specify a container name for views. (optional if environment variable {Args.VIEWS} specified)" - | Timeout _ -> "specify operation timeout in seconds. Default: 5." - | Retries _ -> "specify operation retries. Default: 1." - | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5." - - | Verbose -> "request Verbose Logging from ChangeFeedProcessor and Store. Default: off" - | LeaseContainer _ -> "specify Container Name (in this [target] Database) for Leases container. Default: `` + `-aux`." - | FromTail -> "(iff the Consumer Name is fresh) - force skip to present Position. Default: Never skip an event." - | MaxItems _ -> "maximum item count to request from the feed. Default: unlimited." - | LagFreqM _ -> "specify frequency (minutes) to dump lag stats. Default: 1" - and CosmosArguments(c: Args.Configuration, p: ParseResults) = - let discovery = p.GetResult(CosmosParameters.Connection, fun () -> c.CosmosConnection) |> Equinox.CosmosStore.Discovery.ConnectionString - let mode = p.TryGetResult ConnectionMode - let timeout = p.GetResult(Timeout, 5) |> TimeSpan.FromSeconds - let retries = p.GetResult(Retries, 1) - let maxRetryWaitTime = p.GetResult(RetriesWaitTime, 5) |> TimeSpan.FromSeconds - let connector = Equinox.CosmosStore.CosmosStoreConnector(discovery, timeout, retries, maxRetryWaitTime, ?mode = mode) - let database = p.GetResult(Database, fun () -> c.CosmosDatabase) - let containerId = p.GetResult(Container, fun () -> c.CosmosContainer) - let viewsContainerId = p.GetResult(Views, fun () -> c.CosmosViews) - - let leaseContainerId = p.GetResult(LeaseContainer, containerId + "-aux") - let fromTail = p.Contains FromTail - let maxItems = p.TryGetResult MaxItems - let lagFrequency = p.GetResult(LagFreqM, 1.) |> TimeSpan.FromMinutes - member val Verbose = p.Contains Verbose - member val MonitoringParams = fromTail, maxItems, lagFrequency - member _.Discovery = discovery - member _.Database = database - member _.ConnectWithFeed(?lsc) = connector.ConnectWithFeed(database, containerId, viewsContainerId, leaseContainerId, ?logSnapshotConfig = lsc) - member _.ConnectWithFeedReadOnly(auxClient, auxDatabase, auxContainerId) = - connector.ConnectWithFeedReadOnly(database, containerId, viewsContainerId, auxClient, auxDatabase, auxContainerId) + | Cosmos _ -> "CosmosDb source parameters" + | File _ -> "Replacing normal input with a File source" + and IndexArguments(c: Args.Configuration, p: ParseResults) = + member val Source = match p.GetSubCommand() with + | IndexParameters.Cosmos p -> Choice1Of2 (Args.CosmosSourceArguments(c, p)) + | IndexParameters.File f -> Choice2Of2 (FileArguments(c, f)) + // | _ -> p.Raise $"Unexpected Subcommand %A{x}" + and [] FileParameters = + | [] Path of filename: string + | [] Skip of lines: int + | [] Truncate of lines: int + | [] LineNo of int + | [] Cosmos of ParseResults + interface IArgParserTemplate with + member p.Usage = p |> function + | Path _ -> "specify file path" + | Skip _ -> "specify number of lines to skip" + | Truncate _ -> "specify line number to pretend is End of File" + | LineNo _ -> "specify line number to start (1-based)" + | Cosmos _ -> "CosmosDb parameters (required for Index, not applicable for StatsFile or SourceFile)" + and FileArguments(c: Args.Configuration, p: ParseResults) = + let cosmos = match p.TryGetSubCommand() with Some (Cosmos p) -> Args.CosmosArguments(c, p) |> Some | _ -> None + let cosmosMissing () = p.Raise "cosmos details must be specified" + member val CosmosVerbose = match cosmos with Some c -> c.Verbose | None -> false + member val Filepath = p.GetResult Path + member val Skip = p.TryPostProcessResult(LineNo, fun l -> l - 1) |> Option.defaultWith (fun () -> p.GetResult(Skip, 0)) + member val Trunc = p.TryGetResult Truncate + member _.Connect() = match cosmos with Some c -> c.Connect() | None -> async { return cosmosMissing () } /// Parse the commandline; can throw exceptions in response to missing arguments and/or `-h`/`--help` args let parse tryGetConfigValue argv: Arguments = let programName = System.Reflection.Assembly.GetEntryAssembly().GetName().Name - let parser = ArgumentParser.Create(programName=programName) + let parser = ArgumentParser.Create(programName = programName) Arguments(Args.Configuration tryGetConfigValue, parser.ParseCommandLine argv) let [] AppName = "IndexerTemplate" let build (args: Args.Arguments) = async { let processorName, maxReadAhead, maxConcurrentStreams = args.ProcessorParams() - let! contexts, monitored, leases = args.ConnectWithFeed(args.IsSnapshotting) - let store = (contexts, Equinox.Cache(AppName, sizeMb = 10)) ||> Store.Cosmos.createConfig - let parseFeedDoc, sink = - let mkParseAll () = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereCategory (fun _ -> true) - let mkSink stats handle = Factory.StartSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) - match args.Action with - | Args.Action.Index _ -> - let mkParseCats = Propulsion.CosmosStore.EquinoxSystemTextJsonParser.ofCategories - let stats = Indexer.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) - let handle = Indexer.Factory.createHandler store - mkParseCats Indexer.sourceCategories, mkSink stats handle - | Args.Action.Snapshot _ -> - let stats = Snapshotter.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.Cosmos.Verbose) - let handle = Snapshotter.Factory.createHandler store - mkParseAll (), mkSink stats handle - | Args.Action.Sync a -> - mkParseAll (), - let eventsContext = a.Connect() |> Async.RunSynchronously - let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, args.StatsInterval, args.StateInterval) - Propulsion.CosmosStore.CosmosStoreSink.Start( - Log.Logger, maxReadAhead, eventsContext, maxConcurrentStreams, stats, - purgeInterval = TimeSpan.FromHours 1, maxBytes = a.MaxBytes) - let source = - let startFromTail, maxItems, lagFrequency = args.Cosmos.MonitoringParams - Propulsion.CosmosStore.CosmosStoreSource(Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink, - startFromTail = startFromTail, ?maxItems = maxItems, lagEstimationInterval = lagFrequency).Start() - return sink, source } + let parse = args.Filters.CreateStreamFilter >> Propulsion.CosmosStore.EquinoxSystemTextJsonParser.whereStream + let configureWithStreamsSink_ stats cats handle = + cats |> parse, Factory.StartStreamsSink(Log.Logger, stats, maxConcurrentStreams, handle, maxReadAhead) + let configureWithStreamsSink stats handle = configureWithStreamsSink_ stats None handle + let summarize () = + let stats = Visitor.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let handle = Visitor.Factory.createHandler args.Filters.EventFilter + configureWithStreamsSink stats handle + let index store = + let stats = Indexer.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let cats, handle = Indexer.Factory.create store // args.Filters.EventFilter + configureWithStreamsSink_ stats cats handle + let snapshot store = + let stats = Snapshotter.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let handle = Snapshotter.Factory.createHandler args.DryRun store + configureWithStreamsSink stats handle + let sync (a: Args.SyncArguments) = + let eventsContext = a.ConnectEvents() |> Async.RunSynchronously + parse None, + let stats = Propulsion.CosmosStore.CosmosStoreSinkStats(Log.Logger, args.StatsInterval, args.StateInterval) + Propulsion.CosmosStore.CosmosStoreSink.Start(Log.Logger, maxReadAhead, eventsContext, maxConcurrentStreams, stats, + purgeInterval = TimeSpan.FromHours 1, maxBytes = a.MaxBytes) + let export (a: Args.SyncArguments) = + let context = a.Connect() |> Async.RunSynchronously + let cache = Equinox.Cache (AppName, sizeMb = 10) + let stats = Ingester.Stats(Log.Logger, args.StatsInterval, args.StateInterval, args.CosmosVerbose, args.AbendTimeout) + let handle = Ingester.Factory.createHandler (context, cache) + configureWithStreamsSink stats handle + let mkFileSource filePath (skip, truncate) parseFeedDoc sink = + sink, CosmosDumpSource.Start(Log.Logger, args.StatsInterval, filePath, skip, parseFeedDoc, sink, ?truncateTo = truncate) + match! args.Connect AppName with + | Choice1Of3 (filePath, skipTrunc) -> // Summarize or ingest from file (no application store or change feed processor involved) + return mkFileSource filePath skipTrunc <|| + match args.Action with + | Args.Action.SummarizeFile _ -> summarize () + | Args.Action.Sync a -> sync a + | Args.Action.Export a -> export a + | x -> x |> failwithf "unexpected %A" + | Choice2Of3 (filePath, skipTrunc, store) -> // Index from file to store (no change feed involved) + return mkFileSource filePath skipTrunc <|| + match args.Action with + | Args.Action.Index _ -> index store + | x -> x |> failwithf "unexpected %A" + | Choice3Of3 (monitored, leases, (startFromTail, maxItems, tailSleepInterval, _lagFrequency), store) -> // normal case - consume from change feed, write to store + let parseFeedDoc, sink = + match args.Action with + | Args.Action.Summarize _ -> summarize () + | Args.Action.Index _ -> index store + | Args.Action.Snapshot _ -> snapshot store + | Args.Action.Sync a -> sync a + | Args.Action.Export a -> export a + | Args.Action.SummarizeFile _ as x -> x |> failwithf "unexpected %A" + let source = + Propulsion.CosmosStore.CosmosStoreSource( + Log.Logger, args.StatsInterval, monitored, leases, processorName, parseFeedDoc, sink, + startFromTail = startFromTail, ?maxItems = maxItems, tailSleepInterval = tailSleepInterval, ?lagEstimationInterval = args.LagEstimationInterval + ).Start() + return sink, source } open Propulsion.Internal // AwaitKeyboardInterruptAsTaskCanceledException +(* +// A typical app will likely have health checks etc, implying the wireup would be via `endpoints.MapMetrics()` and thus not use this ugly code directly +let startMetricsServer port: IDisposable = + let metricsServer = new Prometheus.KestrelMetricServer(port = port) + let ms = metricsServer.Start() + Log.Information("Prometheus /metrics endpoint on port {port}", port) + { new IDisposable with member x.Dispose() = ms.Stop(); (metricsServer :> IDisposable).Dispose() } *) + +let eofSignalException = System.Threading.Tasks.TaskCanceledException "Stopping; FeedMonitor wait completed" +let isExpectedShutdownSignalException: exn -> bool = function + | :? Argu.ArguParseException // Via Arguments.Parse and/or Configuration.tryGet + | :? System.Threading.Tasks.TaskCanceledException -> true // via AwaitKeyboardInterruptAsTaskCanceledException + | _ -> false + let run args = async { let! sink, source = build args - use _metricsServer: IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj - return! [| Async.AwaitKeyboardInterruptAsTaskCanceledException() - source.AwaitWithStopOnCancellation() - sink.AwaitWithStopOnCancellation() - |] |> Async.Parallel |> Async.Ignore } + // use _metricsServer: IDisposable = args.PrometheusPort |> Option.map startMetricsServer |> Option.toObj + try do! [| async { match args.WaitForTail with + | None -> () + | Some initialWait -> + do! source.Monitor.AwaitCompletion(initialWait, awaitFullyCaughtUp = true, logInterval = args.StatsInterval / 2.) |> Async.AwaitTask + source.Stop() + do! source.AwaitWithStopOnCancellation() // Wait until Source has emitted stats + return raise eofSignalException } // trigger tear down of sibling waits + sink.AwaitWithStopOnCancellation() + Async.AwaitKeyboardInterruptAsTaskCanceledException() |] |> Async.Parallel |> Async.Ignore + finally source.Flush() |> Async.Ignore |> Async.RunSynchronously } // flush checkpoints // TODO do! in F# 7 [] let main argv = try let args = Args.parse EnvVar.tryGet argv - try let metrics = Sinks.equinoxAndPropulsionCosmosConsumerMetrics (Sinks.tags AppName) args.ProcessorName - Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.Cosmos.Verbose).CreateLogger() + try let metrics = Sinks.equinoxAndPropulsionConsumerMetrics (Sinks.tags AppName) args.ProcessorName + Log.Logger <- LoggerConfiguration().Configure(args.Verbose).Sinks(metrics, args.CosmosVerbose).CreateLogger() try run args |> Async.RunSynchronously; 0 - with e when not (e :? System.Threading.Tasks.TaskCanceledException) -> Log.Fatal(e, "Exiting"); 2 + with + | :? Propulsion.Streams.HealthCheckException as e -> + Log.Fatal(e, "Exiting due to Healthcheck; Stuck streams {stuck} Failing streams {failing}", e.StuckStreams, e.FailingStreams); 3 + | e when not (isExpectedShutdownSignalException e) -> Log.Fatal(e, "Exiting"); 2 finally Log.CloseAndFlush() - with :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 - | e -> eprintf $"Exception %s{e.Message}"; 1 + with x when x = eofSignalException -> printfn "Processing COMPLETE"; 0 + | :? Argu.ArguParseException as e -> eprintfn $"%s{e.Message}"; 1 + | e -> eprintfn $"Exception %s{e.Message}"; 1 diff --git a/propulsion-indexer/Indexer/Snapshotter.fs b/propulsion-indexer/Indexer/Snapshotter.fs index feb1b6563..b858d6652 100644 --- a/propulsion-indexer/Indexer/Snapshotter.fs +++ b/propulsion-indexer/Indexer/Snapshotter.fs @@ -1,46 +1,58 @@ module IndexerTemplate.Indexer.Snapshotter -type Outcome = bool +open Visitor +open Propulsion.Internal + +type Outcome = (struct (string * System.TimeSpan * Store.Snapshotter.Result)) +module Outcome = let create sn ts res: Outcome = struct (FsCodec.StreamName.Category.ofStreamName sn, ts, res) + +/// Gathers counts of snapshots updated vs skipped +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit StatsBase(log, statsInterval, stateInterval, verboseStore, abendThreshold = abendThreshold) + let lats, accLats = Stats.LatencyStatsSet(), Stats.LatencyStatsSet() + let counts, accCounts = CategoryCounters(), CategoryCounters() + override _.HandleOk((cat, ts, res)) = + lats.Record(cat, ts) + accLats.Record(cat, ts) + let count = [ FsCodec.Union.caseName res, 1 ] + counts.Ingest(cat, count) + accCounts.Ingest(cat, count) -type Stats(log, statsInterval, stateInterval, verboseStore) = - inherit Propulsion.Streams.Stats(log, statsInterval, stateInterval) - - let mutable handled, skipped = 0, 0 - override _.HandleOk(updated) = if updated then handled <- handled + 1 else skipped <- skipped + 1 override _.DumpStats() = + counts.DumpGrouped(log, "OUTCOMES") + counts.Clear() + lats.DumpGrouped(id, log, totalLabel = "CATEGORIES") + lats.Clear() base.DumpStats() - log.Information(" Snapshotted {handled}, skipped {skipped}", handled, skipped) - handled <- 0; skipped <- 0 - Equinox.CosmosStore.Core.Log.InternalMetrics.dump Serilog.Log.Logger - - override _.Classify(e) = - match e with - | OutcomeKind.StoreExceptions kind -> kind - | Equinox.CosmosStore.Exceptions.ServiceUnavailable when not verboseStore -> Propulsion.Streams.OutcomeKind.RateLimited - | x -> base.Classify x - override _.HandleExn(log, exn) = - log.Information(exn, "Unhandled") + override _.DumpState(prune) = + accCounts.DumpGrouped(log, "ΣOUTCOMES") + accLats.DumpGrouped(id, log, totalLabel = "ΣCATEGORIES") + if prune then accLats.Clear() + base.DumpState(prune) open IndexerTemplate.Domain -let handle - tryUpdateTodo +let handle todo stream _events: Async<_ * Outcome> = async { + let ts = Stopwatch.timestamp () let! res, pos' = match stream with - | Todo.Reactions.For id -> tryUpdateTodo id + | Todo.Reactions.For id -> todo id | sn -> failwith $"Unexpected category %A{sn}" // a) if the tryUpdate saw a version beyond what (Propulsion.Sinks.Events.nextIndex events) would suggest, then we pass that information out // in order to have the scheduler drop all events until we meet an event that signifies we may need to re-update // b) the fact that we use the same Microsoft.Azure.Cosmos.CosmosClient for the Change Feed and the Equinox-based Services means we are guaranteed // to always see all the _events we've been supplied. (Even if this were not the case, the scheduler would retain the excess events, and that // would result in an immediate re-triggering of the handler with those events) - return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', res } + let elapsed = Stopwatch.elapsed ts + return Propulsion.Sinks.StreamResult.OverrideNextIndex pos', Outcome.create stream elapsed res } module Factory = - let createHandler context = + let createHandler dryRun store = + + let todo = Todo.Factory.createSnapshotter store - let todo = Todo.Factory.createSnapshotter context + let h svc = Store.Snapshotter.Service.tryUpdate dryRun svc handle - todo.TryUpdate + ( h todo) diff --git a/propulsion-indexer/Indexer/Visitor.fs b/propulsion-indexer/Indexer/Visitor.fs new file mode 100644 index 000000000..ea1d3cfe7 --- /dev/null +++ b/propulsion-indexer/Indexer/Visitor.fs @@ -0,0 +1,82 @@ +module IndexerTemplate.Indexer.Visitor + +type Outcome = (struct (string * (struct (string * System.TimeSpan))[] * (string * int)[])) +module Outcome = + let private eventType (x: Propulsion.Sinks.Event) = x.EventType + let private eventCounts = Array.countBy eventType + let private create sn ham spam: Outcome = struct (FsCodec.StreamName.Category.ofStreamName sn, ham, spam) + let render sn ham spam = create sn ham (eventCounts spam) + let render_ sn ham spam elapsedS = + let share = TimeSpan.seconds (match Array.length ham with 0 -> 0 | count -> elapsedS / float count) + create sn (ham |> Array.map (fun x -> struct (eventType x, share))) (eventCounts spam) + +[] +type StatsBase<'outcome>(log, statsInterval, stateInterval, verboseStore, ?abendThreshold) = + inherit Propulsion.Streams.Stats<'outcome>(log, statsInterval, stateInterval, failThreshold = TimeSpan.seconds 120, ?abendThreshold = abendThreshold) + + override _.DumpStats() = + base.DumpStats() + Equinox.CosmosStore.Core.Log.InternalMetrics.dump Serilog.Log.Logger + + override _.Classify(e) = + match e with + | OutcomeKind.StoreExceptions kind -> kind + // Cosmos Emulator overload manifests as 'Response status code does not indicate success: ServiceUnavailable (503); Substatus: 20002' + // (in verbose mode, we let the actual exception bubble up) + | Equinox.CosmosStore.Exceptions.ServiceUnavailable when not verboseStore -> Propulsion.Streams.OutcomeKind.Tagged "cosmos503" + | :? System.TimeoutException -> Propulsion.Streams.OutcomeKind.Tagged "timeoutEx" + // Emulator can emit this (normally only during laptop sleeps) + | Equinox.CosmosStore.Exceptions.CosmosStatus System.Net.HttpStatusCode.Forbidden as e + when e.Message.Contains "Authorization token is not valid at the current time. Please provide a valid token" -> + Propulsion.Streams.OutcomeKind.Timeout + | x -> + base.Classify x + override _.HandleExn(log, exn) = + log.Information(exn, "Unhandled") + +type CategoryCounters() = + let cats = System.Collections.Generic.Dictionary() + member _.Ingest(category, counts) = + let cat = + match cats.TryGetValue category with + | false, _ -> let acc = Propulsion.Internal.Stats.Counters() in cats.Add(category, acc); acc + | true, acc -> acc + for event, count : int in counts do cat.Ingest(event, count) + member _.Categories = cats.Keys + member _.StatsDescending cat = + match cats.TryGetValue cat with + | true, acc -> acc.StatsDescending + | false, _ -> Seq.empty + member _.DumpGrouped(log: Serilog.ILogger, totalLabel) = + if cats.Count <> 0 then + Propulsion.Internal.Stats.dumpCounterSet log totalLabel cats + member _.Clear() = cats.Clear() + +type Stats(log, statsInterval, stateInterval, verboseStore, abendThreshold) = + inherit StatsBase(log, statsInterval, stateInterval, verboseStore, abendThreshold = abendThreshold) + let mutable handled, ignored = 0, 0 + let accHam, accSpam = CategoryCounters(), CategoryCounters() + override _.HandleOk((category, ham, spam)) = + accHam.Ingest(category, ham |> Seq.countBy Propulsion.Internal.ValueTuple.fst) + accSpam.Ingest(category, spam) + handled <- handled + Array.length ham + ignored <- ignored + Array.sumBy snd spam + override _.DumpStats() = + if handled > 0 || ignored > 0 then + if ignored > 0 then log.Information(" Handled {count}, skipped {skipped}", handled, ignored) + handled <- 0; ignored <- 0 + override _.DumpState purge = + for cat in Seq.append accHam.Categories accSpam.Categories |> Seq.distinct |> Seq.sort do + let ham, spam = accHam.StatsDescending(cat) |> Array.ofSeq, accSpam.StatsDescending cat |> Array.ofSeq + if ham.Length > 00 then log.Information(" Category {cat} handled {@ham}", cat, ham) + if spam.Length <> 0 then log.Information(" Category {cat} ignored {@spam}", cat, spam) + if purge then + accHam.Clear(); accSpam.Clear() + +let private handle isValidEvent stream (events: Propulsion.Sinks.Event[]): Async<_ * Outcome> = async { + let ham, spam = events |> Array.partition isValidEvent + return Propulsion.Sinks.StreamResult.AllProcessed, Outcome.render_ stream ham spam 0 } + +module Factory = + + let createHandler = handle diff --git a/propulsion-projector/Projector.fsproj b/propulsion-projector/Projector.fsproj index d05749b13..d3e464e06 100644 --- a/propulsion-projector/Projector.fsproj +++ b/propulsion-projector/Projector.fsproj @@ -20,17 +20,17 @@ - - + + - - - - - - + + + + + + - + diff --git a/propulsion-pruner/Pruner.fsproj b/propulsion-pruner/Pruner.fsproj index 974405320..77ae3b1c7 100644 --- a/propulsion-pruner/Pruner.fsproj +++ b/propulsion-pruner/Pruner.fsproj @@ -14,9 +14,9 @@ - + - + diff --git a/propulsion-reactor/Infrastructure.fs b/propulsion-reactor/Infrastructure.fs index 9489039f0..a0ddc9cb6 100644 --- a/propulsion-reactor/Infrastructure.fs +++ b/propulsion-reactor/Infrastructure.fs @@ -146,9 +146,10 @@ type Logging() = module OutcomeKind = - let [] (|StoreExceptions|_|) exn = + let [] (|StoreExceptions|_|) (exn: exn) = match exn with | Equinox.DynamoStore.Exceptions.ProvisionedThroughputExceeded | Equinox.CosmosStore.Exceptions.RateLimited -> Propulsion.Streams.OutcomeKind.RateLimited |> ValueSome - | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Timeout |> ValueSome + | Equinox.CosmosStore.Exceptions.RequestTimeout -> Propulsion.Streams.OutcomeKind.Tagged "cosmosTimeout" |> ValueSome + | :? System.Threading.Tasks.TaskCanceledException -> Propulsion.Streams.OutcomeKind.Tagged "taskCancelled" |> ValueSome | _ -> ValueNone diff --git a/propulsion-reactor/Reactor.fsproj b/propulsion-reactor/Reactor.fsproj index 8af5aea06..83a416829 100644 --- a/propulsion-reactor/Reactor.fsproj +++ b/propulsion-reactor/Reactor.fsproj @@ -35,14 +35,14 @@ - - - - - - + + + + + + - + diff --git a/propulsion-summary-consumer/SummaryConsumer.fsproj b/propulsion-summary-consumer/SummaryConsumer.fsproj index fdf44e71e..c438ffa04 100644 --- a/propulsion-summary-consumer/SummaryConsumer.fsproj +++ b/propulsion-summary-consumer/SummaryConsumer.fsproj @@ -17,9 +17,9 @@ - - - + + + diff --git a/propulsion-sync/Sync.fsproj b/propulsion-sync/Sync.fsproj index 3c4fbfd9c..08659d0f6 100644 --- a/propulsion-sync/Sync.fsproj +++ b/propulsion-sync/Sync.fsproj @@ -14,10 +14,10 @@ - - + + - + diff --git a/propulsion-tracking-consumer/TrackingConsumer.fsproj b/propulsion-tracking-consumer/TrackingConsumer.fsproj index a81b68319..742a7d62b 100644 --- a/propulsion-tracking-consumer/TrackingConsumer.fsproj +++ b/propulsion-tracking-consumer/TrackingConsumer.fsproj @@ -17,9 +17,9 @@ - - - + + +