Skip to content

xds: Support tracking non-xds resources in XdsDepManager #12153

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 94 additions & 70 deletions xds/src/main/java/io/grpc/xds/XdsDependencyManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import io.grpc.xds.client.XdsClient.ResourceWatcher;
import io.grpc.xds.client.XdsResourceType;
import java.util.Collections;
import java.util.EnumMap;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedHashSet;
Expand All @@ -53,8 +54,19 @@
* applies to a single data plane authority.
*/
final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegistry {
public static final XdsClusterResource CLUSTER_RESOURCE = XdsClusterResource.getInstance();
public static final XdsEndpointResource ENDPOINT_RESOURCE = XdsEndpointResource.getInstance();
private enum TrackedWatcherTypeEnum {
LDS, RDS, CDS, EDS
}

private static final TrackedWatcherType<XdsListenerResource.LdsUpdate> LDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.LDS);
private static final TrackedWatcherType<RdsUpdate> RDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.RDS);
private static final TrackedWatcherType<XdsClusterResource.CdsUpdate> CDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.CDS);
private static final TrackedWatcherType<XdsEndpointResource.EdsUpdate> EDS_TYPE =
new TrackedWatcherType<>(TrackedWatcherTypeEnum.EDS);

private static final int MAX_CLUSTER_RECURSION_DEPTH = 16; // Specified by gRFC A37
private final String listenerName;
private final XdsClient xdsClient;
Expand All @@ -63,7 +75,8 @@ final class XdsDependencyManager implements XdsConfig.XdsClusterSubscriptionRegi
private XdsConfigWatcher xdsConfigWatcher;

private StatusOr<XdsConfig> lastUpdate = null;
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers = new HashMap<>();
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers =
new EnumMap<>(TrackedWatcherTypeEnum.class);
private final Set<ClusterSubscription> subscriptions = new HashSet<>();

