Merge remote-tracking branch 'upstream/main'
taherkl committed Nov 27, 2024
2 parents 1bef5ff + b68fe21 commit 15b621d
Showing 142 changed files with 6,560 additions and 739 deletions.
4 changes: 2 additions & 2 deletions .github/codecov.yml
@@ -9,7 +9,7 @@ coverage:
patch:
default:
target: 80%
informational: false
informational: true

component_management:
individual_components:
@@ -32,7 +32,7 @@ component_management:
informational: true
- type: patch
target: 80%
informational: false
informational: true
- component_id: spanner-import-export
name: spanner-import-export
paths:
7 changes: 5 additions & 2 deletions pom.xml
@@ -32,6 +32,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<java.version>11</java.version>
<!-- <os.detected.classifier>osx-x86_64</os.detected.classifier>-->
<!-- Plugins -->
<templates-maven-plugin.version>1.0-SNAPSHOT</templates-maven-plugin.version>
<maven-checkstyle-plugin.version>3.2.1</maven-checkstyle-plugin.version>
@@ -47,8 +48,8 @@
<jacoco.version>0.8.8</jacoco.version>

<!-- Beam and linked versions -->
<beam.version>2.60.0</beam.version>
<beam-python.version>2.60.0</beam-python.version>
<beam.version>2.61.0</beam.version>
<beam-python.version>2.61.0</beam-python.version>
<beam-maven-repo></beam-maven-repo>

<!-- Common dependency versions -->
@@ -367,6 +368,8 @@
<exclude>**/CustomTransformationImplFetcher.*</exclude>
<exclude>**/JarFileReader.*</exclude>
<exclude>**/CustomTransformationWithShardFor*IT.*</exclude>
<exclude>**/models/*</exclude>
<exclude>**/exceptions/*</exclude>
</excludes>
</configuration>
<executions>
@@ -285,10 +285,6 @@ public Table toTable(String tableName, Schema schema) {
if (Boolean.parseBoolean(stored)) {
column.stored();
}
String hidden = f.getProp(HIDDEN);
if (Boolean.parseBoolean(hidden)) {
column.isHidden(true);
}
} else {
boolean nullable = false;
Schema avroType = f.schema();
@@ -306,6 +302,10 @@ public Table toTable(String tableName, Schema schema) {
String defaultExpression = f.getProp(DEFAULT_EXPRESSION);
column.parseType(sqlType).notNull(!nullable).defaultExpression(defaultExpression);
}
String hidden = f.getProp(HIDDEN);
if (Boolean.parseBoolean(hidden)) {
column.isHidden(true);
}
String placementKey = f.getProp(SPANNER_PLACEMENT_KEY);
if (placementKey != null) {
column.isPlacementKey(Boolean.parseBoolean(placementKey));
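In the hunk above, the HIDDEN handling moves out of the generated-column branch, so the hidden property is now honored for ordinary columns as well, not only for generated ones. A minimal sketch of why reading the property unconditionally is safe (the field values here are illustrative, not taken from the change):

// Sketch only: f.getProp(HIDDEN) returns null for fields that never set the property,
// and Boolean.parseBoolean is null-tolerant, so only fields carrying hidden="true"
// end up calling column.isHidden(true).
public class HiddenPropSketch {
  public static void main(String[] args) {
    System.out.println(Boolean.parseBoolean(null));    // false -> column stays visible
    System.out.println(Boolean.parseBoolean("true"));  // true  -> column marked HIDDEN
    System.out.println(Boolean.parseBoolean("FALSE")); // false
  }
}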
@@ -152,6 +152,7 @@ public Collection<Schema> convert(Ddl ddl) {
for (Column cm : table.columns()) {
SchemaBuilder.FieldBuilder<Schema> fieldBuilder = fieldsAssembler.name(cm.name());
fieldBuilder.prop(SQL_TYPE, cm.typeString());
fieldBuilder.prop(HIDDEN, Boolean.toString(cm.isHidden()));
for (int i = 0; i < cm.columnOptions().size(); i++) {
fieldBuilder.prop(SPANNER_OPTION + i, cm.columnOptions().get(i));
}
@@ -162,7 +163,6 @@ public Collection<Schema> convert(Ddl ddl) {
fieldBuilder.prop(NOT_NULL, Boolean.toString(cm.notNull()));
fieldBuilder.prop(GENERATION_EXPRESSION, cm.generationExpression());
fieldBuilder.prop(STORED, Boolean.toString(cm.isStored()));
fieldBuilder.prop(HIDDEN, Boolean.toString(cm.isHidden()));
// Make the type null to allow us not export the generated column values,
// which are semantically logical entities.
fieldBuilder.type(SchemaBuilder.builder().nullType()).withDefault(null);
@@ -15,6 +15,7 @@
*/
package com.google.cloud.teleport.spanner.ddl;

import static com.google.cloud.spanner.Dialect.GOOGLE_STANDARD_SQL;
import static com.google.cloud.teleport.spanner.common.NameUtils.quoteIdentifier;
import static com.google.common.base.Strings.isNullOrEmpty;

@@ -25,11 +26,12 @@
import java.io.IOException;
import java.io.Serializable;
import java.util.stream.Collectors;
import javax.annotation.Nullable;

/** Cloud Spanner foreign key definition. */
@AutoValue
public abstract class ForeignKey implements Serializable {
private static final long serialVersionUID = 519932875L;
private static final long serialVersionUID = 779301367L;

/** Referential actions supported in Foreign Keys. */
public enum ReferentialAction {
@@ -92,6 +94,9 @@ public static ReferentialAction getReferentialAction(String changeType, String a

abstract Optional<ReferentialAction> referentialAction();

@Nullable
abstract Boolean isEnforced();

public static Builder builder(Dialect dialect) {
return new AutoValue_ForeignKey.Builder().dialect(dialect);
}
@@ -133,6 +138,10 @@ private void prettyPrint(Appendable appendable) throws IOException {
"Foreign Key action not supported: " + action.get().getSqlString());
}
}
if (dialect() == GOOGLE_STANDARD_SQL && isEnforced() != null && !isEnforced()) {
// TODO: Add Postgresql support for NOT ENFORCED foreign keys
appendable.append(" NOT ENFORCED");
}
}

public String prettyPrint() {
@@ -168,6 +177,8 @@ public abstract static class Builder {

public abstract Builder referentialAction(Optional<ReferentialAction> action);

public abstract Builder isEnforced(Boolean enforced);

public abstract ForeignKey build();
}
}
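Taken together, the ForeignKey changes above add a nullable enforcement flag: prettyPrint() appends " NOT ENFORCED" only when the flag was explicitly set to false on a GoogleSQL key, and prints nothing extra when it is unset or true. A rough usage sketch, assuming the AutoValue builder also exposes name/table/referencedTable setters (those setters are not shown in this diff and are assumptions; builder(Dialect), isEnforced, columnsBuilder and referencedColumnsBuilder do appear above):

// Sketch under the assumptions stated above -- not code from the change.
ForeignKey.Builder fk =
    ForeignKey.builder(Dialect.GOOGLE_STANDARD_SQL)
        .name("fk_not_enforced")   // assumed setter
        .table("Child")            // assumed setter
        .referencedTable("Ref")    // assumed setter
        .isEnforced(false);        // new in this change
fk.columnsBuilder().add("id2");
fk.referencedColumnsBuilder().add("id2");
// Expected to render roughly as:
//   ALTER TABLE `Child` ADD CONSTRAINT `fk_not_enforced` FOREIGN KEY (`id2`)
//     REFERENCES `Ref` (`id2`) NOT ENFORCED
System.out.println(fk.build().prettyPrint());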
@@ -697,8 +697,13 @@ private void listForeignKeys(Map<String, NavigableMap<String, ForeignKey.Builder
+ " kcu2.table_schema,"
+ " kcu2.table_name,"
+ " kcu2.column_name,"
+ " rc.delete_rule"
+ " rc.delete_rule,"
+ " tc.enforced"
+ " FROM information_schema.referential_constraints as rc"
+ " INNER JOIN information_schema.table_constraints as tc"
+ " ON tc.constraint_catalog = rc.constraint_catalog"
+ " AND tc.constraint_schema = rc.constraint_schema"
+ " AND tc.constraint_name = rc.constraint_name"
+ " INNER JOIN information_schema.key_column_usage as kcu1"
+ " ON kcu1.constraint_catalog = rc.constraint_catalog"
+ " AND kcu1.constraint_schema = rc.constraint_schema"
@@ -724,8 +729,13 @@ private void listForeignKeys(Map<String, NavigableMap<String, ForeignKey.Builder
+ " kcu2.table_schema,"
+ " kcu2.table_name,"
+ " kcu2.column_name,"
+ " rc.delete_rule"
+ " rc.delete_rule,"
+ " tc.enforced"
+ " FROM information_schema.referential_constraints as rc"
+ " INNER JOIN information_schema.table_constraints as tc"
+ " ON tc.constraint_catalog = rc.constraint_catalog"
+ " AND tc.constraint_schema = rc.constraint_schema"
+ " AND tc.constraint_name = rc.constraint_name"
+ " INNER JOIN information_schema.key_column_usage as kcu1"
+ " ON kcu1.constraint_catalog = rc.constraint_catalog"
+ " AND kcu1.constraint_schema = rc.constraint_schema"
@@ -753,6 +763,7 @@ private void listForeignKeys(Map<String, NavigableMap<String, ForeignKey.Builder
String referencedTable = getQualifiedName(resultSet.getString(4), resultSet.getString(5));
String referencedColumn = resultSet.getString(6);
String deleteRule = resultSet.getString(7);
String enforced = dialect == Dialect.GOOGLE_STANDARD_SQL ? resultSet.getString(8) : null;
Map<String, ForeignKey.Builder> tableForeignKeys =
foreignKeys.computeIfAbsent(table, k -> Maps.newTreeMap());
ForeignKey.Builder foreignKey =
@@ -767,6 +778,18 @@ private void listForeignKeys(Map<String, NavigableMap<String, ForeignKey.Builder
foreignKey.referentialAction(
Optional.of(ReferentialAction.getReferentialAction("DELETE", deleteRule)));
}
if (!isNullOrEmpty(enforced)) {
switch (enforced.trim().toUpperCase()) {
case "YES":
foreignKey.isEnforced(true);
break;
case "NO":
foreignKey.isEnforced(false);
break;
default:
throw new IllegalArgumentException("Illegal enforcement: " + enforced);
}
}
foreignKey.columnsBuilder().add(column);
foreignKey.referencedColumnsBuilder().add(referencedColumn);
}
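The scanner changes above join information_schema.table_constraints to read the new enforced column and map its "YES"/"NO" text onto the builder flag; the value is only consumed for the GoogleSQL dialect (resultSet.getString(8)), while the PostgreSQL path leaves it null. A self-contained restatement of that mapping (the helper name is hypothetical; the scanner inlines this logic):

// Hypothetical helper equivalent to the inlined switch above.
static Boolean parseEnforced(String enforced) {
  if (enforced == null || enforced.isEmpty()) {
    return null; // value not read (e.g. PostgreSQL) -> leave the flag unset
  }
  switch (enforced.trim().toUpperCase()) {
    case "YES":
      return Boolean.TRUE;  // foreignKey.isEnforced(true)
    case "NO":
      return Boolean.FALSE; // foreignKey.isEnforced(false)
    default:
      throw new IllegalArgumentException("Illegal enforcement: " + enforced);
  }
}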
@@ -25,7 +25,6 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

@@ -60,6 +59,7 @@
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.concurrent.TimeUnit;
import org.apache.beam.runners.core.metrics.GcpResourceIdentifiers;
@@ -70,6 +70,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamMetrics;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
@@ -1551,9 +1552,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
getSpannerConfig().getProjectId().get(),
partitionMetadataInstanceId,
partitionMetadataDatabaseId);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
table ->
PartitionMetadataTableNames.fromExistingTable(
partitionMetadataDatabaseId, table))
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));

SpannerConfig changeStreamSpannerConfig = getSpannerConfig();
// Set default retryable errors for ReadChangeStream
@@ -1609,7 +1614,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
Expand All @@ -1625,12 +1630,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
LOG.info(
"Partition metadata table that will be used is " + partitionMetadataTableNames.getTableName());
input
.getPipeline()
.getOptions()
.as(SpannerChangeStreamOptions.class)
.setMetadataTable(partitionMetadataTableName);
.setMetadataTable(partitionMetadataTableNames.getTableName());

PCollection<byte[]> impulseOut = input.apply(Impulse.create());
PCollection<DataChangeRecord> results =
Expand Down
@@ -171,6 +171,11 @@ public void simple() {
+ " \"name\" : \"timestamp\","
+ " \"type\" : [ \"null\", {\"type\":\"long\",\"logicalType\":\"timestamp-micros\"}]"
+ " }, {"
+ " \"name\" : \"HiddenColumn\","
+ " \"type\" : [ \"null\", \"long\" ],"
+ " \"sqlType\":\"INT64\","
+ " \"hidden\" : \"true\""
+ " }, {"
+ " \"name\" : \"MyTokens\","
+ " \"type\" : \"null\","
+ " \"default\" : null,"
@@ -201,6 +206,12 @@ public void simple() {
+ " \"spannerForeignKey_1\" : "
+ " \"ALTER TABLE `Users` ADD CONSTRAINT `fk_odc` FOREIGN KEY (`last_name`) "
+ " REFERENCES `AllowedNames` (`last_name`) ON DELETE CASCADE\","
+ " \"spannerForeignKey_2\" : "
+ " \"ALTER TABLE `Users` ADD CONSTRAINT `fk_not_enforced_no_action` FOREIGN KEY (`last_name`) "
+ " REFERENCES `AllowedNames` (`last_name`) ON DELETE NO ACTION NOT ENFORCED\","
+ " \"spannerForeignKey_3\" : "
+ " \"ALTER TABLE `Users` ADD CONSTRAINT `fk_enforced` FOREIGN KEY (`last_name`) "
+ " REFERENCES `AllowedNames` (`last_name`) ENFORCED\","
+ " \"spannerCheckConstraint_0\" : "
+ " \"CONSTRAINT `ck` CHECK(`first_name` != 'last_name')\""
+ "}";
@@ -243,6 +254,7 @@ public void simple() {
+ " `float32` FLOAT32,"
+ " `float64` FLOAT64,"
+ " `timestamp` TIMESTAMP,"
+ " `HiddenColumn` INT64 HIDDEN,"
+ " `MyTokens` TOKENLIST AS ((TOKENIZE_FULLTEXT(MyData))) HIDDEN,"
+ " `Embeddings` ARRAY<FLOAT32>(vector_length=>128),"
+ " CONSTRAINT `ck` CHECK(`first_name` != 'last_name'),"
@@ -254,7 +266,13 @@ public void simple() {
+ " FOREIGN KEY (`first_name`) REFERENCES `AllowedNames` (`first_name`)"
+ " ALTER TABLE `Users` ADD CONSTRAINT `fk_odc`"
+ " FOREIGN KEY (`last_name`) REFERENCES "
+ "`AllowedNames` (`last_name`) ON DELETE CASCADE"));
+ "`AllowedNames` (`last_name`) ON DELETE CASCADE"
+ " ALTER TABLE `Users` ADD CONSTRAINT `fk_not_enforced_no_action`"
+ " FOREIGN KEY (`last_name`) REFERENCES "
+ "`AllowedNames` (`last_name`) ON DELETE NO ACTION NOT ENFORCED"
+ " ALTER TABLE `Users` ADD CONSTRAINT `fk_enforced`"
+ " FOREIGN KEY (`last_name`) REFERENCES "
+ "`AllowedNames` (`last_name`) ENFORCED"));
}

@Test
@@ -674,8 +674,9 @@ public void foreignKeys() throws Exception {
"ALTER TABLE `Child` ADD CONSTRAINT `fk1` FOREIGN KEY (`id1`) REFERENCES `Ref` (`id1`)",
"ALTER TABLE `Child` ADD CONSTRAINT `fk2` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)",
"ALTER TABLE `Child` ADD CONSTRAINT `fk3` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`)",
"ALTER TABLE `Child` ADD CONSTRAINT `fk4` FOREIGN KEY (`id2`, `id1`) "
+ "REFERENCES `Ref` (`id2`, `id1`)"))
"ALTER TABLE `Child` ADD CONSTRAINT `fk4` FOREIGN KEY (`id2`, `id1`) REFERENCES `Ref` (`id2`, `id1`)",
"ALTER TABLE `Child` ADD CONSTRAINT `fk5` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`) NOT ENFORCED",
"ALTER TABLE `Child` ADD CONSTRAINT `fk6` FOREIGN KEY (`id2`) REFERENCES `Ref` (`id2`) ENFORCED"))
.endTable()
.build();
// spotless:on
Expand Down
@@ -126,6 +126,11 @@ public void simple() {
.type(Type.array(Type.float32()))
.arrayLength(Integer.valueOf(128))
.endColumn()
.column("HiddenColumn")
.type(Type.string())
.max()
.isHidden(true)
.endColumn()
.primaryKey()
.asc("id")
.asc("gen_id")
@@ -143,7 +148,13 @@ public void simple() {
"ALTER TABLE `Users` ADD CONSTRAINT `fk` FOREIGN KEY (`first_name`)"
+ " REFERENCES `AllowedNames` (`first_name`)",
"ALTER TABLE `Users` ADD CONSTRAINT `fk_odc` FOREIGN KEY (`last_name`)"
+ " REFERENCES `AllowedNames` (`last_name`) ON DELETE CASCADE"))
+ " REFERENCES `AllowedNames` (`last_name`) ON DELETE CASCADE",
"ALTER TABLE `Users` ADD CONSTRAINT `fk_not_enforced_no_action`"
+ " FOREIGN KEY (`last_name`) REFERENCES "
+ "`AllowedNames` (`last_name`) ON DELETE NO ACTION NOT ENFORCED",
"ALTER TABLE `Users` ADD CONSTRAINT `fk_enforced`"
+ " FOREIGN KEY (`last_name`) REFERENCES "
+ "`AllowedNames` (`last_name`) ENFORCED"))
.checkConstraints(ImmutableList.of("CONSTRAINT ck CHECK (`first_name` != `last_name`)"))
.endTable()
.build();
@@ -160,7 +171,7 @@ public void simple() {

List<Schema.Field> fields = avroSchema.getFields();

assertThat(fields, hasSize(7));
assertThat(fields, hasSize(8));

assertThat(fields.get(0).name(), equalTo("id"));
// Not null
@@ -221,6 +232,14 @@ public void simple() {
assertThat(fields.get(6).getProp(NOT_NULL), equalTo(null));
assertThat(fields.get(6).getProp(STORED), equalTo(null));

assertThat(fields.get(7).name(), equalTo("HiddenColumn"));
assertThat(fields.get(7).schema(), equalTo(nullableUnion(Schema.Type.STRING)));
assertThat(fields.get(7).getProp(SQL_TYPE), equalTo("STRING(MAX)"));
assertThat(fields.get(7).getProp(NOT_NULL), equalTo(null));
assertThat(fields.get(7).getProp(GENERATION_EXPRESSION), equalTo(null));
assertThat(fields.get(7).getProp(STORED), equalTo(null));
assertThat(fields.get(7).getProp(HIDDEN), equalTo("true"));

// spanner pk
assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_0"), equalTo("`id` ASC"));
assertThat(avroSchema.getProp(SPANNER_PRIMARY_KEY + "_1"), equalTo("`gen_id` ASC"));
@@ -246,6 +265,16 @@ public void simple() {
equalTo(
"ALTER TABLE `Users` ADD CONSTRAINT `fk_odc` FOREIGN KEY (`last_name`)"
+ " REFERENCES `AllowedNames` (`last_name`) ON DELETE CASCADE"));
assertThat(
avroSchema.getProp(SPANNER_FOREIGN_KEY + "2"),
equalTo(
"ALTER TABLE `Users` ADD CONSTRAINT `fk_not_enforced_no_action` FOREIGN KEY (`last_name`)"
+ " REFERENCES `AllowedNames` (`last_name`) ON DELETE NO ACTION NOT ENFORCED"));
assertThat(
avroSchema.getProp(SPANNER_FOREIGN_KEY + "3"),
equalTo(
"ALTER TABLE `Users` ADD CONSTRAINT `fk_enforced` FOREIGN KEY (`last_name`)"
+ " REFERENCES `AllowedNames` (`last_name`) ENFORCED"));
assertThat(
avroSchema.getProp(SPANNER_CHECK_CONSTRAINT + "0"),
equalTo("CONSTRAINT ck CHECK (`first_name` != `last_name`)"));