diff --git a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java index c70e62df8..7915bfac7 100644 --- a/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java +++ b/d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java @@ -89,9 +89,11 @@ public class XdsClientImpl extends XdsClient private final boolean _subscribeToUriGlobCollection; private final BackoffPolicy.Provider _backoffPolicyProvider = new ExponentialBackoffPolicy.Provider(); private BackoffPolicy _retryBackoffPolicy; - private AdsStream _adsStream; + @VisibleForTesting + AdsStream _adsStream; private boolean _shutdown; - private ScheduledFuture _retryRpcStreamFuture; + @VisibleForTesting + ScheduledFuture _retryRpcStreamFuture; private ScheduledFuture _readyTimeoutFuture; private final long _readyTimeoutMillis; @@ -561,11 +563,15 @@ private void handleResourceUpdate(Map updates, for (Map.Entry entry : updates.entrySet()) { - ResourceSubscriber subscriber = subscribers.get(entry.getKey()); - if (subscriber != null) + if (subscribers != null) { - subscriber.onData(entry.getValue(), _serverMetricsProvider); + ResourceSubscriber subscriber = subscribers.get(entry.getKey()); + if (subscriber != null) + { + subscriber.onData(entry.getValue(), _serverMetricsProvider); + } } + if (wildcardSubscriber != null) { wildcardSubscriber.onData(entry.getKey(), entry.getValue()); @@ -580,19 +586,25 @@ private void handleResourceRemoval(Collection removedResources, Resource return; } + Map subscribers = getResourceSubscriberMap(type); WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type); for (String resourceName : removedResources) { _xdsClientJmx.incrementResourceNotFoundCount(); _log.warn("Received response that {} {} was removed", type, resourceName); - ResourceSubscriber subscriber = getResourceSubscriberMap(type).get(resourceName); - if (subscriber != null) + + if (subscribers != null) { - subscriber.onRemoval(); + ResourceSubscriber subscriber = subscribers.get(resourceName); + if (subscriber != null) + { + subscriber.onRemoval(); + } } + if (wildcardSubscriber != null) { - removedResources.forEach(wildcardSubscriber::onRemoval); + wildcardSubscriber.onRemoval(resourceName); } } } @@ -632,7 +644,7 @@ private void notifyStreamReconnect() @VisibleForTesting Map getResourceSubscriberMap(ResourceType type) { - return _resourceSubscribers.get(type); + return getResourceSubscribers().get(type); } @VisibleForTesting @@ -644,7 +656,7 @@ Map> getResourceSubscribers() @VisibleForTesting WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type) { - return _wildcardSubscribers.get(type); + return getWildcardResourceSubscribers().get(type); } @VisibleForTesting @@ -861,28 +873,28 @@ void addWatcher(WildcardResourceWatcher watcher) } @VisibleForTesting - void onData(String resourceName, ResourceUpdate update) + void onData(String resourceName, ResourceUpdate data) { - if (Objects.equals(_data.get(resourceName), update)) + if (Objects.equals(_data.get(resourceName), data)) { _log.debug("Received resource update data equal to the current data. Will not perform the update."); return; } // null value guard to avoid overwriting the property with null - if (update != null && update.isValid()) + if (data != null && data.isValid()) { - _data.put(resourceName, update); + _data.put(resourceName, data); } else { // invalid data is received, log a warning and check if existing data is present. if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI) { - RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update); + RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); } else { - _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update); + _log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data); } // if no data has ever been set, init it to an empty data in case watchers are waiting for it if (_data.get(resourceName) == null) @@ -898,7 +910,7 @@ void onData(String resourceName, ResourceUpdate update) for (WildcardResourceWatcher watcher : _watchers) { - watcher.onChanged(resourceName, update); + watcher.onChanged(resourceName, _data.get(resourceName)); } } @@ -942,6 +954,22 @@ private void onAllResourcesProcessed() } } + /** + * This is a test-only method to simulate the retry task being executed. It should only be called from tests. + * @param testStream test ads stream + */ + @VisibleForTesting + void testRetryTask(AdsStream testStream) + { + if (_adsStream != null && _adsStream != testStream) + { + _log.warn("Non-testing ADS stream exists, ignoring test call"); + return; + } + _adsStream = testStream; + _retryRpcStreamFuture = _executorService.schedule(new RpcRetryTask(), 0, TimeUnit.NANOSECONDS); + } + final class RpcRetryTask implements Runnable { @Override @@ -1132,7 +1160,8 @@ public String toString() } } - private final class AdsStream + @VisibleForTesting + class AdsStream { private final AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub _stub; @@ -1208,7 +1237,8 @@ public void onCompleted() /** * Sends a client-initiated discovery request. */ - private void sendDiscoveryRequest(ResourceType type, Collection resources) + @VisibleForTesting + void sendDiscoveryRequest(ResourceType type, Collection resources) { _log.info("Sending {} request for resources: {}", type, resources); DeltaDiscoveryRequest request = new DiscoveryRequestData(_node, type, resources).toEnvoyProto(); diff --git a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java index 5549d45b4..6487b914e 100644 --- a/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java +++ b/d2/src/test/java/com/linkedin/d2/xds/TestXdsClientImpl.java @@ -10,13 +10,23 @@ import com.linkedin.d2.xds.XdsClientImpl.DiscoveryResponseData; import com.linkedin.d2.xds.XdsClientImpl.ResourceSubscriber; import com.linkedin.d2.xds.XdsClientImpl.WildcardResourceSubscriber; +import com.linkedin.r2.util.NamedThreadFactory; import indis.XdsD2; import io.envoyproxy.envoy.service.discovery.v3.Resource; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executors; +import java.util.stream.Collectors; +import org.apache.commons.lang3.tuple.Pair; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; import org.mockito.Mock; import org.mockito.Mockito; import org.mockito.MockitoAnnotations; @@ -34,7 +44,10 @@ public class TestXdsClientImpl { private static final byte[] DATA = "data".getBytes(); private static final byte[] DATA2 = "data2".getBytes(); - private static final String SERVICE_RESOURCE_NAME = "/d2/services/FooService"; + private static final String SERVICE_NAME = "FooService"; + private static final String SERVICE_NAME_2 = "BarService"; + private static final String SERVICE_RESOURCE_NAME = "/d2/services/" + SERVICE_NAME; + private static final String SERVICE_RESOURCE_NAME_2 = "/d2/services/" + SERVICE_NAME_2; private static final String CLUSTER_NAME = "FooClusterMaster-prod-ltx1"; private static final String CLUSTER_RESOURCE_NAME = "/d2/uris/" + CLUSTER_NAME; private static final String URI1 = "TestURI1"; @@ -55,9 +68,36 @@ public class TestXdsClientImpl private static final List NODE_RESOURCES_WITH_DATA2 = Collections.singletonList( Resource.newBuilder().setVersion(VERSION2).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_DATA2).build()); - private static final List NODE_RESOURCES_WITH_NULL_RESOURCE_FILED = Collections.singletonList( + private static final List NODE_RESOURCES_WITH_NULL_RESOURCE_FIELD = Collections.singletonList( Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NODE_WITH_EMPTY_DATA).build()); + private static final XdsD2.D2ClusterOrServiceName CLUSTER_NAME_DATA = XdsD2.D2ClusterOrServiceName.newBuilder() + .setClusterName(CLUSTER_NAME).build(); + private static final XdsD2.D2ClusterOrServiceName SERVICE_NAME_DATA = XdsD2.D2ClusterOrServiceName.newBuilder() + .setServiceName(SERVICE_NAME).build(); + private static final XdsD2.D2ClusterOrServiceName SERVICE_NAME_DATA_2 = XdsD2.D2ClusterOrServiceName.newBuilder() + .setServiceName(SERVICE_NAME_2).build(); + private static final XdsD2.D2ClusterOrServiceName NAME_DATA_WITH_NULL = XdsD2.D2ClusterOrServiceName.newBuilder().build(); + private static final Any PACKED_SERVICE_NAME_DATA = Any.pack(SERVICE_NAME_DATA); + private static final Any PACKED_SERVICE_NAME_DATA_2 = Any.pack(SERVICE_NAME_DATA_2); + private static final Any PACKED_NAME_DATA_WITH_NULL = Any.pack(NAME_DATA_WITH_NULL); + private static final XdsClient.D2ClusterOrServiceNameUpdate CLUSTER_NAME_DATA_UPDATE = + new XdsClient.D2ClusterOrServiceNameUpdate(CLUSTER_NAME_DATA); + private static final XdsClient.D2ClusterOrServiceNameUpdate SERVICE_NAME_DATA_UPDATE = + new XdsClient.D2ClusterOrServiceNameUpdate(SERVICE_NAME_DATA); + private static final XdsClient.D2ClusterOrServiceNameUpdate SERVICE_NAME_DATA_UPDATE_2 = + new XdsClient.D2ClusterOrServiceNameUpdate(SERVICE_NAME_DATA_2); + private static final List SERVICE_NAME_DATA_RESOURCES = Arrays.asList( + Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME) + .setResource(PACKED_SERVICE_NAME_DATA).build(), + Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME_2) + .setResource(PACKED_SERVICE_NAME_DATA_2).build() + ); + private static final List NULL_NAME_RESOURCES = Arrays.asList( + Resource.newBuilder().setVersion(VERSION1).setName(CLUSTER_RESOURCE_NAME).build(), + Resource.newBuilder().setVersion(VERSION1).setName(SERVICE_RESOURCE_NAME).setResource(PACKED_NAME_DATA_WITH_NULL).build() + ); + private static final XdsD2.D2URI D2URI_1 = XdsD2.D2URI.newBuilder().setVersion(Long.parseLong(VERSION1)).setClusterName(CLUSTER_NAME).setUri(URI1).build(); private static final XdsD2.D2URI D2URI_1_1 = @@ -75,6 +115,7 @@ public class TestXdsClientImpl new D2URIMapUpdate(D2_URI_MAP_WITH_DATA1.getUrisMap()); private static final D2URIMapUpdate D2_URI_MAP_UPDATE_WITH_DATA2 = new D2URIMapUpdate(D2_URI_MAP_WITH_DATA2.getUrisMap()); + private static final D2URIMapUpdate D_2_URI_MAP_UPDATE_WITH_EMPTY_MAP = new D2URIMapUpdate(Collections.emptyMap()); private static final Any PACKED_D2_URI_MAP_WITH_DATA1 = Any.pack(D2_URI_MAP_WITH_DATA1); private static final Any PACKED_D2_URI_MAP_WITH_DATA2 = Any.pack(D2_URI_MAP_WITH_DATA2); private static final Any PACKED_D2_URI_MAP_WITH_EMPTY_DATA = Any.pack(D2_URI_MAP_WITH_EMPTY_DATA); @@ -83,7 +124,6 @@ public class TestXdsClientImpl .setName(CLUSTER_RESOURCE_NAME) .setResource(PACKED_D2_URI_MAP_WITH_DATA1) .build()); - private static final List URI_MAP_RESOURCE_WITH_DATA2 = Collections.singletonList(Resource.newBuilder() .setVersion(VERSION1) .setName(CLUSTER_RESOURCE_NAME) @@ -95,13 +135,12 @@ public class TestXdsClientImpl .setName(CLUSTER_RESOURCE_NAME) .setResource(PACKED_D2_URI_MAP_WITH_EMPTY_DATA) .build()); - // private static final List REMOVED_RESOURCE = ; + private static final DiscoveryResponseData DISCOVERY_RESPONSE_NODE_DATA1 = new DiscoveryResponseData(NODE, NODE_RESOURCES_WITH_DATA1, null, NONCE, null); private static final DiscoveryResponseData DISCOVERY_RESPONSE_NODE_DATA2 = new DiscoveryResponseData(NODE, NODE_RESOURCES_WITH_DATA2, null, NONCE, null); - - // case 1: Resource in ResourceUpdate is null, failed to parse which causes InvalidProtocolBufferException + // Resource in ResourceUpdate is null, failed to parse which causes InvalidProtocolBufferException private static final DiscoveryResponseData DISCOVERY_RESPONSE_NODE_RESOURCE_IS_NULL = new DiscoveryResponseData( NODE, @@ -111,20 +150,29 @@ public class TestXdsClientImpl null, NONCE, null); - - // case 2: Resource field in Resource is null - private static final DiscoveryResponseData DISCOVERY_RESPONSE_NODE_NULL_DATA_IN_RESOURCE_FILED = - new DiscoveryResponseData(NODE, NODE_RESOURCES_WITH_NULL_RESOURCE_FILED, null, NONCE, null); - - // case3 : ResourceList is empty + // Resource field in Resource is null + private static final DiscoveryResponseData DISCOVERY_RESPONSE_NODE_NULL_DATA_IN_RESOURCE_FIELD = + new DiscoveryResponseData(NODE, NODE_RESOURCES_WITH_NULL_RESOURCE_FIELD, null, NONCE, null); + // ResourceList is empty private static final DiscoveryResponseData DISCOVERY_RESPONSE_WITH_EMPTY_NODE_RESPONSE = new DiscoveryResponseData(NODE, Collections.emptyList(), null, NONCE, null); + + private static final DiscoveryResponseData RESPONSE_WITH_SERVICE_NAMES = + new DiscoveryResponseData(D2_CLUSTER_OR_SERVICE_NAME, SERVICE_NAME_DATA_RESOURCES, null, NONCE, null); + private static final DiscoveryResponseData RESPONSE_WITH_NULL_NAMES = + new DiscoveryResponseData(D2_CLUSTER_OR_SERVICE_NAME, NULL_NAME_RESOURCES, null, NONCE, null); + private static final DiscoveryResponseData RESPONSE_WITH_EMPTY_NAMES = + new DiscoveryResponseData(D2_CLUSTER_OR_SERVICE_NAME, Collections.emptyList(), null, NONCE, null); + private static final DiscoveryResponseData RESPONSE_WITH_NAME_REMOVAL = + new DiscoveryResponseData(D2_CLUSTER_OR_SERVICE_NAME, Collections.emptyList(), + Collections.singletonList(SERVICE_RESOURCE_NAME), NONCE, null); + private static final DiscoveryResponseData DISCOVERY_RESPONSE_URI_MAP_DATA1 = new DiscoveryResponseData(D2_URI_MAP, URI_MAP_RESOURCE_WITH_DATA1, null, NONCE, null); private static final DiscoveryResponseData DISCOVERY_RESPONSE_URI_MAP_DATA2 = new DiscoveryResponseData(D2_URI_MAP, URI_MAP_RESOURCE_WITH_DATA2, null, NONCE, null); - // case1: Resource in ResourceUpdate is null, failed to parse response.resource + // Resource in ResourceUpdate is null, failed to parse response.resource private static final DiscoveryResponseData DISCOVERY_RESPONSE_URI_MAP_RESOURCE_IS_NULL = new DiscoveryResponseData( D2_URI_MAP, @@ -135,13 +183,12 @@ public class TestXdsClientImpl NONCE, null); - // case2 : Resource field in Resource is null - private static final DiscoveryResponseData DISCOVERY_RESPONSE_URI_MAP_EMPTY_MAP = + private static final DiscoveryResponseData DISCOVERY_RESPONSE_URI_MAP_EMPTY = new DiscoveryResponseData(D2_URI_MAP, EMPTY_URI_MAP_RESOURCE, null, NONCE, null); - // case3 : ResourceList is empty + // ResourceList is empty private static final DiscoveryResponseData DISCOVERY_RESPONSE_WITH_EMPTY_URI_MAP_RESPONSE = - new DiscoveryResponseData(D2_URI_MAP, null, null, NONCE, null); + new DiscoveryResponseData(D2_URI_MAP, Collections.emptyList(), null, NONCE, null); private static final DiscoveryResponseData DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL = new DiscoveryResponseData(NODE, Collections.emptyList(), Collections.singletonList(SERVICE_RESOURCE_NAME), NONCE, null); private static final DiscoveryResponseData DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL = @@ -151,48 +198,69 @@ public class TestXdsClientImpl private static final String URI_URN1 = "xdstp:///indis.D2URI/" + CLUSTER_NAME + "/" + URI1; private static final String URI_URN2 = "xdstp:///indis.D2URI/" + CLUSTER_NAME + "/" + URI2; - @Test - public void testHandleD2NodeResponseWithData() + @DataProvider(name = "providerWatcherFlags") + public Object[][] watcherFlags() + { + // { + // toWatchIndividual --- whether to watch resources with individual watcher + // toWatchWildcard --- whether to watch resources with wildcard watcher + // } + return new Object[][] + { + {true, false}, + {false, true}, + {true, true} + }; + } + @Test(dataProvider = "providerWatcherFlags") + public void testHandleD2NodeResponseWithData(boolean toWatchIndividual, boolean toWatchWildcard) { + // make sure the watchers are notified as expected regardless of watching only by its own type, or watching + // with both via individual and wildcard watchers XdsClientImplFixture fixture = new XdsClientImplFixture(); + if (toWatchIndividual) + { + fixture.watchNodeResource(); + } + if (toWatchWildcard) + { + fixture.watchNodeResourceViaWildcard(); + } // subscriber original data is null - fixture._nodeSubscriber.setData(null); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(1); - verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); - verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)) + .onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // initial update should not track latency - XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); // subscriber data should be updated to NODE_UPDATE1 - Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); - actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); - // subscriber data should be updated to NODE_UPDATE1 - Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); + Assert.assertEquals(fixture._nodeSubscriber.getData(), NODE_UPDATE1); + Assert.assertEquals(fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME), NODE_UPDATE1); // subscriber original data is invalid, xds server latency won't be tracked fixture._nodeSubscriber.setData(new XdsClient.NodeUpdate(null)); fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, new XdsClient.NodeUpdate(null)); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); fixture.verifyAckSent(2); - verify(fixture._resourceWatcher, times(2)).onChanged(eq(NODE_UPDATE1)); - verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 2 : 0)).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 2 : 0)).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE1)); verifyZeroInteractions(fixture._serverMetricsProvider); // subscriber data should be updated to NODE_UPDATE2 fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA2); - actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); - verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE2)); - verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE2)); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(NODE_UPDATE2)); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)). + onChanged(eq(SERVICE_RESOURCE_NAME), eq(NODE_UPDATE2)); verify(fixture._serverMetricsProvider).trackLatency(anyLong()); - Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); - actualData = (XdsClient.NodeUpdate) fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME); - Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE2.getNodeData()); + Assert.assertEquals(fixture._nodeSubscriber.getData(), NODE_UPDATE2); + Assert.assertEquals(fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME), NODE_UPDATE2); } @Test public void testHandleD2NodeUpdateWithEmptyResponse() { XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_NODE_RESPONSE); fixture.verifyAckSent(1); verify(fixture._clusterSubscriber, times(0)).onData(any(), any()); @@ -202,39 +270,58 @@ public void testHandleD2NodeUpdateWithEmptyResponse() @DataProvider(name = "badNodeUpdateTestCases") public Object[][] provideBadNodeDataTestCases() { + // { + // badData --- bad resource data to test + // nackExpected --- whether nack is expected + // toWatchIndividual --- whether to watch resources with individual watcher + // toWatchWildcard --- whether to watch resources with wildcard watcher + // } return new Object[][]{ - {DISCOVERY_RESPONSE_NODE_RESOURCE_IS_NULL, true}, - {DISCOVERY_RESPONSE_NODE_NULL_DATA_IN_RESOURCE_FILED, false}, + {DISCOVERY_RESPONSE_NODE_RESOURCE_IS_NULL, true, true, false}, + {DISCOVERY_RESPONSE_NODE_RESOURCE_IS_NULL, true, false, true}, + {DISCOVERY_RESPONSE_NODE_RESOURCE_IS_NULL, true, true, true}, + {DISCOVERY_RESPONSE_NODE_NULL_DATA_IN_RESOURCE_FIELD, false, true, false}, + {DISCOVERY_RESPONSE_NODE_NULL_DATA_IN_RESOURCE_FIELD, false, false, true}, + {DISCOVERY_RESPONSE_NODE_NULL_DATA_IN_RESOURCE_FIELD, false, true, true}, }; } @Test(dataProvider = "badNodeUpdateTestCases") - public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boolean nackExpected) + public void testHandleD2NodeUpdateWithBadData(DiscoveryResponseData badData, boolean nackExpected, + boolean toWatchIndividual, boolean toWatchWildcard ) { XdsClientImplFixture fixture = new XdsClientImplFixture(); - fixture._nodeSubscriber.setData(null); + if (toWatchIndividual) + { + fixture.watchNodeResource(); + } + if (toWatchWildcard) + { + fixture.watchNodeResourceViaWildcard(); + } fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 1); - verify(fixture._resourceWatcher).onChanged(eq(NODE.emptyData())); - // The wildcard subscriber doesn't care about bad data, it doesn't need to notify the watcher - verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); - XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); - Assert.assertNull(Objects.requireNonNull(actualData).getNodeData()); + // since current data is null, all watchers should be notified for bad data to stop waiting. + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(NODE.emptyData())); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)).onChanged(any(), eq(NODE.emptyData())); + Assert.assertEquals(fixture._nodeSubscriber.getData(), NODE.emptyData()); fixture._nodeSubscriber.setData(NODE_UPDATE1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyAckOrNack(nackExpected, 2); - verify(fixture._resourceWatcher).onChanged(eq(NODE_UPDATE1)); - verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); - actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); - // bad data will not overwrite the original valid data - Assert.assertEquals(actualData.getNodeData(), NODE_UPDATE1.getNodeData()); + // current data is not null, bad data will not overwrite the original valid data and watchers won't be notified. + Assert.assertEquals(fixture._nodeSubscriber.getData(), NODE_UPDATE1); + verify(fixture._resourceWatcher, times(0)).onChanged(eq(NODE_UPDATE1)); + verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), eq(NODE_UPDATE1)); } + // Removed resource will not overwrite the original valid data for individual subscriber, but will be removed + // in wildcard subscriber @Test public void testHandleD2NodeResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); fixture._nodeSubscriber.setData(NODE_UPDATE1); fixture._nodeWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, NODE_UPDATE1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); @@ -243,45 +330,127 @@ public void testHandleD2NodeResponseWithRemoval() verify(fixture._wildcardResourceWatcher).onRemoval(eq(SERVICE_RESOURCE_NAME)); verify(fixture._nodeSubscriber).onRemoval(); verify(fixture._nodeWildcardSubscriber).onRemoval(eq(SERVICE_RESOURCE_NAME)); - XdsClient.NodeUpdate actualData = (XdsClient.NodeUpdate) fixture._nodeSubscriber.getData(); - // removed resource will not overwrite the original valid data - Assert.assertEquals(Objects.requireNonNull(actualData).getNodeData(), NODE_UPDATE1.getNodeData()); + Assert.assertEquals(fixture._nodeSubscriber.getData(), NODE_UPDATE1); + Assert.assertNull(fixture._nodeWildcardSubscriber.getData(SERVICE_RESOURCE_NAME)); } @Test - public void testHandleD2URIMapResponseWithData() + public void testHandleD2ClusterOrServiceNameResponse() { XdsClientImplFixture fixture = new XdsClientImplFixture(); - // subscriber original data is null - fixture._clusterSubscriber.setData(null); - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); + fixture.watchAllResourceAndWatcherTypes(); + // D2ClusterOrServiceName can be subscribed only via wildcard, valid new data should update subscriber data + fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_SERVICE_NAMES); fixture.verifyAckSent(1); - verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - verifyZeroInteractions(fixture._serverMetricsProvider); - D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); - // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA1 - Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); - actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); - Assert.assertEquals(Objects.requireNonNull(actualData).getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), eq(SERVICE_NAME_DATA_UPDATE)); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME_2), eq(SERVICE_NAME_DATA_UPDATE_2)); + verify(fixture._wildcardResourceWatcher).onAllResourcesProcessed(); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(SERVICE_RESOURCE_NAME), SERVICE_NAME_DATA_UPDATE); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(SERVICE_RESOURCE_NAME_2), SERVICE_NAME_DATA_UPDATE_2); + verifyZeroInteractions(fixture._serverMetricsProvider); // initial update should not track latency + } - // subscriber original data is invalid, xds server latency won't be tracked - fixture._clusterSubscriber.setData(new XdsClient.D2URIMapUpdate(null)); - fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, new XdsClient.D2URIMapUpdate(null)); + @Test + public void testHandleD2ClusterOrServiceNameEmptyResponse() + { + XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); + fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_EMPTY_NAMES); + fixture.verifyAckSent(1); + verify(fixture._nameWildcardSubscriber, times(0)).onData(any(), any()); + } + + @Test + public void testHandleD2ClusterOrServiceNameResponseWithBadData() + { + XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); + // when current data is null, all watchers should be notified for bad data to stop waiting. + fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_NULL_NAMES); + fixture.verifyAckOrNack(true, 1); + verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), + eq(D2_CLUSTER_OR_SERVICE_NAME.emptyData())); + verify(fixture._wildcardResourceWatcher).onChanged(eq(SERVICE_RESOURCE_NAME), + eq(D2_CLUSTER_OR_SERVICE_NAME.emptyData())); + verify(fixture._wildcardResourceWatcher).onAllResourcesProcessed(); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME), + D2_CLUSTER_OR_SERVICE_NAME.emptyData()); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(SERVICE_RESOURCE_NAME), + D2_CLUSTER_OR_SERVICE_NAME.emptyData()); + + // when current data is not null, bad data won't overwrite the original valid data and watchers won't be notified. + fixture._nameWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, CLUSTER_NAME_DATA_UPDATE); + fixture._nameWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, SERVICE_NAME_DATA_UPDATE); + fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_NULL_NAMES); + fixture.verifyAckOrNack(true, 2); + verify(fixture._wildcardResourceWatcher, times(0)) + .onChanged(eq(CLUSTER_RESOURCE_NAME), eq(CLUSTER_NAME_DATA_UPDATE)); + verify(fixture._wildcardResourceWatcher, times(0)) + .onChanged(eq(SERVICE_RESOURCE_NAME), eq(SERVICE_NAME_DATA_UPDATE)); + verify(fixture._wildcardResourceWatcher, times(2)).onAllResourcesProcessed(); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME), CLUSTER_NAME_DATA_UPDATE); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(SERVICE_RESOURCE_NAME), SERVICE_NAME_DATA_UPDATE); + } + + // Removed resource will be removed in wildcard subscriber, where other resource is still kept intact. + @Test + public void testHandleD2ClusterOrServiceNameResponseWithRemoval() + { + XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); + fixture._nameWildcardSubscriber.setData(SERVICE_RESOURCE_NAME, SERVICE_NAME_DATA_UPDATE); + fixture._nameWildcardSubscriber.setData(SERVICE_RESOURCE_NAME_2, SERVICE_NAME_DATA_UPDATE_2); + fixture._xdsClientImpl.handleResponse(RESPONSE_WITH_NAME_REMOVAL); + fixture.verifyAckSent(1); + verify(fixture._wildcardResourceWatcher).onRemoval(SERVICE_RESOURCE_NAME); + verify(fixture._nameWildcardSubscriber).onRemoval(SERVICE_RESOURCE_NAME); + Assert.assertNull(fixture._nameWildcardSubscriber.getData(SERVICE_RESOURCE_NAME)); + Assert.assertEquals(fixture._nameWildcardSubscriber.getData(SERVICE_RESOURCE_NAME_2), SERVICE_NAME_DATA_UPDATE_2); + } + + @Test(dataProvider = "providerWatcherFlags") + public void testHandleD2URIMapResponseWithData(boolean toWatchIndividual, boolean toWatchWildcard) + { + XdsClientImplFixture fixture = new XdsClientImplFixture(); + if (toWatchIndividual) + { + fixture.watchUriMapResource(); + } + if (toWatchWildcard) + { + fixture.watchUriMapResourceViaWildcard(); + } + // subscriber original data is null, watchers and subscribers will be notified/updated for new valid data, and + // xds server latency won't be tracked fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); - verify(fixture._resourceWatcher, times(2)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + fixture.verifyAckSent(1); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)) + .onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); - fixture.verifyAckSent(2); + Assert.assertEquals(fixture._clusterSubscriber.getData(), D2_URI_MAP_UPDATE_WITH_DATA1); + Assert.assertEquals(fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME), D2_URI_MAP_UPDATE_WITH_DATA1); + // subscriber original data is not null, new data will overwrite the original valid data, and watchers will be + // notified, and xds server latency will be tracked. fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA2); // updated uri1, added uri2 - actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); - // subscriber data should be updated to D2_URI_MAP_UPDATE_WITH_DATA2 - verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA2)); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA2)); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)) + .onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA2)); verify(fixture._serverMetricsProvider, times(2)).trackLatency(anyLong()); - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); - actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA2.getURIMap()); + Assert.assertEquals(fixture._clusterSubscriber.getData(), D2_URI_MAP_UPDATE_WITH_DATA2); + Assert.assertEquals(fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME), D2_URI_MAP_UPDATE_WITH_DATA2); + fixture.verifyAckSent(2); + + // new data with an empty uri map will update the original data, watchers will be notified, but xds server latency + // won't be tracked. + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_EMPTY); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(D_2_URI_MAP_UPDATE_WITH_EMPTY_MAP)); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)) + .onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D_2_URI_MAP_UPDATE_WITH_EMPTY_MAP)); + verifyNoMoreInteractions(fixture._serverMetricsProvider); // won't track latency for removed uris + Assert.assertEquals(fixture._clusterSubscriber.getData(), D_2_URI_MAP_UPDATE_WITH_EMPTY_MAP); + Assert.assertEquals(fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME), D_2_URI_MAP_UPDATE_WITH_EMPTY_MAP); fixture.verifyAckSent(3); } @@ -289,6 +458,7 @@ public void testHandleD2URIMapResponseWithData() public void testHandleD2URIMapUpdateWithEmptyResponse() { XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); // Sanity check that the code handles empty responses fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_WITH_EMPTY_URI_MAP_RESPONSE); fixture.verifyAckSent(1); @@ -296,60 +466,38 @@ public void testHandleD2URIMapUpdateWithEmptyResponse() verify(fixture._uriMapWildcardSubscriber, times(0)).onData(any(), any()); } - @DataProvider(name = "badD2URIMapUpdateTestCases") - public Object[][] provideBadD2URIMapDataTestCases() - { - return new Object[][]{ - {DISCOVERY_RESPONSE_URI_MAP_RESOURCE_IS_NULL, true}, - {DISCOVERY_RESPONSE_URI_MAP_EMPTY_MAP, false}, - }; - } - - @Test(dataProvider = "badD2URIMapUpdateTestCases") - public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, boolean invalidData) + @Test(dataProvider = "providerWatcherFlags") + public void testHandleD2URIMapUpdateWithBadData(boolean toWatchIndividual, boolean toWatchWildcard) { XdsClientImplFixture fixture = new XdsClientImplFixture(); - fixture._clusterSubscriber.setData(null); - fixture._xdsClientImpl.handleResponse(badData); - fixture.verifyAckOrNack(invalidData, 1); - // If the map is empty, we expect an empty map, but if it's invalid we expect a null - D2URIMapUpdate expectedUpdate = - invalidData - ? (D2URIMapUpdate) D2_URI_MAP.emptyData() - : new D2URIMapUpdate(Collections.emptyMap()); - verify(fixture._resourceWatcher).onChanged(eq(expectedUpdate)); - if (!invalidData) + if (toWatchIndividual) { - verify(fixture._wildcardResourceWatcher).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); + fixture.watchUriMapResource(); } - verify(fixture._clusterSubscriber).setData(eq(null)); - verify(fixture._uriMapWildcardSubscriber, times(0)).setData(any(), any()); - verifyZeroInteractions(fixture._serverMetricsProvider); - D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); - Assert.assertEquals(actualData, expectedUpdate); - - fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); - fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); - fixture._xdsClientImpl.handleResponse(badData); - fixture.verifyAckOrNack(invalidData, 2); - actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); - Objects.requireNonNull(actualData); - if (invalidData) + if (toWatchWildcard) { - verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); - // bad data will not overwrite the original valid data - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); - } - else - { - verify(fixture._resourceWatcher, times(2)).onChanged(eq(expectedUpdate)); - verify(fixture._wildcardResourceWatcher, times(2)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(expectedUpdate)); - // But an empty cluster should clear the data - Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); - actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); - Assert.assertEquals(actualData.getURIMap(), Collections.emptyMap()); + fixture.watchUriMapResourceViaWildcard(); } + // current data is null, all watchers should be notified for bad data to stop waiting. + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_RESOURCE_IS_NULL); + fixture.verifyAckOrNack(true, 1); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(D2_URI_MAP.emptyData())); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)) + .onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP.emptyData())); + Assert.assertEquals(fixture._clusterSubscriber.getData(), D2_URI_MAP.emptyData()); + Assert.assertEquals(fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME), D2_URI_MAP.emptyData()); + + // current data is not null, bad data will not overwrite the original valid data and watchers won't be notified. + fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); + fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_RESOURCE_IS_NULL); + + fixture.verifyAckOrNack(true, 2); + verify(fixture._resourceWatcher, times(0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + verify(fixture._wildcardResourceWatcher, times(0)) + .onChanged(any(), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); + // bad data will not overwrite the original valid data + Assert.assertEquals(fixture._clusterSubscriber.getData(), D2_URI_MAP_UPDATE_WITH_DATA1); verifyZeroInteractions(fixture._serverMetricsProvider); } @@ -357,6 +505,7 @@ public void testHandleD2URIMapUpdateWithBadData(DiscoveryResponseData badData, b public void testHandleD2URIMapResponseWithRemoval() { XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); @@ -382,8 +531,8 @@ public void testHandleD2URICollectionResponseWithData() .build() ), null, NONCE, null); XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); // subscriber original data is null - fixture._clusterSubscriber.setData(null); fixture._xdsClientImpl.handleResponse(createUri1); fixture.verifyAckSent(1); verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); @@ -444,13 +593,14 @@ public void testHandleD2URICollectionResponseWithData() public void testHandleD2URICollectionUpdateWithEmptyResponse() { XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); // Sanity check that the code handles empty responses fixture._xdsClientImpl.handleResponse(new DiscoveryResponseData(D2_URI, null, null, NONCE, null)); fixture.verifyAckSent(1); } - @Test - public void testHandleD2URICollectionUpdateWithBadData() + @Test(dataProvider = "providerWatcherFlags") + public void testHandleD2URICollectionUpdateWithBadData(boolean toWatchIndividual, boolean toWatchWildcard) { DiscoveryResponseData badData = new DiscoveryResponseData( D2_URI, @@ -462,27 +612,34 @@ public void testHandleD2URICollectionUpdateWithBadData() null); XdsClientImplFixture fixture = new XdsClientImplFixture(); - fixture._clusterSubscriber.setData(null); + if (toWatchIndividual) + { + fixture.watchUriMapResource(); + } + if (toWatchWildcard) + { + fixture.watchUriMapResourceViaWildcard(); + } + + // current data is null, empty placeholder data will be set the subscriber, + // and all watchers should be notified for bad data to stop waiting. fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(1); - verify(fixture._resourceWatcher).onChanged(eq(D2_URI_MAP.emptyData())); - // The wildcard subscriber doesn't care about bad data, and simply treats it as the resource not existing - verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); + verify(fixture._resourceWatcher, times(toWatchIndividual ? 1 : 0)).onChanged(eq(D2_URI_MAP.emptyData())); + verify(fixture._wildcardResourceWatcher, times(toWatchWildcard ? 1 : 0)) + .onChanged(any(), eq(D2_URI_MAP.emptyData())); verifyZeroInteractions(fixture._serverMetricsProvider); - D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); - Assert.assertNull(Objects.requireNonNull(actualData).getURIMap()); - actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); - Assert.assertNull(actualData); + // current data is not null, bad data will not overwrite the original valid data and watchers won't be notified. fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(badData); fixture.verifyNackSent(2); - // Due to the way glob collection updates are handled, bad data is dropped rather than showing any visible side - // effects other than NACKing the response. verify(fixture._resourceWatcher, times(0)).onChanged(eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - verify(fixture._wildcardResourceWatcher, times(0)).onChanged(any(), any()); + verify(fixture._wildcardResourceWatcher, times(0)) + .onChanged(any(), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); verifyZeroInteractions(fixture._serverMetricsProvider); + Assert.assertEquals(fixture._clusterSubscriber.getData(), D2_URI_MAP_UPDATE_WITH_DATA1); } @Test @@ -492,6 +649,7 @@ public void testHandleD2URICollectionResponseWithRemoval() new DiscoveryResponseData(D2_URI, null, Collections.singletonList(CLUSTER_GLOB_COLLECTION), NONCE, null); XdsClientImplFixture fixture = new XdsClientImplFixture(); + fixture.watchAllResourceAndWatcherTypes(); fixture._clusterSubscriber.setData(D2_URI_MAP_UPDATE_WITH_DATA1); fixture._uriMapWildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, D2_URI_MAP_UPDATE_WITH_DATA1); fixture._xdsClientImpl.handleResponse(removeClusterResponse); @@ -501,42 +659,9 @@ public void testHandleD2URICollectionResponseWithRemoval() verify(fixture._clusterSubscriber).onRemoval(); verify(fixture._uriMapWildcardSubscriber).onRemoval(eq(CLUSTER_RESOURCE_NAME)); verifyZeroInteractions(fixture._serverMetricsProvider); - D2URIMapUpdate actualData = (D2URIMapUpdate) fixture._clusterSubscriber.getData(); // removed resource will not overwrite the original valid data - Assert.assertEquals(actualData.getURIMap(), D2_URI_MAP_UPDATE_WITH_DATA1.getURIMap()); - actualData = (D2URIMapUpdate) fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME); - Assert.assertNull(actualData); - } - - @Test - public void testWildCardResourceSubscription() - { - XdsClientImplFixture fixture = new XdsClientImplFixture(); - - XdsClient.WildcardNodeResourceWatcher nodeWildCardWatcher = Mockito.mock(XdsClient.WildcardNodeResourceWatcher.class); - XdsClient.WildcardD2URIMapResourceWatcher uriMapWildCardWatcher = Mockito.mock(XdsClient.WildcardD2URIMapResourceWatcher.class); - fixture._xdsClientImpl.getWildcardResourceSubscriber(NODE).addWatcher(nodeWildCardWatcher); - fixture._xdsClientImpl.getWildcardResourceSubscriber(D2_URI_MAP).addWatcher(uriMapWildCardWatcher); - - // NODE resource added - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA1); - fixture.verifyAckSent(1); - nodeWildCardWatcher.onChanged(eq(SERVICE_RESOURCE_NAME) , eq(NODE_UPDATE1)); - - // NODE resource removed - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_NODE_DATA_WITH_REMOVAL); - fixture.verifyAckSent(2); - nodeWildCardWatcher.onRemoval(eq(SERVICE_RESOURCE_NAME)); - - // URI_MAP resource added - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA1); - fixture.verifyAckSent(3); - uriMapWildCardWatcher.onChanged(eq(CLUSTER_RESOURCE_NAME), eq(D2_URI_MAP_UPDATE_WITH_DATA1)); - - // URI_MAP resource removed - fixture._xdsClientImpl.handleResponse(DISCOVERY_RESPONSE_URI_MAP_DATA_WITH_REMOVAL); - fixture.verifyAckSent(4); - uriMapWildCardWatcher.onRemoval(eq(CLUSTER_RESOURCE_NAME)); + Assert.assertEquals(fixture._clusterSubscriber.getData(), D2_URI_MAP_UPDATE_WITH_DATA1); + Assert.assertNull(fixture._uriMapWildcardSubscriber.getData(CLUSTER_RESOURCE_NAME)); } @Test @@ -554,17 +679,80 @@ public void testResourceSubscriberAddWatcher() subscriber.addWatcher(watcher); } verify(watcher, times(10)).onChanged(eq(update)); + + WildcardResourceSubscriber wildcardSubscriber = new WildcardResourceSubscriber(D2_CLUSTER_OR_SERVICE_NAME); + XdsClient.WildcardResourceWatcher _wildcardWatcher = Mockito.mock(XdsClient.WildcardResourceWatcher.class); + wildcardSubscriber.addWatcher(_wildcardWatcher); + verify(_wildcardWatcher, times(0)).onChanged(any(), any()); + + wildcardSubscriber.setData(CLUSTER_RESOURCE_NAME, CLUSTER_NAME_DATA_UPDATE); + for (int i = 0; i < 10; i++) + { + wildcardSubscriber.addWatcher(_wildcardWatcher); + } + verify(_wildcardWatcher, times(10)).onChanged(eq(CLUSTER_RESOURCE_NAME), eq(CLUSTER_NAME_DATA_UPDATE)); + } + + @DataProvider(name = "provideUseGlobCollection") + public Object[][] provideUseGlobCollection() + { + // { + // useGlobCollection --- whether to use glob collection + // } + return new Object[][]{ + {true}, + {false} + }; + } + @Test(dataProvider = "provideUseGlobCollection", timeOut = 2000) + // Retry task should re-subscribe the resources registered in each subscriber type. + public void testRetry(boolean useGlobCollection) throws ExecutionException, InterruptedException { + XdsClientImplFixture fixture = new XdsClientImplFixture(useGlobCollection); + fixture.watchAllResourceAndWatcherTypes(); + fixture._xdsClientImpl.testRetryTask(fixture._adsStream); + fixture._xdsClientImpl._retryRpcStreamFuture.get(); + + // get all the resource types and names sent in the discovery requests and verify them + List types = fixture._resourceTypesArgumentCaptor.getAllValues(); + List nameLists = fixture._resourceNamesArgumentCaptor.getAllValues().stream() + .map(names -> { + if (names.size() != 1) + { + Assert.fail("Resource names should be a singleton list"); + } + return names.iterator().next(); + }).collect(Collectors.toList()); + Assert.assertEquals(types.size(), 5); + Assert.assertEquals(nameLists.size(), 5); + + List> args = new ArrayList<>(); + for (int i = 0; i < 5; i++) + { + args.add(Pair.of(types.get(i), nameLists.get(i))); + } + args = args.stream().sorted().collect(Collectors.toList()); + + Assert.assertEquals(args, Arrays.asList( + Pair.of(NODE, "*"), + Pair.of(NODE, SERVICE_RESOURCE_NAME), + Pair.of(useGlobCollection ? D2_URI : D2_URI_MAP, "*"), + Pair.of(useGlobCollection ? D2_URI : D2_URI_MAP, useGlobCollection ? CLUSTER_GLOB_COLLECTION : CLUSTER_RESOURCE_NAME), + Pair.of(D2_CLUSTER_OR_SERVICE_NAME, "*") + )); } private static class XdsClientImplFixture { XdsClientImpl _xdsClientImpl; @Mock + XdsClientImpl.AdsStream _adsStream; + @Mock XdsClientJmx _xdsClientJmx; ResourceSubscriber _nodeSubscriber; ResourceSubscriber _clusterSubscriber; XdsClientImpl.WildcardResourceSubscriber _nodeWildcardSubscriber; XdsClientImpl.WildcardResourceSubscriber _uriMapWildcardSubscriber; + XdsClientImpl.WildcardResourceSubscriber _nameWildcardSubscriber; Map> _subscribers = new HashMap<>(); Map _wildcardSubscribers = new HashMap<>(); @@ -575,6 +763,11 @@ private static class XdsClientImplFixture @Mock XdsServerMetricsProvider _serverMetricsProvider; + @Captor + ArgumentCaptor _resourceTypesArgumentCaptor; + @Captor + ArgumentCaptor> _resourceNamesArgumentCaptor; + XdsClientImplFixture() { this(false); @@ -587,29 +780,67 @@ private static class XdsClientImplFixture _clusterSubscriber = spy(new ResourceSubscriber(D2_URI_MAP, CLUSTER_RESOURCE_NAME, _xdsClientJmx)); _nodeWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(NODE)); _uriMapWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(D2_URI_MAP)); - + _nameWildcardSubscriber = spy(new XdsClientImpl.WildcardResourceSubscriber(D2_CLUSTER_OR_SERVICE_NAME)); doNothing().when(_resourceWatcher).onChanged(any()); + doNothing().when(_wildcardResourceWatcher).onChanged(any(), any()); + doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); + for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber)) { - subscriber.addWatcher(_resourceWatcher); _subscribers.put(subscriber.getType(), Collections.singletonMap(subscriber.getResource(), subscriber)); } - for (WildcardResourceSubscriber subscriber : Lists.newArrayList(_nodeWildcardSubscriber, _uriMapWildcardSubscriber)) + for (WildcardResourceSubscriber subscriber : Lists.newArrayList(_nodeWildcardSubscriber, + _uriMapWildcardSubscriber, _nameWildcardSubscriber)) { - subscriber.addWatcher(_wildcardResourceWatcher); _wildcardSubscribers.put(subscriber.getType(), subscriber); } - doNothing().when(_serverMetricsProvider).trackLatency(anyLong()); - _xdsClientImpl = spy(new XdsClientImpl(null, null, null, 0, - useGlobCollections, _serverMetricsProvider)); + + _xdsClientImpl = spy(new XdsClientImpl(null, null, + Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("test executor")), + 0, useGlobCollections, _serverMetricsProvider)); + _xdsClientImpl._adsStream = _adsStream; + + doNothing().when(_xdsClientImpl).startRpcStreamLocal(); doNothing().when(_xdsClientImpl).sendAckOrNack(any(), any(), any()); + doNothing().when(_adsStream).sendDiscoveryRequest(_resourceTypesArgumentCaptor.capture(), _resourceNamesArgumentCaptor.capture()); + when(_xdsClientImpl.getXdsClientJmx()).thenReturn(_xdsClientJmx); - when(_xdsClientImpl.getResourceSubscriberMap(any())) - .thenAnswer(a -> _subscribers.get((ResourceType) a.getArguments()[0])); - doNothing().when(_xdsClientImpl).watchAllXdsResources(any()); - when(_xdsClientImpl.getWildcardResourceSubscriber(any())) - .thenAnswer(a -> _wildcardSubscribers.get((ResourceType) a.getArguments()[0])); + when(_xdsClientImpl.getResourceSubscribers()).thenReturn(_subscribers); + when(_xdsClientImpl.getWildcardResourceSubscribers()).thenReturn(_wildcardSubscribers); + } + + void watchAllResourceAndWatcherTypes() + { + for (ResourceSubscriber subscriber : Lists.newArrayList(_nodeSubscriber, _clusterSubscriber)) + { + subscriber.addWatcher(_resourceWatcher); + } + for (WildcardResourceSubscriber subscriber : Lists.newArrayList(_nodeWildcardSubscriber, + _uriMapWildcardSubscriber, _nameWildcardSubscriber)) + { + subscriber.addWatcher(_wildcardResourceWatcher); + } + } + + void watchNodeResource() + { + _nodeSubscriber.addWatcher(_resourceWatcher); + } + + void watchNodeResourceViaWildcard() + { + _nodeWildcardSubscriber.addWatcher(_wildcardResourceWatcher); + } + + void watchUriMapResource() + { + _clusterSubscriber.addWatcher(_resourceWatcher); + } + + void watchUriMapResourceViaWildcard() + { + _uriMapWildcardSubscriber.addWatcher(_wildcardResourceWatcher); } void verifyAckSent(int count)