Skip to content

Commit

Permalink
Add a way of scanning records in a KeyRange instead of a TupleRange.
Browse files Browse the repository at this point in the history
  • Loading branch information
MMcM committed Apr 11, 2024
1 parent 6d96589 commit 6c59ebc
Show file tree
Hide file tree
Showing 6 changed files with 70 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,9 @@

package com.apple.foundationdb.record;

import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;
import com.apple.foundationdb.Range;
import com.apple.foundationdb.annotation.API;
import com.apple.foundationdb.annotation.SpotBugsSuppressWarnings;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -68,6 +69,15 @@ public KeyRange(@Nonnull byte[] lowKey, @Nonnull byte[] highKey) {
this(lowKey, EndpointType.RANGE_INCLUSIVE, highKey, EndpointType.RANGE_EXCLUSIVE);
}

/**
* Creates a key range from a {@link Range}.
*
* @param range the range
*/
public KeyRange(@Nonnull Range range) {
this(range.begin, range.end);
}

/**
* Returns the lower boundary of the range to be scanned. How this starting key is to be interpreted
* in relation to a scan (e.g., inclusive or exclusive) is determined by the low endpoint value
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
import com.apple.foundationdb.record.IndexScanType;
import com.apple.foundationdb.record.IndexState;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.KeyRange;
import com.apple.foundationdb.record.MutableRecordStoreState;
import com.apple.foundationdb.record.PipelineOperation;
import com.apple.foundationdb.record.PlanHashable;
Expand Down Expand Up @@ -136,6 +137,7 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.Supplier;
Expand Down Expand Up @@ -1192,32 +1194,56 @@ public RecordCursor<FDBStoredRecord<Message>> scanRecords(@Nullable final Tuple
}

@Nonnull
@SuppressWarnings("PMD.CloseResource")
public <M extends Message> RecordCursor<FDBStoredRecord<M>> scanTypedRecords(@Nonnull RecordSerializer<M> typedSerializer,
@Nullable final Tuple low, @Nullable final Tuple high,
@Nonnull final EndpointType lowEndpoint, @Nonnull final EndpointType highEndpoint,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties) {
return scanTypedRecordsInternal(typedSerializer,
builder -> builder.setLow(low, lowEndpoint).setHigh(high, highEndpoint),
continuation, scanProperties);
}

@Nonnull
@Override
public RecordCursor<FDBStoredRecord<Message>> scanRecordsKeyRange(@Nonnull final KeyRange range, @Nullable final byte[] continuation, @Nonnull final ScanProperties scanProperties) {
return scanTypedRecordsKeyRange(serializer, range, continuation, scanProperties);
}

@Nonnull
public <M extends Message> RecordCursor<FDBStoredRecord<M>> scanTypedRecordsKeyRange(@Nonnull RecordSerializer<M> typedSerializer,
@Nonnull final KeyRange range,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties) {
return scanTypedRecordsInternal(typedSerializer,
builder -> builder.setRange(range),
continuation, scanProperties);
}

@Nonnull
@SuppressWarnings("PMD.CloseResource")
private <M extends Message> RecordCursor<FDBStoredRecord<M>> scanTypedRecordsInternal(@Nonnull RecordSerializer<M> typedSerializer,
@Nonnull Consumer<KeyValueCursor.Builder> setRange,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties) {
final RecordMetaData metaData = metaDataProvider.getRecordMetaData();
final Subspace recordsSubspace = recordsSubspace();
final SplitHelper.SizeInfo sizeInfo = new SplitHelper.SizeInfo();
final RecordCursor<FDBRawRecord> rawRecords;
if (metaData.isSplitLongRecords()) {
RecordCursor<KeyValue> keyValues = KeyValueCursor.Builder.withSubspace(recordsSubspace)
KeyValueCursor.Builder keyValuesBuilder = KeyValueCursor.Builder.withSubspace(recordsSubspace)
.setContext(context).setContinuation(continuation)
.setLow(low, lowEndpoint)
.setHigh(high, highEndpoint)
.setScanProperties(scanProperties.with(ExecuteProperties::clearRowAndTimeLimits).with(ExecuteProperties::clearState))
.build();
.setScanProperties(scanProperties.with(ExecuteProperties::clearRowAndTimeLimits).with(ExecuteProperties::clearState));
setRange.accept(keyValuesBuilder);
RecordCursor<KeyValue> keyValues = keyValuesBuilder.build();
rawRecords = new SplitHelper.KeyValueUnsplitter(context, recordsSubspace, keyValues, useOldVersionFormat(), sizeInfo, scanProperties.isReverse(),
new CursorLimitManager(context, scanProperties.with(ExecuteProperties::clearReturnedRowLimit)))
.skip(scanProperties.getExecuteProperties().getSkip())
.limitRowsTo(scanProperties.getExecuteProperties().getReturnedRowLimit());
} else {
KeyValueCursor.Builder keyValuesBuilder = KeyValueCursor.Builder.withSubspace(recordsSubspace)
.setContext(context).setContinuation(continuation)
.setLow(low, lowEndpoint)
.setHigh(high, highEndpoint);
.setContext(context).setContinuation(continuation);
setRange.accept(keyValuesBuilder);
if (omitUnsplitRecordSuffix) {
rawRecords = keyValuesBuilder.setScanProperties(scanProperties).build().map(kv -> {
sizeInfo.set(kv);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import com.apple.foundationdb.record.IndexScanType;
import com.apple.foundationdb.record.IndexState;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.KeyRange;
import com.apple.foundationdb.record.PipelineOperation;
import com.apple.foundationdb.record.RecordCoreArgumentException;
import com.apple.foundationdb.record.RecordCoreException;
Expand Down Expand Up @@ -866,6 +867,20 @@ RecordCursor<FDBStoredRecord<M>> scanRecords(@Nullable Tuple low, @Nullable Tupl
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties);

/**
* Scan the records in the database in a key range.
*
* @param range key range
* @param continuation any continuation from a previous scan
* @param scanProperties skip, limit and other scan properties
*
* @return a cursor that will scan everything in the range, picking up at continuation, and honoring the given scan properties
*/
@Nonnull
RecordCursor<FDBStoredRecord<M>> scanRecordsKeyRange(@Nonnull KeyRange range,
@Nullable byte[] continuation,
@Nonnull ScanProperties scanProperties);

