Skip to content

Commit

Permalink
Provide threadpool for cache unit tests to draw from
Browse files Browse the repository at this point in the history
Signed-off-by: owenhalpert <[email protected]>
  • Loading branch information
owenhalpert committed Jan 9, 2025
1 parent 0216d75 commit a7e5ff4
Show file tree
Hide file tree
Showing 4 changed files with 48 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,14 @@ public class NativeMemoryCacheManager implements Closeable {

private static final Logger logger = LogManager.getLogger(NativeMemoryCacheManager.class);
private static NativeMemoryCacheManager INSTANCE;
@Setter
private static ThreadPool threadPool;

private Cache<String, NativeMemoryAllocation> cache;
private Deque<String> accessRecencyQueue;
private final ExecutorService executor;
private AtomicBoolean cacheCapacityReached;
private long maxWeight;
@Setter
private static ThreadPool threadPool;
private Cancellable maintenanceTask;

NativeMemoryCacheManager() {
Expand Down Expand Up @@ -110,7 +110,9 @@ private void initialize(NativeMemoryCacheManagerDto nativeMemoryCacheDTO) {
cacheCapacityReached = new AtomicBoolean(false);
accessRecencyQueue = new ConcurrentLinkedDeque<>();
cache = cacheBuilder.build();
startMaintenance(cache);
if (threadPool != null) {
startMaintenance(cache);
}
}

/**
Expand Down Expand Up @@ -475,8 +477,6 @@ private void startMaintenance(Cache<String, NativeMemoryAllocation> cacheInstanc

TimeValue interval = KNNSettings.state().getSettingValue(KNNSettings.KNN_CACHE_ITEM_EXPIRY_TIME_MINUTES);

if (threadPool != null) {
maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
}
maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@
public class QuantizationStateCache implements Closeable {

private static volatile QuantizationStateCache instance;
@Setter
private static ThreadPool threadPool;
private Cache<String, QuantizationState> cache;
@Getter
private long maxCacheSizeInKB;
@Getter
private Instant evictedDueToSizeAt;
private Cancellable maintenanceTask;
@Setter
private static ThreadPool threadPool;

@VisibleForTesting
QuantizationStateCache() {
Expand Down Expand Up @@ -78,7 +78,9 @@ private void buildCache() {
)
.removalListener(this::onRemoval)
.build();
startMaintenance(cache);
if (threadPool != null) {
startMaintenance(cache);
}
}

private void startMaintenance(Cache<String, QuantizationState> cacheInstance) {
Expand All @@ -96,9 +98,7 @@ private void startMaintenance(Cache<String, QuantizationState> cacheInstance) {

TimeValue interval = KNNSettings.state().getSettingValue(QUANTIZATION_STATE_CACHE_EXPIRY_TIME_MINUTES);

if (threadPool != null) {
maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
}
maintenanceTask = threadPool.scheduleWithFixedDelay(cleanUp, interval, ThreadPool.Names.MANAGEMENT);
}

synchronized void rebuildCache() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
package org.opensearch.knn.index.memory;

import com.google.common.cache.CacheStats;
import org.junit.After;
import org.junit.Before;
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.common.settings.Settings;
import org.opensearch.knn.common.exception.OutOfNativeMemoryException;
Expand All @@ -20,6 +22,7 @@
import org.opensearch.knn.plugin.KNNPlugin;
import org.opensearch.plugins.Plugin;
import org.opensearch.test.OpenSearchSingleNodeTestCase;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.Collection;
Expand All @@ -34,6 +37,21 @@

public class NativeMemoryCacheManagerTests extends OpenSearchSingleNodeTestCase {

private ThreadPool threadPool;

@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new ThreadPool(Settings.builder().put("node.name", "NativeMemoryCacheManagerTests").build());
NativeMemoryCacheManager.setThreadPool(threadPool);
}

@After
public void shutdown() throws Exception {
super.tearDown();
terminate(threadPool);
}

@Override
public void tearDown() throws Exception {
// Clear out persistent metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

import com.google.common.collect.ImmutableSet;
import lombok.SneakyThrows;
import org.junit.After;
import org.junit.Before;
import org.opensearch.client.Client;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.settings.ClusterSettings;
Expand All @@ -15,6 +17,7 @@
import org.opensearch.knn.KNNTestCase;
import org.opensearch.knn.index.KNNSettings;
import org.opensearch.knn.quantization.models.quantizationParams.ScalarQuantizationParams;
import org.opensearch.threadpool.ThreadPool;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;
Expand All @@ -29,6 +32,21 @@

public class QuantizationStateCacheTests extends KNNTestCase {

private ThreadPool threadPool;

@Before
public void setUp() throws Exception {
super.setUp();
threadPool = new ThreadPool(Settings.builder().put("node.name", "QuantizationStateCacheTests").build());
QuantizationStateCache.setThreadPool(threadPool);
}

@After
public void shutdown() throws Exception {
super.tearDown();
terminate(threadPool);
}

@SneakyThrows
public void testSingleThreadedAddAndRetrieve() {
String fieldName = "singleThreadField";
Expand Down

0 comments on commit a7e5ff4

Please sign in to comment.