Proposed resolution for issue #35 #75

Closed
wants to merge 2 commits
30 changes: 29 additions & 1 deletion src/FSharp.Azure.StorageTypeProvider/Table/ProvidedTableTypes.fs
@@ -51,6 +51,16 @@ type AzureTable internal (defaultConnection, tableName) =
DynamicTableEntity(partitionKey, rowKey, ETag = "*"))
|> executeBatchOperation TableOperation.Delete table

/// Asynchronously deletes a batch of entities from the table using the supplied pairs of Partition and Row keys.
member __.DeleteAsync(entities, ?connectionString) = async {
let table = getTableForConnection (defaultArg connectionString defaultConnection)
return! entities
|> Seq.map (fun entityId ->
let Partition(partitionKey), Row(rowKey) = entityId
DynamicTableEntity(partitionKey, rowKey, ETag = "*"))
|> executeBatchOperationAsync TableOperation.Delete table
}

/// Deletes an entire partition from the table
member __.DeletePartition(partitionKey, ?connectionString) =
let table = getTableForConnection (defaultArg connectionString defaultConnection)
@@ -60,7 +70,25 @@ type AzureTable internal (defaultConnection, tableName) =
|> table.ExecuteQuery
|> Seq.map(fun e -> (Partition(e.PartitionKey), Row(e.RowKey)))
|> __.Delete
|> ignore

/// Asynchronously deletes an entire partition from the table
member __.DeletePartitionAsync(partitionKey, ?connectionString) = async {
let connStringToUse = defaultArg connectionString defaultConnection
let table = getTableForConnection connStringToUse
let filter = Table.TableQuery.GenerateFilterCondition ("PartitionKey", Table.QueryComparisons.Equal, partitionKey)
let! qryResp = executeGenericQueryAsync connStringToUse table.Name Int32.MaxValue filter (fun e -> (Partition(e.PartitionKey), Row(e.RowKey)))

return!
qryResp
|> __.DeleteAsync
|> Async.Ignore
}


/// Gets the name of the table.
member __.Name = tableName
17 changes: 16 additions & 1 deletion src/FSharp.Azure.StorageTypeProvider/Table/TableQueryBuilder.fs
@@ -57,17 +57,32 @@ let createTableQueryType (tableEntityType: ProvidedTypeDefinition) connection ta
"Timestamp", buildPropertyOperatorsType tableName "Timestamp" EdmType.DateTime tableQueryType ] @
[ for (name, value) in properties -> name, buildPropertyOperatorsType tableName name value.PropertyType tableQueryType ]
tableQueryType.AddMembersDelayed(fun () ->

let executeQueryMethodAsync =
let symbKind = (typeof<Async<_>> |> SymbolKind.Generic)
let retType = ProvidedSymbolType( symbKind,[tableEntityType.MakeArrayType()]) :> Type
ProvidedMethod
("ExecuteAsync", [ ProvidedParameter("maxResults", typeof<int>, optionalValue = 0)
ProvidedParameter("connectionString", typeof<string>, optionalValue = connection) ],
retType,
InvokeCode = (fun args -> <@@ executeQueryAsync (%%args.[2] : string) tableName %%args.[1] (composeAllFilters((%%args.[0]: obj) :?> string list)) @@>))
executeQueryMethodAsync.AddXmlDocDelayed <| fun _ -> "Executes the current query asynchronously."

let executeQueryMethod =
ProvidedMethod
("Execute", [ ProvidedParameter("maxResults", typeof<int>, optionalValue = 0)
ProvidedParameter("connectionString", typeof<string>, optionalValue = connection) ],
tableEntityType.MakeArrayType(),
InvokeCode = (fun args -> <@@ executeQuery (%%args.[2] : string) tableName %%args.[1] (composeAllFilters((%%args.[0]: obj) :?> string list)) @@>))
executeQueryMethod.AddXmlDocDelayed <| fun _ -> "Executes the current query."

let customQueryProperties =
[ for (name, operatorType) in operatorTypes ->
let queryProperty = ProvidedProperty("Where" + name + "Is" |> splitOnCaps, operatorType, GetterCode = (fun args -> <@@ (%%args.[0]: obj) :?> string list @@>))
queryProperty.AddXmlDocDelayed <| fun _ -> sprintf "Creates a query part for the %s property." name
queryProperty :> MemberInfo ]
(executeQueryMethod :> MemberInfo) :: customQueryProperties)