XdsDependencyManager(XdsClient xdsClient,
Expand All @@ -86,7 +99,7 @@ public void start(XdsConfigWatcher xdsConfigWatcher) {
checkState(this.xdsConfigWatcher == null, "dep manager may not be restarted");
this.xdsConfigWatcher = checkNotNull(xdsConfigWatcher, "xdsConfigWatcher");
// start the ball rolling
syncContext.execute(() -> addWatcher(new LdsWatcher(listenerName)));
syncContext.execute(() -> addWatcher(LDS_TYPE, new LdsWatcher(listenerName)));
}

@Override
Expand All @@ -96,7 +109,7 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) {
ClusterSubscription subscription = new ClusterSubscription(clusterName);

syncContext.execute(() -> {
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
if (getWatchers(LDS_TYPE).isEmpty()) {
subscription.closed = true;
return; // shutdown() called
}
Expand All @@ -107,33 +120,28 @@ public XdsConfig.Subscription subscribeToCluster(String clusterName) {
return subscription;
}

private <T extends ResourceUpdate> void addWatcher(XdsWatcherBase<T> watcher) {
private <T extends ResourceUpdate> void addWatcher(
TrackedWatcherType<T> watcherType, XdsWatcherBase<T> watcher) {
syncContext.throwIfNotInThisSynchronizationContext();
XdsResourceType<T> type = watcher.type;
String resourceName = watcher.resourceName;

getWatchers(type).put(resourceName, watcher);
getWatchers(watcherType).put(resourceName, watcher);
xdsClient.watchXdsResource(type, resourceName, watcher, syncContext);
}

public void shutdown() {
syncContext.execute(() -> {
for (TypeWatchers<?> watchers : resourceWatchers.values()) {
shutdownWatchersForType(watchers);
for (TrackedWatcher<?> watcher : watchers.watchers.values()) {
watcher.close();
}
}
resourceWatchers.clear();
subscriptions.clear();
});
}

private <T extends ResourceUpdate> void shutdownWatchersForType(TypeWatchers<T> watchers) {
for (Map.Entry<String, XdsWatcherBase<T>> watcherEntry : watchers.watchers.entrySet()) {
xdsClient.cancelXdsResourceWatch(watchers.resourceType, watcherEntry.getKey(),
watcherEntry.getValue());
watcherEntry.getValue().cancelled = true;
}
}

private void releaseSubscription(ClusterSubscription subscription) {
checkNotNull(subscription, "subscription");
syncContext.execute(() -> {
Expand All @@ -154,12 +162,12 @@ private void releaseSubscription(ClusterSubscription subscription) {
*/
private void maybePublishConfig() {
syncContext.throwIfNotInThisSynchronizationContext();
if (getWatchers(XdsListenerResource.getInstance()).isEmpty()) {
if (getWatchers(LDS_TYPE).isEmpty()) {
return; // shutdown() called
}
boolean waitingOnResource = resourceWatchers.values().stream()
.flatMap(typeWatchers -> typeWatchers.watchers.values().stream())
.anyMatch(XdsWatcherBase::missingResult);
.anyMatch(TrackedWatcher::missingResult);
if (waitingOnResource) {
return;
}
Expand Down Expand Up @@ -194,8 +202,8 @@ private static StatusOr<XdsConfig> buildUpdate(

// Iterate watchers and build the XdsConfig

XdsWatcherBase<XdsListenerResource.LdsUpdate> ldsWatcher
= tracer.getWatcher(XdsListenerResource.getInstance(), listenerName);
TrackedWatcher<XdsListenerResource.LdsUpdate> ldsWatcher
= tracer.getWatcher(LDS_TYPE, listenerName);
if (ldsWatcher == null) {
return StatusOr.fromStatus(Status.UNAVAILABLE.withDescription(
"Bug: No listener watcher found for " + listenerName));
Expand Down Expand Up @@ -241,14 +249,13 @@ private static StatusOr<XdsConfig> buildUpdate(
return StatusOr.fromValue(builder.build());
}

private <T extends ResourceUpdate> Map<String, XdsWatcherBase<T>> getWatchers(
XdsResourceType<T> resourceType) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
private <T> Map<String, TrackedWatcher<T>> getWatchers(TrackedWatcherType<T> watcherType) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
if (typeWatchers == null) {
typeWatchers = new TypeWatchers<T>(resourceType);
resourceWatchers.put(resourceType, typeWatchers);
typeWatchers = new TypeWatchers<T>(watcherType);
resourceWatchers.put(watcherType.typeEnum, typeWatchers);
}
assert typeWatchers.resourceType == resourceType;
assert typeWatchers.watcherType == watcherType;
@SuppressWarnings("unchecked")
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
return tTypeWatchers.watchers;
Expand All @@ -275,7 +282,7 @@ private static void addConfigForCluster(
return;
}

CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CLUSTER_RESOURCE, clusterName);
CdsWatcher cdsWatcher = (CdsWatcher) tracer.getWatcher(CDS_TYPE, clusterName);
StatusOr<XdsClusterResource.CdsUpdate> cdsWatcherDataOr = cdsWatcher.getData();
if (!cdsWatcherDataOr.hasValue()) {
clusters.put(clusterName, StatusOr.fromStatus(cdsWatcherDataOr.getStatus()));
Expand Down Expand Up @@ -318,8 +325,8 @@ private static void addConfigForCluster(
child = new AggregateConfig(ImmutableList.copyOf(leafNames));
break;
case EDS:
XdsWatcherBase<XdsEndpointResource.EdsUpdate> edsWatcher =
tracer.getWatcher(ENDPOINT_RESOURCE, cdsWatcher.getEdsServiceName());
TrackedWatcher<XdsEndpointResource.EdsUpdate> edsWatcher =
tracer.getWatcher(EDS_TYPE, cdsWatcher.getEdsServiceName());
if (edsWatcher != null) {
child = new EndpointConfig(edsWatcher.getData());
} else {
Expand All @@ -346,27 +353,27 @@ private static void addConfigForCluster(
}

private void addRdsWatcher(String resourceName) {
if (getWatchers(XdsRouteConfigureResource.getInstance()).containsKey(resourceName)) {
if (getWatchers(RDS_TYPE).containsKey(resourceName)) {
return;
}

addWatcher(new RdsWatcher(resourceName));
addWatcher(RDS_TYPE, new RdsWatcher(resourceName));
}

private void addEdsWatcher(String edsServiceName) {
if (getWatchers(XdsEndpointResource.getInstance()).containsKey(edsServiceName)) {
if (getWatchers(EDS_TYPE).containsKey(edsServiceName)) {
return;
}

addWatcher(new EdsWatcher(edsServiceName));
addWatcher(EDS_TYPE, new EdsWatcher(edsServiceName));
}

private void addClusterWatcher(String clusterName) {
if (getWatchers(CLUSTER_RESOURCE).containsKey(clusterName)) {
if (getWatchers(CDS_TYPE).containsKey(clusterName)) {
return;
}

addWatcher(new CdsWatcher(clusterName));
addWatcher(CDS_TYPE, new CdsWatcher(clusterName));
}

private void updateRoutes(List<VirtualHost> virtualHosts) {
Expand Down Expand Up @@ -404,13 +411,13 @@ private static Set<String> getClusterNamesFromVirtualHost(VirtualHost virtualHos
return clusters;
}

private static class TypeWatchers<T extends ResourceUpdate> {
private static class TypeWatchers<T> {
// Key is resource name
final Map<String, XdsWatcherBase<T>> watchers = new HashMap<>();
final XdsResourceType<T> resourceType;
final Map<String, TrackedWatcher<T>> watchers = new HashMap<>();
final TrackedWatcherType<T> watcherType;

TypeWatchers(XdsResourceType<T> resourceType) {
this.resourceType = resourceType;
TypeWatchers(TrackedWatcherType<T> watcherType) {
this.watcherType = checkNotNull(watcherType, "watcherType");
}
}

Expand Down Expand Up @@ -442,48 +449,46 @@ public void close() {

/** State for tracing garbage collector. */
private static final class WatcherTracer {
private final Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers;
private final Map<XdsResourceType<?>, TypeWatchers<?>> usedWatchers;
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers;
private final Map<TrackedWatcherTypeEnum, TypeWatchers<?>> usedWatchers;

public WatcherTracer(Map<XdsResourceType<?>, TypeWatchers<?>> resourceWatchers) {
public WatcherTracer(Map<TrackedWatcherTypeEnum, TypeWatchers<?>> resourceWatchers) {
this.resourceWatchers = resourceWatchers;

this.usedWatchers = new HashMap<>();
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
usedWatchers.put(type, newTypeWatchers(type));
this.usedWatchers = new EnumMap<>(TrackedWatcherTypeEnum.class);
for (Map.Entry<TrackedWatcherTypeEnum, TypeWatchers<?>> me : resourceWatchers.entrySet()) {
usedWatchers.put(me.getKey(), newTypeWatchers(me.getValue().watcherType));
}
}

private static <T extends ResourceUpdate> TypeWatchers<T> newTypeWatchers(
XdsResourceType<T> type) {
private static <T> TypeWatchers<T> newTypeWatchers(TrackedWatcherType<T> type) {
return new TypeWatchers<T>(type);
}

public <T extends ResourceUpdate> XdsWatcherBase<T> getWatcher(
XdsResourceType<T> resourceType, String name) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(resourceType);
public <T> TrackedWatcher<T> getWatcher(TrackedWatcherType<T> watcherType, String name) {
TypeWatchers<?> typeWatchers = resourceWatchers.get(watcherType.typeEnum);
if (typeWatchers == null) {
return null;
}
assert typeWatchers.resourceType == resourceType;
assert typeWatchers.watcherType == watcherType;
@SuppressWarnings("unchecked")
TypeWatchers<T> tTypeWatchers = (TypeWatchers<T>) typeWatchers;
XdsWatcherBase<T> watcher = tTypeWatchers.watchers.get(name);
TrackedWatcher<T> watcher = tTypeWatchers.watchers.get(name);
if (watcher == null) {
return null;
}
@SuppressWarnings("unchecked")
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(resourceType);
TypeWatchers<T> usedTypeWatchers = (TypeWatchers<T>) usedWatchers.get(watcherType.typeEnum);
usedTypeWatchers.watchers.put(name, watcher);
return watcher;
}

/** Shut down unused watchers. */
public void closeUnusedWatchers() {
boolean changed = false; // Help out the GC by preferring old objects
for (XdsResourceType<?> type : resourceWatchers.keySet()) {
TypeWatchers<?> orig = resourceWatchers.get(type);
TypeWatchers<?> used = usedWatchers.get(type);
for (TrackedWatcherTypeEnum key : resourceWatchers.keySet()) {
TypeWatchers<?> orig = resourceWatchers.get(key);
TypeWatchers<?> used = usedWatchers.get(key);
for (String name : orig.watchers.keySet()) {
if (used.watchers.containsKey(name)) {
continue;
Expand All @@ -498,8 +503,33 @@ public void closeUnusedWatchers() {
}
}

@SuppressWarnings("UnusedTypeParameter")
private static final class TrackedWatcherType<T> {
public final TrackedWatcherTypeEnum typeEnum;

public TrackedWatcherType(TrackedWatcherTypeEnum typeEnum) {
this.typeEnum = checkNotNull(typeEnum, "typeEnum");
}
}

private interface TrackedWatcher<T> {
@Nullable
StatusOr<T> getData();

default boolean missingResult() {
return getData() == null;
}

default boolean hasDataValue() {
StatusOr<T> data = getData();
return data != null && data.hasValue();
}

void close();
}

private abstract class XdsWatcherBase<T extends ResourceUpdate>
implements ResourceWatcher<T> {
implements ResourceWatcher<T>, TrackedWatcher<T> {
private final XdsResourceType<T> type;
private final String resourceName;
boolean cancelled;
Expand Down Expand Up @@ -554,24 +584,18 @@ public void onChanged(T update) {

protected abstract void subscribeToChildren(T update);

@Override
public void close() {
cancelled = true;
xdsClient.cancelXdsResourceWatch(type, resourceName, this);
}

boolean missingResult() {
return data == null;
}

@Override
@Nullable
StatusOr<T> getData() {
public StatusOr<T> getData() {
return data;
}

boolean hasDataValue() {
return data != null && data.hasValue();
}

public String toContextString() {
return toContextStr(type.typeName(), resourceName);
}
Expand Down Expand Up @@ -622,7 +646,7 @@ private RdsWatcher getRdsWatcher(XdsListenerResource.LdsUpdate update, WatcherTr
if (rdsName == null) {
return null;
}
return (RdsWatcher) tracer.getWatcher(XdsRouteConfigureResource.getInstance(), rdsName);
return (RdsWatcher) tracer.getWatcher(RDS_TYPE, rdsName);
}

public RdsUpdateSupplier getRouteSource(WatcherTracer tracer) {
Expand Down Expand Up @@ -688,7 +712,7 @@ public StatusOr<RdsUpdate> getRdsUpdate() {

private class CdsWatcher extends XdsWatcherBase<XdsClusterResource.CdsUpdate> {
CdsWatcher(String resourceName) {
super(CLUSTER_RESOURCE, checkNotNull(resourceName, "resourceName"));
super(XdsClusterResource.getInstance(), checkNotNull(resourceName, "resourceName"));
}

@Override
Expand Down Expand Up @@ -721,7 +745,7 @@ public String getEdsServiceName() {

private class EdsWatcher extends XdsWatcherBase<XdsEndpointResource.EdsUpdate> {
private EdsWatcher(String resourceName) {
super(ENDPOINT_RESOURCE, checkNotNull(resourceName, "resourceName"));
super(XdsEndpointResource.getInstance(), checkNotNull(resourceName, "resourceName"));
}

@Override
Expand Down
Loading