Skip to content

Commit

Permalink
add and update XdsClient tests
Browse files Browse the repository at this point in the history
  • Loading branch information
bohhyang committed Jan 21, 2025
1 parent 3de5965 commit f69d31f
Show file tree
Hide file tree
Showing 2 changed files with 466 additions and 205 deletions.
70 changes: 50 additions & 20 deletions d2/src/main/java/com/linkedin/d2/xds/XdsClientImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -89,9 +89,11 @@ public class XdsClientImpl extends XdsClient
private final boolean _subscribeToUriGlobCollection;
private final BackoffPolicy.Provider _backoffPolicyProvider = new ExponentialBackoffPolicy.Provider();
private BackoffPolicy _retryBackoffPolicy;
private AdsStream _adsStream;
@VisibleForTesting
AdsStream _adsStream;
private boolean _shutdown;
private ScheduledFuture<?> _retryRpcStreamFuture;
@VisibleForTesting
ScheduledFuture<?> _retryRpcStreamFuture;
private ScheduledFuture<?> _readyTimeoutFuture;
private final long _readyTimeoutMillis;

Expand Down Expand Up @@ -561,11 +563,15 @@ private void handleResourceUpdate(Map<String, ? extends ResourceUpdate> updates,

for (Map.Entry<String, ? extends ResourceUpdate> entry : updates.entrySet())
{
ResourceSubscriber subscriber = subscribers.get(entry.getKey());
if (subscriber != null)
if (subscribers != null)
{
subscriber.onData(entry.getValue(), _serverMetricsProvider);
ResourceSubscriber subscriber = subscribers.get(entry.getKey());
if (subscriber != null)
{
subscriber.onData(entry.getValue(), _serverMetricsProvider);
}
}

if (wildcardSubscriber != null)
{
wildcardSubscriber.onData(entry.getKey(), entry.getValue());
Expand All @@ -580,19 +586,25 @@ private void handleResourceRemoval(Collection<String> removedResources, Resource
return;
}

Map<String, ResourceSubscriber> subscribers = getResourceSubscriberMap(type);
WildcardResourceSubscriber wildcardSubscriber = getWildcardResourceSubscriber(type);
for (String resourceName : removedResources)
{
_xdsClientJmx.incrementResourceNotFoundCount();
_log.warn("Received response that {} {} was removed", type, resourceName);
ResourceSubscriber subscriber = getResourceSubscriberMap(type).get(resourceName);
if (subscriber != null)

if (subscribers != null)
{
subscriber.onRemoval();
ResourceSubscriber subscriber = subscribers.get(resourceName);
if (subscriber != null)
{
subscriber.onRemoval();
}
}

if (wildcardSubscriber != null)
{
removedResources.forEach(wildcardSubscriber::onRemoval);
wildcardSubscriber.onRemoval(resourceName);
}
}
}
Expand Down Expand Up @@ -632,7 +644,7 @@ private void notifyStreamReconnect()
@VisibleForTesting
Map<String, ResourceSubscriber> getResourceSubscriberMap(ResourceType type)
{
return _resourceSubscribers.get(type);
return getResourceSubscribers().get(type);
}

@VisibleForTesting
Expand All @@ -644,7 +656,7 @@ Map<ResourceType, Map<String, ResourceSubscriber>> getResourceSubscribers()
@VisibleForTesting
WildcardResourceSubscriber getWildcardResourceSubscriber(ResourceType type)
{
return _wildcardSubscribers.get(type);
return getWildcardResourceSubscribers().get(type);
}

@VisibleForTesting
Expand Down Expand Up @@ -861,28 +873,28 @@ void addWatcher(WildcardResourceWatcher watcher)
}

@VisibleForTesting
void onData(String resourceName, ResourceUpdate update)
void onData(String resourceName, ResourceUpdate data)
{
if (Objects.equals(_data.get(resourceName), update))
if (Objects.equals(_data.get(resourceName), data))
{
_log.debug("Received resource update data equal to the current data. Will not perform the update.");
return;
}
// null value guard to avoid overwriting the property with null
if (update != null && update.isValid())
if (data != null && data.isValid())
{
_data.put(resourceName, update);
_data.put(resourceName, data);
}
else
{
// invalid data is received, log a warning and check if existing data is present.
if (_type == ResourceType.D2_URI_MAP || _type == ResourceType.D2_URI)
{
RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update);
RATE_LIMITED_LOGGER.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data);
}
else
{
_log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, update);
_log.warn("Received invalid data for {} {}, data: {}", _type, resourceName, data);
}
// if no data has ever been set, init it to an empty data in case watchers are waiting for it
if (_data.get(resourceName) == null)
Expand All @@ -898,7 +910,7 @@ void onData(String resourceName, ResourceUpdate update)

for (WildcardResourceWatcher watcher : _watchers)
{
watcher.onChanged(resourceName, update);
watcher.onChanged(resourceName, _data.get(resourceName));
}
}

Expand Down Expand Up @@ -942,6 +954,22 @@ private void onAllResourcesProcessed()
}
}

/**
* This is a test-only method to simulate the retry task being executed. It should only be called from tests.
* @param testStream test ads stream
*/
@VisibleForTesting
void testRetryTask(AdsStream testStream)
{
if (_adsStream != null && _adsStream != testStream)
{
_log.warn("Non-testing ADS stream exists, ignoring test call");
return;
}
_adsStream = testStream;
_retryRpcStreamFuture = _executorService.schedule(new RpcRetryTask(), 0, TimeUnit.NANOSECONDS);
}

final class RpcRetryTask implements Runnable
{
@Override
Expand Down Expand Up @@ -1132,7 +1160,8 @@ public String toString()
}
}

private final class AdsStream
@VisibleForTesting
class AdsStream
{
private final AggregatedDiscoveryServiceGrpc.AggregatedDiscoveryServiceStub _stub;

Expand Down Expand Up @@ -1208,7 +1237,8 @@ public void onCompleted()
/**
* Sends a client-initiated discovery request.
*/
private void sendDiscoveryRequest(ResourceType type, Collection<String> resources)
@VisibleForTesting
void sendDiscoveryRequest(ResourceType type, Collection<String> resources)
{
_log.info("Sending {} request for resources: {}", type, resources);
DeltaDiscoveryRequest request = new DiscoveryRequestData(_node, type, resources).toEnvoyProto();
Expand Down
Loading

0 comments on commit f69d31f

Please sign in to comment.