From cd28d010b9f2a453accab3069b21d0355db5ef76 Mon Sep 17 00:00:00 2001 From: Vardhan Vinay Thigle <39047439+VardhanThigle@users.noreply.github.com> Date: Sat, 14 Dec 2024 13:16:23 +0000 Subject: [PATCH] Cassandra Schema and Value Mapping (#2048) --- .../FormatDatastreamRecordToJson.java | 51 +++++ .../FormatDatastreamRecordToJsonTest.java | 128 ++++++++++++ v2/sourcedb-to-spanner/pom.xml | 10 + .../cassandra/mappings/CassandraMappings.java | 59 ++++++ .../mappings/CassandraMappingsProvider.java | 135 +++++++++++++ .../io/cassandra/mappings/package-info.java | 18 ++ .../rowmapper/CassandraFieldMapper.java | 43 ++++ .../rowmapper/CassandraRowMapper.java | 85 ++++++++ .../rowmapper/CassandraRowValueExtractor.java | 35 ++++ .../rowmapper/CassandraRowValueMapper.java | 35 ++++ .../rowmapper/CassandraSourceRowMapper.java | 73 +++++++ .../io/cassandra/rowmapper/package-info.java | 17 ++ .../schema/CassandraSchemaDiscovery.java | 6 +- .../schema/typemapping/UnifiedTypeMapper.java | 15 +- .../provider/unified/CustomSchema.java | 56 ++++++ .../unified/UnifiedMappingProvider.java | 3 + .../CassandraSourceRowMapperTest.java | 189 ++++++++++++++++++ .../schema/CassandraSchemaDiscoveryTest.java | 8 +- .../cassandra/testutils/BasicTestSchema.java | 17 +- .../test/resources/CassandraUT/basicTest.cql | 157 ++++++++++++++- .../avro/GenericRecordTypeConvertor.java | 38 ++++ .../migrations/avro/AvroTestingHelper.java | 40 ++++ .../avro/GenericRecordTypeConvertorTest.java | 95 +++++++++ 23 files changed, 1298 insertions(+), 15 deletions(-) create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/package-info.java create mode 100644 
v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraFieldMapper.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueMapper.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapper.java create mode 100644 v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/package-info.java create mode 100644 v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java diff --git a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java index 6b96ba25a5..ff5ef5d5ea 100644 --- a/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java +++ b/v2/datastream-common/src/main/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJson.java @@ -25,6 +25,7 @@ import java.time.Duration; import java.time.Instant; import java.time.LocalDate; +import java.time.Period; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -43,6 +44,7 @@ import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.commons.lang3.StringUtils; 
import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -563,6 +565,48 @@ static void handleDatastreamRecordType( .withZoneSameInstant(ZoneId.of("UTC")) .format(DEFAULT_TIMESTAMP_WITH_TZ_FORMATTER)); break; + /* + * The `intervalNano` maps to nano second precision interval type used by Cassandra Interval. + * On spanner this will map to `string` or `Interval` type. + * This is added here for DQL retrials for sourcedb-to-spanner. + * + * TODO(b/383689307): + * There's a lot of commonality in handling avro types between {@link FormatDatastreamRecordToJson} and {@link com.google.cloud.teleport.v2.spanner.migrations.avro.GenericRecordTypeConvertor}. + * Adding inter-package dependency might not be the best route, and we might eventually want to build a common package for handling common logic between the two. + */ + case "intervalNano": + Period period = + Period.ZERO + .plusYears(getOrDefault(element, "years", 0L)) + .plusMonths(getOrDefault(element, "months", 0L)) + .plusDays(getOrDefault(element, "days", 0L)); + /* + * Convert the period to a ISO-8601 period formatted String, such as P6Y3M1D. + * A zero period will be represented as zero days, 'P0D'. + * Refer to javadoc for Period#toString. + */ + String periodIso8061 = period.toString(); + java.time.Duration duration = + java.time.Duration.ZERO + .plusHours(getOrDefault(element, "hours", 0L)) + .plusMinutes(getOrDefault(element, "minutes", 0L)) + .plusSeconds(getOrDefault(element, "seconds", 0L)) + .plusNanos(getOrDefault(element, "nanos", 0L)); + /* + * Convert the duration to a ISO-8601 period formatted String, such as PT8H6M12.345S + * refer to javadoc for Duration#toString. + */ + String durationIso8610 = duration.toString(); + // Convert to ISO-8601 period format. 
+ String convertedIntervalNano; + if (duration.isZero()) { + convertedIntervalNano = periodIso8061; + } else { + convertedIntervalNano = + periodIso8061 + StringUtils.removeStartIgnoreCase(durationIso8610, "P"); + } + jsonObject.put(fieldName, convertedIntervalNano); + break; default: LOG.warn( "Unknown field type {} for field {} in record {}.", fieldSchema, fieldName, element); @@ -578,5 +622,12 @@ static void handleDatastreamRecordType( break; } } + + private static T getOrDefault(GenericRecord element, String name, T def) { + if (element.get(name) == null) { + return def; + } + return (T) element.get(name); + } } } diff --git a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java index 6ce9d2e818..71e31b65bc 100644 --- a/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java +++ b/v2/datastream-common/src/test/java/com/google/cloud/teleport/v2/datastream/transforms/FormatDatastreamRecordToJsonTest.java @@ -18,17 +18,21 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.JsonNodeFactory; import com.fasterxml.jackson.databind.node.ObjectNode; +import com.google.cloud.teleport.v2.datastream.transforms.FormatDatastreamRecordToJson.UnifiedTypesFormatter; import java.io.File; import java.io.IOException; import java.net.URISyntaxException; import java.net.URL; import org.apache.avro.LogicalTypes; import org.apache.avro.Schema; +import org.apache.avro.SchemaBuilder; import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericData; import 
org.apache.avro.generic.GenericDatumReader; import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.DatumReader; @@ -211,4 +215,128 @@ public void testLogicalType_micros() { fieldNamePositiveNumber, fieldSchema, element, jsonObject); assertTrue(jsonObject.get(fieldNamePositiveNumber).asText().equals("1981-11-21T11:45:11Z")); } + + @Test + public void testIntervalNano() throws JsonProcessingException { + + ObjectNode objectNode = new ObjectNode(new JsonNodeFactory(true)); + + /* Basic Test. */ + UnifiedTypesFormatter.handleDatastreamRecordType( + "basic", + generateIntervalNanosSchema(), + generateIntervalNanosRecord(1000L, 1000L, 3890L, 25L, 331L, 12L, 9L), + objectNode); + + /* Test with any field set as null gets treated as 0. */ + + UnifiedTypesFormatter.handleDatastreamRecordType( + "null_minute", + generateIntervalNanosSchema(), + generateIntervalNanosRecord(1000L, 1000L, 3890L, 25L, null, 12L, 9L), + objectNode); + + /* Basic test for negative field. */ + + UnifiedTypesFormatter.handleDatastreamRecordType( + "neg_field_basic", + generateIntervalNanosSchema(), + generateIntervalNanosRecord(1000L, -1000L, 3890L, 25L, 31L, 12L, 9L), + objectNode); + + /* Test that negative nanos subtract from the fractional seconds, for example 12 Seconds -1 Nanos becomes 11.999999991s. */ + UnifiedTypesFormatter.handleDatastreamRecordType( + "neg_fractional_seconds", + generateIntervalNanosSchema(), + generateIntervalNanosRecord(1000L, 31L, 3890L, 25L, 31L, 12L, -9L), + objectNode); + + /* Test 0 interval. */ + UnifiedTypesFormatter.handleDatastreamRecordType( + "zero_interval", + generateIntervalNanosSchema(), + generateIntervalNanosRecord(0L, 0L, 0L, 0L, 0L, 0L, 0L), + objectNode); + + /* Test almost zero interval with only nanos set. */ + UnifiedTypesFormatter.handleDatastreamRecordType( + "one_nano_interval", + generateIntervalNanosSchema(), + generateIntervalNanosRecord(0L, 0L, 0L, 0L, 0L, 0L, 1L), + objectNode); + /* Test with large values. 
*/ + UnifiedTypesFormatter.handleDatastreamRecordType( + "large_values", + generateIntervalNanosSchema(), + generateIntervalNanosRecord( + 2147483647L, 11L, 2147483647L, 2147483647L, 2147483647L, 2147483647L, 999999999L), + objectNode); + + /* Test with large negative values. */ + UnifiedTypesFormatter.handleDatastreamRecordType( + "large_negative_values", + generateIntervalNanosSchema(), + generateIntervalNanosRecord( + -2147483647L, + -11L, + -2147483647L, + -2147483647L, + -2147483647L, + -2147483647L, + -999999999L), + objectNode); + String expected = + "{\"basic\":\"P1000Y1000M3890DT30H31M12.000000009S\"," + + "\"null_minute\":\"P1000Y1000M3890DT25H12.000000009S\"," + + "\"neg_field_basic\":\"P1000Y-1000M3890DT25H31M12.000000009S\"," + + "\"neg_fractional_seconds\":\"P1000Y31M3890DT25H31M11.999999991S\"," + + "\"zero_interval\":\"P0D\"," + + "\"one_nano_interval\":\"P0DT0.000000001S\"," + + "\"large_values\":\"P2147483647Y11M2147483647DT2183871564H21M7.999999999S\"," + + "\"large_negative_values\":\"P-2147483647Y-11M-2147483647DT-2183871564H-21M-7.999999999S\"}"; + assertEquals(expected, new ObjectMapper().writeValueAsString(objectNode)); + } + + private GenericRecord generateIntervalNanosRecord( + Long years, Long months, Long days, Long hours, Long minutes, Long seconds, Long nanos) { + + GenericRecord genericRecord = new GenericData.Record(generateIntervalNanosSchema()); + genericRecord.put("years", years); + genericRecord.put("months", months); + genericRecord.put("days", days); + genericRecord.put("hours", hours); + genericRecord.put("minutes", minutes); + genericRecord.put("seconds", seconds); + genericRecord.put("nanos", nanos); + return genericRecord; + } + + private Schema generateIntervalNanosSchema() { + + return SchemaBuilder.builder() + .record("intervalNano") + .fields() + .name("years") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("months") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("days") + 
.type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("hours") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("minutes") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("seconds") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("nanos") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .endRecord(); + } } diff --git a/v2/sourcedb-to-spanner/pom.xml b/v2/sourcedb-to-spanner/pom.xml index ce040f6115..52b002a0a2 100644 --- a/v2/sourcedb-to-spanner/pom.xml +++ b/v2/sourcedb-to-spanner/pom.xml @@ -142,5 +142,15 @@ 5.0.0 test + + org.apache.beam + beam-sdks-java-io-cassandra + + + org.apache.commons + commons-collections4 + 4.1 + compile + diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java new file mode 100644 index 0000000000..7385d04c3a --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappings.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings; + +import com.google.auto.value.AutoValue; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraFieldMapper; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueExtractor; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueMapper; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider; +import com.google.common.collect.ImmutableMap; + +/** Represent Unified type mapping, value extractor and value mappings for Cassandra. */ +@AutoValue +public abstract class CassandraMappings { + public abstract ImmutableMap typeMapping(); + + public abstract ImmutableMap> fieldMapping(); + + public static Builder builder() { + return new AutoValue_CassandraMappings.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + abstract ImmutableMap.Builder typeMappingBuilder(); + + abstract ImmutableMap.Builder> fieldMappingBuilder(); + + public Builder put( + String cassandraType, + UnifiedMappingProvider.Type type, + CassandraRowValueExtractor rowValueExtractor, + CassandraRowValueMapper rowValueMapper) { + this.typeMappingBuilder() + .put(cassandraType.toUpperCase(), UnifiedMappingProvider.getMapping(type)); + this.fieldMappingBuilder() + .put( + cassandraType.toUpperCase(), + CassandraFieldMapper.create(rowValueExtractor, rowValueMapper)); + return this; + } + + public abstract CassandraMappings build(); + } +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java new file mode 100644 index 
0000000000..dd89bf16bf --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/CassandraMappingsProvider.java @@ -0,0 +1,135 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings; + +import com.datastax.driver.core.Duration; +import com.datastax.driver.core.LocalDate; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.TypeCodec; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraFieldMapper; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueExtractor; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraRowValueMapper; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapping; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.IntervalNano; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.UnifiedMappingProvider; +import com.google.common.collect.ImmutableMap; +import java.nio.ByteBuffer; +import java.util.Date; +import org.apache.avro.LogicalTypes; +import org.apache.avro.generic.GenericRecordBuilder; +import org.apache.commons.codec.binary.Hex; + +public class CassandraMappingsProvider { + /** Pass the value as is to avro. 
*/ + private static final CassandraRowValueMapper valuePassThrough = (value, schema) -> value; + + /** Pass the value as a string to avro. */ + private static final CassandraRowValueMapper toString = (value, schema) -> value.toString(); + + /** Pass the value as an integer to avro. */ + private static final CassandraRowValueMapper toInt = (value, schema) -> value.intValue(); + + /** Map {@link ByteBuffer} to a Hex encoded String. */ + private static final CassandraRowValueMapper ByteBufferToHexString = + (value, schema) -> new String(Hex.encodeHex(value.array())); + + /** + * Map {@link LocalDate} to {@link LogicalTypes.Date}. Cassandra Date type encodes number of days + * since epoch, without any time or time zone component. + * + *

See: types + * for additional information on date type. + */ + private static final CassandraRowValueMapper localDateToAvroLogicalDate = + (value, schema) -> value.getDaysSinceEpoch(); + + private static final CassandraRowValueExtractor getDuration = + (row, name) -> row.get(name, TypeCodec.duration()); + + private static final CassandraRowValueMapper durationToAvro = + (value, schema) -> + new GenericRecordBuilder(IntervalNano.SCHEMA) + .set(IntervalNano.MONTHS_FIELD_NAME, value.getMonths()) + .set(IntervalNano.DAYS_FIELD_NAME, value.getDays()) + .set(IntervalNano.NANOS_FIELD_NAME, value.getNanoseconds()) + .build(); + + /** + * Cassandra represents `Time` field as 64 bit singed integer representing number of nanoseconds + * since midnight. See types documentation + * for further details. + */ + private static final CassandraRowValueMapper cassandraTimeToIntervalNano = + (value, schema) -> + new GenericRecordBuilder(IntervalNano.SCHEMA) + .set(IntervalNano.NANOS_FIELD_NAME, value) + .build(); + + private static final CassandraRowValueMapper dateToAvro = + (value, schema) -> value.getTime() * 1000L; + + private static final CassandraMappings CASSANDRA_MAPPINGS = + CassandraMappings.builder() + .put("ASCII", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough) + .put("BIGINT", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough) + .put("BLOB", UnifiedMappingProvider.Type.STRING, Row::getBytes, ByteBufferToHexString) + .put("BOOLEAN", UnifiedMappingProvider.Type.BOOLEAN, Row::getBool, valuePassThrough) + .put("COUNTER", UnifiedMappingProvider.Type.LONG, Row::getLong, valuePassThrough) + .put("DATE", UnifiedMappingProvider.Type.DATE, Row::getDate, localDateToAvroLogicalDate) + // The Cassandra decimal does not have precision and scale fixed in the + // schema which would be needed if we want to map it to Avro Decimal. 
+ .put("DECIMAL", UnifiedMappingProvider.Type.STRING, Row::getDecimal, toString) + .put("DOUBLE", UnifiedMappingProvider.Type.DOUBLE, Row::getDouble, valuePassThrough) + .put("DURATION", UnifiedMappingProvider.Type.INTERVAL_NANO, getDuration, durationToAvro) + .put("FLOAT", UnifiedMappingProvider.Type.FLOAT, Row::getFloat, valuePassThrough) + .put("INET", UnifiedMappingProvider.Type.STRING, Row::getInet, toString) + .put("INT", UnifiedMappingProvider.Type.INTEGER, Row::getInt, valuePassThrough) + .put("SMALLINT", UnifiedMappingProvider.Type.INTEGER, Row::getShort, toInt) + .put("TEXT", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough) + .put( + "TIME", + UnifiedMappingProvider.Type.INTERVAL_NANO, + Row::getTime, + cassandraTimeToIntervalNano) + .put("TIMESTAMP", UnifiedMappingProvider.Type.TIMESTAMP, Row::getTimestamp, dateToAvro) + .put("TIMEUUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString) + .put("TINYINT", UnifiedMappingProvider.Type.INTEGER, Row::getByte, toInt) + .put("UUID", UnifiedMappingProvider.Type.STRING, Row::getUUID, toString) + .put("VARCHAR", UnifiedMappingProvider.Type.STRING, Row::getString, valuePassThrough) + .put("VARINT", UnifiedMappingProvider.Type.NUMBER, Row::getVarint, toString) + .put( + "UNSUPPORTED", + UnifiedMappingProvider.Type.UNSUPPORTED, + (row, name) -> null, + (value, schema) -> null) + .build(); + + private CassandraMappingsProvider() {} + + /** Mappings for unified type interface. */ + public static ImmutableMap getMapping() { + return CASSANDRA_MAPPINGS.typeMapping(); + } + + /** + * Field Mappers for {@link + * com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper.CassandraSourceRowMapper}. 
+ */ + public static ImmutableMap> getFieldMapping() { + return CASSANDRA_MAPPINGS.fieldMapping(); + } +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/package-info.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/package-info.java new file mode 100644 index 0000000000..a690b8a56d --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/mappings/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +/** Schema and Value mapping for Cassandra. */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings; diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraFieldMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraFieldMapper.java new file mode 100644 index 0000000000..c8fb0cd393 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraFieldMapper.java @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import com.datastax.driver.core.Row; +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.avro.Schema; + +@AutoValue +public abstract class CassandraFieldMapper implements Serializable { + + public static CassandraFieldMapper create( + CassandraRowValueExtractor rowValueExtractor, CassandraRowValueMapper rowValueMapper) { + return new AutoValue_CassandraFieldMapper(rowValueExtractor, rowValueMapper); + } + + public Object mapValue(Row row, String fieldName, Schema fieldSchema) { + T extractedValue = rowValueExtractor().extract(row, fieldName); + if (extractedValue == null) { + return null; + } + Object avroValue = rowValueMapper().map(extractedValue, fieldSchema); + return avroValue; + } + + abstract CassandraRowValueExtractor rowValueExtractor(); + + abstract CassandraRowValueMapper rowValueMapper(); +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java new file mode 100644 index 0000000000..7097fa9295 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowMapper.java @@ -0,0 +1,85 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in 
compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import com.datastax.driver.core.Row; +import com.google.auto.value.AutoValue; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings.CassandraMappingsProvider; +import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow; +import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference; +import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema; +import com.google.common.collect.ImmutableMap; +import java.io.Serializable; +import java.time.Instant; +import java.util.concurrent.TimeUnit; +import org.apache.avro.Schema; +import org.apache.commons.collections4.Transformer; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoValue +abstract class CassandraRowMapper implements Transformer, Serializable { + public static final ImmutableMap> MAPPINGS = + CassandraMappingsProvider.getFieldMapping(); + + public static CassandraRowMapper create( + SourceSchemaReference sourceSchemaReference, SourceTableSchema sourceTableSchema) { + return new AutoValue_CassandraRowMapper(sourceSchemaReference, sourceTableSchema); + } + + abstract SourceSchemaReference sourceSchemaReference(); + + abstract SourceTableSchema sourceTableSchema(); + + long getCurrentTimeMicros() { + Instant now = Instant.now(); + long nanos = 
TimeUnit.SECONDS.toNanos(now.getEpochSecond()) + now.getNano(); + return TimeUnit.NANOSECONDS.toMicros(nanos); + } + + public @UnknownKeyFor @NonNull @Initialized SourceRow map( + @UnknownKeyFor @NonNull @Initialized Row row) { + /* Todo Decide if any of the element time like max time or min time is needed here. */ + long time = getCurrentTimeMicros(); + + SourceRow.Builder sourceRowBuilder = + SourceRow.builder(sourceSchemaReference(), sourceTableSchema(), "", time); + + sourceTableSchema() + .sourceColumnNameToSourceColumnType() + .forEach( + (key, value) -> { + Schema schema = sourceTableSchema().getAvroPayload().getField(key).schema(); + // The Unified avro mapping produces a union of the mapped type with null type + // except for "Unsupported" case. + if (schema.isUnion()) { + schema = schema.getTypes().get(1); + } + sourceRowBuilder.setField( + key, + MAPPINGS + .getOrDefault(value.getName().toUpperCase(), MAPPINGS.get("UNSUPPORTED")) + .mapValue(row, key, schema)); + }); + return sourceRowBuilder.build(); + } + + @Override + public SourceRow transform(Row row) { + return map(row); + } +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java new file mode 100644 index 0000000000..135f79c429 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueExtractor.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import com.datastax.driver.core.Row; +import com.datastax.oss.driver.api.core.cql.ResultSet; +import java.io.Serializable; +import javax.annotation.Nullable; + +public interface CassandraRowValueExtractor extends Serializable { + + /** + * Extract the requested field from the result set. + * + * @param row row derived from {@link ResultSet}. + * @param fieldName name of the field to extract. + * @return extracted value. + * @throws IllegalArgumentException - thrown from Cassandra driver for invalid names. + */ + @Nullable + T extract(Row row, String fieldName) throws IllegalArgumentException; +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueMapper.java new file mode 100644 index 0000000000..9114128e9f --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraRowValueMapper.java @@ -0,0 +1,35 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import java.io.Serializable; +import org.apache.avro.Schema; +import org.apache.avro.Schema.Field; +import org.checkerframework.checker.nullness.qual.NonNull; + +public interface CassandraRowValueMapper extends Serializable { + + /** + * Map the extracted value to an object accepted by {@link + * org.apache.avro.generic.GenericRecordBuilder#set(Field, Object)} as per the schema of the + * field. + * + * @param value extracted value. + * @param schema Avro Schema. + * @return mapped object. + */ + Object map(@NonNull T value, Schema schema); +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapper.java new file mode 100644 index 0000000000..a4e00663bd --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapper.java @@ -0,0 +1,73 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import com.datastax.driver.core.ResultSet; +import com.google.auto.value.AutoValue; +import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow; +import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference; +import com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema; +import java.io.Serializable; +import java.util.Iterator; +import java.util.concurrent.Future; +import org.apache.beam.sdk.io.cassandra.Mapper; +import org.apache.commons.collections4.iterators.TransformIterator; +import org.checkerframework.checker.initialization.qual.Initialized; +import org.checkerframework.checker.nullness.qual.NonNull; +import org.checkerframework.checker.nullness.qual.Nullable; +import org.checkerframework.checker.nullness.qual.UnknownKeyFor; + +@AutoValue +public abstract class CassandraSourceRowMapper implements Mapper, Serializable { + abstract SourceSchemaReference sourceSchemaReference(); + + abstract SourceTableSchema sourceTableSchema(); + + @Override + public @UnknownKeyFor @NonNull @Initialized Iterator map( + @UnknownKeyFor @NonNull @Initialized ResultSet resultSet) { + var ret = new TransformIterator(); + ret.setIterator(resultSet.iterator()); + ret.setTransformer(CassandraRowMapper.create(sourceSchemaReference(), sourceTableSchema())); + return ret; + } + + @Override + public @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @Nullable @Initialized Void> + deleteAsync(SourceRow entity) { + throw new 
UnsupportedOperationException("Only Read from Cassandra is supported"); + } + + @Override + public @UnknownKeyFor @NonNull @Initialized Future<@UnknownKeyFor @Nullable @Initialized Void> + saveAsync(SourceRow entity) { + throw new UnsupportedOperationException("Only Read from Cassandra is supported"); + } + + public static Builder builder() { + return new AutoValue_CassandraSourceRowMapper.Builder(); + } + + @AutoValue.Builder + public abstract static class Builder { + + public abstract Builder setSourceSchemaReference(SourceSchemaReference value); + + public abstract Builder setSourceTableSchema(SourceTableSchema value); + + public abstract CassandraSourceRowMapper build(); + } +} diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/package-info.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/package-info.java new file mode 100644 index 0000000000..de36cd6fb4 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/package-info.java @@ -0,0 +1,17 @@ +/* + * Copyright (C) 2024 Google Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +/** Row mapper for mapping Cassandra Rows to Avro Generic Record. 
*/ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java index 445beda528..5abbfd96a4 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscovery.java @@ -113,7 +113,7 @@ public ImmutableMap> discoverTabl DataSource dataSource, SourceSchemaReference schemaReference, ImmutableList tables) throws SchemaDiscoveryException, RetriableSchemaDiscoveryException { Log.info( - "CassandraSchemaDiscovery discoverTableSchema started dataSource = {}, sourceSchemaReference = {}, talbes = {}", + "CassandraSchemaDiscovery discoverTableSchema started dataSource = {}, sourceSchemaReference = {}, tables = {}", dataSource, schemaReference, tables); @@ -123,7 +123,7 @@ public ImmutableMap> discoverTabl ImmutableMap> schema = this.discoverTableSchema(dataSource.cassandra(), schemaReference.cassandra(), tables); Log.info( - "CassandraSchemaDiscovery discoverTableSchema completed dataSource = {}, sourceSchemaReference = {}, talbes = {}, schema = {}", + "CassandraSchemaDiscovery discoverTableSchema completed dataSource = {}, sourceSchemaReference = {}, tables = {}, schema = {}", dataSource, schemaReference, tables, @@ -147,7 +147,7 @@ private ImmutableMap> discoverTab return builder.build(); } catch (DriverException e) { Log.error( - "CassandraSchemaDiscovery discoverTableSchema dataSource = {}, sourceSchemaReference = {}, talbes = {}", + "CassandraSchemaDiscovery discoverTableSchema dataSource = {}, sourceSchemaReference = {}, tables = {}", dataSource, schemaReference, tables, diff --git 
a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java index 3a887ba9f0..b346ce86fa 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/UnifiedTypeMapper.java @@ -15,6 +15,7 @@ */ package com.google.cloud.teleport.v2.source.reader.io.schema.typemapping; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.mappings.CassandraMappingsProvider; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.MysqlMappingProvider; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.PostgreSQLMappingProvider; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.Unsupported; @@ -33,13 +34,16 @@ public final class UnifiedTypeMapper { /** - * A static map of the type mappings for all source database types constructed at class load time. - * TODO(vardhanvthigle): Support other mappings beyond Mysql. + * A static map of the type mappings for all source database types constructed at class load time. 
*/ private static final ImmutableMap> mappers = ImmutableMap.of( - MapperType.MYSQL, MysqlMappingProvider.getMapping(), - MapperType.POSTGRESQL, PostgreSQLMappingProvider.getMapping()); + MapperType.MYSQL, + MysqlMappingProvider.getMapping(), + MapperType.POSTGRESQL, + PostgreSQLMappingProvider.getMapping(), + MapperType.CASSANDRA, + CassandraMappingsProvider.getMapping()); private final MapperType mapperType; @@ -96,6 +100,7 @@ public enum MapperType { MYSQL, POSTGRESQL, ORACLE, - SQLSERVER + SQLSERVER, + CASSANDRA } } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/CustomSchema.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/CustomSchema.java index 846e433017..a97031984e 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/CustomSchema.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/CustomSchema.java @@ -70,6 +70,62 @@ public static final class Interval { private Interval() {} } + /** Schema to represent Interval from years upto nanoseconds precision. */ + public static final class IntervalNano { + public static final String RECORD_NAME = "intervalNano"; + + /** Years in the duration. * */ + public static final String YEARS_FIELD_NAME = "years"; + + /** Months in the duration. * */ + public static final String MONTHS_FIELD_NAME = "months"; + + /** Days in the duration. * */ + public static final String DAYS_FIELD_NAME = "days"; + + /** Hours in the duration. * */ + public static final String HOURS_FIELD_NAME = "hours"; + + /** Minutes in the duration. * */ + public static final String MINUTES_FIELD_NAME = "minutes"; + + /** Seconds in the duration. 
* */ + public static final String SECONDS_FIELD_NAME = "seconds"; + + /** Nano Seconds in the duration. * */ + public static final String NANOS_FIELD_NAME = "nanos"; + + public static final Schema SCHEMA = + SchemaBuilder.builder() + .record(RECORD_NAME) + .fields() + .name(YEARS_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name(MONTHS_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name(DAYS_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name(HOURS_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name(MINUTES_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name(SECONDS_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name(NANOS_FIELD_NAME) + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .endRecord(); + + /** Static final class wrapping only constants. * */ + private IntervalNano() {} + } + public static final class TimeStampTz { public static final String RECORD_NAME = "timestampTz"; public static final String TIMESTAMP_FIELD_NAME = "timestamp"; diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java index 8b712048dc..d86c631b3f 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/schema/typemapping/provider/unified/UnifiedMappingProvider.java @@ -21,6 +21,7 @@ import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomLogical.TimeIntervalMicros; import 
com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.DateTime; import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.Interval; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.provider.unified.CustomSchema.IntervalNano; import com.google.common.collect.ImmutableMap; import java.util.Map; import org.apache.avro.LogicalTypes; @@ -62,6 +63,7 @@ public enum Type { TIME_WITH_TIME_ZONE, VARCHAR, UNSUPPORTED, + INTERVAL_NANO, } // Implementation Detail, ImmutableMap.of(...) supports only upto 10 arguments. @@ -93,6 +95,7 @@ public enum Type { .addToSchema(SchemaBuilder.builder().longType())) .put(Type.TIMESTAMP_WITH_TIME_ZONE, CustomSchema.TimeStampTz.SCHEMA) .put(Type.TIME_WITH_TIME_ZONE, CustomSchema.TimeTz.SCHEMA) + .put(Type.INTERVAL_NANO, IntervalNano.SCHEMA) .build() .entrySet() .stream() diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java new file mode 100644 index 0000000000..9f457d2272 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/rowmapper/CassandraSourceRowMapperTest.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.source.reader.io.cassandra.rowmapper; + +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.PRIMITIVE_TYPES_TABLE; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.PRIMITIVE_TYPES_TABLE_AVRO_ROWS; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CONFIG; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE; +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.when; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.policies.DCAwareRoundRobinPolicy; +import com.datastax.oss.driver.api.core.cql.SimpleStatement; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper.CassandraConnector; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.iowrapper.CassandraDataSource; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaDiscovery; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.schema.CassandraSchemaReference; +import com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.SharedEmbeddedCassandra; +import com.google.cloud.teleport.v2.source.reader.io.datasource.DataSource; +import com.google.cloud.teleport.v2.source.reader.io.exception.RetriableSchemaDiscoveryException; +import com.google.cloud.teleport.v2.source.reader.io.row.SourceRow; +import com.google.cloud.teleport.v2.source.reader.io.schema.SourceSchemaReference; +import 
com.google.cloud.teleport.v2.source.reader.io.schema.SourceTableSchema; +import com.google.cloud.teleport.v2.source.reader.io.schema.typemapping.UnifiedTypeMapper.MapperType; +import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnType; +import com.google.common.collect.ImmutableList; +import java.io.IOException; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mockito; +import org.mockito.junit.MockitoJUnitRunner; + +/** Test class for {@link CassandraSourceRowMapper}. */ +@RunWith(MockitoJUnitRunner.class) +public class CassandraSourceRowMapperTest { + + private static SharedEmbeddedCassandra sharedEmbeddedCassandra = null; + + @BeforeClass + public static void startEmbeddedCassandra() throws IOException { + if (sharedEmbeddedCassandra == null) { + sharedEmbeddedCassandra = new SharedEmbeddedCassandra(TEST_CONFIG, TEST_CQLSH); + } + } + + @AfterClass + public static void stopEmbeddedCassandra() throws Exception { + if (sharedEmbeddedCassandra != null) { + sharedEmbeddedCassandra.close(); + sharedEmbeddedCassandra = null; + } + } + + @Test + public void testCassandraSourceRowMapperBasic() throws RetriableSchemaDiscoveryException { + + SourceSchemaReference sourceSchemaReference = + SourceSchemaReference.ofCassandra( + CassandraSchemaReference.builder().setKeyspaceName(TEST_KEYSPACE).build()); + + DataSource dataSource = + DataSource.ofCassandra( + CassandraDataSource.builder() + .setClusterName(sharedEmbeddedCassandra.getInstance().getClusterName()) + .setContactPoints(sharedEmbeddedCassandra.getInstance().getContactPoints()) + .setLocalDataCenter(sharedEmbeddedCassandra.getInstance().getLocalDataCenter()) + .build()); + + SourceTableSchema.Builder sourceTableSchemaBuilder = + SourceTableSchema.builder(MapperType.CASSANDRA).setTableName(PRIMITIVE_TYPES_TABLE); + new CassandraSchemaDiscovery() + .discoverTableSchema( + dataSource, sourceSchemaReference, 
ImmutableList.of(PRIMITIVE_TYPES_TABLE)) + .get(PRIMITIVE_TYPES_TABLE) + .forEach(sourceTableSchemaBuilder::addSourceColumnNameToSourceColumnType); + + CassandraSourceRowMapper cassandraSourceRowMapper = + CassandraSourceRowMapper.builder() + .setSourceSchemaReference(sourceSchemaReference) + .setSourceTableSchema(sourceTableSchemaBuilder.build()) + .build(); + + ResultSet resultSet; + String query = "SELECT * FROM " + PRIMITIVE_TYPES_TABLE; + com.datastax.oss.driver.api.core.cql.SimpleStatement statement = + SimpleStatement.newInstance(query); + Cluster cluster = + Cluster.builder() + .addContactPointsWithPorts(dataSource.cassandra().contactPoints()) + .withClusterName(dataSource.cassandra().clusterName()) + .withoutJMXReporting() + .withLoadBalancingPolicy( + new DCAwareRoundRobinPolicy.Builder() + .withLocalDc(dataSource.cassandra().localDataCenter()) + .build()) + .build(); + try (CassandraConnector cassandraConnectorWithSchemaReference = + new CassandraConnector(dataSource.cassandra(), sourceSchemaReference.cassandra())) { + resultSet = cluster.connect(TEST_KEYSPACE).execute(query); + ImmutableList.Builder readRowsBuilder = ImmutableList.builder(); + cassandraSourceRowMapper.map(resultSet).forEachRemaining(row -> readRowsBuilder.add(row)); + ImmutableList readRows = readRowsBuilder.build(); + + readRows.forEach(r -> assertThat(r.tableName() == PRIMITIVE_TYPES_TABLE)); + readRows.forEach(r -> assertThat(r.sourceSchemaReference() == sourceSchemaReference)); + assertThat( + readRows.stream() + .map(r -> r.getPayload().toString()) + .sorted() + .collect(ImmutableList.toImmutableList())) + .isEqualTo( + PRIMITIVE_TYPES_TABLE_AVRO_ROWS.stream() + .sorted() + .collect(ImmutableList.toImmutableList())); + + // Since we will use CassandraIO only for reads, we don't need to support the `deleteAsync` + // and `saveAsync` functions of the CassandraIO mapper interface. 
+ assertThrows( + UnsupportedOperationException.class, + () -> cassandraSourceRowMapper.deleteAsync(readRows.get(1))); + assertThrows( + UnsupportedOperationException.class, + () -> cassandraSourceRowMapper.saveAsync(readRows.get(1))); + } + } + + @Test + public void testCassandraSourceRowForUnsupportedType() { + ResultSet mockResultSet = Mockito.mock(ResultSet.class); + Row mockRow = Mockito.mock(Row.class); + final String testIntCol = "testIntCol"; + when(mockRow.getInt(testIntCol)).thenReturn(42); + when(mockResultSet.iterator()).thenReturn(ImmutableList.of(mockRow).stream().iterator()); + + SourceSchemaReference sourceSchemaReference = + SourceSchemaReference.ofCassandra( + CassandraSchemaReference.builder().setKeyspaceName(TEST_KEYSPACE).build()); + + SourceTableSchema sourceTableSchema = + SourceTableSchema.builder(MapperType.CASSANDRA) + .setTableName("testTable") + .addSourceColumnNameToSourceColumnType( + testIntCol, new SourceColumnType("int", null, null)) + .addSourceColumnNameToSourceColumnType( + "UnSupportedCol1", new SourceColumnType("UnseenColumnType", null, null)) + .addSourceColumnNameToSourceColumnType( + "UnSupportedCol2", new SourceColumnType("UNSUPPORTED", null, null)) + .build(); + + CassandraSourceRowMapper cassandraSourceRowMapper = + CassandraSourceRowMapper.builder() + .setSourceSchemaReference(sourceSchemaReference) + .setSourceTableSchema(sourceTableSchema) + .build(); + + ImmutableList.Builder readRowsBuilder = ImmutableList.builder(); + cassandraSourceRowMapper.map(mockResultSet).forEachRemaining(row -> readRowsBuilder.add(row)); + ImmutableList readRows = readRowsBuilder.build(); + + assertThat( + readRows.stream() + .map(r -> r.getPayload().toString()) + .sorted() + .collect(ImmutableList.toImmutableList())) + .isEqualTo( + ImmutableList.of( + "{\"testIntCol\": 42, \"UnSupportedCol1\": null, \"UnSupportedCol2\": null}")); + } +} diff --git 
a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java index be7c1be510..bee3ab67cb 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/schema/CassandraSchemaDiscoveryTest.java @@ -15,11 +15,11 @@ */ package com.google.cloud.teleport.v2.source.reader.io.cassandra.schema; +import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.BASIC_TEST_TABLE_SCHEMA; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CONFIG; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_CQLSH; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_KEYSPACE; import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_TABLES; -import static com.google.cloud.teleport.v2.source.reader.io.cassandra.testutils.BasicTestSchema.TEST_TABLE_SCHEMA; import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; @@ -102,8 +102,10 @@ public void testDiscoverTableSchemaBasic() throws IOException, RetriableSchemaDi CassandraSchemaDiscovery cassandraSchemaDiscovery = new CassandraSchemaDiscovery(); ImmutableMap> schema = cassandraSchemaDiscovery.discoverTableSchema( - cassandraDataSource, cassandraSchemaReference, TEST_TABLES); - assertThat(schema).isEqualTo(TEST_TABLE_SCHEMA); + cassandraDataSource, + cassandraSchemaReference, + BASIC_TEST_TABLE_SCHEMA.keySet().asList()); + assertThat(schema).isEqualTo(BASIC_TEST_TABLE_SCHEMA); } @Test diff 
--git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java index b8de00a3b6..8bbb572b49 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/cassandra/testutils/BasicTestSchema.java @@ -29,15 +29,26 @@ public class BasicTestSchema { public static final String TEST_KEYSPACE = "test_keyspace"; public static final String TEST_CONFIG = TEST_RESOURCE_ROOT + "basicConfig.yaml"; public static final String TEST_CQLSH = TEST_RESOURCE_ROOT + "basicTest.cql"; + public static final String BASIC_TEST_TABLE = "basic_test_table"; + public static final String PRIMITIVE_TYPES_TABLE = "primitive_types_table"; public static final ImmutableMap> - TEST_TABLE_SCHEMA = + BASIC_TEST_TABLE_SCHEMA = ImmutableMap.of( - "basic_test_table", + BASIC_TEST_TABLE, ImmutableMap.of( "id", new SourceColumnType("TEXT", new Long[] {}, new Long[] {}), "name", new SourceColumnType("TEXT", new Long[] {}, new Long[] {}))); public static final ImmutableList TEST_TABLES = - ImmutableList.copyOf(TEST_TABLE_SCHEMA.keySet()); + ImmutableList.of(BASIC_TEST_TABLE, PRIMITIVE_TYPES_TABLE); + + public static final ImmutableList PRIMITIVE_TYPES_TABLE_AVRO_ROWS = + ImmutableList.of( + "{\"primary_key\": \"dfcad8f3-3cdc-49c7-bce9-575f307c0637\", \"ascii_col\": \"ascii1\", \"bigint_col\": 1234567890, \"blob_col\": \"cafebabe\", \"boolean_col\": true, \"date_col\": 19694, \"decimal_col\": \"123.456\", \"double_col\": 123.456789, \"duration_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 45296000000000}, \"float_col\": 123.45, \"inet_col\": \"/127.0.0.1\", \"int_col\": 12345, \"smallint_col\": 123, 
\"text_col\": \"text1\", \"time_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 45296789000000}, \"timestamp_col\": 1733296987117000000, \"timeuuid_col\": \"9b9419da-b210-11ef-890e-9d9a41af9e54\", \"tinyint_col\": 123, \"uuid_col\": \"c3de3455-6b4e-4a81-a6d7-ab61610f08c6\", \"varchar_col\": \"varchar1\", \"varint_col\": \"1234567890123456789\"}", + "{\"primary_key\": \"fe3263a0-1577-4851-95f8-3af47628baa4\", \"ascii_col\": \"ascii2\", \"bigint_col\": 9876543210, \"blob_col\": \"deadbeef\", \"boolean_col\": false, \"date_col\": 19298, \"decimal_col\": \"987.654\", \"double_col\": 987.654321, \"duration_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": -45296000000000}, \"float_col\": 987.65, \"inet_col\": \"/0:0:0:0:0:0:0:1\", \"int_col\": 98765, \"smallint_col\": 987, \"text_col\": \"text2\", \"time_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 86399999000000}, \"timestamp_col\": 1733296987122000000, \"timeuuid_col\": \"9b94dd2a-b210-11ef-890e-9d9a41af9e54\", \"tinyint_col\": -123, \"uuid_col\": \"6324e301-94fb-44fe-95ac-91d2f7236e2e\", \"varchar_col\": \"varchar2\", \"varint_col\": \"-9876543210987654321\"}", + "{\"primary_key\": \"9a0acb7d-674c-4ee1-9644-9da24b7a72f4\", \"ascii_col\": \"ascii3\", \"bigint_col\": 1010101010, \"blob_col\": \"facefeed\", \"boolean_col\": true, \"date_col\": 19723, \"decimal_col\": \"10.101\", \"double_col\": 10.10101, \"duration_col\": {\"years\": 0, \"months\": 14, \"days\": 3, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 14706000000000}, \"float_col\": 10.1, \"inet_col\": \"/192.168.1.1\", \"int_col\": 10101, \"smallint_col\": 101, \"text_col\": \"text3\", \"time_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 0}, \"timestamp_col\": 1733296987127000000, \"timeuuid_col\": 
\"9b95a07a-b210-11ef-890e-9d9a41af9e54\", \"tinyint_col\": 101, \"uuid_col\": \"f0e1d922-06b5-4f07-a7a6-ec0c9f23e172\", \"varchar_col\": \"varchar3\", \"varint_col\": \"10101010101010101010\"}", + "{\"primary_key\": \"e6bc8562-2575-420f-9344-9fedc4945f61\", \"ascii_col\": null, \"bigint_col\": 0, \"blob_col\": null, \"boolean_col\": false, \"date_col\": null, \"decimal_col\": null, \"double_col\": 0.0, \"duration_col\": null, \"float_col\": 0.0, \"inet_col\": null, \"int_col\": 0, \"smallint_col\": 0, \"text_col\": null, \"time_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 0}, \"timestamp_col\": null, \"timeuuid_col\": null, \"tinyint_col\": 0, \"uuid_col\": null, \"varchar_col\": null, \"varint_col\": null}", + "{\"primary_key\": \"a389de30-f01f-4395-a0c6-c407bfbe81d0\", \"ascii_col\": \"zzzzzzzzzz\", \"bigint_col\": 9223372036854775807, \"blob_col\": \"ffffffff\", \"boolean_col\": true, \"date_col\": 2932896, \"decimal_col\": \"10000000000000000000000000000000000000\", \"double_col\": 1.7976931348623157E308, \"duration_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 320949000000000}, \"float_col\": 3.4028235E38, \"inet_col\": \"/255.255.255.255\", \"int_col\": 2147483647, \"smallint_col\": 32767, \"text_col\": \"abcdef\", \"time_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 86399999000000}, \"timestamp_col\": -1000, \"timeuuid_col\": null, \"tinyint_col\": 127, \"uuid_col\": \"00e4afef-52f8-4e1f-9afa-0632c8ccf790\", \"varchar_col\": \"abcdef\", \"varint_col\": \"9223372036854775807\"}", + "{\"primary_key\": \"29e38561-6376-4b45-b1a0-1709e11cfc8c\", \"ascii_col\": \"\", \"bigint_col\": -9223372036854775808, \"blob_col\": \"00\", \"boolean_col\": false, \"date_col\": -354285, \"decimal_col\": \"-10000000000000000000000000000000000000\", \"double_col\": -1.7976931348623157E308, 
\"duration_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 320949000000000}, \"float_col\": -3.4028235E38, \"inet_col\": \"/0.0.0.0\", \"int_col\": -2147483648, \"smallint_col\": -32768, \"text_col\": \"\", \"time_col\": {\"years\": 0, \"months\": 0, \"days\": 0, \"hours\": 0, \"minutes\": 0, \"seconds\": 0, \"nanos\": 0}, \"timestamp_col\": 0, \"timeuuid_col\": null, \"tinyint_col\": -128, \"uuid_col\": \"fff6d876-560f-48bc-8088-90c69e5a0c40\", \"varchar_col\": \"\", \"varint_col\": \"-9223372036854775808\"}"); private BasicTestSchema() {} ; diff --git a/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicTest.cql b/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicTest.cql index 20f381fc5f..35882d73c0 100644 --- a/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicTest.cql +++ b/v2/sourcedb-to-spanner/src/test/resources/CassandraUT/basicTest.cql @@ -10,4 +10,159 @@ CREATE TABLE basic_test_table( PRIMARY KEY(id)); INSERT INTO basic_test_table(id, name) values('1234','Albert'); -INSERT INTO basic_test_table(id, name) values('5678','Einstein'); \ No newline at end of file +INSERT INTO basic_test_table(id, name) values('5678','Einstein'); + +// Primitive types +CREATE TABLE primitive_types_table ( + primary_key UUID PRIMARY KEY, + ascii_col ASCII, + bigint_col BIGINT, + blob_col BLOB, + boolean_col BOOLEAN, + date_col DATE, + decimal_col DECIMAL, + double_col DOUBLE, + duration_col DURATION, + float_col FLOAT, + inet_col INET, + int_col INT, + smallint_col SMALLINT, + text_col TEXT, + time_col TIME, + timestamp_col TIMESTAMP, + timeuuid_col TIMEUUID, + tinyint_col TINYINT, + uuid_col UUID, + varchar_col VARCHAR, + varint_col VARINT); + +-- Inserting 3 Randomly generated rows. 
+INSERT INTO primitive_types_table (primary_key, ascii_col, bigint_col, blob_col, boolean_col, date_col, decimal_col, double_col, duration_col, float_col, inet_col, int_col, smallint_col, text_col, time_col, timestamp_col, timeuuid_col, tinyint_col, uuid_col, varchar_col, varint_col) +VALUES ( + dfcad8f3-3cdc-49c7-bce9-575f307c0637, + 'ascii1', + 1234567890, + 0xCAFEBABE, + true, + '2023-12-03', + 123.456, + 123.456789, + 12h34m56s, + 123.45, + '127.0.0.1', + 12345, + 123, + 'text1', + '12:34:56.789', + 1733296987117000, + 9b9419da-b210-11ef-890e-9d9a41af9e54, + 123, + c3de3455-6b4e-4a81-a6d7-ab61610f08c6, + 'varchar1', + 1234567890123456789 + ); + + +INSERT INTO primitive_types_table (primary_key, ascii_col, bigint_col, blob_col, boolean_col, date_col, decimal_col, double_col, duration_col, float_col, inet_col, int_col, smallint_col, text_col, time_col, timestamp_col, timeuuid_col, tinyint_col, uuid_col, varchar_col, varint_col) +VALUES ( + fe3263a0-1577-4851-95f8-3af47628baa4, + 'ascii2', + 9876543210, + 0xDEADBEEF, + false, + '2022-11-02', + 987.654, + 987.654321, + -12h34m56s, + 987.65, + '::1', + 98765, + 987, + 'text2', + '23:59:59.999', + 1733296987122000, + 9b94dd2a-b210-11ef-890e-9d9a41af9e54, + -123, + 6324e301-94fb-44fe-95ac-91d2f7236e2e, + 'varchar2', + -9876543210987654321 + ); +INSERT INTO primitive_types_table (primary_key, ascii_col, bigint_col, blob_col, boolean_col, date_col, decimal_col, double_col, duration_col, float_col, inet_col, int_col, smallint_col, text_col, time_col, timestamp_col, timeuuid_col, tinyint_col, uuid_col, varchar_col, varint_col) +VALUES ( + 9a0acb7d-674c-4ee1-9644-9da24b7a72f4, + 'ascii3', + 1010101010, + 0xFACEFEED, + true, + '2024-01-01', + 10.101, + 10.101010, + 1y2mo3d4h5m6s, + 10.10, + '192.168.1.1', + 10101, + 101, + 'text3', + '00:00:00.000', + 1733296987127000, + 9b95a07a-b210-11ef-890e-9d9a41af9e54, + 101, + f0e1d922-06b5-4f07-a7a6-ec0c9f23e172, + 'varchar3', + 10101010101010101010 + ); + +-- Inserting data with 
all columns null (except primary key) +INSERT INTO primitive_types_table (primary_key) VALUES (e6bc8562-2575-420f-9344-9fedc4945f61); + +-- Inserting data with minimum values for each column (where applicable) +INSERT INTO primitive_types_table (primary_key, ascii_col, bigint_col, blob_col, boolean_col, date_col, decimal_col, double_col, duration_col, float_col, inet_col, int_col, smallint_col, text_col, time_col, timestamp_col, timeuuid_col, tinyint_col, uuid_col, varchar_col, varint_col) +VALUES ( + 29e38561-6376-4b45-b1a0-1709e11cfc8c, + '', -- Minimum ASCII (empty string) + -9223372036854775808, -- Minimum BIGINT + 0x00, -- Minimum BLOB (empty) + false, -- Minimum BOOLEAN + '1000-01-01', -- Minimum DATE + -10000000000000000000000000000000000000, + -1.7976931348623157E+308, -- Minimum DOUBLE + P0000-00-00T89:09:09, -- TODO Min + -3.4028234663852886E+38, -- Minimum FLOAT + '0.0.0.0', -- Minimum INET + -2147483648, -- Minimum INT + -32768, -- Minimum SMALLINT + '', -- Minimum TEXT (empty string) + '00:00:00.000', -- Minimum TIME + 0, -- Minimum TIMESTAMP (epoch) + null, -- TODO time uuid + -128, -- Minimum TINYINT + fff6d876-560f-48bc-8088-90c69e5a0c40, + '', -- Minimum VARCHAR (empty string) + -9223372036854775808 -- Minimum VARINT + ); + +-- Inserting data with maximum values for each column (where applicable) +INSERT INTO primitive_types_table (primary_key, ascii_col, bigint_col, blob_col, boolean_col, date_col, decimal_col, double_col, duration_col, float_col, inet_col, int_col, smallint_col, text_col, time_col, timestamp_col, timeuuid_col, tinyint_col, uuid_col, varchar_col, varint_col) +VALUES ( + a389de30-f01f-4395-a0c6-c407bfbe81d0, + 'zzzzzzzzzz', -- + 9223372036854775807, -- Maximum BIGINT + 0xFFFFFFFF, -- + true, -- Maximum BOOLEAN + '9999-12-31', -- Maximum DATE + 10000000000000000000000000000000000000, -- + 1.7976931348623157E+308, -- Maximum DOUBLE + P0000-00-00T89:09:09, -- TODO Max + 3.4028234663852886E+38, -- Maximum FLOAT + '255.255.255.255',-- 
Maximum INET + 2147483647, -- Maximum INT + 32767, -- Maximum SMALLINT + 'abcdef', -- + '23:59:59.999', -- Maximum TIME + 9223372036854775807, + null, -- TODO time uuid + 127, -- Maximum TINYINT + 00e4afef-52f8-4e1f-9afa-0632c8ccf790, + 'abcdef', + 9223372036854775807 -- Maximum VARINT + ); diff --git a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java index 4c7ea2cdc6..3cfd9c8fcc 100644 --- a/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java +++ b/v2/spanner-common/src/main/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertor.java @@ -28,6 +28,7 @@ import java.time.Instant; import java.time.LocalDate; import java.time.LocalTime; +import java.time.Period; import java.time.ZoneId; import java.time.ZoneOffset; import java.time.ZonedDateTime; @@ -47,6 +48,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.beam.sdk.metrics.Distribution; import org.apache.beam.sdk.metrics.Metrics; +import org.apache.commons.lang3.StringUtils; import org.apache.kerby.util.Hex; import org.joda.time.Duration; import org.slf4j.Logger; @@ -529,6 +531,35 @@ static String handleRecordFieldType(String fieldName, GenericRecord element, Sch // Handle hours separately since that can also be negative. We convert micros to localTime // format (HH:MM:SS), then strip of HH:, which will always be "00:". return String.format("%s:%s", hours, localTime.substring(3)); + } else if (fieldSchema.getName().equals("intervalNano")) { + Period period = + Period.ZERO + .plusYears(getOrDefault(element, "years", 0L)) + .plusMonths(getOrDefault(element, "months", 0L)) + .plusDays(getOrDefault(element, "days", 0L)); + /* + * Convert the period to a ISO-8601 period formatted String, such as P6Y3M1D. 
+ * A zero period will be represented as zero days, 'P0D'. + * Refer to javadoc for Period#toString. + */ + String periodIso8061 = period.toString(); + java.time.Duration duration = + java.time.Duration.ZERO + .plusHours(getOrDefault(element, "hours", 0L)) + .plusMinutes(getOrDefault(element, "minutes", 0L)) + .plusSeconds(getOrDefault(element, "seconds", 0L)) + .plusNanos(getOrDefault(element, "nanos", 0L)); + /* + * Convert the duration to a ISO-8601 period formatted String, such as PT8H6M12.345S + * refer to javadoc for Duration#toString. + */ + String durationIso8610 = duration.toString(); + // Convert to ISO-8601 period format. + if (duration.isZero()) { + return periodIso8061; + } else { + return periodIso8061 + StringUtils.removeStartIgnoreCase(durationIso8610, "P"); + } } else { throw new UnsupportedOperationException( String.format( @@ -536,4 +567,11 @@ static String handleRecordFieldType(String fieldName, GenericRecord element, Sch fieldSchema.getName(), element, fieldName)); } } + + private static T getOrDefault(GenericRecord element, String name, T def) { + if (element.get(name) == null) { + return def; + } + return (T) element.get(name); + } } diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroTestingHelper.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroTestingHelper.java index 08a2c6babd..bed7558b00 100644 --- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroTestingHelper.java +++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/AvroTestingHelper.java @@ -61,6 +61,33 @@ public class AvroTestingHelper { .noDefault() .endRecord(); + public static final Schema INTERVAL_NANOS_SCHEMA = + SchemaBuilder.builder() + .record("intervalNano") + .fields() + .name("years") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("months") + 
.type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("days") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("hours") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("minutes") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("seconds") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .name("nanos") + .type(SchemaBuilder.builder().longType()) + .withDefault(0L) + .endRecord(); + public static final Schema UNSUPPORTED_SCHEMA = SchemaBuilder.record("unsupportedName") .fields() @@ -91,4 +118,17 @@ public static GenericRecord createIntervalRecord(Integer months, Integer hours, genericRecord.put("micros", micros); return genericRecord; } + + public static GenericRecord createIntervalNanosRecord( + Long years, Long months, Long days, Long hours, Long minutes, Long seconds, Long nanos) { + GenericRecord genericRecord = new GenericData.Record(INTERVAL_NANOS_SCHEMA); + genericRecord.put("years", years); + genericRecord.put("months", months); + genericRecord.put("days", days); + genericRecord.put("hours", hours); + genericRecord.put("minutes", minutes); + genericRecord.put("seconds", seconds); + genericRecord.put("nanos", nanos); + return genericRecord; + } } diff --git a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java index 68b94f69d3..e5c9ff4a94 100644 --- a/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java +++ b/v2/spanner-common/src/test/java/com/google/cloud/teleport/v2/spanner/migrations/avro/GenericRecordTypeConvertorTest.java @@ -337,6 +337,101 @@ public void testHandleRecordFieldType() { AvroTestingHelper.UNSUPPORTED_SCHEMA)); } + /* + * Test conversion of Interval Nano to String for various cases. 
+ */ + @Test + public void testIntervalNanos() { + String result; + + /* Basic Test. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord(1000L, 1000L, 3890L, 25L, 331L, 12L, 9L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals( + "Test #1 interval nano conversion:", "P1000Y1000M3890DT30H31M12.000000009S", result); + + /* Test with any field set as null gets treated as 0. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord(1000L, 1000L, 3890L, 25L, null, 12L, 9L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals( + "Test #2 interval nano conversion with null minutes:", + "P1000Y1000M3890DT25H12.000000009S", + result); + + /* Basic test for negative field. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord(1000L, -1000L, 3890L, 25L, 31L, 12L, 9L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals( + "Test #3 interval nano conversion with negative months:", + "P1000Y-1000M3890DT25H31M12.000000009S", + result); + + /* Test that negative nanos subtract from the fractional seconds, for example 12 Seconds -1 Nanos becomes 11.999999991s. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord(1000L, 31L, 3890L, 25L, 31L, 12L, -9L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals( + "Test #4 interval nano conversion with negative nanos:", + "P1000Y31M3890DT25H31M11.999999991S", + result); + + /* Test 0 interval. 
*/ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord(0L, 0L, 0L, 0L, 0L, 0L, 0L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals("Test #5 interval nano conversion with all zeros", "P0D", result); + + /* Test almost zero interval with only nanos set. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord(0L, 0L, 0L, 0L, 0L, 0L, 1L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals("Test #6 interval nano conversion with only nanos", "P0DT0.000000001S", result); + /* Test with large values. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord( + 2147483647L, 11L, 2147483647L, 2147483647L, 2147483647L, 2147483647L, 999999999L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals( + "Test #6 interval nano conversion with INT.MAX values", + "P2147483647Y11M2147483647DT2183871564H21M7.999999999S", + result); + + /* Test with large negative values. */ + result = + GenericRecordTypeConvertor.handleRecordFieldType( + "interval_nanos_column", + AvroTestingHelper.createIntervalNanosRecord( + -2147483647L, + -11L, + -2147483647L, + -2147483647L, + -2147483647L, + -2147483647L, + -999999999L), + AvroTestingHelper.INTERVAL_NANOS_SCHEMA); + assertEquals( + "Test #6 interval nano conversion with -INT.MAX values", + "P-2147483647Y-11M-2147483647DT-2183871564H-21M-7.999999999S", + result); + } + @Test public void testHandleRecordFieldType_nullInput() { assertNull(