Skip to content

Commit

Permalink
Concurrency optimization for graph native loading update
Browse files Browse the repository at this point in the history
Signed-off-by: Ganesh Ramadurai <[email protected]>
  • Loading branch information
Gankris96 authored and Ganesh Ramadurai committed Jan 9, 2025
1 parent 405e5e2 commit 33afd58
Show file tree
Hide file tree
Showing 7 changed files with 154 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Add Support for Multi Values in innerHit for Nested k-NN Fields in Lucene and FAISS (#2283)[https://github.com/opensearch-project/k-NN/pull/2283]
- Add binary index support for Lucene engine. (#2292)[https://github.com/opensearch-project/k-NN/pull/2292]
- Add expand_nested_docs Parameter support to NMSLIB engine (#2331)[https://github.com/opensearch-project/k-NN/pull/2331]
- Add concurrency optimizations with native memory graph loading and force eviction (#2265) [https://github.com/opensearch-project/k-NN/pull/2345]
### Enhancements
- Introduced a writing layer in native engines where relies on the writing interface to process IO. (#2241)[https://github.com/opensearch-project/k-NN/pull/2241]
- Allow method parameter override for training based indices (#2290) https://github.com/opensearch-project/k-NN/pull/2290]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,8 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC

// Cache Miss
// Evict before put
// open the graph file before proceeding to load the graph into memory
nativeMemoryEntryContext.openVectorIndex();
synchronized (this) {
if (getCacheSizeInKilobytes() + nativeMemoryEntryContext.calculateSizeInKB() >= maxWeight) {
Iterator<String> lruIterator = accessRecencyQueue.iterator();
Expand All @@ -350,7 +352,11 @@ public NativeMemoryAllocation get(NativeMemoryEntryContext<?> nativeMemoryEntryC
return result;
}
} else {
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
// open graphFile before load
try (nativeMemoryEntryContext) {
nativeMemoryEntryContext.openVectorIndex();
return cache.get(nativeMemoryEntryContext.getKey(), nativeMemoryEntryContext::load);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,16 @@
package org.opensearch.knn.index.memory;

import lombok.Getter;
import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.VectorDataType;
import org.opensearch.knn.index.store.IndexInputWithBuffer;

import java.io.IOException;
import java.util.Map;
Expand All @@ -26,7 +30,7 @@
/**
* Encapsulates all information needed to load a component into native memory.
*/
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> {
public abstract class NativeMemoryEntryContext<T extends NativeMemoryAllocation> implements AutoCloseable {

protected final String key;

Expand Down Expand Up @@ -55,13 +59,27 @@ public String getKey() {
*/
public abstract Integer calculateSizeInKB();

/**
* Opens the graph file by opening the corresponding indexInput so
* that it is available for graph loading
*/

public void openVectorIndex() {}

/**
* Provides the capability to close the closable objects in the {@link NativeMemoryEntryContext}
*/
@Override
public void close() {}

/**
* Loads entry into memory.
*
* @return NativeMemoryAllocation associated with NativeMemoryEntryContext
*/
public abstract T load() throws IOException;

@Log4j2
public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.IndexAllocation> {

@Getter
Expand All @@ -75,6 +93,17 @@ public static class IndexEntryContext extends NativeMemoryEntryContext<NativeMem
@Getter
private final String modelId;

@Getter
private boolean indexGraphFileOpened = false;
@Getter
private int indexSizeKb;

@Getter
private IndexInput readStream;

@Getter
IndexInputWithBuffer indexInputWithBuffer;

/**
* Constructor
*
Expand Down Expand Up @@ -131,10 +160,61 @@ public Integer calculateSizeInKB() {
}
}

@Override
public void openVectorIndex() {
// if graph file is already opened for index, do nothing
if (isIndexGraphFileOpened()) {
return;
}
// Extract vector file name from the given cache key.
// Ex: _0_165_my_field.faiss@1vaqiupVUwvkXAG4Qc/RPg==
final String cacheKey = this.getKey();
final String vectorFileName = NativeMemoryCacheKeyHelper.extractVectorIndexFileName(cacheKey);
if (vectorFileName == null) {
throw new IllegalStateException(
"Invalid cache key was given. The key [" + cacheKey + "] does not contain the corresponding vector file name."
);
}

// Prepare for opening index input from directory.
final Directory directory = this.getDirectory();

// Try to open an index input then pass it down to native engine for loading an index.
try {
indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);
readStream = directory.openInput(vectorFileName, IOContext.READONCE);
readStream.seek(0);
indexInputWithBuffer = new IndexInputWithBuffer(readStream);
indexGraphFileOpened = true;
log.debug("[KNN] NativeMemoryCacheManager openVectorIndex successful");
} catch (IOException e) {
throw new RuntimeException("Failed to openVectorIndex the index " + openSearchIndexName);
}
}

@Override
public NativeMemoryAllocation.IndexAllocation load() throws IOException {
if (!isIndexGraphFileOpened()) {
throw new IllegalStateException("Index graph file is not open");
}
return indexLoadStrategy.load(this);
}

// close the indexInput
@Override
public void close() {
if (readStream != null) {
try {
readStream.close();
indexGraphFileOpened = false;
} catch (IOException e) {
throw new RuntimeException(
"Exception while closing the indexInput index [" + openSearchIndexName + "] for loading the graph file.",
e
);
}
}
}
}

public static class TrainingDataEntryContext extends NativeMemoryEntryContext<NativeMemoryAllocation.TrainingDataAllocation> {
Expand Down Expand Up @@ -192,6 +272,11 @@ public Integer calculateSizeInKB() {
return size;
}

@Override
public void openVectorIndex() {
throw new UnsupportedOperationException("openVectorIndex operation is not supported");
}

@Override
public NativeMemoryAllocation.TrainingDataAllocation load() {
return trainingLoadStrategy.load(this);
Expand Down Expand Up @@ -278,6 +363,11 @@ public Integer calculateSizeInKB() {
return size;
}

@Override
public void openVectorIndex() {
throw new UnsupportedOperationException("openVectorIndex operation is not supported");
}

@Override
public NativeMemoryAllocation.AnonymousAllocation load() throws IOException {
return loadStrategy.load(this);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,9 @@

import lombok.extern.log4j.Log4j2;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.IOContext;
import org.apache.lucene.store.IndexInput;
import org.opensearch.core.action.ActionListener;
import org.opensearch.knn.index.codec.util.NativeMemoryCacheKeyHelper;
import org.opensearch.knn.index.engine.qframe.QuantizationConfig;
import org.opensearch.knn.index.store.IndexInputWithBuffer;
import org.opensearch.knn.index.util.IndexUtil;
import org.opensearch.knn.jni.JNIService;
import org.opensearch.knn.index.engine.KNNEngine;
Expand Down Expand Up @@ -88,10 +85,16 @@ public NativeMemoryAllocation.IndexAllocation load(NativeMemoryEntryContext.Inde
final int indexSizeKb = Math.toIntExact(directory.fileLength(vectorFileName) / 1024);

// Try to open an index input then pass it down to native engine for loading an index.
try (IndexInput readStream = directory.openInput(vectorFileName, IOContext.READONCE)) {
final IndexInputWithBuffer indexInputWithBuffer = new IndexInputWithBuffer(readStream);
final long indexAddress = JNIService.loadIndex(indexInputWithBuffer, indexEntryContext.getParameters(), knnEngine);

// openVectorIndex takes care of opening the indexInput file
if (!indexEntryContext.isIndexGraphFileOpened()) {
throw new IllegalStateException("Index [" + indexEntryContext.getOpenSearchIndexName() + "] is not preloaded");
}
try (indexEntryContext) {
final long indexAddress = JNIService.loadIndex(
indexEntryContext.indexInputWithBuffer,
indexEntryContext.getParameters(),
knnEngine
);
return createIndexAllocation(indexEntryContext, knnEngine, indexAddress, indexSizeKb, vectorFileName);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -542,6 +542,9 @@ public Integer calculateSizeInKB() {
return size;
}

@Override
public void openVectorIndex() {}

@Override
public TestNativeMemoryAllocation load() throws IOException {
return new TestNativeMemoryAllocation(size, memoryAddress);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.doReturn;

public class NativeMemoryEntryContextTests extends KNNTestCase {

Expand All @@ -41,6 +43,34 @@ public void testAbstract_getKey() {
}

public void testIndexEntryContext_load() throws IOException {
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class);
NativeMemoryEntryContext.IndexEntryContext indexEntryContext = spy(
new NativeMemoryEntryContext.IndexEntryContext(
(Directory) null,
TestUtils.createFakeNativeMamoryCacheKey("test"),
indexLoadStrategy,
null,
"test"
)
);

NativeMemoryAllocation.IndexAllocation indexAllocation = new NativeMemoryAllocation.IndexAllocation(
null,
0,
10,
KNNEngine.DEFAULT,
"test-path",
"test-name"
);

when(indexLoadStrategy.load(indexEntryContext)).thenReturn(indexAllocation);

// since we are returning mock instance, set indexEntryContext.isIndexGraphFileOpened to true.
doReturn(true).when(indexEntryContext).isIndexGraphFileOpened();
assertEquals(indexAllocation, indexEntryContext.load());
}

public void testIndexEntryContext_load_with_unopened_graphFile() throws IOException {
NativeMemoryLoadStrategy.IndexLoadStrategy indexLoadStrategy = mock(NativeMemoryLoadStrategy.IndexLoadStrategy.class);
NativeMemoryEntryContext.IndexEntryContext indexEntryContext = new NativeMemoryEntryContext.IndexEntryContext(
(Directory) null,
Expand All @@ -59,9 +89,7 @@ public void testIndexEntryContext_load() throws IOException {
"test-name"
);

when(indexLoadStrategy.load(indexEntryContext)).thenReturn(indexAllocation);

assertEquals(indexAllocation, indexEntryContext.load());
assertThrows(IllegalStateException.class, indexEntryContext::load);
}

public void testIndexEntryContext_calculateSize() throws IOException {
Expand Down Expand Up @@ -292,6 +320,11 @@ public Integer calculateSizeInKB() {
return size;
}

@Override
public void openVectorIndex() {
return;
}

@Override
public TestNativeMemoryAllocation load() throws IOException {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,10 @@ public void testIndexLoadStrategy_load() throws IOException {
"test"
);

// open graph file before load
indexEntryContext.openVectorIndex();
// Load
NativeMemoryAllocation.IndexAllocation indexAllocation = NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance()
.load(indexEntryContext);
NativeMemoryAllocation.IndexAllocation indexAllocation = indexEntryContext.load();

// Confirm that the file was loaded by querying
float[] query = new float[dimension];
Expand Down Expand Up @@ -114,9 +115,10 @@ public void testLoad_whenFaissBinary_thenSuccess() throws IOException {
"test"
);

// open graph file before load
indexEntryContext.openVectorIndex();
// Load
NativeMemoryAllocation.IndexAllocation indexAllocation = NativeMemoryLoadStrategy.IndexLoadStrategy.getInstance()
.load(indexEntryContext);
NativeMemoryAllocation.IndexAllocation indexAllocation = indexEntryContext.load();

// Verify
assertTrue(indexAllocation.isBinaryIndex());
Expand Down

0 comments on commit 33afd58

Please sign in to comment.