[[(executeQueryMethodAsync :> MemberInfo); (executeQueryMethod :> MemberInfo)]; customQueryProperties]
|> List.concat)

tableQueryType, operatorTypes |> List.unzip |> snd
140 changes: 112 additions & 28 deletions src/FSharp.Azure.StorageTypeProvider/Table/TableRepository.fs
@@ -7,6 +7,8 @@ open Microsoft.WindowsAzure.Storage
open Microsoft.WindowsAzure.Storage.Table
open Microsoft.WindowsAzure.Storage.Table.Queryable
open System
open System.Collections.Generic
open System.Threading.Tasks

/// Suggests batch sizes based on a given entity type and published EDM property type sizes (source: https://msdn.microsoft.com/en-us/library/dd179338.aspx)
module private BatchCalculator =
@@ -62,19 +64,69 @@ let internal getRowsForSchema (rowCount: int) connection tableName =
|> Seq.truncate rowCount
|> Seq.toArray

let toLightweightTableEntity (dte:DynamicTableEntity) =
LightweightTableEntity(
Partition dte.PartitionKey,
Row dte.RowKey,
dte.Timestamp,
dte.Properties
|> Seq.map(fun p -> p.Key, p.Value.PropertyAsObject)
|> Map.ofSeq)

let executeGenericQueryAsync<'a> connection tableName maxResults filterString mapToReturnEntity = async {
let query = DynamicQuery().Where(filterString)
let query = if maxResults > 0 then query.Take(Nullable maxResults) else query

let resSet = ResizeArray<'a>()
let getBatch contTkn = async {
let! batch = (getTable tableName connection).ExecuteQuerySegmentedAsync(query,contTkn) |> Async.AwaitTask
batch
|> Seq.map(mapToReturnEntity)
|> resSet.AddRange
return batch.ContinuationToken // continue with the token returned for this segment, not the one passed in
}
let! firstContTkn = getBatch null
let mutable contTkn = firstContTkn
while (contTkn <> null) do
let! nextTkn = getBatch contTkn
contTkn <- nextTkn

return resSet |> Array.ofSeq
}


let executeQueryAsync connection tableName maxResults filterString =
executeGenericQueryAsync connection tableName maxResults filterString toLightweightTableEntity

let executeQuery connection tableName maxResults filterString =
let query = DynamicQuery().Where(filterString)
let query = if maxResults > 0 then query.Take(Nullable maxResults) else query

(getTable tableName connection).ExecuteQuery(query)
|> Seq.map(fun dte ->
LightweightTableEntity(
Partition dte.PartitionKey,
Row dte.RowKey,
dte.Timestamp,
dte.Properties
|> Seq.map(fun p -> p.Key, p.Value.PropertyAsObject)
|> Map.ofSeq))
|> Seq.map(toLightweightTableEntity)
|> Seq.toArray

let internal buildDynamicTableEntity(entity:LightweightTableEntity) =
@@ -107,7 +159,7 @@ let private batch size source =
| head::tail -> doBatch output (head::currentBatch) (counter + 1) tail
doBatch [] [] 0 (source |> Seq.toList)

let internal executeBatchOperation createTableOp (table:CloudTable) entities =
let private splitIntoBatches createTableOp entities =
let batchSize = entities |> Seq.head |> BatchCalculator.getBatchSize
entities
|> Seq.groupBy(fun (entity:DynamicTableEntity) -> entity.PartitionKey)
@@ -118,28 +170,60 @@ let internal executeBatchOperation createTableOp (table:CloudTable) entities =
let batchForPartition = TableBatchOperation()
entityBatch |> Seq.iter (createTableOp >> batchForPartition.Add)
partitionKey, entityBatch, batchForPartition))
|> Seq.map(fun (partitionKey, entityBatch, batchOperation) ->
let buildEntityId (entity:DynamicTableEntity) = Partition(entity.PartitionKey), Row(entity.RowKey)
let responses =
try
table.ExecuteBatch(batchOperation)

let private processErrorResp entityBatch buildEntityId (ex:StorageException) =
let requestInformation = ex.RequestInformation
match requestInformation.ExtendedErrorInformation.ErrorMessage.Split('\n').[0].Split(':') with
| [|index;message|] ->
match Int32.TryParse(index) with
| true, index ->
entityBatch
|> Seq.mapi(fun entityIndex entity ->
if entityIndex = index then EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)
else BatchOperationFailedError(buildEntityId entity))
| _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode))
| [|message|] -> entityBatch |> Seq.map(fun entity -> EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode))
| _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode))

