Skip to content

Commit

Permalink
Refactor RedundantStatus to encode vector of states that can be merge…
Browse files Browse the repository at this point in the history
…d independently

Also fix:
 - Truncate command on first access, without participants
 - Use Ballot.ZERO when invoking CFK.insertOutOfRange where appropriate
 - Don't supply a command's own route to ProgressLog.waiting to ensure new keys are incorporated
 - Ensure progress in CommandsForKey by setting vestigial commands to ERASED
 - Add any missing owned keys to StoreParticipants.route to ensure fetch can make progress
 - Recovery must wait for earlier not-accepted transactions if either has the privileged coordinator optimisation
 - Inclusive SyncPoint used incorrect topologies for propose phase
 - Barrier must not register local listener without up-to-date topology information
 - Stop home shard truncating a TxnId to vestigial rather than Invalidated so other shards can make progress
Also improve:
 - Validate commands are constructed with non-empty participants
 - Remove some unnecessary synchronized keywords
 - Clear ok messages on PreAccept and Accept to free up memory
 - Introduce TxnId.Cardinality flag so we can optimise single key queries
 - Update CommandsForKey serialization to better handle larger flag space
 - Configurable which Txn.Kind can result in a CommandStore being marked stale
 - Process DefaultProgressLog queue synchronously when relevant state is resident in memory
 - Remove defunct CollectMaxApplied version of ListStore bootstrap
 - Standardise linearizability violation reporting
 - Improve CommandStore.execute method naming to reduce chance of misuse
 - Prune and address some comments

patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20282
  • Loading branch information
belliottsmith committed Feb 3, 2025
1 parent 2936dd7 commit a3a37f3
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 71 deletions.
2 changes: 1 addition & 1 deletion modules/accord
Submodule accord updated 110 files
Original file line number Diff line number Diff line change
Expand Up @@ -472,11 +472,11 @@ public DataSet data()
ds.row(storeId, decompose(start), decompose(end))
.column("start_ownership_epoch", entry.startOwnershipEpoch)
.column("end_ownership_epoch", entry.endOwnershipEpoch)
.column("locally_applied_or_invalidated_before", entry.locallyAppliedOrInvalidatedBefore.toString())
.column("locally_decided_and_applied_or_invalidated_before", entry.locallyDecidedAndAppliedOrInvalidatedBefore.toString())
.column("shard_applied_or_invalidated_before", entry.shardAppliedOrInvalidatedBefore.toString())
.column("locally_applied_before", entry.locallyAppliedBefore.toString())
.column("locally_decided_and_applied_before", entry.locallyDecidedAndAppliedBefore.toString())
.column("shard_applied_before", entry.shardAppliedBefore.toString())
.column("gc_before", entry.gcBefore.toString())
.column("shard_only_applied_or_invalidated_before", entry.shardOnlyAppliedOrInvalidatedBefore.toString())
.column("shard_only_applied_before", entry.shardOnlyAppliedBefore.toString())
.column("bootstrapped_at", entry.bootstrappedAt.toString())
.column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null);
return ds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ public long nextSystemTimestampMicros()
return lastSystemTimestampMicros;
}
@Override
public <T> AsyncChain<T> submit(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
public <T> AsyncChain<T> build(PreLoadContext loadCtx, Function<? super SafeCommandStore, T> function)
{
return AccordTask.create(this, loadCtx, function).chain();
}
Expand All @@ -336,7 +336,7 @@ ProgressLog progressLog()
}

