diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java new file mode 100644 index 00000000000..f69d35f182d --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/api/core/loadbalancing/LocalDcAwareLoadBalancingPolicy.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.api.core.loadbalancing; + +import edu.umd.cs.findbugs.annotations.Nullable; + +/** Load balancing policy taking into account local datacenter of the application. */ +public interface LocalDcAwareLoadBalancingPolicy extends LoadBalancingPolicy { + + /** Returns the local datacenter name, if known; empty otherwise. */ + @Nullable + String getLocalDatacenter(); +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java index 684d6b01b9c..544849c3070 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/context/StartupOptionsBuilder.java @@ -19,13 +19,19 @@ import com.datastax.dse.driver.api.core.config.DseDriverOption; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; +import com.datastax.oss.driver.shaded.guava.common.base.Joiner; +import com.datastax.oss.driver.shaded.guava.common.collect.Maps; import com.datastax.oss.protocol.internal.request.Startup; import com.datastax.oss.protocol.internal.util.collection.NullAllowingImmutableMap; import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Map; +import java.util.Optional; import java.util.UUID; +import java.util.stream.Collectors; import net.jcip.annotations.Immutable; @Immutable @@ -33,6 +39,7 @@ public class StartupOptionsBuilder { public static final String DRIVER_NAME_KEY = "DRIVER_NAME"; public static final String DRIVER_VERSION_KEY = "DRIVER_VERSION"; + public static final String DRIVER_LOCAL_DC = "DRIVER_LOCAL_DC"; public static final String APPLICATION_NAME_KEY = "APPLICATION_NAME"; public static final String APPLICATION_VERSION_KEY = "APPLICATION_VERSION"; public static final String CLIENT_ID_KEY = "CLIENT_ID"; @@ -119,6 +126,8 @@ public Map build() { if (applicationVersion != null) { builder.put(APPLICATION_VERSION_KEY, applicationVersion); } + // do not cache local DC as it can change within LBP implementation + localDcs().ifPresent(s -> builder.put(DRIVER_LOCAL_DC, s)); return builder.build(); } @@ -142,4 +151,28 @@ protected String getDriverName() { protected String getDriverVersion() { return Session.OSS_DRIVER_COORDINATES.getVersion().toString(); } + + private Optional localDcs() { + Joiner joiner = Joiner.on(": "); + Map> lbpToDc = + Maps.transformValues(context.getLoadBalancingPolicies(), this::getLocalDc); + if (lbpToDc.isEmpty()) { + return Optional.empty(); + } + return Optional.of( + lbpToDc.entrySet().stream() + .filter(e -> e.getValue().isPresent()) + .map(entry -> joiner.join(entry.getKey(), entry.getValue().get())) + .collect(Collectors.joining(", "))); + } + + private Optional getLocalDc(LoadBalancingPolicy loadBalancingPolicy) { + if (loadBalancingPolicy instanceof LocalDcAwareLoadBalancingPolicy) { + String dc = ((LocalDcAwareLoadBalancingPolicy) loadBalancingPolicy).getLocalDatacenter(); + if (dc != null) { + return Optional.of(dc); + } + } + return Optional.empty(); + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java index 587ef4183bd..04a031f11ac 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/loadbalancing/BasicLoadBalancingPolicy.java @@ -24,6 +24,7 @@ import com.datastax.oss.driver.api.core.context.DriverContext; import com.datastax.oss.driver.api.core.cql.Statement; import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistance; import com.datastax.oss.driver.api.core.loadbalancing.NodeDistanceEvaluator; import com.datastax.oss.driver.api.core.metadata.Node; @@ -99,7 +100,7 @@ * DefaultLoadBalancingPolicy}. */ @ThreadSafe -public class BasicLoadBalancingPolicy implements LoadBalancingPolicy { +public class BasicLoadBalancingPolicy implements LocalDcAwareLoadBalancingPolicy { private static final Logger LOG = LoggerFactory.getLogger(BasicLoadBalancingPolicy.class); @@ -155,7 +156,8 @@ public BasicLoadBalancingPolicy(@NonNull DriverContext context, @NonNull String * Before initialization, this method always returns null. */ @Nullable - protected String getLocalDatacenter() { + @Override + public String getLocalDatacenter() { return localDc; } diff --git a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java index 9e4556e528d..b7260266611 100644 --- a/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java +++ b/core/src/test/java/com/datastax/dse/driver/internal/core/context/DseStartupOptionsBuilderTest.java @@ -19,6 +19,7 @@ import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatIllegalArgumentException; +import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.MockitoAnnotations.initMocks; @@ -28,14 +29,19 @@ import com.datastax.oss.driver.api.core.config.DriverConfig; import com.datastax.oss.driver.api.core.config.DriverConfigLoader; import com.datastax.oss.driver.api.core.config.DriverExecutionProfile; +import com.datastax.oss.driver.api.core.loadbalancing.LoadBalancingPolicy; +import com.datastax.oss.driver.api.core.loadbalancing.LocalDcAwareLoadBalancingPolicy; import com.datastax.oss.driver.api.core.session.ProgrammaticArguments; import com.datastax.oss.driver.api.core.session.Session; import com.datastax.oss.driver.api.core.uuid.Uuids; import com.datastax.oss.driver.internal.core.context.DefaultDriverContext; import com.datastax.oss.driver.internal.core.context.StartupOptionsBuilder; +import com.datastax.oss.driver.shaded.guava.common.collect.ImmutableMap; import com.datastax.oss.protocol.internal.request.Startup; import com.tngtech.java.junit.dataprovider.DataProvider; import com.tngtech.java.junit.dataprovider.DataProviderRunner; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.Map; import java.util.UUID; import org.junit.Before; import org.junit.Test; @@ -60,15 +66,35 @@ public void before() { when(defaultProfile.isDefined(DseDriverOption.CONTINUOUS_PAGING_PAGE_SIZE)).thenReturn(true); } - private void buildContext(UUID clientId, String applicationName, String applicationVersion) { + private void buildContext( + UUID clientId, + String applicationName, + String applicationVersion, + Map localDcPerProfile) { + ProgrammaticArguments.Builder builder = + ProgrammaticArguments.builder() + .withStartupClientId(clientId) + .withStartupApplicationName(applicationName) + .withStartupApplicationVersion(applicationVersion); this.driverContext = - new DefaultDriverContext( - configLoader, - ProgrammaticArguments.builder() - .withStartupClientId(clientId) - .withStartupApplicationName(applicationName) - .withStartupApplicationVersion(applicationVersion) - .build()); + new DefaultDriverContext(configLoader, builder.build()) { + @NonNull + @Override + public Map getLoadBalancingPolicies() { + if (localDcPerProfile != null) { + ImmutableMap.Builder map = ImmutableMap.builder(); + localDcPerProfile.forEach( + (profile, dc) -> { + LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = + mock(LocalDcAwareLoadBalancingPolicy.class); + when(loadBalancingPolicy.getLocalDatacenter()).thenReturn(dc); + map.put(profile, loadBalancingPolicy); + }); + return map.build(); + } + return super.getLoadBalancingPolicies(); + } + }; } private void assertDefaultStartupOptions(Startup startup) { @@ -86,7 +112,7 @@ private void assertDefaultStartupOptions(Startup startup) { public void should_build_startup_options_with_no_compression_if_undefined() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); - buildContext(null, null, null); + buildContext(null, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options).doesNotContainKey(Startup.COMPRESSION_KEY); assertDefaultStartupOptions(startup); @@ -97,7 +123,7 @@ public void should_build_startup_options_with_no_compression_if_undefined() { public void should_build_startup_options_with_compression(String compression) { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn(compression); - buildContext(null, null, null); + buildContext(null, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); // assert the compression option is present assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, compression); @@ -110,7 +136,7 @@ public void should_build_startup_options_with_compression(String compression) { public void should_fail_to_build_startup_options_with_invalid_compression() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("foobar"); - buildContext(null, null, null); + buildContext(null, null, null, null); assertThatIllegalArgumentException() .isThrownBy(() -> new Startup(driverContext.getStartupOptions())); } @@ -120,7 +146,7 @@ public void should_build_startup_options_with_client_id() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); UUID customClientId = Uuids.random(); - buildContext(customClientId, null, null); + buildContext(customClientId, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); // assert the client id is present assertThat(startup.options) @@ -135,7 +161,7 @@ public void should_build_startup_options_with_client_id() { public void should_build_startup_options_with_application_version_and_name() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); - buildContext(null, "Custom_App_Name", "Custom_App_Version"); + buildContext(null, "Custom_App_Name", "Custom_App_Version", null); Startup startup = new Startup(driverContext.getStartupOptions()); // assert the app name and version are present assertThat(startup.options) @@ -151,15 +177,18 @@ public void should_build_startup_options_with_all_options() { // mock config to specify "snappy" compression when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("snappy"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); UUID customClientId = Uuids.random(); - buildContext(customClientId, "Custom_App_Name", "Custom_App_Version"); + buildContext( + customClientId, "Custom_App_Name", "Custom_App_Version", ImmutableMap.of("default", "dc6")); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.CLIENT_ID_KEY, customClientId.toString()) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") - .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version"); + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: dc6"); assertThat(startup.options).containsEntry(Startup.COMPRESSION_KEY, "snappy"); assertDefaultStartupOptions(startup); } @@ -172,8 +201,11 @@ public void should_use_configuration_when_no_programmatic_values_provided() { .thenReturn("Config_App_Version"); when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); + when(defaultProfile.isDefined(DefaultDriverOption.LOAD_BALANCING_LOCAL_DATACENTER)) + .thenReturn(true); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); - buildContext(null, null, null); + buildContext(null, null, null, null); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) @@ -185,12 +217,62 @@ public void should_use_configuration_when_no_programmatic_values_provided() { public void should_ignore_configuration_when_programmatic_values_provided() { when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) .thenReturn("none"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); - buildContext(null, "Custom_App_Name", "Custom_App_Version"); + buildContext( + null, "Custom_App_Name", "Custom_App_Version", ImmutableMap.of("default", "us-west-2")); Startup startup = new Startup(driverContext.getStartupOptions()); assertThat(startup.options) .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") - .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version"); + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") + .containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "default: us-west-2"); + } + + @Test + public void should_include_all_local_dc_in_startup_message() { + when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) + .thenReturn("none"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); + + buildContext( + null, + "Custom_App_Name", + "Custom_App_Version", + ImmutableMap.of("default", "us-west-2", "oltp", "us-east-2", "olap", "eu-central-1")); + Startup startup = new Startup(driverContext.getStartupOptions()); + + assertThat(startup.options) + .containsEntry(StartupOptionsBuilder.APPLICATION_NAME_KEY, "Custom_App_Name") + .containsEntry(StartupOptionsBuilder.APPLICATION_VERSION_KEY, "Custom_App_Version") + .containsEntry( + StartupOptionsBuilder.DRIVER_LOCAL_DC, + "default: us-west-2, oltp: us-east-2, olap: eu-central-1"); + } + + @Test + public void should_skip_non_local_dc_lbp_in_startup_message() { + when(defaultProfile.getString(DefaultDriverOption.PROTOCOL_COMPRESSION, "none")) + .thenReturn("none"); + when(defaultProfile.getName()).thenReturn(DriverExecutionProfile.DEFAULT_NAME); + + ProgrammaticArguments.Builder builder = ProgrammaticArguments.builder(); + DefaultDriverContext driverContext = + new DefaultDriverContext(configLoader, builder.build()) { + @NonNull + @Override + public Map getLoadBalancingPolicies() { + ImmutableMap.Builder map = ImmutableMap.builder(); + LocalDcAwareLoadBalancingPolicy loadBalancingPolicy = + mock(LocalDcAwareLoadBalancingPolicy.class); + when(loadBalancingPolicy.getLocalDatacenter()).thenReturn("dc1"); + map.put("oltp", loadBalancingPolicy); + map.put("default", mock(LoadBalancingPolicy.class)); + return map.build(); + } + }; + Startup startup = new Startup(driverContext.getStartupOptions()); + + assertThat(startup.options).containsEntry(StartupOptionsBuilder.DRIVER_LOCAL_DC, "oltp: dc1"); } }