Skip to content

Commit

Permalink
Added connection class for cassandra and created connection helper
Browse files Browse the repository at this point in the history
  • Loading branch information
akashthawaitcc committed Nov 27, 2024
1 parent d557dcd commit e379796
Show file tree
Hide file tree
Showing 5 changed files with 328 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -0,0 +1,209 @@
package com.google.cloud.teleport.v2.spanner.migrations.cassandra;

import java.io.Serializable;
import java.util.Objects;

public class CassandraConfig implements Serializable {

private String host;
private String port;
private String username;
private String password;
private String keyspace;
private String consistencyLevel = "LOCAL_QUORUM";
private boolean sslOptions = false;
private String protocolVersion = "v5";
private String dataCenter = "datacenter1";
private int localPoolSize = 1024;
private int remotePoolSize = 256;

public CassandraConfig() {}

public CassandraConfig(
String host,
String port,
String username,
String password,
String keyspace,
String consistencyLevel,
boolean sslOptions,
String protocolVersion,
String dataCenter,
int localPoolSize,
int remotePoolSize) {
this.host = host;
this.port = port;
this.username = username;
this.password = password;
this.keyspace = keyspace;
this.consistencyLevel = consistencyLevel;
this.sslOptions = sslOptions;
this.protocolVersion = protocolVersion;
this.dataCenter = dataCenter;
this.localPoolSize = localPoolSize;
this.remotePoolSize = remotePoolSize;
}

// Getter and Setter methods for all fields

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public String getPort() {
return port;
}

public void setPort(String port) {
this.port = port;
}

public String getUsername() {
return username;
}

public void setUsername(String username) {
this.username = username;
}

public String getPassword() {
return password;
}

public void setPassword(String password) {
this.password = password;
}

public String getKeyspace() {
return keyspace;
}

public void setKeyspace(String keyspace) {
this.keyspace = keyspace;
}

public String getConsistencyLevel() {
return consistencyLevel;
}

public void setConsistencyLevel(String consistencyLevel) {
this.consistencyLevel = consistencyLevel;
}

public boolean isSslOptions() {
return sslOptions;
}

public void setSslOptions(boolean sslOptions) {
this.sslOptions = sslOptions;
}

public String getProtocolVersion() {
return protocolVersion;
}

public void setProtocolVersion(String protocolVersion) {
this.protocolVersion = protocolVersion;
}

public String getDataCenter() {
return dataCenter;
}

public void setDataCenter(String dataCenter) {
this.dataCenter = dataCenter;
}

public int getLocalPoolSize() {
return localPoolSize;
}

public void setLocalPoolSize(int localPoolSize) {
this.localPoolSize = localPoolSize;
}

public int getRemotePoolSize() {
return remotePoolSize;
}

public void setRemotePoolSize(int remotePoolSize) {
this.remotePoolSize = remotePoolSize;
}

public void validate() throws IllegalArgumentException {
if (host == null || host.isEmpty()) {
throw new IllegalArgumentException("Host is required");
}
if (port == null || port.isEmpty()) {
throw new IllegalArgumentException("Port is required");
}
if (username == null || username.isEmpty()) {
throw new IllegalArgumentException("Username is required");
}
if (password == null || password.isEmpty()) {
throw new IllegalArgumentException("Password is required");
}
if (keyspace == null || keyspace.isEmpty()) {
throw new IllegalArgumentException("Keyspace is required");
}
}

@Override
public String toString() {
return "CassandraConfig{"
+ "host='" + host + '\''
+ ", port='" + port + '\''
+ ", username='" + username + '\''
+ ", password='" + password + '\''
+ ", keyspace='" + keyspace + '\''
+ ", consistencyLevel='" + consistencyLevel + '\''
+ ", sslOptions=" + sslOptions
+ ", protocolVersion='" + protocolVersion + '\''
+ ", dataCenter='" + dataCenter + '\''
+ ", localPoolSize=" + localPoolSize
+ ", remotePoolSize=" + remotePoolSize
+ '}';
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof CassandraConfig)) {
return false;
}
CassandraConfig cassandraConfig = (CassandraConfig) o;
return sslOptions == cassandraConfig.sslOptions
&& localPoolSize == cassandraConfig.localPoolSize
&& remotePoolSize == cassandraConfig.remotePoolSize
&& Objects.equals(host, cassandraConfig.host)
&& Objects.equals(port, cassandraConfig.port)
&& Objects.equals(username, cassandraConfig.username)
&& Objects.equals(password, cassandraConfig.password)
&& Objects.equals(keyspace, cassandraConfig.keyspace)
&& Objects.equals(consistencyLevel, cassandraConfig.consistencyLevel)
&& Objects.equals(protocolVersion, cassandraConfig.protocolVersion)
&& Objects.equals(dataCenter, cassandraConfig.dataCenter);
}

