Skip to content

Commit

Permalink
Fixing TODO for Ingesting GCS Config (#2104)
Browse files Browse the repository at this point in the history
  • Loading branch information
VardhanThigle authored Jan 3, 2025
1 parent 934a393 commit 3b1477a
Show file tree
Hide file tree
Showing 7 changed files with 165 additions and 170 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,10 @@

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.api.core.session.Session;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import org.jline.utils.Log;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -46,11 +43,6 @@ public CassandraConnector(
schemaReference);
CqlSessionBuilder builder =
CqlSession.builder().withConfigLoader(getDriverConfigLoader(dataSource));
builder = setCredentials(builder, dataSource);
if (dataSource.localDataCenter() != null) {
builder = builder.addContactPoints(List.copyOf(dataSource.contactPoints()));
builder = builder.withLocalDatacenter(dataSource.localDataCenter());
}
if (schemaReference.keyspaceName() != null) {
builder.withKeyspace(schemaReference.keyspaceName());
}
Expand All @@ -62,36 +54,9 @@ public CassandraConnector(
schemaReference);
}

@VisibleForTesting
protected static CqlSessionBuilder setCredentials(
CqlSessionBuilder builder, CassandraDataSource cassandraDataSource) {
if (cassandraDataSource.dbAuth() != null) {
return builder.withAuthCredentials(
cassandraDataSource.dbAuth().getUserName().get(),
cassandraDataSource.dbAuth().getPassword().get());
} else {
return builder;
}
}

@VisibleForTesting
protected static DriverConfigLoader getDriverConfigLoader(CassandraDataSource dataSource) {
ProgrammaticDriverConfigLoaderBuilder driverConfigLoaderBuilder =
DriverConfigLoader.programmaticBuilder()
.withString(
DefaultDriverOption.REQUEST_CONSISTENCY, dataSource.consistencyLevel().name())
.withClass(DefaultDriverOption.RETRY_POLICY_CLASS, dataSource.retryPolicy());
if (dataSource.connectTimeout() != null) {
driverConfigLoaderBuilder =
driverConfigLoaderBuilder.withDuration(
DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT, dataSource.connectTimeout());
}
if (dataSource.requestTimeout() != null) {
driverConfigLoaderBuilder =
driverConfigLoaderBuilder.withDuration(
DefaultDriverOption.REQUEST_TIMEOUT, dataSource.requestTimeout());
}
return driverConfigLoaderBuilder.build();
return dataSource.driverConfigLoader();
}

public Session getSession() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,83 +16,125 @@
package com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.internal.core.retry.DefaultRetryPolicy;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.auth.dbauth.DbAuth;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraDriverConfigLoader;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import java.io.FileNotFoundException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.time.Duration;
import java.util.List;
import javax.annotation.Nullable;

