Skip to content

Commit

Permalink
Merge branch 'main' into metadata_config_and_pipeline_options
Browse files Browse the repository at this point in the history
  • Loading branch information
pawankashyapollion committed Dec 23, 2024
2 parents 5c71394 + 04bac39 commit b1bac03
Show file tree
Hide file tree
Showing 19 changed files with 698 additions and 119 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/uw-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
id: variables
run: |
echo "unifiedWorkerHarnessContainerImage=${HARNESS_IMAGE}" >> $GITHUB_OUTPUT
echo "releaseTag=$(curl -s https://api.github.com/repos/GoogleCLoudPlatform/DataflowTemplates/releases/latest | jq '.tag_name' | sed 's/\"//g')" >> $GITHUB_OUTPUT
echo "releaseTag=$(curl -s https://api.github.com/repos/GoogleCloudPlatform/DataflowTemplates/releases/latest | jq '.tag_name' | sed 's/\"//g')" >> $GITHUB_OUTPUT
env:
HARNESS_IMAGE: ${{ inputs.unifiedWorkerHarnessContainerImage }}
- name: Checkout code
Expand Down
6 changes: 6 additions & 0 deletions v2/sourcedb-to-spanner/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,12 @@
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-io-cassandra</artifactId>
</dependency>
<dependency>
<groupId>com.microsoft.sqlserver</groupId>
<artifactId>mssql-jdbc</artifactId>
<version>${mssql-jdbc.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import com.google.re2j.Pattern;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand All @@ -59,9 +58,6 @@
public final class MysqlDialectAdapter implements DialectAdapter {

public static final String PAD_SPACE = "PAD SPACE";
public static final String NO_PAD = "NO PAD";
public static final String BINARY_CHARACTER_SET = "binary";
public static final String BINARY_COLLATION = "binary";
private final MySqlVersion mySqlVersion;

private static final Logger logger = LoggerFactory.getLogger(MysqlDialectAdapter.class);
Expand Down Expand Up @@ -370,16 +366,14 @@ private ImmutableMap<String, SourceColumnType> getTableCols(
// String types: Ref https://dev.mysql.com/doc/refman/8.4/en/string-type-syntax.html
.put("CHAR", IndexType.STRING)
.put("VARCHAR", IndexType.STRING)
.put("BINARY", IndexType.STRING)
.put("VARBINARY", IndexType.STRING)
.put("BLOB", IndexType.STRING)
.put("TEXT", IndexType.STRING)
.put("ENUM", IndexType.STRING)
.put("SET", IndexType.STRING)
// Mapping BINARY, VARBINARY and TINYBLOB to Java bigInteger
// Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
.put("BINARY", IndexType.BINARY)
.put("VARBINARY", IndexType.BINARY)
.put("TINYBLOB", IndexType.BINARY)
.put("TINYTEXT", IndexType.STRING)
.build();

private ImmutableSet<String> binaryColumnTypes = ImmutableSet.of("BINARY", "VARBINARY", "BLOB");

/**
* Get the PadSpace attribute from {@link ResultSet} for index discovery query {@link
* #getIndexDiscoveryQuery(JdbcSchemaReference)}. This method takes care of the fact that older
Expand Down Expand Up @@ -440,28 +434,17 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
// Column.
String columType = normalizeColumnType(rs.getString(InformationSchemaStatsCols.TYPE_COL));
IndexType indexType = INDEX_TYPE_MAPPING.getOrDefault(columType, IndexType.OTHER);

CollationReference collationReference = null;
// Binary (and similar columns like VarBinary, Blob etc) columns have a fixed character-set
// and collation called "binary".
// Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
// In information_schema.columns query, these column types show null as character set.
// Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/3
// Also for both mySQL 5.7 and 8.0 binary columns have a NO-PAD comparison.
// Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/0.
if (binaryColumnTypes.contains(columType) && characterSet == null) {
characterSet = BINARY_CHARACTER_SET;
collation = BINARY_COLLATION;
padSpace = NO_PAD;
}
if (characterSet != null) {
if (indexType.equals(IndexType.STRING)) {
collationReference =
CollationReference.builder()
.setDbCharacterSet(characterSet)
.setDbCollation(collation)
.setDbCharacterSet(escapeMySql(characterSet))
.setDbCollation(escapeMySql(collation))
.setPadSpace(
(padSpace == null) ? false : padSpace.trim().toUpperCase().equals(PAD_SPACE))
.build();
} else {
stringMaxLength = null;
}

indexesBuilder.add(
Expand All @@ -487,6 +470,15 @@ private ImmutableList<SourceColumnIndexInfo> getTableIndexes(
return indexesBuilder.build();
}

@VisibleForTesting
protected static String escapeMySql(String input) {
if (input.startsWith("`")) {
return input;
} else {
return "`" + input + "`";
}
}

private SourceColumnType resultSetToSourceColumnType(ResultSet rs) throws SQLException {
String colType = normalizeColumnType(rs.getString(InformationSchemaCols.TYPE_COL));
long charMaxLength = rs.getLong(InformationSchemaCols.CHAR_MAX_LENGTH_COL);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,8 @@ private static TableConfig getTableConfig(
.forEach(tableConfigBuilder::withPartitionColum);
} else {
ImmutableSet<IndexType> supportedIndexTypes =
ImmutableSet.of(IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED);
ImmutableSet.of(
IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED, IndexType.BINARY);
// As of now only Primary key index with Numeric type is supported.
// TODO:
// 1. support non-primary unique indexes.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand All @@ -27,21 +26,27 @@
/** Factory to construct {@link BoundaryExtractor} for supported {@link class}. */
public class BoundaryExtractorFactory {

public static final Class BYTE_ARRAY_CLASS = (new byte[] {}).getClass();
private static final ImmutableMap<Class, BoundaryExtractor<?>> extractorMap =
ImmutableMap.of(
Integer.class,
(BoundaryExtractor<Integer>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
(BoundaryExtractor<Integer>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
Long.class,
(BoundaryExtractor<Long>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
String.class, (BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
BigInteger.class,
(BoundaryExtractor<BigInteger>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBigIntegers(partitionColumn, resultSet, boundaryTypeMapper));
(BoundaryExtractor<Long>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
String.class,
(BoundaryExtractor<String>) BoundaryExtractorFactory::fromStrings,
BigDecimal.class,
(BoundaryExtractor<BigDecimal>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBigDecimals(partitionColumn, resultSet, boundaryTypeMapper),
BYTE_ARRAY_CLASS,
(BoundaryExtractor<byte[]>)
(partitionColumn, resultSet, boundaryTypeMapper) ->
fromBinary(partitionColumn, resultSet, boundaryTypeMapper));

/**
* Create a {@link BoundaryExtractor} for the required class.
Expand Down Expand Up @@ -90,20 +95,38 @@ private static Boundary<Long> fromLongs(
.build();
}

private static Boundary<java.math.BigInteger> fromBigIntegers(
private static Boundary<BigDecimal> fromBigDecimals(
PartitionColumn partitionColumn,
ResultSet resultSet,
@Nullable BoundaryTypeMapper boundaryTypeMapper)
throws SQLException {
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigInteger.class));
Preconditions.checkArgument(partitionColumn.columnClass().equals(BigDecimal.class));
resultSet.next();
BigDecimal start = resultSet.getBigDecimal(1);
BigDecimal end = resultSet.getBigDecimal(2);
return Boundary.<java.math.BigInteger>builder()
return Boundary.<BigDecimal>builder()
.setPartitionColumn(partitionColumn)
.setStart(start == null ? null : start.toBigInteger())
.setEnd(end == null ? null : end.toBigInteger())
.setBoundarySplitter(BoundarySplitterFactory.create(BigInteger.class))
.setStart(start)
.setEnd(end)
.setBoundarySplitter(BoundarySplitterFactory.create(BigDecimal.class))
.setBoundaryTypeMapper(boundaryTypeMapper)
.build();
}

private static Boundary<byte[]> fromBinary(
PartitionColumn partitionColumn,
ResultSet resultSet,
@Nullable BoundaryTypeMapper boundaryTypeMapper)
throws SQLException {
Preconditions.checkArgument(partitionColumn.columnClass().equals(BYTE_ARRAY_CLASS));
resultSet.next();
byte[] start = resultSet.getBytes(1);
byte[] end = resultSet.getBytes(2);
return Boundary.<byte[]>builder()
.setPartitionColumn(partitionColumn)
.setStart(start)
.setEnd(end)
.setBoundarySplitter(BoundarySplitterFactory.create(BYTE_ARRAY_CLASS))
.setBoundaryTypeMapper(boundaryTypeMapper)
.build();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range;

import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory.BYTE_ARRAY_CLASS;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.math.BigDecimal;
import java.math.BigInteger;
import org.apache.beam.sdk.transforms.DoFn;

Expand All @@ -26,18 +29,27 @@ public class BoundarySplitterFactory {
private static final ImmutableMap<Class, BoundarySplitter<?>> splittermap =
ImmutableMap.of(
Integer.class,
(BoundarySplitter<Integer>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitIntegers(start, end),
(BoundarySplitter<Integer>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitIntegers(start, end),
Long.class,
(BoundarySplitter<Long>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitLongs(start, end),
(BoundarySplitter<Long>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitLongs(start, end),
BigInteger.class,
(BoundarySplitter<BigInteger>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigIntegers(start, end),
String.class, (BoundarySplitter<String>) BoundarySplitterFactory::splitStrings);
(BoundarySplitter<BigInteger>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigIntegers(start, end),
BigDecimal.class,
(BoundarySplitter<BigDecimal>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBigDecimal(start, end),
String.class,
(BoundarySplitter<String>) BoundarySplitterFactory::splitStrings,
BYTE_ARRAY_CLASS,
(BoundarySplitter<byte[]>)
(start, end, partitionColumn, boundaryTypeMapper, processContext) ->
splitBytes(start, end));

/**
* Creates {@link BoundarySplitter BoundarySplitter&lt;T&gt;} for pass class {@code c} such that
Expand Down Expand Up @@ -132,6 +144,26 @@ private static Long splitLongs(Long start, Long end) {
return (start & end) + ((start ^ end) >> 1);
}

private static BigDecimal splitBigDecimal(BigDecimal start, BigDecimal end) {
BigInteger startBigInt = (start == null) ? null : start.toBigInteger();
BigInteger endBigInt = (end == null) ? null : end.toBigInteger();
BigInteger split = splitBigIntegers(startBigInt, endBigInt);
if (split == null) {
return null;
}
return new BigDecimal(split);
}

private static byte[] splitBytes(byte[] start, byte[] end) {
BigInteger startBigInt = (start == null) ? null : new BigInteger(start);
BigInteger endBigInt = (end == null) ? null : new BigInteger(end);
BigInteger split = splitBigIntegers(startBigInt, endBigInt);
if (split == null) {
return null;
}
return split.toByteArray();
}

private static String splitStrings(
String start,
String end,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,11 @@
package com.google.cloud.teleport.v2.source.reader.io.schema;

import com.google.auto.value.AutoValue;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.math.BigInteger;
import java.math.BigDecimal;
import javax.annotation.Nullable;

@AutoValue
Expand Down Expand Up @@ -138,6 +139,7 @@ public SourceColumnIndexInfo build() {
public enum IndexType {
NUMERIC,
BIG_INT_UNSIGNED,
BINARY,
STRING,
DATE_TIME,
OTHER
Expand All @@ -148,5 +150,6 @@ public enum IndexType {
ImmutableMap.of(
IndexType.NUMERIC, Long.class,
IndexType.STRING, String.class,
IndexType.BIG_INT_UNSIGNED, BigInteger.class);
IndexType.BIG_INT_UNSIGNED, BigDecimal.class,
IndexType.BINARY, BoundaryExtractorFactory.BYTE_ARRAY_CLASS);
}
Loading

0 comments on commit b1bac03

Please sign in to comment.