Skip to content

Commit

Permalink
Littlepay Table Deduplication Adjustments (#2993)
Browse files Browse the repository at this point in the history
  • Loading branch information
SorenSpicknall authored Oct 12, 2023
1 parent 232cdc0 commit e6c2f53
Show file tree
Hide file tree
Showing 7 changed files with 371 additions and 68 deletions.
77 changes: 51 additions & 26 deletions warehouse/models/staging/payments/littlepay/_littlepay.yml
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,6 @@ models:
Should ideally be handled by uniqueness of _payments_key but surfaced for troubleshooting.
- name: stg_littlepay__customer_funding_source
tests:
- &littlepay_uniqueness
dbt_utils.unique_combination_of_columns:
combination_of_columns:
- instance
- extract_filename
- ts
- _line_number
columns:
- name: littlepay_export_ts
description: Export timestamp parsed from filename.
Expand Down Expand Up @@ -243,10 +235,20 @@ models:
description: The number of records on which the `customer_id` shows up in the table
- name: calitp_customer_id_rank
description: A ranking of the records using the `customer_id`, ordered newest to oldest
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- &payments_key_full_uniqueness
name: _payments_key
description: |
Synthentic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.)
tests:
- not_null
- unique
- *_content_hash

- name: stg_littlepay__device_transaction_purchases
tests:
- *littlepay_uniqueness
columns:
- name: littlepay_transaction_id
description: Unique tap ID generated by Littlepay upon receipt of a tap to the Device API.
Expand All @@ -269,10 +271,14 @@ models:
description: The value of the purchase. The value may be adjusted by capping rules or tap-off.
- name: transaction_time
description: The time transaction was created.
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *payments_key_full_uniqueness
- *_content_hash

- name: stg_littlepay__device_transactions
tests:
- *littlepay_uniqueness
description: In the case of duplicated littlepay_transaction_id values, the one with the latest transaction_date_time_utc in the latest export file takes precedence.
columns:
- name: customer_id
Expand Down Expand Up @@ -319,7 +325,7 @@ models:
description: The route identifier provided by the device.
- name: transaction_date_time_utc
description: The date and time that the customer tapped on the device.
- name: transaction_deny_reason
- name: transction_deny_reason
description: |
The reason a transaction was denied.
Expand Down Expand Up @@ -349,10 +355,15 @@ models:
description: The zone identifier provided by the device.
- name: transaction_date_time_pacific
description: The date and time that the customer tapped on the device (in Pacific time instead of UTC).
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *payments_key_full_uniqueness
- *_content_hash

- name: stg_littlepay__micropayment_adjustments
tests:
- *littlepay_uniqueness
- dbt_utils.unique_combination_of_columns:
combination_of_columns:
- micropayment_id
Expand Down Expand Up @@ -404,10 +415,14 @@ models:
The ID of the product with the incentive that was applied to micropayments.
Same ID as `product_id`.
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *payments_key_full_uniqueness
- *_content_hash

- name: stg_littlepay__micropayment_device_transactions
tests:
- *littlepay_uniqueness
columns:
- name: littlepay_transaction_id
description: |
Expand All @@ -416,10 +431,22 @@ models:
For 'tap on, tap off' integrations, merchants will see two transactions for each micropayment.
- name: micropayment_id
description: The micropayment that the transaction is associated with.
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *payments_key_full_uniqueness
- *_content_hash

- name: stg_littlepay__micropayments
tests:
- *littlepay_uniqueness
- &littlepay_uniqueness
dbt_utils.unique_combination_of_columns:
combination_of_columns:
- instance
- extract_filename
- ts
- _line_number
description: This model makes the assumption that, in the case of duplicated micropayment_id values, the one with the latest transaction_time in the latest export file takes precedence.
columns:
- name: micropayment_id
Expand Down Expand Up @@ -472,17 +499,9 @@ models:
- *lp_line_number
- *payments_input_row_key
- *_content_hash
- &payments_key_full_uniqueness
name: _payments_key
description: |
Synthentic key composed of the elements that define a natural key within the source data (primary key according to Littlepay schema.)
tests:
- not_null
- unique
- *payments_key_full_uniqueness