@Override
public AsyncChain<Void> execute(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
public AsyncChain<Void> build(PreLoadContext preLoadContext, Consumer<? super SafeCommandStore> consumer)
{
return AccordTask.create(this, preLoadContext, consumer).chain();
}
Expand Down
22 changes: 11 additions & 11 deletions src/java/org/apache/cassandra/service/accord/AccordMessageSink.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,13 @@ public class AccordMessageSink implements MessageSink

public static final class AccordMessageType extends MessageType
{
public static final AccordMessageType INTEROP_READ_REQ = remote("INTEROP_READ_REQ", false);
public static final AccordMessageType INTEROP_READ_RSP = remote("INTEROP_READ_RSP", false);
public static final AccordMessageType INTEROP_STABLE_THEN_READ_REQ = remote("INTEROP_STABLE_THEN_READ_REQ", false);
public static final AccordMessageType INTEROP_READ_REPAIR_REQ = remote("INTEROP_READ_REPAIR_REQ", false);
public static final AccordMessageType INTEROP_READ_REPAIR_RSP = remote("INTEROP_READ_REPAIR_RSP", false);
public static final AccordMessageType INTEROP_APPLY_MINIMAL_REQ = remote("INTEROP_APPLY_MINIMAL_REQ", true );
public static final AccordMessageType INTEROP_APPLY_MAXIMAL_REQ = remote("INTEROP_APPLY_MAXIMAL_REQ", true );
public static final AccordMessageType INTEROP_READ_REQ = remote("INTEROP_READ_REQ");
public static final AccordMessageType INTEROP_READ_RSP = remote("INTEROP_READ_RSP");
public static final AccordMessageType INTEROP_STABLE_THEN_READ_REQ = remote("INTEROP_STABLE_THEN_READ_REQ");
public static final AccordMessageType INTEROP_READ_REPAIR_REQ = remote("INTEROP_READ_REPAIR_REQ");
public static final AccordMessageType INTEROP_READ_REPAIR_RSP = remote("INTEROP_READ_REPAIR_RSP");
public static final AccordMessageType INTEROP_APPLY_MINIMAL_REQ = remote("INTEROP_APPLY_MINIMAL_REQ");
public static final AccordMessageType INTEROP_APPLY_MAXIMAL_REQ = remote("INTEROP_APPLY_MAXIMAL_REQ");

public static final List<MessageType> values;

Expand All @@ -101,14 +101,14 @@ public static final class AccordMessageType extends MessageType
values = builder.build();
}

protected static AccordMessageType remote(String name, boolean hasSideEffects)
protected static AccordMessageType remote(String name)
{
return new AccordMessageType(name, REMOTE, hasSideEffects);
return new AccordMessageType(name, REMOTE);
}

private AccordMessageType(String name, MessageType.Kind kind, boolean hasSideEffects)
private AccordMessageType(String name, MessageType.Kind kind)
{
super(name, kind, hasSideEffects);
super(name, kind);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import accord.primitives.PartialKeyRoute;
import accord.primitives.PartialRangeRoute;
import accord.primitives.PartialTxn;
import accord.primitives.Participants;
import accord.primitives.Range;
import accord.primitives.RangeDeps;
import accord.primitives.Ranges;
Expand Down Expand Up @@ -290,8 +291,9 @@ private static class CommandEmptySizes
private static ICommand attrs(boolean hasDeps, boolean hasTxn, boolean executes)
{
FullKeyRoute route = new FullKeyRoute(EMPTY_KEY, new RoutingKey[]{ EMPTY_KEY });
Participants<?> empty = route.slice(0, 0);
ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID)
.setParticipants(StoreParticipants.empty(EMPTY_TXNID, route, !executes))
.setParticipants(StoreParticipants.create(route, empty, executes ? empty : null, empty, route))
.durability(Status.Durability.NotDurable)
.executeAt(EMPTY_TXNID)
.promised(Ballot.ZERO);
Expand All @@ -316,8 +318,8 @@ private static ICommand attrs(boolean hasDeps, boolean hasTxn, boolean executes)
final static long ACCEPTED = measure(Command.Accepted.accepted(attrs(true, false, false), SaveStatus.AcceptedMedium));
final static long COMMITTED = measure(Command.Committed.committed(attrs(true, true, false), SaveStatus.Committed));
final static long EXECUTED = measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied));
final static long TRUNCATED = measure(Command.Truncated.truncatedApply(attrs(false, false, false), SaveStatus.TruncatedApply, EMPTY_TXNID, null, null));
final static long INVALIDATED = measure(Command.Truncated.invalidated(EMPTY_TXNID, StoreParticipants.empty(EMPTY_TXNID)));
final static long TRUNCATED = measure(Command.Truncated.truncated(attrs(false, false, false), SaveStatus.TruncatedApply, EMPTY_TXNID, null, null));
final static long INVALIDATED = measure(Command.Truncated.invalidated(EMPTY_TXNID, attrs(false, false, false).participants()));

private static long emptySize(Command command)
{
Expand Down Expand Up @@ -353,7 +355,7 @@ private static long emptySize(Command command)
case Applied:
return EXECUTED;
case TruncatedApply:
case TruncatedApplyWithDeps:
case TruncatedUnapplied:
case TruncatedApplyWithOutcome:
case Vestigial:
case Erased:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@
import static accord.messages.SimpleReply.Ok;
import static accord.primitives.Routable.Domain.Key;
import static accord.primitives.Routable.Domain.Range;
import static accord.primitives.TxnId.Cardinality.cardinality;
import static accord.utils.Invariants.require;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -818,7 +819,7 @@ private static Set<TableId> txnDroppedTables(Seekables<?,?> keys)
@Override
public @Nonnull AsyncTxnResult coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull Dispatcher.RequestTime requestTime)
{
TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain());
TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain(), cardinality(txn.keys()));
ClientRequestMetrics sharedMetrics;
AccordClientRequestMetrics metrics;
if (txn.isWrite())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.concurrent.TimeUnit;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -35,6 +37,7 @@
import accord.local.SafeCommandStore;
import accord.messages.ReplyContext;
import accord.primitives.Keys;
import accord.primitives.Participants;
import accord.primitives.Ranges;
import accord.primitives.Routable;
import accord.primitives.Seekables;
Expand All @@ -59,7 +62,6 @@

