From f0c18eb34eee97e3b981c6a02bf1c3b4f7e436fb Mon Sep 17 00:00:00 2001 From: Albi Date: Thu, 14 Dec 2023 12:35:48 +0100 Subject: [PATCH] - RetryFailedMessage cooldown time is now dynamic and configurable. (#1455) * - RetryFailedMessage cooldown time is now dynamic and configurable. * - update documentation to include RetryCoolDownSeconds everywhere where 4 minutes was mentioned * - change the name of RetryCoolDownSeconds option into FallbackWindowLookbackSeconds (in docs and code) * - add warning to log if setting is set to lower than minimum sugested. --- .../user-guide/en/cap/configuration.md | 12 +++++++-- docs/content/user-guide/en/cap/messaging.md | 2 +- .../IDataStorage.InMemory.cs | 8 +++--- .../IDataStorage.MongoDB.cs | 8 +++--- .../IDataStorage.MySql.cs | 12 ++++----- .../IDataStorage.PostgreSql.cs | 12 ++++----- .../IDataStorage.SqlServer.cs | 12 ++++----- src/DotNetCore.CAP/CAP.Options.cs | 3 +++ .../Persistence/IDataStorage.cs | 4 +-- .../Processor/IProcessor.NeedRetry.cs | 27 ++++++++++++++----- 10 files changed, 62 insertions(+), 38 deletions(-) diff --git a/docs/content/user-guide/en/cap/configuration.md b/docs/content/user-guide/en/cap/configuration.md index 235d4b993..d47858fa5 100644 --- a/docs/content/user-guide/en/cap/configuration.md +++ b/docs/content/user-guide/en/cap/configuration.md @@ -82,7 +82,7 @@ During the message sending process if message transport fails, CAP will try to s During the message sending process if consumption method fails, CAP will try to execute the method again. This configuration option is used to configure the interval between each retry. !!! WARNING "Retry & Interval" - By default if failure occurs on send or consume, retry will start after **4 minutes** in order to avoid possible problems caused by setting message state delays. 
+ By default if failure occurs on send or consume, retry will start after **4 minutes** (FallbackWindowLookbackSeconds) in order to avoid possible problems caused by setting message state delays. Failures in the process of sending and consuming messages will be retried 3 times immediately, and will be retried polling after 3 times, at which point the FailedRetryInterval configuration will take effect. !!! WARNING "Multi-instance concurrent retries" @@ -112,6 +112,14 @@ Number of consumer threads, when this value is greater than 1, the order of mess Maximum number of retries. When this value is reached, retry will stop and the maximum number of retries will be modified by setting this parameter. + +#### FallbackWindowLookbackSeconds + +> Default: 240 + +Time in seconds to wait before a failed message is picked up for retrying. + + #### FailedThresholdCallback > Default: NULL @@ -146,4 +154,4 @@ By default, CAP will only read one message from the message queue, then execute If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the .NET thread pool for execution. !!! note "Precautions" - Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes ago by default, that is to say, if the message backlog of more than 4 minutes on the consumer side will be picked up again and executed again \ No newline at end of file + Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. 
The retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago by default, that is to say, any messages backlogged for more than 4 minutes (FallbackWindowLookbackSeconds) on the consumer side will be picked up and executed again \ No newline at end of file diff --git a/docs/content/user-guide/en/cap/messaging.md b/docs/content/user-guide/en/cap/messaging.md index 2285e2369..ec49b9ef4 100644 --- a/docs/content/user-guide/en/cap/messaging.md +++ b/docs/content/user-guide/en/cap/messaging.md @@ -123,7 +123,7 @@ Retrying plays an important role in the overall CAP architecture design, CAP ret ### Send retry -During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50, CAP will stop retrying. +During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes (FallbackWindowLookbackSeconds), and +1 retry. When the total number of retries reaches 50, CAP will stop retrying. You can adjust the total number of retries by setting [FailedRetryCount](../configuration#failedretrycount) in CapOptions Or use [FailedThresholdCallback](../configuration#failedthresholdcallback) to receive notifications when the maximum retry count is reached. 
diff --git a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs index 11cb32090..9346e73d4 100644 --- a/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs +++ b/src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs @@ -180,11 +180,11 @@ public Task DeleteExpiresAsync(string table, DateTime timeout, int batchCou return Task.FromResult(removed); } - public Task> GetPublishedMessagesOfNeedRetry() + public Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) { IEnumerable result = PublishedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount - && x.Added < DateTime.Now.AddSeconds(-10) + && x.Added < DateTime.Now.Subtract(coolDownTime) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x).ToList(); @@ -197,11 +197,11 @@ public Task> GetPublishedMessagesOfNeedRetry() return Task.FromResult(result); } - public Task> GetReceivedMessagesOfNeedRetry() + public Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) { IEnumerable result = ReceivedMessages.Values .Where(x => x.Retries < _capOptions.Value.FailedRetryCount - && x.Added < DateTime.Now.AddSeconds(-10) + && x.Added < DateTime.Now.Subtract(coolDownTime) && (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed)) .Take(200) .Select(x => (MediumMessage)x).ToList(); diff --git a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs index 2cef7323a..d9563fd3b 100644 --- a/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs +++ b/src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs @@ -248,9 +248,9 @@ public async Task DeleteExpiresAsync(string collection, DateTime timeout, i } } - public async Task> GetPublishedMessagesOfNeedRetry() + public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) { - var fourMinAgo = DateTime.Now.AddMinutes(-4); + var 
fourMinAgo = DateTime.Now.Subtract(coolDownTime); var collection = _database.GetCollection(_options.Value.PublishedCollection); var queryResult = await collection .Find(x => x.Retries < _capOptions.Value.FailedRetryCount @@ -269,9 +269,9 @@ public async Task> GetPublishedMessagesOfNeedRetry() }).ToList(); } - public async Task> GetReceivedMessagesOfNeedRetry() + public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) { - var fourMinAgo = DateTime.Now.AddMinutes(-4); + var fourMinAgo = DateTime.Now.Subtract(coolDownTime); var collection = _database.GetCollection(_options.Value.ReceivedCollection); var queryResult = await collection .Find(x => x.Retries < _capOptions.Value.FailedRetryCount diff --git a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs index 6d15a21c8..b1c3254f7 100644 --- a/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs +++ b/src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs @@ -216,14 +216,14 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba .ConfigureAwait(false); } - public async Task> GetPublishedMessagesOfNeedRetry() + public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) { - return await GetMessagesOfNeedRetryAsync(_pubName).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false); } - public async Task> GetReceivedMessagesOfNeedRetry() + public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) { - return await GetMessagesOfNeedRetryAsync(_recName).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false); } public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, @@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams) await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } - private async Task> GetMessagesOfNeedRetryAsync(string tableName) 
+ private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime) { - var fourMinAgo = DateTime.Now.AddMinutes(-4); + var fourMinAgo = DateTime.Now.Subtract(coolDownTime); var sql = $"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " + $"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;"; diff --git a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs index c2e4398e5..264d74882 100644 --- a/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs +++ b/src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs @@ -214,14 +214,14 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba .ConfigureAwait(false); } - public async Task> GetPublishedMessagesOfNeedRetry() + public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) { - return await GetMessagesOfNeedRetryAsync(_pubName).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false); } - public async Task> GetReceivedMessagesOfNeedRetry() + public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) { - return await GetMessagesOfNeedRetryAsync(_recName).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false); } public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, @@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams) await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } - private async Task> GetMessagesOfNeedRetryAsync(string tableName) + private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime) { - var fourMinAgo = DateTime.Now.AddMinutes(-4); + var fourMinAgo = DateTime.Now.Subtract(coolDownTime); var sql = $"SELECT 
\"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " + $"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;"; diff --git a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs index 5afd562f6..fd81a6e14 100644 --- a/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs +++ b/src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs @@ -213,14 +213,14 @@ public async Task DeleteExpiresAsync(string table, DateTime timeout, int ba new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false); } - public async Task> GetPublishedMessagesOfNeedRetry() + public async Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime) { - return await GetMessagesOfNeedRetryAsync(_pubName).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false); } - public async Task> GetReceivedMessagesOfNeedRetry() + public async Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime) { - return await GetMessagesOfNeedRetryAsync(_recName).ConfigureAwait(false); + return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false); } public async Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, @@ -307,9 +307,9 @@ private async Task StoreReceivedMessage(object[] sqlParams) await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false); } - private async Task> GetMessagesOfNeedRetryAsync(string tableName) + private async Task> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan substract) { - var fourMinAgo = DateTime.Now.AddMinutes(-4); + var fourMinAgo = DateTime.Now.Subtract(substract); var sql = $"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " + $"AND Version=@Version AND Added<@Added AND 
(StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')"; diff --git a/src/DotNetCore.CAP/CAP.Options.cs b/src/DotNetCore.CAP/CAP.Options.cs index f20a84c40..7f637561d 100644 --- a/src/DotNetCore.CAP/CAP.Options.cs +++ b/src/DotNetCore.CAP/CAP.Options.cs @@ -29,6 +29,7 @@ public CapOptions() DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower(); CollectorCleaningInterval = 300; UseDispatchingPerGroup = false; + FallbackWindowLookbackSeconds = 240; } internal IList Extensions { get; } @@ -102,6 +103,8 @@ public CapOptions() /// public bool UseDispatchingPerGroup { get; set; } + public int FallbackWindowLookbackSeconds { get; set; } + /// /// The interval of the collector processor deletes expired messages. /// Default is 300 seconds. diff --git a/src/DotNetCore.CAP/Persistence/IDataStorage.cs b/src/DotNetCore.CAP/Persistence/IDataStorage.cs index 6ff5051f8..d6ee056b8 100644 --- a/src/DotNetCore.CAP/Persistence/IDataStorage.cs +++ b/src/DotNetCore.CAP/Persistence/IDataStorage.cs @@ -33,11 +33,11 @@ public interface IDataStorage Task DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default); - Task> GetPublishedMessagesOfNeedRetry(); + Task> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime); Task ScheduleMessagesOfDelayedAsync(Func, Task> scheduleTask, CancellationToken token = default); - Task> GetReceivedMessagesOfNeedRetry(); + Task> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime); //dashboard api IMonitoringApi GetMonitoringApi(); diff --git a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs index ef976a6ce..b76729679 100644 --- a/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs +++ b/src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs @@ -16,12 +16,14 @@ namespace DotNetCore.CAP.Processor; public class MessageNeedToRetryProcessor : IProcessor { + const int 
minSuggestedValueForFallbackWindowLookbackSeconds = 30; private readonly ILogger _logger; private readonly IDispatcher _dispatcher; private readonly TimeSpan _waitingInterval; private readonly IOptions _options; private readonly IDataStorage _dataStorage; private readonly TimeSpan _ttl; + private readonly TimeSpan _coolDownTime; private readonly string _instance; private Task? _failedRetryConsumeTask; @@ -32,10 +34,13 @@ public MessageNeedToRetryProcessor(IOptions options, ILogger ProcessReceivedAsync(storage, context)); - + _ = _failedRetryConsumeTask.ContinueWith(_ => { _failedRetryConsumeTask = null; }); await context.WaitAsync(_waitingInterval).ConfigureAwait(false); @@ -69,7 +74,7 @@ private async Task ProcessPublishedAsync(IDataStorage connection, ProcessingCont if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"publish_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken)) return; - var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry).ConfigureAwait(false); + var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry, _coolDownTime).ConfigureAwait(false); foreach (var message in messages) { @@ -89,7 +94,7 @@ private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingConte if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken)) return; - var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry).ConfigureAwait(false); + var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry, _coolDownTime).ConfigureAwait(false); foreach (var message in messages) { @@ -102,11 +107,11 @@ private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingConte await connection.ReleaseLockAsync($"received_retry_{_options.Value.Version}", _instance, context.CancellationToken); } - private async Task> 
GetSafelyAsync(Func>> getMessagesAsync) + private async Task> GetSafelyAsync(Func>> getMessagesAsync, TimeSpan coolDownTime) { try { - return await getMessagesAsync().ConfigureAwait(false); + return await getMessagesAsync(coolDownTime).ConfigureAwait(false); } catch (Exception ex) { @@ -115,4 +120,12 @@ private async Task> GetSafelyAsync(Func>> return Enumerable.Empty(); } } + + private void CheckSafeOptionsSet() + { + if (_coolDownTime < TimeSpan.FromSeconds(minSuggestedValueForFallbackWindowLookbackSeconds)) + { + _logger.LogWarning("The provided FallbackWindowLookbackSeconds of {currentSetFallbackWindowLookbackSeconds} is set to a value lower than {minSuggestedSeconds} seconds. This might cause unwanted unsafe behavior if the consumer takes more than the provided FallbackWindowLookbackSeconds to execute. ", _options.Value.FallbackWindowLookbackSeconds, minSuggestedValueForFallbackWindowLookbackSeconds); + } + } } \ No newline at end of file