- name: stg_littlepay__product_data
tests:
- *littlepay_uniqueness
columns:
- name: participant_id
description: Identifies the participant that the micropayment belongs to.
Expand Down Expand Up @@ -617,6 +636,12 @@ models:
description: |
Configuration type for the incentive.
Possible values are `DAILY_TABLE` or `ZONE_TRIANGLE`.
- *lp_export_date
- *lp_export_ts
- *lp_line_number
- *payments_input_row_key
- *payments_key_full_uniqueness
- *_content_hash

- name: stg_littlepay__refunds
tests:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
WITH source AS (
SELECT * FROM {{ littlepay_source('external_littlepay', 'customer_funding_source') }}
SELECT * FROM {{ source('external_littlepay', 'customer_funding_source') }}
),

stg_littlepay__customer_funding_source AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('funding_source_id') }} AS funding_source_id,
{{ trim_make_empty_string_null('funding_source_vault_id') }} AS funding_source_vault_id,
Expand All @@ -14,11 +14,23 @@ stg_littlepay__customer_funding_source AS (
{{ trim_make_empty_string_null('issuer_country') }} AS issuer_country,
{{ trim_make_empty_string_null('form_factor') }} AS form_factor,
{{ trim_make_empty_string_null('principal_customer_id') }} AS principal_customer_id,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
ts,
littlepay_export_ts,
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts,
{{ extract_littlepay_filename_date() }} AS littlepay_export_date,
-- hash all content not generated by us to enable deduping full dup rows
-- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream
{{ dbt_utils.generate_surrogate_key(['funding_source_id', 'funding_source_vault_id',
'customer_id', 'bin', 'masked_pan', 'card_scheme', 'issuer', 'issuer_country',
'form_factor']) }} AS _content_hash,
FROM source
),

add_keys_drop_full_dupes AS (
SELECT
*,
-- flag in reverse order, since we usually want the latest
DENSE_RANK() OVER (
PARTITION BY funding_source_id
Expand All @@ -29,7 +41,47 @@ stg_littlepay__customer_funding_source AS (
DENSE_RANK() OVER (
PARTITION BY customer_id
ORDER BY littlepay_export_ts DESC) AS calitp_customer_id_rank,
FROM source
-- generate keys now that input columns have been trimmed & cast and files deduped
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['funding_source_id', 'customer_id']) }} AS _payments_key,
FROM clean_columns
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

stg_littlepay__customer_funding_source AS (
SELECT
funding_source_id,
funding_source_vault_id,
customer_id,
bin,
masked_pan,
card_scheme,
issuer,
issuer_country,
form_factor,
principal_customer_id,
_line_number,
`instance`,
extract_filename,
ts,
littlepay_export_ts,
littlepay_export_date,
calitp_funding_source_id_rank,
calitp_funding_source_vault_id_rank,
calitp_customer_id_rank,
_key,
_payments_key,
_content_hash,
FROM add_keys_drop_full_dupes
-- Some funding sources have incomplete information when first present in data, like missing
-- values for form_factor or issuer_country that are filled in during later exports.
-- Additionally, sometimes a filled column value is updated in newer exports for a given entry.
QUALIFY ROW_NUMBER() OVER (
PARTITION BY
funding_source_id,
customer_id
ORDER BY littlepay_export_ts DESC
) = 1
)

SELECT * FROM stg_littlepay__customer_funding_source
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ WITH source AS (
SELECT * FROM {{ source('external_littlepay', 'device_transaction_purchases') }}
),

stg_littlepay__device_transaction_purchases AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('littlepay_transaction_id') }} AS littlepay_transaction_id,
{{ trim_make_empty_string_null('purchase_id') }} AS purchase_id,
Expand All @@ -11,11 +11,57 @@ stg_littlepay__device_transaction_purchases AS (
{{ trim_make_empty_string_null('description') }} AS description,
{{ safe_cast('indicative_amount', type_numeric()) }} AS indicative_amount,
{{ safe_cast('transaction_time', type_timestamp()) }} AS transaction_time,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts,
{{ extract_littlepay_filename_date() }} AS littlepay_export_date,
ts,
-- hash all content not generated by us to enable deduping full dup rows
-- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream
{{ dbt_utils.generate_surrogate_key(['littlepay_transaction_id', 'purchase_id',
'correlated_purchase_id', 'product_id', 'description', 'indicative_amount',
'transaction_time']) }} AS _content_hash,
FROM source
),

