Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chmstimoteo patch2 #47

Merged
merged 8 commits into from
Sep 29, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 13 additions & 11 deletions config/config.yaml.tftpl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ vertex_ai:
query_parameters:
- { name: "input_date", type: "DATE", value: None } # If value is not defined then assume current_date()
#INT64
timeout: 1800.0
timeout: 3600.0
pipeline_parameters_substitutions: # Substitutions are applied to the parameters before compilation
customer_lifetime_value_label_procedure_name: "${project_id}.feature_store.invoke_customer_lifetime_value_label"
purchase_propensity_label_procedure_name: "${project_id}.feature_store.invoke_purchase_propensity_label"
Expand Down Expand Up @@ -186,6 +186,7 @@ vertex_ai:
# data_source_bigquery_table_path: "bq://${project_id}.purchase_propensity.v_purchase_propensity_training_15_7"
# data_source_bigquery_table_path: "bq://${project_id}.purchase_propensity.v_purchase_propensity_training_15_15"
data_source_bigquery_table_path: "bq://${project_id}.purchase_propensity.v_purchase_propensity_training_30_15"
data_source_bigquery_table_schema: "sql/schema/table/purchase_propensity_training_preparation.json"
dataflow_service_account: "df-worker@${project_id}.iam.gserviceaccount.com"
timestamp_split_key: null
stratified_split_key: null
Expand Down Expand Up @@ -329,6 +330,7 @@ vertex_ai:
data_source_csv_filenames: null
optimization_objective: minimize-mae # minimize-mae | minimize-rmse | minimize-rmsle
data_source_bigquery_table_path: "bq://${project_id}.customer_lifetime_value.v_customer_lifetime_value_training_180_30"
data_source_bigquery_table_schema: "sql/schema/table/customer_lifetime_value_training_preparation.json"
dataflow_service_account: "df-worker@${project_id}.iam.gserviceaccount.com"
timestamp_split_key: null
stratified_split_key: null
Expand Down Expand Up @@ -392,7 +394,7 @@ bigquery:
is_case_insensitive: TRUE
description: "Feature Store dataset for Marketing behavioural modeling"
friendly_name: "Feature Store"
max_time_travel_hours: 48
max_time_travel_hours: 168
default_partition_expiration_days: 365
default_table_expiration_days: 365
purchase_propensity:
Expand All @@ -403,7 +405,7 @@ bigquery:
is_case_insensitive: TRUE
description: "Purchase Propensity Use Case dataset for Marketing behavioural modeling"
friendly_name: "Purchase Propensity Dataset"
max_time_travel_hours: 48
max_time_travel_hours: 168
default_partition_expiration_days: 365
default_table_expiration_days: 365
customer_lifetime_value:
Expand All @@ -414,7 +416,7 @@ bigquery:
is_case_insensitive: TRUE
description: "Customer Lifetime Value Use Case dataset for Marketing behavioural modeling"
friendly_name: "Customer Lifetime Value Dataset"
max_time_travel_hours: 48
max_time_travel_hours: 168
default_partition_expiration_days: 365
default_table_expiration_days: 365
audience_segmentation:
Expand All @@ -425,7 +427,7 @@ bigquery:
is_case_insensitive: TRUE
description: "Audience Segmentation Use Case dataset for Marketing behavioural modeling"
friendly_name: "Audience Segmentation Dataset"
max_time_travel_hours: 48
max_time_travel_hours: 168
default_partition_expiration_days: 365
default_table_expiration_days: 365
table:
Expand Down Expand Up @@ -749,7 +751,7 @@ bigquery:
feature_store_dataset: "feature_store"
mds_project_id: "${project_id}"
mds_dataset: "${mds_dataset}"
expiration_duration_hours: 48
expiration_duration_hours: 168
samples_per_split: 100000
customer_lifetime_value_label:
project_id: "${project_id}"
Expand All @@ -767,7 +769,7 @@ bigquery:
feature_store_dataset: "feature_store"
mds_project_id: "${project_id}"
mds_dataset: "${mds_dataset}"
expiration_duration_hours: 48
expiration_duration_hours: 168
purchase_propensity_label:
project_id: "${project_id}"
dataset: "feature_store"
Expand All @@ -784,7 +786,7 @@ bigquery:
feature_store_dataset: "feature_store"
mds_project_id: "${project_id}"
mds_dataset: "${mds_dataset}"
expiration_duration_hours: 48
expiration_duration_hours: 168
user_dimensions:
project_id: "${project_id}"
dataset: "feature_store"
Expand Down Expand Up @@ -862,23 +864,23 @@ bigquery:
feature_store_project_id: "${project_id}"
feature_store_dataset: "feature_store"
insert_table: "purchase_propensity_inference_preparation"
expiration_duration_hours: 48
expiration_duration_hours: 168
customer_lifetime_value_inference_preparation:
project_id: "${project_id}"
dataset: "customer_lifetime_value"
name: "customer_lifetime_value_inference_preparation"
feature_store_project_id: "${project_id}"
feature_store_dataset: "feature_store"
insert_table: "customer_lifetime_value_inference_preparation"
expiration_duration_hours: 48
expiration_duration_hours: 168
audience_segmentation_inference_preparation:
project_id: "${project_id}"
dataset: "audience_segmentation"
name: "audience_segmentation_inference_preparation"
feature_store_project_id: "${project_id}"
feature_store_dataset: "feature_store"
insert_table: "audience_segmentation_inference_preparation"
expiration_duration_hours: 48
expiration_duration_hours: 168
mds_dataset: "${mds_dataset}"


Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,15 @@ resource "google_bigquery_dataset" "feature_store" {
description = local.config_bigquery.dataset.feature_store.description
location = local.config_bigquery.dataset.feature_store.location
max_time_travel_hours = local.config_bigquery.dataset.feature_store.max_time_travel_hours
delete_contents_on_destroy = false

labels = {
version = "pilot"
}

lifecycle {
ignore_changes = all
}
}

