diff --git a/v2/sourcedb-to-spanner/pom.xml b/v2/sourcedb-to-spanner/pom.xml
index 52b002a0a2..93ff91864a 100644
--- a/v2/sourcedb-to-spanner/pom.xml
+++ b/v2/sourcedb-to-spanner/pom.xml
@@ -146,6 +146,12 @@
org.apache.beam
beam-sdks-java-io-cassandra
+
+ com.microsoft.sqlserver
+ mssql-jdbc
+ ${mssql-jdbc.version}
+ test
+
org.apache.commons
commons-collections4
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java
index fb0a15508c..d1238682b2 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapter.java
@@ -35,7 +35,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
import com.google.re2j.Pattern;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
@@ -59,9 +58,6 @@
public final class MysqlDialectAdapter implements DialectAdapter {
public static final String PAD_SPACE = "PAD SPACE";
- public static final String NO_PAD = "NO PAD";
- public static final String BINARY_CHARACTER_SET = "binary";
- public static final String BINARY_COLLATION = "binary";
private final MySqlVersion mySqlVersion;
private static final Logger logger = LoggerFactory.getLogger(MysqlDialectAdapter.class);
@@ -370,16 +366,14 @@ private ImmutableMap getTableCols(
// String types: Ref https://dev.mysql.com/doc/refman/8.4/en/string-type-syntax.html
.put("CHAR", IndexType.STRING)
.put("VARCHAR", IndexType.STRING)
- .put("BINARY", IndexType.STRING)
- .put("VARBINARY", IndexType.STRING)
- .put("BLOB", IndexType.STRING)
- .put("TEXT", IndexType.STRING)
- .put("ENUM", IndexType.STRING)
- .put("SET", IndexType.STRING)
+ // Mapping BINARY, VARBINARY and TINYBLOB to Java bigInteger
+ // Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
+ .put("BINARY", IndexType.BINARY)
+ .put("VARBINARY", IndexType.BINARY)
+ .put("TINYBLOB", IndexType.BINARY)
+ .put("TINYTEXT", IndexType.STRING)
.build();
- private ImmutableSet binaryColumnTypes = ImmutableSet.of("BINARY", "VARBINARY", "BLOB");
-
/**
* Get the PadSpace attribute from {@link ResultSet} for index discovery query {@link
* #getIndexDiscoveryQuery(JdbcSchemaReference)}. This method takes care of the fact that older
@@ -440,28 +434,17 @@ private ImmutableList getTableIndexes(
// Column.
String columType = normalizeColumnType(rs.getString(InformationSchemaStatsCols.TYPE_COL));
IndexType indexType = INDEX_TYPE_MAPPING.getOrDefault(columType, IndexType.OTHER);
-
CollationReference collationReference = null;
- // Binary (and similar columns like VarBinary, Blob etc) columns have a fixed character-set
- // and collation called "binary".
- // Ref https://dev.mysql.com/doc/refman/8.4/en/charset-binary-collations.html
- // In information_schema.columns query, these column types show null as character set.
- // Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/3
- // Also for both mySQL 5.7 and 8.0 binary columns have a NO-PAD comparison.
- // Ref: https://www.db-fiddle.com/f/kRVPA5jDwZYNj2rsdtif4K/0.
- if (binaryColumnTypes.contains(columType) && characterSet == null) {
- characterSet = BINARY_CHARACTER_SET;
- collation = BINARY_COLLATION;
- padSpace = NO_PAD;
- }
- if (characterSet != null) {
+ if (indexType.equals(IndexType.STRING)) {
collationReference =
CollationReference.builder()
- .setDbCharacterSet(characterSet)
- .setDbCollation(collation)
+ .setDbCharacterSet(escapeMySql(characterSet))
+ .setDbCollation(escapeMySql(collation))
.setPadSpace(
(padSpace == null) ? false : padSpace.trim().toUpperCase().equals(PAD_SPACE))
.build();
+ } else {
+ stringMaxLength = null;
}
indexesBuilder.add(
@@ -487,6 +470,15 @@ private ImmutableList getTableIndexes(
return indexesBuilder.build();
}
+ @VisibleForTesting
+ protected static String escapeMySql(String input) {
+ if (input.startsWith("`")) {
+ return input;
+ } else {
+ return "`" + input + "`";
+ }
+ }
+
private SourceColumnType resultSetToSourceColumnType(ResultSet rs) throws SQLException {
String colType = normalizeColumnType(rs.getString(InformationSchemaCols.TYPE_COL));
long charMaxLength = rs.getLong(InformationSchemaCols.CHAR_MAX_LENGTH_COL);
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java
index d001f8864f..a69d909520 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapper.java
@@ -306,7 +306,8 @@ private static TableConfig getTableConfig(
.forEach(tableConfigBuilder::withPartitionColum);
} else {
ImmutableSet supportedIndexTypes =
- ImmutableSet.of(IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED);
+ ImmutableSet.of(
+ IndexType.NUMERIC, IndexType.STRING, IndexType.BIG_INT_UNSIGNED, IndexType.BINARY);
// As of now only Primary key index with Numeric type is supported.
// TODO:
// 1. support non-primary unique indexes.
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactory.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactory.java
index 85a89add45..b53ce702cb 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactory.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactory.java
@@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
import java.math.BigDecimal;
-import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -27,21 +26,27 @@
/** Factory to construct {@link BoundaryExtractor} for supported {@link class}. */
public class BoundaryExtractorFactory {
+ public static final Class BYTE_ARRAY_CLASS = (new byte[] {}).getClass();
private static final ImmutableMap> extractorMap =
ImmutableMap.of(
Integer.class,
- (BoundaryExtractor)
- (partitionColumn, resultSet, boundaryTypeMapper) ->
- fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
+ (BoundaryExtractor)
+ (partitionColumn, resultSet, boundaryTypeMapper) ->
+ fromIntegers(partitionColumn, resultSet, boundaryTypeMapper),
Long.class,
- (BoundaryExtractor)
- (partitionColumn, resultSet, boundaryTypeMapper) ->
- fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
- String.class, (BoundaryExtractor) BoundaryExtractorFactory::fromStrings,
- BigInteger.class,
- (BoundaryExtractor)
- (partitionColumn, resultSet, boundaryTypeMapper) ->
- fromBigIntegers(partitionColumn, resultSet, boundaryTypeMapper));
+ (BoundaryExtractor)
+ (partitionColumn, resultSet, boundaryTypeMapper) ->
+ fromLongs(partitionColumn, resultSet, boundaryTypeMapper),
+ String.class,
+ (BoundaryExtractor) BoundaryExtractorFactory::fromStrings,
+ BigDecimal.class,
+ (BoundaryExtractor)
+ (partitionColumn, resultSet, boundaryTypeMapper) ->
+ fromBigDecimals(partitionColumn, resultSet, boundaryTypeMapper),
+ BYTE_ARRAY_CLASS,
+ (BoundaryExtractor)
+ (partitionColumn, resultSet, boundaryTypeMapper) ->
+ fromBinary(partitionColumn, resultSet, boundaryTypeMapper));
/**
* Create a {@link BoundaryExtractor} for the required class.
@@ -90,20 +95,38 @@ private static Boundary fromLongs(
.build();
}
- private static Boundary fromBigIntegers(
+ private static Boundary fromBigDecimals(
PartitionColumn partitionColumn,
ResultSet resultSet,
@Nullable BoundaryTypeMapper boundaryTypeMapper)
throws SQLException {
- Preconditions.checkArgument(partitionColumn.columnClass().equals(BigInteger.class));
+ Preconditions.checkArgument(partitionColumn.columnClass().equals(BigDecimal.class));
resultSet.next();
BigDecimal start = resultSet.getBigDecimal(1);
BigDecimal end = resultSet.getBigDecimal(2);
- return Boundary.builder()
+ return Boundary.builder()
.setPartitionColumn(partitionColumn)
- .setStart(start == null ? null : start.toBigInteger())
- .setEnd(end == null ? null : end.toBigInteger())
- .setBoundarySplitter(BoundarySplitterFactory.create(BigInteger.class))
+ .setStart(start)
+ .setEnd(end)
+ .setBoundarySplitter(BoundarySplitterFactory.create(BigDecimal.class))
+ .setBoundaryTypeMapper(boundaryTypeMapper)
+ .build();
+ }
+
+ private static Boundary fromBinary(
+ PartitionColumn partitionColumn,
+ ResultSet resultSet,
+ @Nullable BoundaryTypeMapper boundaryTypeMapper)
+ throws SQLException {
+ Preconditions.checkArgument(partitionColumn.columnClass().equals(BYTE_ARRAY_CLASS));
+ resultSet.next();
+ byte[] start = resultSet.getBytes(1);
+ byte[] end = resultSet.getBytes(2);
+ return Boundary.builder()
+ .setPartitionColumn(partitionColumn)
+ .setStart(start)
+ .setEnd(end)
+ .setBoundarySplitter(BoundarySplitterFactory.create(BYTE_ARRAY_CLASS))
.setBoundaryTypeMapper(boundaryTypeMapper)
.build();
}
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactory.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactory.java
index c855db183d..8666969055 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactory.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactory.java
@@ -15,9 +15,12 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory.BYTE_ARRAY_CLASS;
+
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import java.io.Serializable;
+import java.math.BigDecimal;
import java.math.BigInteger;
import org.apache.beam.sdk.transforms.DoFn;
@@ -26,18 +29,27 @@ public class BoundarySplitterFactory {
private static final ImmutableMap> splittermap =
ImmutableMap.of(
Integer.class,
- (BoundarySplitter)
- (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
- splitIntegers(start, end),
+ (BoundarySplitter)
+ (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
+ splitIntegers(start, end),
Long.class,
- (BoundarySplitter)
- (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
- splitLongs(start, end),
+ (BoundarySplitter)
+ (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
+ splitLongs(start, end),
BigInteger.class,
- (BoundarySplitter)
- (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
- splitBigIntegers(start, end),
- String.class, (BoundarySplitter) BoundarySplitterFactory::splitStrings);
+ (BoundarySplitter)
+ (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
+ splitBigIntegers(start, end),
+ BigDecimal.class,
+ (BoundarySplitter)
+ (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
+ splitBigDecimal(start, end),
+ String.class,
+ (BoundarySplitter) BoundarySplitterFactory::splitStrings,
+ BYTE_ARRAY_CLASS,
+ (BoundarySplitter)
+ (start, end, partitionColumn, boundaryTypeMapper, processContext) ->
+ splitBytes(start, end));
/**
* Creates {@link BoundarySplitter BoundarySplitter<T>} for pass class {@code c} such that
@@ -132,6 +144,26 @@ private static Long splitLongs(Long start, Long end) {
return (start & end) + ((start ^ end) >> 1);
}
+ private static BigDecimal splitBigDecimal(BigDecimal start, BigDecimal end) {
+ BigInteger startBigInt = (start == null) ? null : start.toBigInteger();
+ BigInteger endBigInt = (end == null) ? null : end.toBigInteger();
+ BigInteger split = splitBigIntegers(startBigInt, endBigInt);
+ if (split == null) {
+ return null;
+ }
+ return new BigDecimal(split);
+ }
+
+ private static byte[] splitBytes(byte[] start, byte[] end) {
+ BigInteger startBigInt = (start == null) ? null : new BigInteger(start);
+ BigInteger endBigInt = (end == null) ? null : new BigInteger(end);
+ BigInteger split = splitBigIntegers(startBigInt, endBigInt);
+ if (split == null) {
+ return null;
+ }
+ return split.toByteArray();
+ }
+
private static String splitStrings(
String start,
String end,
diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceColumnIndexInfo.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceColumnIndexInfo.java
index a73b830935..a11858d3eb 100644
--- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceColumnIndexInfo.java
+++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/SourceColumnIndexInfo.java
@@ -16,10 +16,11 @@
package com.google.cloud.teleport.v2.source.reader.io.schema;
import com.google.auto.value.AutoValue;
+import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
-import java.math.BigInteger;
+import java.math.BigDecimal;
import javax.annotation.Nullable;
@AutoValue
@@ -138,6 +139,7 @@ public SourceColumnIndexInfo build() {
public enum IndexType {
NUMERIC,
BIG_INT_UNSIGNED,
+ BINARY,
STRING,
DATE_TIME,
OTHER
@@ -148,5 +150,6 @@ public enum IndexType {
ImmutableMap.of(
IndexType.NUMERIC, Long.class,
IndexType.STRING, String.class,
- IndexType.BIG_INT_UNSIGNED, BigInteger.class);
+ IndexType.BIG_INT_UNSIGNED, BigDecimal.class,
+ IndexType.BINARY, BoundaryExtractorFactory.BYTE_ARRAY_CLASS);
}
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapterTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapterTest.java
index 74bd9beea3..3553edc6e5 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapterTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/mysql/MysqlDialectAdapterTest.java
@@ -311,8 +311,8 @@ public void testDiscoverIndexesBasic() throws SQLException, RetriableSchemaDisco
.setOrdinalPosition(2)
.setCollationReference(
CollationReference.builder()
- .setDbCharacterSet("utf8mb4")
- .setDbCollation("utf8mb4_0900_ai_ci")
+ .setDbCharacterSet("`utf8mb4`")
+ .setDbCollation("`utf8mb4_0900_ai_ci`")
.setPadSpace(false)
.build())
.setStringMaxLength(42)
@@ -323,15 +323,8 @@ public void testDiscoverIndexesBasic() throws SQLException, RetriableSchemaDisco
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(42L)
- .setIndexType(IndexType.STRING)
+ .setIndexType(IndexType.BINARY)
.setOrdinalPosition(3)
- .setCollationReference(
- CollationReference.builder()
- .setDbCharacterSet("binary")
- .setDbCollation("binary")
- .setPadSpace(false)
- .build())
- .setStringMaxLength(100)
.build(),
SourceColumnIndexInfo.builder()
.setColumnName("testColBinary")
@@ -339,15 +332,8 @@ public void testDiscoverIndexesBasic() throws SQLException, RetriableSchemaDisco
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(42L)
- .setIndexType(IndexType.STRING)
+ .setIndexType(IndexType.BINARY)
.setOrdinalPosition(4)
- .setCollationReference(
- CollationReference.builder()
- .setDbCharacterSet("binary")
- .setDbCollation("binary")
- .setPadSpace(false)
- .build())
- .setStringMaxLength(255)
.build());
final JdbcSchemaReference sourceSchemaReference =
@@ -406,8 +392,8 @@ public void testDiscoverIndexes5_7() throws SQLException, RetriableSchemaDiscove
.setOrdinalPosition(2)
.setCollationReference(
CollationReference.builder()
- .setDbCharacterSet("big5")
- .setDbCollation("big5_chinese_ci")
+ .setDbCharacterSet("`big5`")
+ .setDbCollation("`big5_chinese_ci`")
.setPadSpace(true)
.build())
.setStringMaxLength(42)
@@ -418,15 +404,8 @@ public void testDiscoverIndexes5_7() throws SQLException, RetriableSchemaDiscove
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(42L)
- .setIndexType(IndexType.STRING)
+ .setIndexType(IndexType.BINARY)
.setOrdinalPosition(3)
- .setCollationReference(
- CollationReference.builder()
- .setDbCharacterSet("binary")
- .setDbCollation("binary")
- .setPadSpace(false)
- .build())
- .setStringMaxLength(100)
.build());
final JdbcSchemaReference sourceSchemaReference =
@@ -512,11 +491,6 @@ private static void wireMockResultSet(
for (SourceColumnIndexInfo info : expectedSourceColumnIndexInfos) {
String ret =
(info.collationReference() == null) ? null : info.collationReference().dbCharacterSet();
- if (info.columnName() == "testColVarBinary") {
- // For columns like varBinary, the charset is null in information schema, but the db uses
- // "binary" charset and collation.
- ret = null;
- }
stubCharSetCol = stubCharSetCol.thenReturn(ret);
}
OngoingStubbing stubCollationCol =
@@ -524,11 +498,6 @@ private static void wireMockResultSet(
for (SourceColumnIndexInfo info : expectedSourceColumnIndexInfos) {
String ret =
(info.collationReference() == null) ? null : info.collationReference().dbCollation();
- if (info.columnName() == "testColVarBinary") {
- // For columns like varBinary, the charset is null in information schema, but the db uses
- // "binary" charset and collation.
- ret = null;
- }
stubCollationCol = stubCollationCol.thenReturn(ret);
}
OngoingStubbing stubPadSpaceCol =
@@ -538,11 +507,6 @@ private static void wireMockResultSet(
(info.collationReference() == null)
? null
: (info.collationReference().padSpace() ? "PAD SPACE" : "NO PAD");
- if (info.columnName() == "testColVarBinary") {
- // For columns like varBinary, the charset is null in information schema, but the db uses
- // "binary" charset and collation.
- ret = null;
- }
stubPadSpaceCol = stubPadSpaceCol.thenReturn(ret);
}
OngoingStubbing stubNext = when(mockResultSet.next());
@@ -762,6 +726,12 @@ private static ResultSet getMockInfoSchemaRs() throws SQLException {
.put("tiny_int_unsigned_col", new SourceColumnType("TINYINT", new Long[] {}, null))
.build());
}
+
+ @Test
+ public void testEscapeMySql() {
+ assertThat(MysqlDialectAdapter.escapeMySql("binary")).isEqualTo("`binary`");
+ assertThat(MysqlDialectAdapter.escapeMySql("`binary`")).isEqualTo("`binary`");
+ }
}
class MockRSBuilder {
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java
index da129dc20f..0e81e3ffe0 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/iowrapper/JdbcIoWrapperTest.java
@@ -43,7 +43,7 @@
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
-import java.math.BigInteger;
+import java.math.BigDecimal;
import java.sql.SQLException;
import org.apache.beam.sdk.io.jdbc.JdbcIO;
import org.apache.beam.sdk.transforms.PTransform;
@@ -467,7 +467,7 @@ public void testIndexTypeToColumnClass() {
.setCardinality(42L)
.setIsUnique(true)
.build()))
- .isEqualTo(BigInteger.class);
+ .isEqualTo(BigDecimal.class);
assertThrows(
SuitableIndexNotFoundException.class,
() ->
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactoryTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactoryTest.java
index c09ef07e2b..dc63b3713a 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactoryTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundaryExtractorFactoryTest.java
@@ -15,8 +15,10 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory.BYTE_ARRAY_CLASS;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.when;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
@@ -90,11 +92,11 @@ public void testFromIntegers() throws SQLException {
}
@Test
- public void testFromBigIntegers() throws SQLException {
+ public void testFromBigDecimals() throws SQLException {
final BigInteger unsignedBigIntMax = new BigInteger("18446744073709551615");
PartitionColumn partitionColumn =
- PartitionColumn.builder().setColumnName("col1").setColumnClass(BigInteger.class).build();
- BoundaryExtractor extractor = BoundaryExtractorFactory.create(BigInteger.class);
+ PartitionColumn.builder().setColumnName("col1").setColumnClass(BigDecimal.class).build();
+ BoundaryExtractor extractor = BoundaryExtractorFactory.create(BigDecimal.class);
when(mockResultSet.next()).thenReturn(true);
when(mockResultSet.getBigDecimal(1))
.thenReturn(new BigDecimal(BigInteger.ZERO))
@@ -103,14 +105,14 @@ public void testFromBigIntegers() throws SQLException {
when(mockResultSet.getBigDecimal(2))
.thenReturn(new BigDecimal(unsignedBigIntMax))
.thenReturn(null);
- Boundary boundaryMinMax =
+ Boundary boundaryMinMax =
extractor.getBoundary(partitionColumn, mockResultSet, null);
- Boundary boundaryNull = extractor.getBoundary(partitionColumn, mockResultSet, null);
+ Boundary boundaryNull = extractor.getBoundary(partitionColumn, mockResultSet, null);
- assertThat(boundaryMinMax.start()).isEqualTo(BigInteger.ZERO);
- assertThat(boundaryMinMax.end()).isEqualTo(unsignedBigIntMax);
+ assertThat(boundaryMinMax.start()).isEqualTo(new BigDecimal(BigInteger.ZERO));
+ assertThat(boundaryMinMax.end()).isEqualTo(new BigDecimal(unsignedBigIntMax));
assertThat(boundaryMinMax.split(null).getLeft().end())
- .isEqualTo((unsignedBigIntMax.divide(BigInteger.TWO)));
+ .isEqualTo(new BigDecimal(unsignedBigIntMax.divide(BigInteger.TWO)));
assertThat(boundaryNull.start()).isNull();
assertThat(boundaryNull.end()).isNull();
assertThat(boundaryNull.isSplittable(null)).isFalse();
@@ -128,13 +130,13 @@ public void testFromBigIntegers() throws SQLException {
public void testFromBigIntegersEmptyTable() throws SQLException {
final BigInteger unsignedBigIntMax = new BigInteger("18446744073709551615");
PartitionColumn partitionColumn =
- PartitionColumn.builder().setColumnName("col1").setColumnClass(BigInteger.class).build();
- BoundaryExtractor extractor = BoundaryExtractorFactory.create(BigInteger.class);
+ PartitionColumn.builder().setColumnName("col1").setColumnClass(BigDecimal.class).build();
+ BoundaryExtractor extractor = BoundaryExtractorFactory.create(BigDecimal.class);
when(mockResultSet.next()).thenReturn(true);
when(mockResultSet.getBigDecimal(1)).thenReturn(null);
// BigInt Unsigned Max in MySQL
when(mockResultSet.getBigDecimal(2)).thenReturn(null);
- Boundary boundary = extractor.getBoundary(partitionColumn, mockResultSet, null);
+ Boundary boundary = extractor.getBoundary(partitionColumn, mockResultSet, null);
assertThat(boundary.start()).isNull();
assertThat(boundary.end()).isNull();
@@ -207,6 +209,35 @@ public String unMapStringFromBigInteger(
null));
}
+ @Test
+ public void testFromBinary() throws SQLException {
+ final BigInteger unsignedBigIntMax = new BigInteger("18446744073709551615");
+ PartitionColumn partitionColumn =
+ PartitionColumn.builder().setColumnName("col1").setColumnClass(BYTE_ARRAY_CLASS).build();
+ BoundaryExtractor extractor = BoundaryExtractorFactory.create(BYTE_ARRAY_CLASS);
+ when(mockResultSet.next()).thenReturn(true);
+ doReturn(BigInteger.ZERO.toByteArray()).doReturn(null).when(mockResultSet).getBytes(1);
+ doReturn(unsignedBigIntMax.toByteArray()).doReturn(null).when(mockResultSet).getBytes(2);
+ Boundary boundaryMinMax = extractor.getBoundary(partitionColumn, mockResultSet, null);
+ Boundary boundaryNull = extractor.getBoundary(partitionColumn, mockResultSet, null);
+
+ assertThat(boundaryMinMax.start()).isEqualTo(BigInteger.ZERO.toByteArray());
+ assertThat(boundaryMinMax.end()).isEqualTo(unsignedBigIntMax.toByteArray());
+ assertThat(boundaryMinMax.split(null).getLeft().end())
+ .isEqualTo((unsignedBigIntMax.divide(BigInteger.TWO).toByteArray()));
+ assertThat(boundaryNull.start()).isNull();
+ assertThat(boundaryNull.end()).isNull();
+ assertThat(boundaryNull.isSplittable(null)).isFalse();
+ // Mismatched Type
+ assertThrows(
+ IllegalArgumentException.class,
+ () ->
+ extractor.getBoundary(
+ PartitionColumn.builder().setColumnName("col1").setColumnClass(long.class).build(),
+ mockResultSet,
+ null));
+ }
+
@Test
public void testFromUnsupported() {
assertThrows(
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactoryTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactoryTest.java
index b6716f3814..2e28ff427e 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactoryTest.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/range/BoundarySplitterFactoryTest.java
@@ -15,11 +15,13 @@
*/
package com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range;
+import static com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.BoundaryExtractorFactory.BYTE_ARRAY_CLASS;
import static com.google.common.truth.Truth.assertThat;
import static org.junit.Assert.assertThrows;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationMapper;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
+import java.math.BigDecimal;
import java.math.BigInteger;
import java.util.Map;
import org.apache.beam.sdk.transforms.DoFn.ProcessContext;
@@ -94,6 +96,56 @@ public void testBigIntegerBoundarySplitter() {
.isEqualTo(BigInteger.valueOf(21L));
}
+ @Test
+ public void testBigDecimalBoundarySplitter() {
+ BoundarySplitter splitter = BoundarySplitterFactory.create(BigDecimal.class);
+ BigDecimal start =
+ new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(10L)));
+ BigDecimal startByTwo =
+ new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(5L)));
+ BigDecimal end =
+ new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(20L)));
+ BigDecimal mid =
+ new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(15L)));
+ BigDecimal zero = new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.ZERO));
+ BigDecimal negOne = new BigDecimal(BigInteger.valueOf(-1L));
+ BigDecimal longMax = new BigDecimal(BigInteger.valueOf(Long.MAX_VALUE));
+ BigDecimal longMin = new BigDecimal(BigInteger.valueOf(Long.MIN_VALUE));
+ BigDecimal fortyTwo = new BigDecimal(BigInteger.valueOf(42L));
+ BigDecimal twentyOne = new BigDecimal(BigInteger.valueOf(21L));
+
+ assertThat(splitter.getSplitPoint(start, end, null, null, null)).isEqualTo(mid);
+ assertThat(splitter.getSplitPoint(start, zero, null, null, null)).isEqualTo(startByTwo);
+ assertThat(splitter.getSplitPoint(longMin, longMax, null, null, null)).isEqualTo(negOne);
+ assertThat(splitter.getSplitPoint(null, null, null, null, null)).isNull();
+ assertThat(splitter.getSplitPoint(fortyTwo, null, null, null, null)).isEqualTo(twentyOne);
+ assertThat(splitter.getSplitPoint(null, fortyTwo, null, null, null)).isEqualTo(twentyOne);
+ }
+
+ @Test
+ public void testBytesIntegerBoundarySplitter() {
+ BoundarySplitter splitter = BoundarySplitterFactory.create(BYTE_ARRAY_CLASS);
+ byte[] start =
+ BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(10L)).toByteArray();
+ byte[] startByTwo =
+ BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(5L)).toByteArray();
+ byte[] end = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(20L)).toByteArray();
+ byte[] mid = BigInteger.valueOf(Long.MAX_VALUE).multiply(BigInteger.valueOf(15L)).toByteArray();
+ byte[] zero = BigInteger.ZERO.toByteArray();
+ byte[] negOne = BigInteger.valueOf(-1L).toByteArray();
+ byte[] longMax = BigInteger.valueOf(Long.MAX_VALUE).toByteArray();
+ byte[] longMin = BigInteger.valueOf(Long.MIN_VALUE).toByteArray();
+ byte[] fortyTwo = BigInteger.valueOf(42L).toByteArray();
+ byte[] twentyOne = BigInteger.valueOf(21L).toByteArray();
+
+ assertThat(splitter.getSplitPoint(start, end, null, null, null)).isEqualTo(mid);
+ assertThat(splitter.getSplitPoint(start, zero, null, null, null)).isEqualTo(startByTwo);
+ assertThat(splitter.getSplitPoint(longMax, longMin, null, null, null)).isEqualTo(negOne);
+ assertThat(splitter.getSplitPoint(null, null, null, null, null)).isNull();
+ assertThat(splitter.getSplitPoint(fortyTwo, null, null, null, null)).isEqualTo(twentyOne);
+ assertThat(splitter.getSplitPoint(null, fortyTwo, null, null, null)).isEqualTo(twentyOne);
+ }
+
@Test
public void testStringBoundarySplitter() {
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java
index 864f211096..64841ecb28 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java
+++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java
@@ -228,11 +228,29 @@ private Map>> getExpectedData() {
expectedData.put("set", createRows("set", "v1,v2", "NULL"));
expectedData.put(
"integer_unsigned", createRows("integer_unsigned", "0", "42", "4294967295", "NULL"));
- // TODO(Vardhanvthigle): Debug thisfurthur
- /*
+ expectedData.put(
+ "bigint_pk", createRows("bigint_pk", "-9223372036854775808", "0", "9223372036854775807"));
expectedData.put(
"bigint_unsigned_pk", createRows("bigint_unsigned_pk", "0", "42", "18446744073709551615"));
- */
+ expectedData.put("int_pk", createRows("int_pk", "-2147483648", "0", "2147483647"));
+ expectedData.put("int_unsigned_pk", createRows("int_unsigned_pk", "0", "42", "4294967295"));
+ expectedData.put("medium_int_pk", createRows("medium_int_pk", "-8388608", "0", "8388607"));
+ expectedData.put(
+ "medium_int_unsigned_pk", createRows("medium_int_unsigned_pk", "0", "42", "16777215"));
+ expectedData.put("small_int_pk", createRows("small_int_pk", "-32768", "0", "32767"));
+ expectedData.put(
+ "small_int_unsigned_pk", createRows("small_int_unsigned_pk", "0", "42", "65535"));
+ expectedData.put("tiny_int_pk", createRows("tiny_int_pk", "-128", "0", "127"));
+ expectedData.put("tiny_int_unsigned_pk", createRows("tiny_int_unsigned_pk", "0", "42", "255"));
+ // The binary column is padded with 0s
+ expectedData.put(
+ "binary_pk",
+ createRows("binary_pk", "AAAAAAAAAAAAAAAAAAAAAAAAAAA=", "gAAAAAAAAAAAAAAAAAAAAAAAAAA="));
+ expectedData.put("varbinary_pk", createRows("varbinary_pk", "AA==", "gAAAAAAAAAA="));
+ expectedData.put("tiny_blob_pk", createRows("tiny_blob_pk", "AA==", "gAAAAAAAAAA="));
+ expectedData.put("char_pk", createRows("char_pk", "AA==", "gAAAAAAAAAA="));
+ expectedData.put("varchar_pk", createRows("varchar_pk", "AA==", "gAAAAAAAAAA="));
+ expectedData.put("tiny_text_pk", createRows("tiny_text_pk", "AA==", "gAAAAAAAAAA="));
return expectedData;
}
diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-data-types.sql
index 75c125d6e2..fc1b077739 100644
--- a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-data-types.sql
+++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-data-types.sql
@@ -178,11 +178,88 @@ CREATE TABLE set_table (
);
+CREATE TABLE bigint_pk_table (
+ id BIGINT PRIMARY KEY,
+ bigint_pk_col BIGINT NOT NULL
+);
+
CREATE TABLE bigint_unsigned_pk_table (
id BIGINT UNSIGNED PRIMARY KEY,
bigint_unsigned_pk_col BIGINT UNSIGNED NOT NULL
);
+CREATE TABLE int_pk_table (
+ id INT PRIMARY KEY,
+ int_pk_col INT NOT NULL
+);
+
+CREATE TABLE int_unsigned_pk_table (
+ id INT UNSIGNED PRIMARY KEY,
+ int_unsigned_pk_col INT UNSIGNED NOT NULL
+);
+
+CREATE TABLE medium_int_pk_table (
+ id MEDIUMINT PRIMARY KEY,
+ medium_int_pk_col MEDIUMINT NOT NULL
+);
+
+CREATE TABLE medium_int_unsigned_pk_table (
+ id MEDIUMINT UNSIGNED PRIMARY KEY,
+ medium_int_unsigned_pk_col MEDIUMINT UNSIGNED NOT NULL
+);
+
+CREATE TABLE small_int_pk_table (
+ id SMALLINT PRIMARY KEY,
+ small_int_pk_col SMALLINT NOT NULL
+);
+
+CREATE TABLE small_int_unsigned_pk_table (
+ id SMALLINT UNSIGNED PRIMARY KEY,
+ small_int_unsigned_pk_col SMALLINT UNSIGNED NOT NULL
+);
+
+CREATE TABLE tiny_int_pk_table (
+ id TINYINT PRIMARY KEY,
+ tiny_int_pk_col TINYINT NOT NULL
+);
+
+CREATE TABLE tiny_int_unsigned_pk_table (
+ id TINYINT UNSIGNED PRIMARY KEY,
+ tiny_int_unsigned_pk_col TINYINT UNSIGNED NOT NULL
+);
+
+CREATE TABLE binary_pk_table (
+ id BINARY(20) PRIMARY KEY,
+ binary_pk_col BINARY(20) NOT NULL
+);
+
+CREATE TABLE varbinary_pk_table (
+ id VARBINARY(20) PRIMARY KEY,
+ varbinary_pk_col VARBINARY(20) NOT NULL
+);
+
+CREATE TABLE tiny_blob_pk_table (
+ id TINYBLOB,
+ tiny_blob_pk_col TINYBLOB NOT NULL,
+ CONSTRAINT PRIMARY KEY (id(20))
+);
+
+CREATE TABLE char_pk_table (
+ id CHAR(20) PRIMARY KEY,
+ char_pk_col CHAR(20) NOT NULL
+) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci;
+
+CREATE TABLE varchar_pk_table (
+ id VARCHAR(20) PRIMARY KEY,
+ varchar_pk_col VARCHAR(20) NOT NULL
+) CHARACTER SET utf8mb3 COLLATE utf8mb3_general_ci;
+
+CREATE TABLE tiny_text_pk_table (
+ id TINYTEXT,
+ tiny_text_pk_col TINYTEXT NOT NULL,
+ CONSTRAINT PRIMARY KEY (id(20))
+);
+
ALTER TABLE `bigint_table` MODIFY `id` INT AUTO_INCREMENT;
ALTER TABLE `bigint_unsigned_table` MODIFY `id` INT AUTO_INCREMENT;
ALTER TABLE `binary_table` MODIFY `id` INT AUTO_INCREMENT;
@@ -303,7 +380,22 @@ INSERT INTO `year_table` (`year_col`) VALUES (2022);
INSERT INTO `year_table` (`year_col`) VALUES (1901);
INSERT INTO `year_table` (`year_col`) VALUES (2155);
INSERT INTO `set_table` (`set_col`) VALUES ('v1,v2');
+INSERT INTO `bigint_pk_table` (`id`, `bigint_pk_col`) VALUES ('-9223372036854775808', '-9223372036854775808'), ('0', '0'), ('9223372036854775807', '9223372036854775807');
INSERT INTO `bigint_unsigned_pk_table` (`id`, `bigint_unsigned_pk_col`) VALUES ('0', '0'), ('42', '42'), ('18446744073709551615', '18446744073709551615');
+INSERT INTO `int_pk_table` (`id`, `int_pk_col`) VALUES ('-2147483648', '-2147483648'), ('0', '0'), ('2147483647', '2147483647');
+INSERT INTO `int_unsigned_pk_table` (`id`, `int_unsigned_pk_col`) VALUES ('0', '0'), ('42', '42'), ('4294967295', '4294967295');
+INSERT INTO `medium_int_pk_table` (`id`, `medium_int_pk_col`) VALUES ('-8388608', '-8388608'), ('0', '0'), ('8388607', '8388607');
+INSERT INTO `medium_int_unsigned_pk_table` (`id`, `medium_int_unsigned_pk_col`) VALUES ('0', '0'), ('42', '42'), ('16777215', '16777215');
+INSERT INTO `small_int_pk_table` (`id`, `small_int_pk_col`) VALUES ('-32768', '-32768'), ('0', '0'), ('32767', '32767');
+INSERT INTO `small_int_unsigned_pk_table` (`id`, `small_int_unsigned_pk_col`) VALUES ('0', '0'), ('42', '42'), ('65535', '65535');
+INSERT INTO `tiny_int_pk_table` (`id`, `tiny_int_pk_col`) VALUES ('-128', '-128'), ('0', '0'), ('127', '127');
+INSERT INTO `tiny_int_unsigned_pk_table` (`id`, `tiny_int_unsigned_pk_col`) VALUES ('0', '0'), ('42', '42'), ('255', '255');
+INSERT INTO `binary_pk_table` (`id`, `binary_pk_col`) VALUES (FROM_BASE64('AA=='), FROM_BASE64('AA==')), (FROM_BASE64('gAAAAAAAAAA='), FROM_BASE64('gAAAAAAAAAA='));
+INSERT INTO `varbinary_pk_table` (`id`, `varbinary_pk_col`) VALUES (FROM_BASE64('AA=='), FROM_BASE64('AA==')), (FROM_BASE64('gAAAAAAAAAA='), FROM_BASE64('gAAAAAAAAAA='));
+INSERT INTO `tiny_blob_pk_table` (`id`, `tiny_blob_pk_col`) VALUES (FROM_BASE64('AA=='), FROM_BASE64('AA==')), (FROM_BASE64('gAAAAAAAAAA='), FROM_BASE64('gAAAAAAAAAA='));
+INSERT INTO `char_pk_table` (`id`, `char_pk_col`) VALUES ('AA==', 'AA=='), ('gAAAAAAAAAA=', 'gAAAAAAAAAA=');
+INSERT INTO `varchar_pk_table` (`id`, `varchar_pk_col`) VALUES ('AA==', 'AA=='), ('gAAAAAAAAAA=', 'gAAAAAAAAAA=');
+INSERT INTO `tiny_text_pk_table` (`id`, `tiny_text_pk_col`) VALUES ('AA==', 'AA=='), ('gAAAAAAAAAA=', 'gAAAAAAAAAA=');
INSERT INTO `bigint_table` (`bigint_col`) VALUES (NULL);
INSERT INTO `bigint_unsigned_table` (`bigint_unsigned_col`) VALUES (NULL);
diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-spanner-schema.sql
index 01a9ba83cb..9dad10412f 100644
--- a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-spanner-schema.sql
+++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-spanner-schema.sql
@@ -209,7 +209,84 @@ CREATE TABLE spatial_polygon (
id INT64 NOT NULL,
area STRING(MAX),
) PRIMARY KEY(id);
+
+CREATE TABLE bigint_pk_table (
+ id INT64 NOT NULL,
+ bigint_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
CREATE TABLE bigint_unsigned_pk_table (
id NUMERIC NOT NULL,
bigint_unsigned_pk_col NUMERIC NOT NULL,
) PRIMARY KEY(id);
+
+CREATE TABLE int_pk_table (
+ id INT64 NOT NULL,
+ int_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE int_unsigned_pk_table (
+ id INT64 NOT NULL,
+ int_unsigned_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE medium_int_pk_table (
+ id INT64 NOT NULL,
+ medium_int_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE medium_int_unsigned_pk_table (
+ id INT64 NOT NULL,
+ medium_int_unsigned_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE small_int_pk_table (
+ id INT64 NOT NULL,
+ small_int_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE small_int_unsigned_pk_table (
+ id INT64 NOT NULL,
+ small_int_unsigned_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE tiny_int_pk_table (
+ id INT64 NOT NULL,
+ tiny_int_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE tiny_int_unsigned_pk_table (
+ id INT64 NOT NULL,
+ tiny_int_unsigned_pk_col INT64 NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE binary_pk_table (
+ id BYTES(20) NOT NULL,
+ binary_pk_col BYTES(20) NOT NULL,
+) PRIMARY KEY(id);
+
+
+CREATE TABLE varbinary_pk_table (
+ id BYTES(20) NOT NULL,
+ varbinary_pk_col BYTES(20) NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE tiny_blob_pk_table (
+ id BYTES(20) NOT NULL,
+ tiny_blob_pk_col BYTES(20) NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE char_pk_table (
+ id STRING(20) NOT NULL,
+ char_pk_col STRING(20) NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE varchar_pk_table (
+ id STRING(20) NOT NULL,
+ varchar_pk_col STRING(20) NOT NULL,
+) PRIMARY KEY(id);
+
+CREATE TABLE tiny_text_pk_table (
+ id STRING(20) NOT NULL,
+ tiny_text_pk_col STRING(20) NOT NULL,
+) PRIMARY KEY(id);