/**
* Encapsulates details of a Cassandra Cluster. Cassandra Cluster can connect to multiple KeySpaces,
* just like a Mysql instance can have multiple databases. TODO(vardhanvthigle): Take
* DriverConfiguration as a GCS file for advanced overrides.
* just like a Mysql instance can have multiple databases.
*/
@AutoValue
public abstract class CassandraDataSource implements Serializable {

/** Name of the Cassandra Cluster. */
public abstract String clusterName();

/** Name of local Datacenter. Must be specified if contactPoints are not empty */
@Nullable
public abstract String localDataCenter();

/** Contact points for connecting to a Cassandra Cluster. */
public abstract ImmutableList<InetSocketAddress> contactPoints();
/** Options Map. * */
abstract OptionsMap optionsMap();

/** Cassandra Auth details. */
@Nullable
public abstract DbAuth dbAuth();
public abstract String clusterName();

/** Retry Policy for Cassandra Driver. Defaults to {@link DefaultRetryPolicy}. */
public abstract Class retryPolicy();
public DriverConfigLoader driverConfigLoader() {
return CassandraDriverConfigLoader.fromOptionsMap(optionsMap());
}

/** Consistency level for reading the source. Defaults to {@link ConsistencyLevel#QUORUM} */
public abstract ConsistencyLevel consistencyLevel();
/** returns List of ContactPoints. Added for easier compatibility with 3.0 cluster creation. */
public ImmutableList<InetSocketAddress> contactPoints() {
return driverConfigLoader()
.getInitialConfig()
.getDefaultProfile()
.getStringList(TypedDriverOption.CONTACT_POINTS.getRawOption())
.stream()
.map(
contactPoint -> {
String[] ipPort = contactPoint.split(":");
return new InetSocketAddress(ipPort[0], Integer.parseInt(ipPort[1]));
})
.collect(ImmutableList.toImmutableList());
}

/** Connection timeout for Cassandra driver. Set null for driver default. */
@Nullable
public abstract Duration connectTimeout();
/** Returns local datacenter. Added for easier compatibility with 3.0 cluster creation. */
public String localDataCenter() {
return driverConfigLoader()
.getInitialConfig()
.getDefaultProfile()
.getString(TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER.getRawOption());
}

/** Read timeout for Cassandra driver. Set null for driver default. */
@Nullable
public abstract Duration requestTimeout();
/** Returns the logged Keyspace. */
public String loggedKeySpace() {
return driverConfigLoader()
.getInitialConfig()
.getDefaultProfile()
.getString(TypedDriverOption.SESSION_KEYSPACE.getRawOption());
}

public static Builder builder() {
return new AutoValue_CassandraDataSource.Builder()
.setRetryPolicy(DefaultRetryPolicy.class)
.setConsistencyLevel(ConsistencyLevel.QUORUM);
return new AutoValue_CassandraDataSource.Builder();
}

public abstract Builder toBuilder();

@AutoValue.Builder
public abstract static class Builder {

public abstract Builder setClusterName(String value);
public abstract Builder setOptionsMap(OptionsMap value);

public abstract Builder setLocalDataCenter(@Nullable String value);
public abstract Builder setClusterName(@Nullable String value);

public abstract Builder setContactPoints(ImmutableList<InetSocketAddress> value);
abstract OptionsMap optionsMap();

public Builder setContactPoints(List<InetSocketAddress> value) {
return setContactPoints(ImmutableList.copyOf(value));
public Builder setOptionsMapFromGcsFile(String gcsPath) throws FileNotFoundException {
return this.setOptionsMap(CassandraDriverConfigLoader.getOptionsMapFromFile(gcsPath));
}

public abstract Builder setDbAuth(@Nullable DbAuth value);

public abstract Builder setRetryPolicy(Class value);
public <ValueT> Builder overrideOptionInOptionsMap(
TypedDriverOption<ValueT> option, ValueT value) {
DriverConfigLoader.fromMap(optionsMap())
.getInitialConfig()
.getProfiles()
.keySet()
.forEach(profile -> this.optionsMap().put(profile, option, value));
return this;
}

public abstract Builder setConsistencyLevel(ConsistencyLevel value);
/**
* Allowing UT to set the contact points. In UT environment, the port is dynamically determined.
* We can't use a static GCS file to provide the contact points.
*/
@VisibleForTesting
public Builder setContactPoints(List<InetSocketAddress> contactPoints) {
overrideOptionInOptionsMap(
TypedDriverOption.CONTACT_POINTS,
contactPoints.stream()
.map(p -> p.getAddress().getHostAddress() + ":" + p.getPort())
.collect(ImmutableList.toImmutableList()));
return this;
}

public abstract Builder setConnectTimeout(@Nullable Duration value);
/** Set the local Datacenter. */
@VisibleForTesting
public Builder setLocalDataCenter(String localDataCenter) {
overrideOptionInOptionsMap(
TypedDriverOption.LOAD_BALANCING_LOCAL_DATACENTER, localDataCenter);
return this;
}

public abstract Builder setRequestTimeout(@Nullable Duration value);
abstract CassandraDataSource autoBuild();

public abstract CassandraDataSource build();
public CassandraDataSource build() {
/* Prefer to use quorum read until we encounter a strong use case to not do so. */
this.overrideOptionInOptionsMap(
TypedDriverOption.REQUEST_CONSISTENCY, ConsistencyLevel.QUORUM.toString());
return autoBuild();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,11 @@
import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH;
import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE;
import static com.google.common.truth.Truth.assertThat;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.datastax.oss.driver.api.core.ConsistencyLevel;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.google.cloud.teleport.v2.source.reader.auth.dbauth.LocalCredentialsProvider;
import com.datastax.oss.driver.api.core.config.OptionsMap;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.SharedEmbeddedCassandra;
import java.io.IOException;
import java.time.Duration;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
Expand Down Expand Up @@ -71,6 +62,7 @@ public void testBasic() throws IOException {
CassandraDataSource cassandraDataSource =
CassandraDataSource.builder()
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setOptionsMap(OptionsMap.driverDefaults())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
.build();
Expand All @@ -95,77 +87,4 @@ public void testBasic() throws IOException {
assertThat(cassandraConnectorWithNullKeySpace.getSession().getKeyspace()).isEmpty();
}
}

@Test
public void testCredentialsSetter() {

final String testUserName = "testUseramNe";
final String testPassword = "test";

CassandraDataSource cassandraDataSource =
CassandraDataSource.builder()
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
.build();
CqlSessionBuilder mockSessionBuilder = mock(CqlSessionBuilder.class);
// No Auth Set
CassandraConnector.setCredentials(mockSessionBuilder, cassandraDataSource);
verify(mockSessionBuilder, never()).withAuthCredentials(anyString(), anyString());
// Auth set
CassandraConnector.setCredentials(
mockSessionBuilder,
cassandraDataSource.toBuilder()
.setDbAuth(
LocalCredentialsProvider.builder()
.setUserName(testUserName)
.setPassword(testPassword)
.build())
.build());
verify(mockSessionBuilder, times(1)).withAuthCredentials(testUserName, testPassword);
}

@Test
public void testConfigLoader() {

CassandraDataSource cassandraDataSource =
CassandraDataSource.builder()
.setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName())
.setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints())
.setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter())
.build();
assertThat(
CassandraConnector.getDriverConfigLoader(cassandraDataSource)
.getInitialConfig()
.getDefaultProfile()
.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
.isEqualTo(ConsistencyLevel.QUORUM.name());
assertThat(
CassandraConnector.getDriverConfigLoader(
cassandraDataSource.toBuilder()
.setConsistencyLevel(ConsistencyLevel.ONE)
.build())
.getInitialConfig()
.getDefaultProfile()
.getString(DefaultDriverOption.REQUEST_CONSISTENCY))
.isEqualTo(ConsistencyLevel.ONE.name());
assertThat(
CassandraConnector.getDriverConfigLoader(
cassandraDataSource.toBuilder()
.setConnectTimeout(Duration.ofSeconds(42L))
.build())
.getInitialConfig()
.getDefaultProfile()
.getDuration(DefaultDriverOption.CONNECTION_CONNECT_TIMEOUT))
.isEqualTo(Duration.ofSeconds(42L));
assertThat(
CassandraConnector.getDriverConfigLoader(
cassandraDataSource.toBuilder()
.setRequestTimeout(Duration.ofSeconds(42L))
.build())
.getInitialConfig()
.getDefaultProfile()
.getDuration(DefaultDriverOption.REQUEST_TIMEOUT))
.isEqualTo(Duration.ofSeconds(42L));
}
}
Loading

0 comments on commit 3b1477a

Please sign in to comment.