Skip to content

Commit

Permalink
Cassandra DAO PR (#25)
Browse files Browse the repository at this point in the history
Implementation of Cassandra DAO
  • Loading branch information
pawankashyapollion authored Dec 18, 2024
1 parent 5aa21dc commit 0b0faf1
Show file tree
Hide file tree
Showing 5 changed files with 338 additions and 0 deletions.
5 changes: 5 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version> <!-- Use the latest version -->
</dependency>
<!-- TODO - Remove when https://github.com/apache/beam/pull/29732 is released. -->
<dependency>
<groupId>com.google.cloud.teleport</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
private final String cassandraUser;
private final IConnectionHelper connectionHelper;

public CassandraDao(
String cassandraUrl, String cassandraUser, IConnectionHelper connectionHelper) {
this.cassandraUrl = cassandraUrl;
this.cassandraUser = cassandraUser;
this.connectionHelper = connectionHelper;
}

@Override
public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
try (CqlSession session =
(CqlSession)
connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
if (session == null) {
throw new ConnectionException("Connection is null");
}
if (dmlGeneratorResponse instanceof PreparedStatementGeneratedResponse) {
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
try {
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(PreparedStatementValueObject::getValue)
.toArray());
session.execute(boundStatement);
} catch (Exception e) {
throw e;
}
} else {
String simpleStatement = dmlGeneratorResponse.getDmlStatement();
session.execute(simpleStatement);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.models;

import java.util.List;

public class PreparedStatementGeneratedResponse extends DMLGeneratorResponse {
private List<PreparedStatementValueObject<?>> values;

public PreparedStatementGeneratedResponse(
String dmlStatement, List<PreparedStatementValueObject<?>> values) {
super(dmlStatement);
this.values = values;
}

public List<PreparedStatementValueObject<?>> getValues() {
return values;
}

public void setValues(List<PreparedStatementValueObject<?>> values) {
this.values = values;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.models;

public class PreparedStatementValueObject<T> {
private String dataType;
private T value;

public PreparedStatementValueObject(String dataType, T value) {
this.dataType = dataType;
this.value = value;
}

public String getDataType() {
return dataType;
}

public void setDataType(String dataType) {
this.dataType = dataType;
}

public T getValue() {
return value;
}

public void setValue(T value) {
this.value = value;
}

// Override toString() for better readability when printing objects
@Override
public String toString() {
return "PreparedStatementValueObject{" + "key='" + dataType + '\'' + ", value=" + value + '}';
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
PreparedStatementValueObject<?> that = (PreparedStatementValueObject<?>) obj;
return dataType.equals(that.dataType) && value.equals(that.value);
}

@Override
public int hashCode() {
return 31 * dataType.hashCode() + (value != null ? value.hashCode() : 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public class CassandraDaoTest {

@Mock private IConnectionHelper mockConnectionHelper;
@Mock private CqlSession mockSession;
@Mock private PreparedStatement mockPreparedStatement;
@Mock private BoundStatement mockBoundStatement;
@Mock private PreparedStatementGeneratedResponse mockPreparedStatementGeneratedResponse;

private CassandraDao cassandraDao;

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
cassandraDao = new CassandraDao("cassandraUrl", "cassandraUser", mockConnectionHelper);
}

@Test
public void testNullConnectionForWrite() throws Exception {
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString())).thenReturn(null);
ConnectionException exception =
assertThrows(
ConnectionException.class,
() -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
assertEquals("Connection is null", exception.getMessage());
}

@Test
public void testPreparedStatementExecution() throws Exception {
String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
List<PreparedStatementValueObject<?>> values =
Arrays.asList(
new PreparedStatementValueObject<>("", preparedDmlStatement),
new PreparedStatementValueObject<>("Test", preparedDmlStatement));

Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
.thenReturn(preparedDmlStatement);
Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
.thenReturn(mockPreparedStatement);
Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);

cassandraDao.write(mockPreparedStatementGeneratedResponse);

Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
}

@Test
public void testWriteWithExceptionInPreparedStatement() throws Exception {
String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
List<PreparedStatementValueObject<?>> values =
Arrays.asList(
new PreparedStatementValueObject<>("", preparedDmlStatement),
new PreparedStatementValueObject<>("Test", preparedDmlStatement));

Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
.thenReturn(preparedDmlStatement);
Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
.thenReturn(mockPreparedStatement);
Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
Mockito.doThrow(new RuntimeException("Prepared statement execution failed"))
.when(mockSession)
.execute(ArgumentMatchers.eq(mockBoundStatement));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
cassandraDao.write(mockPreparedStatementGeneratedResponse);
});

assertEquals("Prepared statement execution failed", exception.getMessage());
Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
}

@Test
public void testWriteWithExceptionHandling() throws Exception {
String dmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement()).thenReturn(dmlStatement);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(dmlStatement))
.thenThrow(new RuntimeException("Failed to prepare statement"));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
cassandraDao.write(mockPreparedStatementGeneratedResponse);
});

assertEquals("Failed to prepare statement", exception.getMessage());
Mockito.verify(mockSession).prepare(dmlStatement);
Mockito.verify(mockSession, Mockito.never()).execute(ArgumentMatchers.<Statement<?>>any());
}

@Test
public void testConnectionExceptionDuringWrite() throws Exception {
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenThrow(new ConnectionException("Connection failed"));
ConnectionException exception =
assertThrows(
ConnectionException.class,
() -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
assertEquals("Connection failed", exception.getMessage());
}

@Test
public void testWriteWithSimpleStatementExecution() throws Exception {
String simpleStatement = "DELETE FROM test WHERE id = 1";
DMLGeneratorResponse mockDmlGeneratorResponse = Mockito.mock(DMLGeneratorResponse.class);

Mockito.when(mockDmlGeneratorResponse.getDmlStatement()).thenReturn(simpleStatement);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);

cassandraDao.write(mockDmlGeneratorResponse);

Mockito.verify(mockSession).execute(ArgumentMatchers.eq(simpleStatement));
Mockito.verify(mockSession, Mockito.never()).prepare(ArgumentMatchers.anyString());
}
}

0 comments on commit 0b0faf1

Please sign in to comment.