diff --git a/v2/spanner-to-sourcedb/pom.xml b/v2/spanner-to-sourcedb/pom.xml
index fd7896b923..63bd047ed6 100644
--- a/v2/spanner-to-sourcedb/pom.xml
+++ b/v2/spanner-to-sourcedb/pom.xml
@@ -88,6 +88,11 @@
${project.version}
test
+
+ com.datastax.oss
+ java-driver-core
+ 4.17.0
+
com.google.cloud.teleport
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
new file mode 100644
index 0000000000..0022c4fd7b
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.dbutils.dao.source;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
+import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
+import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
+
+public class CassandraDao implements IDao {
+ private final String cassandraUrl;
+ private final String cassandraUser;
+ private final IConnectionHelper connectionHelper;
+
+ public CassandraDao(
+ String cassandraUrl, String cassandraUser, IConnectionHelper connectionHelper) {
+ this.cassandraUrl = cassandraUrl;
+ this.cassandraUser = cassandraUser;
+ this.connectionHelper = connectionHelper;
+ }
+
+ @Override
+ public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
+ try (CqlSession session =
+ (CqlSession)
+ connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
+ if (session == null) {
+ throw new ConnectionException("Connection is null");
+ }
+ if (dmlGeneratorResponse instanceof PreparedStatementGeneratedResponse) {
+ PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
+ (PreparedStatementGeneratedResponse) dmlGeneratorResponse;
+ try {
+ String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
+ PreparedStatement preparedStatement = session.prepare(dmlStatement);
+ BoundStatement boundStatement =
+ preparedStatement.bind(
+ preparedStatementGeneratedResponse.getValues().stream()
+ .map(PreparedStatementValueObject::getValue)
+ .toArray());
+ session.execute(boundStatement);
+ } catch (Exception e) {
+ throw e;
+ }
+ } else {
+ String simpleStatement = dmlGeneratorResponse.getDmlStatement();
+ session.execute(simpleStatement);
+ }
+ }
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementGeneratedResponse.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementGeneratedResponse.java
new file mode 100644
index 0000000000..0a7feea80a
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementGeneratedResponse.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.models;
+
+import java.util.List;
+
+public class PreparedStatementGeneratedResponse extends DMLGeneratorResponse {
+ private List> values;
+
+ public PreparedStatementGeneratedResponse(
+ String dmlStatement, List> values) {
+ super(dmlStatement);
+ this.values = values;
+ }
+
+ public List> getValues() {
+ return values;
+ }
+
+ public void setValues(List> values) {
+ this.values = values;
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementValueObject.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementValueObject.java
new file mode 100644
index 0000000000..3b9111dc05
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementValueObject.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.models;
+
+public class PreparedStatementValueObject {
+ private String dataType;
+ private T value;
+
+ public PreparedStatementValueObject(String dataType, T value) {
+ this.dataType = dataType;
+ this.value = value;
+ }
+
+ public String getDataType() {
+ return dataType;
+ }
+
+ public void setDataType(String dataType) {
+ this.dataType = dataType;
+ }
+
+ public T getValue() {
+ return value;
+ }
+
+ public void setValue(T value) {
+ this.value = value;
+ }
+
+ // Override toString() for better readability when printing objects
+ @Override
+ public String toString() {
+ return "PreparedStatementValueObject{" + "key='" + dataType + '\'' + ", value=" + value + '}';
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) return true;
+ if (obj == null || getClass() != obj.getClass()) return false;
+ PreparedStatementValueObject> that = (PreparedStatementValueObject>) obj;
+ return dataType.equals(that.dataType) && value.equals(that.value);
+ }
+
+ @Override
+ public int hashCode() {
+ return 31 * dataType.hashCode() + (value != null ? value.hashCode() : 0);
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDaoTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDaoTest.java
new file mode 100644
index 0000000000..c577573d82
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDaoTest.java
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.dbutils.dao.source;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
+import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
+import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+public class CassandraDaoTest {
+
+ @Mock private IConnectionHelper mockConnectionHelper;
+ @Mock private CqlSession mockSession;
+ @Mock private PreparedStatement mockPreparedStatement;
+ @Mock private BoundStatement mockBoundStatement;
+ @Mock private PreparedStatementGeneratedResponse mockPreparedStatementGeneratedResponse;
+
+ private CassandraDao cassandraDao;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ cassandraDao = new CassandraDao("cassandraUrl", "cassandraUser", mockConnectionHelper);
+ }
+
+ @Test
+ public void testNullConnectionForWrite() throws Exception {
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString())).thenReturn(null);
+ ConnectionException exception =
+ assertThrows(
+ ConnectionException.class,
+ () -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
+ assertEquals("Connection is null", exception.getMessage());
+ }
+
+ @Test
+ public void testPreparedStatementExecution() throws Exception {
+ String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
+ List> values =
+ Arrays.asList(
+ new PreparedStatementValueObject<>("", preparedDmlStatement),
+ new PreparedStatementValueObject<>("Test", preparedDmlStatement));
+
+ Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
+ .thenReturn(preparedDmlStatement);
+ Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+ Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
+ .thenReturn(mockPreparedStatement);
+ Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
+
+ cassandraDao.write(mockPreparedStatementGeneratedResponse);
+
+ Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
+ Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
+ Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
+ }
+
+ @Test
+ public void testWriteWithExceptionInPreparedStatement() throws Exception {
+ String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
+ List> values =
+ Arrays.asList(
+ new PreparedStatementValueObject<>("", preparedDmlStatement),
+ new PreparedStatementValueObject<>("Test", preparedDmlStatement));
+
+ Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
+ .thenReturn(preparedDmlStatement);
+ Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+ Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
+ .thenReturn(mockPreparedStatement);
+ Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
+ Mockito.doThrow(new RuntimeException("Prepared statement execution failed"))
+ .when(mockSession)
+ .execute(ArgumentMatchers.eq(mockBoundStatement));
+
+ RuntimeException exception =
+ assertThrows(
+ RuntimeException.class,
+ () -> {
+ cassandraDao.write(mockPreparedStatementGeneratedResponse);
+ });
+
+ assertEquals("Prepared statement execution failed", exception.getMessage());
+ Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
+ Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
+ Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
+ }
+
+ @Test
+ public void testWriteWithExceptionHandling() throws Exception {
+ String dmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
+ Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement()).thenReturn(dmlStatement);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+ Mockito.when(mockSession.prepare(dmlStatement))
+ .thenThrow(new RuntimeException("Failed to prepare statement"));
+
+ RuntimeException exception =
+ assertThrows(
+ RuntimeException.class,
+ () -> {
+ cassandraDao.write(mockPreparedStatementGeneratedResponse);
+ });
+
+ assertEquals("Failed to prepare statement", exception.getMessage());
+ Mockito.verify(mockSession).prepare(dmlStatement);
+ Mockito.verify(mockSession, Mockito.never()).execute(ArgumentMatchers.>any());
+ }
+
+ @Test
+ public void testConnectionExceptionDuringWrite() throws Exception {
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenThrow(new ConnectionException("Connection failed"));
+ ConnectionException exception =
+ assertThrows(
+ ConnectionException.class,
+ () -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
+ assertEquals("Connection failed", exception.getMessage());
+ }
+
+ @Test
+ public void testWriteWithSimpleStatementExecution() throws Exception {
+ String simpleStatement = "DELETE FROM test WHERE id = 1";
+ DMLGeneratorResponse mockDmlGeneratorResponse = Mockito.mock(DMLGeneratorResponse.class);
+
+ Mockito.when(mockDmlGeneratorResponse.getDmlStatement()).thenReturn(simpleStatement);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+
+ cassandraDao.write(mockDmlGeneratorResponse);
+
+ Mockito.verify(mockSession).execute(ArgumentMatchers.eq(simpleStatement));
+ Mockito.verify(mockSession, Mockito.never()).prepare(ArgumentMatchers.anyString());
+ }
+}