From 29efe2a3685c0cfb492a5f301bcaf5cbb522d018 Mon Sep 17 00:00:00 2001
From: Carlos Timoteo
Date: Wed, 10 Jan 2024 17:09:26 -0500
Subject: [PATCH] Bug-fix: Measurement Protocol Message (#68)

* implementing user properties and event parameters for the MP API

* validating the new MP message payload

* removing bad strings from the client_id in the MP message payload

---------

Co-authored-by: Carlos Timoteo
---
 infrastructure/terraform/main.tf              |  2 +-
 .../terraform/modules/activation/main.tf      | 19 +++++
 pyproject.toml                                |  5 +-
 python/activation/main.py                     | 69 +++++++++++++++----
 python/ga4_setup/setup.py                     |  2 +-
 .../audience_segmentation_query_template.sqlx | 29 +++-----
 ..._audience_segmentation_query_template.sqlx | 12 ++--
 sql/query/cltv_query_template.sqlx            | 15 ++--
 .../purchase_propensity_query_template.sqlx   | 16 +++--
 templates/app_payload_template.jinja2         |  6 +-
 10 files changed, 114 insertions(+), 61 deletions(-)

diff --git a/infrastructure/terraform/main.tf b/infrastructure/terraform/main.tf
index c2c372a2..e80e0cd2 100644
--- a/infrastructure/terraform/main.tf
+++ b/infrastructure/terraform/main.tf
@@ -89,7 +89,7 @@ resource "local_file" "feature_store_configuration" {
 
 resource "null_resource" "poetry_install" {
   triggers = {
-    create_command       = "${var.poetry_cmd} install"
+    create_command       = "${var.poetry_cmd} lock && ${var.poetry_cmd} install"
     source_contents_hash = local.project_toml_content_hash
   }
 
diff --git a/infrastructure/terraform/modules/activation/main.tf b/infrastructure/terraform/modules/activation/main.tf
index 0c361e14..3b09f29d 100644
--- a/infrastructure/terraform/modules/activation/main.tf
+++ b/infrastructure/terraform/modules/activation/main.tf
@@ -40,6 +40,19 @@ locals {
   activation_type_configuration_file              = "${local.source_root_dir}/templates/activation_type_configuration_template.tpl"
   activation_type_configuration_file_content_hash = filesha512(local.activation_type_configuration_file)
 
+  app_payload_template_file              = "${local.source_root_dir}/templates/app_payload_template.jinja2"
+  app_payload_template_file_content_hash = filesha512(local.app_payload_template_file)
+
+  activation_application_dir = "${local.source_root_dir}/python/activation"
+  activation_application_fileset = [
+    "${local.activation_application_dir}/main.py",
+    "${local.activation_application_dir}/Dockerfile",
+    "${local.activation_application_dir}/metadata.json",
+    "${local.activation_application_dir}/requirements.txt",
+    "${local.activation_application_dir}/pipeline_test.py",
+  ]
+  activation_application_content_hash = sha512(join("", [for f in local.activation_application_fileset : fileexists(f) ? filesha512(f) : sha512("file-not-found")]))
+
   audience_segmentation_activation_query_file              = "${local.source_root_dir}/sql/query/audience_segmentation_query_template.sqlx"
   audience_segmentation_activation_query_file_content_hash = filesha512(local.audience_segmentation_activation_query_file)
 }
@@ -112,6 +125,8 @@ resource "null_resource" "create_custom_events" {
   triggers = {
     services_enabled_project = module.project_services.project_id
     source_contents_hash     = local.activation_type_configuration_file_content_hash
+    #source_activation_type_configuration_hash = local.activation_type_configuration_file_content_hash
+    #source_activation_application_python_hash = local.activation_application_content_hash
   }
   provisioner "local-exec" {
     command = <<-EOT
@@ -125,6 +140,8 @@ resource "null_resource" "create_custom_dimensions" {
   triggers = {
     services_enabled_project = module.project_services.project_id
     source_contents_hash     = local.audience_segmentation_activation_query_file_content_hash
+    #source_activation_type_configuration_hash = local.activation_type_configuration_file_content_hash
+    #source_activation_application_python_hash = local.activation_application_content_hash
   }
   provisioner "local-exec" {
     command = <<-EOT
@@ -266,6 +283,7 @@ resource "google_storage_bucket_object" "purchase_propensity_query_template_file
 
 data "template_file" "activation_type_configuration" {
   template = file("${local.template_dir}/activation_type_configuration_template.tpl")
+
   vars = {
     audience_segmentation_query_template_gcs_path      = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.audience_segmentation_query_template_file.output_name}"
     auto_audience_segmentation_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.auto_audience_segmentation_query_template_file.output_name}"
@@ -279,6 +297,7 @@ resource "google_storage_bucket_object" "activation_type_configuration_file" {
   name           = "${local.configuration_folder}/activation_type_configuration.json"
   content        = data.template_file.activation_type_configuration.rendered
   bucket         = module.pipeline_bucket.name
+  detect_md5hash = base64encode("${local.activation_type_configuration_file_content_hash}${local.activation_application_content_hash}")
 }
 
 module "activation_pipeline_container" {
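
Note: the `activation_application_content_hash` local above concatenates one SHA-512 digest per file, substituting a digest of the literal string "file-not-found" for any missing file, and then hashes the concatenation, so a change to any file in the set changes the trigger value. A minimal Python sketch of the same logic (the paths are illustrative):

    import hashlib
    from pathlib import Path

    def sha512_hex(data: bytes) -> str:
        return hashlib.sha512(data).hexdigest()

    # Mirrors the Terraform expression:
    # sha512(join("", [for f in fileset : fileexists(f) ? filesha512(f) : sha512("file-not-found")]))
    fileset = [Path("python/activation") / name for name in
               ("main.py", "Dockerfile", "metadata.json", "requirements.txt", "pipeline_test.py")]
    digests = [sha512_hex(f.read_bytes()) if f.is_file() else sha512_hex(b"file-not-found")
               for f in fileset]
    content_hash = sha512_hex("".join(digests).encode())  # changes when any file changes
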
diff --git a/pyproject.toml b/pyproject.toml
index e83bf4d2..7b4962a7 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -13,7 +13,8 @@ google-cloud-aiplatform = "1.22.0"
 google-cloud = "^0.34.0"
 jinja2 = ">=3.0.1,<4.0.0"
 pip = "22.3.1"
-invoke = "1.7.3"
+invoke = "2.2.0"
+## pyinvoke = "1.0.4"
 pre-commit = ">=2.14.1,<3.0.0"
 pandas = "1.3.5"
 google-cloud-bigquery = "2.30.0"
@@ -51,7 +52,7 @@ pytest-xdist = "^3.0.2"
 
 [tool.poetry.group.dev.dependencies]
 pip = "22.3.1"
-invoke = "1.7.3"
+invoke = "2.2.0"
 pre-commit = ">=2.14.1,<3.0.0"
 black = "22.10.0"
 flake8 = "5.0.4"
diff --git a/python/activation/main.py b/python/activation/main.py
index 66fa50cc..99e5edee 100644
--- a/python/activation/main.py
+++ b/python/activation/main.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import logging
 import re
+import traceback
 
 from apache_beam.io.gcp.internal.clients import bigquery
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -119,13 +120,26 @@ def process(self, element):
     else:
       state_msg = 'SEND_FAIL'
 
-    yield {
-      'id': str(uuid.uuid4()),
-      'activation_id': element[0]['events'][0]['name'],
-      'payload': json.dumps(element[0]),
-      'latest_state': f"{state_msg} {element[1]}",
-      'updated_at': str(time_cast)
-    }
+    result = {}
+    try:
+      result = {
+        'id': str(uuid.uuid4()),
+        'activation_id': element[0]['events'][0]['name'],
+        'payload': json.dumps(element[0]),
+        'latest_state': f"{state_msg} {element[1]}",
+        'updated_at': str(time_cast)
+      }
+    except KeyError:
+      logging.error(element)
+      logging.error(traceback.format_exc())
+      result = {
+        'id': str(uuid.uuid4()),
+        'activation_id': "",
+        'payload': json.dumps(element[0]),
+        'latest_state': f"{state_msg} {element[1]}",
+        'updated_at': str(time_cast)
+      }
+    yield result
 
 class DecimalEncoder(json.JSONEncoder):
   def default(self, obj):
@@ -143,13 +157,29 @@ def setup(self):
     self.payload_template = Environment(loader=BaseLoader).from_string(self.template_str)
 
   def process(self, element):
+    # Strip malformed substrings from the client_id
+    _client_id = element['client_id'].replace(r'', '')
+    _client_id = _client_id.replace(r'q=">', '')
+
     payload_str = self.payload_template.render(
-      client_id=element['client_id'],
+      client_id=_client_id,
       event_timestamp=self.date_to_micro(element["inference_date"]),
       event_name=self.event_name,
       user_properties=self.generate_user_properties(element),
+      event_parameters=self.generate_event_parameters(element),
     )
-    yield json.loads(payload_str)
+
+    result = {}
+    try:
+      result = json.loads(payload_str)
+    except json.decoder.JSONDecodeError:
+      logging.error(payload_str)
+      logging.error(traceback.format_exc())
+    yield result
 
   def date_to_micro(self, date_str):
     try:
       # try if date_str is in ISO timestamp format
@@ -174,6 +204,16 @@ def generate_user_properties(self, element):
       if v:
         user_properties_obj[k] = {'value': v}
     return json.dumps(user_properties_obj, cls=DecimalEncoder)
+
+  def generate_event_parameters(self, element):
+    element_copy = element.copy()
+    del element_copy['client_id']
+    del element_copy['inference_date']
+    event_parameters_obj = {}
+    for k, v in element_copy.items():
+      if v:
+        event_parameters_obj[k] = v
+    return json.dumps(event_parameters_obj, cls=DecimalEncoder)
 
 def send_success(element):
   return element[1] == requests.status_codes.codes.NO_CONTENT
@@ -199,6 +239,7 @@ def run(argv=None):
   activation_type_configuration = load_activation_type_configuration(activation_options)
 
   load_from_source_query = build_query(activation_options, activation_type_configuration)
+  logging.info(load_from_source_query)
 
   table_suffix = f"{datetime.datetime.today().strftime('%Y_%m_%d')}_{str(uuid.uuid4())[:8]}"
   log_table_names = [f'activation_log_{table_suffix}', f'activation_retry_{table_suffix}']
@@ -232,7 +273,7 @@ def run(argv=None):
         query=load_from_source_query,
         use_json_exports=True,
         use_standard_sql=True)
-    | "transform to Measurement Protocol API payload" >> beam.ParDo(TransformToPayload(activation_type_configuration['measurement_protocol_payload_template'], activation_type_configuration['activation_event_name']))
+    | 'Prepare Measurement Protocol API payload' >> beam.ParDo(TransformToPayload(activation_type_configuration['measurement_protocol_payload_template'], activation_type_configuration['activation_event_name']))
     | 'POST event to Measurement Protocol API' >> beam.ParDo(CallMeasurementProtocolAPI(activation_options.ga4_measurement_id, activation_options.ga4_api_secret, debug=activation_options.use_api_validation))
   )
 
@@ -245,8 +286,8 @@ def run(argv=None):
   )
   _ = (
     success_responses
-    | 'transform log format' >> beam.ParDo(ToLogFormat())
-    | 'store to log table' >> beam.io.WriteToBigQuery(
+    | 'Transform log format' >> beam.ParDo(ToLogFormat())
+    | 'Store to log BQ table' >> beam.io.WriteToBigQuery(
       success_log_table_spec,
       schema=table_schema,
       write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
@@ -254,8 +295,8 @@ def run(argv=None):
   )
   _ = (
     failed_responses
-    | 'transform failure log format' >> beam.ParDo(ToLogFormat())
-    | 'store to failure log table' >> beam.io.WriteToBigQuery(
+    | 'Transform failure log format' >> beam.ParDo(ToLogFormat())
+    | 'Store to failure log BQ table' >> beam.io.WriteToBigQuery(
      failure_log_table_spec,
      schema=table_schema,
      write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
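
Note: taken together, the `main.py` changes make `TransformToPayload` sanitize the client_id, render the Jinja2 payload template with the new `event_parameters` field, and validate the rendered string with `json.loads` so a malformed payload is logged rather than crashing the worker. A standalone sketch of that flow; the template string, row values, and event name are illustrative, not the repository's actual configuration:

    import json
    import logging
    from jinja2 import BaseLoader, Environment

    # Illustrative stand-in for templates/app_payload_template.jinja2.
    template_str = (
        '{"client_id": "{{client_id}}", "timestamp_micros": {{event_timestamp}},'
        ' "user_properties": {{user_properties}},'
        ' "events": [{"name": "{{event_name}}", "params": {{event_parameters}}}]}'
    )
    payload_template = Environment(loader=BaseLoader()).from_string(template_str)

    row = {'client_id': '123456.7654321q=">', 'inference_date': '2024-01-10', 'prediction': 'true'}
    _client_id = row['client_id'].replace(r'q=">', '')  # strip the known bad substring

    payload_str = payload_template.render(
        client_id=_client_id,
        event_timestamp=1704906566000000,
        event_name='maj_audience_segmentation_15',  # hypothetical event name
        user_properties=json.dumps({'a_s_prediction': {'value': row['prediction']}}),
        event_parameters=json.dumps({'prediction': row['prediction']}),
    )
    try:
        payload = json.loads(payload_str)  # the validation step this PR adds
    except json.JSONDecodeError:
        logging.error(payload_str)
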
diff --git a/python/ga4_setup/setup.py b/python/ga4_setup/setup.py
index 6c9b5464..a3bb7121 100644
--- a/python/ga4_setup/setup.py
+++ b/python/ga4_setup/setup.py
@@ -80,7 +80,7 @@ def create_custom_events(configuration: map):
 
 def load_custom_dimensions(query_file: str):
-  reserved_words = ['select', 'extract', 'from', 'where', 'and', 'order', 'limit']
+  reserved_words = ['select', 'coalesce', 'extract', 'from', 'where', 'and', 'order', 'limit']
   ret_fields = []
   with open(query_file, "r") as f:
     lines = f.readlines()
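
Note: the renamed 'POST event to Measurement Protocol API' step in the pipeline above sends each rendered payload to GA4. A standalone sketch of that call against the public validation endpoint (which, judging by the flag name, is what `use_api_validation` selects); the measurement ID, API secret, and event fields are placeholders:

    import requests

    MEASUREMENT_ID = 'G-XXXXXXXXXX'   # placeholder
    API_SECRET = 'your-api-secret'    # placeholder

    payload = {
        'client_id': '123456.7654321',
        'nonPersonalizedAds': False,
        'events': [{'name': 'maj_audience_segmentation_15', 'params': {'prediction': 3}}],
    }

    # /debug/mp/collect returns validation messages; the production /mp/collect
    # endpoint replies 204 No Content, which the pipeline treats as success.
    url = ('https://www.google-analytics.com/debug/mp/collect'
           f'?measurement_id={MEASUREMENT_ID}&api_secret={API_SECRET}')
    response = requests.post(url, json=payload)
    print(response.status_code, response.json().get('validationMessages', []))
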
diff --git a/sql/query/audience_segmentation_query_template.sqlx b/sql/query/audience_segmentation_query_template.sqlx
index cfa54650..c6e970b2 100644
--- a/sql/query/audience_segmentation_query_template.sqlx
+++ b/sql/query/audience_segmentation_query_template.sqlx
@@ -1,21 +1,8 @@
-SELECT user_pseudo_id AS client_id,
-  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
-  prediction,
-  active_users_past_1_7_day AS active_users_past_1_7,
-  active_users_past_8_14_day AS active_users_past_8_14,
-  purchases_past_1_7_day AS purchases_past_1_7,
-  purchases_past_8_14_day AS purchases_past_8_14,
-  visits_past_1_7_day AS visits_past_1_7,
-  visits_past_8_14_day AS visits_past_8_14,
-  view_items_past_1_7_day AS view_items_past_1_7,
-  view_items_past_8_14_day AS view_items_past_8_14,
-  add_to_carts_past_1_7_day AS add_to_carts_past_1_7,
-  add_to_carts_past_8_14_day AS add_to_carts_past_8_14,
-  checkouts_past_1_7_day AS checkouts_past_1_7,
-  checkouts_past_8_14_day AS checkouts_past_8_14,
-  ltv_revenue_past_1_7_day AS ltv_revenue_past_1_7,
-  ltv_revenue_past_7_15_day AS ltv_revenue_past_7_15,
-  geo_region
-  FROM `{{source_table}}`
-  WHERE prediction IS NOT NULL
-  LIMIT 10000
\ No newline at end of file
+SELECT
+  prediction,
+  COALESCE(user_id, user_pseudo_id) AS client_id,
+  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
+FROM `{{source_table}}`
+WHERE prediction IS NOT NULL
+LIMIT 10000
\ No newline at end of file
diff --git a/sql/query/auto_audience_segmentation_query_template.sqlx b/sql/query/auto_audience_segmentation_query_template.sqlx
index 0c95c688..af4c2e9c 100644
--- a/sql/query/auto_audience_segmentation_query_template.sqlx
+++ b/sql/query/auto_audience_segmentation_query_template.sqlx
@@ -1,7 +1,7 @@
 SELECT
-  user_id AS client_id,
-  EXTRACT(DATE FROM feature_timestamp AT TIME ZONE 'UTC') AS inference_date,
-  prediction as prediction
-FROM `{{source_table}}`
-WHERE prediction IS NOT NULL
-LIMIT 10000
\ No newline at end of file
+  prediction,
+  user_id AS client_id,
+  EXTRACT(DATE FROM feature_timestamp AT TIME ZONE 'UTC') AS inference_date,
+FROM `{{source_table}}`
+WHERE prediction IS NOT NULL
+LIMIT 10000
\ No newline at end of file
diff --git a/sql/query/cltv_query_template.sqlx b/sql/query/cltv_query_template.sqlx
index c2b5f778..f52c3554 100644
--- a/sql/query/cltv_query_template.sqlx
+++ b/sql/query/cltv_query_template.sqlx
@@ -1,7 +1,8 @@
-SELECT * EXCEPT(user_pseudo_id, processed_timestamp),
-  user_pseudo_id AS client_id,
-  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date
-  FROM `{{source_table}}`
-  WHERE prediction > 0
-  ORDER BY prediction DESC
-  LIMIT 10000
+SELECT
+  prediction,
+  user_pseudo_id AS client_id,
+  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
+FROM `{{source_table}}`
+WHERE prediction > 0
+ORDER BY prediction DESC
+LIMIT 10000
diff --git a/sql/query/purchase_propensity_query_template.sqlx b/sql/query/purchase_propensity_query_template.sqlx
index ccaf8328..c06de109 100644
--- a/sql/query/purchase_propensity_query_template.sqlx
+++ b/sql/query/purchase_propensity_query_template.sqlx
@@ -1,7 +1,9 @@
-SELECT * EXCEPT(user_pseudo_id, processed_timestamp, user_id),
-  user_pseudo_id AS client_id,
-  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date
-  FROM `{{source_table}}`
-  WHERE prediction = 'true'
-  ORDER BY prediction_prob DESC
-  LIMIT 10000
\ No newline at end of file
+SELECT
+  prediction,
+  prediction_prob,
+  COALESCE(user_id, user_pseudo_id) AS client_id,
+  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
+FROM `{{source_table}}`
+WHERE prediction = 'true'
+ORDER BY prediction_prob DESC
+LIMIT 10000
\ No newline at end of file
diff --git a/templates/app_payload_template.jinja2 b/templates/app_payload_template.jinja2
index 219d04e1..f5de1908 100644
--- a/templates/app_payload_template.jinja2
+++ b/templates/app_payload_template.jinja2
@@ -2,10 +2,12 @@
   "client_id": "{{client_id}}",
   "timestamp_micros": {{event_timestamp}},
   "nonPersonalizedAds": false,
-  "user_properties": {{user_properties}},
+  "user_properties":
+  {{user_properties}},
   "events": [
     {
-      "name": "{{event_name}}"
+      "name": "{{event_name}}",
+      "params": {{event_parameters}}
     }
   ]
 }
\ No newline at end of file
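
Note: with the template change above, a rendered payload now carries the model outputs both as `user_properties` and as `params` on the event itself. An illustrative rendered message (all names and values hypothetical):

    {
      "client_id": "123456.7654321",
      "timestamp_micros": 1704906566000000,
      "nonPersonalizedAds": false,
      "user_properties":
      {"p_p_prediction": {"value": "true"}, "p_p_prediction_prob": {"value": 0.87}},
      "events": [
        {
          "name": "maj_purchase_propensity_30_15",
          "params": {"prediction": "true", "prediction_prob": 0.87}
        }
      ]
    }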