/**
* Count the number of records in the database in a range.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.apple.foundationdb.record.ExecuteState;
import com.apple.foundationdb.record.IndexEntry;
import com.apple.foundationdb.record.IsolationLevel;
import com.apple.foundationdb.record.KeyRange;
import com.apple.foundationdb.record.RecordCoreException;
import com.apple.foundationdb.record.RecordCursor;
import com.apple.foundationdb.record.RecordIndexUniquenessViolation;
Expand Down Expand Up @@ -183,6 +184,12 @@ public RecordCursor<FDBStoredRecord<M>> scanRecords(@Nullable Tuple low, @Nullab
return untypedStore.scanTypedRecords(typedSerializer, low, high, lowEndpoint, highEndpoint, continuation, scanProperties);
}

@Nonnull
@Override
public RecordCursor<FDBStoredRecord<M>> scanRecordsKeyRange(@Nonnull final KeyRange range, @Nullable final byte[] continuation, @Nonnull final ScanProperties scanProperties) {
return untypedStore.scanTypedRecordsKeyRange(typedSerializer, range, continuation, scanProperties);
}

@Nonnull
@Override
public CompletableFuture<Integer> countRecords(@Nullable Tuple low, @Nullable Tuple high, @Nonnull EndpointType lowEndpoint, @Nonnull EndpointType highEndpoint, @Nullable byte[] continuation, @Nonnull ScanProperties scanProperties) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ public CompletableFuture<RecordCursorContinuation> rebalancePartitions(RecordCur
props -> props.clearState().setReturnedRowLimit(1));

final Range range = state.indexSubspace.range();
final KeyRange keyRange = new KeyRange(range.begin, range.end);
final KeyRange keyRange = new KeyRange(range);
final Subspace subspace = state.indexSubspace;
try (RecordCursor<Tuple> cursor = new ChainedCursor<>(
state.context,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public CompletableFuture<Void> mergeIndex(@Nonnull LucenePartitioner partitioner
props -> props.clearState().setReturnedRowLimit(1));

final Range range = state.indexSubspace.range();
final KeyRange keyRange = new KeyRange(range.begin, range.end);
final KeyRange keyRange = new KeyRange(range);
final Subspace subspace = state.indexSubspace;
final KeyExpression rootExpression = state.index.getRootExpression();

Expand Down

0 comments on commit 6c59ebc

Please sign in to comment.