Adding ML Pipelines Documentation (#182)
* predicting for only the users with traffic in the past 72h - purchase propensity

* running inference only for user events in the past 72h

* including 72h users for all models predictions

* considering null values in TabWorkflow models

* deleting unused pipfile

* upgrading lib versions

* implementing reporting preprocessing as a new pipeline

* adding more code documentation

* adding important information on the main README.md and DEVELOPMENT.md

* adding schedule run name and more code documentation

* implementing a new scheduler using the vertex ai sdk & adding user_id to procedures for consistency

* adding more code documentation

* adding code doc to the python custom component

* adding more code documentation

* fixing aggregated predictions query

* removing unnecessary resources from deployment

* Writing MDS guide

* adding the MDS developer and troubleshooting documentation

* fixing deployment for activation pipelines and gemini dataset

* Update README.md

* Update README.md

* Update README.md

* Update README.md

* removing deprecated api

* fixing purchase propensity pipelines names

* adding extra condition for when there is not enough data for the window interval to be applied on backfill procedures

* adding more instructions for post deployment and fixing issues when GA4 export was configured for less than 10 days

* removing unnecessary comments

* adding the number of past days to process in the variables files

* adding comment about combining data from different ga4 export datasets to data store

* fixing small issues with feature engineering and ml pipelines

* fixing hyper parameter tuning for kmeans modeling

* fixing optuna parameters

* adding cloud shell image

* fixing the list of all possible users in the propensity training preparation tables

* additional guardrails for when there is not enough data

* adding more documentation

* adding more doc to feature store

* add feature store documentation

* adding ml pipelines docs

* adding ml pipelines docs

* adding more documentation

* adding user agent client info

---------

Co-authored-by: Carlos Timoteo <[email protected]>
chmstimoteo and Carlos Timoteo authored Aug 23, 2024
1 parent 9faf49f commit 8b1ddc1
Showing 11 changed files with 169 additions and 16 deletions.
Binary file modified docs/images/pipelines_bqml_training_architecture.png
Binary file added docs/images/pipelines_gemini_architecture.png
22 changes: 22 additions & 0 deletions docs/pipelines.md
@@ -77,6 +77,16 @@ This architecture diagram illustrates the process of running the BQML model pred
* **Prepare model for prediction**: The predictions are prepared and saved in BigQuery in a format to be consumed by the Activation application.
* **Trigger activation application**: A Pub/Sub message is sent to trigger the Activation application, providing the appropriate parameters for the use case (a minimal sketch of such a trigger follows below).
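
The following is a minimal, illustrative sketch of publishing such a trigger message with the Pub/Sub Python client; the project, topic name, and payload fields are assumptions for illustration, not the exact values used by this solution.

```python
# Minimal sketch (assumed names): publish a Pub/Sub message that triggers the
# Activation application for a given use case.
import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# Hypothetical project and topic names.
topic_path = publisher.topic_path("my-project", "activation-trigger")

payload = {
    # Hypothetical fields identifying the use case and its latest predictions table.
    "activation_type": "purchase-propensity",
    "source_table": "my-project.purchase_propensity.latest_predictions",
}
future = publisher.publish(topic_path, json.dumps(payload).encode("utf-8"))
print(f"Published activation trigger: {future.result()}")
```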

![Gemini Pipeline architecture](images/pipelines_gemini_architecture.png)

This architecture diagram illustrates the process of running the reporting data preparation and Gemini insights pipelines. All of these steps are implemented as Vertex AI Pipeline components whose queries run in BigQuery. The main components are:

* **Data sources**: For the aggregated predictions stored procedure, the latest predictions for all users are joined across all use cases. This single table is used as a single pane of glass to build reports in Looker.
* **Aggregate predictions from all use cases**: Dynamically identifies the latest predictions table for each use case and prepares the final joined table.
* **Aggregate daily/weekly/monthly metrics and generate Gemini insights**: Ingests users' daily revenue and behaviour features and aggregates them daily, weekly and monthly to prompt Gemini 1.5 via the BigQuery external connection to Vertex AI (see the sketch after this list).
* **Store aggregated predictions for all users**: Stores the joined predictions in a BigQuery table for further consumption in Looker reports.
* **Store generated insights**: Stores the generated insights in BigQuery tables for further consumption in Looker reports.
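
As an illustration of the Gemini insights step, here is a minimal sketch of prompting a Gemini model from BigQuery through a remote model backed by an external connection to Vertex AI; the project, dataset, model name, and prompt below are assumptions, not the solution's actual objects.

```python
# Minimal sketch (assumed names): call a BigQuery remote model that uses a
# Vertex AI connection to generate an insight, then read the generated text.
from google.cloud import bigquery

client = bigquery.Client(project="my-project", location="US")  # hypothetical project/location

query = """
SELECT ml_generate_text_llm_result AS insight
FROM ML.GENERATE_TEXT(
  MODEL `my-project.gemini_insights.gemini_model`,  -- hypothetical remote model
  (SELECT 'Summarize weekly revenue and user behaviour trends.' AS prompt),
  STRUCT(0.2 AS temperature, 1024 AS max_output_tokens, TRUE AS flatten_json_output)
)
"""
for row in client.query(query).result():
    print(row.insight)
```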

## Who is this solution for?

We heard common stories from customers who were struggling with three frequent objectives:
@@ -150,6 +160,18 @@ In comparison to other approaches, Tabular Workflow offers customers with the fo

## ML Pipelines used by each Use Case

The ML Pipelines already implemented and used by each use case are listed below:

| Use Case | Vertex AI Pipeline | Pipeline Name | BigQuery Dataset |
| -------- | ------- | -------------- | ---- |
| Purchase Propensity | [purchase_propensity_training_pipeline](../python/pipelines/pipeline_ops.py) <br> [purchase_propensity_prediction_pipeline](../python/pipelines/tabular_pipelines.py) | [purchase-propensity-training-pl](images/pipeline_purchase_propensity_training_uploaded.png) <br> [purchase-propensity-prediction-pl](images/pipeline_purchase_propensity_prediction_uploaded.png) | [purchase_propensity](../infrastructure/terraform/modules/feature-store/bigquery-datasets.tf) |
| Churn Propensity | [purchase_propensity_training_pipeline](../python/pipelines/pipeline_ops.py) <br> [purchase_propensity_prediction_pipeline](../python/pipelines/tabular_pipelines.py) | [purchase-propensity-training-pl](images/pipeline_purchase_propensity_training_uploaded.png) <br> [purchase-propensity-prediction-pl](images/pipeline_purchase_propensity_prediction_uploaded.png) | [purchase_propensity](../infrastructure/terraform/modules/feature-store/bigquery-datasets.tf) |
| Customer Lifetime Value | [purchase_propensity_training_pipeline](../python/pipelines/pipeline_ops.py) <br> [purchase_propensity_prediction_pipeline](../python/pipelines/tabular_pipelines.py) | [purchase-propensity-training-pl](images/pipeline_purchase_propensity_training_uploaded.png) <br> [purchase-propensity-prediction-pl](images/pipeline_purchase_propensity_prediction_uploaded.png) | [purchase_propensity](../infrastructure/terraform/modules/feature-store/bigquery-datasets.tf) |
| Demographic Audience Segmentation | [purchase_propensity_training_pipeline](../python/pipelines/pipeline_ops.py) <br> [purchase_propensity_prediction_pipeline](../python/pipelines/tabular_pipelines.py) | [purchase-propensity-training-pl](images/pipeline_purchase_propensity_training_uploaded.png) <br> [purchase-propensity-prediction-pl](images/pipeline_purchase_propensity_prediction_uploaded.png) | [purchase_propensity](../infrastructure/terraform/modules/feature-store/bigquery-datasets.tf) |
| Interest based Audience Segmentation | [purchase_propensity_training_pipeline](../python/pipelines/pipeline_ops.py) <br> [purchase_propensity_prediction_pipeline](../python/pipelines/tabular_pipelines.py) | [purchase-propensity-training-pl](images/pipeline_purchase_propensity_training_uploaded.png) <br> [purchase-propensity-prediction-pl](images/pipeline_purchase_propensity_prediction_uploaded.png) | [purchase_propensity](../infrastructure/terraform/modules/feature-store/bigquery-datasets.tf) |
| Aggregated Value Based Bidding | [purchase_propensity_training_pipeline](../python/pipelines/pipeline_ops.py) <br> [purchase_propensity_prediction_pipeline](../python/pipelines/tabular_pipelines.py) | [purchase-propensity-training-pl](images/pipeline_purchase_propensity_training_uploaded.png) <br> [purchase-propensity-prediction-pl](images/pipeline_purchase_propensity_prediction_uploaded.png) | [purchase_propensity](../infrastructure/terraform/modules/feature-store/bigquery-datasets.tf) |


## ML Pipelines Design Principles

## Deploy ML Pipelines
3 changes: 3 additions & 0 deletions pyproject.toml
@@ -59,6 +59,9 @@ google-cloud-pubsub = "2.15.0"
google-analytics-admin = "0.22.7"
google-analytics-data = "^0.18.0"
pyarrow = "15.0.2"
google-auth-oauthlib = "^1.2.1"
oauth2client = "^4.1.3"
google-cloud-core = "^2.4.1"

[tool.poetry.group.component_vertex.dependencies]
google-cloud-aiplatform = "1.52.0"
3 changes: 3 additions & 0 deletions python/base_component_image/pyproject.toml
@@ -32,6 +32,9 @@ scikit-learn = "1.2.2"
#matplotlib= "3.7.2"
#seaborn = "0.12.2"
pyarrow = "15.0.2"
google-auth-oauthlib = "^1.2.1"
oauth2client = "^4.1.3"
google-cloud-core = "^2.4.1"

[build-system]
requires = ["poetry-core>=1.0.0"]
5 changes: 4 additions & 1 deletion python/function/trigger_activation/main.py
@@ -20,6 +20,9 @@
from datetime import datetime
from google.cloud import dataflow_v1beta3

from google.api_core.gapic_v1.client_info import ClientInfo

USER_AGENT_ACTIVATION = 'cloud-solutions/marketing-analytics-jumpstart-activation-v1'

@functions_framework.cloud_event
def subscribe(cloud_event):
@@ -93,7 +96,7 @@ def subscribe(cloud_event):
location=region,
launch_parameter=flex_template_param
)
client = dataflow_v1beta3.FlexTemplatesServiceClient()
client = dataflow_v1beta3.FlexTemplatesServiceClient(client_info=ClientInfo(user_agent=USER_AGENT_ACTIVATION))
response = client.launch_flex_template(request=request)

print(response)
58 changes: 43 additions & 15 deletions python/pipelines/components/bigquery/component.py
@@ -17,6 +17,18 @@
import os
import yaml

from google.api_core.gapic_v1.client_info import ClientInfo

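# User agent strings attached to the Google Cloud API clients below so that
# requests issued by this solution can be attributed to the pipeline stage making them.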
USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'

config_file_path = os.path.join(os.path.dirname(
__file__), '../../../../config/config.yaml')

@@ -56,7 +68,8 @@ def bq_stored_procedure_exec(

client = bigquery.Client(
project=project,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_FEATURES)
)

params = []
@@ -185,7 +198,8 @@ def bq_clustering_exec(

client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
)

logging.info(f"BQML Model Training Query: {query}")
@@ -300,7 +314,8 @@ def list(cls):
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_PREDICTION)
)

# TODO(developer): Set dataset_id to the ID of the dataset that contains
@@ -410,7 +425,8 @@ def bq_clustering_predictions(

client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_PREDICTION)
)

query = f"""
@@ -463,7 +479,8 @@ def bq_flatten_tabular_binary_prediction_table(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)

# Inspect the metadata set on destination_table and predictions_table
@@ -535,7 +552,8 @@ def bq_flatten_tabular_binary_prediction_table(
# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table.location
location=bq_table.location,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
query_job = client.query(
query=query,
@@ -577,7 +595,8 @@ def bq_flatten_tabular_regression_table(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)

# Inspect the metadata set on destination_table and predictions_table
@@ -617,7 +636,8 @@ def bq_flatten_tabular_regression_table(
# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table.location
location=bq_table.location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)
query_job = client.query(
query=query,
@@ -656,7 +676,8 @@ def bq_flatten_kmeans_prediction_table(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_PREDICTION)
)

# Make an API request.
@@ -747,7 +768,8 @@ def bq_dynamic_query_exec_output(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
)

# Construct query template
@@ -856,7 +878,8 @@ def bq_dynamic_stored_procedure_exec_output_full_dataset_preparation(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
)

def _create_auto_audience_segmentation_full_dataset_preparation_procedure(
@@ -1004,7 +1027,8 @@ def bq_union_predictions_tables(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=location
location=location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)

# Inspect the metadata set on destination_table and predictions_table
@@ -1127,7 +1151,8 @@ def bq_union_predictions_tables(
# Reconstruct a BigQuery client object.
client = bigquery.Client(
project=project_id,
location=bq_table_regression.location
location=bq_table_regression.location,
client_info=ClientInfo(user_agent=USER_AGENT_REGRESSION_PREDICTION)
)
query_job = client.query(
query=query,
@@ -1166,8 +1191,11 @@ def write_tabular_model_explanation_to_bigquery(
from google.api_core import exceptions
import time

client = bigquery.Client(project=project,
location=data_location)
client = bigquery.Client(
project=project,
location=data_location,
client_info=ClientInfo(user_agent=USER_AGENT_VBB_EXPLANATION)
)

feature_names = model_explanation.metadata['feature_names']
values = model_explanation.metadata['values']
14 changes: 14 additions & 0 deletions python/pipelines/components/python/component.py
@@ -19,6 +19,18 @@
import os
import yaml

from google.api_core.gapic_v1.client_info import ClientInfo

USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'

config_file_path = os.path.join(os.path.dirname(
__file__), '../../../../config/config.yaml')

@@ -102,6 +114,7 @@ def train_scikit_cluster_model(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
#location=location
)

@@ -314,6 +327,7 @@ def hyper_parameter_tuning_scikit_audience_model(
# Construct a BigQuery client object.
client = bigquery.Client(
project=project_id,
client_info=ClientInfo(user_agent=USER_AGENT_SEGMENTATION_TRAINING)
#location=location
)

80 changes: 80 additions & 0 deletions python/pipelines/components/vertex/component.py
@@ -18,6 +18,17 @@
from kfp.dsl import component, Output, Artifact, Model, Input, Metrics, ClassificationMetrics, Dataset
from ma_components.vertex import VertexModel

from google.api_core.gapic_v1.client_info import ClientInfo

USER_AGENT_FEATURES = 'cloud-solutions/marketing-analytics-jumpstart-features-v1'
USER_AGENT_PROPENSITY_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-propensity-training-v1'
USER_AGENT_PROPENSITY_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-propensity-prediction-v1'
USER_AGENT_REGRESSION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-regression-training-v1'
USER_AGENT_REGRESSION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-regression-prediction-v1'
USER_AGENT_SEGMENTATION_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-training-v1'
USER_AGENT_SEGMENTATION_PREDICTION = 'cloud-solutions/marketing-analytics-jumpstart-segmentation-prediction-v1'
USER_AGENT_VBB_TRAINING = 'cloud-solutions/marketing-analytics-jumpstart-vbb-training-v1'
USER_AGENT_VBB_EXPLANATION = 'cloud-solutions/marketing-analytics-jumpstart-vbb-explanation-v1'

pyproject_toml_file_path = os.path.join(os.path.dirname(__file__), '../../../../pyproject.toml')
config_file_path = os.path.join(os.path.dirname(__file__), '../../../../config/config.yaml')
@@ -90,6 +101,20 @@ def elect_best_tabular_model(
from pprint import pformat
from enum import Enum
#from google_cloud_pipeline_components.types.artifact_types import VertexModel
from google.cloud import aiplatform

api_endpoint = "us-central1-aiplatform.googleapis.com"
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(
client_options=client_options,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
del client


class MetricsEnum(Enum):
# classification
@@ -243,6 +268,20 @@ def get_latest_model(
from pprint import pformat
from enum import Enum
#from google_cloud_pipeline_components.types.artifact_types import VertexModel
from google.cloud import aiplatform

api_endpoint = "us-central1-aiplatform.googleapis.com"
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(
client_options=client_options,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
del client


class MetricsEnum(Enum):
LOG_LOSS = 'logLoss'
@@ -352,6 +391,20 @@ def batch_prediction(
import logging
from google.cloud import bigquery
from google.cloud.aiplatform import Model
from google.cloud import aiplatform

api_endpoint = "us-central1-aiplatform.googleapis.com"
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(
client_options=client_options,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
del client

model = Model(f"{model.metadata['resourceName']}@{model.metadata['version']}")
timestamp = str(int(datetime.now().timestamp()))

@@ -418,6 +471,20 @@ def return_unmanaged_model(
from google_cloud_pipeline_components import v1
from google_cloud_pipeline_components.types import artifact_types
from kfp import dsl
from google.cloud import aiplatform

api_endpoint = "us-central1-aiplatform.googleapis.com"
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(
client_options=client_options,
client_info=ClientInfo(user_agent=USER_AGENT_PROPENSITY_PREDICTION)
)
del client


model_uri = f"gs://{bucket_name}/{model_name}"
model.metadata['containerSpec'] = {
@@ -454,6 +521,19 @@ def get_tabular_model_explanation(
from google.cloud import aiplatform
import logging
import re
from google.cloud import aiplatform

api_endpoint = "us-central1-aiplatform.googleapis.com"
# The AI Platform services require regional API endpoints.
client_options = {"api_endpoint": api_endpoint}
# Initialize client that will be used to create and send requests.
# This client only needs to be created once, and can be reused for multiple requests.
client = aiplatform.gapic.JobServiceClient(
client_options=client_options,
client_info=ClientInfo(user_agent=USER_AGENT_VBB_EXPLANATION)
)
del client

# Get explanations from the AutoML API
aiplatform.init(project=project, location=location)