Skip to content

Commit

Permalink
Concurrency optimization for graph native loading update
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Ramadurai <[email protected]>
  • Loading branch information
Gankris96 committed Dec 31, 2024
1 parent c728f02 commit 79392bc
Show file tree
Hide file tree
Showing 7 changed files with 119 additions and 18 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Support for Multi Values in innerHit for Nested k-NN Fields in Lucene and FAISS (#2283)[https://github.com/opensearch-project/k-NN/pull/2283]
- Add binary index support for Lucene engine. (#2292)[https://github.com/opensearch-project/k-NN/pull/2292]
- Add expand_nested_docs Parameter support to NMSLIB engine (#2331)[https://github.com/opensearch-project/k-NN/pull/2331]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]
### Enhancements
- Introduced a writing layer in native engines that relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290)[https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,9 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC

// Cache Miss
// Evict before put
// openIndexInput the graph file before proceeding to load the graph into memory
nativeMemoryEntryContext.openIndexInput();
logger.debug("[KNN] NativeMemoryCacheManager openIndexInput successful");
synchronized (this) {
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
Iterator<String> lruIterator = accessRecencyQueue.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,14 @@

import lombok.Getter;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.store.IndexInputWithBuffer;

import java.io.IOException;
import java.util.Map;
Expand All @@ -26,7 +29,7 @@
/**
* Encapsulates all information needed to load a component into native memory.
*/
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> {
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> implements AutoCloseable {

protected final String key;

Expand Down Expand Up @@ -55,6 +58,18 @@ public String getKey() {
*/
public abstract Integer calculateSizeInKB();

/**
 * Hook for opening the index input file ahead of graph loading.
 * <p>
 * The default implementation does nothing; subclasses that read from an index
 * input override this method.
 */
public void openIndexInput() {
    // Intentionally empty.
}

/**
 * Closes the closable objects held by this {@link NativeMemoryEntryContext}.
 * <p>
 * The default implementation does nothing; subclasses that hold closable
 * objects override this method.
 */
@Override
public void close() {
    // Intentionally empty.
}

/**
* Loads entry into memory.
*
Expand All @@ -75,6 +90,17 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
@Getter
private final String modelId;

// True once openIndexInput() has completed successfully; reset to false by close().
@Getter
private boolean indexInputOpened = false;
// Length of the vector index file in KB (file length / 1024), computed by openIndexInput().
@Getter
private int indexSizeKb;

// Stream over the vector index file, opened by openIndexInput() and released by close().
@Getter
private IndexInput readStream;

// Wrapper around readStream created by openIndexInput().
// NOTE(review): not private — appears to be read directly by the load strategy; confirm before narrowing.
@Getter
IndexInputWithBuffer indexInputWithBuffer;

/**
* Constructor
*
Expand Down Expand Up @@ -131,10 +157,56 @@ public Integer calculateSizeInKB() {
}
}

/**
 * Opens the vector index file referenced by this context's cache key so that it is
 * ready to be handed to the native engine for loading.
 * <p>
 * On success this records the file size in KB, stores the opened stream and its
 * buffered wrapper, and marks the index input as opened.
 *
 * @throws IllegalStateException if the cache key does not contain a vector file name
 * @throws RuntimeException      if the file cannot be sized, opened, or positioned; the
 *                               underlying {@link IOException} is attached as the cause
 */
@Override
public void openIndexInput() {
    // Extract vector file name from the given cache key.
    // Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==
    final String cacheKey = this.getKey();
    final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey);
    if (vectorFileName == null) {
        throw new IllegalStateException(
            "Invalid cache key was given. The key [" + cacheKey + "] does not contain the corresponding vector file name."
        );
    }

    // Prepare for opening index input from directory.
    final Directory directory = this.getDirectory();

    // Keep the stream in a local until every step has succeeded, so a partial failure
    // never publishes a half-initialized stream through the fields.
    IndexInput openedStream = null;
    try {
        indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);
        openedStream = directory.openInput(vectorFileName, IOContext.READONCE);
        openedStream.seek(0);
        indexInputWithBuffer = new IndexInputWithBuffer(openedStream);
        readStream = openedStream;
        indexInputOpened = true;
    } catch (IOException e) {
        // Preserve the original cause so the real I/O failure is visible to callers.
        final RuntimeException failure = new RuntimeException("Failed to openIndexInput the index " + openSearchIndexName, e);
        if (openedStream != null) {
            // The file was opened but a later step failed: release it instead of leaking it.
            try {
                openedStream.close();
            } catch (IOException closeFailure) {
                failure.addSuppressed(closeFailure);
            }
        }
        throw failure;
    }
}

/**
 * Loads the index through the configured load strategy, opening the index input
 * first when it has not been opened yet.
 *
 * @return the allocation produced by the index load strategy
 * @throws IOException if the load strategy fails with an I/O error
 */
@Override
public NativeMemoryAllocation.IndexAllocation load() throws IOException {
    final boolean alreadyOpened = isIndexInputOpened();
    if (alreadyOpened == false) {
        openIndexInput();
    }
    return indexLoadStrategy.load(this);
}

/**
 * Closes the index input stream if one has been opened and clears the opened flag.
 *
 * @throws RuntimeException if closing the stream fails; the {@link IOException} is the cause
 */
@Override
public void close() {
    // Nothing was opened, so there is nothing to release.
    if (readStream == null) {
        return;
    }

    try {
        readStream.close();
        indexInputOpened = false;
    } catch (IOException e) {
        final String message = "Exception while closing the indexInput index [" + openSearchIndexName + "] for loading the graph file.";
        throw new RuntimeException(message, e);
    }
}
}

public static class TrainingDataEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.TrainingDataAllocation> {
Expand Down Expand Up @@ -192,6 +264,11 @@ public Integer calculateSizeInKB() {
return size;
}

/**
 * No-op: this entry context does not open an index input.
 */
@Override
public void openIndexInput() {
    // Intentionally empty.
}

@Override
public NativeMemoryAllocation.TrainingDataAllocation load() {
return trainingLoadStrategy.load(this);
Expand Down Expand Up @@ -278,6 +355,11 @@ public Integer calculateSizeInKB() {
return size;
}

/**
 * No-op: this entry context does not open an index input.
 */
@Override
public void openIndexInput() {
    // Intentionally empty.
}

@Override
public NativeMemoryAllocation.AnonymousAllocation load() throws IOException {
return loadStrategy.load(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.store.IndexInputWithBuffer;
import org.opensearch.knn.index.util.IndexUtil;
import org.opensearch.knn.jni.JNIService;
import org.opensearch.knn.index.engine.KNNEngine;
Expand Down Expand Up @@ -88,10 +85,16 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde
final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);

// Try to open an index input then pass it down to native engine for loading an index.
try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) {
final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);

// openIndexInput takes care of opening the indexInput file
if (!indexEntryContext.isIndexInputOpened()) {
throw new IllegalStateException("Index [" + indexEntryContext.getOpenSearchIndexName() + "] is not preloaded");
}
try (indexEntryContext) {
final long indexAddress = JNIService.loadIndex(
indexEntryContext.indexInputWithBuffer,
indexEntryContext.getParameters(),
knnEngine
);
return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ public Integer calculateSizeInKB() {
return size;
}

/**
 * No-op for this test context.
 */
@Override
public void openIndexInput() {
    // Intentionally empty.
}

@Override
public TestNativeMemoryAllocation load() throws IOException {
return new TestNativeMemoryAllocation(size, memoryAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;

public class NativeMemoryEntryContextTests extends KNNTestCase {

Expand All @@ -42,12 +44,14 @@ public void testAbstract_getKey() {

public void testIndexEntryContext_load() throws IOException {
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class);
NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext(
(Directory) null,
TestUtils.createFakeNativeMamoryCacheKey("test"),
indexLoadStrategy,
null,
"test"
NativeMemoryEntryContext.IndexEntryContext indexEntryContext = spy(
new NativeMemoryEntryContext.IndexEntryContext(
(Directory) null,
TestUtils.createFakeNativeMamoryCacheKey("test"),
indexLoadStrategy,
null,
"test"
)
);

NativeMemoryAllocation.IndexAllocation indexAllocation = new NativeMemoryAllocation.IndexAllocation(
Expand All @@ -61,6 +65,8 @@ public void testIndexEntryContext_load() throws IOException {

when(indexLoadStrategy.load(indexEntryContext)).thenReturn(indexAllocation);

// since we are returning mock instance, set indexEntryContext.indexInputOpened to true.
doReturn(true).when(indexEntryContext).isIndexInputOpened();
assertEquals(indexAllocation, indexEntryContext.load());
}

Expand Down Expand Up @@ -292,6 +298,11 @@ public Integer calculateSizeInKB() {
return size;
}

/**
 * No-op for this test context.
 */
@Override
public void openIndexInput() {
    // Intentionally empty.
}

@Override
public TestNativeMemoryAllocation load() throws IOException {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,7 @@ public void testIndexLoadStrategy_load() throws IOException {
);

// Load
NativeMemoryAllocation.IndexAllocation indexAllocation = NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance()
.load(indexEntryContext);
NativeMemoryAllocation.IndexAllocation indexAllocation = indexEntryContext.load();

// Confirm that the file was loaded by querying
float[] query = new float[dimension];
Expand Down Expand Up @@ -115,8 +114,7 @@ public void testLoad_whenFaissBinary_thenSuccess() throws IOException {
);

// Load
NativeMemoryAllocation.IndexAllocation indexAllocation = NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance()
.load(indexEntryContext);
NativeMemoryAllocation.IndexAllocation indexAllocation = indexEntryContext.load();

// Verify
assertTrue(indexAllocation.isBinaryIndex());
Expand Down

0 comments on commit 79392bc

Please sign in to comment.