Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Cassandra DAO PR #25

Merged
merged 9 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions v2/spanner-to-sourcedb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>java-driver-core</artifactId>
<version>4.17.0</version> <!-- Use the latest version -->
</dependency>
<!-- TODO - Remove when https://github.com/apache/beam/pull/29732 is released. -->
<dependency>
<groupId>com.google.cloud.teleport</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;

public class CassandraDao implements IDao<DMLGeneratorResponse> {
private final String cassandraUrl;
private final String cassandraUser;
private final IConnectionHelper connectionHelper;

public CassandraDao(
String cassandraUrl, String cassandraUser, IConnectionHelper connectionHelper) {
this.cassandraUrl = cassandraUrl;
this.cassandraUser = cassandraUser;
this.connectionHelper = connectionHelper;
}

@Override
public void write(DMLGeneratorResponse dmlGeneratorResponse) throws Exception {
try (CqlSession session =
(CqlSession)
connectionHelper.getConnection(this.cassandraUrl)) { // Ensure connection is obtained
if (session == null) {
throw new ConnectionException("Connection is null");
}
if (dmlGeneratorResponse instanceof PreparedStatementGeneratedResponse) {
PreparedStatementGeneratedResponse preparedStatementGeneratedResponse =
(PreparedStatementGeneratedResponse) dmlGeneratorResponse;
try {
String dmlStatement = preparedStatementGeneratedResponse.getDmlStatement();
PreparedStatement preparedStatement = session.prepare(dmlStatement);
BoundStatement boundStatement =
preparedStatement.bind(
preparedStatementGeneratedResponse.getValues().stream()
.map(PreparedStatementValueObject::getValue)
.toArray());
session.execute(boundStatement);
} catch (Exception e) {
throw e;
}
} else {
String simpleStatement = dmlGeneratorResponse.getDmlStatement();
session.execute(simpleStatement);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.models;

import java.util.List;

public class PreparedStatementGeneratedResponse extends DMLGeneratorResponse {
private List<PreparedStatementValueObject<?>> values;

public PreparedStatementGeneratedResponse(
String dmlStatement, List<PreparedStatementValueObject<?>> values) {
super(dmlStatement);
this.values = values;
}

public List<PreparedStatementValueObject<?>> getValues() {
return values;
}

public void setValues(List<PreparedStatementValueObject<?>> values) {
this.values = values;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.models;

public class PreparedStatementValueObject<T> {
private String dataType;
private T value;

public PreparedStatementValueObject(String dataType, T value) {
this.dataType = dataType;
this.value = value;
}

public String getDataType() {
return dataType;
}

public void setDataType(String dataType) {
this.dataType = dataType;
}

public T getValue() {
return value;
}

public void setValue(T value) {
this.value = value;
}

// Override toString() for better readability when printing objects
@Override
public String toString() {
return "PreparedStatementValueObject{" + "key='" + dataType + '\'' + ", value=" + value + '}';
}

@Override
public boolean equals(Object obj) {
if (this == obj) return true;
if (obj == null || getClass() != obj.getClass()) return false;
PreparedStatementValueObject<?> that = (PreparedStatementValueObject<?>) obj;
return dataType.equals(that.dataType) && value.equals(that.value);
}

@Override
public int hashCode() {
return 31 * dataType.hashCode() + (value != null ? value.hashCode() : 0);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dbutils.dao.source;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;

import com.datastax.oss.driver.api.core.CqlSession;
import com.datastax.oss.driver.api.core.cql.BoundStatement;
import com.datastax.oss.driver.api.core.cql.PreparedStatement;
import com.datastax.oss.driver.api.core.cql.Statement;
import com.google.cloud.teleport.v2.templates.dbutils.connection.IConnectionHelper;
import com.google.cloud.teleport.v2.templates.exceptions.ConnectionException;
import com.google.cloud.teleport.v2.templates.models.DMLGeneratorResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementGeneratedResponse;
import com.google.cloud.teleport.v2.templates.models.PreparedStatementValueObject;
import java.util.Arrays;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.MockitoAnnotations;

public class CassandraDaoTest {

@Mock private IConnectionHelper mockConnectionHelper;
@Mock private CqlSession mockSession;
@Mock private PreparedStatement mockPreparedStatement;
@Mock private BoundStatement mockBoundStatement;
@Mock private PreparedStatementGeneratedResponse mockPreparedStatementGeneratedResponse;

private CassandraDao cassandraDao;

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
cassandraDao = new CassandraDao("cassandraUrl", "cassandraUser", mockConnectionHelper);
}

@Test
public void testNullConnectionForWrite() throws Exception {
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString())).thenReturn(null);
ConnectionException exception =
assertThrows(
ConnectionException.class,
() -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
assertEquals("Connection is null", exception.getMessage());
}

@Test
public void testPreparedStatementExecution() throws Exception {
String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
List<PreparedStatementValueObject<?>> values =
Arrays.asList(
new PreparedStatementValueObject<>("", preparedDmlStatement),
new PreparedStatementValueObject<>("Test", preparedDmlStatement));

Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
.thenReturn(preparedDmlStatement);
Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
.thenReturn(mockPreparedStatement);
Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);

cassandraDao.write(mockPreparedStatementGeneratedResponse);

Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
}

@Test
public void testWriteWithExceptionInPreparedStatement() throws Exception {
String preparedDmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
List<PreparedStatementValueObject<?>> values =
Arrays.asList(
new PreparedStatementValueObject<>("", preparedDmlStatement),
new PreparedStatementValueObject<>("Test", preparedDmlStatement));

Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement())
.thenReturn(preparedDmlStatement);
Mockito.when(mockPreparedStatementGeneratedResponse.getValues()).thenReturn(values);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(ArgumentMatchers.eq(preparedDmlStatement)))
.thenReturn(mockPreparedStatement);
Mockito.when(mockPreparedStatement.bind(ArgumentMatchers.any())).thenReturn(mockBoundStatement);
Mockito.doThrow(new RuntimeException("Prepared statement execution failed"))
.when(mockSession)
.execute(ArgumentMatchers.eq(mockBoundStatement));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
cassandraDao.write(mockPreparedStatementGeneratedResponse);
});

assertEquals("Prepared statement execution failed", exception.getMessage());
Mockito.verify(mockSession).prepare(ArgumentMatchers.eq(preparedDmlStatement));
Mockito.verify(mockPreparedStatement).bind(ArgumentMatchers.any());
Mockito.verify(mockSession).execute(ArgumentMatchers.eq(mockBoundStatement));
}

@Test
public void testWriteWithExceptionHandling() throws Exception {
String dmlStatement = "INSERT INTO test (id, name) VALUES (?, ?)";
Mockito.when(mockPreparedStatementGeneratedResponse.getDmlStatement()).thenReturn(dmlStatement);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);
Mockito.when(mockSession.prepare(dmlStatement))
.thenThrow(new RuntimeException("Failed to prepare statement"));

RuntimeException exception =
assertThrows(
RuntimeException.class,
() -> {
cassandraDao.write(mockPreparedStatementGeneratedResponse);
});

assertEquals("Failed to prepare statement", exception.getMessage());
Mockito.verify(mockSession).prepare(dmlStatement);
Mockito.verify(mockSession, Mockito.never()).execute(ArgumentMatchers.<Statement<?>>any());
}

@Test
public void testConnectionExceptionDuringWrite() throws Exception {
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenThrow(new ConnectionException("Connection failed"));
ConnectionException exception =
assertThrows(
ConnectionException.class,
() -> cassandraDao.write(mockPreparedStatementGeneratedResponse));
assertEquals("Connection failed", exception.getMessage());
}

@Test
public void testWriteWithSimpleStatementExecution() throws Exception {
String simpleStatement = "DELETE FROM test WHERE id = 1";
DMLGeneratorResponse mockDmlGeneratorResponse = Mockito.mock(DMLGeneratorResponse.class);

Mockito.when(mockDmlGeneratorResponse.getDmlStatement()).thenReturn(simpleStatement);
Mockito.when(mockConnectionHelper.getConnection(ArgumentMatchers.anyString()))
.thenReturn(mockSession);

cassandraDao.write(mockDmlGeneratorResponse);

Mockito.verify(mockSession).execute(ArgumentMatchers.eq(simpleStatement));
Mockito.verify(mockSession, Mockito.never()).prepare(ArgumentMatchers.anyString());
}
}
Loading