Skip to content

Commit

Permalink
Add CosmosDB Support for proAllProjector (#47)
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink authored Feb 23, 2020
1 parent 74261fe commit e5ecd12
Show file tree
Hide file tree
Showing 9 changed files with 306 additions and 65 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,10 @@ The `Unreleased` section name is replaced by the expected version of next releas
## [Unreleased]

### Added

- Add `Propulsion.Cosmos` support to `proAllProjector` [#47](https://github.com/jet/dotnet-templates/pulls/47)
- Add `-noEventStore` flag to `proAllProjector` [#47](https://github.com/jet/dotnet-templates/pulls/47)

### Changed
### Removed
### Fixed
Expand Down
9 changes: 5 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,14 @@ This repo hosts the source for Jet's [`dotnet new`](https://docs.microsoft.com/e

- [`trackingConsumer`](propulsion-tracking-consumer/README.md) - Boilerplate for an Apache Kafka Consumer using [`Propulsion.Kafka`](https://github.com/jet/propulsion) to ingest accumulating changes in an `Equinox.Cosmos` store idempotently.

- [`proAllProjector`](propulsion-all-projector/README.md) - Boilerplate for an EventStore `$all` stream projector (projecting from an EventStore using `Propulsion.EventStore`.EventStore
- [`proAllProjector`](propulsion-all-projector/README.md) - Boilerplate for a dual mode CosmosDB ChangeFeed Processor and/or EventStore `$all` stream projector using `Propulsion.Cosmos`/`Propulsion.EventStore`

**NOTE At present, checkpoint storage is only implemented for Azure CosmosDB - help wanted ;)**
**NOTE At present, checkpoint storage when projecting from EventStore uses Azure CosmosDB - help wanted ;)**

Standard processing shows importing (in summary form) from `EventStore` to `Cosmos` (use `-b` to remove, yielding a minimal projector)
Standard processing shows importing (in summary form) from an aggregate in `EventStore` or `Cosmos` to a Summary form in `Cosmos` (use `-b`(`lank`) to remove, yielding a minimal projector)

`-k` adds Optional projection to Apache Kafka using [`Propulsion.Kafka`](https://github.com/jet/propulsion).
`-k` adds Optional projection to Apache Kafka using [`Propulsion.Kafka`](https://github.com/jet/propulsion) (instead of ingesting into a local `Cosmos` store).
`-noEventStore` removes support for projecting from EventStore from the emitted code

- [`proSync`](propulsion-sync/README.md) - Boilerplate for a console app that that syncs events between [`Equinox.Cosmos` and `Equinox.EventStore` stores](https://github.com/jet/equinox) using the [relevant `Propulsion`.* libraries](https://github.com/jet/propulsion), filtering/enriching/mapping Events as necessary.

Expand Down
10 changes: 9 additions & 1 deletion propulsion-all-projector/.template.config/template.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"$schema": "http://json.schemastore.org/template",
"author": "@jet @bartelink",
"classifications": [
"Cosmos",
"Event Sourcing",
"Equinox",
"Propulsion",
Expand All @@ -12,11 +13,18 @@
"language": "F#"
},
"identity": "Propulsion.Template.AllProjector",
"name": "Propulsion EventStore $all Projector",
"name": "Propulsion EventStore and/or CosmosDB Projector",
"shortName": "proAllProjector",
"sourceName": "AllTemplate",
"preferNameDirectory": true,
"symbols": {
"noEventStore": {
"type": "parameter",
"datatype": "bool",
"isRequired": false,
"defaultValue": "false",
"description": "Disable projecting from EventStore."
},
"kafka": {
"type": "parameter",
"datatype": "bool",
Expand Down
4 changes: 3 additions & 1 deletion propulsion-all-projector/AllProjector.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@
<ItemGroup>
<PackageReference Include="Argu" Version="5.4.0" />
<PackageReference Include="Destructurama.FSharp" Version="1.1.1-dev-00033" />
<PackageReference Include="Equinox.Cosmos" Version="2.0.0" />
<PackageReference Include="Propulsion.Cosmos" Version="2.0.0" />
<!--#if (!noEventStore)-->
<PackageReference Include="Propulsion.EventStore" Version="2.0.0" />
<!--#endif-->
<!--#if (kafka)-->
<PackageReference Include="Propulsion.Kafka" Version="2.0.0" />
<!--#endif-->
Expand Down
2 changes: 2 additions & 0 deletions propulsion-all-projector/Handler.fs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
module AllTemplate.Handler

//#if (!noEventStore)
open Propulsion.EventStore

/// Responsible for inspecting and then either dropping or tweaking events coming from EventStore
Expand All @@ -8,6 +9,7 @@ let tryMapEvent filterByStreamName (x : EventStore.ClientAPI.ResolvedEvent) =
match x.Event with
| e when not e.IsJson || e.EventStreamId.StartsWith "$" || not (filterByStreamName e.EventStreamId) -> None
| PropulsionStreamEvent e -> Some e
//#endif
//#if kafka

/// Responsible for wrapping a span of events for a specific stream into an envelope (we use the well-known Propulsion.Codec form)
Expand Down
254 changes: 201 additions & 53 deletions propulsion-all-projector/Program.fs

Large diffs are not rendered by default.

63 changes: 58 additions & 5 deletions propulsion-all-projector/README.md
Original file line number Diff line number Diff line change
@@ -1,10 +1,30 @@
//#if kafka
# Propulsion EventStore $all -> Kafka Projector
//#if noEventStore
# Propulsion CosmosDb ChangeFeedProcessor -> Kafka Projector
//#else
# Propulsion EventStore $all Projector (without Kafka emission)
# Propulsion EventStore $all/CosmosDb ChangeFeedProcessor -> Kafka Projector
//#endif
//#else
//#if noEventStore
# Propulsion CosmosDb ChangeFeedProcessor Projector (without Kafka emission)
//#else
# Propulsion EventStore $all/CosmosDb ChangeFeedProcessor Projector (without Kafka emission)
//#endif
//#endif

This project was generated using:
//#if noEventStore
//#if kafka

dotnet new -i Equinox.Templates # just once, to install/update in the local templates store
dotnet new proAllProjector -noEventStore -k # -k => include Kafka projection logic
//#else

dotnet new -i Equinox.Templates # just once, to install/update in the local templates store
# add -k to add Kafka Projection logic
dotnet new proAllProjector -noEventStore # use --help to see options
//#endif
//#else
//#if kafka

dotnet new -i Equinox.Templates # just once, to install/update in the local templates store
Expand All @@ -15,21 +35,54 @@ This project was generated using:
# add -k to add Kafka Projection logic
dotnet new proAllProjector # use --help to see options
//#endif
//#endif

## Usage instructions

0. establish connection strings etc. for the checkpoint store in CosmosDB per https://github.com/jet/equinox README
0. establish connection strings etc. per https://github.com/jet/equinox README

$env:EQUINOX_COSMOS_CONNECTION="AccountEndpoint=https://....;AccountKey=....=;" # or use -s
$env:EQUINOX_COSMOS_DATABASE="equinox-test" # or use -d
$env:EQUINOX_COSMOS_CONTAINER="equinox-test" # or use -c

1. Use the `eqx` tool to initialize a CosmosDb container

dotnet tool install -g Equinox.Tool # only needed once

# (either add environment variables as per step 0 or use -s/-d/-c to specify them)
# generate a cosmos container to store events in
eqx init -ru 400 cosmos

2. We'll be operating a ChangeFeedProcessor, so use `propulsion init` to make a `-aux` container (unless there already is one)

# (either add environment variables as per step 0 or use -s/-d/-c to specify them)
# default name is "($EQUINOX_COSMOS_CONTAINER)-aux"
propulsion init -ru 400 cosmos

//#if (!noEventStore)
NOTE when projecting from EventStore, the current implementation stores the checkpoints within the CosmosDB store in order to remove feedback effects.

(Yes, someone should do a PR to store the checkpoints in EventStore itself; this is extracted from working code, which can assume there's always a CosmosDB around)
//#endif

3. To run an instance of the Projector from a CosmosDb ChangeFeed

//#if kafka
$env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b

# `-g default` defines the Projector Group identity - each id has separated state in the checkpoints store (`Sync-default` in the cited `cosmos` store)
# `-c $env:EQUINOX_COSMOS_CONTAINER ` specifies the source (if you have specified 2x EQUINOX_COSMOS_* environment vars, no connection/database arguments are needed, but the monitored (source) container must be specified explicitly)
# the second `cosmos` specifies the target store for the reactions (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
# `-t topic0` identifies the Kafka topic to which the Projector should write
dotnet run -- -g default cosmos -c $env:EQUINOX_COSMOS_CONTAINER cosmos kafka -t topic0
//#else
# `-g default` defines the Projector Group identity - each id has separated state in the checkpoints store (`Sync-default` in the cited `cosmos` store)
# `-c $env:EQUINOX_COSMOS_CONTAINER ` specifies the source (if you have specified EQUINOX_COSMOS_* environment vars, no connection/database arguments are needed, but the monitored (source) container must be specified explicitly)
# `cosmos` specifies the target store for the reactions (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
dotnet run -- -g default cosmos -c $env:EQUINOX_COSMOS_CONTAINER cosmos
//#endif

1. To run an instance of the Projector from EventStore
4. To run an instance of the Projector from EventStore

# (either add environment variables as per step 0 or use -s/-d/-c to specify them after the `cosmos` argument token)

Expand All @@ -54,4 +107,4 @@ This project was generated using:

# NB running more than one projector will cause them to duel, and is hence not advised

2. To create a Consumer, use `dotnet new proConsumer` (see README therein for details)
5. To create a Consumer, use `dotnet new proConsumer` (see README therein for details)
14 changes: 13 additions & 1 deletion propulsion-all-projector/Todo.fs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,23 @@ type Service internal (log, resolve, maxAttempts) =

let create resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 3)

//#if (!noEventStore)
module EventStore =

open Equinox.EventStore // Everything until now is independent of a concrete store

let private resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve
let create (context, cache) = resolve (context, cache) |> create
let create (context, cache) = resolve (context, cache) |> create

//#endif
module Cosmos =

open Equinox.Cosmos // Everything until now is independent of a concrete store

let accessStrategy = AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
let private resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
let create (context, cache) = resolve (context, cache) |> create
11 changes: 11 additions & 0 deletions propulsion-all-projector/TodoSummary.fs
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,17 @@ type Service internal (log, resolve, maxAttempts) =

let create resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 3)

//#if (!noEventStore)
module EventStore =

open Equinox.EventStore // Everything until now is independent of a concrete store

let private resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve
let create (context, cache) = resolve (context, cache) |> create

//#endif
module Cosmos =

open Equinox.Cosmos // Everything until now is independent of a concrete store
Expand Down

0 comments on commit e5ecd12

Please sign in to comment.