Skip to content

Commit

Permalink
rewriting the backfill queries avoiding the for loop
Browse files Browse the repository at this point in the history
  • Loading branch information
Carlos Timoteo committed Sep 12, 2023
1 parent 4122050 commit 56a78c4
Show file tree
Hide file tree
Showing 12 changed files with 1,814 additions and 219 deletions.
4 changes: 3 additions & 1 deletion sql/query/invoke_backfill_customer_lifetime_value_label.sqlx
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ CREATE TEMP TABLE dates_interval as (
ORDER BY input_date DESC
);


## All users in the platform
CREATE TEMP TABLE events_users_days as (
SELECT DISTINCT
Expand Down Expand Up @@ -85,4 +86,5 @@ INSERT INTO `{{project_id}}.{{dataset}}.{{insert_table}}`
GREATEST(0.0, COALESCE(COALESCE(B.user_ltv_revenue_1_180,0.0) - COALESCE(B.user_ltv_revenue_today,0.0), 0.0)) AS pltv_revenue_180_days,
FROM events_users_days AS A
INNER JOIN future_revenue_per_user AS B
ON A.user_pseudo_id = B.user_pseudo_id;
ON A.user_pseudo_id = B.user_pseudo_id
;
117 changes: 97 additions & 20 deletions sql/query/invoke_backfill_purchase_propensity_label.sqlx
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,103 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Script variables used by the per-date backfill loop: the date currently being processed,
-- the other end of its window, and the row count handed to the CALLed procedure
-- (presumably an output argument — confirm against the procedure definition).
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE rows_added INT64 DEFAULT NULL;

-- Bounds of the date range to backfill, both derived from the event table.
DECLARE max_date DATE;
DECLARE min_date DATE;
-- max_date: newest event_date moved back by a templated number of days.
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL {{interval_max_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
-- min_date: oldest event_date moved forward by a templated number of days.
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
-- Echo the computed bounds in the script output.
SELECT max_date;
SELECT min_date;
-- NOTE(review): the two assignments below overwrite the templated values above with a
-- hard-coded project/dataset and fixed 15/30-day offsets — confirm this is intended and
-- not leftover debugging configuration.
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL 15 DAY) FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`);
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL 30 DAY) FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`);

-- One row per distinct event_date inside [min_date, max_date].
--   input_date: the day being labelled.
--   end_date:   input_date + 30 days, the end of the look-ahead window scanned for purchases.
-- Every date in the range is kept: a LIMIT 1 here would restrict the backfill to the single
-- most recent date while all_users_possible_purchases still spans the whole range.
-- No ORDER BY: row order of a table created with CREATE TABLE AS is not preserved, so
-- sorting has no effect on the result.
CREATE TEMP TABLE dates_interval AS (
  SELECT DISTINCT
    event_date AS input_date,
    DATE_ADD(event_date, INTERVAL 30 DAY) AS end_date
  FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`
  WHERE event_date BETWEEN min_date AND max_date
);

-- Candidate label rows: every user that has at least one valid purchase event
-- (session present, transaction id present and not '(not set)'), paired with every
-- calendar day between min_date and max_date. The purchase_day_N columns are NULL
-- placeholders.
CREATE TEMP TABLE all_users_possible_purchases AS (
  SELECT
    Purchasers.user_pseudo_id,
    calendar_day AS event_date,
    NULL AS purchase_day_0,
    NULL AS purchase_day_1,
    NULL AS purchase_day_2,
    NULL AS purchase_day_3,
    NULL AS purchase_day_4,
    NULL AS purchase_day_5,
    NULL AS purchase_day_6,
    NULL AS purchase_day_7,
    NULL AS purchase_day_8,
    NULL AS purchase_day_9,
    NULL AS purchase_day_10,
    NULL AS purchase_day_11,
    NULL AS purchase_day_12,
    NULL AS purchase_day_13,
    NULL AS purchase_day_14
  FROM (
    -- Deduplicate users first so the date fan-out multiplies users, not raw events.
    SELECT DISTINCT
      Ev.user_pseudo_id
    FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event` AS Ev
    WHERE Ev.event_name='purchase'
      AND Ev.ga_session_id IS NOT NULL
      AND Ev.ecommerce.transaction_id IS NOT NULL
      AND Ev.ecommerce.transaction_id <> '(not set)'
  ) AS Purchasers
  -- The generated dates are unique, so distinct users x dates needs no outer DISTINCT.
  CROSS JOIN UNNEST(GENERATE_DATE_ARRAY(min_date, max_date, INTERVAL 1 DAY)) AS calendar_day
);

