Refactor MySQLIncrementalDumperTest and WALEventConverterTest (#28936)
terrymanu authored Nov 4, 2023
1 parent e89fb50 commit 0730fa3
Showing 2 changed files with 74 additions and 64 deletions.
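Both test classes receive the same two changes: Lombok's @SneakyThrows(SQLException.class) on initTableData is dropped in favor of declaring throws SQLException on the helper and on setUp, and the parameterized assertXxx0(expectedColumnCount) helpers are split into createXxxEvent() factories plus small invoker methods. Below is a minimal sketch of the exception-handling change, assuming only java.sql and an H2 driver on the classpath; the class name and the column names other than order_id are invented for illustration and are not part of the commit.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

class SneakyThrowsRefactorSketch {

    // Before (old test code): the checked SQLException was hidden by Lombok.
    // @SneakyThrows(SQLException.class)
    // private void initTableData(final IncrementalDumperContext dumperContext) { ... }

    // After: the checked exception is declared and propagates to the JUnit @BeforeEach method,
    // which now also declares throws SQLException.
    private void initTableData() throws SQLException {
        try (
                Connection connection = DriverManager.getConnection(
                        "jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root");
                Statement statement = connection.createStatement()) {
            statement.execute("DROP TABLE IF EXISTS t_order");
            statement.execute("CREATE TABLE t_order (order_id INT PRIMARY KEY, user_id INT NOT NULL, status VARCHAR(45) NULL)");
            statement.execute("INSERT INTO t_order (order_id, user_id, status) VALUES (101, 1, 'OK'), (102, 1, 'OK')");
        }
    }
}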
MySQLIncrementalDumperTest.java
@@ -17,14 +17,12 @@

package org.apache.shardingsphere.data.pipeline.mysql.ingest;

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.DataRecord;
import org.apache.shardingsphere.data.pipeline.api.ingest.record.Record;
import org.apache.shardingsphere.data.pipeline.api.metadata.ActualTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.ColumnName;
import org.apache.shardingsphere.data.pipeline.api.metadata.LogicTableName;
import org.apache.shardingsphere.data.pipeline.api.metadata.loader.PipelineTableMetaDataLoader;
import org.apache.shardingsphere.data.pipeline.api.metadata.model.PipelineColumnMetaData;
@@ -55,7 +53,6 @@
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Types;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
@@ -80,8 +77,8 @@ class MySQLIncrementalDumperTest {
private PipelineTableMetaData pipelineTableMetaData;

@BeforeEach
void setUp() {
IncrementalDumperContext dumperContext = mockDumperContext();
void setUp() throws SQLException {
IncrementalDumperContext dumperContext = createDumperContext();
initTableData(dumperContext);
dumperContext.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:mock://127.0.0.1:3306/test", "root", "root"));
PipelineTableMetaDataLoader metaDataLoader = mock(PipelineTableMetaDataLoader.class);
@@ -91,16 +88,15 @@ void setUp() {
when(metaDataLoader.getTableMetaData(any(), any())).thenReturn(pipelineTableMetaData);
}

private IncrementalDumperContext mockDumperContext() {
private IncrementalDumperContext createDumperContext() {
IncrementalDumperContext result = new IncrementalDumperContext();
result.setDataSourceConfig(new StandardPipelineDataSourceConfiguration("jdbc:h2:mem:test;DB_CLOSE_DELAY=-1;DATABASE_TO_UPPER=false;MODE=MySQL", "root", "root"));
result.setTableNameMap(Collections.singletonMap(new ActualTableName("t_order"), new LogicTableName("t_order")));
result.setTableNameSchemaNameMapping(new TableNameSchemaNameMapping(Collections.emptyMap()));
return result;
}

@SneakyThrows(SQLException.class)
private void initTableData(final IncrementalDumperContext dumperContext) {
private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
try (
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
@@ -125,62 +121,70 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
}

@Test
void assertWriteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
assertWriteRowsEvent0(3);
}

private void assertWriteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class);
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
void assertWriteRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByWriteRowsEvent(createWriteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.INSERT));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}

private Map<LogicTableName, Collection<ColumnName>> mockTargetTableColumnsMap() {
return Collections.singletonMap(new LogicTableName("t_order"), Collections.singleton(new ColumnName("order_id")));
private WriteRowsEvent createWriteRowsEvent() {
WriteRowsEvent result = new WriteRowsEvent();
result.setDatabaseName("");
result.setTableName("t_order");
result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
return result;
}

@Test
void assertUpdateRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
assertUpdateRowsEvent0(3);
private List<Record> getRecordsByWriteRowsEvent(final WriteRowsEvent rowsEvent) throws ReflectiveOperationException {
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleWriteRowsEvent", WriteRowsEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}

private void assertUpdateRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
UpdateRowsEvent rowsEvent = new UpdateRowsEvent();
rowsEvent.setDatabaseName("test");
rowsEvent.setTableName("t_order");
rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class);
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
@Test
void assertUpdateRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByUpdateRowsEvent(createUpdateRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.UPDATE));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}

@Test
void assertDeleteRowsEventWithoutCustomColumns() throws ReflectiveOperationException {
assertDeleteRowsEvent0(3);
private UpdateRowsEvent createUpdateRowsEvent() {
UpdateRowsEvent result = new UpdateRowsEvent();
result.setDatabaseName("test");
result.setTableName("t_order");
result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
result.setAfterRows(Collections.singletonList(new Serializable[]{101, 1, "OK2"}));
return result;
}

private void assertDeleteRowsEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
DeleteRowsEvent rowsEvent = new DeleteRowsEvent();
rowsEvent.setDatabaseName("");
rowsEvent.setTableName("t_order");
rowsEvent.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class);
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
private List<Record> getRecordsByUpdateRowsEvent(final UpdateRowsEvent rowsEvent) throws ReflectiveOperationException {
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleUpdateRowsEvent", UpdateRowsEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}

@Test
void assertDeleteRowsEvent() throws ReflectiveOperationException {
List<Record> actual = getRecordsByDeleteRowsEvent(createDeleteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
assertThat(((DataRecord) actual.get(0)).getType(), is(IngestDataChangeType.DELETE));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(expectedColumnCount));
assertThat(((DataRecord) actual.get(0)).getColumnCount(), is(3));
}

private DeleteRowsEvent createDeleteRowsEvent() {
DeleteRowsEvent result = new DeleteRowsEvent();
result.setDatabaseName("");
result.setTableName("t_order");
result.setBeforeRows(Collections.singletonList(new Serializable[]{101, 1, "OK"}));
return result;
}

private List<Record> getRecordsByDeleteRowsEvent(final DeleteRowsEvent rowsEvent) throws ReflectiveOperationException {
Method method = MySQLIncrementalDumper.class.getDeclaredMethod("handleDeleteRowsEvent", DeleteRowsEvent.class, PipelineTableMetaData.class);
return (List<Record>) Plugins.getMemberAccessor().invoke(method, incrementalDumper, rowsEvent, pipelineTableMetaData);
}

@Test
@@ -192,13 +196,17 @@ void assertPlaceholderEvent() throws ReflectiveOperationException {

@Test
void assertRowsEventFiltered() throws ReflectiveOperationException {
WriteRowsEvent rowsEvent = new WriteRowsEvent();
rowsEvent.setDatabaseName("test");
rowsEvent.setTableName("t_order");
rowsEvent.setAfterRows(Collections.singletonList(new Serializable[]{1}));
List<Record> actual = (List<Record>) Plugins.getMemberAccessor().invoke(MySQLIncrementalDumper.class.getDeclaredMethod("handleEvent", AbstractBinlogEvent.class),
incrementalDumper, rowsEvent);
incrementalDumper, getFilteredWriteRowsEvent());
assertThat(actual.size(), is(1));
assertThat(actual.get(0), instanceOf(DataRecord.class));
}

private WriteRowsEvent getFilteredWriteRowsEvent() {
WriteRowsEvent result = new WriteRowsEvent();
result.setDatabaseName("test");
result.setTableName("t_order");
result.setAfterRows(Collections.singletonList(new Serializable[]{1}));
return result;
}
}
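The refactored tests above invoke the dumper's private handleWriteRowsEvent, handleUpdateRowsEvent, and handleDeleteRowsEvent methods reflectively through Mockito's Plugins.getMemberAccessor(). The sketch below shows that reflective-invocation pattern in isolation; the Dumper stand-in and its String-based handler are invented, and plain java.lang.reflect is used here instead of the Mockito accessor.

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

class ReflectiveHandlerInvocationSketch {

    // Invented stand-in for MySQLIncrementalDumper: a private handler that turns an event into records.
    static final class Dumper {

        private List<String> handleWriteRowsEvent(final String event) {
            return Collections.singletonList("INSERT:" + event);
        }
    }

    public static void main(final String[] args) throws ReflectiveOperationException {
        Dumper dumper = new Dumper();
        // The real tests obtain the private Method the same way and invoke it via Plugins.getMemberAccessor().
        Method method = Dumper.class.getDeclaredMethod("handleWriteRowsEvent", String.class);
        method.setAccessible(true);
        @SuppressWarnings("unchecked")
        List<String> records = (List<String>) method.invoke(dumper, "t_order row");
        // Prints: [INSERT:t_order row]
        System.out.println(records);
    }
}

Splitting each old assertXxxRowsEvent0(expectedColumnCount) helper into a createXxxRowsEvent() factory and a getRecordsByXxxRowsEvent() invoker leaves one assertion path per @Test method, so each test reads top to bottom without a shared parameterized helper.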
WALEventConverterTest.java
@@ -17,7 +17,6 @@

package org.apache.shardingsphere.data.pipeline.postgresql.ingest.wal;

import lombok.SneakyThrows;
import org.apache.shardingsphere.data.pipeline.api.context.TableNameSchemaNameMapping;
import org.apache.shardingsphere.data.pipeline.api.context.ingest.IncrementalDumperContext;
import org.apache.shardingsphere.data.pipeline.api.datasource.config.impl.StandardPipelineDataSourceConfiguration;
@@ -76,7 +75,7 @@ class WALEventConverterTest {
private PipelineTableMetaData pipelineTableMetaData;

@BeforeEach
void setUp() {
void setUp() throws SQLException {
IncrementalDumperContext dumperContext = mockDumperContext();
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
walEventConverter = new WALEventConverter(dumperContext, new StandardPipelineTableMetaDataLoader(dataSourceManager.getDataSource(dumperContext.getDataSourceConfig())));
@@ -92,8 +91,7 @@ private IncrementalDumperContext mockDumperContext() {
return result;
}

@SneakyThrows(SQLException.class)
private void initTableData(final IncrementalDumperContext dumperContext) {
private void initTableData(final IncrementalDumperContext dumperContext) throws SQLException {
try (
PipelineDataSourceManager dataSourceManager = new DefaultPipelineDataSourceManager();
PipelineDataSourceWrapper dataSource = dataSourceManager.getDataSource(dumperContext.getDataSourceConfig());
@@ -118,19 +116,23 @@ private List<PipelineColumnMetaData> mockOrderColumnsMetaDataList() {
}

@Test
void assertWriteRowEventWithoutCustomColumns() throws ReflectiveOperationException {
assertWriteRowEvent0(3);
void assertWriteRowEvent() throws ReflectiveOperationException {
DataRecord actual = getDataRecord(createWriteRowEvent());
assertThat(actual.getType(), is(IngestDataChangeType.INSERT));
assertThat(actual.getColumnCount(), is(3));
}

private WriteRowEvent createWriteRowEvent() {
WriteRowEvent result = new WriteRowEvent();
result.setSchemaName("");
result.setTableName("t_order");
result.setAfterRow(Arrays.asList(101, 1, "OK"));
return result;
}

private void assertWriteRowEvent0(final int expectedColumnCount) throws ReflectiveOperationException {
WriteRowEvent rowsEvent = new WriteRowEvent();
rowsEvent.setSchemaName("");
rowsEvent.setTableName("t_order");
rowsEvent.setAfterRow(Arrays.asList(101, 1, "OK"));
private DataRecord getDataRecord(final WriteRowEvent rowsEvent) throws ReflectiveOperationException {
Method method = WALEventConverter.class.getDeclaredMethod("handleWriteRowEvent", WriteRowEvent.class, PipelineTableMetaData.class);
DataRecord actual = (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData);
assertThat(actual.getType(), is(IngestDataChangeType.INSERT));
assertThat(actual.getColumnCount(), is(expectedColumnCount));
return (DataRecord) Plugins.getMemberAccessor().invoke(method, walEventConverter, rowsEvent, pipelineTableMetaData);
}

@Test