[Bug-fix] Tabular Workflow schema update (#67)
* updating table schema and schedules

* fixing classifier prediction flattening logic

* fixing union prediction tables for cltv inference

* changes in default parameters

---------

Co-authored-by: Carlos Timoteo <[email protected]>
chmstimoteo and Carlos Timoteo authored Jan 4, 2024
1 parent 007f8d5 commit 963573e
Showing 10 changed files with 105 additions and 67 deletions.
18 changes: 9 additions & 9 deletions config/config.yaml.tftpl
@@ -242,7 +242,7 @@ vertex_ai:
experiment_name: "propensity-training"
type: "tabular-workflows"
schedule:
cron: "TZ=America/New_York 0 0 * * SAT"
cron: "TZ=America/New_York 0 8 * * SAT"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -293,7 +293,7 @@ vertex_ai:
experiment_name: "propensity-prediction"
type: "custom"
schedule:
cron: "TZ=America/New_York 0 0 * * SUN"
cron: "TZ=America/New_York 0 5 * * *"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -328,7 +328,7 @@ vertex_ai:
experiment_name: "segmentation-training"
type: "custom"
schedule:
cron: "TZ=America/New_York 0 0 * * SAT"
cron: "TZ=America/New_York 0 12 * * SAT"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -361,7 +361,7 @@ vertex_ai:
experiment_name: "segmentation-prediction"
type: "custom"
schedule:
cron: "TZ=America/New_York 0 0 * * SUN"
cron: "TZ=America/New_York 0 7 * * *"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -387,7 +387,7 @@ vertex_ai:
experiment_name: "auto-segmentation-prediction"
type: "custom"
schedule:
cron: "TZ=America/New_York 0 12 * * *"
cron: "TZ=America/New_York 0 2 * * *"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -411,7 +411,7 @@ vertex_ai:
experiment_name: "propensity-clv-training"
type: "tabular-workflows"
schedule:
cron: "TZ=America/New_York 0 0 * * SAT"
cron: "TZ=America/New_York 0 16 * * SAT"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -461,7 +461,7 @@ vertex_ai:
experiment_name: "clv-training"
type: "tabular-workflows"
schedule:
cron: "TZ=America/New_York 0 0 * * SAT"
cron: "TZ=America/New_York 0 20 * * SAT"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -509,7 +509,7 @@ vertex_ai:
experiment_name: "clv-prediction"
type: "custom"
schedule:
cron: "TZ=America/New_York 0 0 * * SUN"
cron: "TZ=America/New_York 0 6 * * *"
max_concurrent_run_count: 1
start_time: null
end_time: null
@@ -528,7 +528,7 @@ vertex_ai:
positive_label: "1"
clv_model_display_name: "clv-training-pl-model" # must match the model name defined in the training pipeline. for now it is {NAME_OF_PIPELINE}-model
clv_model_metric_name: "meanAbsoluteError" #'rootMeanSquaredError', 'meanAbsoluteError', 'meanAbsolutePercentageError', 'rSquared', 'rootMeanSquaredLogError'
clv_model_metric_threshold: 100
clv_model_metric_threshold: 400
number_of_clv_models_considered: 1
purchase_bigquery_source: "${project_id}.purchase_propensity.v_purchase_propensity_inference_30_30"
purchase_bigquery_destination_prefix: "${project_id}.customer_lifetime_value"
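Taken together, the schedule edits above stagger the four weekly training runs across Saturday instead of all starting at midnight, and move the prediction pipelines to daily runs in the early morning. A stdlib-only sketch that just prints the new cadence (the cron strings are copied from the hunks above; the printing logic itself is not part of the pipeline):

# Sketch only: summarize the updated Vertex AI schedules from config.yaml.tftpl.
# The cron strings are copied from the diff; everything else here is illustrative.
schedules = {
    "propensity-training": "TZ=America/New_York 0 8 * * SAT",
    "segmentation-training": "TZ=America/New_York 0 12 * * SAT",
    "propensity-clv-training": "TZ=America/New_York 0 16 * * SAT",
    "clv-training": "TZ=America/New_York 0 20 * * SAT",
    "auto-segmentation-prediction": "TZ=America/New_York 0 2 * * *",
    "propensity-prediction": "TZ=America/New_York 0 5 * * *",
    "clv-prediction": "TZ=America/New_York 0 6 * * *",
    "segmentation-prediction": "TZ=America/New_York 0 7 * * *",
}

for name, cron in sorted(schedules.items(), key=lambda kv: int(kv[1].split()[2])):
    _tz, minute, hour, _dom, _month, dow = cron.split()
    cadence = "weekly (SAT)" if dow == "SAT" else "daily"
    print(f"{name:30s} {cadence:12s} at {int(hour):02d}:{int(minute):02d} America/New_York")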
4 changes: 2 additions & 2 deletions infrastructure/terraform/main.tf
@@ -63,11 +63,11 @@ locals {
project_toml_content_hash = filesha512(local.project_toml_file_path)

generated_sql_queries_directory_path = "${local.source_root_dir}/sql/query"
generated_sql_queries_fileset = [for f in fileset(local.generated_sql_queries_directory_path, "*.sql") : "${local.generated_sql_queries_directory_path}/${f}"]
generated_sql_queries_fileset = [for f in fileset(local.generated_sql_queries_directory_path, "*.sqlx") : "${local.generated_sql_queries_directory_path}/${f}"]
generated_sql_queries_content_hash = sha512(join("", [for f in local.generated_sql_queries_fileset : fileexists(f) ? filesha512(f) : sha512("file-not-found")]))

generated_sql_procedures_directory_path = "${local.source_root_dir}/sql/procedure"
generated_sql_procedures_fileset = [for f in fileset(local.generated_sql_procedures_directory_path, "*.sql") : "${local.generated_sql_procedures_directory_path}/${f}"]
generated_sql_procedures_fileset = [for f in fileset(local.generated_sql_procedures_directory_path, "*.sqlx") : "${local.generated_sql_procedures_directory_path}/${f}"]
generated_sql_procedures_content_hash = sha512(join("", [for f in local.generated_sql_procedures_fileset : fileexists(f) ? filesha512(f) : sha512("file-not-found")]))
}

103 changes: 70 additions & 33 deletions python/pipelines/components/bigquery/component.py
@@ -393,19 +393,40 @@ def bq_flatten_tabular_binary_prediction_table(
f"no prediction field found in given table {predictions_table.metadata['table_id']}")

query = f"""
CREATE OR REPLACE TABLE `{destination_table.metadata["table_id"]}` AS (SELECT
CASE
WHEN {predictions_column}.classes[OFFSET(0)]='{positive_label}' AND {predictions_column}.scores[OFFSET(0)]> {threashold} THEN 'true'
WHEN {predictions_column}.classes[OFFSET(1)]='{positive_label}' AND {predictions_column}.scores[OFFSET(1)]> {threashold} THEN 'true'
ELSE 'false'
END AS {destination_table.metadata["predictions_column"]},
CASE
WHEN {predictions_column}.classes[OFFSET(0)]='{positive_label}' THEN {predictions_column}.scores[OFFSET(0)]
ELSE {predictions_column}.scores[OFFSET(1)]
END AS prediction_prob, b.*
FROM `{predictions_table.metadata['table_id']}` as a
INNER JOIN `{source_table}` as b on a.{bq_unique_key}=b.{bq_unique_key}
)
CREATE OR REPLACE TEMP TABLE prediction_indexes AS (
SELECT
(SELECT offset from UNNEST({predictions_column}.classes) c with offset where c = "0") AS index_z,
(SELECT offset from UNNEST({predictions_column}.classes) c with offset where c = "1") AS index_one,
{predictions_column} as {predictions_column},
* EXCEPT({predictions_column})
FROM `{predictions_table.metadata['table_id']}`
);
CREATE OR REPLACE TEMP TABLE prediction_greatest_scores AS (
SELECT
{predictions_column}.scores[SAFE_OFFSET(index_z)] AS score_zero,
{predictions_column}.scores[SAFE_OFFSET(index_one)] AS score_one,
GREATEST({predictions_column}.scores[SAFE_OFFSET(index_z)], {predictions_column}.scores[SAFE_OFFSET(index_one)]) AS greatest_score,
*
FROM prediction_indexes
);
CREATE OR REPLACE TABLE `{destination_table.metadata["table_id"]}` AS (
SELECT
CASE
WHEN a.score_zero > {threashold} THEN 'false'
WHEN a.score_one > {threashold} THEN 'true'
ELSE 'false'
END AS {destination_table.metadata["predictions_column"]},
CASE
WHEN a.score_zero > {threashold} THEN a.score_zero
WHEN a.score_one > {threashold} THEN a.score_one
ELSE a.score_zero
END as prediction_prob,
b.*
FROM prediction_greatest_scores as a
INNER JOIN `{source_table}` as b on a.{bq_unique_key}=b.{bq_unique_key}
);
"""


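The rewritten statement above is the classifier prediction flattening fix from the commit message: instead of assuming the positive class sits at a fixed array offset, it first looks up each class's offset in the classes array and only then compares the corresponding score against the threshold. A plain-Python sketch of the same decision logic, for illustration only (the pipeline does this in BigQuery; the struct layout follows the tabular-workflows prediction output):

# Illustrative re-implementation of the flattening CASE logic, not pipeline code.
def flatten_binary_prediction(classes, scores, threshold=0.5):
    """Return (label, probability) from a tabular-workflows style prediction struct."""
    index_zero = classes.index("0")  # offset of the negative class
    index_one = classes.index("1")   # offset of the positive class
    score_zero, score_one = scores[index_zero], scores[index_one]
    if score_zero > threshold:
        return "false", score_zero
    if score_one > threshold:
        return "true", score_one
    return "false", score_zero

# The class order is not guaranteed, so both orderings must give the same answer:
print(flatten_binary_prediction(["1", "0"], [0.81, 0.19]))  # ('true', 0.81)
print(flatten_binary_prediction(["0", "1"], [0.19, 0.81]))  # ('true', 0.81)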
@@ -631,28 +652,44 @@ def bq_union_predictions_tables(
destination_table.metadata["table_id"] = f"{predictions_table_regression.metadata['table_id']}_final"
destination_table.metadata["predictions_column"] = 'prediction'
query = f"""
CREATE TEMP TABLE flattened_prediction AS (
SELECT
CASE
WHEN {predictions_column_propensity}.classes[OFFSET(1)]='0' AND {predictions_column_propensity}.scores[OFFSET(1)]> {threashold} THEN 'false'
WHEN {predictions_column_propensity}.classes[OFFSET(0)]='1' AND {predictions_column_propensity}.scores[OFFSET(0)]> {threashold} THEN 'true'
ELSE 'false'
END AS {predictions_column_regression},
CASE
WHEN {predictions_column_propensity}.classes[OFFSET(1)]='0' AND {predictions_column_propensity}.scores[OFFSET(1)]> {threashold} THEN
{predictions_column_propensity}.scores[OFFSET(1)]
WHEN {predictions_column_propensity}.classes[OFFSET(0)]='1' AND {predictions_column_propensity}.scores[OFFSET(0)]> {threashold} THEN
{predictions_column_propensity}.scores[OFFSET(0)]
ELSE {predictions_column_propensity}.scores[OFFSET(0)]
END AS prediction_prob,
a.* EXCEPT({predictions_column_propensity})
FROM `{predictions_table_propensity.metadata['table_id']}` as a
CREATE OR REPLACE TEMP TABLE prediction_indexes AS (
SELECT
(SELECT offset from UNNEST({predictions_column_propensity}.classes) c with offset where c = "0") AS index_zero,
(SELECT offset from UNNEST({predictions_column_propensity}.classes) c with offset where c = "1") AS index_one,
{predictions_column_propensity},
* EXCEPT({predictions_column_propensity})
FROM `{predictions_table_propensity.metadata['table_id']}`
);
CREATE OR REPLACE TEMP TABLE prediction_greatest_scores AS (
SELECT
{predictions_column_propensity}.scores[SAFE_OFFSET(index_zero)] AS score_zero,
{predictions_column_propensity}.scores[SAFE_OFFSET(index_one)] AS score_one,
GREATEST({predictions_column_propensity}.scores[SAFE_OFFSET(index_zero)], {predictions_column_propensity}.scores[SAFE_OFFSET(index_one)]) AS greatest_score,
* EXCEPT({predictions_column_propensity})
FROM prediction_indexes
);
CREATE OR REPLACE TEMP TABLE flattened_prediction AS (
SELECT
CASE
WHEN a.score_zero > {threashold} THEN 'false'
WHEN a.score_one > {threashold} THEN 'true'
ELSE 'false'
END AS {predictions_column_regression},
CASE
WHEN a.score_zero > {threashold} THEN a.score_zero
WHEN a.score_one > {threashold} THEN a.score_one
ELSE a.score_zero
END AS prediction_prob,
a.*
FROM prediction_greatest_scores AS a
);
CREATE TEMP TABLE non_purchasers_prediction AS (
CREATE OR REPLACE TEMP TABLE non_purchasers_prediction AS (
SELECT
B.{table_regression_bq_unique_key},
0.0 AS clv_prediction,
GREATEST(0.0, COALESCE(A.max_daily_revenue / A.average_daily_purchasers, 0.0) * A.avg_user_conversion_rate) AS clv_prediction,
B.* EXCEPT({table_regression_bq_unique_key}, {predictions_column_regression})
FROM
flattened_prediction A
@@ -663,10 +700,10 @@
AND A.{table_propensity_bq_unique_key} = B.{table_regression_bq_unique_key}
);
CREATE TEMP TABLE purchasers_prediction AS (
CREATE OR REPLACE TEMP TABLE purchasers_prediction AS (
SELECT
B.{table_regression_bq_unique_key},
GREATEST(0.0, B.{predictions_column_regression}) AS clv_prediction,
GREATEST(COALESCE(A.max_daily_revenue / A.average_daily_purchasers, 0.0) * A.avg_user_conversion_rate, B.{predictions_column_regression}) AS clv_prediction,
B.* EXCEPT({table_regression_bq_unique_key}, {predictions_column_regression})
FROM
flattened_prediction A
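The other change to bq_union_predictions_tables is the CLV floor: non-purchasers no longer get a flat 0.0, and purchasers are floored at the same revenue-based baseline, max_daily_revenue / average_daily_purchasers scaled by avg_user_conversion_rate (column names as they appear in the query above). A small sketch of that arithmetic with invented numbers:

# Sketch of the non-purchaser CLV baseline used above; the input values are invented.
def clv_baseline(max_daily_revenue, average_daily_purchasers, avg_user_conversion_rate):
    # Mirrors COALESCE(max_daily_revenue / average_daily_purchasers, 0.0) * rate,
    # guarded here against a zero denominator so the example always runs.
    if not average_daily_purchasers:
        return 0.0
    return max(0.0, (max_daily_revenue / average_daily_purchasers) * avg_user_conversion_rate)

baseline = clv_baseline(5000.0, 120.0, 0.02)   # ~0.83 per user
non_purchaser_clv = baseline                   # replaces the old flat 0.0
purchaser_clv = max(baseline, 37.50)           # GREATEST(baseline, regression prediction)
print(round(non_purchaser_clv, 2), round(purchaser_clv, 2))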
7 changes: 2 additions & 5 deletions sql/query/audience_segmentation_query_template.sqlx
@@ -17,8 +17,5 @@ SELECT user_pseudo_id AS client_id,
ltv_revenue_past_7_15_day AS ltv_revenue_past_7_15,
geo_region
FROM `{{source_table}}`
WHERE prediction IN (3,4,14)
AND geo_region IN ('California', 'New York', 'Washington')
AND visits_past_1_7_day = 1
ORDER BY ltv_revenue_past_7_15_day DESC
LIMIT 1000
WHERE prediction IS NOT NULL
LIMIT 10000
4 changes: 3 additions & 1 deletion sql/query/auto_audience_segmentation_query_template.sqlx
@@ -2,4 +2,6 @@ SELECT
user_id AS client_id,
EXTRACT(DATE FROM feature_timestamp AT TIME ZONE 'UTC') AS inference_date,
prediction as prediction
FROM `{{source_table}}`
FROM `{{source_table}}`
WHERE prediction IS NOT NULL
LIMIT 10000
2 changes: 1 addition & 1 deletion sql/query/cltv_query_template.sqlx
@@ -4,4 +4,4 @@ SELECT * EXCEPT(user_pseudo_id, processed_timestamp),
FROM `{{source_table}}`
WHERE prediction > 0
ORDER BY prediction DESC
LIMIT 1000
LIMIT 10000
11 changes: 6 additions & 5 deletions sql/query/invoke_audience_segmentation_training_preparation.sqlx
@@ -20,12 +20,13 @@ DECLARE validation_split_end_number INT64 DEFAULT NULL;
DECLARE max_date DATE;
DECLARE min_date DATE;
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL {{interval_max_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
SELECT max_date;
SELECT min_date;
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);

#SET train_start_date = DATE '{{train_start_date}}';
SET train_start_date = min_date;
#SET train_end_date = DATE '{{train_end_date}}';
SET train_end_date = max_date;

SET train_start_date = DATE '{{train_start_date}}';
SET train_end_date = DATE '{{train_end_date}}';
SET train_split_end_number = {{train_split_end_number}}; -- If you want 60% for training use number 5. If you want 80% use number 7.
SET validation_split_end_number = {{validation_split_end_number}};

Original file line number Diff line number Diff line change
@@ -21,12 +21,13 @@ DECLARE purchasers INT64 DEFAULT NULL;
DECLARE max_date DATE;
DECLARE min_date DATE;
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL {{interval_max_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
SELECT max_date;
SELECT min_date;
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);

#SET train_start_date = DATE '{{train_start_date}}';
SET train_start_date = min_date;
#SET train_end_date = DATE '{{train_end_date}}';
SET train_end_date = max_date;

SET train_start_date = DATE '{{train_start_date}}';
SET train_end_date = DATE '{{train_end_date}}';
SET train_split_end_number = {{train_split_end_number}}; -- If you want 60% for training use number 5. If you want 80% use number 7.
SET validation_split_end_number = {{validation_split_end_number}};

Original file line number Diff line number Diff line change
@@ -22,11 +22,12 @@ DECLARE max_date DATE;
DECLARE min_date DATE;
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL {{interval_max_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
SELECT max_date;
SELECT min_date;

SET train_start_date = DATE '{{train_start_date}}';
SET train_end_date = DATE '{{train_end_date}}';
#SET train_start_date = DATE '{{train_start_date}}';
SET train_start_date = min_date;
#SET train_end_date = DATE '{{train_end_date}}';
SET train_end_date = max_date;

SET train_split_end_number = {{train_split_end_number}}; -- If you want 60% for training use number 5. If you want 80% use number 7.
SET validation_split_end_number = {{validation_split_end_number}};

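The invoke_*_training_preparation procedures switch between two ways of setting the training window: the fixed {{train_start_date}} / {{train_end_date}} template values and dates derived from the event table, MIN/MAX of event_date shifted by the configured intervals. A purely illustrative sketch of the derived-window arithmetic, with hypothetical dates and interval values:

# Hypothetical values standing in for MIN(event_date), MAX(event_date) and the
# {{interval_min_date}} / {{interval_max_date}} template parameters.
from datetime import date, timedelta

min_event_date, max_event_date = date(2023, 1, 1), date(2024, 1, 1)
interval_min_date, interval_max_date = 30, 7

# DATE_ADD(MIN(event_date), INTERVAL interval_min_date DAY)
train_start_date = min_event_date + timedelta(days=interval_min_date)
# DATE_SUB(MAX(event_date), INTERVAL interval_max_date DAY)
train_end_date = max_event_date - timedelta(days=interval_max_date)

print(train_start_date, train_end_date)  # 2023-01-31 2023-12-25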
3 changes: 1 addition & 2 deletions sql/query/purchase_propensity_query_template.sqlx
@@ -3,6 +3,5 @@ SELECT * EXCEPT(user_pseudo_id, processed_timestamp, user_id),
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date
FROM `{{source_table}}`
WHERE prediction = 'true'
AND prediction_prob > 0.5
ORDER BY prediction_prob DESC
LIMIT 1000
LIMIT 10000
