diff --git a/CHANGELOG.md b/CHANGELOG.md index 8c4ac90da..f38ae2c2c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas ## [Unreleased] ### Added + +- now targets `Microsoft.Azure.Cosmos` v `3.1.1` (instead of `Microsoft.Azure.DocumentDB`[`.Core`] v 2.x) [#144](https://github.com/jet/equinox/pull/144) + ### Changed diff --git a/samples/Infrastructure/Infrastructure.fsproj b/samples/Infrastructure/Infrastructure.fsproj index dca97c645..ec7bf948d 100644 --- a/samples/Infrastructure/Infrastructure.fsproj +++ b/samples/Infrastructure/Infrastructure.fsproj @@ -37,6 +37,7 @@ + diff --git a/samples/Infrastructure/Storage.fs b/samples/Infrastructure/Storage.fs index 0449989f4..35c7d21e1 100644 --- a/samples/Infrastructure/Storage.fs +++ b/samples/Infrastructure/Storage.fs @@ -35,7 +35,7 @@ module Cosmos = type [] Arguments = | [] VerboseStore - | [] ConnectionMode of Equinox.Cosmos.ConnectionMode + | [] ConnectionMode of Microsoft.Azure.Cosmos.ConnectionMode | [] Timeout of float | [] Retries of int | [] RetriesWaitTimeS of float @@ -54,7 +54,7 @@ module Cosmos = | Database _ -> "specify a database name for store. (optional if environment variable EQUINOX_COSMOS_DATABASE specified)" | Container _ -> "specify a container name for store. (optional if environment variable EQUINOX_COSMOS_CONTAINER specified)" type Info(args : ParseResults) = - member __.Mode = args.GetResult(ConnectionMode,Equinox.Cosmos.ConnectionMode.Direct) + member __.Mode = args.GetResult(ConnectionMode,Microsoft.Azure.Cosmos.ConnectionMode.Direct) member __.Connection = args.TryGetResult Connection |> defaultWithEnvVar "EQUINOX_COSMOS_CONNECTION" "Connection" member __.Database = args.TryGetResult Database |> defaultWithEnvVar "EQUINOX_COSMOS_DATABASE" "Database" member __.Container = args.TryGetResult Container |> defaultWithEnvVar "EQUINOX_COSMOS_CONTAINER" "Container" diff --git a/samples/Tutorial/Sequence.fs b/samples/Tutorial/Sequence.fs index cedd1a26c..36e6aa633 100644 --- a/samples/Tutorial/Sequence.fs +++ b/samples/Tutorial/Sequence.fs @@ -4,6 +4,17 @@ module Sequence open System +// shim for net461 +module Seq = + let tryLast (source : seq<_>) = + use e = source.GetEnumerator() + if e.MoveNext() then + let mutable res = e.Current + while (e.MoveNext()) do res <- e.Current + Some res + else + None + // NOTE - these types and the union case names reflect the actual storage formats and hence need to be versioned with care module Events = @@ -57,4 +68,4 @@ module Cosmos = module RollingUnfolds = let createService (context,cache) = - createService (context,cache,AccessStrategy.RollingState Fold.snapshot) \ No newline at end of file + createService (context,cache,AccessStrategy.RollingState Fold.snapshot) diff --git a/samples/Tutorial/Tutorial.fsproj b/samples/Tutorial/Tutorial.fsproj index a996d1247..f94548ffd 100644 --- a/samples/Tutorial/Tutorial.fsproj +++ b/samples/Tutorial/Tutorial.fsproj @@ -4,6 +4,8 @@ netstandard2.0;net461 5 true + true + true @@ -29,6 +31,9 @@ + + + diff --git a/samples/Tutorial/Upload.fs b/samples/Tutorial/Upload.fs index a37425ade..5de5e7a6b 100644 --- a/samples/Tutorial/Upload.fs +++ b/samples/Tutorial/Upload.fs @@ -4,6 +4,17 @@ module Upload open System open FSharp.UMX +// shim for net461 +module Seq = + let tryLast (source : seq<_>) = + use e = source.GetEnumerator() + if e.MoveNext() then + let mutable res = e.Current + while (e.MoveNext()) do res <- e.Current + Some res + else + None + type PurchaseOrderId = int and [] purchaseOrderId module PurchaseOrderId = @@ -38,7 +49,7 @@ module Fold = let private evolve _ignoreState = function | Events.IdAssigned e -> Some e.value let fold (state: State) (events: seq) : State = - Seq.tryLast events |> Option.fold evolve state + events |> Seq.tryLast |> Option.fold evolve state let decide (value : UploadId) (state : Fold.State) : Choice * Events.Event list = match state with @@ -67,4 +78,4 @@ module EventStore = open Equinox.EventStore let createService context = let resolve = Resolver(context, Events.codec, Fold.fold, Fold.initial, access=AccessStrategy.LatestKnownEvent).Resolve - create resolve \ No newline at end of file + create resolve diff --git a/src/Equinox.Cosmos/Cosmos.fs b/src/Equinox.Cosmos/Cosmos.fs index 1b2edf0f5..b5c854ca5 100644 --- a/src/Equinox.Cosmos/Cosmos.fs +++ b/src/Equinox.Cosmos/Cosmos.fs @@ -2,7 +2,7 @@ open Equinox.Core open FsCodec -open Microsoft.Azure.Documents +open Microsoft.Azure.Cosmos open Newtonsoft.Json open Serilog open System @@ -333,13 +333,8 @@ module Log = let logPeriodicRate name count ru = log.Information("rp{name} {count:n0} = ~{ru:n0} RU", name, count, ru) for uom, f in measures do let d = f duration in if d <> 0. then logPeriodicRate uom (float totalCount/d |> int64) (totalRc/d) -type Container(client : Client.DocumentClient, databaseId, containerId) = - let collectionUri = Microsoft.Azure.Documents.Client.UriFactory.CreateDocumentCollectionUri(databaseId, containerId) - member __.Client = client - member __.CollectionUri = collectionUri - [] -module private DocDb = +module private MicrosoftAzureCosmosWrappers = /// Extracts the innermost exception from a nested hierarchy of Aggregate Exceptions let (|AggregateException|) (exn : exn) = let rec aux (e : exn) = @@ -351,28 +346,25 @@ module private DocDb = /// CosmosDB Error HttpStatusCode extractor let (|CosmosException|_|) (e : exn) = match e with - | AggregateException (:? DocumentClientException as ce) -> Some ce + | AggregateException (:? CosmosException as ce) -> Some ce | _ -> None - /// Map Nullable to Option - let (|HasValue|Null|) (x:Nullable<_>) = - if x.HasValue then HasValue x.Value - else Null - /// CosmosDB Error HttpStatusCode extractor - let (|CosmosStatusCode|_|) (e : DocumentClientException) = - match e.StatusCode with - | HasValue x -> Some x - | Null -> None + // CosmosDB Error HttpStatusCode extractor + let (|CosmosStatusCode|) (e : CosmosException) = + e.StatusCode + type ReadResult<'T> = Found of 'T | NotFound | NotModified type Container with - member container.TryReadItem(documentId : string, ?options : Client.RequestOptions): Async> = async { + member container.TryReadItem(partitionKey : PartitionKey, documentId : string, ?options : ItemRequestOptions): Async> = async { let options = defaultArg options null - let docLink = sprintf "%O/docs/%s" container.CollectionUri documentId let! ct = Async.CancellationToken - try let! item = async { return! container.Client.ReadDocumentAsync<'T>(docLink, options = options, cancellationToken = ct) |> Async.AwaitTaskCorrect } - if item.StatusCode = System.Net.HttpStatusCode.NotModified then return item.RequestCharge, NotModified + // TODO use TryReadItemStreamAsync to avoid the exception https://github.com/Azure/azure-cosmos-dotnet-v3/issues/692#issuecomment-521936888 + try let! item = async { return! container.ReadItemAsync(documentId, partitionKey, requestOptions = options, cancellationToken = ct) |> Async.AwaitTaskCorrect } + // if item.StatusCode = System.Net.HttpStatusCode.NotModified then return item.RequestCharge, NotModified // NB `.Document` will NRE if a IfNoneModified precondition triggers a NotModified result - else return item.RequestCharge, Found item.Document + // else + return item.RequestCharge, Found item.Resource with CosmosException (CosmosStatusCode System.Net.HttpStatusCode.NotFound as e) -> return e.RequestCharge, NotFound + | CosmosException (CosmosStatusCode System.Net.HttpStatusCode.NotModified as e) -> return e.RequestCharge, NotModified // NB while the docs suggest you may see a 412, the NotModified in the body of the try/with is actually what happens | CosmosException (CosmosStatusCode System.Net.HttpStatusCode.PreconditionFailed as e) -> return e.RequestCharge, NotModified } @@ -455,14 +447,13 @@ function sync(req, expIndex, expEtag) { type [] Exp = Version of int64 | Etag of string | Any let private run (container : Container, stream : string) (exp, req: Tip) : Async = async { - let sprocLink = sprintf "%O/sprocs/%s" container.CollectionUri sprocName - let opts = Client.RequestOptions(PartitionKey=PartitionKey stream) let ep = match exp with Exp.Version ev -> Position.fromI ev | Exp.Etag et -> Position.fromEtag et | Exp.Any -> Position.fromAppendAtEnd let! ct = Async.CancellationToken - let! (res : Client.StoredProcedureResponse) = - container.Client.ExecuteStoredProcedureAsync(sprocLink, opts, ct, box req, box ep.index, box (Option.toObj ep.etag)) |> Async.AwaitTaskCorrect - let newPos = { index = res.Response.n; etag = Option.ofObj res.Response.etag } - return res.RequestCharge, res.Response.conflicts |> function + let args = [| box req; box ep.index; box (Option.toObj ep.etag)|] + let! (res : Scripts.StoredProcedureExecuteResponse) = + container.Scripts.ExecuteStoredProcedureAsync(sprocName, PartitionKey stream, args, cancellationToken = ct) |> Async.AwaitTaskCorrect + let newPos = { index = res.Resource.n; etag = Option.ofObj res.Resource.etag } + return res.RequestCharge, res.Resource.conflicts |> function | null -> Result.Written newPos | [||] when newPos.index = 0L -> Result.Conflict (newPos, Array.empty) | [||] -> Result.ConflictUnknown newPos @@ -509,44 +500,43 @@ function sync(req, expIndex, expEtag) { unfolds |> Seq.mapi (fun offset x -> { i = baseIndex + int64 offset; c = x.EventType; d = x.Data; m = x.Meta; t = DateTimeOffset.UtcNow } : Unfold) module Initialization = - open System.Linq type [] Provisioning = Container of rus: int | Database of rus: int - let adjustOffer (c:Client.DocumentClient) resourceLink rus = async { - let offer = c.CreateOfferQuery().Where(fun r -> r.ResourceLink = resourceLink).AsEnumerable().Single() - let! _ = c.ReplaceOfferAsync(OfferV2(offer,rus)) |> Async.AwaitTaskCorrect in () } - let private createDatabaseIfNotExists (client:Client.DocumentClient) dName maybeRus = - let opts = Client.RequestOptions(ConsistencyLevel = Nullable ConsistencyLevel.Session) - maybeRus |> Option.iter (fun rus -> opts.OfferThroughput <- Nullable rus) - client.CreateDatabaseIfNotExistsAsync(Database(Id=dName), options = opts) |> Async.AwaitTaskCorrect - let private createOrProvisionDatabase (client:Client.DocumentClient) dName mode = async { + let adjustOfferC (c:Container) rus = async { + let! ct = Async.CancellationToken + let! _ = c.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () } + let adjustOfferD (d:Database) rus = async { + let! ct = Async.CancellationToken + let! _ = d.ReplaceThroughputAsync(rus, cancellationToken = ct) |> Async.AwaitTaskCorrect in () } + let private createDatabaseIfNotExists (client:CosmosClient) dName maybeRus = async { + let! ct = Async.CancellationToken + let! dbr = client.CreateDatabaseIfNotExistsAsync(id=dName, throughput = Option.toNullable maybeRus, cancellationToken=ct) |> Async.AwaitTaskCorrect + return dbr.Database } + let private createOrProvisionDatabase (client:CosmosClient) dName mode = async { match mode with | Provisioning.Database rus -> let! db = createDatabaseIfNotExists client dName (Some rus) - return! adjustOffer client db.Resource.SelfLink rus + do! adjustOfferD db rus | Provisioning.Container _ -> let! _ = createDatabaseIfNotExists client dName None in () } - let private createContainerIfNotExists (client:Client.DocumentClient) dName (def: DocumentCollection) maybeRus = - let dbUri = Client.UriFactory.CreateDatabaseUri dName - let opts = match maybeRus with None -> Client.RequestOptions() | Some rus -> Client.RequestOptions(OfferThroughput=Nullable rus) - client.CreateDocumentCollectionIfNotExistsAsync(dbUri, def, opts) |> Async.AwaitTaskCorrect - let private createOrProvisionContainer (client: Client.DocumentClient) (dName, def: DocumentCollection) mode = async { + let private createContainerIfNotExists (d:Database) (cp:ContainerProperties) maybeRus = async { + let! ct = Async.CancellationToken + let! c = d.CreateContainerIfNotExistsAsync(cp, throughput=Option.toNullable maybeRus, cancellationToken=ct) |> Async.AwaitTaskCorrect + return c.Container } + let private createOrProvisionContainer (d:Database) (cp:ContainerProperties) mode = async { match mode with | Provisioning.Database _ -> - let! _ = createContainerIfNotExists client dName def None in () + return! createContainerIfNotExists d cp None | Provisioning.Container rus -> - let! container = createContainerIfNotExists client dName def (Some rus) in () - return! adjustOffer client container.Resource.SelfLink rus } + let! c = createContainerIfNotExists d cp (Some rus) + do! adjustOfferC c rus + return c } let private createStoredProcIfNotExists (c:Container) (name, body): Async = async { - try let! r = c.Client.CreateStoredProcedureAsync(c.CollectionUri, StoredProcedure(Id = name, Body = body)) |> Async.AwaitTaskCorrect + try let! r = c.Scripts.CreateStoredProcedureAsync(Scripts.StoredProcedureProperties(id=name, body=body)) |> Async.AwaitTaskCorrect return r.RequestCharge with CosmosException ((CosmosStatusCode sc) as e) when sc = System.Net.HttpStatusCode.Conflict -> return e.RequestCharge } - let private mkContainerProperties idFieldName partitionKeyFieldName = - // While the v2 SDK and earlier portal versions admitted 'fixed' collections where no Partition Key is defined, we follow the recent policy - // simplification of having a convention of always defining a partition key - let pkd = PartitionKeyDefinition() - pkd.Paths.Add(sprintf "/%s" partitionKeyFieldName) - DocumentCollection(Id = idFieldName, PartitionKey = pkd) - let private createBatchAndTipContainerIfNotExists (client: Client.DocumentClient) (dName,cName) mode : Async = + let private mkContainerProperties containerName partitionKeyFieldName = + ContainerProperties(id = containerName, partitionKeyPath = sprintf "/%s" partitionKeyFieldName) + let private createBatchAndTipContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async = let def = mkContainerProperties cName Batch.PartitionKeyField def.IndexingPolicy.IndexingMode <- IndexingMode.Consistent def.IndexingPolicy.Automatic <- true @@ -555,35 +545,33 @@ function sync(req, expIndex, expEtag) { def.IndexingPolicy.ExcludedPaths.Add(ExcludedPath(Path="/*")) // NB its critical to index the nominated PartitionKey field defined above or there will be runtime errors for k in Batch.IndexedFields do def.IndexingPolicy.IncludedPaths.Add(IncludedPath(Path = sprintf "/%s/?" k)) - createOrProvisionContainer client (dName, def) mode + createOrProvisionContainer (client.GetDatabase dName) def mode let createSyncStoredProcIfNotExists (log: ILogger option) container = async { let! t, ru = createStoredProcIfNotExists container (sprocName,sprocBody) |> Stopwatch.Time match log with | None -> () | Some log -> log.Information("Created stored procedure {sprocId} in {ms}ms rc={ru}", sprocName, (let e = t.Elapsed in e.TotalMilliseconds), ru) } - let private createAuxContainerIfNotExists client (dName,cName) mode : Async = + let private createAuxContainerIfNotExists (client: CosmosClient) (dName,cName) mode : Async = let def = mkContainerProperties cName "id" // as per Cosmos team, Partition Key must be "/id" // TL;DR no indexing of any kind; see https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet/issues/142 def.IndexingPolicy.Automatic <- false def.IndexingPolicy.IndexingMode <- IndexingMode.None - createOrProvisionContainer client (dName,def) mode - let init log (client : Client.DocumentClient) (dName,cName) mode skipStoredProc = async { + createOrProvisionContainer (client.GetDatabase dName) def mode + let init log (client: CosmosClient) (dName,cName) mode skipStoredProc = async { do! createOrProvisionDatabase client dName mode - do! createBatchAndTipContainerIfNotExists client (dName,cName) mode - let container = Container(client,dName,cName) + let! container = createBatchAndTipContainerIfNotExists client (dName,cName) mode if not skipStoredProc then do! createSyncStoredProcIfNotExists (Some log) container } - let initAux (client: Client.DocumentClient) (dName,cName) rus = async { - // Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partition keys) + let initAux (client: CosmosClient) (dName,cName) rus = async { + // Hardwired for now (not sure if CFP can store in a Database-allocated as it would need to be supplying partion keys) let mode = Provisioning.Container rus do! createOrProvisionDatabase client dName mode return! createAuxContainerIfNotExists client (dName,cName) mode } module internal Tip = let private get (container : Container, stream : string) (maybePos: Position option) = - let ac = match maybePos with Some { etag=Some etag } -> Client.AccessCondition(Type=Client.AccessConditionType.IfNoneMatch, Condition=etag) | _ -> null - let ro = Client.RequestOptions(PartitionKey=PartitionKey stream, AccessCondition = ac) - container.TryReadItem(Tip.WellKnownDocumentId, ro) + let ro = match maybePos with Some { etag=Some etag } -> ItemRequestOptions(IfNoneMatchEtag=etag) | _ -> null + container.TryReadItem(PartitionKey stream, Tip.WellKnownDocumentId, ro) let private loggedGet (get : Container * string -> Position option -> Async<_>) (container,stream) (maybePos: Position option) (log: ILogger) = async { let log = log |> Log.prop "stream" stream let! t, (ru, res : ReadResult) = get (container,stream) maybePos |> Stopwatch.Time @@ -612,25 +600,24 @@ module internal Tip = | ReadResult.Found tip -> return Result.Found (Position.fromTip tip, Enum.EventsAndUnfolds tip |> Array.ofSeq) } module internal Query = - open Microsoft.Azure.Documents.Linq open FSharp.Control - let private mkQuery (container : Container, stream : string) maxItems (direction: Direction) startPos = + let private mkQuery (container : Container, stream: string) maxItems (direction: Direction) startPos : FeedIterator= let query = let root = sprintf "SELECT c.id, c.i, c._etag, c.n, c.e FROM c WHERE c.id!=\"%s\"" Tip.WellKnownDocumentId let tail = sprintf "ORDER BY c.i %s" (if direction = Direction.Forward then "ASC" else "DESC") match startPos with - | None -> SqlQuerySpec(sprintf "%s %s" root tail) + | None -> QueryDefinition(sprintf "%s %s" root tail) | Some { index = positionSoExclusiveWhenBackward } -> let cond = if direction = Direction.Forward then "c.n > @startPos" else "c.i < @startPos" - SqlQuerySpec(sprintf "%s AND %s %s" root cond tail, SqlParameterCollection [SqlParameter("@startPos", positionSoExclusiveWhenBackward)]) - let qro = new Client.FeedOptions(PartitionKey = PartitionKey stream, MaxItemCount=Nullable maxItems) - container.Client.CreateDocumentQuery(container.CollectionUri, query, qro).AsDocumentQuery() + QueryDefinition(sprintf "%s AND %s %s" root cond tail).WithParameter("@startPos", positionSoExclusiveWhenBackward) + let qro = new QueryRequestOptions(PartitionKey = Nullable(PartitionKey stream), MaxItemCount=Nullable maxItems) + container.GetItemQueryIterator(query, requestOptions = qro) // Unrolls the Batches in a response - note when reading backwards, the events are emitted in reverse order of index - let private handleResponse direction (streamName: string) startPos (query: IDocumentQuery) (log: ILogger) + let private handleResponse direction (streamName: string) startPos (query: FeedIterator) (log: ILogger) : Async[] * Position option * float> = async { let! ct = Async.CancellationToken - let! t, (res : Client.FeedResponse) = query.ExecuteNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time + let! t, (res : FeedResponse) = query.ReadNextAsync(ct) |> Async.AwaitTaskCorrect |> Stopwatch.Time let batches, ru = Array.ofSeq res, res.RequestCharge let events = batches |> Seq.collect (fun b -> Enum.Events(b, startPos, direction)) |> Array.ofSeq let (Log.BatchLen bytes), count = events, events.Length @@ -644,9 +631,9 @@ module internal Tip = let maybePosition = batches |> Array.tryPick Position.tryFromBatch return events, maybePosition, ru } - let private run (log : ILogger) (readSlice: IDocumentQuery -> ILogger -> Async[] * Position option * float>) + let private run (log : ILogger) (readSlice: FeedIterator -> ILogger -> Async[] * Position option * float>) (maxPermittedBatchReads: int option) - (query: IDocumentQuery) + (query: FeedIterator) : AsyncSeq[] * Position option * float> = let rec loop batchCount : AsyncSeq[] * Position option * float> = asyncSeq { match maxPermittedBatchReads with @@ -787,13 +774,13 @@ open Equinox.Core open Equinox.Cosmos.Store open FsCodec open FSharp.Control -open Microsoft.Azure.Documents +open Microsoft.Azure.Cosmos open Serilog open System open System.Collections.Concurrent /// Defines policies for retrying with respect to transient failures calling CosmosDb (as opposed to application level concurrency conflicts) -type Connection(client: Client.DocumentClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) = +type Connection(client: CosmosClient, []?readRetryPolicy: IRetryPolicy, []?writeRetryPolicy) = member __.Client = client member __.TipRetryPolicy = readRetryPolicy member __.QueryRetryPolicy = readRetryPolicy @@ -968,11 +955,10 @@ type Containers(categoryAndIdToDatabaseContainerStream : string -> string -> str let genStreamName categoryName streamId = if categoryName = null then streamId else sprintf "%s-%s" categoryName streamId Containers(fun categoryName streamId -> databaseId, containerId, genStreamName categoryName streamId) - member internal __.Resolve(client, categoryName, id, init) : (Container*string) * (unit -> Async) option = + member internal __.Resolve(client : CosmosClient, categoryName, id, init) : (Container*string) * (unit -> Async) option = let databaseId, containerName, streamName = categoryAndIdToDatabaseContainerStream categoryName id let init = match disableInitialization with Some true -> None | _ -> Some init - let mkWrapped (db,containerName) = ContainerWrapper(Container(client,db,containerName), ?initContainer = init) - let wrapped = wrappers.GetOrAdd((databaseId,containerName), mkWrapped) + let wrapped = wrappers.GetOrAdd((databaseId,containerName), fun (d,c) -> ContainerWrapper(client.GetContainer(d, c), ?initContainer = init)) (wrapped.Container,streamName),wrapped.InitializationGate /// Pairs a Gateway, defining the retry policies for CosmosDb with a Containers map defining mappings from (category,id) to (databaseId,containerId,streamName) @@ -1091,15 +1077,6 @@ type Discovery = UriAndKey (Uri uri, key) | _ -> invalidArg "connectionString" "unrecognized connection string format; must be `AccountEndpoint=https://...;AccountKey=...=;`" -[] -type ConnectionMode = - /// Default mode, uses Https - inefficient as uses a double hop - | Gateway - /// Most efficient, but requires direct connectivity - | Direct - // More efficient than Gateway, but suboptimal - | DirectHttps - type Connector ( /// Timeout to apply to individual reads/write round-trips going to CosmosDb requestTimeout: TimeSpan, @@ -1135,34 +1112,34 @@ type Connector let sanitizedName = name.Replace('\'','_').Replace(':','_') // sic; Align with logging for ES Adapter log.ForContext("Uri", uri).Information("CosmosDb Connection Name {connectionName}", sanitizedName) - /// ClientOptions (ConnectionPolicy with v2 SDK) for this Connector as configured + /// ClientOptions for this Connector as configured member val ClientOptions = - let co = Client.ConnectionPolicy.Default + let maxAttempts, maxWait, timeout = Nullable maxRetryAttemptsOnRateLimitedRequests, Nullable maxRetryWaitTimeOnRateLimitedRequests, requestTimeout + let co = CosmosClientOptions(MaxRetryAttemptsOnRateLimitedRequests = maxAttempts, MaxRetryWaitTimeOnRateLimitedRequests = maxWait, RequestTimeout = timeout) match mode with - | None | Some ConnectionMode.Gateway -> co.ConnectionMode <- Client.ConnectionMode.Gateway // default; only supports Https - | Some ConnectionMode.DirectHttps -> co.ConnectionMode <- Client.ConnectionMode.Direct; co.ConnectionProtocol <- Client.Protocol.Https // Https is default when using Direct - | Some ConnectionMode.Direct -> co.ConnectionMode <- Client.ConnectionMode.Direct; co.ConnectionProtocol <- Client.Protocol.Tcp - co.RetryOptions <- - Client.RetryOptions( - MaxRetryAttemptsOnThrottledRequests = maxRetryAttemptsOnRateLimitedRequests, - MaxRetryWaitTimeInSeconds = (Math.Ceiling(maxRetryWaitTimeOnRateLimitedRequests.TotalSeconds) |> int)) - co.RequestTimeout <- requestTimeout - co.MaxConnectionLimit <- defaultArg gatewayModeMaxConnectionLimit 1000 + | Some ConnectionMode.Direct -> co.ConnectionMode <- ConnectionMode.Direct + | None | Some ConnectionMode.Gateway | Some _ (* enum total match :( *) -> co.ConnectionMode <- ConnectionMode.Gateway // default; only supports Https + match gatewayModeMaxConnectionLimit with + | Some _ when co.ConnectionMode = ConnectionMode.Direct -> invalidArg "gatewayModeMaxConnectionLimit" "Not admissible in Direct mode" + | x -> co.GatewayModeMaxConnectionLimit <- defaultArg x 1000 + match defaultConsistencyLevel with + | Some x -> co.ConsistencyLevel <- Nullable x + | None -> () + // TODO translate +// if defaultArg bypassCertificateValidation false then +// let inhibitCertCheck = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = fun _ _ _ _ -> true) +// co.TransportClientHandlerFactory <- inhibitCertCheck co - /// Yields a DocumentClient configured per the specified strategy + /// Yields a CosmosClient configured and connected the requested `discovery` strategy member __.CreateClient ( /// Name should be sufficient to uniquely identify this connection within a single app instance's logs name, discovery : Discovery, /// true to inhibit logging of client name - []?skipLog) : Client.DocumentClient = + []?skipLog) : CosmosClient = let (Discovery.UriAndKey (databaseUri=uri; key=key)) = discovery if skipLog <> Some true then logName uri name - let consistencyLevel = Nullable(defaultArg defaultConsistencyLevel ConsistencyLevel.Session) - if defaultArg bypassCertificateValidation false then - let inhibitCertCheck = new System.Net.Http.HttpClientHandler(ServerCertificateCustomValidationCallback = fun _ _ _ _ -> true) - new Client.DocumentClient(uri, key, inhibitCertCheck, __.ClientOptions, consistencyLevel) // overload introduced in 2.2.0 SDK - else new Client.DocumentClient(uri, key, __.ClientOptions, consistencyLevel) + new CosmosClient(string uri, key, __.ClientOptions) /// Yields a Connection configured per the specified strategy member __.Connect @@ -1173,7 +1150,8 @@ type Connector /// true to inhibit logging of client name []?skipLog) : Async = async { let client = __.CreateClient(name, discovery, ?skipLog=skipLog) - if skipOpen <> Some true then do! client.OpenAsync() |> Async.AwaitTaskCorrect + // TODO validate this is equivalent to forcing a connect + if skipOpen <> Some true then let! _ = client.ReadAccountAsync() |> Async.AwaitTaskCorrect in () return Connection(client, ?readRetryPolicy=readRetryPolicy, ?writeRetryPolicy=writeRetryPolicy) } namespace Equinox.Cosmos.Core @@ -1350,4 +1328,4 @@ module Events = /// Obtains the `index` from the current write Position let getNextIndex (ctx: Context) (streamName: string) : Async = - ctx.Sync(ctx.CreateStream streamName) |> stripPosition \ No newline at end of file + ctx.Sync(ctx.CreateStream streamName) |> stripPosition diff --git a/src/Equinox.Cosmos/Equinox.Cosmos.fsproj b/src/Equinox.Cosmos/Equinox.Cosmos.fsproj index df7f0f958..e8b1ad7bf 100644 --- a/src/Equinox.Cosmos/Equinox.Cosmos.fsproj +++ b/src/Equinox.Cosmos/Equinox.Cosmos.fsproj @@ -27,8 +27,9 @@ - - + + + \ No newline at end of file diff --git a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs index e03f7c005..0240c7d04 100644 --- a/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs +++ b/tests/Equinox.Cosmos.Integration/CosmosFixtures.fs @@ -34,4 +34,4 @@ let containers = let createCosmosContext connection batchSize = let gateway = Gateway(connection, BatchingPolicy(defaultMaxItems=batchSize)) - Context(gateway, containers) \ No newline at end of file + Context(gateway, containers) diff --git a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj index 84b0cc9ad..d23f3f845 100644 --- a/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj +++ b/tests/Equinox.Cosmos.Integration/Equinox.Cosmos.Integration.fsproj @@ -28,6 +28,7 @@ + diff --git a/tools/Equinox.Tool/Program.fs b/tools/Equinox.Tool/Program.fs index 78372342f..c668ca3b5 100644 --- a/tools/Equinox.Tool/Program.fs +++ b/tools/Equinox.Tool/Program.fs @@ -334,11 +334,10 @@ module SqlInit = | _ -> failwith "please specify a `ms`,`my` or `pg` endpoint" } module CosmosStats = - type Equinox.Cosmos.Store.Container with + type Microsoft.Azure.Cosmos.Container with // NB DO NOT CONSIDER PROMULGATING THIS HACK - member container.QueryValue<'T>(sqlQuery : string, ?options) = - let options = defaultArg options null - let query : seq<'T> = container.Client.CreateDocumentQuery<'T>(container.CollectionUri, sqlExpression = sqlQuery, feedOptions = options) :> _ + member container.QueryValue<'T>(sqlQuery : string) = + let query : seq<'T> = failwith "TODO translate" //container.ReadItemAsync(sqlQuery) :> _ query |> Seq.exactlyOne let run (log : ILogger, verboseConsole, maybeSeq) (args : ParseResults) = async { match args.TryGetSubCommand() with @@ -347,7 +346,7 @@ module CosmosStats = let doS = doS || (not doD && not doE) // default to counting streams only unless otherwise specified let inParallel = args.Contains Parallel let! _storeLog,conn,dName,cName = CosmosInit.conn (log,verboseConsole,maybeSeq) sargs - let container = Equinox.Cosmos.Store.Container(conn.Client,dName,cName) + let container = conn.Client.GetDatabase(dName).GetContainer(cName) let ops = [ if doS then yield "Streams", """SELECT VALUE COUNT(1) FROM c WHERE c.id="-1" """ if doD then yield "Documents", """SELECT VALUE COUNT(1) FROM c""" @@ -355,7 +354,7 @@ module CosmosStats = log.Information("Computing {measures} ({mode})", Seq.map fst ops, (if inParallel then "in parallel" else "serially")) ops |> Seq.map (fun (name,sql) -> async { log.Debug("Running query: {sql}", sql) - let res = container.QueryValue(sql, Microsoft.Azure.Documents.Client.FeedOptions(EnableCrossPartitionQuery=true)) + let res = container.QueryValue(sql) log.Information("{stat}: {result:N0}", name, res)}) |> if inParallel then Async.Parallel else Async.ParallelThrottled 1 // TOCONSIDER replace with Async.Sequence when using new enough FSharp.Core |> Async.Ignore @@ -436,4 +435,4 @@ let main argv = with e -> log.Debug(e, "Fatal error; exiting"); reraise () with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1 | Storage.MissingArg msg -> eprintfn "%s" msg; 1 - | e -> eprintfn "%s" e.Message; 1 \ No newline at end of file + | e -> eprintfn "%s" e.Message; 1