diff --git a/v2/spanner-to-sourcedb/pom.xml b/v2/spanner-to-sourcedb/pom.xml
index fd7896b923..63bd047ed6 100644
--- a/v2/spanner-to-sourcedb/pom.xml
+++ b/v2/spanner-to-sourcedb/pom.xml
@@ -88,6 +88,11 @@
${project.version}
test
+
+ com.datastax.oss
+ java-driver-core
+ 4.17.0
+
com.google.cloud.teleport
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
new file mode 100644
index 0000000000..5960a413c2
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDao.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.dbutils.dao.source;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
+import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
+import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
+
+public class CassandraDao implements IDao {
+ private final String cassandraUrl;
+ private final String cassandraUser;
+ private final IConnectionHelper connectionHelper;
+
+ public CassandraDao(
+ String cassandraUrl, String cassandraUser, IConnectionHelper connectionHelper) {
+ this.cassandraUrl = cassandraUrl;
+ this.cassandraUser = cassandraUser;
+ this.connectionHelper = connectionHelper;
+ }
+
+ @Override
+ public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
+ try (CqlSession session =
+ (CqlSession)
+ connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
+ if (session == null) {
+ throw new ConnectionException("Connection is null");
+ }
+ PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
+ (PreparedStatementGeneratedResponse) dmlGeneratorResponse;
+ String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
+ PreparedStatement preparedStatement = session.prepare(dmlStatement);
+ BoundStatement boundStatement =
+ preparedStatement.bind(
+ preparedStatementGeneratedResponse.getValues().stream()
+ .map(PreparedStatementValueObject::value)
+ .toArray());
+ session.execute(boundStatement);
+ }
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementGeneratedResponse.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementGeneratedResponse.java
new file mode 100644
index 0000000000..0a7feea80a
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementGeneratedResponse.java
@@ -0,0 +1,36 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.models;
+
+import java.util.List;
+
+public class PreparedStatementGeneratedResponse extends DMLGeneratorResponse {
+ private List> values;
+
+ public PreparedStatementGeneratedResponse(
+ String dmlStatement, List> values) {
+ super(dmlStatement);
+ this.values = values;
+ }
+
+ public List> getValues() {
+ return values;
+ }
+
+ public void setValues(List> values) {
+ this.values = values;
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementValueObject.java b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementValueObject.java
new file mode 100644
index 0000000000..8cf297f04f
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/main/java/com/google/cloud/teleport/v2/templates/models/PreparedStatementValueObject.java
@@ -0,0 +1,30 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.models;
+
+import com.google.auto.value.AutoValue;
+
+@AutoValue
+public abstract class PreparedStatementValueObject {
+
+ public abstract String dataType();
+
+ public abstract T value();
+
+ public static PreparedStatementValueObject create(String dataType, T value) {
+ return new AutoValue_PreparedStatementValueObject<>(dataType, value);
+ }
+}
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDaoTest.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDaoTest.java
new file mode 100644
index 0000000000..83f34e7840
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/dbutils/dao/source/CassandraDaoTest.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.dbutils.dao.source;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.BoundStatement;
+import com.datastax.oss.driver.api.core.cql.PreparedStatement;
+import com.datastax.oss.driver.api.core.cql.Statement;
+import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
+import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
+import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
+import java.util.Arrays;
+import java.util.List;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.ArgumentMatchers;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.MockitoAnnotations;
+
+public class CassandraDaoTest {
+
+ @Mock private IConnectionHelper mockConnectionHelper;
+ @Mock private CqlSession mockSession;
+ @Mock private PreparedStatement mockPreparedStatement;
+ @Mock private BoundStatement mockBoundStatement;
+ @Mock private PreparedStatementGeneratedResponse mockPreparedStatementGeneratedResponse;
+
+ private CassandraDao cassandraDao;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.openMocks(this);
+ cassandraDao = new CassandraDao("cassandraUrl", "cassandraUser", mockConnectionHelper);
+ }
+
+ @Test
+ public void testNullConnectionForWrite() throws Exception {
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString())).thenReturn(null);
+ ConnectionException exception =
+ assertThrows(
+ ConnectionException.class,
+ () -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
+ assertEquals("Connection is null", exception.getMessage());
+ }
+
+ @Test
+ public void testPreparedStatementExecution() throws Exception {
+ String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
+ List> values =
+ Arrays.asList(
+ PreparedStatementValueObject.create("", preparedDmlStatement),
+ PreparedStatementValueObject.create("Test", preparedDmlStatement));
+
+ Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
+ .thenReturn(preparedDmlStatement);
+ Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+ Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
+ .thenReturn(mockPreparedStatement);
+ Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
+
+ cassandraDao.write(mockPreparedStatementGeneratedResponse);
+
+ Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
+ Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
+ Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
+ }
+
+ @Test
+ public void testWriteWithExceptionInPreparedStatement() throws Exception {
+ String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
+ List> values =
+ Arrays.asList(
+ PreparedStatementValueObject.create("", preparedDmlStatement),
+ PreparedStatementValueObject.create("Test", preparedDmlStatement));
+
+ Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
+ .thenReturn(preparedDmlStatement);
+ Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+ Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
+ .thenReturn(mockPreparedStatement);
+ Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
+ Mockito.doThrow(new RuntimeException("Prepared statement execution failed"))
+ .when(mockSession)
+ .execute(ArgumentMatchers.eq(mockBoundStatement));
+
+ RuntimeException exception =
+ assertThrows(
+ RuntimeException.class,
+ () -> {
+ cassandraDao.write(mockPreparedStatementGeneratedResponse);
+ });
+
+ assertEquals("Prepared statement execution failed", exception.getMessage());
+ Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
+ Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
+ Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
+ }
+
+ @Test
+ public void testWriteWithExceptionHandling() throws Exception {
+ String dmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
+ Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement()).thenReturn(dmlStatement);
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenReturn(mockSession);
+ Mockito.when(mockSession.prepare(dmlStatement))
+ .thenThrow(new RuntimeException("Failed to prepare statement"));
+
+ RuntimeException exception =
+ assertThrows(
+ RuntimeException.class,
+ () -> {
+ cassandraDao.write(mockPreparedStatementGeneratedResponse);
+ });
+
+ assertEquals("Failed to prepare statement", exception.getMessage());
+ Mockito.verify(mockSession).prepare(dmlStatement);
+ Mockito.verify(mockSession, Mockito.never()).execute(ArgumentMatchers.>any());
+ }
+
+ @Test
+ public void testConnectionExceptionDuringWrite() throws Exception {
+ Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
+ .thenThrow(new ConnectionException("Connection failed"));
+ ConnectionException exception =
+ assertThrows(
+ ConnectionException.class,
+ () -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
+ assertEquals("Connection failed", exception.getMessage());
+ }
+}