Commit

- RetryFailedMessage cooldown time is now dynamic and configurable. (#…
…1455)

* - RetryFailedMessage cooldown time is now dynamic and configurable.

* - update documentation to include RetryCoolDownSeconds everywhere where 4 minutes was mentioned

* - change the name of RetryCoolDownSeconds option into FallbackWindowLookbackSeconds (in docs and code)

* - add warning to log if setting is set to lower than minimum suggested.
apatozi authored Dec 14, 2023
1 parent d24284b commit f0c18eb
Showing 10 changed files with 62 additions and 38 deletions.
12 changes: 10 additions & 2 deletions docs/content/user-guide/en/cap/configuration.md
@@ -82,7 +82,7 @@ During the message sending process if message transport fails, CAP will try to s
During the message sending process if consumption method fails, CAP will try to execute the method again. This configuration option is used to configure the interval between each retry.

!!! WARNING "Retry & Interval"
- By default if failure occurs on send or consume, retry will start after **4 minutes** in order to avoid possible problems caused by setting message state delays.
+ By default if failure occurs on send or consume, retry will start after **4 minutes** (FallbackWindowLookbackSeconds) in order to avoid possible problems caused by setting message state delays.
Failures in the process of sending and consuming messages will be retried 3 times immediately, and will be retried polling after 3 times, at which point the FailedRetryInterval configuration will take effect.

!!! WARNING "Multi-instance concurrent retries"
@@ -112,6 +112,14 @@ Number of consumer threads, when this value is greater than 1, the order of mess
Maximum number of retries. When this value is reached, retry will stop and the maximum number of retries will be modified by setting this parameter.


+ #### FallbackWindowLookbackSeconds
+
+ > Default: 240
+ Time in seconds to wait before continue retrying.


#### FailedThresholdCallback

> Default: NULL
@@ -146,4 +154,4 @@ By default, CAP will only read one message from the message queue, then execute
If set to true, the consumer will prefetch some messages to the memory queue, and then distribute them to the .NET thread pool for execution.

!!! note "Precautions"
- Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes ago by default, that is to say, if the message backlog of more than 4 minutes on the consumer side will be picked up again and executed again
+ Setting it to true may cause some problems. When the subscription method executes too slowly and takes too long, it will cause the retry thread to pick up messages that have not yet been executed. The retry thread picks up messages from 4 minutes (FallbackWindowLookbackSeconds) ago by default , that is to say, if the message backlog of more than 4 minutes (FallbackWindowLookbackSeconds) on the consumer side will be picked up again and executed again
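
Reviewer note: the documentation change above names the new option but does not show it being set. A minimal configuration sketch follows, assuming the usual `AddCap` registration on an `IServiceCollection`. The value shown is the default introduced by this commit; storage and transport registration are deliberately left out, so this is a fragment to adapt rather than a complete setup.

```csharp
using Microsoft.Extensions.DependencyInjection;

var services = new ServiceCollection();

services.AddCap(options =>
{
    // Added by this commit. Only messages added more than this many
    // seconds ago are picked up by the retry processor. Default: 240.
    options.FallbackWindowLookbackSeconds = 240;

    // Values below 30 seconds trigger the warning logged by
    // MessageNeedToRetryProcessor.CheckSafeOptionsSet().

    // Assumption: a storage provider and a transport are registered here,
    // as in any CAP setup. They are omitted from this sketch.
});
```

Lowering the value shortens the delay before polling retries begin, at the cost of a higher chance that a slow consumer's in-flight message is picked up a second time.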
2 changes: 1 addition & 1 deletion docs/content/user-guide/en/cap/messaging.md
@@ -123,7 +123,7 @@ Retrying plays an important role in the overall CAP architecture design, CAP ret

### Send retry

- During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes, and +1 retry. When the total number of retries reaches 50, CAP will stop retrying.
+ During the message sending process, when the broker crashes or the connection fails or an abnormality occurs, CAP will retry the sending. Retry 3 times for the first time, retry every minute after 4 minutes (FallbackWindowLookbackSeconds), and +1 retry. When the total number of retries reaches 50, CAP will stop retrying.

You can adjust the total number of retries by setting [FailedRetryCount](../configuration#failedretrycount) in CapOptions Or use [FailedThresholdCallback](../configuration#failedthresholdcallback) to receive notifications when the maximum retry count is reached.

8 changes: 4 additions & 4 deletions src/DotNetCore.CAP.InMemoryStorage/IDataStorage.InMemory.cs
@@ -180,11 +180,11 @@ public Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCou
return Task.FromResult(removed);
}

- public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
+ public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
IEnumerable<MediumMessage> result = PublishedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
- && x.Added < DateTime.Now.AddSeconds(-10)
+ && x.Added < DateTime.Now.Subtract(coolDownTime)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();
@@ -197,11 +197,11 @@ public Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
return Task.FromResult(result);
}

- public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
+ public Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
IEnumerable<MediumMessage> result = ReceivedMessages.Values
.Where(x => x.Retries < _capOptions.Value.FailedRetryCount
- && x.Added < DateTime.Now.AddSeconds(-10)
+ && x.Added < DateTime.Now.Subtract(coolDownTime)
&& (x.StatusName == StatusName.Scheduled || x.StatusName == StatusName.Failed))
.Take(200)
.Select(x => (MediumMessage)x).ToList();
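
Reviewer note: the in-memory provider previously hard-coded a 10-second window, while the other providers used 4 minutes, so after this change it follows the configured value (240 seconds by default) like the rest. The selection rule shared by all providers can be sketched as a standalone predicate. `RetryWindowSketch` and `IsRetryCandidate` are hypothetical names, not part of CAP, and plain strings stand in for the `StatusName` values to keep the example self-contained.

```csharp
using System;

public static class RetryWindowSketch
{
    // Hypothetical helper mirroring the Where(...) filter in the in-memory
    // provider: a message qualifies for a polling retry when it has retries
    // left, was added before the cutoff, and is Scheduled or Failed.
    public static bool IsRetryCandidate(
        DateTime added,
        int retries,
        string statusName,
        int failedRetryCount,
        TimeSpan coolDownTime,
        DateTime now)
    {
        // Same arithmetic as DateTime.Now.Subtract(coolDownTime) in the diff,
        // with "now" passed in so the rule can be checked deterministically.
        var cutoff = now.Subtract(coolDownTime);

        return retries < failedRetryCount
               && added < cutoff
               && (statusName == "Scheduled" || statusName == "Failed");
    }
}
```

With a 240-second window, a failed message added five minutes ago qualifies, while one added a minute ago is left alone until it ages past the cutoff.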
8 changes: 4 additions & 4 deletions src/DotNetCore.CAP.MongoDB/IDataStorage.MongoDB.cs
@@ -248,9 +248,9 @@ public async Task<int> DeleteExpiresAsync(string collection, DateTime timeout, i
}
}

- public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- var fourMinAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var collection = _database.GetCollection<PublishedMessage>(_options.Value.PublishedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
@@ -269,9 +269,9 @@ public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
}).ToList();
}

- public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- var fourMinAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var collection = _database.GetCollection<ReceivedMessage>(_options.Value.ReceivedCollection);
var queryResult = await collection
.Find(x => x.Retries < _capOptions.Value.FailedRetryCount
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.MySql/IDataStorage.MySql.cs
@@ -216,14 +216,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
.ConfigureAwait(false);
}

- public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- return await GetMessagesOfNeedRetryAsync(_pubName).ConfigureAwait(false);
+ return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false);
}

- public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- return await GetMessagesOfNeedRetryAsync(_recName).ConfigureAwait(false);
+ return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false);
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
@@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams)
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}

- private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
+ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime)
{
- var fourMinAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var sql =
$"SELECT `Id`,`Content`,`Retries`,`Added` FROM `{tableName}` WHERE `Retries`<@Retries " +
$"AND `Version`=@Version AND `Added`<@Added AND (`StatusName` = '{StatusName.Failed}' OR `StatusName` = '{StatusName.Scheduled}') LIMIT 200;";
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.PostgreSql/IDataStorage.PostgreSql.cs
@@ -214,14 +214,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
.ConfigureAwait(false);
}

- public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- return await GetMessagesOfNeedRetryAsync(_pubName).ConfigureAwait(false);
+ return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false);
}

- public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- return await GetMessagesOfNeedRetryAsync(_recName).ConfigureAwait(false);
+ return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false);
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
@@ -313,9 +313,9 @@ private async Task StoreReceivedMessage(object[] sqlParams)
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}

- private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
+ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan coolDownTime)
{
- var fourMinAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinAgo = DateTime.Now.Subtract(coolDownTime);
var sql =
$"SELECT \"Id\",\"Content\",\"Retries\",\"Added\" FROM {tableName} WHERE \"Retries\"<@Retries " +
$"AND \"Version\"=@Version AND \"Added\"<@Added AND (\"StatusName\"='{StatusName.Failed}' OR \"StatusName\"='{StatusName.Scheduled}') LIMIT 200;";
12 changes: 6 additions & 6 deletions src/DotNetCore.CAP.SqlServer/IDataStorage.SqlServer.cs
@@ -213,14 +213,14 @@ public async Task<int> DeleteExpiresAsync(string table, DateTime timeout, int ba
new SqlParameter("@timeout", timeout), new SqlParameter("@batchCount", batchCount)).ConfigureAwait(false);
}

- public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- return await GetMessagesOfNeedRetryAsync(_pubName).ConfigureAwait(false);
+ return await GetMessagesOfNeedRetryAsync(_pubName, coolDownTime).ConfigureAwait(false);
}

- public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry()
+ public async Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime)
{
- return await GetMessagesOfNeedRetryAsync(_recName).ConfigureAwait(false);
+ return await GetMessagesOfNeedRetryAsync(_recName, coolDownTime).ConfigureAwait(false);
}

public async Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask,
@@ -307,9 +307,9 @@ private async Task StoreReceivedMessage(object[] sqlParams)
await connection.ExecuteNonQueryAsync(sql, sqlParams: sqlParams).ConfigureAwait(false);
}

- private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName)
+ private async Task<IEnumerable<MediumMessage>> GetMessagesOfNeedRetryAsync(string tableName, TimeSpan substract)
{
- var fourMinAgo = DateTime.Now.AddMinutes(-4);
+ var fourMinAgo = DateTime.Now.Subtract(substract);
var sql =
$"SELECT TOP (200) Id, Content, Retries, Added FROM {tableName} WITH (readpast) WHERE Retries<@Retries " +
$"AND Version=@Version AND Added<@Added AND (StatusName = '{StatusName.Failed}' OR StatusName = '{StatusName.Scheduled}')";
3 changes: 3 additions & 0 deletions src/DotNetCore.CAP/CAP.Options.cs
@@ -29,6 +29,7 @@ public CapOptions()
DefaultGroupName = "cap.queue." + Assembly.GetEntryAssembly()?.GetName().Name!.ToLower();
CollectorCleaningInterval = 300;
UseDispatchingPerGroup = false;
+ FallbackWindowLookbackSeconds = 240;
}

internal IList<ICapOptionsExtension> Extensions { get; }
@@ -102,6 +103,8 @@ public CapOptions()
/// </summary>
public bool UseDispatchingPerGroup { get; set; }

+ public int FallbackWindowLookbackSeconds { get; set; }

/// <summary>
/// The interval of the collector processor deletes expired messages.
/// Default is 300 seconds.
4 changes: 2 additions & 2 deletions src/DotNetCore.CAP/Persistence/IDataStorage.cs
@@ -33,11 +33,11 @@ public interface IDataStorage

Task<int> DeleteExpiresAsync(string table, DateTime timeout, int batchCount = 1000, CancellationToken token = default);

- Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry();
+ Task<IEnumerable<MediumMessage>> GetPublishedMessagesOfNeedRetry(TimeSpan coolDownTime);

Task ScheduleMessagesOfDelayedAsync(Func<object, IEnumerable<MediumMessage>, Task> scheduleTask, CancellationToken token = default);

- Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry();
+ Task<IEnumerable<MediumMessage>> GetReceivedMessagesOfNeedRetry(TimeSpan coolDownTime);

//dashboard api
IMonitoringApi GetMonitoringApi();
27 changes: 20 additions & 7 deletions src/DotNetCore.CAP/Processor/IProcessor.NeedRetry.cs
@@ -16,12 +16,14 @@ namespace DotNetCore.CAP.Processor;

public class MessageNeedToRetryProcessor : IProcessor
{
+ const int minSuggestedValueForFallbackWindowLookbackSeconds = 30;
private readonly ILogger<MessageNeedToRetryProcessor> _logger;
private readonly IDispatcher _dispatcher;
private readonly TimeSpan _waitingInterval;
private readonly IOptions<CapOptions> _options;
private readonly IDataStorage _dataStorage;
private readonly TimeSpan _ttl;
+ private readonly TimeSpan _coolDownTime;
private readonly string _instance;
private Task? _failedRetryConsumeTask;

@@ -32,10 +34,13 @@ public MessageNeedToRetryProcessor(IOptions<CapOptions> options, ILogger<Message
_logger = logger;
_dispatcher = dispatcher;
_waitingInterval = TimeSpan.FromSeconds(options.Value.FailedRetryInterval);
+ _coolDownTime = TimeSpan.FromSeconds(options.Value.FallbackWindowLookbackSeconds);
_dataStorage = dataStorage;
_ttl = _waitingInterval.Add(TimeSpan.FromSeconds(10));

_instance = string.Concat(Helper.GetInstanceHostname(), "_", Util.GenerateWorkerId(1023));

+ CheckSafeOptionsSet();
}

public virtual async Task ProcessAsync(ProcessingContext context)
@@ -49,14 +54,14 @@ public virtual async Task ProcessAsync(ProcessingContext context)
if (_options.Value.UseStorageLock && _failedRetryConsumeTask is { IsCompleted: false })
{
await _dataStorage.RenewLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken);

await context.WaitAsync(_waitingInterval).ConfigureAwait(false);

return;
}

_failedRetryConsumeTask = Task.Run(() => ProcessReceivedAsync(storage, context));

_ = _failedRetryConsumeTask.ContinueWith(_ => { _failedRetryConsumeTask = null; });

await context.WaitAsync(_waitingInterval).ConfigureAwait(false);
@@ -69,7 +74,7 @@ private async Task ProcessPublishedAsync(IDataStorage connection, ProcessingCont
if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"publish_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken))
return;

- var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry).ConfigureAwait(false);
+ var messages = await GetSafelyAsync(connection.GetPublishedMessagesOfNeedRetry, _coolDownTime).ConfigureAwait(false);

foreach (var message in messages)
{
@@ -89,7 +94,7 @@ private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingConte
if (_options.Value.UseStorageLock && !await connection.AcquireLockAsync($"received_retry_{_options.Value.Version}", _ttl, _instance, context.CancellationToken))
return;

- var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry).ConfigureAwait(false);
+ var messages = await GetSafelyAsync(connection.GetReceivedMessagesOfNeedRetry, _coolDownTime).ConfigureAwait(false);

foreach (var message in messages)
{
@@ -102,11 +107,11 @@ private async Task ProcessReceivedAsync(IDataStorage connection, ProcessingConte
await connection.ReleaseLockAsync($"received_retry_{_options.Value.Version}", _instance, context.CancellationToken);
}

- private async Task<IEnumerable<T>> GetSafelyAsync<T>(Func<Task<IEnumerable<T>>> getMessagesAsync)
+ private async Task<IEnumerable<T>> GetSafelyAsync<T>(Func<TimeSpan, Task<IEnumerable<T>>> getMessagesAsync, TimeSpan coolDownTime)
{
try
{
- return await getMessagesAsync().ConfigureAwait(false);
+ return await getMessagesAsync(coolDownTime).ConfigureAwait(false);
}
catch (Exception ex)
{
@@ -115,4 +120,12 @@ private async Task<IEnumerable<T>> GetSafelyAsync<T>(Func<Task<IEnumerable<T>>>
return Enumerable.Empty<T>();
}
}

+ private void CheckSafeOptionsSet()
+ {
+ if (_coolDownTime < TimeSpan.FromSeconds(minSuggestedValueForFallbackWindowLookbackSeconds))
+ {
+ _logger.LogWarning("The provided FallbackWindowLookbackSeconds of {currentSetFallbackWindowLookbackSeconds} is set to a value lower than {minSuggestedSeconds} seconds. This might cause unwanted unsafe behavior if the consumer takes more than the provided FallbackWindowLookbackSeconds to execute. ", _options.Value.FallbackWindowLookbackSeconds, minSuggestedValueForFallbackWindowLookbackSeconds);
+ }
+ }
}
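
Reviewer note: the `GetSafelyAsync` change works because a storage method that takes a `TimeSpan` converts to `Func<TimeSpan, Task<IEnumerable<T>>>` as a method group, so the call sites only gain one argument. A self-contained sketch of that wrapper shape follows. `SafeFetchSketch` is a hypothetical name, and the optional `onError` callback is an assumption standing in for the logger call in the real class.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SafeFetchSketch
{
    // Invokes the fetch delegate with the lookback window and falls back to
    // an empty sequence if it throws, so one failed storage query does not
    // bring down the retry loop.
    public static async Task<IEnumerable<T>> GetSafelyAsync<T>(
        Func<TimeSpan, Task<IEnumerable<T>>> getMessagesAsync,
        TimeSpan coolDownTime,
        Action<Exception>? onError = null)
    {
        try
        {
            return await getMessagesAsync(coolDownTime).ConfigureAwait(false);
        }
        catch (Exception ex)
        {
            // Assumption: the real processor logs here; the sketch only
            // forwards the exception to an optional callback.
            onError?.Invoke(ex);
            return Enumerable.Empty<T>();
        }
    }
}
```

Passing the window as an argument rather than reading the options inside each storage provider keeps `IDataStorage` implementations free of configuration lookups for this value.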