let internal executeBatchAsynchronously batchOp entityBatch buildEntityId (table:CloudTable) = async {
let! response = table.ExecuteBatchAsync(batchOp) |> Async.AwaitTask |> Async.Catch
match response with
| Choice1Of2 successResp ->
return
successResp
|> Seq.zip entityBatch
|> Seq.map(fun (entity, res) -> SuccessfulResponse(buildEntityId entity, res.HttpStatusCode))
with :? StorageException as ex ->
let requestInformation = ex.RequestInformation
match requestInformation.ExtendedErrorInformation.ErrorMessage.Split('\n').[0].Split(':') with
| [|index;message|] ->
match Int32.TryParse(index) with
| true, index ->
entityBatch
|> Seq.mapi(fun entityIndex entity ->
if entityIndex = index then EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode)
else BatchOperationFailedError(buildEntityId entity))
| _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode))
| [|message|] -> entityBatch |> Seq.map(fun entity -> EntityError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode))
| _ -> entityBatch |> Seq.map(fun entity -> BatchError(buildEntityId entity, requestInformation.HttpStatusCode, requestInformation.ExtendedErrorInformation.ErrorCode))
partitionKey, responses |> Seq.toArray)
| Choice2Of2 err ->
return
match err with
| :? StorageException as ex -> processErrorResp entityBatch buildEntityId ex
| _ -> raise (err)
}

let internal executeBatchSynchronously batchOp entityBatch buildEntityId (table:CloudTable) =
try
table.ExecuteBatch(batchOp)
|> Seq.zip entityBatch
|> Seq.map(fun (entity, res) -> SuccessfulResponse(buildEntityId entity, res.HttpStatusCode))
with :? StorageException as ex -> processErrorResp entityBatch buildEntityId ex

let internal executeBatchOperationAsync createTableOp (table:CloudTable) entities = async {
return!
splitIntoBatches createTableOp entities
|> Seq.map(fun (partitionKey, entityBatch, batchOperation) -> async {
let buildEntityId (entity:DynamicTableEntity) = Partition(entity.PartitionKey), Row(entity.RowKey)
let! responses = executeBatchAsynchronously batchOperation entityBatch buildEntityId table
return (partitionKey, responses |> Seq.toArray)
})
|> Async.Parallel
}

let internal executeBatchOperation createTableOp (table:CloudTable) entities =
splitIntoBatches createTableOp entities
|> Seq.map(fun (partitionKey, entityBatch, batchOperation) ->
let buildEntityId (entity:DynamicTableEntity) = Partition(entity.PartitionKey), Row(entity.RowKey)
let responses = executeBatchSynchronously batchOperation entityBatch buildEntityId table
partitionKey, responses |> Seq.toArray)
|> Seq.toArray

let deleteEntities connection tableName entities =
24 changes: 22 additions & 2 deletions tests/IntegrationTests/TableUnitTests.fs
@@ -169,7 +169,16 @@ let ``Cloud Table Client relates to the same data as the type provider``() =
let ``DeletePartition deletes entries with given partition key``() =
table.DeletePartition "men"
test <@ table.Query().``Where Partition Key Is``.``Equal To``("men").Execute().Length = 0 @>


[<Fact>]
[<ResetTableData>]
let ``DeletePartitionAsync deletes entries with given partition key``() =
table.DeletePartitionAsync "men" |> Async.RunSynchronously
test <@ table.Query().``Where Partition Key Is``.``Equal To``("men").Execute().Length = 0 @>

[<Fact>]
[<ResetTableData>]
let ``Insert suceeds for entries over 4Mb``() =
@@ -189,4 +198,15 @@ let ``Insert suceeds for entries over 4Mb``() =
|> Array.collect snd
|> Array.filter (function | SuccessfulResponse _ -> false | _ -> true)
|> Array.length
test <@ failureCount = 0 @>

[<Fact>]
[<ResetTableData>]
let ``Async query without arguments brings back all rows``() =
let length =
async {
let! results = table.Query().ExecuteAsync()
return results.Length
}
|> Async.RunSynchronously
test <@ length = 5 @>