resource "google_bigquery_dataset" "purchase_propensity" {
Expand All @@ -32,10 +37,15 @@ resource "google_bigquery_dataset" "purchase_propensity" {
description = local.config_bigquery.dataset.purchase_propensity.description
location = local.config_bigquery.dataset.purchase_propensity.location
max_time_travel_hours = local.config_bigquery.dataset.purchase_propensity.max_time_travel_hours
delete_contents_on_destroy = false

labels = {
version = "pilot"
}

lifecycle {
ignore_changes = all
}
}

resource "google_bigquery_dataset" "customer_lifetime_value" {
Expand All @@ -45,10 +55,15 @@ resource "google_bigquery_dataset" "customer_lifetime_value" {
description = local.config_bigquery.dataset.customer_lifetime_value.description
location = local.config_bigquery.dataset.customer_lifetime_value.location
max_time_travel_hours = local.config_bigquery.dataset.customer_lifetime_value.max_time_travel_hours
delete_contents_on_destroy = false

labels = {
version = "pilot"
}

lifecycle {
ignore_changes = all
}
}

resource "google_bigquery_dataset" "audience_segmentation" {
Expand All @@ -58,8 +73,13 @@ resource "google_bigquery_dataset" "audience_segmentation" {
description = local.config_bigquery.dataset.audience_segmentation.description
location = local.config_bigquery.dataset.audience_segmentation.location
max_time_travel_hours = local.config_bigquery.dataset.audience_segmentation.max_time_travel_hours
delete_contents_on_destroy = false

labels = {
version = "pilot"
}

lifecycle {
ignore_changes = all
}
}
30 changes: 15 additions & 15 deletions infrastructure/terraform/modules/feature-store/bigquery-tables.tf
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ resource "google_bigquery_table" "audience_segmentation_inference_preparation" {
dataset_id = google_bigquery_dataset.audience_segmentation.dataset_id
table_id = local.config_bigquery.table.audience_segmentation_inference_preparation.table_name
description = local.config_bigquery.table.audience_segmentation_inference_preparation.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -30,7 +30,7 @@ resource "google_bigquery_table" "customer_lifetime_value_inference_preparation"
dataset_id = google_bigquery_dataset.customer_lifetime_value.dataset_id
table_id = local.config_bigquery.table.customer_lifetime_value_inference_preparation.table_name
description = local.config_bigquery.table.customer_lifetime_value_inference_preparation.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -42,7 +42,7 @@ resource "google_bigquery_table" "customer_lifetime_value_label" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.customer_lifetime_value_label.table_name
description = local.config_bigquery.table.customer_lifetime_value_label.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -54,7 +54,7 @@ resource "google_bigquery_table" "purchase_propensity_inference_preparation" {
dataset_id = google_bigquery_dataset.purchase_propensity.dataset_id
table_id = local.config_bigquery.table.purchase_propensity_inference_preparation.table_name
description = local.config_bigquery.table.purchase_propensity_inference_preparation.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -66,7 +66,7 @@ resource "google_bigquery_table" "purchase_propensity_label" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.purchase_propensity_label.table_name
description = local.config_bigquery.table.purchase_propensity_label.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -78,7 +78,7 @@ resource "google_bigquery_table" "user_dimensions" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_dimensions.table_name
description = local.config_bigquery.table.user_dimensions.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -90,7 +90,7 @@ resource "google_bigquery_table" "user_lifetime_dimensions" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_lifetime_dimensions.table_name
description = local.config_bigquery.table.user_lifetime_dimensions.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -103,7 +103,7 @@ resource "google_bigquery_table" "user_lookback_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_lookback_metrics.table_name
description = local.config_bigquery.table.user_lookback_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -116,7 +116,7 @@ resource "google_bigquery_table" "user_rolling_window_lifetime_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_rolling_window_lifetime_metrics.table_name
description = local.config_bigquery.table.user_rolling_window_lifetime_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -128,7 +128,7 @@ resource "google_bigquery_table" "user_rolling_window_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_rolling_window_metrics.table_name
description = local.config_bigquery.table.user_rolling_window_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -140,7 +140,7 @@ resource "google_bigquery_table" "user_scoped_lifetime_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_scoped_lifetime_metrics.table_name
description = local.config_bigquery.table.user_scoped_lifetime_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -152,7 +152,7 @@ resource "google_bigquery_table" "user_scoped_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_scoped_metrics.table_name
description = local.config_bigquery.table.user_scoped_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -164,7 +164,7 @@ resource "google_bigquery_table" "user_scoped_segmentation_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_scoped_segmentation_metrics.table_name
description = local.config_bigquery.table.user_scoped_segmentation_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -176,7 +176,7 @@ resource "google_bigquery_table" "user_segmentation_dimensions" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_segmentation_dimensions.table_name
description = local.config_bigquery.table.user_segmentation_dimensions.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand All @@ -188,7 +188,7 @@ resource "google_bigquery_table" "user_session_event_aggregated_metrics" {
dataset_id = google_bigquery_dataset.feature_store.dataset_id
table_id = local.config_bigquery.table.user_session_event_aggregated_metrics.table_name
description = local.config_bigquery.table.user_session_event_aggregated_metrics.table_description
deletion_protection = false
deletion_protection = true
labels = {
version = "pilot"
}
Expand Down
43 changes: 27 additions & 16 deletions python/pipelines/pipeline_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,27 @@ def run_pipeline_from_func(
return pl


def _extract_schema_from_bigquery(
    table_name: str,
    table_schema: str,
) -> list:
    """Return the column names of a BigQuery table, with a file-based fallback.

    Attempts to fetch the live schema for `table_name` via the BigQuery API.
    If the table or view does not exist yet (e.g. at pipeline compile time),
    falls back to reading `table_schema`, a local JSON schema file expected to
    contain a list of objects each having a "name" key.

    Args:
        table_name: BigQuery table or view identifier passed to
            `Client.get_table` (e.g. "project.dataset.table").
        table_schema: Path to the JSON schema file used as a fallback.

    Returns:
        A list of column-name strings.
    """
    import json

    from google.cloud import bigquery
    from google.api_core import exceptions
    try:
        client = bigquery.Client()
        table = client.get_table(table_name)
        schema = [field.name for field in table.schema]
    except exceptions.NotFound:
        # Bug fix: original referenced an undefined name `schema_name`
        # (the parameter is `table_schema`), which raised NameError on
        # this fallback path; also `logging.warn` is deprecated.
        logging.warning(f'Pipeline compiled without columns transformation. \
        Make sure the `data_source_bigquery_table_path` table or view exists! \
        Loading default values from schema file {table_schema}.')
        with open(table_schema) as f:
            schema = [feature['name'] for feature in json.load(f)]
    return schema


def compile_automl_tabular_pipeline(
template_path: str,
parameters_path: str,
Expand Down Expand Up @@ -227,18 +248,10 @@ def compile_automl_tabular_pipeline(
pipeline_parameters['transformations'] = pipeline_parameters['transformations'].format(
timestamp=datetime.now().strftime("%Y%m%d%H%M%S"))

from google.cloud import bigquery
from google.api_core import exceptions

try:
client = bigquery.Client()
table = client.get_table(
pipeline_parameters['data_source_bigquery_table_path'].split('/')[-1])
schema = [schema.name for schema in table.schema]
except exceptions.NotFound as e:
logging.warn(f'Pipeline compiled without columns transformation. \
Make sure the `data_source_bigquery_table_path` table or view exists in your config.yaml!')
schema = []
schema = _extract_schema_from_bigquery(
table_name=pipeline_parameters['data_source_bigquery_table_path'].split('/')[-1],
table_schema=pipeline_parameters['data_source_bigquery_table_schema']
)

for column_to_remove in exclude_features + [
pipeline_parameters['target_column'],
Expand All @@ -249,10 +262,7 @@ def compile_automl_tabular_pipeline(
if column_to_remove in schema:
schema.remove(column_to_remove)

logging.info(f'features:{schema}' )
# need to remove later
# if "default" in schema:
# schema.remove("default")
logging.info(f'features:{schema}')

write_auto_transformations(pipeline_parameters['transformations'], schema)
if pipeline_parameters['predefined_split_key']:
Expand All @@ -262,6 +272,7 @@ def compile_automl_tabular_pipeline(

# write_to_gcs(pipeline_parameters['transform_config_path'], json.dumps(transformations))

pipeline_parameters.pop('data_source_bigquery_table_schema', None)
(
tp,
parameter_values,
Expand Down
Loading