-- Future purchases per user and input_date: for each day offset N in 0..14, the number of
-- distinct purchase transaction ids whose event_date falls exactly N days after input_date.
-- Only purchase events with a session, a real transaction id and a known device OS are
-- counted, and only events inside [input_date, end_date] of dates_interval are scanned.
-- The aggregation is already one row per (user_pseudo_id, input_date), so no window
-- function is needed on top of the COUNTs.
CREATE TEMP TABLE future_purchases_per_user AS (
  SELECT
    user_pseudo_id,
    input_date AS feature_date,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 0, ecommerce.transaction_id, NULL)) AS purchase_day_0,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 1, ecommerce.transaction_id, NULL)) AS purchase_day_1,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 2, ecommerce.transaction_id, NULL)) AS purchase_day_2,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 3, ecommerce.transaction_id, NULL)) AS purchase_day_3,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 4, ecommerce.transaction_id, NULL)) AS purchase_day_4,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 5, ecommerce.transaction_id, NULL)) AS purchase_day_5,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 6, ecommerce.transaction_id, NULL)) AS purchase_day_6,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 7, ecommerce.transaction_id, NULL)) AS purchase_day_7,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 8, ecommerce.transaction_id, NULL)) AS purchase_day_8,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 9, ecommerce.transaction_id, NULL)) AS purchase_day_9,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 10, ecommerce.transaction_id, NULL)) AS purchase_day_10,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 11, ecommerce.transaction_id, NULL)) AS purchase_day_11,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 12, ecommerce.transaction_id, NULL)) AS purchase_day_12,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 13, ecommerce.transaction_id, NULL)) AS purchase_day_13,
    COUNT(DISTINCT IF(DATE_DIFF(event_date, input_date, DAY) = 14, ecommerce.transaction_id, NULL)) AS purchase_day_14
  FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event` AS E
  INNER JOIN `mde-aggregated-vbb.marketing_ga4_v1_prod.device` AS D
    ON E.device_type_id = D.device_type_id
  CROSS JOIN dates_interval AS DI
  WHERE E.event_name='purchase'
    AND E.event_date BETWEEN DI.input_date AND DI.end_date
    AND E.ga_session_id IS NOT NULL
    AND E.ecommerce.transaction_id IS NOT NULL
    AND E.ecommerce.transaction_id <> '(not set)'
    AND D.device_os IS NOT NULL
  GROUP BY user_pseudo_id, feature_date
);


-- Per-date backfill: walk every distinct event_date in [min_date, max_date] in ascending
-- order and invoke the templated stored procedure once per date, passing the date, the
-- date shifted forward by the templated interval, and the rows_added variable, which is
-- echoed after each call.
FOR backfill_day IN (
  SELECT DISTINCT
    event_date
  FROM `{{mds_project_id}}.{{mds_dataset}}.event`
  WHERE event_date BETWEEN min_date AND max_date
  ORDER BY event_date ASC
)
DO
  SET input_date = backfill_day.event_date;
  SET end_date = DATE_ADD(backfill_day.event_date, INTERVAL {{interval_end_date}} DAY);
  CALL `{{project_id}}.{{dataset}}.{{stored_procedure}}`(input_date, end_date, rows_added);
  SELECT rows_added;
END FOR;
-- Write one label row per (user, date) from all_users_possible_purchases.
-- Each purchase_day_N label is the matching future-purchase count clamped to 0/1:
-- COALESCE turns "no matching row" into 0 and LEAST caps any positive count at 1.
-- The join must match on BOTH the user and the date: joining on user_pseudo_id alone
-- attaches the counts computed for every feature_date of a user to each of that user's
-- dates, producing wrong labels and duplicated rows.
INSERT INTO `mde-aggregated-vbb.feature_store.purchase_propensity_label`
SELECT DISTINCT
  CURRENT_TIMESTAMP() AS processed_timestamp,
  A.event_date AS feature_date,
  A.user_pseudo_id,
  LEAST(COALESCE(B.purchase_day_0, 0), 1) AS purchase_day_0,
  LEAST(COALESCE(B.purchase_day_1, 0), 1) AS purchase_day_1,
  LEAST(COALESCE(B.purchase_day_2, 0), 1) AS purchase_day_2,
  LEAST(COALESCE(B.purchase_day_3, 0), 1) AS purchase_day_3,
  LEAST(COALESCE(B.purchase_day_4, 0), 1) AS purchase_day_4,
  LEAST(COALESCE(B.purchase_day_5, 0), 1) AS purchase_day_5,
  LEAST(COALESCE(B.purchase_day_6, 0), 1) AS purchase_day_6,
  LEAST(COALESCE(B.purchase_day_7, 0), 1) AS purchase_day_7,
  LEAST(COALESCE(B.purchase_day_8, 0), 1) AS purchase_day_8,
  LEAST(COALESCE(B.purchase_day_9, 0), 1) AS purchase_day_9,
  LEAST(COALESCE(B.purchase_day_10, 0), 1) AS purchase_day_10,
  LEAST(COALESCE(B.purchase_day_11, 0), 1) AS purchase_day_11,
  LEAST(COALESCE(B.purchase_day_12, 0), 1) AS purchase_day_12,
  LEAST(COALESCE(B.purchase_day_13, 0), 1) AS purchase_day_13,
  LEAST(COALESCE(B.purchase_day_14, 0), 1) AS purchase_day_14
FROM all_users_possible_purchases AS A
LEFT JOIN future_purchases_per_user AS B
  ON B.user_pseudo_id = A.user_pseudo_id
  AND B.feature_date = A.event_date
;
162 changes: 142 additions & 20 deletions sql/query/invoke_backfill_user_dimensions.sqlx
Original file line number Diff line number Diff line change
Expand Up @@ -12,26 +12,148 @@
-- See the License for the specific language governing permissions and
-- limitations under the License.

-- Script variables used by the per-date backfill loop: the date currently being processed,
-- the other end of its window, and the user count handed to the CALLed procedure
-- (presumably an output argument — confirm against the procedure definition).
DECLARE input_date DATE;
DECLARE end_date DATE;
DECLARE users_added INT64 DEFAULT NULL;

-- Bounds of the date range to backfill, both derived from the event table.
DECLARE max_date DATE;
DECLARE min_date DATE;
-- max_date: newest event_date moved back by a templated number of days.
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL {{interval_max_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
-- min_date: oldest event_date moved forward by a templated number of days.
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL {{interval_min_date}} DAY) FROM `{{mds_project_id}}.{{mds_dataset}}.event`);
-- Echo the computed bounds in the script output.
SELECT max_date;
SELECT min_date;
-- NOTE(review): the two assignments below overwrite the templated values above with a
-- hard-coded project/dataset and fixed 15/30-day offsets — confirm this is intended and
-- not leftover debugging configuration.
SET max_date = (SELECT DATE_SUB(MAX(event_date), INTERVAL 15 DAY) FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`);
SET min_date = (SELECT DATE_ADD(MIN(event_date), INTERVAL 30 DAY) FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`);

-- One row per distinct event_date inside [min_date, max_date]:
--   input_date is the date itself, end_date is that date plus 30 days.
CREATE TEMP TABLE dates_interval AS (
  SELECT
    event_date AS input_date,
    DATE_ADD(event_date, INTERVAL 30 DAY) AS end_date
  FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`
  WHERE event_date >= min_date
    AND event_date <= max_date
  GROUP BY event_date
  ORDER BY input_date DESC
);

