Added a new pipeline for auto audience segmentation
zvizdo committed Sep 14, 2023
1 parent a1cd54a commit 20342ec
Showing 22 changed files with 3,268 additions and 35 deletions.
51 changes: 51 additions & 0 deletions config/config.yaml.tftpl
@@ -179,6 +179,8 @@ vertex_ai:
CALL `{customer_lifetime_value_inference_preparation_procedure_name}`();"
query_audience_segmentation_inference_preparation: "
CALL `{audience_segmentation_inference_preparation_procedure_name}`();"
query_auto_audience_segmentation_inference_preparation: "
CALL `{auto_audience_segmentation_inference_preparation_procedure_name}`();"

query_purchase_propensity_training_preparation: "
CALL `{purchase_propensity_training_preparation_procedure_name}`();"
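Each of these query templates is a one-line stored-procedure call whose `{...}` placeholder is filled in with the fully qualified procedure name defined further down in this file. A minimal sketch of issuing such a call with the BigQuery Python client; the project id and invocation path here are illustrative, not the repo's actual plumbing:

from google.cloud import bigquery

# Hypothetical fully qualified name; the real one is substituted from config.yaml.
procedure = "my-project.auto_audience_segmentation.invoke_auto_audience_segmentation_inference_preparation"

client = bigquery.Client(project="my-project")
client.query(f"CALL `{procedure}`();").result()  # blocks until the procedure completes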
@@ -209,6 +211,7 @@ vertex_ai:
purchase_propensity_inference_preparation_procedure_name: "${project_id}.purchase_propensity.invoke_purchase_propensity_inference_preparation"
customer_lifetime_value_inference_preparation_procedure_name: "${project_id}.customer_lifetime_value.invoke_customer_lifetime_value_inference_preparation"
audience_segmentation_inference_preparation_procedure_name: "${project_id}.audience_segmentation.invoke_audience_segmentation_inference_preparation"
auto_audience_segmentation_inference_preparation_procedure_name: "${project_id}.auto_audience_segmentation.invoke_auto_audience_segmentation_inference_preparation"

purchase_propensity_training_preparation_procedure_name: "${project_id}.purchase_propensity.invoke_purchase_propensity_training_preparation"
customer_lifetime_value_training_preparation_procedure_name: "${project_id}.customer_lifetime_value.invoke_customer_lifetime_value_training_preparation"
@@ -360,6 +363,28 @@ vertex_ai:
pubsub_activation_type: "audience-segmentation-15" # audience-segmentation-15
pipeline_parameters_substitutions: null

auto_segmentation:
prediction:
name: "auto-segmentation-prediction-pl"
job_id_prefix: "auto-segmentation-prediction-pl-"
experiment_name: "auto-segmentation-prediction"
type: "custom"
schedule:
cron: "TZ=America/New_York 0 12 * * *"
max_concurrent_run_count: 1
start_time: null
end_time: null
state: ACTIVE # possible states ACTIVE or PAUSED
pipeline_parameters:
project_id: "${project_id}"
location: "us-central1"
model_name: "interest-cluster-model"
bigquery_source: "${project_id}.auto_audience_segmentation.v_auto_audience_segmentation_inference_15"
bigquery_destination_prefix: "${project_id}.auto_audience_segmentation.p_auto_audience_segmentation_inference_15"
pubsub_activation_topic: "activation-trigger"
pubsub_activation_type: "auto-audience-segmentation-15"
pipeline_parameters_substitutions: null
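
This block is what the pipeline tooling reads when pointed at vertex_ai.pipelines.auto_segmentation.prediction (see the compiler invocation in pipelines.tf below). A hedged sketch of pulling these values out of the rendered config; the config.yaml path is an assumption, since Terraform renders the .tftpl template first:

import yaml

with open("config/config.yaml") as f:  # assumed rendered output of config.yaml.tftpl
    cfg = yaml.safe_load(f)

prediction = cfg["vertex_ai"]["pipelines"]["auto_segmentation"]["prediction"]
print(prediction["schedule"]["cron"])                   # TZ=America/New_York 0 12 * * *
print(prediction["pipeline_parameters"]["model_name"])  # interest-cluster-model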

clv:
training:
name: "clv-training-pl"
@@ -488,6 +513,17 @@ bigquery:
max_time_travel_hours: 48
default_partition_expiration_days: 365
default_table_expiration_days: 365
auto_audience_segmentation:
project_id: "${project_id}"
name: "auto_audience_segmentation"
location: "${location}"
collation: "und:ci"
is_case_insensitive: TRUE
description: "Auto Audience Segmentation Use Case dataset for Marketing behavioural modeling"
friendly_name: "Auto Audience Segmentation Dataset"
max_time_travel_hours: 48
default_partition_expiration_days: 365
default_table_expiration_days: 365
table:
audience_segmentation_inference_preparation:
project_id: "${project_id}"
@@ -582,6 +618,8 @@ bigquery:
query:
audience_segmentation_query_template:
none: none
auto_audience_segmentation_query_template:
none: none
purchase_propensity_query_template:
none: none
cltv_query_template:
@@ -799,6 +837,10 @@ bigquery:
project_id: "${project_id}"
dataset: "audience_segmentation"
stored_procedure: "audience_segmentation_inference_preparation"
invoke_auto_audience_segmentation_inference_preparation:
project_id: "${project_id}"
dataset: "auto_audience_segmentation"
stored_procedure: "auto_audience_segmentation_inference_preparation"
procedure:
audience_segmentation_training_preparation:
project_id: "${project_id}"
@@ -940,5 +982,14 @@ bigquery:
insert_table: "audience_segmentation_inference_preparation"
expiration_duration_hours: 48
mds_dataset: "${mds_dataset}"
auto_audience_segmentation_inference_preparation:
project_id: "${project_id}"
dataset: "auto_audience_segmentation"
name: "auto_audience_segmentation_inference_preparation"
feature_store_project_id: "${project_id}"
feature_store_dataset: "feature_store"
insert_table: "auto_audience_segmentation_inference_preparation"
expiration_duration_hours: 12
mds_dataset: "${mds_dataset}"


16 changes: 12 additions & 4 deletions infrastructure/terraform/modules/activation/main.tf
@@ -22,6 +22,7 @@ locals {
trigger_function_dir = "${local.source_root_dir}/python/function"
configuration_folder = "configuration"
audience_segmentation_query_template_file = "audience_segmentation_query_template.sqlx"
auto_audience_segmentation_query_template_file = "auto_audience_segmentation_query_template.sqlx"
cltv_query_template_file = "cltv_query_template.sqlx"
purchase_propensity_query_template_file = "purchase_propensity_query_template.sqlx"
measurement_protocol_payload_template_file = "app_payload_template.jinja2"
@@ -245,6 +246,12 @@ resource "google_storage_bucket_object" "audience_segmentation_query_template_fi
bucket = module.pipeline_bucket.name
}

resource "google_storage_bucket_object" "auto_audience_segmentation_query_template_file" {
name = "${local.configuration_folder}/${local.auto_audience_segmentation_query_template_file}"
source = "${local.sql_dir}/${local.auto_audience_segmentation_query_template_file}"
bucket = module.pipeline_bucket.name
}
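
This uploads the new SQLX query template next to the existing ones under configuration/ in the pipeline bucket; the activation job can then presumably fetch it back at run time. A generic sketch with the GCS client, with the bucket name as a placeholder:

from google.cloud import storage

client = storage.Client()
blob = client.bucket("my-pipeline-bucket").blob(  # bucket name is hypothetical
    "configuration/auto_audience_segmentation_query_template.sqlx"
)
template_text = blob.download_as_text()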

resource "google_storage_bucket_object" "cltv_query_template_file" {
name = "${local.configuration_folder}/${local.cltv_query_template_file}"
source = "${local.sql_dir}/${local.cltv_query_template_file}"
@@ -260,10 +267,11 @@ resource "google_storage_bucket_object" "purchase_propensity_query_template_file
data "template_file" "activation_type_configuration" {
template = file("${local.template_dir}/activation_type_configuration_template.tpl")
vars = {
audience_segmentation_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.audience_segmentation_query_template_file.output_name}"
cltv_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.cltv_query_template_file.output_name}"
purchase_propensity_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.purchase_propensity_query_template_file.output_name}"
measurement_protocol_payload_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.measurement_protocol_payload_template_file.output_name}"
audience_segmentation_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.audience_segmentation_query_template_file.output_name}"
auto_audience_segmentation_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.auto_audience_segmentation_query_template_file.output_name}"
cltv_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.cltv_query_template_file.output_name}"
purchase_propensity_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.purchase_propensity_query_template_file.output_name}"
measurement_protocol_payload_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.measurement_protocol_payload_template_file.output_name}"
}
}

@@ -59,6 +59,19 @@ resource "google_bigquery_dataset" "audience_segmentation" {
location = local.config_bigquery.dataset.audience_segmentation.location
max_time_travel_hours = local.config_bigquery.dataset.audience_segmentation.max_time_travel_hours

labels = {
version = "pilot"
}
}

resource "google_bigquery_dataset" "auto_audience_segmentation" {
dataset_id = local.config_bigquery.dataset.auto_audience_segmentation.name
friendly_name = local.config_bigquery.dataset.auto_audience_segmentation.friendly_name
project = local.auto_audience_segmentation_project_id
description = local.config_bigquery.dataset.auto_audience_segmentation.description
location = local.config_bigquery.dataset.auto_audience_segmentation.location
max_time_travel_hours = local.config_bigquery.dataset.auto_audience_segmentation.max_time_travel_hours

labels = {
version = "pilot"
}
@@ -31,6 +31,25 @@ resource "google_bigquery_routine" "audience_segmentation_inference_preparation"
}
}

data "local_file" "auto_audience_segmentation_inference_preparation_file" {
filename = "${local.sql_dir}/procedure/auto_audience_segmentation_inference_preparation.sql"
}

resource "google_bigquery_routine" "auto_audience_segmentation_inference_preparation" {
project = var.project_id
dataset_id = google_bigquery_dataset.auto_audience_segmentation.dataset_id
routine_id = "auto_audience_segmentation_inference_preparation"
routine_type = "PROCEDURE"
language = "SQL"
definition_body = data.local_file.auto_audience_segmentation_inference_preparation_file.content
description = "Procedure that prepares features for Auto Audience Segmentation model inference. User-per-day granularity level features. Run this procedure every time before Auto Audience Segmentation model predict."
arguments {
name = "inference_date"
mode = "INOUT"
data_type = jsonencode({ "typeKind" : "DATE" })
}
}
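
Because inference_date is declared INOUT, a caller has to pass a scripting variable rather than a literal, since the procedure may write a value back into it. A hedged example using BigQuery scripting from Python; the project id is a placeholder:

from google.cloud import bigquery

sql = """
DECLARE inference_date DATE DEFAULT NULL;  -- variable the procedure can overwrite
CALL `my-project.auto_audience_segmentation.auto_audience_segmentation_inference_preparation`(inference_date);
SELECT inference_date;
"""
rows = bigquery.Client().query(sql).result()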


data "local_file" "audience_segmentation_training_preparation_file" {
filename = "${local.sql_dir}/procedure/audience_segmentation_training_preparation.sql"
1 change: 1 addition & 0 deletions infrastructure/terraform/modules/feature-store/main.tf
@@ -22,6 +22,7 @@ locals {
feature_store_project_id = local.config_vars.bigquery.dataset.feature_store.project_id
purchase_propensity_project_id = local.config_vars.bigquery.dataset.purchase_propensity.project_id
audience_segmentation_project_id = local.config_vars.bigquery.dataset.audience_segmentation.project_id
auto_audience_segmentation_project_id = local.config_vars.bigquery.dataset.auto_audience_segmentation.project_id
customer_lifetime_value_project_id = local.config_vars.bigquery.dataset.customer_lifetime_value.project_id
project_id = local.feature_store_project_id
sql_dir = var.sql_dir_input
20 changes: 20 additions & 0 deletions infrastructure/terraform/modules/pipelines/pipelines.tf
@@ -276,4 +276,24 @@ resource "null_resource" "compile_segmentation_prediction_pipelines" {
depends_on = [
null_resource.compile_segmentation_training_pipelines
]
}

resource "null_resource" "compile_auto_segmentation_prediction_pipelines" {
triggers = {
working_dir = "${local.source_root_dir}/python"
tag = local.compile_pipelines_tag
}

provisioner "local-exec" {
command = <<-EOT
${var.poetry_run_alias} python -m pipelines.compiler -c ${local.config_file_path_relative_python_run_dir} -p vertex_ai.pipelines.auto_segmentation.prediction -o auto_segmentation_prediction.yaml
${var.poetry_run_alias} python -m pipelines.uploader -c ${local.config_file_path_relative_python_run_dir} -f auto_segmentation_prediction.yaml -t ${self.triggers.tag} -t latest
${var.poetry_run_alias} python -m pipelines.scheduler -c ${local.config_file_path_relative_python_run_dir} -p vertex_ai.pipelines.auto_segmentation.prediction
EOT
working_dir = self.triggers.working_dir
}

depends_on = [
null_resource.compile_feature_engineering_pipelines
]
}
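
pipelines.compiler, pipelines.uploader, and pipelines.scheduler are this repo's own modules, so their internals aren't shown in this commit; the -o flag suggests a KFP-style compile step that writes the auto_segmentation_prediction.yaml artifact. A generic, assumption-laden sketch of such a step, with the component and pipeline bodies invented purely for illustration:

from kfp.v2 import dsl, compiler

@dsl.component
def log_params(project_id: str, location: str):
    # Placeholder step; the real pipeline runs model prediction.
    print(project_id, location)

@dsl.pipeline(name="auto-segmentation-prediction-pl")
def prediction_pipeline(project_id: str, location: str = "us-central1"):
    log_params(project_id=project_id, location=location)

compiler.Compiler().compile(
    pipeline_func=prediction_pipeline,
    package_path="auto_segmentation_prediction.yaml",
)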
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -30,7 +30,7 @@ toml = "0.10.2"
docker = "^6.0.1"
ma-components = {path = "python/base_component_image/", develop = true}
google-cloud-pubsub = "2.15.0"
google-analytics-admin = "0.17.0"
google-analytics-admin = "0.20.0"

[tool.poetry.group.component_vertex.dependencies]
google-cloud-aiplatform = "1.22.0"
8 changes: 6 additions & 2 deletions python/activation/main.py
@@ -152,7 +152,11 @@ def process(self, element):
yield json.loads(payload_str)

def date_to_micro(self, date_str):
return int(datetime.datetime.strptime(date_str, self.date_format).timestamp()*1E6)
try:  # first, try to parse date_str as an ISO-8601 timestamp
return int(datetime.datetime.fromisoformat(date_str).timestamp() * 1E6)
except ValueError:  # otherwise fall back to the configured date_format
return int(datetime.datetime.strptime(date_str, self.date_format).timestamp() * 1E6)
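# Illustrative only, not part of this commit: both branches yield the same
# microsecond value for the same instant (the date_format here is an assumption,
# not the pipeline's actual setting):
#   int(datetime.datetime.fromisoformat("2023-09-14T12:00:00+00:00").timestamp() * 1E6)
#   == int(datetime.datetime.strptime("2023-09-14 12:00:00+0000", "%Y-%m-%d %H:%M:%S%z").timestamp() * 1E6)
#   == 1694692800000000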

def generate_param_fields(self, element):
element_copy = element.copy()
@@ -229,7 +233,7 @@ def run(argv=None):
use_json_exports=True,
use_standard_sql=True)
| "transform to Measurement Protocol API payload" >> beam.ParDo(TransformToPayload(activation_type_configuration['measurement_protocol_payload_template'], activation_type_configuration['activation_event_name']))
| 'POST event to Measurement Protocol API' >> beam.ParDo(CallMeasurementProtocolAPI(activation_options.ga4_measurement_id, activation_options.ga4_api_secret, debug=activation_options.use_api_validation))
| 'POST event to Measurement Protocol API' >> beam.ParDo(CallMeasurementProtocolAPI(activation_options.ga4_measurement_id, activation_options.ga4_api_secret, debug=activation_options.use_api_validation))
)

success_responses = ( measurement_api_responses