add_keys_drop_full_dupes AS (
SELECT
*,
-- generate keys now that input columns have been trimmed & cast and files deduped
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
{{ dbt_utils.generate_surrogate_key(['littlepay_transaction_id', 'purchase_id']) }} AS _payments_key,
FROM clean_columns
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

stg_littlepay__device_transaction_purchases AS (
SELECT
littlepay_transaction_id,
purchase_id,
correlated_purchase_id,
product_id,
description,
indicative_amount,
transaction_time,
_line_number,
`instance`,
extract_filename,
littlepay_export_ts,
littlepay_export_date,
ts,
_key,
_payments_key,
_content_hash,
FROM add_keys_drop_full_dupes
-- Some purchases initially are given a value of 'autoscan' for product_id, and then that
-- value is later updated. No other partial duplicate conditions exist at implementation time.
QUALIFY ROW_NUMBER() OVER (
PARTITION BY
littlepay_transaction_id,
purchase_id
ORDER BY littlepay_export_ts DESC
) = 1
)

SELECT * FROM stg_littlepay__device_transaction_purchases
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
WITH source AS (
SELECT * FROM {{ littlepay_source('external_littlepay', 'device_transactions') }}
SELECT * FROM {{ source('external_littlepay', 'device_transactions') }}
),

stg_littlepay__device_transactions AS (
clean_columns AS (
SELECT
{{ trim_make_empty_string_null('participant_id') }} AS participant_id,
{{ trim_make_empty_string_null('customer_id') }} AS customer_id,
Expand Down Expand Up @@ -32,13 +32,73 @@ stg_littlepay__device_transactions AS (
{{ trim_make_empty_string_null('vehicle_id') }} AS vehicle_id,
{{ trim_make_empty_string_null('granted_zone_ids') }} AS granted_zone_ids,
{{ trim_make_empty_string_null('onward_zone_ids') }} AS onward_zone_ids,
_line_number,
CAST(_line_number AS INTEGER) AS _line_number,
`instance`,
extract_filename,
{{ extract_littlepay_filename_ts() }} AS littlepay_export_ts,
{{ extract_littlepay_filename_date() }} AS littlepay_export_date,
ts,
littlepay_export_ts,
-- hash all content not generated by us to enable deduping full dup rows
-- hashing at this step will preserve distinction between nulls and empty strings in case that is meaningful upstream
{{ dbt_utils.generate_surrogate_key(['participant_id', 'customer_id',
'device_transaction_id', 'littlepay_transaction_id', 'device_id', 'device_id_issuer',
'type', 'transaction_outcome', 'transction_deny_reason', 'transaction_date_time_utc',
'location_id', 'location_scheme', 'location_name', 'zone_id', 'route_id', 'mode',
'direction', 'latitude', 'longitude', 'vehicle_id', 'granted_zone_ids',
'onward_zone_ids']) }} AS _content_hash,
FROM source
QUALIFY ROW_NUMBER() OVER (PARTITION BY littlepay_transaction_id ORDER BY littlepay_export_ts DESC, transaction_date_time_utc DESC) = 1
),

add_keys_drop_full_dupes AS (
SELECT
*,
-- generate keys now that input columns have been trimmed & cast and files deduped
{{ dbt_utils.generate_surrogate_key(['littlepay_export_ts', '_line_number', 'instance']) }} AS _key,
littlepay_transaction_id AS _payments_key,
FROM clean_columns
{{ qualify_dedupe_full_duplicate_lp_rows() }}
),

stg_littlepay__device_transactions AS (
SELECT
participant_id, customer_id,
device_transaction_id,
littlepay_transaction_id,
device_id,
device_id_issuer,
type,
transaction_outcome,
transction_deny_reason,
transaction_date_time_utc,
transaction_date_time_pacific,
location_id,
location_scheme,
location_name,
zone_id,
route_id,
mode,
direction,
latitude,
longitude,
vehicle_id,
granted_zone_ids,
onward_zone_ids,
_line_number,
`instance`,
extract_filename,
littlepay_export_ts,
littlepay_export_date,
ts,
_key,
_payments_key,
_content_hash,
FROM add_keys_drop_full_dupes
-- Some transactions have placeholder information for routes, stops, and directions in their first export,
-- then a later export contains the canonical version of the transaction with that information corrected
QUALIFY ROW_NUMBER() OVER (
PARTITION BY littlepay_transaction_id
ORDER BY littlepay_export_ts DESC
) = 1
)

SELECT * FROM stg_littlepay__device_transactions
Loading

0 comments on commit e6c2f53

Please sign in to comment.