@Override
public int hashCode() {
return Objects.hash(
host,
port,
username,
password,
keyspace,
consistencyLevel,
sslOptions,
protocolVersion,
dataCenter,
localPoolSize,
remotePoolSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
//package com.google.cloud.teleport.v2.spanner.migrations.cassandra;
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package com.google.cloud.teleport.v2.templates.dao.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.SimpleStatement;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.utils.connection.IConnectionHelper;

public class CassandraDao implements IDao<String> {
private final String cassandraUrl;
private final String cassandraUser;
private final IConnectionHelper<CqlSession> connectionHelper;

public CassandraDao(String cassandraUrl, String cassandraUser, IConnectionHelper<CqlSession> connectionHelper) {
this.cassandraUrl = cassandraUrl;
this.cassandraUser = cassandraUser;
this.connectionHelper = connectionHelper;
}

@Override
public void write(String cqlStatement) throws Exception {
CqlSession session = null;

try {
session = connectionHelper.getConnection(this.cassandraUrl + "/" + this.cassandraUser);
if (session == null) {
throw new ConnectionException("Connection is null");
}
SimpleStatement statement = SimpleStatement.newInstance(cqlStatement);
session.execute(statement);
} finally {
if (session != null) {
session.close();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.v2.templates.models;

import com.google.cloud.teleport.v2.spanner.migrations.cassandra.CassandraConfig;
import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import java.util.List;

Expand All @@ -33,7 +34,8 @@
public class ConnectionHelperRequest {
private List<Shard> shards;
private String properties;
private int maxConnections;
private final int maxConnections;
private CassandraConfig cassandraConfig;

public List<Shard> getShards() {
return shards;
Expand All @@ -47,9 +49,18 @@ public int getMaxConnections() {
return maxConnections;
}

public CassandraConfig getCassandraConfig() {
return cassandraConfig;
}

public ConnectionHelperRequest(List<Shard> shards, String properties, int maxConnections) {
this.shards = shards;
this.properties = properties;
this.maxConnections = maxConnections;
}

public ConnectionHelperRequest(CassandraConfig cassandraConfig, int maxConnections) {
this.cassandraConfig = cassandraConfig;
this.maxConnections = maxConnections;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.google.cloud.teleport.v2.templates.utils.connection;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.CqlSessionBuilder;
import com.datastax.oss.driver.api.core.config.DriverConfigLoader;
import com.datastax.oss.driver.api.core.config.DriverOption;
import com.datastax.oss.driver.api.core.config.ProgrammaticDriverConfigLoaderBuilder;
import com.datastax.oss.driver.api.core.config.TypedDriverOption;
import com.google.cloud.teleport.v2.spanner.migrations.cassandra.CassandraConfig;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.ConnectionHelperRequest;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CassandraConnectionHelper implements IConnectionHelper<CqlSession> {

private static final Logger LOG = LoggerFactory.getLogger(CassandraConnectionHelper.class);
private static Map<String, CqlSession> connectionPoolMap = null;

@Override
public synchronized void init(ConnectionHelperRequest connectionHelperRequest) {
if (connectionPoolMap != null) {
return;
}
LOG.info("Initializing Cassandra connection pool with size: {}", connectionHelperRequest.getMaxConnections());
connectionPoolMap = new HashMap<>();
CassandraConfig cassandraConfig = connectionHelperRequest.getCassandraConfig();
cassandraConfig.validate();

CqlSessionBuilder builder = CqlSession.builder()
.addContactPoint(new InetSocketAddress(cassandraConfig.getHost(), Integer.parseInt(cassandraConfig.getPort())))
.withAuthCredentials(cassandraConfig.getUsername(), cassandraConfig.getPassword())
.withKeyspace(cassandraConfig.getKeyspace());

ProgrammaticDriverConfigLoaderBuilder configLoaderBuilder = DriverConfigLoader.programmaticBuilder();
configLoaderBuilder.withInt((DriverOption) TypedDriverOption.CONNECTION_POOL_LOCAL_SIZE, cassandraConfig.getLocalPoolSize());
configLoaderBuilder.withInt((DriverOption) TypedDriverOption.CONNECTION_POOL_REMOTE_SIZE, cassandraConfig.getRemotePoolSize());
builder.withConfigLoader(configLoaderBuilder.build());

CqlSession session = builder.build();
String connectionKey = cassandraConfig.getHost() + ":" + cassandraConfig.getPort() + "/" + cassandraConfig.getUsername();
connectionPoolMap.put(connectionKey, session);
}

@Override
public CqlSession getConnection(String connectionRequestKey) throws ConnectionException {
try {
if (connectionPoolMap == null) {
LOG.warn("Connection pool not initialized");
return null;
}
CqlSession session = connectionPoolMap.get(connectionRequestKey);
if (session == null) {
LOG.warn("Connection pool not found for source connection: {}", connectionRequestKey);
return null;
}
return session;
} catch (Exception e) {
throw new ConnectionException(e);
}
}

// for unit testing
public void setConnectionPoolMap(Map<String, CqlSession> inputMap) {
connectionPoolMap = inputMap;
}
}

0 comments on commit e379796

Please sign in to comment.