Skip to content

Commit

Permalink
Cassandra DAO Implementation (#2072)
Browse files Browse the repository at this point in the history
* Cassandra DAO PR (#25)

Implementation of Cassandra DAO

* Added AutoValue

* Address Method reference

* Added test case Fixes

* Removed Support of Raw Statement from Cassandra DAO

---------

Co-authored-by: taherkl <[email protected]>
Co-authored-by: pawankashyapollion <[email protected]>
  • Loading branch information
3 people authored Dec 20, 2024
1 parent 5d6e63c commit edee6a7
Show file tree
Hide file tree
Showing 5 changed files with 282 additions and 0 deletions.
5 changes: 5 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version> <!-- Use the latest version -->
</dependency>
<!-- TODO - Remove when https://github.com/apache/beam/pull/29732 is released. -->
<dependency>
<groupId>com.google.cloud.teleport</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
private final String cassandraUser;
private final IConnectionHelper connectionHelper;

public CassandraDao(
String cassandraUrl, String cassandraUser, IConnectionHelper connectionHelper) {
this.cassandraUrl = cassandraUrl;
this.cassandraUser = cassandraUser;
this.connectionHelper = connectionHelper;
}

@Override
public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
try (CqlSession session =
(CqlSession)
connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
if (session == null) {
throw new ConnectionException("Connection is null");
}
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(PreparedStatementValueObject::value)
.toArray());
session.execute(boundStatement);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.models;

import java.util.List;

public class PreparedStatementGeneratedResponse extends DMLGeneratorResponse {
private List<PreparedStatementValueObject<?>> values;

public PreparedStatementGeneratedResponse(
String dmlStatement, List<PreparedStatementValueObject<?>> values) {
super(dmlStatement);
this.values = values;
}

public List<PreparedStatementValueObject<?>> getValues() {
return values;
}

public void setValues(List<PreparedStatementValueObject<?>> values) {
this.values = values;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.models;

import com.google.auto.value.AutoValue;

@AutoValue
public abstract class PreparedStatementValueObject<T> {

public abstract String dataType();

public abstract T value();

public static <T> PreparedStatementValueObject<T> create(String dataType, T value) {
return new AutoValue_PreparedStatementValueObject<>(dataType, value);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public class CassandraDaoTest {

@Mock private IConnectionHelper mockConnectionHelper;
@Mock private CqlSession mockSession;
@Mock private PreparedStatement mockPreparedStatement;
@Mock private BoundStatement mockBoundStatement;
@Mock private PreparedStatementGeneratedResponse mockPreparedStatementGeneratedResponse;

private CassandraDao cassandraDao;

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
cassandraDao = new CassandraDao("cassandraUrl", "cassandraUser", mockConnectionHelper);
}

@Test
public void testNullConnectionForWrite() throws Exception {
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString())).thenReturn(null);
ConnectionException exception =
assertThrows(
ConnectionException.class,
() -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
assertEquals("Connection is null", exception.getMessage());
}

@Test
public void testPreparedStatementExecution() throws Exception {
String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
List<PreparedStatementValueObject<?>> values =
Arrays.asList(
PreparedStatementValueObject.create("", preparedDmlStatement),
PreparedStatementValueObject.create("Test", preparedDmlStatement));

Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
.thenReturn(preparedDmlStatement);
Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
.thenReturn(mockPreparedStatement);
Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);

cassandraDao.write(mockPreparedStatementGeneratedResponse);

Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
}

@Test
public void testWriteWithExceptionInPreparedStatement() throws Exception {
String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
List<PreparedStatementValueObject<?>> values =
Arrays.asList(
PreparedStatementValueObject.create("", preparedDmlStatement),
PreparedStatementValueObject.create("Test", preparedDmlStatement));

Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
.thenReturn(preparedDmlStatement);
Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
.thenReturn(mockPreparedStatement);
Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
Mockito.doThrow(new RuntimeException("Prepared statement execution failed"))
.when(mockSession)
.execute(ArgumentMatchers.eq(mockBoundStatement));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
cassandraDao.write(mockPreparedStatementGeneratedResponse);
});

assertEquals("Prepared statement execution failed", exception.getMessage());
Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
}

@Test
public void testWriteWithExceptionHandling() throws Exception {
String dmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement()).thenReturn(dmlStatement);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(dmlStatement))
.thenThrow(new RuntimeException("Failed to prepare statement"));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
cassandraDao.write(mockPreparedStatementGeneratedResponse);
});

assertEquals("Failed to prepare statement", exception.getMessage());
Mockito.verify(mockSession).prepare(dmlStatement);
Mockito.verify(mockSession, Mockito.never()).execute(ArgumentMatchers.<Statement<?>>any());
}

@Test
public void testConnectionExceptionDuringWrite() throws Exception {
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenThrow(new ConnectionException("Connection failed"));
ConnectionException exception =
assertThrows(
ConnectionException.class,
() -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
assertEquals("Connection failed", exception.getMessage());
}
}

0 comments on commit edee6a7

Please sign in to comment.