Added pipeline options for cassandra. Made changes to the required class to support cassandra
akashthawaitcc committed Nov 27, 2024
1 parent 3e375ac commit 5c2a0aa
Showing 7 changed files with 245 additions and 89 deletions.
44 changes: 44 additions & 0 deletions CassandraConfigFileReader.java
@@ -0,0 +1,44 @@
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import com.google.cloud.teleport.v2.spanner.migrations.cassandra.CassandraConfig;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.GsonBuilder;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class to read the Cassandra configuration file in GCS and convert it into a CassandraConfig
 * object.
 */
public class CassandraConfigFileReader {

  private static final Logger LOG = LoggerFactory.getLogger(CassandraConfigFileReader.class);

  public CassandraConfig getCassandraConfig(String cassandraConfigFilePath) {
    try (InputStream stream =
        Channels.newInputStream(
            FileSystems.open(FileSystems.matchNewResource(cassandraConfigFilePath, false)))) {

      String result = IOUtils.toString(stream, StandardCharsets.UTF_8);
      CassandraConfig cassandraConfig =
          new GsonBuilder()
              .setFieldNamingPolicy(FieldNamingPolicy.IDENTITY)
              .create()
              .fromJson(result, CassandraConfig.class);

      LOG.info("The Cassandra config is: {}", cassandraConfig);
      return cassandraConfig;

    } catch (IOException e) {
      LOG.error(
          "Failed to read Cassandra config file. Make sure it is ASCII or UTF-8 encoded and contains a well-formed JSON string.",
          e);
      throw new RuntimeException(
          "Failed to read Cassandra config file. Make sure it is ASCII or UTF-8 encoded and contains a well-formed JSON string.",
          e);
    }
  }
}
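For context, a minimal usage sketch of the new reader; the GCS path is illustrative, and because Gson is configured with FieldNamingPolicy.IDENTITY, the JSON keys in the file must match the CassandraConfig field names exactly:

// Hypothetical call site; the bucket path is illustrative, not part of this change.
CassandraConfigFileReader reader = new CassandraConfigFileReader();
CassandraConfig config = reader.getCassandraConfig("gs://my-bucket/cassandra-config.json");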
12 changes: 12 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
@@ -89,6 +89,18 @@
      <version>${project.version}</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>com.datastax.oss</groupId>
      <artifactId>java-driver-core</artifactId>
      <version>4.16.0</version>
      <scope>compile</scope>
    </dependency>
    <dependency>
      <groupId>com.datastax.cassandra</groupId>
      <artifactId>cassandra-driver-core</artifactId>
      <version>3.10.2</version>
      <scope>compile</scope>
    </dependency>
  </dependencies>

</project>
@@ -26,9 +26,11 @@
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.cassandra.CassandraConfig;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.spanner.SpannerSchema;
import com.google.cloud.teleport.v2.spanner.migrations.utils.CassandraConfigFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SecretManagerAccessorImpl;
import com.google.cloud.teleport.v2.spanner.migrations.utils.SessionFileReader;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ShardFileReader;
@@ -45,9 +47,11 @@
import com.google.cloud.teleport.v2.templates.utils.ShadowTableCreator;
import com.google.cloud.teleport.v2.transforms.DLQWriteTransform;
import com.google.cloud.teleport.v2.values.FailsafeElement;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import org.apache.beam.runners.dataflow.options.DataflowPipelineDebugOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
@@ -78,7 +82,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** This pipeline reads Spanner Change streams data and writes them to a source DB. */
/**
 * This pipeline reads Spanner Change streams data and writes them to a source DB.
 */
@Template(
    name = "Spanner_to_SourceDb",
    category = TemplateCategory.STREAMING,
@@ -367,6 +373,25 @@ public interface Options extends PipelineOptions, StreamingOptions {
  String getSourceType();

  void setSourceType(String value);

  @TemplateParameter.GcsReadFile(
      order = 10,
      optional = false,
      description = "Path to GCS file containing the Cassandra config details",
      helpText = "Path to GCS file containing connection profile info for Cassandra.")
  String getCassandraConfigFilePath();

  void setCassandraConfigFilePath(String value);

  @TemplateParameter.Long(
      order = 19,
      optional = true,
      description = "Maximum connections per Cassandra cluster.",
      helpText = "This will eventually come from the Cassandra config file.")
  @Default.Long(10000)
  Long getMaxConnections();

  void setMaxConnections(Long value);
}
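A minimal sketch of supplying the new parameters programmatically through Beam's PipelineOptionsFactory (the GCS path and values are illustrative):

Options options =
    PipelineOptionsFactory.fromArgs(
            "--sourceType=cassandra",
            "--cassandraConfigFilePath=gs://my-bucket/cassandra-config.json",
            "--maxConnections=10000")
        .withValidation()
        .as(Options.class);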

/**
@@ -405,7 +430,13 @@ public static PipelineResult run(Options options) {
        pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers() > 0
            ? pipeline.getOptions().as(DataflowPipelineWorkerPoolOptions.class).getMaxNumWorkers()
            : 1;
    int connectionPoolSizePerWorker = (int) (options.getMaxShardConnections() / maxNumWorkers);
    int connectionPoolSizePerWorker = 1;
    if ("mysql".equals(options.getSourceType())) {
      connectionPoolSizePerWorker = (int) (options.getMaxShardConnections() / maxNumWorkers);
    } else {
      connectionPoolSizePerWorker = (int) (options.getMaxConnections() / maxNumWorkers);
    }

    if (connectionPoolSizePerWorker < 1) {
      // This can happen when the number of workers is more than max.
      // This can cause overload on the source database. Error out and let the user know.
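To make the sizing arithmetic concrete, a worked example under the default maxConnections of 10000 (the worker count is illustrative):

// maxConnections comes from --maxConnections (default 10000); maxNumWorkers from
// the Dataflow worker pool options. Integer division sizes the per-worker pool:
long maxConnections = 10_000L;
int maxNumWorkers = 4;
int connectionPoolSizePerWorker = (int) (maxConnections / maxNumWorkers); // 2500
// If maxNumWorkers exceeded maxConnections, the result would truncate to 0 and
// the guard above would error out rather than overload the source database.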
@@ -463,28 +494,40 @@ public static PipelineResult run(Options options) {

    shadowTableCreator.createShadowTablesInSpanner();
    Ddl ddl = SpannerSchema.getInformationSchemaAsDdl(spannerConfig);
    ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
    List<Shard> shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
    String shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
    if (shards.size() == 1) {
      shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;

      Shard singleShard = shards.get(0);
      if (singleShard.getLogicalShardId() == null) {
        singleShard.setLogicalShardId(Constants.DEFAULT_SHARD_ID);
        LOG.info(
            "Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);

    List<Shard> shards = new ArrayList<>();
    String shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;
    CassandraConfig cassandraConfig = null;

    if ("mysql".equals(options.getSourceType())) {
      ShardFileReader shardFileReader = new ShardFileReader(new SecretManagerAccessorImpl());
      shards = shardFileReader.getOrderedShardDetails(options.getSourceShardsFilePath());
      shardingMode = Constants.SHARDING_MODE_MULTI_SHARD;
      if (shards.size() == 1) {
        shardingMode = Constants.SHARDING_MODE_SINGLE_SHARD;

        Shard singleShard = shards.get(0);
        if (singleShard.getLogicalShardId() == null) {
          singleShard.setLogicalShardId(Constants.DEFAULT_SHARD_ID);
          LOG.info(
              "Logical shard id was not found, hence setting it to : " + Constants.DEFAULT_SHARD_ID);
        }
      }
    } else {
      CassandraConfigFileReader cassandraConfigFileReader = new CassandraConfigFileReader();
      cassandraConfig =
          cassandraConfigFileReader.getCassandraConfig(options.getCassandraConfigFilePath());
      LOG.info("Cassandra config is: {}", cassandraConfig);
    }

    boolean isRegularMode = "regular".equals(options.getRunMode());
    PCollectionTuple reconsumedElements = null;
    DeadLetterQueueManager dlqManager = buildDlqManager(options);

    int reshuffleBucketSize =
        maxNumWorkers
            * (debugOptions.getNumberOfWorkerHarnessThreads() > 0
                ? debugOptions.getNumberOfWorkerHarnessThreads()
                : Constants.DEFAULT_WORKER_HARNESS_THREAD_COUNT);

    if (isRegularMode) {
      reconsumedElements =
@@ -561,7 +604,11 @@ public static PipelineResult run(Options options) {
                        options.getShardingCustomClassName(),
                        options.getShardingCustomParameters(),
                        options.getMaxShardConnections()
                            * shards.size()))) // currently assuming that all shards accept the same
                            * shards.size(),
                        options.getSourceType(),
                        cassandraConfig,
                        options.getMaxConnections()))) // currently assuming that all shards accept the same
                // number of max connections
                .setCoder(
                    KvCoder.of(
@@ -578,7 +625,10 @@
                    options.getShadowTablePrefix(),
                    options.getSkipDirectoryName(),
                    connectionPoolSizePerWorker,
                    options.getSourceType()));
                    options.getSourceType(),
                    cassandraConfig,
                    options.getMaxConnections()));

    PCollection<FailsafeElement<String, String>> dlqPermErrorRecords =
        reconsumedElements
@@ -72,4 +72,5 @@ public class Constants {
  public static final String DEFAULT_SHARD_ID = "single_shard";

  public static final String SOURCE_MYSQL = "mysql";
  public static final String SOURCE_CASSANDRA = "cassandra";
}
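A minimal sketch of how the new constant pairs with the sourceType option (mirroring the "mysql"/"cassandra" branching in the template above):

if (Constants.SOURCE_CASSANDRA.equals(options.getSourceType())) {
  // Cassandra path: read connection details from the GCS config file.
} else {
  // MySQL path: read shard details from the source shards file.
}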