Skip to content

Commit

Permalink
Merge pull request #1046 from onyn/feat/service-config
Browse files Browse the repository at this point in the history
feat: apply grpc service config from consul (#1045)
  • Loading branch information
yidongnan authored Apr 12, 2024
2 parents 8239f8a + dec5eff commit cb029de
Show file tree
Hide file tree
Showing 5 changed files with 224 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/en/client/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ There are a number of supported schemes, that you can use to determine the targe
- `discovery` (Prio 6): \
(Optional) Uses spring-cloud's `DiscoveryClient` to lookup appropriate targets. The connections will be refreshed
automatically during `HeartbeatEvent`s. Uses the `gRPC_port` metadata to determine the port, otherwise uses the
service port. \
service port. Uses the `gRPC_service_config` metadata to determine [service config](https://grpc.github.io/grpc/core/md_doc_service_config.html). \
Example: `discovery:///service-name`
- `self` (Prio 0): \
The self address or scheme is a keyword that is available, if you also use `grpc-server-spring-boot-starter` and
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_INSTANCE_ID_KEY;
import static net.devh.boot.grpc.client.nameresolver.DiscoveryClientResolverFactory.DISCOVERY_SERVICE_NAME_KEY;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT;
import static net.devh.boot.grpc.common.util.GrpcUtils.CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG;

import java.net.InetSocketAddress;
import java.util.List;
Expand All @@ -35,6 +36,8 @@
import org.springframework.util.CollectionUtils;

import com.google.common.collect.Lists;
import com.google.gson.Gson;
import com.google.gson.JsonSyntaxException;

import io.grpc.Attributes;
import io.grpc.Attributes.Builder;
Expand All @@ -58,13 +61,15 @@ public class DiscoveryClientNameResolver extends NameResolver {
@Deprecated
private static final String LEGACY_CLOUD_DISCOVERY_METADATA_PORT = "gRPC.port";
private static final List<ServiceInstance> KEEP_PREVIOUS = null;
private static final Gson GSON = new Gson();

private final String name;
private final DiscoveryClient client;
private final SynchronizationContext syncContext;
private final Consumer<DiscoveryClientNameResolver> shutdownHook;
private final SharedResourceHolder.Resource<Executor> executorResource;
private final boolean usingExecutorResource;
private final ServiceConfigParser serviceConfigParser;

// The field must be accessed from syncContext, although the methods on an Listener2 can be called
// from any thread.
Expand Down Expand Up @@ -93,6 +98,7 @@ public DiscoveryClientNameResolver(final String name, final DiscoveryClient clie
this.executor = args.getOffloadExecutor();
this.usingExecutorResource = this.executor == null;
this.executorResource = executorResource;
this.serviceConfigParser = args.getServiceConfigParser();
}

/**
Expand Down Expand Up @@ -187,6 +193,55 @@ protected int getGrpcPort(final ServiceInstance instance) {
}
}

/**
* Extracts and parse gRPC service config from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return Parsed gRPC service config or null.
*/
private ConfigOrError resolveServiceConfig(List<ServiceInstance> instances) {
final String serviceConfig = getServiceConfig(instances);
if (serviceConfig == null) {
return null;
}
log.debug("Found service config for {}", getName());
if (log.isTraceEnabled()) {
// This is to avoid blowing log into several lines if newlines present in service config string.
final String logStr = serviceConfig.replace("\r", "\\r").replace("\n", "\\n");
log.trace("Service config for {}: {}", getName(), logStr);
}
try {
@SuppressWarnings("unchecked")
Map<String, ?> parsedServiceConfig = GSON.fromJson(serviceConfig, Map.class);
return serviceConfigParser.parseServiceConfig(parsedServiceConfig);
} catch (JsonSyntaxException e) {
return ConfigOrError.fromError(
Status.UNKNOWN
.withDescription("Failed to parse grpc service config")
.withCause(e));
}
}

/**
* Extracts the gRPC service config string from the given service instances.
*
* @param instances The list of instances to extract the service config from.
* @return The gRPC service config or null.
*/
protected String getServiceConfig(final List<ServiceInstance> instances) {
for (final ServiceInstance inst : instances) {
final Map<String, String> metadata = inst.getMetadata();
if (metadata == null || metadata.isEmpty()) {
continue;
}
final String metaValue = metadata.get(CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG);
if (metaValue != null && !metaValue.isEmpty()) {
return metaValue;
}
}
return null;
}

/**
* Gets the attributes from the service instance for later use in a load balancer. Can be overwritten to convert
* custom attributes.
Expand Down Expand Up @@ -318,6 +373,7 @@ private List<ServiceInstance> resolveInternal() {
log.debug("Ready to update server list for {}", getName());
this.savedListener.onResult(ResolutionResult.newBuilder()
.setAddresses(toTargets(newInstanceList))
.setServiceConfig(resolveServiceConfig(newInstanceList))
.build());
log.info("Done updating server list for {}", getName());
return newInstanceList;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.client.nameresolver;

import static org.assertj.core.api.Assertions.assertThat;

import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;

import org.junit.jupiter.api.Test;
import org.springframework.cloud.client.DefaultServiceInstance;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryClient;
import org.springframework.cloud.client.discovery.simple.SimpleDiscoveryProperties;

import io.grpc.NameResolver;
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.internal.AutoConfiguredLoadBalancerFactory;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.ScParser;
import net.devh.boot.grpc.common.util.GrpcUtils;

/**
* Test for {@link DiscoveryClientNameResolver}.
*/
public class DiscoveryClientNameResolverTest {

private final NameResolver.Args args = NameResolver.Args.newBuilder()
.setDefaultPort(1212)
.setProxyDetector(GrpcUtil.DEFAULT_PROXY_DETECTOR)
.setSynchronizationContext(
new SynchronizationContext((t, e) -> {
throw new AssertionError(e);
}))
.setServiceConfigParser(new ScParser(true, 10, 10, new AutoConfiguredLoadBalancerFactory("pick_first")))
.setOffloadExecutor(Runnable::run)
.build();

@Test
void testValidServiceConfig() {
String validServiceConfig = """
{
"loadBalancingConfig": [
{"round_robin": {}}
],
"methodConfig": [
{
"name": [{}],
"retryPolicy": {
"maxAttempts": 5,
"initialBackoff": "0.05s",
"maxBackoff": "1s",
"backoffMultiplier": 2,
"retryableStatusCodes": [
"UNAVAILABLE",
"ABORTED",
"DATA_LOSS",
"INTERNAL",
"DEADLINE_EXCEEDED"
]
},
"timeout": "5s"
}
]
}
""";
TestableListener listener = resolveServiceAndVerify("test1", validServiceConfig);
NameResolver.ConfigOrError serviceConf = listener.getResult().getServiceConfig();
assertThat(serviceConf).isNotNull();
assertThat(serviceConf.getConfig()).isNotNull();
assertThat(serviceConf.getError()).isNull();
}

@Test
void testBrokenServiceConfig() {
TestableListener listener = resolveServiceAndVerify("test2", "intentionally invalid service config");
NameResolver.ConfigOrError serviceConf = listener.getResult().getServiceConfig();
assertThat(serviceConf).isNotNull();
assertThat(serviceConf.getConfig()).isNull();
assertThat(serviceConf.getError()).extracting(Status::getCode).isEqualTo(Status.Code.UNKNOWN);
}

private TestableListener resolveServiceAndVerify(String serviceName, String serviceConfig) {
SimpleDiscoveryProperties props = new SimpleDiscoveryProperties();
DefaultServiceInstance service = new DefaultServiceInstance(
serviceName + "-1", serviceName, "127.0.0.1", 3322, false);
Map<String, String> meta = service.getMetadata();
meta.put(GrpcUtils.CLOUD_DISCOVERY_METADATA_PORT, "6688");
meta.put(GrpcUtils.CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG, serviceConfig);
props.setInstances(Map.of(serviceName, List.of(service)));
SimpleDiscoveryClient disco = new SimpleDiscoveryClient(props);
DiscoveryClientNameResolver dcnr = new DiscoveryClientNameResolver(serviceName, disco, args, null, null);

TestableListener listener = new TestableListener();
dcnr.start(listener);

assertThat(listener.isErrorWasSet()).isFalse();
assertThat(listener.isResultWasSet()).isTrue();
InetSocketAddress addr = (InetSocketAddress) listener.getResult().getAddresses().get(0).getAddresses().get(0);
assertThat(addr.getPort()).isEqualTo(6688);
assertThat(addr.getHostString()).isEqualTo("127.0.0.1");
return listener;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Copyright (c) 2016-2023 The gRPC-Spring Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package net.devh.boot.grpc.client.nameresolver;

import io.grpc.NameResolver;
import io.grpc.Status;
import lombok.Getter;

@Getter
public class TestableListener extends NameResolver.Listener2 {

private NameResolver.ResolutionResult result;
private Status error;
private boolean resultWasSet = false;
private boolean errorWasSet = false;

@Override
public void onResult(NameResolver.ResolutionResult resolutionResult) {
this.result = resolutionResult;
resultWasSet = true;
}

@Override
public void onError(Status error) {
this.error = error;
errorWasSet = true;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@ public final class GrpcUtils {
*/
public static final String CLOUD_DISCOVERY_METADATA_PORT = "gRPC_port";

/**
* The cloud discovery metadata key used to identify service config.
*/
public static final String CLOUD_DISCOVERY_METADATA_SERVICE_CONFIG = "gRPC_service_config";

/**
* The constant for the grpc server port, -1 represents don't start an inter process server.
*/
Expand Down

0 comments on commit cb029de

Please sign in to comment.