From e06740ca2fecf770b871aa15737c1ea23de9a595 Mon Sep 17 00:00:00 2001 From: Stewart Robertson Date: Wed, 29 Jun 2016 13:09:36 +0100 Subject: [PATCH 1/2] Proposed resolution for issue #35 --- .../Table/TableQueryBuilder.fs | 17 +++++++- .../Table/TableRepository.fs | 39 +++++++++++++++---- tests/IntegrationTests/TableUnitTests.fs | 15 ++++++- 3 files changed, 60 insertions(+), 11 deletions(-) diff --git a/src/FSharp.Azure.StorageTypeProvider/Table/TableQueryBuilder.fs b/src/FSharp.Azure.StorageTypeProvider/Table/TableQueryBuilder.fs index 48df189..cfd8290 100644 --- a/src/FSharp.Azure.StorageTypeProvider/Table/TableQueryBuilder.fs +++ b/src/FSharp.Azure.StorageTypeProvider/Table/TableQueryBuilder.fs @@ -57,6 +57,17 @@ let createTableQueryType (tableEntityType: ProvidedTypeDefinition) connection ta "Timestamp", buildPropertyOperatorsType tableName "Timestamp" EdmType.DateTime tableQueryType ] @ [ for (name, value) in properties -> name, buildPropertyOperatorsType tableName name value.PropertyType tableQueryType ] tableQueryType.AddMembersDelayed(fun () -> + + let executeQueryMethodAsync = + let symbKind = (typeof> |> SymbolKind.Generic) + let retType = ProvidedSymbolType( symbKind,[tableEntityType.MakeArrayType()]) :> Type + ProvidedMethod + ("ExecuteAsync", [ ProvidedParameter("maxResults", typeof, optionalValue = 0) + ProvidedParameter("connectionString", typeof, optionalValue = connection) ], + retType, + InvokeCode = (fun args -> <@@ executeQueryAsync (%%args.[2] : string) tableName %%args.[1] (composeAllFilters((%%args.[0]: obj) :?> string list)) @@>)) + executeQueryMethodAsync.AddXmlDocDelayed <| fun _ -> "Executes the current query asyncronously." + let executeQueryMethod = ProvidedMethod ("Execute", [ ProvidedParameter("maxResults", typeof, optionalValue = 0) @@ -64,10 +75,14 @@ let createTableQueryType (tableEntityType: ProvidedTypeDefinition) connection ta tableEntityType.MakeArrayType(), InvokeCode = (fun args -> <@@ executeQuery (%%args.[2] : string) tableName %%args.[1] (composeAllFilters((%%args.[0]: obj) :?> string list)) @@>)) executeQueryMethod.AddXmlDocDelayed <| fun _ -> "Executes the current query." + let customQueryProperties = [ for (name, operatorType) in operatorTypes -> let queryProperty = ProvidedProperty("Where" + name + "Is" |> splitOnCaps, operatorType, GetterCode = (fun args -> <@@ (%%args.[0]: obj) :?> string list @@>)) queryProperty.AddXmlDocDelayed <| fun _ -> sprintf "Creates a query part for the %s property." name queryProperty :> MemberInfo ] - (executeQueryMethod :> MemberInfo) :: customQueryProperties) + + [[(executeQueryMethodAsync :> MemberInfo); (executeQueryMethod :> MemberInfo)]; customQueryProperties] + |> List.concat) + tableQueryType, operatorTypes |> List.unzip |> snd \ No newline at end of file diff --git a/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs b/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs index e202552..50d265f 100644 --- a/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs +++ b/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs @@ -62,19 +62,42 @@ let internal getRowsForSchema (rowCount: int) connection tableName = |> Seq.truncate rowCount |> Seq.toArray +let toLightweightTableEntity (dte:DynamicTableEntity) = + LightweightTableEntity( + Partition dte.PartitionKey, + Row dte.RowKey, + dte.Timestamp, + dte.Properties + |> Seq.map(fun p -> p.Key, p.Value.PropertyAsObject) + |> Map.ofSeq) + +let executeQueryAsync connection tableName maxResults filterString = async { + let query = DynamicQuery().Where(filterString) + let query = if maxResults > 0 then query.Take(Nullable maxResults) else query + + let resSet = ResizeArray() + let getBatch contTkn = async { + let! batch = (getTable tableName connection).ExecuteQuerySegmentedAsync(query,contTkn) |> Async.AwaitTask + batch + |> Seq.map(toLightweightTableEntity) + |> resSet.AddRange + return contTkn + } + let! firstContTkn = getBatch null + let mutable contTkn = firstContTkn + while (contTkn <> null) do + let! nextTkn = getBatch contTkn + contTkn <- nextTkn + + return resSet |> Array.ofSeq + } + let executeQuery connection tableName maxResults filterString = let query = DynamicQuery().Where(filterString) let query = if maxResults > 0 then query.Take(Nullable maxResults) else query (getTable tableName connection).ExecuteQuery(query) - |> Seq.map(fun dte -> - LightweightTableEntity( - Partition dte.PartitionKey, - Row dte.RowKey, - dte.Timestamp, - dte.Properties - |> Seq.map(fun p -> p.Key, p.Value.PropertyAsObject) - |> Map.ofSeq)) + |> Seq.map(toLightweightTableEntity) |> Seq.toArray let internal buildDynamicTableEntity(entity:LightweightTableEntity) = diff --git a/tests/IntegrationTests/TableUnitTests.fs b/tests/IntegrationTests/TableUnitTests.fs index 21259b7..55ec828 100644 --- a/tests/IntegrationTests/TableUnitTests.fs +++ b/tests/IntegrationTests/TableUnitTests.fs @@ -169,7 +169,7 @@ let ``Cloud Table Client relates to the same data as the type provider``() = let ``DeletePartition deletes entries with given partition key``() = table.DeletePartition "men" test <@ table.Query().``Where Partition Key Is``.``Equal To``("men").Execute().Length = 0 @> - + [] [] let ``Insert suceeds for entries over 4Mb``() = @@ -189,4 +189,15 @@ let ``Insert suceeds for entries over 4Mb``() = |> Array.collect snd |> Array.filter (function | SuccessfulResponse _ -> false | _ -> true) |> Array.length - test <@ failureCount = 0 @> \ No newline at end of file + test <@ failureCount = 0 @> + +[] +[] +let ``Async query without arguments brings back all rows``() = + let length = + async{ + let! results = table.Query().ExecuteAsync(); + return results.Length + } + |> Async.RunSynchronously + test <@ length = 5 @> From 0907520d64a407858770c56dff36185b5373a866 Mon Sep 17 00:00:00 2001 From: Stewart Robertson Date: Mon, 4 Jul 2016 07:58:00 +0100 Subject: [PATCH 2/2] Added .DeleteAsync and .DeletePartitionAsync members --- .../Table/ProvidedTableTypes.fs | 30 +++++- .../Table/TableRepository.fs | 101 ++++++++++++++---- tests/IntegrationTests/TableUnitTests.fs | 9 ++ 3 files changed, 119 insertions(+), 21 deletions(-) diff --git a/src/FSharp.Azure.StorageTypeProvider/Table/ProvidedTableTypes.fs b/src/FSharp.Azure.StorageTypeProvider/Table/ProvidedTableTypes.fs index 125ca08..bf16447 100644 --- a/src/FSharp.Azure.StorageTypeProvider/Table/ProvidedTableTypes.fs +++ b/src/FSharp.Azure.StorageTypeProvider/Table/ProvidedTableTypes.fs @@ -51,6 +51,16 @@ type AzureTable internal (defaultConnection, tableName) = DynamicTableEntity(partitionKey, rowKey, ETag = "*")) |> executeBatchOperation TableOperation.Delete table + /// Asyncronously deletes a batch of entities from the table using the supplied pairs of Partition and Row keys. + member __.DeleteAsync(entities, ?connectionString) = async{ + let table = getTableForConnection (defaultArg connectionString defaultConnection) + return! entities + |> Seq.map (fun entityId -> + let Partition(partitionKey), Row(rowKey) = entityId + DynamicTableEntity(partitionKey, rowKey, ETag = "*")) + |> executeBatchOperationAsync TableOperation.Delete table + } + /// Deletes an entire partition from the table member __.DeletePartition(partitionKey, ?connectionString) = let table = getTableForConnection (defaultArg connectionString defaultConnection) @@ -60,7 +70,25 @@ type AzureTable internal (defaultConnection, tableName) = |> table.ExecuteQuery |> Seq.map(fun e -> (Partition(e.PartitionKey), Row(e.RowKey))) |> __.Delete - |> ignore + |> ignore + + /// Asyncronously deletes an entire partition from the table + member __.DeletePartitionAsync(partitionKey, ?connectionString) = async{ + let table = getTableForConnection (defaultArg connectionString defaultConnection) + let connStringToUse = + match connectionString with + | Some c -> c + | None -> defaultConnection + let filter = Table.TableQuery.GenerateFilterCondition ("PartitionKey", Table.QueryComparisons.Equal, partitionKey) + let projection = [|"RowKey"|] + let! qryResp = executeGenericQueryAsync connStringToUse table.Name Int32.MaxValue filter (fun e -> (Partition(e.PartitionKey), Row(e.RowKey))) + + return! + qryResp + |> __.DeleteAsync + |> Async.Ignore + } + /// Gets the name of the table. member __.Name = tableName diff --git a/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs b/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs index 50d265f..27c46ca 100644 --- a/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs +++ b/src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs @@ -7,6 +7,8 @@ open Microsoft.WindowsAzure.Storage open Microsoft.WindowsAzure.Storage.Table open Microsoft.WindowsAzure.Storage.Table.Queryable open System +open System.Collections.Generic +open System.Threading.Tasks /// Suggests batch sizes based on a given entity type and published EDM property type sizes (source: https://msdn.microsoft.com/en-us/library/dd179338.aspx) module private BatchCalculator = @@ -71,6 +73,7 @@ let toLightweightTableEntity (dte:DynamicTableEntity) = |> Seq.map(fun p -> p.Key, p.Value.PropertyAsObject) |> Map.ofSeq) +<<<<<<< HEAD let executeQueryAsync connection tableName maxResults filterString = async { let query = DynamicQuery().Where(filterString) let query = if maxResults > 0 then query.Take(Nullable maxResults) else query @@ -91,6 +94,32 @@ let executeQueryAsync connection tableName maxResults filterString = async { return resSet |> Array.ofSeq } +======= +let executeGenericQueryAsync<'a> connection tableName maxResults filterString mapToReturnEntity = async{ + let query = DynamicQuery().Where(filterString) + let query = if maxResults > 0 then query.Take(Nullable maxResults) else query + + let resSet = ResizeArray<'a>() + let getBatch contTkn = async { + let! batch = (getTable tableName connection).ExecuteQuerySegmentedAsync(query,contTkn) |> Async.AwaitTask + batch + |> Seq.map(mapToReturnEntity) + |> resSet.AddRange + return contTkn + } + let! firstContTkn = getBatch null + let mutable contTkn = firstContTkn + while (contTkn <> null) do + let! nextTkn = getBatch contTkn + contTkn <- nextTkn + + return resSet |> Array.ofSeq +} + + +let executeQueryAsync connection tableName maxResults filterString = + executeGenericQueryAsync connection tableName maxResults filterString toLightweightTableEntity +>>>>>>> async-tables-extrafuncs let executeQuery connection tableName maxResults filterString = let query = DynamicQuery().Where(filterString) @@ -130,7 +159,7 @@ let private batch size source = | head::tail -> doBatch output (head::currentBatch) (counter + 1) tail doBatch [] [] 0 (source |> Seq.toList) -let internal executeBatchOperation createTableOp (table:CloudTable) entities = +let private splitIntoBatches createTableOp entities = let batchSize = entities |> Seq.head |> BatchCalculator.getBatchSize entities |> Seq.groupBy(fun (entity:DynamicTableEntity) -> entity.PartitionKey) @@ -141,28 +170,60 @@ let internal executeBatchOperation createTableOp (table:CloudTable) entities = let batchForPartition = TableBatchOperation() entityBatch |> Seq.iter (createTableOp >> batchForPartition.Add) partitionKey, entityBatch, batchForPartition)) - |> Seq.map(fun (partitionKey, entityBatch, batchOperation) -> - let buildEntityId (entity:DynamicTableEntity) = Partition(entity.PartitionKey), Row(entity.RowKey) - let responses = - try - table.ExecuteBatch(batchOperation) + +let private processErrorResp entityBatch buildEntityId (ex:StorageException) = + let requestInformation = ex.RequestInformation + match requestInformation.ExtendedErrorInformation.ErrorMessage.Split('\n').[0].Split(':') with + | [|index;message|] -> + match Int32.TryParse(index) with + | true, index -> + entityBatch + |> Seq.mapi(fun entityIndex entity -> + if entityIndex = index then EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode) + else BatchOperationFailedError(buildEntityId entity)) + | _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)) + | [|message|] -> entityBatch |> Seq.map(fun entity -> EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)) + | _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)) + +let internal executeBatchAsyncronously batchOp entityBatch buildEntityId (table:CloudTable) = async{ + let! response = table.ExecuteBatchAsync(batchOp) |> Async.AwaitTask |> Async.Catch + match response with + | Choice1Of2 successResp -> + return + successResp |> Seq.zip entityBatch |> Seq.map(fun (entity, res) -> SuccessfulResponse(buildEntityId entity, res.HttpStatusCode)) - with :? StorageException as ex -> - let requestInformation = ex.RequestInformation - match requestInformation.ExtendedErrorInformation.ErrorMessage.Split('\n').[0].Split(':') with - | [|index;message|] -> - match Int32.TryParse(index) with - | true, index -> - entityBatch - |> Seq.mapi(fun entityIndex entity -> - if entityIndex = index then EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode) - else BatchOperationFailedError(buildEntityId entity)) - | _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)) - | [|message|] -> entityBatch |> Seq.map(fun entity -> EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)) - | _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)) - partitionKey, responses |> Seq.toArray) + | Choice2Of2 err -> + return + match err with + | :? StorageException as ex -> processErrorResp entityBatch buildEntityId ex + | _ -> raise (err) + } + +let internal executeBatchSyncronously batchOp entityBatch buildEntityId (table:CloudTable) = + try + table.ExecuteBatch(batchOp) + |> Seq.zip entityBatch + |> Seq.map(fun (entity, res) -> SuccessfulResponse(buildEntityId entity, res.HttpStatusCode)) + with :? StorageException as ex -> processErrorResp entityBatch buildEntityId ex + +let internal executeBatchOperationAsync createTableOp (table:CloudTable) entities = async { + return! + splitIntoBatches createTableOp entities + |> Seq.map(fun (partitionKey, entityBatch, batchOperation) -> async{ + let buildEntityId (entity:DynamicTableEntity) = Partition(entity.PartitionKey), Row(entity.RowKey) + let! responses = executeBatchAsyncronously batchOperation entityBatch buildEntityId table + return (partitionKey, responses |> Seq.toArray) + }) + |> Async.Parallel + } +let internal executeBatchOperation createTableOp (table:CloudTable) entities = + splitIntoBatches createTableOp entities + |> Seq.map(fun (partitionKey, entityBatch, batchOperation) -> + let buildEntityId (entity:DynamicTableEntity) = Partition(entity.PartitionKey), Row(entity.RowKey) + let responses = executeBatchSyncronously batchOperation entityBatch buildEntityId table + partitionKey, responses |> Seq.toArray) |> Seq.toArray let deleteEntities connection tableName entities = diff --git a/tests/IntegrationTests/TableUnitTests.fs b/tests/IntegrationTests/TableUnitTests.fs index 55ec828..8296ae9 100644 --- a/tests/IntegrationTests/TableUnitTests.fs +++ b/tests/IntegrationTests/TableUnitTests.fs @@ -170,6 +170,15 @@ let ``DeletePartition deletes entries with given partition key``() = table.DeletePartition "men" test <@ table.Query().``Where Partition Key Is``.``Equal To``("men").Execute().Length = 0 @> +<<<<<<< HEAD +======= +[] +[] +let ``DeletePartitionAsync deletes entries with given partition key``() = + table.DeletePartitionAsync "men" |> Async.RunSynchronously + test <@ table.Query().``Where Partition Key Is``.``Equal To``("men").Execute().Length = 0 @> + +>>>>>>> async-tables-extrafuncs [] [] let ``Insert suceeds for entries over 4Mb``() =