Skip to content

Commit

Permalink
Refactored mongo index initializer (#1604)
Browse files Browse the repository at this point in the history
* refactored mongo initializer

* Update IStorageInitializer.MongoDB.cs

using default index naming
  • Loading branch information
demorgi authored Oct 30, 2024
1 parent 83b4217 commit 6f6f057
Showing 1 changed file with 33 additions and 25 deletions.
58 changes: 33 additions & 25 deletions src/DotNetCore.CAP.MongoDB/IStorageInitializer.MongoDB.cs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,8 @@ await database.CreateCollectionAsync(options.LockCollection, cancellationToken:
.ConfigureAwait(false);

await Task.WhenAll(
TryCreateIndexesAsync<ReceivedMessage>(options.ReceivedCollection),
TryCreateIndexesAsync<PublishedMessage>(options.PublishedCollection)).ConfigureAwait(false);
CreateReceivedMessageIndexesAsync(),
CreatePublishedMessageIndexesAsync()).ConfigureAwait(false);

if (_capOptions.Value.UseStorageLock)
{
Expand All @@ -88,33 +88,41 @@ await database.GetCollection<Lock>(options.LockCollection)

_logger.LogDebug("Ensuring all create database tables script are applied.");


async Task TryCreateIndexesAsync<T>(string collectionName)
async Task CreateReceivedMessageIndexesAsync()
{
var indexNames = new[] { "Name", "Added", "ExpiresAt", "StatusName", "Retries", "Version" };
var col = database.GetCollection<T>(collectionName);
using (var cursor = await col.Indexes.ListAsync(cancellationToken).ConfigureAwait(false))
IndexKeysDefinitionBuilder<ReceivedMessage> builder = Builders<ReceivedMessage>.IndexKeys;
var col = database.GetCollection<ReceivedMessage>(options.ReceivedCollection);

CreateIndexModel<ReceivedMessage>[] indexes =
{
var existingIndexes = await cursor.ToListAsync(cancellationToken).ConfigureAwait(false);
var existingIndexNames = existingIndexes.Select(o => o["name"].AsString).ToArray();
indexNames = indexNames.Except(existingIndexNames).ToArray();
}
new(builder.Ascending(x => x.Name)),
new(builder.Ascending(x => x.Added)),
new(builder.Ascending(x => x.ExpiresAt)),
new(builder.Ascending(x => x.StatusName)),
new(builder.Ascending(x => x.Retries)),
new(builder.Ascending(x => x.Version))
};

await col.Indexes.CreateManyAsync(indexes, cancellationToken);
}

if (indexNames.Any() == false)
return;
async Task CreatePublishedMessageIndexesAsync()
{
IndexKeysDefinitionBuilder<PublishedMessage> builder = Builders<PublishedMessage>.IndexKeys;
var col = database.GetCollection<PublishedMessage>(options.PublishedCollection);

var indexes = indexNames.Select(indexName =>
CreateIndexModel<PublishedMessage>[] indexes =
{
var indexOptions = new CreateIndexOptions
{
Name = indexName,
Background = true
};
var indexBuilder = Builders<T>.IndexKeys;
return new CreateIndexModel<T>(indexBuilder.Ascending(indexName), indexOptions);
}).ToArray();

await col.Indexes.CreateManyAsync(indexes, cancellationToken).ConfigureAwait(false);
new(builder.Ascending(x => x.Name)),
new(builder.Ascending(x => x.Added)),
new(builder.Ascending(x => x.ExpiresAt)),
new(builder.Ascending(x => x.StatusName)),
new(builder.Ascending(x => x.Retries)),
new(builder.Ascending(x => x.Version)),
new(builder.Ascending(x => x.StatusName).Ascending(x => x.ExpiresAt))
};

await col.Indexes.CreateManyAsync(indexes, cancellationToken);
}
}
}
}

0 comments on commit 6f6f057

Please sign in to comment.