import static accord.primitives.Routable.Domain.Key;
import static accord.primitives.Txn.Kind.Write;
import static accord.utils.Invariants.illegalState;
import static java.util.concurrent.TimeUnit.MICROSECONDS;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -259,9 +261,8 @@ public long expiresAt(ReplyContext replyContext, TimeUnit unit)
}

@Override
public void onViolation(String message)
public void onViolation(String message, Participants<?> participants, @Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, @Nullable TxnId by, @Nullable Timestamp byEexecuteAt)
{
try { throw illegalState(message); }
catch (Throwable t) { logger.error("Consistency violation", t); }
logger.error(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,12 @@ public static Timestamp deserialize(TxnId txnId, DataInputPlus in) throws IOExce

long epoch = txnId.epoch();
if((flags & HAS_EPOCH) != 0)
epoch += in.readUnsignedVInt();
{
long delta = in.readUnsignedVInt();
if (delta == 0)
return Timestamp.NONE;
epoch += delta - 1;
}

long hlc = txnId.hlc() + in.readUnsignedVInt();
Node.Id node = new Node.Id(in.readUnsignedVInt32());
Expand All @@ -108,8 +113,8 @@ public static void skip(TxnId txnId, DataInputPlus in) throws IOException
int flags = in.readUnsignedVInt32();
if ((flags & 1) != 0)
{
if ((flags & HAS_EPOCH) != 0)
in.readUnsignedVInt();
if ((flags & HAS_EPOCH) != 0 && in.readUnsignedVInt() == 0)
return;
in.readUnsignedVInt();
in.readUnsignedVInt32();
if ((flags & HAS_UNIQUE_HLC) != 0)
Expand All @@ -124,7 +129,14 @@ public static void serialize(TxnId txnId, Timestamp executeAt, DataOutputPlus ou
if ((flags & 1) != 0)
{
if ((flags & HAS_EPOCH) != 0)
out.writeUnsignedVInt(executeAt.epoch() - txnId.epoch());
{
if (executeAt.equals(Timestamp.NONE))
{
out.writeUnsignedVInt(0L);
return;
}
out.writeUnsignedVInt(1 + executeAt.epoch() - txnId.epoch());
}
out.writeUnsignedVInt(executeAt.hlc() - txnId.hlc());
out.writeUnsignedVInt32(executeAt.node.id);
if ((flags & HAS_UNIQUE_HLC) != 0)
Expand Down Expand Up @@ -152,7 +164,12 @@ public static long serializedSize(TxnId txnId, Timestamp executeAt)
if ((flags & 1) != 0)
{
if ((flags & HAS_EPOCH) != 0)
{
if (executeAt.equals(Timestamp.NONE))
return size + TypeSizes.sizeofUnsignedVInt(0L);

size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch() - txnId.epoch());
}
size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc() - txnId.hlc());
size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
if ((flags & HAS_UNIQUE_HLC) != 0)
Expand All @@ -176,10 +193,13 @@ private static Timestamp deserialize(DataInputPlus in, boolean nullable) throws
int flags = in.readUnsignedVInt32();
if (nullable)
{
if ((flags & 1) != 0) return null;
flags >>>= 1;
if (flags == 0) return null;
flags--;
}
long epoch = in.readUnsignedVInt();
if (epoch-- == 0)
return Timestamp.NONE;

long hlc = in.readUnsignedVInt();
Node.Id node = new Node.Id(in.readUnsignedVInt32());
if ((flags & HAS_UNIQUE_HLC) == 0)
Expand All @@ -206,11 +226,12 @@ private static void skip(DataInputPlus in, boolean nullable) throws IOException
int flags = in.readUnsignedVInt32();
if (nullable)
{
if ((flags & 1) != 0)
if (flags == 0)
return;
flags >>>= 1;
flags--;
}
in.readUnsignedVInt();
if (0 == in.readUnsignedVInt())
return;
in.readUnsignedVInt();
in.readUnsignedVInt32();
if ((flags & HAS_UNIQUE_HLC) != 0)
Expand All @@ -235,9 +256,13 @@ private static void serialize(Timestamp executeAt, DataOutputPlus out, boolean n
{
Invariants.require(nullable);
}
else if (executeAt.equals(Timestamp.NONE))
{
out.writeUnsignedVInt(0L);
}
else
{
out.writeUnsignedVInt(executeAt.epoch());
out.writeUnsignedVInt(1 + executeAt.epoch());
out.writeUnsignedVInt(executeAt.hlc());
out.writeUnsignedVInt32(executeAt.node.id);
if (executeAt.hasDistinctHlcAndUniqueHlc())
Expand All @@ -264,11 +289,15 @@ private static long serializedSize(Timestamp executeAt, boolean nullable)
Invariants.require(nullable);
return size;
}
size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch());
size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc());
size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
if (executeAt.hasDistinctHlcAndUniqueHlc())
size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc());
if (executeAt.equals(Timestamp.NONE)) size += TypeSizes.sizeofUnsignedVInt(0);
else
{
size += TypeSizes.sizeofUnsignedVInt(1 + executeAt.epoch());
size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc());
size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id);
if (executeAt.hasDistinctHlcAndUniqueHlc())
size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc());
}
return size;
}

