From 3f4f3495368fb4451b8a1afb7a927d4561958724 Mon Sep 17 00:00:00 2001 From: Savorboard Date: Sun, 17 Dec 2017 21:22:04 +0800 Subject: [PATCH] modify fetched message method without delete queue message when get queue message. --- src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs | 14 ++++++++++---- src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs | 12 ++++++------ 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs index 6fb0788c9..c4ab7414f 100644 --- a/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs +++ b/src/DotNetCore.CAP.MySql/MySqlFetchedMessage.cs @@ -7,12 +7,14 @@ namespace DotNetCore.CAP.MySql public class MySqlFetchedMessage : IFetchedMessage { private readonly MySqlOptions _options; + private readonly string _processId; - public MySqlFetchedMessage(int messageId, MessageType type, MySqlOptions options) + public MySqlFetchedMessage(int messageId, MessageType type, string processId, MySqlOptions options) { MessageId = messageId; MessageType = type; + _processId = processId; _options = options; } @@ -22,15 +24,19 @@ public MySqlFetchedMessage(int messageId, MessageType type, MySqlOptions options public void RemoveFromQueue() { - // ignored + using (var connection = new MySqlConnection(_options.ConnectionString)) + { + connection.Execute($"DELETE FROM `{_options.TableNamePrefix}.queue` WHERE `ProcessId`=@ProcessId" + , new { ProcessId = _processId }); + } } public void Requeue() { using (var connection = new MySqlConnection(_options.ConnectionString)) { - connection.Execute($"insert into `{_options.TableNamePrefix}.queue`(`MessageId`,`MessageType`) values(@MessageId,@MessageType);" - , new {MessageId, MessageType }); + connection.Execute($"UPDATE `{_options.TableNamePrefix}.queue` SET `ProcessId`=NULL WHERE `ProcessId`=@ProcessId" + , new { ProcessId = _processId }); } } diff --git a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs index f8d678faf..1ed4e0e93 100644 --- a/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs +++ b/src/DotNetCore.CAP.MySql/MySqlStorageConnection.cs @@ -41,12 +41,12 @@ public async Task GetPublishedMessageAsync(int id) public Task FetchNextMessageAsync() { + var processId = ObjectId.GenerateNewStringId(); var sql = $@" UPDATE `{_prefix}.queue` SET `ProcessId`=@ProcessId WHERE `ProcessId` IS NULL LIMIT 1; -SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId; -DELETE FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId"; +SELECT `MessageId`,`MessageType` FROM `{_prefix}.queue` WHERE `ProcessId`=@ProcessId;"; - return FetchNextMessageCoreAsync(sql, new { ProcessId = Guid.NewGuid().ToString() }); + return FetchNextMessageCoreAsync(sql, processId); } public async Task GetNextPublishedMessageToBeEnqueuedAsync() @@ -139,18 +139,18 @@ public bool ChangeReceivedState(int messageId, string state) } } - private async Task FetchNextMessageCoreAsync(string sql, object args = null) + private async Task FetchNextMessageCoreAsync(string sql, string processId) { FetchedMessage fetchedMessage; using (var connection = new MySqlConnection(Options.ConnectionString)) { - fetchedMessage = await connection.QuerySingleOrDefaultAsync(sql, args); + fetchedMessage = await connection.QuerySingleOrDefaultAsync(sql, new { ProcessId = processId }); } if (fetchedMessage == null) return null; - return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, Options); + return new MySqlFetchedMessage(fetchedMessage.MessageId, fetchedMessage.MessageType, processId, Options); } public void Dispose()