Skip to content

Commit

Permalink
Added cassandra Migration code
Browse files Browse the repository at this point in the history
  • Loading branch information
rpadul committed Nov 20, 2024
1 parent b7e6db7 commit a3d23ad
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 0 deletions.
12 changes: 12 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,18 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.16.0</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.11</version>
<scope>compile</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package com.google.cloud.teleport.v2.templates.utils;
import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.auth.ProgrammaticPlainTextAuthProvider;

import java.net.InetSocketAddress;

public class CassandraConnection {
private static CassandraConnection instance;
private CqlSession session;

// Private constructor to enforce Singleton
private CassandraConnection(String host, int port, String datacenter, String username, String password) {
try {
session = CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withLocalDatacenter(datacenter)
// .withAuthProvider(new ProgrammaticPlainTextAuthProvider(username, password)) // Use if authentication is enabled
.build();
session.execute("USE mykeyspace;");

System.out.println("Connected to Cassandra!");
} catch (Exception e) {
throw new RuntimeException("Failed to connect to Cassandra", e);
}
}
public static synchronized CassandraConnection getInstance(String host, int port, String datacenter, String username, String password) {
if (instance == null) {
instance = new CassandraConnection(host, port, datacenter, username, password);
}
return instance;
}

// Method to get the CqlSession
public CqlSession getSession() {
return session;
}

// Close the session when done
public void close() {
if (session != null) {
session.close();
System.out.println("Cassandra connection closed.");
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package com.google.cloud.teleport.v2.templates.utils;

import java.io.Serializable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.datastax.oss.driver.api.core.CqlSession;
/** Writes data to MySQL. */
public class CassandraDao implements Serializable {
private static final Logger LOG = LoggerFactory.getLogger(CassandraDao.class);
String cassandraHost;
String cassandraUser;
String cassandraPasswd;
int port;
String datacenter;

public CassandraDao(String cassandraHost, int cassandraPort, String dataCenter, String cassandraUser, String cassandraPasswd) {
this.cassandraHost = cassandraHost;
this.port = cassandraPort;
this.cassandraUser = cassandraUser;
this.cassandraPasswd = cassandraPasswd;
this.datacenter = dataCenter;
}

// writes to database
public void write(String cassandraStatement) {
CassandraConnection cassandraConnection = null;
CqlSession session = null;

try {
cassandraConnection = CassandraConnection.getInstance(this.cassandraHost , this.port, this.datacenter, this.cassandraUser,this.cassandraPasswd);
session = cassandraConnection.getSession();
session.execute(cassandraStatement);
} catch (Exception e) {
System.err.println("An error occurred while interacting with Cassandra:");
e.printStackTrace();
} finally {
// Ensure the connection is closed in the finally block
if (cassandraConnection != null) {
cassandraConnection.close();
}
}
}
}

0 comments on commit a3d23ad

Please sign in to comment.