Skip to content

Commit

Permalink
Make xds directory lazy to subscribe names (#1039)
Browse files Browse the repository at this point in the history
* make xds directory lazy to subscribe names

* optimize test to get watcher with count down latch

* make watcher private
  • Loading branch information
bohhyang authored Jan 31, 2025
1 parent b2da251 commit a3a2025
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 17 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@ and what APIs have changed, if applicable.

## [Unreleased]

## [29.63.2] - 2025-01-31
- Make XdsDirectory lazy to subscribe the names

## [29.63.1] - 2025-01-14
- Add XdsDirectory to get d2 service and cluster names from INDIS

Expand Down Expand Up @@ -5764,7 +5767,8 @@ patch operations can re-use these classes for generating patch messages.

## [0.14.1]

[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.1...master
[Unreleased]: https://github.com/linkedin/rest.li/compare/v29.63.2...master
[29.63.2]: https://github.com/linkedin/rest.li/compare/v29.63.1...v29.63.2
[29.63.1]: https://github.com/linkedin/rest.li/compare/v29.63.0...v29.63.1
[29.63.0]: https://github.com/linkedin/rest.li/compare/v29.62.1...v29.63.0
[29.62.1]: https://github.com/linkedin/rest.li/compare/v29.62.0...v29.62.1
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ public class XdsDirectory implements Directory
final ConcurrentMap<String, String> _serviceNames = new ConcurrentHashMap<>();
@VisibleForTesting
final ConcurrentMap<String, String> _clusterNames = new ConcurrentHashMap<>();
@VisibleForTesting
final AtomicReference<WildcardD2ClusterOrServiceNameResourceWatcher> _watcher = new AtomicReference<>();
private final AtomicReference<WildcardD2ClusterOrServiceNameResourceWatcher> _watcher = new AtomicReference<>();
/**
* A flag that shows whether the service/cluster names data is being updated. Requests to the data should wait until
* the update is done.
Expand All @@ -51,8 +50,7 @@ public XdsDirectory(XdsClient xdsClient)

public void start() {
LOG.debug("Starting. Setting isUpdating to true");
_isUpdating.set(true); // initially set to true to block reads before the first update completes
addNameWatcher();
_isUpdating.set(true); // initially set to true to block reads before the first (lazy) update completes
}

@Override
Expand Down
41 changes: 30 additions & 11 deletions d2/src/test/java/com/linkedin/d2/xds/balancer/TestXdsDirectory.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;
import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.testng.Assert;
Expand All @@ -35,12 +37,11 @@ public void testGetClusterAndServiceNames() throws InterruptedException {
int halfCallers = numCallers / 2;
XdsDirectoryFixture fixture = new XdsDirectoryFixture();
XdsDirectory directory = fixture._xdsDirectory;
Assert.assertNull(directory._watcher.get());
directory.start();
List<String> expectedClusterNames = Collections.singletonList(CLUSTER_NAME);
List<String> expectedServiceNames = Collections.singletonList(SERVICE_NAME);
fixture.runCallers(halfCallers, expectedClusterNames, expectedServiceNames);
XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher watcher = Objects.requireNonNull(directory._watcher.get());
XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher watcher = fixture.waitWatcher();

// verified names are not updated, results are empty, which means all threads are waiting.
Assert.assertTrue(directory._isUpdating.get());
Expand All @@ -58,7 +59,7 @@ public void testGetClusterAndServiceNames() throws InterruptedException {
Assert.assertEquals(directory._clusterNames, Collections.singletonMap(CLUSTER_RESOURCE_NAME, CLUSTER_NAME));
Assert.assertEquals(directory._serviceNames, Collections.singletonMap(SERVICE_RESOURCE_NAME, SERVICE_NAME));
Assert.assertTrue(directory._isUpdating.get());
Assert.assertEquals(fixture._latch.getCount(), numCallers);
Assert.assertEquals(fixture._callerLatch.getCount(), numCallers);

// finish updating by another thread to verify the lock can be released by a different thread. All callers should
// be unblocked and the isUpdating flag is false.
Expand All @@ -77,7 +78,7 @@ public void testGetClusterAndServiceNames() throws InterruptedException {
Assert.assertTrue(directory._isUpdating.get());
Assert.assertEquals(directory._serviceNames,
ImmutableMap.of(SERVICE_RESOURCE_NAME, SERVICE_NAME, SERVICE_RESOURCE_NAME_2, SERVICE_NAME_2));
Assert.assertEquals(fixture._latch.getCount(), 1);
Assert.assertEquals(fixture._callerLatch.getCount(), 1);

// finish updating again, new data should be added to the results
fixture.notifyComplete();
Expand All @@ -90,26 +91,44 @@ private static final class XdsDirectoryFixture
XdsDirectory _xdsDirectory;
@Mock
XdsClient _xdsClient;
CountDownLatch _latch;
CountDownLatch _callerLatch;
ExecutorService _executor;

CountDownLatch _watcherLatch = new CountDownLatch(1);
@Captor
ArgumentCaptor<XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher> _watcherCaptor =
ArgumentCaptor.forClass(XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher.class);


public XdsDirectoryFixture()
{
MockitoAnnotations.initMocks(this);
doNothing().when(_xdsClient).watchAllXdsResources(any());
doAnswer((invocation) -> {
_watcherLatch.countDown();
return null;
}).when(_xdsClient).watchAllXdsResources(_watcherCaptor.capture());
_xdsDirectory = new XdsDirectory(_xdsClient);
}

XdsClient.WildcardD2ClusterOrServiceNameResourceWatcher waitWatcher() throws InterruptedException
{
if (!_watcherLatch.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS))
{
Assert.fail("Timeout waiting for watcher to be added");
}
return _watcherCaptor.getValue();
}

void runCallers(int num, List<String> expectedClusterResult, List<String> expectedServiceResult)
{
if (_executor == null || _executor.isShutdown() || _executor.isTerminated())
{
_executor = Executors.newFixedThreadPool(num);
_latch = new CountDownLatch(num);
_callerLatch = new CountDownLatch(num);
}
else
{
_latch = new CountDownLatch((int) (_latch.getCount() + num));
_callerLatch = new CountDownLatch((int) (_callerLatch.getCount() + num));
}

for (int i = 0; i < num; i++)
Expand All @@ -122,7 +141,7 @@ void runCallers(int num, List<String> expectedClusterResult, List<String> expect

void waitCallers() throws InterruptedException {
_executor.shutdown();
if (!_latch.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS))
if (!_callerLatch.await(1000, java.util.concurrent.TimeUnit.MILLISECONDS))
{
Assert.fail("Timeout waiting for all callers to finish");
}
Expand All @@ -135,7 +154,7 @@ CallerThread createCaller(boolean isForServiceNames, List<String> expectedResult

void notifyComplete()
{
Thread t = new Thread(() -> _xdsDirectory._watcher.get().onAllResourcesProcessed());
Thread t = new Thread(() -> _watcherCaptor.getValue().onAllResourcesProcessed());

t.start();

Expand Down Expand Up @@ -177,7 +196,7 @@ public void onError(Throwable e)
public void onSuccess(List<String> result)
{
assertTrue(matchSortedLists(result, expectedResult));
_latch.countDown();
_callerLatch.countDown();
}
};
_isForServiceNames = isForServiceNames;
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version=29.63.1
version=29.63.2
group=com.linkedin.pegasus
org.gradle.configureondemand=true
org.gradle.parallel=true
Expand Down

0 comments on commit a3a2025

Please sign in to comment.