-- Event-scoped user dimensions per (user_pseudo_id, event_date).
-- Two named windows share the same partition (user, event date, backfill input date):
--   latest_event   orders events newest-first, so FIRST_VALUE picks the last event's value;
--   earliest_event orders events oldest-first, so FIRST_VALUE picks the first event's value.
-- Only events with a session id and a known device OS that fall inside
-- [input_date, end_date] of a dates_interval row are considered.
CREATE TEMP TABLE user_dimensions_event_scoped AS (
  SELECT DISTINCT
    event_date AS feature_date,
    user_pseudo_id,
    -- Calendar parts of the event date and the hour of the latest event (zero-padded).
    FIRST_VALUE(format_date('%m',event_date)) OVER latest_event AS month_of_the_year,
    FIRST_VALUE(format_date('%U',event_date)) OVER latest_event AS week_of_the_year,
    FIRST_VALUE(format_date('%d',event_date)) OVER latest_event AS day_of_the_month,
    FIRST_VALUE(format_date('%w',event_date)) OVER latest_event AS day_of_week,
    FIRST_VALUE(format("%02d",extract(hour from event_timestamp))) OVER latest_event AS hour_of_day,
    FIRST_VALUE(user_ltv_revenue) OVER latest_event AS user_ltv_revenue,
    -- Traffic source of the latest event.
    FIRST_VALUE(T.traffic_source_medium) OVER latest_event AS last_traffic_source_medium,
    FIRST_VALUE(T.traffic_source_name) OVER latest_event AS last_traffic_source_name,
    FIRST_VALUE(T.traffic_source) OVER latest_event AS last_traffic_source_source,
    -- 'new' when first touch is unknown or less than 7 days before the event.
    FIRST_VALUE(
      IF(
        user_first_touch_timestamp IS NULL
          OR TIMESTAMP_DIFF(event_timestamp, user_first_touch_timestamp, DAY) < 7,
        'new',
        'existing'
      )
    ) OVER latest_event AS new_or_established_user,
    -- Geography of the latest event.
    FIRST_VALUE(L.subcontinent) OVER latest_event AS geo_sub_continent,
    FIRST_VALUE(L.country) OVER latest_event AS geo_country,
    FIRST_VALUE(L.region) OVER latest_event AS geo_region,
    FIRST_VALUE(L.city) OVER latest_event AS geo_city,
    FIRST_VALUE(L.metro) OVER latest_event AS geo_metro,
    -- Traffic source of the earliest event.
    FIRST_VALUE(T.traffic_source_medium) OVER earliest_event AS first_traffic_source_medium,
    FIRST_VALUE(T.traffic_source_name) OVER earliest_event AS first_traffic_source_name,
    FIRST_VALUE(T.traffic_source) OVER earliest_event AS first_traffic_source_source
  FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event` AS E
  INNER JOIN `mde-aggregated-vbb.marketing_ga4_v1_prod.traffic_source` AS T
    ON E.traffic_source_id = T.traffic_source_id
  INNER JOIN `mde-aggregated-vbb.marketing_ga4_v1_prod.device` AS D
    ON E.device_type_id = D.device_type_id
  INNER JOIN `mde-aggregated-vbb.marketing_ga4_v1_prod.location` AS L
    ON E.location_id = L.location_id
  CROSS JOIN dates_interval AS DI
  WHERE E.event_date BETWEEN DI.input_date AND DI.end_date
    AND ga_session_id IS NOT NULL
    AND D.device_os IS NOT NULL
  WINDOW
    latest_event AS (PARTITION BY user_pseudo_id, event_date, DI.input_date ORDER BY event_timestamp DESC),
    earliest_event AS (PARTITION BY user_pseudo_id, event_date, DI.input_date ORDER BY event_timestamp ASC)
);

-- Session-scoped user dimensions per (user_pseudo_id, event_date).
-- The inner query collapses events to one row per (user, event date, session), taking MAX
-- of each device attribute; the outer query fans those rows out over dates_interval and
-- reads them through windows partitioned by user, event date, input date and session.
-- OS and browser versions are reduced to the part before the first '.'.
CREATE TEMP TABLE user_dimensions_session_scoped AS (
  SELECT DISTINCT
    event_date AS feature_date,
    user_pseudo_id,
    MAX(user_id IS NOT NULL) OVER session_oldest_first AS has_signed_in_with_user_id,
    FIRST_VALUE(category) OVER session_newest_first AS device_category,
    FIRST_VALUE(mobile_brand_name) OVER session_newest_first AS device_mobile_brand_name,
    FIRST_VALUE(mobile_model_name) OVER session_newest_first AS device_mobile_model_name,
    FIRST_VALUE(operating_system) OVER session_newest_first AS device_os,
    FIRST_VALUE(SPLIT(operating_system_version, '.')[OFFSET(0)]) OVER session_newest_first AS device_os_version,
    FIRST_VALUE(language) OVER session_newest_first AS device_language,
    FIRST_VALUE(browser) OVER session_newest_first AS device_web_browser,
    FIRST_VALUE(SPLIT(browser_version, '.')[OFFSET(0)]) OVER session_newest_first AS device_web_browser_version,
    FIRST_VALUE(advertising_id) OVER session_newest_first AS device_advertising_id
  FROM (
    SELECT
      user_pseudo_id,
      event_date,
      MAX(user_id) AS user_id,
      MAX(device_advertising_id) AS advertising_id,
      MAX(device_category) AS category,
      MAX(device_mobile_brand_name) AS mobile_brand_name,
      MAX(device_mobile_model_name) AS mobile_model_name,
      MAX(device_os) AS operating_system,
      MAX(device_os_version) AS operating_system_version,
      MAX(language) AS language,
      MAX(device_web_browser) AS browser,
      MAX(device_web_browser_version) AS browser_version,
      ga_session_id AS session_id
    FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event` AS E
    INNER JOIN `mde-aggregated-vbb.marketing_ga4_v1_prod.device` AS D
      ON E.device_type_id = D.device_type_id
    WHERE ga_session_id IS NOT NULL
      AND D.device_os IS NOT NULL
      AND event_date BETWEEN min_date AND max_date
    GROUP BY user_pseudo_id, event_date, ga_session_id
  )
  CROSS JOIN dates_interval AS DI
  WHERE event_date BETWEEN DI.input_date AND DI.end_date
  WINDOW
    session_oldest_first AS (PARTITION BY user_pseudo_id, event_date, DI.input_date, session_id ORDER BY event_date ASC),
    session_newest_first AS (PARTITION BY user_pseudo_id, event_date, DI.input_date, session_id ORDER BY event_date DESC)
);

