From a3a37f35199b87b85e97435c1d2bae6ceaa7e9ba Mon Sep 17 00:00:00 2001 From: Benedict Elliott Smith Date: Sat, 1 Feb 2025 17:42:21 +0000 Subject: [PATCH] Refactor RedundantStatus to encode vector of states that can be merged independently Also fix: - Truncate command on first access, without participants - Use Ballot.ZERO when invoking CFK.insertOutOfRange where appropriate - Don't supply a command's own route to ProgressLog.waiting to ensure new keys are incorporated - Ensure progress in CommandsForKey by setting vestigial commands to ERASED - Add any missing owned keys to StoreParticipants.route to ensure fetch can make progress - Recovery must wait for earlier not-accepted transactions if either has the privileged coordinator optimisation - Inclusive SyncPoint used incorrect topologies for propose phase - Barrier must not register local listener without up-to-date topology information - Stop home shard truncating a TxnId to vestigial rather than Invalidated so other shards can make progress Also improve: - Validate commands are constructed with non-empty participants - Remove some unnecessary synchronized keywords - Clear ok messages on PreAccept and Accept to free up memory - Introduce TxnId.Cardinality flag so we can optimise single key queries - Update CommandsForKey serialization to better handle larger flag space - Configurable which Txn.Kind can result in a CommandStore being marked stale - Process DefaultProgressLog queue synchronously when relevant state is resident in memory - Remove defunct CollectMaxApplied version of ListStore bootstrap - Standardise linearizability violation reporting - Improve CommandStore.execute method naming to reduce chance of misuse - Prune and address some comments patch by Benedict; reviewed by Alex Petrov for CASSANDRA-20282 --- modules/accord | 2 +- .../db/virtual/AccordDebugKeyspace.java | 8 +-- .../service/accord/AccordCommandStore.java | 4 +- .../service/accord/AccordMessageSink.java | 22 +++---- .../service/accord/AccordObjectSizes.java | 10 +-- .../service/accord/AccordService.java | 3 +- .../service/accord/api/AccordAgent.java | 9 +-- .../serializers/CommandSerializers.java | 63 ++++++++++++++----- .../serializers/CommandStoreSerializers.java | 20 +++--- .../db/virtual/AccordDebugKeyspaceTest.java | 4 +- .../index/accord/RouteIndexTest.java | 4 +- .../accord/AccordJournalOrderTest.java | 8 ++- .../service/accord/CommandChangeTest.java | 1 - .../CommandsForKeySerializerTest.java | 8 +-- .../cassandra/utils/AccordGenerators.java | 14 ++--- 15 files changed, 109 insertions(+), 71 deletions(-) diff --git a/modules/accord b/modules/accord index 78ab7eef904e..cd7f49564a5a 160000 --- a/modules/accord +++ b/modules/accord @@ -1 +1 @@ -Subproject commit 78ab7eef904ef549d0d7a34332b83d6110e0762d +Subproject commit cd7f49564a5ad053286453d10f8cd46b8d870c4f diff --git a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java index 757fb9488f2e..ff8159b63239 100644 --- a/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java +++ b/src/java/org/apache/cassandra/db/virtual/AccordDebugKeyspace.java @@ -472,11 +472,11 @@ public DataSet data() ds.row(storeId, decompose(start), decompose(end)) .column("start_ownership_epoch", entry.startOwnershipEpoch) .column("end_ownership_epoch", entry.endOwnershipEpoch) - .column("locally_applied_or_invalidated_before", entry.locallyAppliedOrInvalidatedBefore.toString()) - .column("locally_decided_and_applied_or_invalidated_before", entry.locallyDecidedAndAppliedOrInvalidatedBefore.toString()) - .column("shard_applied_or_invalidated_before", entry.shardAppliedOrInvalidatedBefore.toString()) + .column("locally_applied_before", entry.locallyAppliedBefore.toString()) + .column("locally_decided_and_applied_before", entry.locallyDecidedAndAppliedBefore.toString()) + .column("shard_applied_before", entry.shardAppliedBefore.toString()) .column("gc_before", entry.gcBefore.toString()) - .column("shard_only_applied_or_invalidated_before", entry.shardOnlyAppliedOrInvalidatedBefore.toString()) + .column("shard_only_applied_before", entry.shardOnlyAppliedBefore.toString()) .column("bootstrapped_at", entry.bootstrappedAt.toString()) .column("stale_until_at_least", entry.staleUntilAtLeast != null ? entry.staleUntilAtLeast.toString() : null); return ds; diff --git a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java index 7fc135da588a..c87cd41bafdd 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java +++ b/src/java/org/apache/cassandra/service/accord/AccordCommandStore.java @@ -309,7 +309,7 @@ public long nextSystemTimestampMicros() return lastSystemTimestampMicros; } @Override - public AsyncChain submit(PreLoadContext loadCtx, Function function) + public AsyncChain build(PreLoadContext loadCtx, Function function) { return AccordTask.create(this, loadCtx, function).chain(); } @@ -336,7 +336,7 @@ ProgressLog progressLog() } @Override - public AsyncChain execute(PreLoadContext preLoadContext, Consumer consumer) + public AsyncChain build(PreLoadContext preLoadContext, Consumer consumer) { return AccordTask.create(this, preLoadContext, consumer).chain(); } diff --git a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java index e5d897ed3bfd..cf8f8c7515a8 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java +++ b/src/java/org/apache/cassandra/service/accord/AccordMessageSink.java @@ -71,13 +71,13 @@ public class AccordMessageSink implements MessageSink public static final class AccordMessageType extends MessageType { - public static final AccordMessageType INTEROP_READ_REQ = remote("INTEROP_READ_REQ", false); - public static final AccordMessageType INTEROP_READ_RSP = remote("INTEROP_READ_RSP", false); - public static final AccordMessageType INTEROP_STABLE_THEN_READ_REQ = remote("INTEROP_STABLE_THEN_READ_REQ", false); - public static final AccordMessageType INTEROP_READ_REPAIR_REQ = remote("INTEROP_READ_REPAIR_REQ", false); - public static final AccordMessageType INTEROP_READ_REPAIR_RSP = remote("INTEROP_READ_REPAIR_RSP", false); - public static final AccordMessageType INTEROP_APPLY_MINIMAL_REQ = remote("INTEROP_APPLY_MINIMAL_REQ", true ); - public static final AccordMessageType INTEROP_APPLY_MAXIMAL_REQ = remote("INTEROP_APPLY_MAXIMAL_REQ", true ); + public static final AccordMessageType INTEROP_READ_REQ = remote("INTEROP_READ_REQ"); + public static final AccordMessageType INTEROP_READ_RSP = remote("INTEROP_READ_RSP"); + public static final AccordMessageType INTEROP_STABLE_THEN_READ_REQ = remote("INTEROP_STABLE_THEN_READ_REQ"); + public static final AccordMessageType INTEROP_READ_REPAIR_REQ = remote("INTEROP_READ_REPAIR_REQ"); + public static final AccordMessageType INTEROP_READ_REPAIR_RSP = remote("INTEROP_READ_REPAIR_RSP"); + public static final AccordMessageType INTEROP_APPLY_MINIMAL_REQ = remote("INTEROP_APPLY_MINIMAL_REQ"); + public static final AccordMessageType INTEROP_APPLY_MAXIMAL_REQ = remote("INTEROP_APPLY_MAXIMAL_REQ"); public static final List values; @@ -101,14 +101,14 @@ public static final class AccordMessageType extends MessageType values = builder.build(); } - protected static AccordMessageType remote(String name, boolean hasSideEffects) + protected static AccordMessageType remote(String name) { - return new AccordMessageType(name, REMOTE, hasSideEffects); + return new AccordMessageType(name, REMOTE); } - private AccordMessageType(String name, MessageType.Kind kind, boolean hasSideEffects) + private AccordMessageType(String name, MessageType.Kind kind) { - super(name, kind, hasSideEffects); + super(name, kind); } } diff --git a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java index c218ac65d8b0..03e168208c1d 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java +++ b/src/java/org/apache/cassandra/service/accord/AccordObjectSizes.java @@ -44,6 +44,7 @@ import accord.primitives.PartialKeyRoute; import accord.primitives.PartialRangeRoute; import accord.primitives.PartialTxn; +import accord.primitives.Participants; import accord.primitives.Range; import accord.primitives.RangeDeps; import accord.primitives.Ranges; @@ -290,8 +291,9 @@ private static class CommandEmptySizes private static ICommand attrs(boolean hasDeps, boolean hasTxn, boolean executes) { FullKeyRoute route = new FullKeyRoute(EMPTY_KEY, new RoutingKey[]{ EMPTY_KEY }); + Participants empty = route.slice(0, 0); ICommand.Builder builder = new ICommand.Builder(EMPTY_TXNID) - .setParticipants(StoreParticipants.empty(EMPTY_TXNID, route, !executes)) + .setParticipants(StoreParticipants.create(route, empty, executes ? empty : null, empty, route)) .durability(Status.Durability.NotDurable) .executeAt(EMPTY_TXNID) .promised(Ballot.ZERO); @@ -316,8 +318,8 @@ private static ICommand attrs(boolean hasDeps, boolean hasTxn, boolean executes) final static long ACCEPTED = measure(Command.Accepted.accepted(attrs(true, false, false), SaveStatus.AcceptedMedium)); final static long COMMITTED = measure(Command.Committed.committed(attrs(true, true, false), SaveStatus.Committed)); final static long EXECUTED = measure(Command.Executed.executed(attrs(true, true, true), SaveStatus.Applied)); - final static long TRUNCATED = measure(Command.Truncated.truncatedApply(attrs(false, false, false), SaveStatus.TruncatedApply, EMPTY_TXNID, null, null)); - final static long INVALIDATED = measure(Command.Truncated.invalidated(EMPTY_TXNID, StoreParticipants.empty(EMPTY_TXNID))); + final static long TRUNCATED = measure(Command.Truncated.truncated(attrs(false, false, false), SaveStatus.TruncatedApply, EMPTY_TXNID, null, null)); + final static long INVALIDATED = measure(Command.Truncated.invalidated(EMPTY_TXNID, attrs(false, false, false).participants())); private static long emptySize(Command command) { @@ -353,7 +355,7 @@ private static long emptySize(Command command) case Applied: return EXECUTED; case TruncatedApply: - case TruncatedApplyWithDeps: + case TruncatedUnapplied: case TruncatedApplyWithOutcome: case Vestigial: case Erased: diff --git a/src/java/org/apache/cassandra/service/accord/AccordService.java b/src/java/org/apache/cassandra/service/accord/AccordService.java index b4c3e636bf9e..a287a87d6fb3 100644 --- a/src/java/org/apache/cassandra/service/accord/AccordService.java +++ b/src/java/org/apache/cassandra/service/accord/AccordService.java @@ -173,6 +173,7 @@ import static accord.messages.SimpleReply.Ok; import static accord.primitives.Routable.Domain.Key; import static accord.primitives.Routable.Domain.Range; +import static accord.primitives.TxnId.Cardinality.cardinality; import static accord.utils.Invariants.require; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -818,7 +819,7 @@ private static Set txnDroppedTables(Seekables keys) @Override public @Nonnull AsyncTxnResult coordinateAsync(long minEpoch, @Nonnull Txn txn, @Nonnull ConsistencyLevel consistencyLevel, @Nonnull Dispatcher.RequestTime requestTime) { - TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain()); + TxnId txnId = node.nextTxnId(txn.kind(), txn.keys().domain(), cardinality(txn.keys())); ClientRequestMetrics sharedMetrics; AccordClientRequestMetrics metrics; if (txn.isWrite()) diff --git a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java index 8ee59572092c..466914eae6f7 100644 --- a/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java +++ b/src/java/org/apache/cassandra/service/accord/api/AccordAgent.java @@ -20,6 +20,8 @@ import java.util.concurrent.TimeUnit; +import javax.annotation.Nullable; + import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -35,6 +37,7 @@ import accord.local.SafeCommandStore; import accord.messages.ReplyContext; import accord.primitives.Keys; +import accord.primitives.Participants; import accord.primitives.Ranges; import accord.primitives.Routable; import accord.primitives.Seekables; @@ -59,7 +62,6 @@ import static accord.primitives.Routable.Domain.Key; import static accord.primitives.Txn.Kind.Write; -import static accord.utils.Invariants.illegalState; import static java.util.concurrent.TimeUnit.MICROSECONDS; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.NANOSECONDS; @@ -259,9 +261,8 @@ public long expiresAt(ReplyContext replyContext, TimeUnit unit) } @Override - public void onViolation(String message) + public void onViolation(String message, Participants participants, @Nullable TxnId notWitnessed, @Nullable Timestamp notWitnessedExecuteAt, @Nullable TxnId by, @Nullable Timestamp byEexecuteAt) { - try { throw illegalState(message); } - catch (Throwable t) { logger.error("Consistency violation", t); } + logger.error(message); } } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java index 227e5c74f3ee..d5a80d7b262a 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandSerializers.java @@ -94,7 +94,12 @@ public static Timestamp deserialize(TxnId txnId, DataInputPlus in) throws IOExce long epoch = txnId.epoch(); if((flags & HAS_EPOCH) != 0) - epoch += in.readUnsignedVInt(); + { + long delta = in.readUnsignedVInt(); + if (delta == 0) + return Timestamp.NONE; + epoch += delta - 1; + } long hlc = txnId.hlc() + in.readUnsignedVInt(); Node.Id node = new Node.Id(in.readUnsignedVInt32()); @@ -108,8 +113,8 @@ public static void skip(TxnId txnId, DataInputPlus in) throws IOException int flags = in.readUnsignedVInt32(); if ((flags & 1) != 0) { - if ((flags & HAS_EPOCH) != 0) - in.readUnsignedVInt(); + if ((flags & HAS_EPOCH) != 0 && in.readUnsignedVInt() == 0) + return; in.readUnsignedVInt(); in.readUnsignedVInt32(); if ((flags & HAS_UNIQUE_HLC) != 0) @@ -124,7 +129,14 @@ public static void serialize(TxnId txnId, Timestamp executeAt, DataOutputPlus ou if ((flags & 1) != 0) { if ((flags & HAS_EPOCH) != 0) - out.writeUnsignedVInt(executeAt.epoch() - txnId.epoch()); + { + if (executeAt.equals(Timestamp.NONE)) + { + out.writeUnsignedVInt(0L); + return; + } + out.writeUnsignedVInt(1 + executeAt.epoch() - txnId.epoch()); + } out.writeUnsignedVInt(executeAt.hlc() - txnId.hlc()); out.writeUnsignedVInt32(executeAt.node.id); if ((flags & HAS_UNIQUE_HLC) != 0) @@ -152,7 +164,12 @@ public static long serializedSize(TxnId txnId, Timestamp executeAt) if ((flags & 1) != 0) { if ((flags & HAS_EPOCH) != 0) + { + if (executeAt.equals(Timestamp.NONE)) + return size + TypeSizes.sizeofUnsignedVInt(0L); + size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch() - txnId.epoch()); + } size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc() - txnId.hlc()); size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id); if ((flags & HAS_UNIQUE_HLC) != 0) @@ -176,10 +193,13 @@ private static Timestamp deserialize(DataInputPlus in, boolean nullable) throws int flags = in.readUnsignedVInt32(); if (nullable) { - if ((flags & 1) != 0) return null; - flags >>>= 1; + if (flags == 0) return null; + flags--; } long epoch = in.readUnsignedVInt(); + if (epoch-- == 0) + return Timestamp.NONE; + long hlc = in.readUnsignedVInt(); Node.Id node = new Node.Id(in.readUnsignedVInt32()); if ((flags & HAS_UNIQUE_HLC) == 0) @@ -206,11 +226,12 @@ private static void skip(DataInputPlus in, boolean nullable) throws IOException int flags = in.readUnsignedVInt32(); if (nullable) { - if ((flags & 1) != 0) + if (flags == 0) return; - flags >>>= 1; + flags--; } - in.readUnsignedVInt(); + if (0 == in.readUnsignedVInt()) + return; in.readUnsignedVInt(); in.readUnsignedVInt32(); if ((flags & HAS_UNIQUE_HLC) != 0) @@ -235,9 +256,13 @@ private static void serialize(Timestamp executeAt, DataOutputPlus out, boolean n { Invariants.require(nullable); } + else if (executeAt.equals(Timestamp.NONE)) + { + out.writeUnsignedVInt(0L); + } else { - out.writeUnsignedVInt(executeAt.epoch()); + out.writeUnsignedVInt(1 + executeAt.epoch()); out.writeUnsignedVInt(executeAt.hlc()); out.writeUnsignedVInt32(executeAt.node.id); if (executeAt.hasDistinctHlcAndUniqueHlc()) @@ -264,11 +289,15 @@ private static long serializedSize(Timestamp executeAt, boolean nullable) Invariants.require(nullable); return size; } - size += TypeSizes.sizeofUnsignedVInt(executeAt.epoch()); - size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc()); - size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id); - if (executeAt.hasDistinctHlcAndUniqueHlc()) - size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc()); + if (executeAt.equals(Timestamp.NONE)) size += TypeSizes.sizeofUnsignedVInt(0); + else + { + size += TypeSizes.sizeofUnsignedVInt(1 + executeAt.epoch()); + size += TypeSizes.sizeofUnsignedVInt(executeAt.hlc()); + size += TypeSizes.sizeofUnsignedVInt(executeAt.node.id); + if (executeAt.hasDistinctHlcAndUniqueHlc()) + size += TypeSizes.sizeofUnsignedVInt(executeAt.uniqueHlc() - executeAt.hlc()); + } return size; } @@ -277,7 +306,7 @@ private static int flags(Timestamp executeAt, boolean nullable) if (executeAt == null) { Invariants.require(nullable); - return 1; + return 0; } int flags = executeAt.flags() << 2; @@ -286,7 +315,7 @@ private static int flags(Timestamp executeAt, boolean nullable) if (executeAt.hasDistinctHlcAndUniqueHlc()) flags |= HAS_UNIQUE_HLC; if (nullable) - flags <<= 1; + flags++; return flags; } } diff --git a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java index 1a1a767176b1..9fa9333defb7 100644 --- a/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java +++ b/src/java/org/apache/cassandra/service/accord/serializers/CommandStoreSerializers.java @@ -142,11 +142,11 @@ public void serialize(RedundantBefore.Entry t, DataOutputPlus out, int version) out.writeUnsignedVInt(t.startOwnershipEpoch); if (t.endOwnershipEpoch == Long.MAX_VALUE) out.writeUnsignedVInt(0L); else out.writeUnsignedVInt(1 + t.endOwnershipEpoch - t.startOwnershipEpoch); - CommandSerializers.txnId.serialize(t.locallyWitnessedOrInvalidatedBefore, out, version); - CommandSerializers.txnId.serialize(t.locallyAppliedOrInvalidatedBefore, out, version); - CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedOrInvalidatedBefore, out, version); - CommandSerializers.txnId.serialize(t.shardOnlyAppliedOrInvalidatedBefore, out, version); - CommandSerializers.txnId.serialize(t.shardAppliedOrInvalidatedBefore, out, version); + CommandSerializers.txnId.serialize(t.locallyWitnessedBefore, out, version); + CommandSerializers.txnId.serialize(t.locallyAppliedBefore, out, version); + CommandSerializers.txnId.serialize(t.locallyDecidedAndAppliedBefore, out, version); + CommandSerializers.txnId.serialize(t.shardOnlyAppliedBefore, out, version); + CommandSerializers.txnId.serialize(t.shardAppliedBefore, out, version); CommandSerializers.txnId.serialize(t.gcBefore, out, version); CommandSerializers.txnId.serialize(t.bootstrappedAt, out, version); CommandSerializers.nullableTimestamp.serialize(t.staleUntilAtLeast, out, version); @@ -177,11 +177,11 @@ public long serializedSize(RedundantBefore.Entry t, int version) long size = KeySerializers.range.serializedSize(t.range, version); size += TypeSizes.sizeofUnsignedVInt(t.startOwnershipEpoch); size += TypeSizes.sizeofUnsignedVInt(t.endOwnershipEpoch == Long.MAX_VALUE ? 0 : 1 + t.endOwnershipEpoch - t.startOwnershipEpoch); - size += CommandSerializers.txnId.serializedSize(t.locallyWitnessedOrInvalidatedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.locallyAppliedOrInvalidatedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedOrInvalidatedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedOrInvalidatedBefore, version); - size += CommandSerializers.txnId.serializedSize(t.shardAppliedOrInvalidatedBefore, version); + size += CommandSerializers.txnId.serializedSize(t.locallyWitnessedBefore, version); + size += CommandSerializers.txnId.serializedSize(t.locallyAppliedBefore, version); + size += CommandSerializers.txnId.serializedSize(t.locallyDecidedAndAppliedBefore, version); + size += CommandSerializers.txnId.serializedSize(t.shardOnlyAppliedBefore, version); + size += CommandSerializers.txnId.serializedSize(t.shardAppliedBefore, version); size += CommandSerializers.txnId.serializedSize(t.gcBefore, version); size += CommandSerializers.txnId.serializedSize(t.bootstrappedAt, version); size += CommandSerializers.nullableTimestamp.serializedSize(t.staleUntilAtLeast, version); diff --git a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java index 7698fe764e9a..877ba587c425 100644 --- a/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java +++ b/test/unit/org/apache/cassandra/db/virtual/AccordDebugKeyspaceTest.java @@ -53,7 +53,7 @@ import org.apache.cassandra.utils.concurrent.Condition; import org.awaitility.Awaitility; -import static accord.primitives.TxnId.FastPath.UNOPTIMISED; +import static accord.primitives.TxnId.FastPath.Unoptimised; import static org.apache.cassandra.Util.spinUntilSuccess; import static org.apache.cassandra.service.accord.AccordTestUtils.createTxn; @@ -136,7 +136,7 @@ public void inflight() throws ExecutionException, InterruptedException public void blocked() throws ExecutionException, InterruptedException { ProtocolModifiers.Toggles.setPermitLocalExecution(false); - ProtocolModifiers.Toggles.setPermittedFastPaths(new TxnId.FastPaths(UNOPTIMISED)); + ProtocolModifiers.Toggles.setPermittedFastPaths(new TxnId.FastPaths(Unoptimised)); AccordMsgFilter filter = new AccordMsgFilter(); MessagingService.instance().outboundSink.add(filter); try diff --git a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java index ccabbcf265d6..ad9adfe33d3c 100644 --- a/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java +++ b/test/unit/org/apache/cassandra/index/accord/RouteIndexTest.java @@ -634,8 +634,8 @@ private static IAccordService.CompactionInfo emptyCompactionInfo() Int2ObjectHashMap durableBefores = new Int2ObjectHashMap<>(); Int2ObjectHashMap ranges = new Int2ObjectHashMap<>(); RedundantBefore redundantBefore = Mockito.spy(RedundantBefore.EMPTY); - Mockito.doReturn(RedundantStatus.LIVE).when(redundantBefore).status(Mockito.any(), Mockito.any(), (Participants) Mockito.any()); - Mockito.doReturn(RedundantStatus.LIVE).when(redundantBefore).status(Mockito.any(), Mockito.any(), (RoutingKey) Mockito.any()); + Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(), Mockito.any(), (Participants) Mockito.any()); + Mockito.doReturn(RedundantStatus.NONE).when(redundantBefore).status(Mockito.any(), Mockito.any(), (RoutingKey) Mockito.any()); for (int i = 0; i < MAX_STORES; i++) { redundantBefores.put(i, redundantBefore); diff --git a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java index 45ff263646f0..4e376e49391c 100644 --- a/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java +++ b/test/unit/org/apache/cassandra/service/accord/AccordJournalOrderTest.java @@ -30,6 +30,8 @@ import accord.local.Command; import accord.local.StoreParticipants; import accord.primitives.Ballot; +import accord.primitives.Participants; +import accord.primitives.RoutingKeys; import accord.primitives.SaveStatus; import accord.primitives.Status; import accord.primitives.TxnId; @@ -39,13 +41,16 @@ import org.apache.cassandra.ServerTestUtils; import org.apache.cassandra.config.CassandraRelevantProperties; import org.apache.cassandra.config.DatabaseDescriptor; +import org.apache.cassandra.dht.Murmur3Partitioner; import org.apache.cassandra.io.util.File; import org.apache.cassandra.journal.TestParams; import org.apache.cassandra.schema.KeyspaceParams; import org.apache.cassandra.schema.Schema; +import org.apache.cassandra.schema.TableId; import org.apache.cassandra.schema.TableMetadata; import org.apache.cassandra.service.StorageService; import org.apache.cassandra.service.accord.api.AccordAgent; +import org.apache.cassandra.service.accord.api.AccordRoutingKey; import org.apache.cassandra.service.consensus.TransactionalMode; import org.apache.cassandra.utils.StorageCompatibilityMode; @@ -82,7 +87,8 @@ public void simpleKeyTest() TxnId txnId = randomSource.nextBoolean() ? id1 : id2; JournalKey key = new JournalKey(txnId, JournalKey.Type.COMMAND_DIFF, randomSource.nextInt(5)); res.compute(key, (k, prev) -> prev == null ? 1 : prev + 1); - Command command = Command.NotDefined.notDefined(txnId, SaveStatus.NotDefined, Status.Durability.NotDurable, StoreParticipants.empty(txnId), Ballot.ZERO); + Participants participants = RoutingKeys.of(new AccordRoutingKey.TokenKey(TableId.generate(), new Murmur3Partitioner.LongToken(1))); + Command command = Command.NotDefined.notDefined(txnId, SaveStatus.NotDefined, Status.Durability.NotDurable, StoreParticipants.create(null, participants, null, participants, participants), Ballot.ZERO); accordJournal.saveCommand(key.commandStoreId, new Journal.CommandUpdate(null, command), () -> {}); diff --git a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java index b24a60a670b3..facb15357f0f 100644 --- a/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java +++ b/test/unit/org/apache/cassandra/service/accord/CommandChangeTest.java @@ -95,7 +95,6 @@ public void serde() SoftAssertions checks = new SoftAssertions(); for (SaveStatus saveStatus : SaveStatus.values()) { - if (saveStatus == SaveStatus.TruncatedApplyWithDeps) continue; out.clear(); Command orig = cmdBuilder.build(saveStatus); diff --git a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java index ddd5146462f6..f8c85ad544f5 100644 --- a/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java +++ b/test/unit/org/apache/cassandra/service/accord/serializers/CommandsForKeySerializerTest.java @@ -476,7 +476,7 @@ private static void testOne(long seed) } // TODO (expected): we currently don't explore TruncatedApply statuses because we don't transition through all phases and therefore don't adopt the Applied status - Choices saveStatusChoices = Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, SaveStatus.TruncatedApplyWithOutcome, SaveStatus.TruncatedApplyWithDeps)).toArray(SaveStatus[]::new)); + Choices saveStatusChoices = Choices.uniform(EnumSet.complementOf(EnumSet.of(SaveStatus.TruncatedApply, SaveStatus.TruncatedUnapplied, SaveStatus.TruncatedApplyWithOutcome)).toArray(SaveStatus[]::new)); Supplier saveStatusSupplier = () -> { SaveStatus result = saveStatusChoices.choose(source); while (result.is(Status.Truncated)) // we don't currently process truncations @@ -559,7 +559,7 @@ private static void testOne(long seed) public void test() { var tableGen = AccordGenerators.fromQT(CassandraGenerators.TABLE_ID_GEN); - var txnIdGen = AccordGens.txnIds(rs -> rs.nextLong(0, 100), rs -> rs.nextLong(100), rs -> rs.nextInt(10)); + var txnIdGen = AccordGens.txnIds((Gen.LongGen) rs -> rs.nextLong(0, 100), rs -> rs.nextLong(100), rs -> rs.nextInt(10)); qt().check(rs -> { TableId table = tableGen.next(rs); TokenKey pk = new TokenKey(table, new Murmur3Partitioner.LongToken(rs.nextLong())); @@ -647,8 +647,8 @@ protected TestCommandStore() @Override public boolean inStore() { return true; } @Override public Journal.Loader loader() { throw new UnsupportedOperationException(); } @Override public Agent agent() { return this; } - @Override public AsyncChain execute(PreLoadContext context, Consumer consumer) { return null; } - @Override public AsyncChain submit(PreLoadContext context, Function apply) { throw new UnsupportedOperationException(); } + @Override public AsyncChain build(PreLoadContext context, Consumer consumer) { return null; } + @Override public AsyncChain build(PreLoadContext context, Function apply) { throw new UnsupportedOperationException(); } @Override public void shutdown() { } @Override protected void registerTransitive(SafeCommandStore safeStore, RangeDeps deps){ } @Override public AsyncChain submit(Callable task) { throw new UnsupportedOperationException(); } diff --git a/test/unit/org/apache/cassandra/utils/AccordGenerators.java b/test/unit/org/apache/cassandra/utils/AccordGenerators.java index 05c731f7dbc6..1b60d0a3b5ed 100644 --- a/test/unit/org/apache/cassandra/utils/AccordGenerators.java +++ b/test/unit/org/apache/cassandra/utils/AccordGenerators.java @@ -33,6 +33,7 @@ import com.google.common.collect.ImmutableSortedMap; import accord.local.Command; +import accord.local.Command.Truncated; import accord.local.ICommand; import accord.local.DurableBefore; import accord.local.RedundantBefore; @@ -245,8 +246,6 @@ public Command build(SaveStatus saveStatus) switch (saveStatus) { default: throw new AssertionError("Unhandled saveStatus: " + saveStatus); - case TruncatedApplyWithDeps: - throw new IllegalArgumentException("TruncatedApplyWithDeps is not a valid state for a Command to be in, its for FetchData"); case Uninitialised: case NotDefined: return Command.NotDefined.notDefined(attributes(saveStatus), Ballot.ZERO); @@ -285,17 +284,18 @@ public Command build(SaveStatus saveStatus) return Command.Executed.executed(attributes(saveStatus), saveStatus); case TruncatedApply: - if (txnId.kind().awaitsOnlyDeps()) return Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, null, null, txnId); - else return Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, null, null); + case TruncatedUnapplied: + if (txnId.kind().awaitsOnlyDeps()) return Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, null, null, txnId); + else return Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, null, null); case TruncatedApplyWithOutcome: - if (txnId.kind().awaitsOnlyDeps()) return Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId); - else return Command.Truncated.truncatedApply(attributes(saveStatus), saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null, new TxnData()); + if (txnId.kind().awaitsOnlyDeps()) return Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges,new TxnWrite(Collections.emptyList(), true)) : null, new TxnData(), txnId); + else return Truncated.truncated(attributes(saveStatus), saveStatus, executeAt, txnId.is(Write) ? new Writes(txnId, executeAt, keysOrRanges, new TxnWrite(Collections.emptyList(), true)) : null, new TxnData()); case Erased: case Vestigial: case Invalidated: - return Command.Truncated.invalidated(txnId, attributes(saveStatus).participants()); + return Truncated.invalidated(txnId, attributes(saveStatus).participants()); } } }