Skip to content

Commit

Permalink
Fixes #2813: Fail fast when Batch GRV Rate Limit Exceeded
Browse files Browse the repository at this point in the history
Loading the metadata version key does not seem to throw
Batch GRV rate limit exceeded, while other operations on the same
transaciton do. Instead it stalls forever, and the tests end up
timing out at the junit level.
This change introduces a new combine that will fail when the loading
the record store state fails, instead of waiting forever for the
metadata version to be loaded.
  • Loading branch information
ScottDugas committed Jul 12, 2024
1 parent c90ef5e commit 427473e
Show file tree
Hide file tree
Showing 5 changed files with 108 additions and 6 deletions.
2 changes: 1 addition & 1 deletion docs/ReleaseNotes.md
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Starting with version [3.4.455.0](#344550), the semantics of `UnnestedRecordType
* **Bug fix** Fix 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 3 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fix 4 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Bug fix** Fail fast when Batch GRV rate limit exceeded [(Issue #2813)](https://github.com/FoundationDB/fdb-record-layer/issues/2813)
* **Bug fix** Fix 5 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 1 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
* **Performance** Improvement 2 [(Issue #NNN)](https://github.com/FoundationDB/fdb-record-layer/issues/NNN)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,38 @@ private static RuntimeException getRuntimeException(@Nonnull Throwable exception
return exceptionMapper == null ? new RuntimeException(exception) : exceptionMapper.apply(exception);
}

/**
* Combine the results of two futures, but fail fast if either future fails.
* <p>
* This has the same behavior as {@link CompletableFuture#thenCombine}, except, if either future fails, it won't
* wait for the other one to complete before completing the result with the failure.
* </p>
* @param future1 one future
* @param future2 another future
* @param combiner a function to combine the results of both {@code future1} and {@code future2} into a single result.
*
* @param <T> the result type for {@code future1}
* @param <U> the result type for {@code future2}
* @param <R> the result type for the returned future
*
* @return a future that fails with one of the exceptions from {@code future1} or {@code future2} if either of those
* failed, or the result of applying {@code combiner} to their results if both succeeded.
*/
public static <T, U, R> CompletableFuture<R> combineAndFailFast(CompletableFuture<T> future1,
CompletableFuture<U> future2,
BiFunction<T, U, R> combiner) {
return CompletableFuture.anyOf(future1, future2).thenCompose(vignore -> {
// We know at least one of them completed
if (future1.isDone() && future1.isCompletedExceptionally()) {
return future1.thenApply(vignore1 -> null);
}
if (future2.isDone() && future2.isCompletedExceptionally()) {
return future2.thenApply(vignore2 -> null);
}
return future1.thenCombine(future2, combiner);
});
}

/**
* A {@code Boolean} function that is always true.
* @param <T> the type of the (ignored) argument to the function
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,22 @@

import com.apple.foundationdb.test.TestExecutors;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.Arrays;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Stream;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;

/**
Expand Down Expand Up @@ -103,4 +112,61 @@ public void manyParallelNaive() {
long end = System.currentTimeMillis();
assertTrue(end - start >= 100, "Delay was not long enough");
}

enum FutureBehavior {
SucceedInstantly(true, false, CompletableFuture::completedFuture),
SucceedSlowly(true, false, result -> MoreAsyncUtil.delayedFuture(100, TimeUnit.MILLISECONDS)
.thenApply(vignore -> result)),
RunForever(false, false, result -> new CompletableFuture<>()),
FailInstantly(false, true, result -> {
final CompletableFuture<String> future = new CompletableFuture<>();
future.completeExceptionally(new RuntimeException(result));
return future;
}),
FailSlowly(false, true, result -> {
final CompletableFuture<String> future = new CompletableFuture<>();
MoreAsyncUtil.delayedFuture(100, TimeUnit.MILLISECONDS)
.whenComplete((vignore, e) -> future.completeExceptionally(new RuntimeException(result)));
return future;
});

private final boolean succeeds;
private final boolean fails;
private final Function<String, CompletableFuture<String>> futureGenerator;

FutureBehavior(final boolean succeeds, final boolean fails,
final Function<String, CompletableFuture<String>> futureGenerator) {
this.succeeds = succeeds;
this.fails = fails;
this.futureGenerator = futureGenerator;
}
}

public static Stream<Arguments> combineAndFailFast() {
return Arrays.stream(FutureBehavior.values())
.flatMap(future1 ->
Arrays.stream(FutureBehavior.values())
.map(future2 -> Arguments.of(future1, future2)));
}

@ParameterizedTest
@MethodSource
void combineAndFailFast(FutureBehavior behavior1, FutureBehavior behavior2)
throws ExecutionException, InterruptedException, TimeoutException {
final CompletableFuture<String> future1 = behavior1.futureGenerator.apply("a");
final CompletableFuture<String> future2 = behavior2.futureGenerator.apply("b");
final CompletableFuture<String> future = MoreAsyncUtil.combineAndFailFast(future1, future2, (a, b) -> a + "-" + b);
final int getTimeoutSeconds = 1;
if (behavior1.succeeds && behavior2.succeeds) {
assertEquals("a-b", future.get(getTimeoutSeconds, TimeUnit.SECONDS));
} else if (behavior1.fails || behavior2.fails) {
final ExecutionException executionException = assertThrows(ExecutionException.class,
() -> future.get(getTimeoutSeconds, TimeUnit.SECONDS));
assertEquals(RuntimeException.class, executionException.getCause().getClass());
} else {
assertThrows(TimeoutException.class, () -> future.get(getTimeoutSeconds, TimeUnit.SECONDS));
}


}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

package com.apple.foundationdb.record.provider.foundationdb.storestate;

import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.Transaction;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.async.MoreAsyncUtil;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.RecordStoreState;
import com.apple.foundationdb.record.provider.foundationdb.FDBRecordContext;
Expand Down Expand Up @@ -94,9 +95,14 @@ CompletableFuture<Void> handleCachedState(@Nonnull FDBRecordContext context, @No
@Nonnull
static CompletableFuture<FDBRecordStoreStateCacheEntry> load(@Nonnull FDBRecordStore recordStore,
@Nonnull FDBRecordStore.StoreExistenceCheck existenceCheck) {
// This is primarily needed because of https://github.com/apple/foundationdb/issues/11500 where the call to
// getMetaDataVersionStampAsync might never complete. In the tests we don't set a timeout on the futures, and
// thus the overall test times out, but in production situations, this should mostly make a difference, because
// "Batch GRV rate limit exceeded" is clearer than an asyncToSync timeout, on whatever eventual future depends
// on this.
final CompletableFuture<byte[]> metaDataVersionStampFuture = recordStore.getContext().getMetaDataVersionStampAsync(IsolationLevel.SNAPSHOT);
return recordStore.loadRecordStoreStateAsync(existenceCheck)
.thenCombine(metaDataVersionStampFuture, (recordStoreState, metaDataVersionStamp) ->
return MoreAsyncUtil.combineAndFailFast(recordStore.loadRecordStoreStateAsync(existenceCheck),
metaDataVersionStampFuture, (recordStoreState, metaDataVersionStamp) ->
new FDBRecordStoreStateCacheEntry(recordStore.getSubspaceProvider(), recordStore.getSubspace(), recordStoreState.toImmutable(), metaDataVersionStamp));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,6 @@
import org.hamcrest.Matchers;
import org.junit.jupiter.api.Tag;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
Expand Down Expand Up @@ -757,7 +756,6 @@ static Stream<Arguments> concurrentStoreTest() {
@ParameterizedTest
@MethodSource
@SuperSlow
@Timeout(value = 10, unit = TimeUnit.MINUTES) // https://github.com/FoundationDB/fdb-record-layer/issues/2813
void concurrentStoreTest(boolean isGrouped,
boolean isSynthetic,
boolean primaryKeySegmentIndexEnabled,
Expand Down

0 comments on commit 427473e

Please sign in to comment.