-- Every (user_pseudo_id, user_id) pair seen on an event with a session id, repeated for
-- each calendar day between min_date and max_date (exposed as feature_date).
CREATE TEMP TABLE events_users AS (
  SELECT
    Identities.user_pseudo_id,
    Identities.user_id,
    calendar_day AS feature_date
  FROM (
    -- Deduplicate identities first so the date fan-out multiplies users, not raw events.
    SELECT DISTINCT
      user_pseudo_id,
      user_id
    FROM `mde-aggregated-vbb.marketing_ga4_v1_prod.event`
    WHERE ga_session_id IS NOT NULL
  ) AS Identities
  -- The generated dates are unique, so no outer DISTINCT is required.
  CROSS JOIN UNNEST(GENERATE_DATE_ARRAY(min_date, max_date, INTERVAL 1 DAY)) AS calendar_day
);


-- Per-date backfill: walk every distinct event_date in [min_date, max_date] in ascending
-- order and invoke the templated stored procedure once per date, passing the date, the
-- date shifted BACK by the templated interval, and the users_added variable, which is
-- echoed after each call.
FOR backfill_day IN (
  SELECT DISTINCT
    event_date AS event_date
  FROM `{{mds_project_id}}.{{mds_dataset}}.event`
  WHERE event_date BETWEEN min_date AND max_date
  ORDER BY event_date ASC
)
DO
  SET input_date = backfill_day.event_date;
  SET end_date = DATE_SUB(backfill_day.event_date, INTERVAL {{interval_end_date}} DAY);
  CALL `{{project_id}}.{{dataset}}.{{stored_procedure}}`(input_date, end_date, users_added);
  SELECT users_added;
