Skip to content

Commit

Permalink
Fix AOF fast-forward during main memory replication (#632)
Browse files Browse the repository at this point in the history
* Fix AOF fast-forward during main memory replication

* noFlush should not wait on FlushEvent

* fix the fix

* update version for release

* nit

* NetworkClusterAppendLog shouldnt write a response

* Fix the flush behavior for safe tail refresh.

* clean up exception handling

* Unrelated fix to pub-sub - see issue #425.

* fix non-MMR skipping logic for space at the end of a page

* fix comparison

---------

Co-authored-by: Vasileios Zois <[email protected]>
  • Loading branch information
badrishc and vazois authored Sep 9, 2024
1 parent 739ef1c commit 7ac1a9b
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 58 deletions.
4 changes: 2 additions & 2 deletions .azure/pipelines/azure-pipelines-external-release.yml
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
######################################
# NOTE: Before running this pipeline to generate a new nuget package, update the version string in two places
# 1) update the name: string below (line 6) -- this is the version for the nuget package (e.g. 1.0.0)
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 45) -- NOTE - these two values need to be the same
# 2) update \libs\host\GarnetServer.cs readonly string version (~line 53) -- NOTE - these two values need to be the same
######################################
name: 1.0.19
name: 1.0.20
trigger:
branches:
include:
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/ClusterProvider.cs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public void SafeTruncateAOF(StoreType storeType, bool full, long CheckpointCover
else
{
if (serverOptions.MainMemoryReplication)
storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(CheckpointCoveredAofAddress, truncateLog: true, noFlush: true);
storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(CheckpointCoveredAofAddress, truncateLog: true);
else
{
storeWrapper.appendOnlyFile?.TruncateUntil(CheckpointCoveredAofAddress);
Expand Down
2 changes: 1 addition & 1 deletion libs/cluster/Server/Replication/PrimaryOps/AofTaskStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,7 @@ public long SafeTruncateAof(long CheckpointCoveredAofAddress = long.MaxValue)
{
if (clusterProvider.serverOptions.MainMemoryReplication)
{
clusterProvider.storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(TruncatedUntil, snapToPageStart: true, truncateLog: true, noFlush: true);
clusterProvider.storeWrapper.appendOnlyFile?.UnsafeShiftBeginAddress(TruncatedUntil, snapToPageStart: true, truncateLog: true);
}
else
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,20 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
if (clusterProvider.replicationManager.Recovering)
{
logger?.LogWarning("Replica is recovering cannot sync AOF");
throw new GarnetException("Replica is recovering cannot sync AOF", LogLevel.Warning);
throw new GarnetException("Replica is recovering cannot sync AOF", LogLevel.Warning, clientResponse: false);
}

if (currentConfig.LocalNodeRole != NodeRole.REPLICA)
{
logger?.LogWarning("This node {nodeId} is not a replica", currentConfig.LocalNodeId);
throw new GarnetException($"This node {currentConfig.LocalNodeId} is not a replica", LogLevel.Warning);
throw new GarnetException($"This node {currentConfig.LocalNodeId} is not a replica", LogLevel.Warning, clientResponse: false);
}

if (clusterProvider.serverOptions.MainMemoryReplication)
{
var firstRecordLength = GetFirstAofEntryLength(record);
if (previousAddress > ReplicationOffset ||
currentAddress > previousAddress + firstRecordLength)
currentAddress >= previousAddress + firstRecordLength)
{
logger?.LogWarning("MainMemoryReplication: Skipping from {ReplicaReplicationOffset} to {currentAddress}", ReplicationOffset, currentAddress);
storeWrapper.appendOnlyFile.Initialize(currentAddress, currentAddress);
Expand All @@ -53,7 +53,16 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
{
logger?.LogInformation("Processing {recordLength} bytes; previousAddress {previousAddress}, currentAddress {currentAddress}, nextAddress {nextAddress}, current AOF tail {tail}", recordLength, previousAddress, currentAddress, nextAddress, storeWrapper.appendOnlyFile.TailAddress);
logger?.LogError("Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning);
throw new GarnetException($"Before ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
}

// If there is a gap between the local tail and incoming currentAddress, try to skip local AOF to the next page
if (currentAddress >= storeWrapper.appendOnlyFile.TailAddress + recordLength
&& storeWrapper.appendOnlyFile.GetPage(currentAddress) == storeWrapper.appendOnlyFile.GetPage(storeWrapper.appendOnlyFile.TailAddress) + 1)
{
logger?.LogWarning("SkipPage from {previousAddress} to {currentAddress}, tail is {tailAddress}", previousAddress, currentAddress, storeWrapper.appendOnlyFile.TailAddress);
storeWrapper.appendOnlyFile.UnsafeSkipPage();
logger?.LogWarning("New tail after SkipPage is {tailAddress}", storeWrapper.appendOnlyFile.TailAddress);
}

// Enqueue to AOF
Expand All @@ -76,7 +85,7 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
{
if (!clusterProvider.serverOptions.EnableFastCommit)
{
throw new Exception("Received FastCommit request at replica AOF processor, but FastCommit is not enabled");
throw new GarnetException("Received FastCommit request at replica AOF processor, but FastCommit is not enabled", clientResponse: false);
}
TsavoriteLogRecoveryInfo info = new();
info.Initialize(new ReadOnlySpan<byte>(ptr + entryLength, -payloadLength));
Expand All @@ -90,19 +99,19 @@ public unsafe void ProcessPrimaryStream(byte* record, int recordLength, long pre
if (ReplicationOffset != nextAddress)
{
logger?.LogWarning("Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, nextAddress {nextAddress}", ReplicationOffset, nextAddress);
throw new GarnetException($"Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, nextAddress {nextAddress}", LogLevel.Warning);
throw new GarnetException($"Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, nextAddress {nextAddress}", LogLevel.Warning, clientResponse: false);
}

if (ReplicationOffset != storeWrapper.appendOnlyFile.TailAddress)
{
logger?.LogWarning("After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicaReplicationOffset}, aof.TailAddress {tailAddress}", ReplicationOffset, storeWrapper.appendOnlyFile.TailAddress);
throw new GarnetException($"After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning);
throw new GarnetException($"After ProcessPrimaryStream: Replication offset mismatch: ReplicaReplicationOffset {ReplicationOffset}, aof.TailAddress {storeWrapper.appendOnlyFile.TailAddress}", LogLevel.Warning, clientResponse: false);
}
}
catch (Exception ex)
{
logger?.LogWarning(ex, "An exception occurred at ReplicationManager.ProcessPrimaryStream");
throw new GarnetException(ex.Message, ex, LogLevel.Warning);
throw new GarnetException(ex.Message, ex, LogLevel.Warning, clientResponse: false);
}
}

Expand Down
14 changes: 4 additions & 10 deletions libs/cluster/Session/RespClusterReplicationCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Text;
using Garnet.common;
using Garnet.server;
using Microsoft.Extensions.Logging;

namespace Garnet.cluster
{
Expand Down Expand Up @@ -169,8 +170,7 @@ private bool NetworkClusterAppendLog(out bool invalidParameters)
!parseState.TryGetLong(2, out var currentAddress) ||
!parseState.TryGetLong(3, out var nextAddress))
{
while (!RespWriteUtils.WriteError(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER, ref dcurr, dend))
SendAndReset();
logger?.LogError("{str}", Encoding.ASCII.GetString(CmdStrings.RESP_ERR_GENERIC_VALUE_IS_NOT_INTEGER));
return true;
}

Expand All @@ -181,22 +181,16 @@ private bool NetworkClusterAppendLog(out bool invalidParameters)
var primaryId = currentConfig.LocalNodePrimaryId;
if (localRole != NodeRole.REPLICA)
{
// TODO: handle this
//while (!RespWriteUtils.WriteError("ERR aofsync node not a replica"u8, ref dcurr, dend))
// SendAndReset();
throw new GarnetException("aofsync node not a replica", LogLevel.Error, clientResponse: false);
}
else if (!primaryId.Equals(nodeId))
{
// TODO: handle this
//while (!RespWriteUtils.WriteError($"ERR aofsync node replicating {primaryId}", ref dcurr, dend))
// SendAndReset();
throw new GarnetException($"aofsync node replicating {primaryId}", LogLevel.Error, clientResponse: false);
}
else
{
clusterProvider.replicationManager.ProcessPrimaryStream(sbRecord.ToPointer(), sbRecord.Length,
previousAddress, currentAddress, nextAddress);
//while (!RespWriteUtils.WriteDirect(CmdStrings.RESP_OK, ref dcurr, dend))
// SendAndReset();
}

return true;
Expand Down
10 changes: 7 additions & 3 deletions libs/common/GarnetException.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,32 +16,36 @@ public class GarnetException : Exception
/// LogLevel for this exception
/// </summary>
public LogLevel LogLevel { get; } = LogLevel.Trace;
public bool ClientResponse { get; } = true;

/// <summary>
/// Throw Garnet exception
/// </summary>
public GarnetException(LogLevel logLevel = LogLevel.Trace)
public GarnetException(LogLevel logLevel = LogLevel.Trace, bool clientResponse = true)
{
LogLevel = logLevel;
ClientResponse = clientResponse;
}

/// <summary>
/// Throw Garnet exception with message
/// </summary>
/// <param name="message"></param>
public GarnetException(string message, LogLevel logLevel = LogLevel.Trace) : base(message)
public GarnetException(string message, LogLevel logLevel = LogLevel.Trace, bool clientResponse = true) : base(message)
{
LogLevel = logLevel;
ClientResponse = clientResponse;
}

/// <summary>
/// Throw Garnet exception with message and inner exception
/// </summary>
/// <param name="message"></param>
/// <param name="innerException"></param>
public GarnetException(string message, Exception innerException, LogLevel logLevel = LogLevel.Trace) : base(message, innerException)
public GarnetException(string message, Exception innerException, LogLevel logLevel = LogLevel.Trace, bool clientResponse = true) : base(message, innerException)
{
LogLevel = logLevel;
ClientResponse = clientResponse;
}

/// <summary>
Expand Down
2 changes: 1 addition & 1 deletion libs/host/GarnetServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public class GarnetServer : IDisposable
protected StoreWrapper storeWrapper;

// IMPORTANT: Keep the version in sync with .azure\pipelines\azure-pipelines-external-release.yml line ~6.
readonly string version = "1.0.19";
readonly string version = "1.0.20";

/// <summary>
/// Resp protocol version
Expand Down
1 change: 0 additions & 1 deletion libs/server/Resp/PubSubCommands.cs
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,6 @@ private bool NetworkUNSUBSCRIBE()
SendAndReset();
}

Debug.Assert(numActiveChannels == 0);
if (numActiveChannels == 0)
isSubscriptionSession = false;

Expand Down
7 changes: 5 additions & 2 deletions libs/server/Resp/RespServerSession.cs
Original file line number Diff line number Diff line change
Expand Up @@ -334,8 +334,11 @@ public override int TryConsumeMessages(byte* reqBuffer, int bytesReceived)
logger?.Log(ex.LogLevel, ex, "ProcessMessages threw a GarnetException:");

// Forward Garnet error as RESP error
while (!RespWriteUtils.WriteError($"ERR Garnet Exception: {ex.Message}", ref dcurr, dend))
SendAndReset();
if (ex.ClientResponse)
{
while (!RespWriteUtils.WriteError($"ERR Garnet Exception: {ex.Message}", ref dcurr, dend))
SendAndReset();
}

// Send message and dispose the network sender to end the session
if (dcurr > networkSender.GetResponseObjectHead())
Expand Down
107 changes: 87 additions & 20 deletions libs/storage/Tsavorite/cs/src/core/Allocator/AllocatorBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -919,6 +919,67 @@ public long TryAllocateRetryNow(int numSlots = 1)
return logicalAddress;
}

/// <summary>Skip the rest of the current page</summary>
public void SkipPage()
{
PageOffset localTailPageOffset = default;
localTailPageOffset.PageAndOffset = TailPageOffset.PageAndOffset;

// Force page overflow
int numSlots = PageSize + 1;

// Determine insertion index.
localTailPageOffset.PageAndOffset = Interlocked.Add(ref TailPageOffset.PageAndOffset, numSlots);

int page = localTailPageOffset.Page;
int offset = localTailPageOffset.Offset - numSlots;

#region HANDLE PAGE OVERFLOW
if (localTailPageOffset.Offset > PageSize)
{
int pageIndex = localTailPageOffset.Page + 1;

// All overflow threads try to shift addresses
long shiftAddress = ((long)pageIndex) << LogPageSizeBits;
PageAlignedShiftReadOnlyAddress(shiftAddress);
PageAlignedShiftHeadAddress(shiftAddress);

// This thread is trying to allocate at an offset past where one or more previous threads
// already overflowed; exit and allow the first overflow thread to proceed
if (offset > PageSize)
{
if (NeedToWait(pageIndex))
return; // RETRY_LATER
return; // RETRY_NOW
}

if (NeedToWait(pageIndex))
{
// Reset to end of page so that next attempt can retry
localTailPageOffset.Offset = PageSize;
Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset);
return; // RETRY_LATER
}

// The thread that "makes" the offset incorrect should allocate next page and set new tail
if (CannotAllocate(pageIndex))
{
// Reset to end of page so that next attempt can retry
localTailPageOffset.Offset = PageSize;
Interlocked.Exchange(ref TailPageOffset.PageAndOffset, localTailPageOffset.PageAndOffset);
return; // RETRY_NOW
}

if (!_wrapper.IsAllocated(pageIndex % BufferSize) || !_wrapper.IsAllocated((pageIndex + 1) % BufferSize))
AllocatePagesWithException(pageIndex, localTailPageOffset);

localTailPageOffset.Page++;
localTailPageOffset.Offset = 0;
TailPageOffset = localTailPageOffset;
}
#endregion
}

/// <summary>
/// If the page we are trying to allocate is past the last page with an unclosed address region,
/// then we can retry immediately because this is called after NeedToWait, so we know we've
Expand Down Expand Up @@ -978,27 +1039,30 @@ public void ShiftBeginAddress(long newBeginAddress, bool truncateLog, bool noFlu
var flushEvent = FlushEvent;
_ = ShiftReadOnlyAddress(newBeginAddress, noFlush);

// Wait for flush to complete
var spins = 0;
while (true)
if (!noFlush)
{
if (FlushedUntilAddress >= newBeginAddress)
break;
if (++spins < Constants.kFlushSpinCount)
{
_ = Thread.Yield();
continue;
}
try
// Wait for flush to complete
var spins = 0;
while (true)
{
epoch.Suspend();
flushEvent.Wait();
}
finally
{
epoch.Resume();
if (FlushedUntilAddress >= newBeginAddress)
break;
if (++spins < Constants.kFlushSpinCount)
{
_ = Thread.Yield();
continue;
}
try
{
epoch.Suspend();
flushEvent.Wait();
}
finally
{
epoch.Resume();
}
flushEvent = FlushEvent;
}
flushEvent = FlushEvent;
}

// Then shift head address
Expand Down Expand Up @@ -1444,22 +1508,25 @@ public void AsyncFlushPages(long fromAddress, long untilAddress, bool noFlush =
asyncResult.fromAddress = fromAddress;
}

bool skip = false;
if (asyncResult.untilAddress <= BeginAddress)
{
// Short circuit as no flush needed
_ = Utility.MonotonicUpdate(ref PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress, BeginAddress, out _);
ShiftFlushedUntilAddress();
continue;
skip = true;
}

if (IsNullDevice || noFlush)
{
// Short circuit as no flush needed
_ = Utility.MonotonicUpdate(ref PageStatusIndicator[flushPage % BufferSize].LastFlushedUntilAddress, asyncResult.untilAddress, out _);
ShiftFlushedUntilAddress();
continue;
skip = true;
}

if (skip) continue;

// Partial page starting point, need to wait until the
// ongoing adjacent flush is completed to ensure correctness
if (GetOffsetInPage(asyncResult.fromAddress) > 0)
Expand Down
Loading

0 comments on commit 7ac1a9b

Please sign in to comment.