Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renamed TxName to FateOp #5273

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions core/src/main/java/org/apache/accumulo/core/fate/AdminUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,20 +74,20 @@ public static class TransactionStatus {
private final FateId fateId;
private final FateInstanceType instanceType;
private final TStatus status;
private final Fate.FateOperation txName;
private final Fate.FateOperation fateOp;
private final List<String> hlocks;
private final List<String> wlocks;
private final String top;
private final long timeCreated;

private TransactionStatus(FateId fateId, FateInstanceType instanceType, TStatus status,
Fate.FateOperation txName, List<String> hlocks, List<String> wlocks, String top,
Fate.FateOperation fateOp, List<String> hlocks, List<String> wlocks, String top,
Long timeCreated) {

this.fateId = fateId;
this.instanceType = instanceType;
this.status = status;
this.txName = txName;
this.fateOp = fateOp;
this.hlocks = Collections.unmodifiableList(hlocks);
this.wlocks = Collections.unmodifiableList(wlocks);
this.top = top;
Expand All @@ -114,8 +114,8 @@ public TStatus getStatus() {
/**
* @return The name of the transaction running.
*/
public Fate.FateOperation getTxName() {
return txName;
public Fate.FateOperation getfateOp() {
return fateOp;
}

/**
Expand Down Expand Up @@ -365,8 +365,8 @@ public static <T> FateStatus getTransactionStatus(

ReadOnlyFateTxStore<T> txStore = store.read(fateId);
// tx name will not be set if the tx is not seeded with work (it is NEW)
Fate.FateOperation txName = txStore.getTransactionInfo(Fate.TxInfo.TX_NAME) == null ? null
: ((Fate.FateOperation) txStore.getTransactionInfo(Fate.TxInfo.TX_NAME));
Fate.FateOperation fateOp = txStore.getTransactionInfo(Fate.TxInfo.FATE_OP) == null ? null
: ((Fate.FateOperation) txStore.getTransactionInfo(Fate.TxInfo.FATE_OP));

List<String> hlocks = heldLocks.remove(fateId);

Expand All @@ -392,7 +392,7 @@ public static <T> FateStatus getTransactionStatus(

if (includeByStatus(status, statusFilter) && includeByFateId(fateId, fateIdFilter)
&& includeByInstanceType(fateId.getType(), typesFilter)) {
statuses.add(new TransactionStatus(fateId, type, status, txName, hlocks, wlocks, top,
statuses.add(new TransactionStatus(fateId, type, status, fateOp, hlocks, wlocks, top,
timeCreated));
}
});
Expand Down Expand Up @@ -429,7 +429,7 @@ public void print(Map<FateInstanceType,ReadOnlyFateStore<T>> readOnlyFateStores,
for (TransactionStatus txStatus : fateStatus.getTransactions()) {
fmt.format(
"%-15s fateId: %s status: %-18s locked: %-15s locking: %-15s op: %-15s created: %s%n",
txStatus.getTxName(), txStatus.getFateId(), txStatus.getStatus(), txStatus.getHeldLocks(),
txStatus.getfateOp(), txStatus.getFateId(), txStatus.getStatus(), txStatus.getHeldLocks(),
txStatus.getWaitingLocks(), txStatus.getTop(), txStatus.getTimeCreatedFormatted());
}
fmt.format(" %s transactions", fateStatus.getTransactions().size());
Expand Down
10 changes: 5 additions & 5 deletions core/src/main/java/org/apache/accumulo/core/fate/Fate.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ public class Fate<T> {
private final ConcurrentLinkedQueue<Integer> idleCountHistory = new ConcurrentLinkedQueue<>();

public enum TxInfo {
TX_NAME, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
FATE_OP, AUTO_CLEAN, EXCEPTION, TX_AGEOFF, RETURN_VALUE
}

public enum FateOperation {
Expand Down Expand Up @@ -493,17 +493,17 @@ public FateId startTransaction() {
return store.create();
}

public void seedTransaction(FateOperation txName, FateKey fateKey, Repo<T> repo,
public void seedTransaction(FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
store.seedTransaction(txName, fateKey, repo, autoCleanUp);
store.seedTransaction(fateOp, fateKey, repo, autoCleanUp);
}

// start work in the transaction.. it is safe to call this
// multiple times for a transaction... but it will only seed once
public void seedTransaction(FateOperation txName, FateId fateId, Repo<T> repo,
public void seedTransaction(FateOperation fateOp, FateId fateId, Repo<T> repo,
boolean autoCleanUp, String goalMessage) {
log.info("Seeding {} {}", fateId, goalMessage);
store.seedTransaction(txName, fateId, repo, autoCleanUp);
store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
}

// check on the transaction
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ public interface FateStore<T> extends ReadOnlyFateStore<T> {
* empty optional otherwise. If there was a failure this could return an empty optional
* when it actually succeeded.
*/
Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo,
Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp);

/**
Expand All @@ -84,7 +84,7 @@ Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Rep
* failures. When there are no failures returns true if seeded and false otherwise. If
* there was a failure this could return false when it actually succeeded.
*/
boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> repo,
boolean autoCleanUp);

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ public FateMutator<T> putUnreserveTx(FateStore.FateReservation reservation) {

@Override
public FateMutator<T> putName(byte[] data) {
TxInfoColumnFamily.TX_NAME_COLUMN.put(mutation, new Value(data));
TxInfoColumnFamily.FATE_OP_COLUMN.put(mutation, new Value(data));
return this;
}

Expand Down Expand Up @@ -161,7 +161,7 @@ public FateMutator<T> putAgeOff(byte[] data) {
@Override
public FateMutator<T> putTxInfo(TxInfo txInfo, byte[] data) {
switch (txInfo) {
case TX_NAME:
case FATE_OP:
putName(data);
break;
case AUTO_CLEAN:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,33 +132,33 @@ public FateId getFateId() {
}

@Override
public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo,
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
final var fateId = fateIdGenerator.fromTypeAndKey(type(), fateKey);
Supplier<FateMutator<T>> mutatorFactory = () -> newMutator(fateId).requireAbsent()
.putKey(fateKey).putCreateTime(System.currentTimeMillis());
if (seedTransaction(mutatorFactory, fateKey + " " + fateId, txName, repo, autoCleanUp)) {
if (seedTransaction(mutatorFactory, fateKey + " " + fateId, fateOp, repo, autoCleanUp)) {
return Optional.of(fateId);
} else {
return Optional.empty();
}
}

@Override
public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
Supplier<FateMutator<T>> mutatorFactory =
() -> newMutator(fateId).requireStatus(TStatus.NEW).requireUnreserved().requireAbsentKey();
return seedTransaction(mutatorFactory, fateId.canonical(), txName, repo, autoCleanUp);
return seedTransaction(mutatorFactory, fateId.canonical(), fateOp, repo, autoCleanUp);
}

private boolean seedTransaction(Supplier<FateMutator<T>> mutatorFactory, String logId,
Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp) {
Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp) {
int maxAttempts = 5;
for (int attempt = 0; attempt < maxAttempts; attempt++) {
var mutator = mutatorFactory.get();
mutator =
mutator.putName(serializeTxInfo(txName)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
mutator.putName(serializeTxInfo(fateOp)).putRepo(1, repo).putStatus(TStatus.SUBMITTED);
if (autoCleanUp) {
mutator = mutator.putAutoClean(serializeTxInfo(autoCleanUp));
}
Expand Down Expand Up @@ -419,8 +419,8 @@ public Serializable getTransactionInfo(TxInfo txInfo) {

final ColumnFQ cq;
switch (txInfo) {
case TX_NAME:
cq = TxInfoColumnFamily.TX_NAME_COLUMN;
case FATE_OP:
cq = TxInfoColumnFamily.FATE_OP_COLUMN;
break;
case AUTO_CLEAN:
cq = TxInfoColumnFamily.AUTO_CLEAN_COLUMN;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ public static class TxInfoColumnFamily {
public static final String STR_NAME = "txinfo";
public static final Text NAME = new Text(STR_NAME);

public static final String TX_NAME = "txname";
public static final ColumnFQ TX_NAME_COLUMN = new ColumnFQ(NAME, new Text(TX_NAME));
public static final String FATE_OP = "fateOp";
public static final ColumnFQ FATE_OP_COLUMN = new ColumnFQ(NAME, new Text(FATE_OP));

public static final String AUTO_CLEAN = "autoclean";
public static final ColumnFQ AUTO_CLEAN_COLUMN = new ColumnFQ(NAME, new Text(AUTO_CLEAN));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,11 +181,11 @@ private Optional<FateTxStore<T>> createAndReserve(FateKey fateKey) {
}

@Override
public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey, Repo<T> repo,
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey, Repo<T> repo,
boolean autoCleanUp) {
return createAndReserve(fateKey).map(txStore -> {
try {
seedTransaction(txName, repo, autoCleanUp, txStore);
seedTransaction(fateOp, repo, autoCleanUp, txStore);
return txStore.getID();
} finally {
txStore.unreserve(Duration.ZERO);
Expand All @@ -194,12 +194,12 @@ public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateK
}

@Override
public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
return tryReserve(fateId).map(txStore -> {
try {
if (txStore.getStatus() == NEW) {
seedTransaction(txName, repo, autoCleanUp, txStore);
seedTransaction(fateOp, repo, autoCleanUp, txStore);
return true;
}
return false;
Expand All @@ -209,7 +209,7 @@ public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T>
}).orElse(false);
}

private void seedTransaction(Fate.FateOperation txName, Repo<T> repo, boolean autoCleanUp,
private void seedTransaction(Fate.FateOperation fateOp, Repo<T> repo, boolean autoCleanUp,
FateTxStore<T> txStore) {
if (txStore.top() == null) {
try {
Expand All @@ -223,7 +223,7 @@ private void seedTransaction(Fate.FateOperation txName, Repo<T> repo, boolean au
if (autoCleanUp) {
txStore.setTransactionInfo(TxInfo.AUTO_CLEAN, autoCleanUp);
}
txStore.setTransactionInfo(TxInfo.TX_NAME, txName);
txStore.setTransactionInfo(TxInfo.FATE_OP, fateOp);
txStore.setStatus(SUBMITTED);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,9 +150,9 @@ public FateId create() {
}

@Override
public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey,
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey,
Repo<T> repo, boolean autoCleanUp) {
var optional = store.seedTransaction(txName, fateKey, repo, autoCleanUp);
var optional = store.seedTransaction(fateOp, fateKey, repo, autoCleanUp);
if (storeLog.isTraceEnabled()) {
optional.ifPresentOrElse(fateId -> {
storeLog.trace("{} seeded {} {} {}", fateId, fateKey, toLogString.apply(repo),
Expand All @@ -166,9 +166,9 @@ public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateK
}

@Override
public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<T> repo,
public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<T> repo,
boolean autoCleanUp) {
boolean seeded = store.seedTransaction(txName, fateId, repo, autoCleanUp);
boolean seeded = store.seedTransaction(fateOp, fateId, repo, autoCleanUp);
if (storeLog.isTraceEnabled()) {
storeLog.trace("{} {} {} {}", fateId, seeded ? "seeded" : "unable to seed",
toLogString.apply(repo), autoCleanUp);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,13 @@ public FateId create() {
}

@Override
public Optional<FateId> seedTransaction(Fate.FateOperation txName, FateKey fateKey,
public Optional<FateId> seedTransaction(Fate.FateOperation fateOp, FateKey fateKey,
Repo<String> repo, boolean autoCleanUp) {
return Optional.empty();
}

@Override
public boolean seedTransaction(Fate.FateOperation txName, FateId fateId, Repo<String> repo,
public boolean seedTransaction(Fate.FateOperation fateOp, FateId fateId, Repo<String> repo,
boolean autoCleanUp) {
return false;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public void gatherTxnStatus(AdminUtil.TransactionStatus txnStatus) {
}
String top = txnStatus.getTop();
stepCounts.merge(Objects.requireNonNullElse(top, "?"), 1, Integer::sum);
Fate.FateOperation runningRepo = txnStatus.getTxName();
Fate.FateOperation runningRepo = txnStatus.getfateOp();

cmdCounts.merge(runningRepo == null ? "?" : runningRepo.name(), 1, Integer::sum);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public class FateTxnDetails implements Comparable<FateTxnDetails> {

private long running;
private String status = "?";
private String txName = "?";
private String fateOp = "?";
private String step = "?";
private String fateId = "?";
private List<String> locksHeld = List.of();
Expand Down Expand Up @@ -73,8 +73,8 @@ public FateTxnDetails(final long reportTime, final AdminUtil.TransactionStatus t
if (txnStatus.getTop() != null) {
step = txnStatus.getTop();
}
if (txnStatus.getTxName() != null) {
txName = txnStatus.getTxName().name();
if (txnStatus.getfateOp() != null) {
fateOp= txnStatus.getfateOp().name();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Run the maven command:
mvn clean package -DskipTests
to quickly format the project

}
if (txnStatus.getFateId() != null) {
fateId = txnStatus.getFateId().canonical();
Expand Down Expand Up @@ -103,8 +103,8 @@ public long getRunning() {
return running;
}

public String getTxName() {
return txName;
public String getFateOp() {
return fateOp;
}

public String getStep() {
Expand Down Expand Up @@ -167,7 +167,7 @@ public String toString() {
String hms = String.format("%d:%02d:%02d", elapsed.toHours(), elapsed.toMinutesPart(),
elapsed.toSecondsPart());

return hms + "\t" + fateId + "\t" + status + "\t" + txName + "\t" + step + "\theld:"
return hms + "\t" + fateId + "\t" + status + "\t" + fateOp + "\t" + step + "\theld:"
+ locksHeld.toString() + "\twaiting:" + locksWaiting.toString();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ public void noTablenameReport() {
expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes();
expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status1.getTop()).andReturn(null).anyTimes();
expect(status1.getTxName()).andReturn(null).anyTimes();
expect(status1.getfateOp()).andReturn(null).anyTimes();
expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" + UUID.randomUUID())).anyTimes();
expect(status1.getHeldLocks()).andReturn(List.of()).anyTimes();
expect(status1.getWaitingLocks()).andReturn(List.of()).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ void orderingByDuration() {
expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes();
expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status1.getTop()).andReturn("step1").anyTimes();
expect(status1.getTxName()).andReturn(Fate.FateOperation.TABLE_CREATE).anyTimes();
expect(status1.getfateOp()).andReturn(Fate.FateOperation.TABLE_CREATE).anyTimes();
expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" + uuid1)).anyTimes();
expect(status1.getHeldLocks()).andReturn(List.of()).anyTimes();
expect(status1.getWaitingLocks()).andReturn(List.of()).anyTimes();
Expand All @@ -67,7 +67,7 @@ void orderingByDuration() {
expect(status2.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(7)).anyTimes();
expect(status2.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status2.getTop()).andReturn("step2").anyTimes();
expect(status2.getTxName()).andReturn(Fate.FateOperation.TABLE_DELETE).anyTimes();
expect(status2.getfateOp()).andReturn(Fate.FateOperation.TABLE_DELETE).anyTimes();
expect(status2.getFateId()).andReturn(FateId.from("FATE:USER:" + uuid2)).anyTimes();
expect(status2.getHeldLocks()).andReturn(List.of()).anyTimes();
expect(status2.getWaitingLocks()).andReturn(List.of()).anyTimes();
Expand Down Expand Up @@ -101,7 +101,7 @@ public void lockTest() {
expect(status1.getTimeCreated()).andReturn(now - TimeUnit.DAYS.toMillis(1)).anyTimes();
expect(status1.getStatus()).andReturn(ReadOnlyFateStore.TStatus.IN_PROGRESS).anyTimes();
expect(status1.getTop()).andReturn("step1").anyTimes();
expect(status1.getTxName()).andReturn(Fate.FateOperation.TABLE_COMPACT).anyTimes();
expect(status1.getfateOp()).andReturn(Fate.FateOperation.TABLE_COMPACT).anyTimes();
expect(status1.getFateId()).andReturn(FateId.from("FATE:USER:" + UUID.randomUUID())).anyTimes();
// incomplete lock info (W unknown ns id, no table))
expect(status1.getHeldLocks()).andReturn(List.of("R:1", "R:2", "W:a")).anyTimes();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public Map<String,Long> getOpTypeCounters() {

// incr count for op type for for in_progress transactions.
if (ReadOnlyFateStore.TStatus.IN_PROGRESS.equals(tx.getStatus())) {
Fate.FateOperation opType = tx.getTxName();
Fate.FateOperation opType = tx.getfateOp();
String opTypeStr = opType == null ? "UNKNOWN" : opType.name();
opTypeCounters.merge(opTypeStr, 1L, Long::sum);
}
Expand Down
4 changes: 2 additions & 2 deletions test/src/main/java/org/apache/accumulo/test/fate/FateIT.java
Original file line number Diff line number Diff line change
Expand Up @@ -532,14 +532,14 @@ protected void testNoWriteAfterDelete(FateStore<TestEnv> store, ServerContext sc
assertDoesNotThrow(() -> txStore.push(repo));
assertDoesNotThrow(() -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
assertDoesNotThrow(txStore::pop);
assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name"));
assertDoesNotThrow(() -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, "name"));
assertDoesNotThrow(txStore::delete);

// test that all write ops result in an exception since the tx has been deleted
assertThrows(Exception.class, () -> txStore.push(repo));
assertThrows(Exception.class, () -> txStore.setStatus(ReadOnlyFateStore.TStatus.SUCCESSFUL));
assertThrows(Exception.class, txStore::pop);
assertThrows(Exception.class, () -> txStore.setTransactionInfo(Fate.TxInfo.TX_NAME, "name"));
assertThrows(Exception.class, () -> txStore.setTransactionInfo(Fate.TxInfo.FATE_OP, "name"));
assertThrows(Exception.class, txStore::delete);
}

Expand Down
Loading
Loading