END FOR;
-- Assemble the user_dimensions feature rows: one identity row from events_users joined to
-- its event-scoped and session-scoped dimensions on (user_pseudo_id, feature_date).
-- Inner joins mean a (user, date) without both kinds of dimensions produces no row.
-- The nth_* columns are written as NULL.
INSERT INTO `mde-aggregated-vbb.feature_store.user_dimensions`
SELECT DISTINCT
  CURRENT_TIMESTAMP() AS processed_timestamp,
  U.feature_date,
  U.user_pseudo_id,
  U.user_id,
  -- Calendar features (event scoped).
  EvDim.month_of_the_year,
  EvDim.week_of_the_year,
  EvDim.day_of_the_month,
  EvDim.day_of_week,
  EvDim.hour_of_day,
  NULL AS nth_day,
  NULL AS nth_hour,
  NULL AS nth_week,
  NULL AS nth_month,
  EvDim.user_ltv_revenue,
  -- Device features (session scoped).
  SesDim.device_category,
  SesDim.device_mobile_brand_name,
  SesDim.device_mobile_model_name,
  SesDim.device_os,
  SesDim.device_os_version,
  SesDim.device_language,
  SesDim.device_web_browser,
  SesDim.device_web_browser_version,
  -- Geography and traffic source features (event scoped).
  EvDim.geo_sub_continent,
  EvDim.geo_country,
  EvDim.geo_region,
  EvDim.geo_city,
  EvDim.geo_metro,
  EvDim.last_traffic_source_medium,
  EvDim.last_traffic_source_name,
  EvDim.last_traffic_source_source,
  EvDim.first_traffic_source_medium,
  EvDim.first_traffic_source_name,
  EvDim.first_traffic_source_source,
  SesDim.has_signed_in_with_user_id
FROM events_users AS U
INNER JOIN user_dimensions_event_scoped AS EvDim
  ON U.user_pseudo_id = EvDim.user_pseudo_id
  AND U.feature_date = EvDim.feature_date
INNER JOIN user_dimensions_session_scoped AS SesDim
  ON U.user_pseudo_id = SesDim.user_pseudo_id
  AND U.feature_date = SesDim.feature_date
;
Loading

0 comments on commit 56a78c4

Please sign in to comment.