Skip to content

Commit

Permalink
optimization2: ignore response from cluster publish
Browse files Browse the repository at this point in the history
  • Loading branch information
vazois committed Dec 12, 2024
1 parent 79e0d6d commit 422532b
Show file tree
Hide file tree
Showing 5 changed files with 7 additions and 97 deletions.
87 changes: 0 additions & 87 deletions libs/client/GarnetClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -742,93 +742,6 @@ async ValueTask InternalExecuteAsync(TcsWrapper tcs, Memory<byte> op, Memory<byt
return;
}

async ValueTask InternalExecuteForNoResponse(Memory<byte> op, Memory<byte> subop, Memory<byte> param1, Memory<byte> param2, CancellationToken token = default)
{
int totalLen = 0;
int arraySize = 1;

totalLen += op.Length;

int len = subop.Length;
totalLen += 1 + NumUtils.NumDigits(len) + 2 + len + 2;
arraySize++;

len = param1.Length;
totalLen += 1 + NumUtils.NumDigits(len) + 2 + len + 2;
arraySize++;

len = param2.Length;
totalLen += 1 + NumUtils.NumDigits(len) + 2 + len + 2;
arraySize++;

totalLen += 1 + NumUtils.NumDigits(arraySize) + 2;

if (totalLen > networkWriter.PageSize)
{
ThrowException(new Exception($"Entry of size {totalLen} does not fit on page of size {networkWriter.PageSize}. Try increasing sendPageSize parameter to GarnetClient constructor."));
}

// No need for gate as this is a void return
// await InputGateAsync(token);

try
{
networkWriter.epoch.Resume();

#region reserveSpaceAndWriteIntoNetworkBuffer
int taskId;
long address;
while (true)
{
token.ThrowIfCancellationRequested();
if (!IsConnected)
{
Dispose();
ThrowException(disposeException);
}
(taskId, address) = networkWriter.TryAllocate(totalLen, out var flushEvent);
if (address >= 0) break;
try
{
networkWriter.epoch.Suspend();
await flushEvent.WaitAsync(token).ConfigureAwait(false);
}
finally
{
networkWriter.epoch.Resume();
}
}

unsafe
{
byte* curr = (byte*)networkWriter.GetPhysicalAddress(address);
byte* end = curr + totalLen;
RespWriteUtils.WriteArrayLength(arraySize, ref curr, end);

RespWriteUtils.WriteDirect(op.Span, ref curr, end);
RespWriteUtils.WriteBulkString(subop.Span, ref curr, end);
RespWriteUtils.WriteBulkString(param1.Span, ref curr, end);
RespWriteUtils.WriteBulkString(param2.Span, ref curr, end);
Debug.Assert(curr == end);
}
#endregion

if (!IsConnected)
{
Dispose();
ThrowException(disposeException);
}
// Console.WriteLine($"Filled {address}-{address + totalLen}");
networkWriter.epoch.ProtectAndDrain();
networkWriter.DoAggressiveShiftReadOnly();
}
finally
{
networkWriter.epoch.Suspend();
}
return;
}

async ValueTask InternalExecuteFireAndForgetWithNoResponse(TcsWrapper tcs, Memory<byte> op, Memory<byte> subop, Memory<byte> param1, Memory<byte> param2, CancellationToken token = default)
{
int totalLen = 0;
Expand Down
6 changes: 0 additions & 6 deletions libs/client/GarnetClientAPI/GarnetClientExecuteAPI.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1084,12 +1084,6 @@ public async Task<long> ExecuteFireAndForgetWithNoResponse(Memory<byte> op, Memo
var _ = InternalExecuteFireAndForgetWithNoResponse(tcs, op, param1, param2, param3, token);
return await tcs.longTcs.Task.ConfigureAwait(false);
}

public void ExecuteForNoResponse(Memory<byte> op, Memory<byte> param1, Memory<byte> param2, Memory<byte> param3, CancellationToken token = default)
{
var _ = InternalExecuteForNoResponse(op, param1, param2, param3, token);
}

#endregion

void TokenRegistrationLongCallback(object s) => ((TaskCompletionSource<long>)s).TrySetCanceled();
Expand Down
3 changes: 0 additions & 3 deletions libs/cluster/Server/GarnetClientExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -64,8 +64,5 @@ public static async Task<long> failreplicationoffset(this GarnetClient client, l

public static async Task<long> ClusterPublish(this GarnetClient client, RespCommand cmd, Memory<byte> channel, Memory<byte> message, CancellationToken cancellationToken = default)
=> await client.ExecuteFireAndForgetWithNoResponse(GarnetClient.CLUSTER, RespCommand.PUBLISH == cmd ? GarnetClient.PUBLISH : GarnetClient.SPUBLISH, channel, message, cancellationToken);

//public static void ClusterPublish(this GarnetClient client, RespCommand cmd, Memory<byte> channel, Memory<byte> message, CancellationToken cancellationToken = default)
// => client.ExecuteForNoResponse(GarnetClient.CLUSTER, RespCommand.PUBLISH == cmd ? GarnetClient.PUBLISH : GarnetClient.SPUBLISH, channel, message, cancellationToken);
}
}
2 changes: 1 addition & 1 deletion libs/cluster/Server/GarnetServerNode.cs
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,6 @@ public ConnectionInfo GetConnectionInfo()
/// <param name="channel"></param>
/// <param name="message"></param>
public void TryClusterPublish(RespCommand cmd, Memory<byte> channel, Memory<byte> message)
=> gc.ClusterPublish(cmd, channel, message).GetAwaiter().GetResult();
=> _ = gc.ClusterPublish(cmd, channel, message);
}
}
6 changes: 6 additions & 0 deletions libs/cluster/Server/Gossip.cs
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,12 @@ public void TryMeet(string address, int port, bool acquireLock = true)
}
}

/// <summary>
/// Forward message by issuing CLUSTER PUBLISH|SPUBLISH
/// </summary>
/// <param name="cmd"></param>
/// <param name="channel"></param>
/// <param name="message"></param>
public void TryClusterPublish(RespCommand cmd, ref Span<byte> channel, ref Span<byte> message)
{
GarnetServerNode gsn = null;
Expand Down

0 comments on commit 422532b

Please sign in to comment.