Bug-fix: Measurement Protocol Message (#68)
* implementing user parameters and event parameters for MP API

* validating new MP message payload

* removing bad strings from client_id - MP message payload

---------

Co-authored-by: Carlos Timoteo <[email protected]>
chmstimoteo and Carlos Timoteo authored Jan 10, 2024
1 parent 963573e commit 29efe2a
Showing 10 changed files with 114 additions and 61 deletions.
2 changes: 1 addition & 1 deletion infrastructure/terraform/main.tf
@@ -89,7 +89,7 @@ resource "local_file" "feature_store_configuration" {

resource "null_resource" "poetry_install" {
triggers = {
create_command = "${var.poetry_cmd} install"
create_command = "${var.poetry_cmd} lock && ${var.poetry_cmd} install"
source_contents_hash = local.project_toml_content_hash
}

19 changes: 19 additions & 0 deletions infrastructure/terraform/modules/activation/main.tf
@@ -40,6 +40,19 @@ locals {
activation_type_configuration_file = "${local.source_root_dir}/templates/activation_type_configuration_template.tpl"
activation_type_configuration_file_content_hash = filesha512(local.activation_type_configuration_file)

app_payload_template_file = "${local.source_root_dir}/templates/app_payload_template.jinja2"
app_payload_template_file_content_hash = filesha512(local.app_payload_template_file)

activation_application_dir = "${local.source_root_dir}/python/activation"
activation_application_fileset = [
"${local.activation_application_dir}/main.py",
"${local.activation_application_dir}/Dockerfile",
"${local.activation_application_dir}/metadata.json",
"${local.activation_application_dir}/requirements.txt",
"${local.activation_application_dir}/pipeline_test.py",
]
activation_application_content_hash = sha512(join("", [for f in local.activation_application_fileset : fileexists(f) ? filesha512(f) : sha512("file-not-found")]))

audience_segmentation_activation_query_file = "${local.source_root_dir}/sql/query/audience_segmentation_query_template.sqlx"
audience_segmentation_activation_query_file_content_hash = filesha512(local.audience_segmentation_activation_query_file)
}
@@ -112,6 +125,8 @@ resource "null_resource" "create_custom_events" {
triggers = {
services_enabled_project = module.project_services.project_id
source_contents_hash = local.activation_type_configuration_file_content_hash
#source_activation_type_configuration_hash = local.activation_type_configuration_file_content_hash
#source_activation_application_python_hash = local.activation_application_content_hash
}
provisioner "local-exec" {
command = <<-EOT
@@ -125,6 +140,8 @@ resource "null_resource" "create_custom_dimensions" {
triggers = {
services_enabled_project = module.project_services.project_id
source_contents_hash = local.audience_segmentation_activation_query_file_content_hash
#source_activation_type_configuration_hash = local.activation_type_configuration_file_content_hash
#source_activation_application_python_hash = local.activation_application_content_hash
}
provisioner "local-exec" {
command = <<-EOT
@@ -266,6 +283,7 @@ resource "google_storage_bucket_object" "purchase_propensity_query_template_file

data "template_file" "activation_type_configuration" {
template = file("${local.template_dir}/activation_type_configuration_template.tpl")

vars = {
audience_segmentation_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.audience_segmentation_query_template_file.output_name}"
auto_audience_segmentation_query_template_gcs_path = "gs://${module.pipeline_bucket.name}/${google_storage_bucket_object.auto_audience_segmentation_query_template_file.output_name}"
@@ -279,6 +297,7 @@ resource "google_storage_bucket_object" "activation_type_configuration_file" {
name = "${local.configuration_folder}/activation_type_configuration.json"
content = data.template_file.activation_type_configuration.rendered
bucket = module.pipeline_bucket.name
detect_md5hash = base64encode("${local.activation_type_configuration_file_content_hash}${local.activation_application_content_hash}")
}

module "activation_pipeline_container" {
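For reference, the new activation_application_content_hash local hashes each tracked application file and then hashes the concatenation. A rough Python equivalent, for illustration only (the relative paths below are assumptions; Terraform's filesha512/sha512 functions do the real work):

import hashlib
from pathlib import Path

def sha512_hex(data: bytes) -> str:
    return hashlib.sha512(data).hexdigest()

# Files tracked by the Terraform locals block above (paths assumed relative to the repo root).
fileset = [
    "python/activation/main.py",
    "python/activation/Dockerfile",
    "python/activation/metadata.json",
    "python/activation/requirements.txt",
    "python/activation/pipeline_test.py",
]

# Hash each file, fall back to a fixed marker for missing files, then hash the concatenation.
per_file = [
    sha512_hex(Path(f).read_bytes()) if Path(f).exists() else sha512_hex(b"file-not-found")
    for f in fileset
]
combined_hash = sha512_hex("".join(per_file).encode())
print(combined_hash)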
5 changes: 3 additions & 2 deletions pyproject.toml
@@ -13,7 +13,8 @@ google-cloud-aiplatform = "1.22.0"
google-cloud = "^0.34.0"
jinja2 = ">=3.0.1,<4.0.0"
pip = "22.3.1"
invoke = "1.7.3"
invoke = "2.2.0"
## pyinvoke = "1.0.4"
pre-commit = ">=2.14.1,<3.0.0"
pandas = "1.3.5"
google-cloud-bigquery = "2.30.0"
@@ -51,7 +52,7 @@ pytest-xdist = "^3.0.2"

[tool.poetry.group.dev.dependencies]
pip = "22.3.1"
invoke = "1.7.3"
invoke = "2.2.0"
pre-commit = ">=2.14.1,<3.0.0"
black = "22.10.0"
flake8 = "5.0.4"
69 changes: 55 additions & 14 deletions python/activation/main.py
@@ -13,6 +13,7 @@
# limitations under the License.
import logging
import re
import traceback

from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -119,13 +120,26 @@ def process(self, element):
else:
state_msg = 'SEND_FAIL'

yield {
'id': str(uuid.uuid4()),
'activation_id': element[0]['events'][0]['name'],
'payload': json.dumps(element[0]),
'latest_state': f"{state_msg} {element[1]}",
'updated_at': str(time_cast)
}
result = {}
try:
result = {
'id': str(uuid.uuid4()),
'activation_id': element[0]['events'][0]['name'],
'payload': json.dumps(element[0]),
'latest_state': f"{state_msg} {element[1]}",
'updated_at': str(time_cast)
}
except KeyError as e:
logging.error(element)
result = {
'id': str(uuid.uuid4()),
'activation_id': "",
'payload': json.dumps(element[0]),
'latest_state': f"{state_msg} {element[1]}",
'updated_at': str(time_cast)
}
logging.error(traceback.format_exc())
yield result

class DecimalEncoder(json.JSONEncoder):
def default(self, obj):
@@ -143,13 +157,29 @@ def setup(self):
self.payload_template = Environment(loader=BaseLoader).from_string(self.template_str)

def process(self, element):
# Removing bad shaping strings in client_id (each replace is chained on the
# previous result so that every known bad substring is stripped)
_client_id = element['client_id'].replace(r'<img onerror="_exploit_dom_xss(20007)', '')
_client_id = _client_id.replace(r'<img onerror="_exploit_dom_xss(20023)', '')
_client_id = _client_id.replace(r'<img onerror="_exploit_dom_xss(20013)', '')
_client_id = _client_id.replace(r'<img onerror="_exploit_dom_xss(20010)', '')
_client_id = _client_id.replace(r'q="><script>_exploit_dom_xss(40007)</script>', '')
_client_id = _client_id.replace(r'q="><script>_exploit_dom_xss(40013)</script>', '')

payload_str = self.payload_template.render(
client_id=element['client_id'],
client_id=_client_id,
event_timestamp=self.date_to_micro(element["inference_date"]),
event_name=self.event_name,
user_properties=self.generate_user_properties(element),
event_parameters=self.generate_event_parameters(element),
)
yield json.loads(payload_str)
result = {}
try:
result = json.loads(r'{}'.format(payload_str))
except json.decoder.JSONDecodeError as e:
logging.error(payload_str)
logging.error(traceback.format_exc())
yield result


def date_to_micro(self, date_str):
try: # try if date_str is in ISO timestamp format
@@ -174,6 +204,16 @@ def generate_user_properties(self, element):
if v:
user_properties_obj[k] = {'value': v}
return json.dumps(user_properties_obj, cls=DecimalEncoder)

def generate_event_parameters(self, element):
element_copy = element.copy()
del element_copy['client_id']
del element_copy['inference_date']
event_parameters_obj = {}
for k, v in element_copy.items():
if v:
event_parameters_obj[k] = v
return json.dumps(event_parameters_obj, cls=DecimalEncoder)

def send_success(element):
return element[1] == requests.status_codes.codes.NO_CONTENT
@@ -199,6 +239,7 @@ def run(argv=None):
activation_type_configuration = load_activation_type_configuration(activation_options)

load_from_source_query = build_query(activation_options, activation_type_configuration)
logging.info(load_from_source_query)

table_suffix =f"{datetime.datetime.today().strftime('%Y_%m_%d')}_{str(uuid.uuid4())[:8]}"
log_table_names = [f'activation_log_{table_suffix}', f'activation_retry_{table_suffix}']
@@ -232,7 +273,7 @@ def run(argv=None):
query=load_from_source_query,
use_json_exports=True,
use_standard_sql=True)
| "transform to Measurement Protocol API payload" >> beam.ParDo(TransformToPayload(activation_type_configuration['measurement_protocol_payload_template'], activation_type_configuration['activation_event_name']))
| 'Prepare Measurement Protocol API payload' >> beam.ParDo(TransformToPayload(activation_type_configuration['measurement_protocol_payload_template'], activation_type_configuration['activation_event_name']))
| 'POST event to Measurement Protocol API' >> beam.ParDo(CallMeasurementProtocolAPI(activation_options.ga4_measurement_id, activation_options.ga4_api_secret, debug=activation_options.use_api_validation))
)

@@ -245,17 +286,17 @@
)

_ = ( success_responses
| 'transform log format' >> beam.ParDo(ToLogFormat())
| 'store to log table' >> beam.io.WriteToBigQuery(
| 'Transform log format' >> beam.ParDo(ToLogFormat())
| 'Store to log BQ table' >> beam.io.WriteToBigQuery(
success_log_table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)
)

_ = ( failed_responses
| 'transform failure log format' >> beam.ParDo(ToLogFormat())
| 'store to failure log table' >> beam.io.WriteToBigQuery(
| 'Transform failure log format' >> beam.ParDo(ToLogFormat())
| 'Store to failure log BQ table' >> beam.io.WriteToBigQuery(
failure_log_table_spec,
schema=table_schema,
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
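For context, a minimal self-contained sketch (not part of the commit) of what the new generate_event_parameters method does to a prediction row: everything except client_id and inference_date becomes a Measurement Protocol event parameter. The row fields below are hypothetical.

import json

def generate_event_parameters(element):
    # Drop the fields used elsewhere in the payload; keep the rest as event params.
    element_copy = dict(element)
    element_copy.pop('client_id', None)
    element_copy.pop('inference_date', None)
    return json.dumps({k: v for k, v in element_copy.items() if v})

row = {
    'client_id': '123456.7890123',   # hypothetical
    'inference_date': '2024-01-10',  # hypothetical
    'prediction': 'true',
    'prediction_prob': 0.87,
}
print(generate_event_parameters(row))  # {"prediction": "true", "prediction_prob": 0.87}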
2 changes: 1 addition & 1 deletion python/ga4_setup/setup.py
@@ -80,7 +80,7 @@ def create_custom_events(configuration: map):


def load_custom_dimensions(query_file: str):
reserved_words = ['select', 'extract', 'from', 'where', 'and', 'order', 'limit']
reserved_words = ['select', 'coalesce', 'extract', 'from', 'where', 'and', 'order', 'limit']
ret_fields = []
with open(query_file, "r") as f:
lines = f.readlines()
29 changes: 8 additions & 21 deletions sql/query/audience_segmentation_query_template.sqlx
@@ -1,21 +1,8 @@
SELECT user_pseudo_id AS client_id,
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
prediction,
active_users_past_1_7_day AS active_users_past_1_7,
active_users_past_8_14_day AS active_users_past_8_14,
purchases_past_1_7_day AS purchases_past_1_7,
purchases_past_8_14_day AS purchases_past_8_14,
visits_past_1_7_day AS visits_past_1_7,
visits_past_8_14_day AS visits_past_8_14,
view_items_past_1_7_day AS view_items_past_1_7,
view_items_past_8_14_day AS view_items_past_8_14,
add_to_carts_past_1_7_day AS add_to_carts_past_1_7,
add_to_carts_past_8_14_day AS add_to_carts_past_8_14,
checkouts_past_1_7_day AS checkouts_past_1_7,
checkouts_past_8_14_day AS checkouts_past_8_14,
ltv_revenue_past_1_7_day AS ltv_revenue_past_1_7,
ltv_revenue_past_7_15_day AS ltv_revenue_past_7_15,
geo_region
FROM `{{source_table}}`
WHERE prediction IS NOT NULL
LIMIT 10000
SELECT
prediction,
COALESCE(user_id, user_pseudo_id) AS client_id,
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
FROM `{{source_table}}`
WHERE prediction IS NOT NULL
LIMIT 10000
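The rewritten queries standardize the client identifier on COALESCE(user_id, user_pseudo_id), which returns the first non-NULL argument. A tiny Python analogue with two hypothetical rows:

rows = [
    {"user_id": "u-42", "user_pseudo_id": "123.456"},  # known user_id wins
    {"user_id": None,   "user_pseudo_id": "789.012"},  # falls back to the GA4 pseudo ID
]
for row in rows:
    client_id = row["user_id"] if row["user_id"] is not None else row["user_pseudo_id"]
    print(client_id)  # u-42, then 789.012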

12 changes: 6 additions & 6 deletions sql/query/auto_audience_segmentation_query_template.sqlx
@@ -1,7 +1,7 @@
SELECT
user_id AS client_id,
EXTRACT(DATE FROM feature_timestamp AT TIME ZONE 'UTC') AS inference_date,
prediction as prediction
FROM `{{source_table}}`
WHERE prediction IS NOT NULL
LIMIT 10000
prediction,
user_id AS client_id,
EXTRACT(DATE FROM feature_timestamp AT TIME ZONE 'UTC') AS inference_date,
FROM `{{source_table}}`
WHERE prediction IS NOT NULL
LIMIT 10000
15 changes: 8 additions & 7 deletions sql/query/cltv_query_template.sqlx
@@ -1,7 +1,8 @@
SELECT * EXCEPT(user_pseudo_id, processed_timestamp),
user_pseudo_id AS client_id,
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date
FROM `{{source_table}}`
WHERE prediction > 0
ORDER BY prediction DESC
LIMIT 10000
SELECT
prediction,
user_pseudo_id AS client_id,
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
FROM `{{source_table}}`
WHERE prediction > 0
ORDER BY prediction DESC
LIMIT 10000
16 changes: 9 additions & 7 deletions sql/query/purchase_propensity_query_template.sqlx
@@ -1,7 +1,9 @@
SELECT * EXCEPT(user_pseudo_id, processed_timestamp, user_id),
user_pseudo_id AS client_id,
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date
FROM `{{source_table}}`
WHERE prediction = 'true'
ORDER BY prediction_prob DESC
LIMIT 10000
SELECT
prediction,
prediction_prob,
COALESCE(user_id, user_pseudo_id) AS client_id,
EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
FROM `{{source_table}}`
WHERE prediction = 'true'
ORDER BY prediction_prob DESC
LIMIT 10000
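These .sqlx query files are Jinja templates with a {{source_table}} placeholder. A minimal sketch of rendering the purchase-propensity template against a concrete predictions table (the table name is a made-up placeholder; the pipeline's own template handling lives in python/activation/main.py):

from jinja2 import Template

template_str = """SELECT
  prediction,
  prediction_prob,
  COALESCE(user_id, user_pseudo_id) AS client_id,
  EXTRACT(DATE FROM processed_timestamp AT TIME ZONE 'UTC') AS inference_date,
FROM `{{source_table}}`
WHERE prediction = 'true'
ORDER BY prediction_prob DESC
LIMIT 10000"""

# Hypothetical fully qualified BigQuery table name.
query = Template(template_str).render(
    source_table="my-project.my_dataset.purchase_propensity_predictions"
)
print(query)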
6 changes: 4 additions & 2 deletions templates/app_payload_template.jinja2
@@ -2,10 +2,12 @@
"client_id": "{{client_id}}",
"timestamp_micros": {{event_timestamp}},
"nonPersonalizedAds": false,
"user_properties": {{user_properties}},
"user_properties":
{{user_properties}},
"events": [
{
"name": "{{event_name}}"
"name": "{{event_name}}",
"params": {{event_parameters}}
}
]
}
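For orientation, a hedged example of what the template above could render to once user properties and the new event params are filled in; every value is hypothetical, and the real payload is produced by TransformToPayload in python/activation/main.py:

import json

example_payload = {
    "client_id": "123456.7890123",            # COALESCE(user_id, user_pseudo_id)
    "timestamp_micros": 1704844800000000,     # inference_date converted to microseconds
    "nonPersonalizedAds": False,
    "user_properties": {
        "prediction": {"value": "true"}       # hypothetical user property
    },
    "events": [
        {
            "name": "purchase_propensity_event",  # hypothetical activation event name
            "params": {"prediction_prob": 0.87}   # event parameters added by this change
        }
    ],
}
print(json.dumps(example_payload, indent=2))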
