This repository provides templates and reference implementations of Vertex AI Pipelines for production-grade training and batch prediction pipelines on GCP for:

- TensorFlow
- XGBoost
Further useful documentation:
The sections below provide a general description of the ML pipelines (training and prediction) for both the TensorFlow template and the XGBoost template. These two templates are similar in most ways, and a complete overview of their key differences is given in their respective README files.
As an example, the overall flow diagram for the TensorFlow training pipeline is shown below:
The training pipeline can be broken down into the following sequence of components at a high level:
Step No | Step Name | Description | Input(s) | Output(s) |
---|---|---|---|---|
1 | Generate Queries | Generate the base preprocessing & train-test-validation split queries for Google BigQuery. This component only needs a `.sql` file and all parametrized values in that `.sql` file (`source_dataset`, `source_table`, `filter_column`, and so on). | `.sql` query file and its parametrized values | Generated queries for the subsequent BigQuery steps |
2 | BQ Query to Table | Takes a generated query from the previous component & runs it on Google BigQuery to create the required table (preprocessed data / train-test-validation data). | Output from Generate Queries | New Google BigQuery table |
3 | Extract BQ to Dataset | Creates CSV file(s) in Google Cloud Storage from the Google BigQuery tables created in the previous component. | BigQuery table created by BQ Query to Table | BigQuery table exported to Google Cloud Storage objects as CSV/JSONL files, plus the corresponding file directory and path |
4 | Generate Statistics | Generates training statistics based on the `ingested_dataset` CSV files in Google Cloud Storage (output of the previous component). This component relies on TFDV and also supports running on Dataflow - see here for more details. | Google Cloud Storage CSV files for `train_data` from Extract BQ to Dataset | Training statistics generated with TensorFlow Data Validation, stored in Google Cloud Storage |
5 | Visualize Statistics | Creates HTML files with visualizations of the generated statistics. This component relies on TFDV. | Training statistics in Google Cloud Storage from Generate Statistics | HTML files with visualizations in Google Cloud Storage |
6 | Validate Schema | Uses TensorFlow Data Validation (TFDV) to validate the schema of `train_data` against the original TFDV schema stored in the assets folder (see here for more details). You can also find more information on schema validation here. | Training statistics in Google Cloud Storage from Generate Statistics | Google Cloud Storage objects with validation details, including anomalies |
7 | Show Anomalies | Uses the outputs of the Validate Schema component to flag any schema anomalies (and can error out if set up accordingly). If the user passes an ignore list to this component, the listed anomaly codes will not be flagged as anomalies. | Google Cloud Storage objects with schema validation details (including anomalies) from Validate Schema | Error/warning message if any schema anomalies exist |
8 | Vertex Training | Runs a Vertex Training job on the train-validation data, using a training component wrapped in a ContainerOp from `google-cloud-pipeline-components`. | Train and validation data CSV files in Google Cloud Storage from Extract BQ to Dataset | Trained challenger model object(s)/binaries stored in Google Cloud Storage |
9 | Challenger Model Predictions | Uses the trained model to generate challenger predictions for evaluation purposes. | Trained challenger model from Vertex Training and test data from Extract BQ to Dataset | Challenger predictions on test data stored as CSV files in Google Cloud Storage |
10 | Calculate Evaluation Metrics | Uses the predictions from the previous component to compute user-defined evaluation metrics. This component uses TFMA. | Challenger predictions on test data from Challenger Model Predictions | Evaluation metrics stored in Google Cloud Storage. Plots for all evaluation metrics and slices stored as HTML files in Google Cloud Storage |
11 | Lookup Model | Fetches the required model and its resource name if a previous champion model exists in Vertex AI. | Base model name to check whether it exists in Vertex AI | Model resource name as a string if a champion model exists, otherwise an empty string |
12 | Champion-Challenger Comparison | All model training activities up to this point were for a new challenger model. This component compares the newly trained challenger model with the existing champion model & decides whether to deploy the challenger based on the performance comparison. | Challenger evaluation metrics from Calculate Evaluation Metrics and the champion model resource name from Lookup Model | Decision on whether the challenger model should replace the champion and be deployed |
Each of these components is connected to the next either explicitly, using `.after({component_name})`, or implicitly, when the output of a preceding component is used as an input to that component. Every component can also be given a display name using `.set_display_name({display_name})`. Note that this applies to both the training pipeline and the prediction pipeline.
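As a minimal sketch of how this looks in practice (using stub components rather than the actual template components), a pipeline definition might wire steps together like this:

```python
from kfp.v2 import dsl
from kfp.v2.dsl import component


# Stub components, purely for illustration.
@component(base_image="python:3.9")
def generate_queries(source_table: str) -> str:
    return f"SELECT * FROM `{source_table}`"


@component(base_image="python:3.9")
def bq_query_to_table(query: str) -> str:
    # The real component would run the query in BigQuery and return the table name.
    return "project.dataset.table"


@component(base_image="python:3.9")
def generate_statistics(source_table: str):
    pass


@dsl.pipeline(name="example-training-pipeline")
def pipeline(source_table: str = "project.dataset.source"):
    queries = generate_queries(source_table=source_table).set_display_name(
        "Generate Queries"
    )

    # Implicit dependency: the output of `queries` is passed as an input.
    table = bq_query_to_table(query=queries.output).set_display_name(
        "BQ Query to Table"
    )

    # Explicit dependency: run only after `table`, even though no output is passed.
    generate_statistics(source_table=source_table).after(table).set_display_name(
        "Generate Statistics"
    )
```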
The prediction pipeline relies on the following artifacts from the training pipeline:

- TFDV schema for data skew
- A champion model
- Training statistics generated from the training pipeline
- A successful run of the training pipeline
As an example, the overall flow diagram for the TensorFlow prediction pipeline is shown below:
The prediction pipeline can be broken down into the following sequence of components at a high level:
Step No | Step Name | Description | Input(s) | Output(s) |
---|---|---|---|---|
1 | Generate Queries | Generate the base preprocessing & prediction data creation queries for Google BigQuery. This component only needs a `.sql` file and all parametrized values in that `.sql` file (`source_dataset`, `source_table`, `filter_column`, etc.). | `.sql` query file and its parametrized values | Generated queries for the subsequent BigQuery steps |
2 | BQ Query to Table | Takes a generated query from the previous component & runs it on Google BigQuery to create the required table (preprocessed data / prediction data). | Output from Generate Queries | New Google BigQuery table |
3 | Extract BQ to Dataset | Creates CSV/JSONL file(s) in Google Cloud Storage from the Google BigQuery tables created in the previous component. | BigQuery table created by BQ Query to Table; the full table name is required, i.e. `{project_id}.{dataset_id}.{table_id}` | BigQuery table exported to Google Cloud Storage objects as CSV/JSONL files, plus the corresponding file directory and path |
4 | Generate Statistics | Generates statistics based on the `prediction_data` CSV files in Google Cloud Storage (output of the previous component). | Google Cloud Storage CSV files for `prediction_data` from Extract BQ to Dataset | Statistics generated with TensorFlow Data Validation, stored in Google Cloud Storage |
5 | Visualize Statistics | Creates HTML files with visualizations of the generated statistics. | Prediction statistics in Google Cloud Storage from Generate Statistics | HTML files with visualizations in Google Cloud Storage |
6 | Validate Schema | Uses TensorFlow Data Validation to validate skew/drift of `prediction_data` against the original TFDV schema stored under `assets/`. For more information on checking data skew and drift, see this. | Prediction statistics in Google Cloud Storage from Generate Statistics | Google Cloud Storage objects with validation details, including anomalies |
7 | Show Anomalies | Uses the outputs of the Validate Schema component to flag any schema anomalies (and can error out if set up accordingly). If the user passes an ignore list to this component, the listed anomaly codes will not be flagged as anomalies. | Google Cloud Storage objects with schema validation details (including anomalies) from Validate Schema | Error/warning message if any schema anomalies exist |
8 | Lookup Model | Fetches the resource name of the champion model in Vertex AI. Since the prediction pipeline always runs after the training pipeline, a champion model will always exist. | Base champion model name as a string | Champion model resource name as a string |
9a | Vertex Batch Predictions from Google Cloud Storage | Runs a Vertex Batch Prediction job with the prediction data as input (TensorFlow prediction pipeline). | Prediction data files in Google Cloud Storage from Extract BQ to Dataset and the champion model resource name from Lookup Model | A batch prediction job artifact with metadata: `resourceName` (batch prediction job ID) and `gcsOutputDirectory` (output JSONL files in Google Cloud Storage) |
9b | Vertex Batch Predictions from BigQuery | Runs a Vertex Batch Prediction job with the prediction data as input (XGBoost prediction pipeline). | Prediction data in a Google BigQuery table and the champion model resource name from Lookup Model | A batch prediction job artifact with metadata: `resourceName` (batch prediction job ID) and `bigqueryOutputTable` (output BigQuery table) |
10 | Load Dataset to BQ | Uploads the batch predictions stored in Google Cloud Storage to a BigQuery table (TensorFlow prediction pipeline). | Batch predictions stored as JSONL files in Google Cloud Storage from Vertex Batch Predictions; the specific URI can be read from the batch prediction job's metadata | Google BigQuery table with the batch predictions & input instances/features |
Each of these components is connected to the next either explicitly, using `.after({component_name})`, or implicitly, when the output of a preceding component is used as an input to that component. Every component can also be given a display name using `.set_display_name({display_name})`. Note that this applies to both the training pipeline and the prediction pipeline.
The final loading stage has an optional argument, `dataset_location`, which is the location in which the loading job runs; it must match the location of the destination table. It defaults to `"EU"` in the component definition.
Each pipeline (training or prediction) requires a TFDV schema to be provided. Ideally, this file is created by a data scientist at experimentation time, after the statistics have been computed.

The schema file can then be placed in the appropriate `assets` folder and version controlled. Remember to re-point `tfdv_schema_path` in your pipeline input parameters.

As a concrete example, we illustrate how we created the TFDV schema for the taxi trips dataset in this notebook. For more information on how to generate TFDV schema files using best practices, see the official TFDV documentation.
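As a rough sketch of that workflow (the file paths and the `trip_total` feature below are illustrative, not taken from the templates), a schema can be generated and tightened with TFDV like this:

```python
import tensorflow_data_validation as tfdv

# Illustrative paths - adjust to your own dataset and assets folder.
TRAIN_CSV = "gs://my-bucket/data/train_data.csv"
SCHEMA_PATH = "assets/tfdv_schema.pbtxt"

# Compute statistics over the training data.
stats = tfdv.generate_statistics_from_csv(data_location=TRAIN_CSV)

# Infer an initial schema from those statistics.
schema = tfdv.infer_schema(statistics=stats)

# Optionally tighten the schema, e.g. mark a feature as always required.
tfdv.get_feature(schema, "trip_total").presence.min_fraction = 1.0

# Write the schema as text so it can be version controlled alongside the pipeline.
tfdv.write_schema_text(schema, SCHEMA_PATH)
```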
- The trigger script can be found here
In order to orchestrate machine learning (ML) pipelines on Vertex AI, you need to configure a few things.
The first thing to do is to copy `env.sh.example` to `env.sh`, and fill in the values of `env.sh` with the values relevant to your development/sandbox environment. These environment variables will be used when you trigger pipeline runs in Vertex AI.
The ML pipelines have input parameters. As you can see in the pipeline definition files (`pipelines/<xgboost|tensorflow>/<training|prediction>/pipeline.py`), they have default values, and some of these default values are derived from environment variables (which in turn are defined in `env.sh`, as described above).

When triggering ad hoc runs in your dev/sandbox environment, or when running the E2E tests in CI, these default values are used. For the test and production deployments, the pipeline parameters are defined in the Terraform code for the Cloud Scheduler jobs (`envs/<test|prod>/variables.auto.tfvars`).
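For illustration (the environment variable and parameter names below are assumptions, not necessarily those used in the templates), an environment-derived default might appear in a pipeline definition like this:

```python
import os

from kfp.v2 import dsl

# Hypothetical environment variable, assumed to be defined in env.sh.
PROJECT_ID = os.environ.get("VERTEX_PROJECT_ID", "")


@dsl.pipeline(name="training-pipeline")
def training_pipeline(
    project_id: str = PROJECT_ID,       # default derived from the environment
    dataset_id: str = "preprocessing",  # plain default value
):
    ...  # pipeline steps go here
```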
You can specify the Python base image and the packages required for KFP components in the `@component` decorator, using the `base_image` and `packages_to_install` arguments respectively.
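For example (the component body and package list are illustrative only):

```python
from kfp.v2.dsl import component


@component(
    base_image="python:3.9",                        # container image the component runs in
    packages_to_install=["google-cloud-bigquery"],  # extra pip packages installed at runtime
)
def count_rows(table_id: str) -> int:
    """Hypothetical component: count the rows of a BigQuery table."""
    from google.cloud import bigquery

    client = bigquery.Client()
    return client.get_table(table_id).num_rows
```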
In general, there are two methods of configuring compute resources in each pipeline. Firstly, you can set the `machine_type` variable in the XGBoost training pipeline, XGBoost prediction pipeline, TensorFlow training pipeline, or TensorFlow prediction pipeline. The default value is `n1-standard-4`, which has 4 vCPUs and 15GB of memory.
Secondly, you can set the machine type on individual pipeline steps in order to manage the requirements of each step, since some steps may need more computational resources than others. For example, when you run `calculate_eval_metrics` with a large input dataset, you can increase the CPU and memory limits by applying `.set_cpu_limit({CPU_LIMIT})` and `.set_memory_limit({MEMORY_LIMIT})` to that component (see the sketch after the list below).
- CPU_LIMIT: The maximum CPU limit for this operator. This string value can be a number (integer value for number of CPUs), or a number followed by "m", which means 1/1000. You can specify at most 96 CPUs.
- MEMORY_LIMIT: The maximum memory limit for this operator. This string value can be a number, or a number followed by "K" (kilobyte), "M" (megabyte), or "G" (gigabyte). At most 624GB is supported.
For more information, please refer to the guide on specifying machine types for a pipeline step.
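A minimal sketch (with an illustrative stub component and arbitrary limits) of raising the resource limits for a single step:

```python
from kfp.v2 import dsl
from kfp.v2.dsl import component


@component(base_image="python:3.9")
def calculate_eval_metrics(predictions_path: str):
    """Stub standing in for the evaluation-metrics component."""
    ...


@dsl.pipeline(name="training-pipeline")
def pipeline(predictions_path: str):
    (
        calculate_eval_metrics(predictions_path=predictions_path)
        .set_cpu_limit("16")       # allow up to 16 vCPUs for this step
        .set_memory_limit("32G")   # allow up to 32 GB of memory for this step
    )
```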
When Vertex AI Pipelines runs a pipeline, it checks whether an execution already exists in Vertex ML Metadata with the same interface (cache key) for each pipeline step (component).
If the component and its arguments are exactly the same as in some previous execution, the task can be skipped and the outputs of the earlier step reused.
Since most ML projects take a long time to run and consume expensive compute resources, using the cache is cost-effective when you are sure that the cached component outputs are still correct.
You can control cache-reuse behaviour either per component or for the entire pipeline (i.e. for all components).
To control caching for an individual component, add `.set_caching_options(<True|False>)` to that component when building the pipeline.
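For instance (again with an illustrative stub component):

```python
from kfp.v2 import dsl
from kfp.v2.dsl import component


@component(base_image="python:3.9")
def extract_bq_to_dataset(table_id: str) -> str:
    """Stub ingestion component."""
    return table_id


@dsl.pipeline(name="training-pipeline")
def pipeline(table_id: str):
    # Always re-run this step, even if its arguments are unchanged.
    extract_bq_to_dataset(table_id=table_id).set_caching_options(False)
```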
To change the caching behaviour of ALL components within a pipeline, you can specify this when you trigger the pipeline, like so: `make run pipeline=<training|prediction> enable_caching=<true|false>`
It is suggested to start by disabling caching of components during development, until you have a good idea of how the caching behaviour works, as it can lead to unexpected results.
Note: disable caching if you want to use new data. When caching is enabled for the pipeline, changing the `timestamp` or the source of the data (such as `ingestion_dataset_id`) only changes the output of the corresponding step, for example `Ingest data`. The other components, which still receive the same arguments as before, will reuse their cached outputs instead of picking up the new data.
In the training pipelines, the champion-challenger comparison is conducted across models that share the same name pattern in the same project. Explore `lookup_model.py` for more detailed information.
In practice, you should be aware of this and give the model a new, project-specific name once the new model is no longer comparable with the previous models.
For example, when you want to train a new model using different features, the best practice is to change the model name in the pipeline input parameters.