Merge branch 'develop' into issue1252-add_parquet_orc
gcf-merge-on-green[bot] authored Oct 11, 2024
2 parents 69d6555 + 7e947d5 commit 46eb3fa
Showing 33 changed files with 1,150 additions and 583 deletions.
37 changes: 27 additions & 10 deletions README.md
@@ -226,13 +226,13 @@ data-validation
[--case-insensitive-match, -cim]
Performs a case insensitive match by adding an UPPER() before comparison.
```
#### Generate Table Partitions for Large Table Row Validations
#### Generate Partitions for Large Row Validations

When performing row validations, Data Validation Tool brings each row into memory and can run into MemoryError. Below is the command syntax for generating table partitions in order to perform row validations on large tables to alleviate MemoryError. Each partition contains a range of primary key(s) and the ranges of keys across partitions are distinct. The partitions have nearly equal number of rows. See *Primary Keys* section
When performing row validations, the Data Validation Tool brings each row into memory and can run into a MemoryError. Below is the command syntax for generating partitions of a large dataset (table or custom query) so that row validations can be run per partition, avoiding the MemoryError. Each partition contains a range of primary key(s), the key ranges across partitions are distinct, and the partitions have a nearly equal number of rows. See the *Primary Keys* section

The command generates and stores multiple YAML validations each representing a chunk of the large table using filters (`WHERE primary_key(s) >= X AND primary_key(s) < Y`) in one YAML file. The parameter parts-per-file, specifies the number of validations in one YAML file. Each yaml file will have parts-per-file validations in it - except the last one which will contain the remaining partitions (i.e. parts-per-file may not divide partition-num evenly). You can then run the validations in the directory serially (or in parallel in multiple containers, VMs) with the `data-validation configs run --config-dir PATH` command as described [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator#yaml-configuration-files).
The command generates and stores multiple YAML validations, each representing a chunk of the large dataset bounded by filters (`WHERE primary_key(s) >= X AND primary_key(s) < Y`), in YAML files. The parameter parts-per-file specifies the number of validations in one YAML file. Each YAML file will contain parts-per-file validations, except the last one, which will contain the remaining partitions (i.e. parts-per-file may not divide partition-num evenly). You can then run the validations in the directory serially (or in parallel in multiple containers or VMs) with the `data-validation configs run --config-dir PATH` command as described [here](https://github.com/GoogleCloudPlatform/professional-services-data-validator#yaml-configuration-files).

The command takes the same parameters as required for `Row Validation` *plus* a few parameters to support partitioning. Single and multiple primary keys are supported and keys can be of any indexable type, except for date and timestamp type. A parameter used in earlier versions, ```partition-key``` is no longer supported.
The command takes the same parameters as required for `Row Validation` *plus* a few parameters to support partitioning. Single and multiple primary keys are supported and keys can be of any indexable type, except for date and timestamp types. You can specify either the tables to be validated or the source and target custom queries. A parameter used in earlier versions, ```partition-key```, is no longer supported.
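For example, here are sketches of the two invocation modes, one using a table list and one using custom query files. Connection names, key columns, paths, and partition counts are illustrative assumptions; the full option syntax follows below.

```
# Partition a table into 200 parts, 25 validations per YAML file
data-validation generate-table-partitions \
  -sc my_source_conn -tc my_target_conn \
  -tbls my_schema.my_table \
  --primary-keys id \
  --hash '*' \
  --config-dir gs://my-bucket/partitions_dir \
  --partition-num 200 \
  --parts-per-file 25

# Same, but partitioning the result of source and target custom queries
data-validation generate-table-partitions \
  -sc my_source_conn -tc my_target_conn \
  --source-query-file gs://my-bucket/source_query.sql \
  --target-query-file gs://my-bucket/target_query.sql \
  --primary-keys id \
  --hash '*' \
  --config-dir gs://my-bucket/partitions_dir \
  --partition-num 200 \
  --parts-per-file 25
```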

```
data-validation
@@ -251,6 +251,17 @@ data-validation
Comma separated list of tables in the form schema.table=target_schema.target_table
Target schema name and table name are optional.
i.e 'bigquery-public-data.new_york_citibike.citibike_trips'
Either --tables-list or --source-query (or file) and --target-query (or file) must be provided
--source-query SOURCE_QUERY, -sq SOURCE_QUERY
Source sql query
Either --tables-list or --source-query (or file) and --target-query (or file) must be provided
--source-query-file SOURCE_QUERY_FILE, -sqf SOURCE_QUERY_FILE
File containing the source sql command. Supports GCS and local paths.
--target-query TARGET_QUERY, -tq TARGET_QUERY
Target sql query
Either --tables-list or --source-query (or file) and --target-query (or file) must be provided
--target-query-file TARGET_QUERY_FILE, -tqf TARGET_QUERY_FILE
File containing the target sql command. Supports GCS and local paths.
--primary-keys PRIMARY_KEYS, -pk PRIMARY_KEYS
Comma separated list of primary key columns 'col_a,col_b'. See *Primary Keys* section
--comparison-fields or -comp-fields FIELDS
@@ -262,6 +273,7 @@ data-validation
Directory Path to store YAML Config Files
GCS: Provide a full gs:// path of the target directory. Eg: `gs://<BUCKET>/partitions_dir`
Local: Provide a relative path of the target directory. Eg: `partitions_dir`
If invoked with the -tbls parameter, the validations are stored in a directory named <schema>.<table>; otherwise the directory is named `custom.<random_string>`
--partition-num INT, -pn INT
Number of partitions into which the table should be split, e.g. 1000 or 10000
In case this value exceeds the row count of the source/target table, it will be decreased to max(source_row_count, target_row_count)
@@ -540,10 +552,10 @@ View the complete YAML file for a Grouped Column validation on the

### Scaling DVT

You can scale DVT for large table validations by running the tool in a distributed manner. To optimize the validation speed for large tables, you can use GKE Jobs ([Google Kubernetes Jobs](https://cloud.google.com/kubernetes-engine/docs/how-to/deploying-workloads-overview#batch_jobs)) or [Cloud Run Jobs](https://cloud.google.com/run/docs/create-jobs). If you are not familiar with Kubernetes or Cloud Run Jobs, see [Scaling DVT with Distributed Jobs](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/internal/distributed_jobs.md) for a detailed overview.
You can scale DVT for large validations by running the tool in a distributed manner. To optimize the validation speed for large tables, you can use GKE Jobs ([Google Kubernetes Jobs](https://cloud.google.com/kubernetes-engine/docs/how-to/deploying-workloads-overview#batch_jobs)) or [Cloud Run Jobs](https://cloud.google.com/run/docs/create-jobs). If you are not familiar with Kubernetes or Cloud Run Jobs, see [Scaling DVT with Distributed Jobs](https://github.com/GoogleCloudPlatform/professional-services-data-validator/blob/develop/docs/internal/distributed_jobs.md) for a detailed overview.


We recommend first generating table partitions with the `generate-table-partitions` command for your large tables. Then, use Cloud Run or GKE to distribute validating each chunk in parallel. See the [Cloud Run Jobs Quickstart sample](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/cloud_run_jobs) to get started.
We recommend first generating partitions with the `generate-table-partitions` command for your large datasets (tables or queries). Then, use Cloud Run or GKE to distribute the validation of each chunk in parallel. See the [Cloud Run Jobs Quickstart sample](https://github.com/GoogleCloudPlatform/professional-services-data-validator/tree/develop/samples/cloud_run_jobs) to get started.

When running DVT in a distributed fashion, both the `--kube-completions` and `--config-dir` flags are required. The `--kube-completions` flag specifies that the validation is being run in indexed completion mode in Kubernetes or as multiple independent tasks in Cloud Run. If the `-kc` option is used and you are not running in indexed mode, you will receive a warning and the container will process all the validations sequentially. If the `-kc` option is used and a config directory is not provided (i.e. a `--config-file` is provided instead), a warning is issued.
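As a sketch, assuming the partition YAML files were previously written to `gs://<BUCKET>/partitions_dir` (the bucket path is illustrative and the exact flag placement may vary by version):

```
# Each task/completion index processes the YAML file matching its index
data-validation configs run --kube-completions --config-dir gs://<BUCKET>/partitions_dir
```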

@@ -678,10 +690,15 @@ resultset that breaks down the count of rows per calendar date.
Row level validations can involve either a hash/checksum, concat, or comparison fields.
A hash validation (`--hash '*'`) will first sanitize the data with the following
operations on all or selected columns: CAST to string, IFNULL replace with a default
replacement string, RSTRIP, and UPPER. Then, it will CONCAT() the results
and run a SHA256() hash and compare the source and target results. Since each row will
be returned in the result set, it is recommended to utilize the `--use-random-row` feature
to validate a subset of the table.
replacement string and RSTRIP. Then, it will CONCAT() the results
and run a SHA256() hash and compare the source and target results.

When column data types do not match, for example dates compared with timestamps or booleans compared
with numeric columns, you may see additional expressions in the SQL statements which ensure that
consistent values are used to build the comparison values.

Since each row will be returned in the result set, it is recommended to validate a
subset of the table. The `--filters` and `--use-random-row` options can be used for this purpose.
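
For example, here are sketches of hash row validations restricted to a subset of rows; the connection, table, and column names, the filter, and the batch size are illustrative assumptions:

```
# Validate a random sample of rows
data-validation validate row \
  -sc my_source_conn -tc my_target_conn \
  -tbls my_schema.my_table \
  --primary-keys id \
  --hash '*' \
  --use-random-row --random-row-batch-size 1000

# Or restrict the rows with a filter instead
data-validation validate row \
  -sc my_source_conn -tc my_target_conn \
  -tbls my_schema.my_table \
  --primary-keys id \
  --hash '*' \
  --filters "id BETWEEN 1 AND 100000"
```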

Please note that SHA256 is not a supported function on Teradata systems. If you wish to perform
this comparison on Teradata you will need to [deploy a UDF to perform the conversion](https://github.com/akuroda/teradata-udf-sha2/blob/master/src/sha256.c).
77 changes: 31 additions & 46 deletions data_validation/__main__.py
@@ -218,7 +218,13 @@ def get_calculated_config(args, config_manager: ConfigManager) -> List[dict]:


def build_config_from_args(args: Namespace, config_manager: ConfigManager):
"""Append build configs to ConfigManager object.
"""This function is used to append build configs to the config manager for all validation commands and generate-table-partitions.
Instead of having two separate commands, e.g. validate row and validate custom-query row, generate-table-partitions
uses implicit choice of table or custom-query. A user can specify either tables or source/target query/file,
but not both. In the case of generate-table-partitions with custom query, the user will not provide
args.custom_query_type. However, the code will inject args.custom_query_type as 'row' before invoking
build_config_from_args.
Args:
args (Namespace): User specified Arguments
@@ -227,14 +233,14 @@ def build_config_from_args(args: Namespace, config_manager: ConfigManager):

# Append SCHEMA_VALIDATION configs
if config_manager.validation_type == consts.SCHEMA_VALIDATION:
if args.exclusion_columns is not None:
if args.exclusion_columns:
exclusion_columns = cli_tools.get_arg_list(args.exclusion_columns)
config_manager.append_exclusion_columns(
[col.casefold() for col in exclusion_columns]
)
config_manager.append_allow_list(args.allow_list, args.allow_list_file)

# Append CUSTOM_QUERY configs
# Append configs specific to CUSTOM_QUERY (i.e. query strings or strings from files)
if config_manager.validation_type == consts.CUSTOM_QUERY:
config_manager.append_custom_query_type(args.custom_query_type)

@@ -252,60 +258,35 @@ def build_config_from_args(args: Namespace, config_manager: ConfigManager):
)
)

# For custom-query column command
if args.custom_query_type == consts.COLUMN_VALIDATION.lower():
config_manager.append_aggregates(get_aggregate_config(args, config_manager))

# For custom-query row command
if args.custom_query_type == consts.ROW_VALIDATION.lower():
# Append Comparison fields
if args.comparison_fields is not None:
comparison_fields = cli_tools.get_arg_list(
args.comparison_fields, default_value=[]
)

# As per #1190, add rstrip for Teradata string comparison fields
if (
config_manager.source_client.name == "teradata"
or config_manager.target_client.name == "teradata"
):
comparison_fields = config_manager.add_rstrip_to_comp_fields(
comparison_fields
)

config_manager.append_comparison_fields(
config_manager.build_config_comparison_fields(comparison_fields)
)

# Append calculated fields: --hash/--concat
config_manager.append_calculated_fields(
get_calculated_config(args, config_manager)
)

# Append primary_keys
primary_keys = cli_tools.get_arg_list(args.primary_keys)
config_manager.append_primary_keys(
config_manager.build_column_configs(primary_keys)
)

# Append COLUMN_VALIDATION configs
if config_manager.validation_type == consts.COLUMN_VALIDATION:
# Append COLUMN_VALIDATION configs, including custom-query column validation
if (
config_manager.validation_type == consts.COLUMN_VALIDATION
or config_manager.validation_type == consts.CUSTOM_QUERY
and args.custom_query_type == consts.COLUMN_VALIDATION.lower()
):
config_manager.append_aggregates(get_aggregate_config(args, config_manager))
if args.grouped_columns is not None:
if (
config_manager.validation_type == consts.COLUMN_VALIDATION
and args.grouped_columns  # grouped_columns not supported in custom queries - at least for now.
):
grouped_columns = cli_tools.get_arg_list(args.grouped_columns)
config_manager.append_query_groups(
config_manager.build_column_configs(grouped_columns)
)

# Append ROW_VALIDATION configs
if config_manager.validation_type == consts.ROW_VALIDATION:
# Append ROW_VALIDATION configs, including custom-query row validation
if (
config_manager.validation_type == consts.ROW_VALIDATION
or config_manager.validation_type == consts.CUSTOM_QUERY
and args.custom_query_type == consts.ROW_VALIDATION.lower()
):
# Append calculated fields: --hash/--concat
config_manager.append_calculated_fields(
get_calculated_config(args, config_manager)
)

# Append Comparison fields
if args.comparison_fields is not None:
if args.comparison_fields:
comparison_fields = cli_tools.get_arg_list(
args.comparison_fields, default_value=[]
)
@@ -599,7 +580,11 @@ def partition_and_store_config_files(args: Namespace) -> None:
None
"""
# Default Validate Type
config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION)
if args.tables_list:
config_managers = build_config_managers_from_args(args, consts.ROW_VALIDATION)
else:
setattr(args, "custom_query_type", "row")
config_managers = build_config_managers_from_args(args, consts.CUSTOM_QUERY)
partition_builder = PartitionBuilder(config_managers, args)
partition_builder.partition_configs()
