Merge pull request #20 from GoogleCloudPlatform/main
Sync main branch
taherkl authored Dec 16, 2024
2 parents d314c70 + cd28d01 commit 71a6477
Showing 252 changed files with 6,394 additions and 2,732 deletions.
51 changes: 51 additions & 0 deletions .github/workflows/spanner-staging-tests.yml
@@ -0,0 +1,51 @@
# Copyright 2024 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name: Spanner Staging integration tests

on:
  workflow_dispatch:

permissions: read-all

jobs:
  spanner_java_integration_tests_templates:
    name: Spanner Dataflow Templates Integration Tests
    timeout-minutes: 180
    # Run on any runner that matches all the specified runs-on values.
    runs-on: [ self-hosted, it ]
    steps:
      - name: Checkout Code
        uses: actions/checkout@ee0669bd1cc54295c223e0bb666b733df41de1c5 # v2.7.0
      - name: Setup Environment
        id: setup-env
        uses: ./.github/actions/setup-env
      - name: Run Integration Tests
        run: |
          ./cicd/run-it-tests \
            --modules-to-build="ALL" \
            --it-region="us-central1" \
            --it-project="cloud-teleport-testing" \
            --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \
            --it-private-connectivity="datastream-private-connect-us-central1" \
            --it-spanner-host="https://staging-wrenchworks.sandbox.googleapis.com/"
      - name: Upload Integration Tests Report
        uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2
        if: always() # always run even if the previous step fails
        with:
          name: surefire-test-results
          path: '**/surefire-reports/TEST-*.xml'
          retention-days: 1
      - name: Cleanup Java Environment
        uses: ./.github/actions/cleanup-java-env
1 change: 1 addition & 0 deletions cicd/cmd/run-it-smoke-tests/main.go
@@ -66,6 +66,7 @@ func main() {
 		flags.ArtifactBucket(),
 		flags.StageBucket(),
 		flags.PrivateConnectivity(),
+		flags.SpannerHost(),
 		flags.FailureMode(),
 		flags.RetryFailures(),
 		flags.StaticOracleHost(),
1 change: 1 addition & 0 deletions cicd/cmd/run-it-tests/main.go
@@ -67,6 +67,7 @@ func main() {
 		flags.StageBucket(),
 		flags.HostIp(),
 		flags.PrivateConnectivity(),
+		flags.SpannerHost(),
 		flags.FailureMode(),
 		flags.RetryFailures(),
 		flags.StaticOracleHost(),
6 changes: 3 additions & 3 deletions cicd/internal/flags/it-flags.go
@@ -101,10 +101,10 @@ func PrivateConnectivity() string {
 }
 
 func SpannerHost() string {
-	if dSpannerHost == "" {
-		return "-DspannerHost=" + "https://staging-wrenchworks.sandbox.googleapis.com/"
+	if dSpannerHost != "" {
+		return "-DspannerHost=" + dSpannerHost
 	}
-	return "-DspannerHost=" + dSpannerHost
+	return ""
 }
 
 func FailureMode() string {
6 changes: 3 additions & 3 deletions contributor-docs/add-integration-or-load-test.md
@@ -408,7 +408,7 @@ vary on whether the pipeline under test is a `Batch` or `Streaming` pipeline and
the type of test.

### Structure
-First extend the test class from the [LoadTestBase](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/it/google-cloud-platform/src/main/java/com/google/cloud/teleport/it/gcp/LoadTestBase.java)
+First extend the test class from the [LoadTestBase](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java)
class. LoadTestBase contains helper methods which abstract irrelevant
information and make it easier to write load tests. It also defines some
clients and variables which are useful for writing tests.
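
As a rough sketch of that structure (the class name, test method, and body below are illustrative assumptions, not code from this commit):

```java
import org.apache.beam.it.gcp.LoadTestBase;
import org.junit.Test;

/**
 * Minimal load-test skeleton extending LoadTestBase (sketch only). LoadTestBase
 * is abstract, so any abstract members it declares would also need to be
 * implemented in a real test class.
 */
public class MyTemplateLT extends LoadTestBase {

  @Test
  public void testBacklog10gb() throws Exception {
    // 1. Set up source/sink resources using the clients LoadTestBase provides.
    // 2. Launch the template under test and wait for it to finish.
    // 3. Assert on the run and export metrics (see "Exporting Results" below).
  }
}
```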
@@ -552,8 +552,8 @@ public void testSteadyState1hr() {

### Exporting Results

-After the pipeline finishes successfully, we can get the performance metrics using [getMetrics](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/it/google-cloud-platform/src/main/java/com/google/cloud/teleport/it/gcp/LoadTestBase.java#L272)
-method and export the results to BigQuery by calling the [exportMetricsToBigQuery](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/it/google-cloud-platform/src/main/java/com/google/cloud/teleport/it/gcp/LoadTestBase.java#L127) method.
+After the pipeline finishes successfully, we can get the performance metrics using [getMetrics](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java#L279)
+method and export the results to BigQuery by calling the [exportMetricsToBigQuery](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/LoadTestBase.java#L139) method.

The BigQuery project, dataset, and table to be used to export the data can be specified in the command line using,
* `-DexportProject` - BigQuery Project to export metrics (optional, if not provided `-Dproject` is used)
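To make the getMetrics/exportMetricsToBigQuery flow above concrete, a hedged sketch of how a test might call the two helpers after a run completes (the variable names, parameter types, and throws clauses are assumptions, not taken from this commit):

```java
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;

// Illustrative helper inside a test class that extends LoadTestBase.
void collectAndExportMetrics(LaunchInfo launchInfo) throws Exception {
  // Gather the performance metrics for the finished pipeline run.
  Map<String, Double> metrics = getMetrics(launchInfo);
  // Export them to the BigQuery project/dataset/table passed via the -Dexport* flags.
  exportMetricsToBigQuery(launchInfo, metrics);
}
```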
@@ -0,0 +1,19 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.metadata;

/** Annotation that marks the test as a Spanner staging test. */
public @interface SpannerStagingTest {}
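
For orientation, a hedged example of how a test might be tagged with this marker so that the surefire `<groups>` filter of the `spannerStagingIntegrationTests` profile added later in this commit can select it (the test class and the `@Category` usage are assumptions modeled on the repo's existing marker annotations):

```java
import com.google.cloud.teleport.metadata.SpannerStagingTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;

// Hypothetical test class; only the SpannerStagingTest marker comes from this commit.
@Category(SpannerStagingTest.class)
public class SpannerStagingSmokeIT {

  @Test
  public void runsAgainstStagingSpannerHost() {
    // Test body omitted; such tests would run against the host passed via -DspannerHost.
  }
}
```

Locally, such tests would then typically be selected with something like `mvn clean verify -PspannerStagingIntegrationTests -DspannerHost=<staging host> …` (exact flags assumed).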
@@ -652,7 +652,7 @@ protected void processDescriptions(
     this.setHelpText(helpText);
 
     if (example != null && !example.isEmpty()) {
-      this.setHelpText(this.getHelpText() + " (Example: " + example + ")");
+      this.setHelpText(this.getHelpText() + " For example, `" + example + "`");
     }
   }
 }
4 changes: 2 additions & 2 deletions plugins/core-plugin/src/main/resources/README-template.md
@@ -21,12 +21,12 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Required parameters

-<#list spec.metadata.parameters as parameter><#if !parameter.optional!false>* **${parameter.name}** : ${parameter.helpText?ensure_ends_with(".")}
+<#list spec.metadata.parameters as parameter><#if !parameter.optional!false>* **${parameter.name}**: ${parameter.helpText?ensure_ends_with(".")}
</#if></#list>

### Optional parameters

-<#list spec.metadata.parameters as parameter><#if parameter.optional!false>* **${parameter.name}** : ${parameter.helpText?ensure_ends_with(".")}
+<#list spec.metadata.parameters as parameter><#if parameter.optional!false>* **${parameter.name}**: ${parameter.helpText?ensure_ends_with(".")}
</#if></#list>


51 changes: 51 additions & 0 deletions pom.xml
@@ -106,6 +106,7 @@
     <load.tests>com.google.cloud.teleport.metadata.TemplateLoadTest</load.tests>
     <direct-runner.tests>com.google.cloud.teleport.metadata.DirectRunnerTest</direct-runner.tests>
     <excluded.spanner.tests></excluded.spanner.tests>
+    <spanner.staging.tests>com.google.cloud.teleport.metadata.SpannerStagingTest</spanner.staging.tests>
 
     <licenseHeaderFile>JAVA_LICENSE_HEADER</licenseHeaderFile>
   </properties>
@@ -458,6 +459,56 @@
         </plugins>
       </build>
     </profile>
+    <profile>
+      <id>spannerStagingIntegrationTests</id>
+      <activation>
+        <activeByDefault>false</activeByDefault>
+      </activation>
+      <properties>
+        <!-- Skip coverage checks, unit tests are skipped -->
+        <jacoco.skip>true</jacoco.skip>
+        <!-- Some modules may yield no integration tests -->
+        <failIfNoTests>false</failIfNoTests>
+        <!-- Parallelism settings. Default is 2, set to consider methods -->
+        <itParallelismType>classesAndMethods</itParallelismType>
+        <itParallelism>2</itParallelism>
+      </properties>
+      <build>
+        <plugins>
+          <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <version>${surefire.version}</version>
+            <configuration combine.self="override">
+              <systemProperties>
+                <property>
+                  <name>beamPythonVersion</name>
+                  <value>${beam-python.version}</value>
+                </property>
+                <property>
+                  <name>beamJavaVersion</name>
+                  <value>${beam.version}</value>
+                </property>
+                <property>
+                  <name>beamMavenRepo</name>
+                  <value>${beam-maven-repo}</value>
+                </property>
+              </systemProperties>
+              <includes>
+                <include>**/*.java</include>
+              </includes>
+              <groups>
+                ${spanner.staging.tests}
+              </groups>
+              <reuseForks>true</reuseForks>
+              <parallel>${itParallelismType}</parallel>
+              <threadCount>${itParallelism}</threadCount>
+              <trimStackTrace>false</trimStackTrace>
+            </configuration>
+          </plugin>
+        </plugins>
+      </build>
+    </profile>
     <profile>
       <id>templatesLoadTests</id>
       <activation>
6 changes: 3 additions & 3 deletions python/README_Yaml_Template.md
@@ -25,9 +25,9 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Optional parameters

-* **yaml_pipeline** : A yaml description of the pipeline to run.
-* **yaml_pipeline_file** : A file in Cloud Storage containing a yaml description of the pipeline to run.
-* **jinja_variables** : A json dict of variables used when invoking the jinja preprocessor on the provided yaml pipeline.
+* **yaml_pipeline**: A yaml description of the pipeline to run.
+* **yaml_pipeline_file**: A file in Cloud Storage containing a yaml description of the pipeline to run.
+* **jinja_variables**: A json dict of variables used when invoking the jinja preprocessor on the provided yaml pipeline.



16 changes: 8 additions & 8 deletions v1/README_Bulk_Compress_GCS_Files.md
@@ -27,14 +27,14 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Required parameters

-* **inputFilePattern** : The Cloud Storage location of the files you'd like to process. (Example: gs://your-bucket/your-files/*.txt).
-* **outputDirectory** : The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters. (Example: gs://your-bucket/your-path).
-* **outputFailureFile** : The error log output file to use for write failures that occur during compression. The contents will be one line for each file which failed compression. Note that this parameter will allow the pipeline to continue processing in the event of a failure. (Example: gs://your-bucket/compressed/failed.csv).
-* **compression** : The compression algorithm used to compress the matched files. Valid algorithms: BZIP2, DEFLATE, GZIP.
+* **inputFilePattern**: The Cloud Storage location of the files you'd like to process. For example, `gs://your-bucket/your-files/*.txt`.
+* **outputDirectory**: The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters. For example, `gs://your-bucket/your-path`.
+* **outputFailureFile**: The error log output file to use for write failures that occur during compression. The contents will be one line for each file which failed compression. Note that this parameter will allow the pipeline to continue processing in the event of a failure. For example, `gs://your-bucket/compressed/failed.csv`.
+* **compression**: The compression algorithm used to compress the matched files. Valid algorithms: BZIP2, DEFLATE, GZIP.

### Optional parameters

-* **outputFilenameSuffix** : Output filename suffix of the files to write. Defaults to .bzip2, .deflate or .gz depending on the compression algorithm.
+* **outputFilenameSuffix**: Output filename suffix of the files to write. Defaults to .bzip2, .deflate or .gz depending on the compression algorithm.



@@ -211,9 +211,9 @@ resource "google_dataflow_job" "bulk_compress_gcs_files" {
   region = var.region
   temp_gcs_location = "gs://bucket-name-here/temp"
   parameters = {
-    inputFilePattern = "gs://your-bucket/your-files/*.txt"
-    outputDirectory = "gs://your-bucket/your-path"
-    outputFailureFile = "gs://your-bucket/compressed/failed.csv"
+    inputFilePattern = "<inputFilePattern>"
+    outputDirectory = "<outputDirectory>"
+    outputFailureFile = "<outputFailureFile>"
     compression = "<compression>"
     # outputFilenameSuffix = "<outputFilenameSuffix>"
   }
12 changes: 6 additions & 6 deletions v1/README_Bulk_Decompress_GCS_Files.md
@@ -26,9 +26,9 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Required parameters

-* **inputFilePattern** : The Cloud Storage location of the files you'd like to process. (Example: gs://your-bucket/your-files/*.gz).
-* **outputDirectory** : The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters. (Example: gs://your-bucket/decompressed/).
-* **outputFailureFile** : The output file to write failures to during the decompression process. If there are no failures, the file will still be created but will be empty. The contents will be one line for each file which failed decompression in CSV format (Filename, Error). Note that this parameter will allow the pipeline to continue processing in the event of a failure. (Example: gs://your-bucket/decompressed/failed.csv).
+* **inputFilePattern**: The Cloud Storage location of the files you'd like to process. For example, `gs://your-bucket/your-files/*.gz`.
+* **outputDirectory**: The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters. For example, `gs://your-bucket/decompressed/`.
+* **outputFailureFile**: The output file to write failures to during the decompression process. If there are no failures, the file will still be created but will be empty. The contents will be one line for each file which failed decompression in CSV format (Filename, Error). Note that this parameter will allow the pipeline to continue processing in the event of a failure. For example, `gs://your-bucket/decompressed/failed.csv`.

### Optional parameters

@@ -202,9 +202,9 @@ resource "google_dataflow_job" "bulk_decompress_gcs_files" {
   region = var.region
   temp_gcs_location = "gs://bucket-name-here/temp"
   parameters = {
-    inputFilePattern = "gs://your-bucket/your-files/*.gz"
-    outputDirectory = "gs://your-bucket/decompressed/"
-    outputFailureFile = "gs://your-bucket/decompressed/failed.csv"
+    inputFilePattern = "<inputFilePattern>"
+    outputDirectory = "<outputDirectory>"
+    outputFailureFile = "<outputFailureFile>"
   }
 }
```
24 changes: 12 additions & 12 deletions v1/README_Cassandra_To_Cloud_Bigtable.md
@@ -23,21 +23,21 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat

### Required parameters

-* **cassandraHosts** : The hosts of the Apache Cassandra nodes in a comma-separated list.
-* **cassandraKeyspace** : The Apache Cassandra keyspace where the table is located.
-* **cassandraTable** : The Apache Cassandra table to copy.
-* **bigtableProjectId** : The Google Cloud project ID associated with the Bigtable instance.
-* **bigtableInstanceId** : The ID of the Bigtable instance that the Apache Cassandra table is copied to.
-* **bigtableTableId** : The name of the Bigtable table that the Apache Cassandra table is copied to.
+* **cassandraHosts**: The hosts of the Apache Cassandra nodes in a comma-separated list.
+* **cassandraKeyspace**: The Apache Cassandra keyspace where the table is located.
+* **cassandraTable**: The Apache Cassandra table to copy.
+* **bigtableProjectId**: The Google Cloud project ID associated with the Bigtable instance.
+* **bigtableInstanceId**: The ID of the Bigtable instance that the Apache Cassandra table is copied to.
+* **bigtableTableId**: The name of the Bigtable table that the Apache Cassandra table is copied to.

### Optional parameters

-* **cassandraPort** : The TCP port to use to reach Apache Cassandra on the nodes. The default value is 9042.
-* **defaultColumnFamily** : The name of the column family of the Bigtable table. The default value is default.
-* **rowKeySeparator** : The separator used to build row-keys. The default value is '#'.
-* **splitLargeRows** : The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic. .
-* **writetimeCassandraColumnSchema** : GCS path to schema to copy Cassandra writetimes to Bigtable. The command to generate this schema is ```cqlsh -e "select json * from system_schema.columns where keyspace_name='$CASSANDRA_KEYSPACE' and table_name='$CASSANDRA_TABLE'`" > column_schema.json```. Set $WRITETIME_CASSANDRA_COLUMN_SCHEMA to a GCS path, e.g. `gs://$BUCKET_NAME/column_schema.json`. Then upload the schema to GCS: `gcloud storage cp column_schema.json $WRITETIME_CASSANDRA_COLUMN_SCHEMA`. Requires Cassandra version 2.2 onwards for JSON support.
-* **setZeroTimestamp** : The flag for setting Bigtable cell timestamp to 0 if Cassandra writetime is not present. The default behavior for when this flag is not set is to set the Bigtable cell timestamp as the template replication time, i.e. now.
+* **cassandraPort**: The TCP port to use to reach Apache Cassandra on the nodes. The default value is `9042`.
+* **defaultColumnFamily**: The name of the column family of the Bigtable table. The default value is `default`.
+* **rowKeySeparator**: The separator used to build row-keys. The default value is `#`.
+* **splitLargeRows**: The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic. .
+* **writetimeCassandraColumnSchema**: GCS path to schema to copy Cassandra writetimes to Bigtable. The command to generate this schema is ```cqlsh -e "select json * from system_schema.columns where keyspace_name='$CASSANDRA_KEYSPACE' and table_name='$CASSANDRA_TABLE'`" > column_schema.json```. Set $WRITETIME_CASSANDRA_COLUMN_SCHEMA to a GCS path, e.g. `gs://$BUCKET_NAME/column_schema.json`. Then upload the schema to GCS: `gcloud storage cp column_schema.json $WRITETIME_CASSANDRA_COLUMN_SCHEMA`. Requires Cassandra version 2.2 onwards for JSON support.
+* **setZeroTimestamp**: The flag for setting Bigtable cell timestamp to 0 if Cassandra writetime is not present. The default behavior for when this flag is not set is to set the Bigtable cell timestamp as the template replication time, i.e. now.