Expand All @@ -277,7 +306,7 @@ private static int flags(Timestamp executeAt, boolean nullable)
if (executeAt == null)
{
Invariants.require(nullable);
return 1;
return 0;
}

int flags = executeAt.flags() << 2;
Expand All @@ -286,7 +315,7 @@ private static int flags(Timestamp executeAt, boolean nullable)
if (executeAt.hasDistinctHlcAndUniqueHlc())
flags |= HAS_UNIQUE_HLC;
if (nullable)
flags <<= 1;
flags++;
return flags;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,11 @@ public void serialize(RedundantBefore.Entry t, DataOutputPlus out, int version)
out.writeUnsignedVInt(t.startOwnershipEpoch);
if (t.endOwnershipEpoch == Long.MAX_VALUE) out.writeUnsignedVInt(0L);
else out.writeUnsignedVInt(1 + t.endOwnershipEpoch - t.startOwnershipEpoch);
CommandSerializers.txnId.serialize(t.locallyWitnessedOrInvalidatedBefore, out, version);
CommandSerializers.txnId.serialize(t.locallyAppliedOrInvalidatedBefore, out, version);
CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedOrInvalidatedBefore, out, version);
CommandSerializers.txnId.serialize(t.shardOnlyAppliedOrInvalidatedBefore, out, version);
CommandSerializers.txnId.serialize(t.shardAppliedOrInvalidatedBefore, out, version);
CommandSerializers.txnId.serialize(t.locallyWitnessedBefore, out, version);
CommandSerializers.txnId.serialize(t.locallyAppliedBefore, out, version);
CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedBefore, out, version);
CommandSerializers.txnId.serialize(t.shardOnlyAppliedBefore, out, version);
CommandSerializers.txnId.serialize(t.shardAppliedBefore, out, version);
CommandSerializers.txnId.serialize(t.gcBefore, out, version);
CommandSerializers.txnId.serialize(t.bootstrappedAt, out, version);
CommandSerializers.nullableTimestamp.serialize(t.staleUntilAtLeast, out, version);
Expand Down Expand Up @@ -177,11 +177,11 @@ public long serializedSize(RedundantBefore.Entry t, int version)
long size = KeySerializers.range.serializedSize(t.range, version);
size += TypeSizes.sizeofUnsignedVInt(t.startOwnershipEpoch);
size += TypeSizes.sizeofUnsignedVInt(t.endOwnershipEpoch == Long.MAX_VALUE ? 0 : 1 + t.endOwnershipEpoch - t.startOwnershipEpoch);
size += CommandSerializers.txnId.serializedSize(t.locallyWitnessedOrInvalidatedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.locallyAppliedOrInvalidatedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedOrInvalidatedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedOrInvalidatedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.shardAppliedOrInvalidatedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.locallyWitnessedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.locallyAppliedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.shardAppliedBefore, version);
size += CommandSerializers.txnId.serializedSize(t.gcBefore, version);
size += CommandSerializers.txnId.serializedSize(t.bootstrappedAt, version);
size += CommandSerializers.nullableTimestamp.serializedSize(t.staleUntilAtLeast, version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
import org.apache.cassandra.utils.concurrent.Condition;
import org.awaitility.Awaitility;

import static accord.primitives.TxnId.FastPath.UNOPTIMISED;
import static accord.primitives.TxnId.FastPath.Unoptimised;
import static org.apache.cassandra.Util.spinUntilSuccess;
import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn;

Expand Down Expand Up @@ -136,7 +136,7 @@ public void inflight() throws ExecutionException, InterruptedException
public void blocked() throws ExecutionException, InterruptedException
{
ProtocolModifiers.Toggles.setPermitLocalExecution(false);
ProtocolModifiers.Toggles.setPermittedFastPaths(new TxnId.FastPaths(UNOPTIMISED));
ProtocolModifiers.Toggles.setPermittedFastPaths(new TxnId.FastPaths(Unoptimised));
AccordMsgFilter filter = new AccordMsgFilter();
MessagingService.instance().outboundSink.add(filter);
try
Expand Down
Loading

0 comments on commit a3a37f3

Please sign in to comment.