Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Membership] Reduce default 'stale' silo detection time from 10min to 90s #9305

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 27 additions & 2 deletions src/Orleans.Core.Abstractions/IDs/SiloAddress.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,9 +266,28 @@ bool ISpanFormattable.TryFormat(Span<char> destination, out int charsWritten, Re
/// <returns>Consistent hash value for this silo address.</returns>
public int GetConsistentHashCode() => hashCodeSet ? hashCode : CalculateConsistentHashCode();

/// <summary>Returns a consistent hash value for this silo address.</summary>
/// <returns>Consistent hash value for this silo address.</returns>
internal int GetConsistentHashCode(int seed)
{
var tmp = (0, 0L, 0L, 0L); // avoid stackalloc overhead by using a fixed size buffer
var buf = MemoryMarshal.AsBytes(MemoryMarshal.CreateSpan(ref tmp, 1))[..28];

Endpoint.Address.TryWriteBytes(buf, out var len);
Debug.Assert(len is 4 or 16);

BinaryPrimitives.WriteInt32LittleEndian(buf[16..], Endpoint.Port);
BinaryPrimitives.WriteInt32LittleEndian(buf[20..], Generation);
BinaryPrimitives.WriteInt32LittleEndian(buf[24..], seed);

hashCode = (int)StableHash.ComputeHash(buf);
hashCodeSet = true;
return hashCode;
}

private int CalculateConsistentHashCode()
{
Unsafe.SkipInit(out (long, long, long) tmp); // avoid stackalloc overhead by using a fixed size buffer
var tmp = (0L, 0L, 0L); // avoid stackalloc overhead by using a fixed size buffer
var buf = MemoryMarshal.AsBytes(MemoryMarshal.CreateSpan(ref tmp, 1))[..24];

Endpoint.Address.TryWriteBytes(buf, out var len);
Expand All @@ -293,7 +312,12 @@ internal void InternalSetConsistentHashCode(int hashCode)
/// </summary>
/// <param name="numHashes">The number of hash codes to return.</param>
/// <returns>A collection of uniform hash codes variants for this instance.</returns>
public uint[] GetUniformHashCodes(int numHashes) => uniformHashCache ??= GetUniformHashCodesImpl(numHashes);
public uint[] GetUniformHashCodes(int numHashes)
{
var cache = uniformHashCache;
if (cache is not null && cache.Length == numHashes) return cache;
return uniformHashCache = GetUniformHashCodesImpl(numHashes);
}

private uint[] GetUniformHashCodesImpl(int numHashes)
{
Expand Down Expand Up @@ -327,6 +351,7 @@ private uint[] GetUniformHashCodesImpl(int numHashes)
BinaryPrimitives.WriteInt32LittleEndian(bytes[offset..], extraBit);
hashes[extraBit] = StableHash.ComputeHash(bytes);
}

return hashes;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ public class ClusterMembershipOptions
/// Gets or sets the number of missed "I am alive" updates in the table from a silo that causes warning to be logged.
/// </summary>
/// <seealso cref="IAmAliveTablePublishTimeout"/>
public int NumMissedTableIAmAliveLimit { get; set; } = 2;
public int NumMissedTableIAmAliveLimit { get; set; } = 3;

/// <summary>
/// Gets or sets a value indicating whether to disable silo liveness protocol (should be used only for testing).
Expand Down Expand Up @@ -49,7 +49,7 @@ public class ClusterMembershipOptions
/// is used to skip hosts in the membership table when performing an initial connectivity check upon startup.
/// </remarks>
/// <value>Publish an update every 5 minutes by default.</value>
public TimeSpan IAmAliveTablePublishTimeout { get; set; } = TimeSpan.FromMinutes(5);
public TimeSpan IAmAliveTablePublishTimeout { get; set; } = TimeSpan.FromSeconds(30);

/// <summary>
/// Gets or sets the maximum amount of time to attempt to join a cluster before giving up.
Expand Down Expand Up @@ -103,7 +103,7 @@ public class ClusterMembershipOptions
/// <summary>
/// /// Gets the period after which a silo is ignored for initial connectivity validation if it has not updated its heartbeat in the silo membership table.
/// </summary>
internal TimeSpan AllowedIAmAliveMissPeriod => this.IAmAliveTablePublishTimeout.Multiply(this.NumMissedTableIAmAliveLimit);
internal TimeSpan AllowedIAmAliveMissPeriod => IAmAliveTablePublishTimeout.Multiply(NumMissedTableIAmAliveLimit);

/// <summary>
/// Gets the amount of time to wait for the cluster membership system to terminate during shutdown.
Expand Down
96 changes: 75 additions & 21 deletions src/Orleans.Core/Runtime/MembershipTableSnapshot.cs
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
using System;
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Linq;
using System.Text;

namespace Orleans.Runtime
{
[GenerateSerializer, Immutable]
internal sealed class MembershipTableSnapshot
{
private static readonly MembershipTableSnapshot InitialValue = new(MembershipVersion.MinValue, ImmutableDictionary<SiloAddress, MembershipEntry>.Empty);

public MembershipTableSnapshot(
MembershipVersion version,
ImmutableDictionary<SiloAddress, MembershipEntry> entries)
Expand All @@ -15,41 +19,52 @@ public MembershipTableSnapshot(
this.Entries = entries;
}

public static MembershipTableSnapshot Create(MembershipTableData table)
public static MembershipTableSnapshot Create(MembershipTableData table) => Update(InitialValue, table);

public static MembershipTableSnapshot Update(MembershipTableSnapshot previousSnapshot, MembershipTableData table)
{
ArgumentNullException.ThrowIfNull(previousSnapshot);
ArgumentNullException.ThrowIfNull(table);
var version = (table.Version.Version == 0 && table.Version.VersionEtag == "0")
? MembershipVersion.MinValue
: new MembershipVersion(table.Version.Version);
return Update(previousSnapshot, version, table.Members.Select(t => t.Item1));
}

public static MembershipTableSnapshot Update(MembershipTableSnapshot previousSnapshot, MembershipTableSnapshot updated)
{
ArgumentNullException.ThrowIfNull(previousSnapshot);
ArgumentNullException.ThrowIfNull(updated);
return Update(previousSnapshot, updated.Version, updated.Entries.Values);
}

private static MembershipTableSnapshot Update(MembershipTableSnapshot previousSnapshot, MembershipVersion version, IEnumerable<MembershipEntry> updatedEntries)
{
ArgumentNullException.ThrowIfNull(previousSnapshot);
ArgumentNullException.ThrowIfNull(updatedEntries);

var entries = ImmutableDictionary.CreateBuilder<SiloAddress, MembershipEntry>();
if (table.Members != null)
foreach (var item in updatedEntries)
{
foreach (var item in table.Members)
{
var entry = item.Item1;
entries.Add(entry.SiloAddress, entry);
}
var entry = item;
entry = PreserveIAmAliveTime(previousSnapshot, entry);
entries.Add(entry.SiloAddress, entry);
}

var version = (table.Version.Version == 0 && table.Version.VersionEtag == "0")
? MembershipVersion.MinValue
: new MembershipVersion(table.Version.Version);
return new MembershipTableSnapshot(version, entries.ToImmutable());
}

public static MembershipTableSnapshot Create(MembershipTableSnapshot snapshot)
private static MembershipEntry PreserveIAmAliveTime(MembershipTableSnapshot previousSnapshot, MembershipEntry entry)
{
ArgumentNullException.ThrowIfNull(snapshot);

var entries = ImmutableDictionary.CreateBuilder<SiloAddress, MembershipEntry>();
if (snapshot.Entries != null)
// Retain the maximum IAmAliveTime, since IAmAliveTime updates do not increase membership version
// and therefore can be clobbered by torn reads.
if (previousSnapshot.Entries.TryGetValue(entry.SiloAddress, out var previousEntry)
&& previousEntry.IAmAliveTime > entry.IAmAliveTime)
{
foreach (var item in snapshot.Entries)
{
var entry = item.Value;
entries.Add(entry.SiloAddress, entry);
}
entry = entry.WithIAmAliveTime(previousEntry.IAmAliveTime);
}

return new MembershipTableSnapshot(snapshot.Version, entries.ToImmutable());
return entry;
}

[Id(0)]
Expand Down Expand Up @@ -93,6 +108,45 @@ public SiloStatus GetSiloStatus(SiloAddress silo)
return status;
}

public bool IsSuccessorTo(MembershipTableSnapshot other)
{
if (Version > other.Version)
{
return true;
}

if (Version < other.Version)
{
return false;
}

if (Entries.Count > other.Entries.Count)
{
// Something is amiss.
return false;
}

foreach (var entry in Entries)
{
if (!other.Entries.TryGetValue(entry.Key, out var otherEntry))
{
// Something is amiss.
return false;
}
}

// This is a successor if any silo has a later EffectiveIAmAliveTime.
foreach (var entry in Entries)
{
if (entry.Value.EffectiveIAmAliveTime > other.Entries[entry.Key].EffectiveIAmAliveTime)
{
return true;
}
}

return false;
}

public override string ToString()
{
var sb = new StringBuilder();
Expand Down
10 changes: 10 additions & 0 deletions src/Orleans.Core/SystemTargetInterfaces/IMembershipTable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
using System.Collections.Immutable;
using System.Linq;
using System.Net;
using System.Text.Json.Serialization;
using System.Threading.Tasks;
using Orleans.Concurrency;
using Orleans.Runtime;
Expand Down Expand Up @@ -306,6 +307,8 @@ public sealed class MembershipEntry
[Id(10)]
public DateTime IAmAliveTime { get; set; }

internal DateTimeOffset EffectiveIAmAliveTime => StartTime > IAmAliveTime ? StartTime : IAmAliveTime;

public void AddOrUpdateSuspector(SiloAddress localSilo, DateTime voteTime, int maxVotes)
{
var allVotes = SuspectTimes ??= new List<Tuple<SiloAddress, DateTime>>();
Expand Down Expand Up @@ -372,6 +375,13 @@ internal MembershipEntry WithStatus(SiloStatus status)
return updated;
}

internal MembershipEntry WithIAmAliveTime(DateTime iAmAliveTime)
{
var updated = this.Copy();
updated.IAmAliveTime = iAmAliveTime;
return updated;
}

internal ImmutableList<Tuple<SiloAddress, DateTime>> GetFreshVotes(DateTime now, TimeSpan expiration)
{
if (this.SuspectTimes == null)
Expand Down
41 changes: 41 additions & 0 deletions src/Orleans.Core/Utils/AsyncEnumerable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,23 @@ public void Publish(T value)
}
}

public bool TryPublish<TState>(Func<T, TState, T> updateFunc, TState state) => TryPublishCore(updateFunc, state) == PublishResult.Success;

public void Publish<TState>(Func<T, TState, T> updateFunc, TState state)
{
switch (TryPublishCore(updateFunc, state))
{
case PublishResult.Success:
return;
case PublishResult.InvalidUpdate:
ThrowInvalidUpdate();
break;
case PublishResult.Disposed:
ThrowDisposed();
break;
}
}

private PublishResult TryPublish(Element newItem)
{
if (_current.IsDisposed) return PublishResult.Disposed;
Expand All @@ -66,6 +83,30 @@ private PublishResult TryPublish(Element newItem)
}
}

private PublishResult TryPublishCore<TState>(Func<T, TState, T> updateFunc, TState state)
{
if (_current.IsDisposed) return PublishResult.Disposed;

lock (_updateLock)
{
if (_current.IsDisposed) return PublishResult.Disposed;

var curr = _current;
var newItem = new Element(updateFunc(curr.Value, state));

if (curr.IsValid && newItem.IsValid && !_updateValidator(curr.Value, newItem.Value))
{
return PublishResult.InvalidUpdate;
}

Interlocked.Exchange(ref _current, newItem);
if (newItem.IsValid) _onPublished(newItem.Value);
curr.SetNext(newItem);

return PublishResult.Success;
}
}

public void Dispose()
{
if (_current.IsDisposed) return;
Expand Down
Loading
Loading