/*
SQL Experiments for Typing and Normalizing AirbyteRecords in 1 table
Run me on BigQuery

Schema:
{
  "id": "number",
  "first_name": ["string", null],
  "age": ["number", null],
  "address": [null, {
    "street": "string",
    "zip": "string"
  }],
  "updated_at": "timestamp"
}

KNOWN LIMITATIONS
* Only one error type is shown per row, the first one encountered
* De-duplication relies on a full table scan. This could be made more efficient (see the commented sketch after Step 3 below)
* It would be better to surface the actual error message from the DB, not custom "this column is bad" strings
*/
-- Set up the Experiment
-- Assumption: We can build the table at the start of the sync based only on the schema we get from the source/configured catalog
DROP TABLE IF EXISTS testing_evan_2052.users;
DROP TABLE IF EXISTS testing_evan_2052.users_raw;
CREATE TABLE testing_evan_2052.users (
`id` INT64 OPTIONS (description = 'PK, cannot be null in the final data, but may be temporarily null between the raw insert and typing')
, `first_name` STRING
, `age` INT64
, `address` JSON
, `updated_at` TIMESTAMP
, `_airbyte_meta` JSON NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_raw_id` STRING NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_extracted_at` TIMESTAMP NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
)
PARTITION BY TIMESTAMP_TRUNC(_airbyte_extracted_at, DAY)
-- TODO: Learn about partition_expiration_days https://cloud.google.com/bigquery/docs/creating-partitioned-tables
CLUSTER BY
id, _airbyte_extracted_at
OPTIONS (
description="users table"
)
;
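-- A minimal sketch for the TODO above, assuming we want BigQuery to drop old partitions
-- automatically: partition_expiration_days goes in the table OPTIONS. The table name and
-- the 365-day value here are illustrative only, not part of the experiment.
/*
CREATE TABLE testing_evan_2052.users_with_expiry (
  `id` INT64
  , `_airbyte_extracted_at` TIMESTAMP NOT NULL
)
PARTITION BY TIMESTAMP_TRUNC(_airbyte_extracted_at, DAY)
OPTIONS (
  description = "users table with auto-expiring partitions"
  , partition_expiration_days = 365
);
*/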
-------------------------------------
--------- TYPE AND DEDUPE -----------
-------------------------------------
CREATE OR REPLACE PROCEDURE testing_evan_2052._airbyte_prepare_raw_table()
BEGIN
CREATE TABLE IF NOT EXISTS testing_evan_2052.users_raw (
`_airbyte_raw_id` STRING NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_data` JSON NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_extracted_at` TIMESTAMP NOT NULL OPTIONS (description = 'Airbyte column, cannot be null')
, `_airbyte_loaded_at` TIMESTAMP
)
PARTITION BY TIMESTAMP_TRUNC(_airbyte_extracted_at, DAY)
CLUSTER BY
_airbyte_loaded_at
;
END
;
CREATE OR REPLACE PROCEDURE testing_evan_2052._airbyte_type_dedupe()
OPTIONS (strict_mode=FALSE)
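-- NOTE: strict_mode=FALSE asks BigQuery to check the procedure body only for syntax at CREATE time,
-- so references to users_raw (created at runtime by _airbyte_prepare_raw_table) don't fail here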
BEGIN
DECLARE missing_pk_count INT64;
BEGIN TRANSACTION;
-- Step 1: Validate the incoming data
-- We can't really do this properly in the pure-SQL example here, but we should throw if any row doesn't have a PK
SET missing_pk_count = (
SELECT COUNT(1)
FROM testing_evan_2052.users_raw
WHERE
`_airbyte_loaded_at` IS NULL
AND SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) IS NULL
);
IF missing_pk_count > 0 THEN
RAISE USING message = FORMAT("Raw table has %s rows missing a primary key", CAST(missing_pk_count AS STRING));
END IF;
-- Moving the data and deduping happens in a transaction to prevent duplicates from appearing
-- Step 2: Move the new data to the typed table
INSERT INTO testing_evan_2052.users
(
id,
first_name,
age,
updated_at,
address,
_airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
)
WITH intermediate_data AS (
SELECT
SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) as id,
SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.first_name') as STRING) as first_name,
SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.age') as INT64) as age,
JSON_QUERY(`_airbyte_data`, '$.address') as address, -- NOTE: for record properties that remain JSON, use `JSON_QUERY`, not `JSON_VALUE`
SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.updated_at') as TIMESTAMP) as updated_at,
array_concat(
CASE
WHEN (JSON_VALUE(`_airbyte_data`, '$.id') IS NOT NULL) AND (SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) IS NULL) THEN ["Problem with `id`"]
ELSE []
END,
CASE
WHEN (JSON_VALUE(`_airbyte_data`, '$.first_name') IS NOT NULL) AND (SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.first_name') as STRING) IS NULL) THEN ["Problem with `first_name`"]
ELSE []
END,
CASE
WHEN (JSON_VALUE(`_airbyte_data`, '$.age') IS NOT NULL) AND (SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.age') as INT64) IS NULL) THEN ["Problem with `age`"]
ELSE []
END,
CASE
WHEN (JSON_VALUE(`_airbyte_data`, '$.updated_at') IS NOT NULL) AND (SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.updated_at') as TIMESTAMP) IS NULL) THEN ["Problem with `updated_at`"]
ELSE []
END,
-- Flag `address` values that are present but not a JSON object (assumes JSON_TYPE is available)
CASE
WHEN (JSON_QUERY(`_airbyte_data`, '$.address') IS NOT NULL) AND (JSON_TYPE(JSON_QUERY(`_airbyte_data`, '$.address')) NOT IN ('object', 'null')) THEN ["Problem with `address`"]
ELSE []
END
) _airbyte_cast_errors,
_airbyte_raw_id,
_airbyte_extracted_at
FROM testing_evan_2052.users_raw
WHERE
_airbyte_loaded_at IS NULL -- insert only rows that haven't been typed yet, so we can recover from previously failed checkpoints
OR (
-- Temporarily place back an entry for any CDC-deleted record so we can order them properly by cursor. We only need the PK and cursor value
_airbyte_loaded_at IS NOT NULL
AND JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
)
)
SELECT
`id`,
`first_name`,
`age`,
`updated_at`,
`address`,
CASE
WHEN array_length(_airbyte_cast_errors) = 0 THEN JSON'{"errors": []}'
ELSE to_json(struct(_airbyte_cast_errors AS errors))
END AS _airbyte_meta,
_airbyte_raw_id,
_airbyte_extracted_at
FROM intermediate_data
;
-- Step 3: Dedupe and clean the typed table
-- This is a full table scan, but we need to do it this way to merge the new rows with the old to:
-- * Consider the case in which there are multiple entries for the same PK in the new insert batch
-- * Consider the case in which the data in the new batch is older than the data in the typed table, and we only want to keep the newer (pre-existing) data
-- * Order by the source's provided cursor and _airbyte_extracted_at to break any ties
-- NOTE: BigQuery does not support CTEs in DELETE statements: https://stackoverflow.com/questions/48130324/bigquery-delete-statement-to-remove-duplicates
DELETE FROM testing_evan_2052.users
WHERE
-- Delete any rows which are not the most recent for a given PK
`_airbyte_raw_id` IN (
SELECT `_airbyte_raw_id` FROM (
SELECT `_airbyte_raw_id`, row_number() OVER (
PARTITION BY `id` ORDER BY `updated_at` DESC, `_airbyte_extracted_at` DESC
) as row_number FROM testing_evan_2052.users
)
WHERE row_number != 1
)
;
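-- A sketch of the more efficient variant mentioned in KNOWN LIMITATIONS (hypothetical, not part
-- of the experiment): limit the window scan to PKs present in the current batch. Assumes it runs
-- before Step 6, while this batch's raw rows still have a NULL _airbyte_loaded_at.
/*
DELETE FROM testing_evan_2052.users
WHERE `_airbyte_raw_id` IN (
  SELECT `_airbyte_raw_id` FROM (
    SELECT `_airbyte_raw_id`, row_number() OVER (
      PARTITION BY `id` ORDER BY `updated_at` DESC, `_airbyte_extracted_at` DESC
    ) as row_number
    FROM testing_evan_2052.users
    WHERE `id` IN (
      SELECT SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64)
      FROM testing_evan_2052.users_raw
      WHERE `_airbyte_loaded_at` IS NULL
    )
  )
  WHERE row_number != 1
);
*/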
-- Step 4: Remove old entries from Raw table
DELETE FROM
testing_evan_2052.users_raw
WHERE
`_airbyte_raw_id` NOT IN (
SELECT `_airbyte_raw_id` FROM testing_evan_2052.users
)
;
-- Step 5: Clean out CDC deletes from final table
-- Only run this step if _ab_cdc_deleted_at is a property of the stream
/*
DELETE FROM testing_evan_2052.users
WHERE _ab_cdc_deleted_at IS NOT NULL
*/
-- The following will always work, even if there is no _ab_cdc_deleted_at column, but it is slower
DELETE FROM testing_evan_2052.users
WHERE
-- Delete rows that have been CDC deleted
`id` IN (
SELECT
SAFE_CAST(JSON_VALUE(`_airbyte_data`, '$.id') as INT64) as id -- based on the PK which we know from the connector catalog
FROM testing_evan_2052.users_raw
WHERE
JSON_VALUE(`_airbyte_data`, '$._ab_cdc_deleted_at') IS NOT NULL
)
;
-- Step 6: Apply the _airbyte_loaded_at timestamp where needed
UPDATE testing_evan_2052.users_raw
SET `_airbyte_loaded_at` = CURRENT_TIMESTAMP()
WHERE `_airbyte_loaded_at` IS NULL
;
COMMIT TRANSACTION;
END;
----------------------------
--------- SYNC 1 -----------
----------------------------
CALL testing_evan_2052._airbyte_prepare_raw_table();
-- Load the raw data
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 1, "updated_at": "2020-01-01T00:00:00Z", "first_name": "Evan", "age": 38, "address": { "city": "San Francisco", "zip": "94001" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 2, "updated_at": "2020-01-01T00:00:01Z", "first_name": "Brian", "age": 39, "address": { "city": "Menlo Park", "zip": "94002" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 3, "updated_at": "2020-01-01T00:00:02Z", "first_name": "Edward", "age": 40, "address": { "city": "Sunnyvale", "zip": "94003" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 4, "updated_at": "2020-01-01T00:00:03Z", "first_name": "Joe", "address": { "city": "Seattle", "zip": "98999" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
CALL testing_evan_2052._airbyte_type_dedupe();
----------------------------
--------- SYNC 2 -----------
----------------------------
CALL testing_evan_2052._airbyte_prepare_raw_table();
-- Load the raw data
-- Age update for Evan (user 1)
-- There is an update for Brian (user 2, new address.zip)
-- There is an update for Edward (user 3, age is invalid)
-- No update for Joe (user 4)
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 1, "updated_at": "2020-01-02T00:00:00Z", "first_name": "Evan", "age": 39, "address": { "city": "San Francisco", "zip": "94001" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 2, "updated_at": "2020-01-02T00:00:01Z", "first_name": "Brian", "age": 39, "address": { "city": "Menlo Park", "zip": "99999" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 3, "updated_at": "2020-01-02T00:00:02Z", "first_name": "Edward", "age": "forty", "address": { "city": "Sunnyvale", "zip": "94003" } }', GENERATE_UUID(), CURRENT_TIMESTAMP());
CALL testing_evan_2052._airbyte_type_dedupe();
----------------------------
--------- SYNC 3 -----------
----------------------------
CALL testing_evan_2052._airbyte_prepare_raw_table();
-- Load the raw data
-- Delete row 2 (Brian) with CDC
-- Insert multiple records for a new user (with age incrementing each time)
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 2, "updated_at": "2020-01-03T00:00:00Z", "first_name": "Brian", "age": 39, "address": { "city": "Menlo Park", "zip": "99999" }, "_ab_cdc_deleted_at": true}', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 5, "updated_at": "2020-01-03T00:00:01Z", "first_name": "Cynthia", "age": 40, "address": { "city": "Redwood City", "zip": "98765" }}', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 5, "updated_at": "2020-01-03T00:00:02Z", "first_name": "Cynthia", "age": 41, "address": { "city": "Redwood City", "zip": "98765" }}', GENERATE_UUID(), CURRENT_TIMESTAMP());
INSERT INTO testing_evan_2052.users_raw (`_airbyte_data`, `_airbyte_raw_id`, `_airbyte_extracted_at`) VALUES (JSON'{ "id": 5, "updated_at": "2020-01-03T00:00:03Z", "first_name": "Cynthia", "age": 42, "address": { "city": "Redwood City", "zip": "98765" }}', GENERATE_UUID(), CURRENT_TIMESTAMP());
CALL testing_evan_2052._airbyte_type_dedupe();
----------------------
-- FINAL VALIDATION --
----------------------
/*
You should see 5 RAW records, one for each of the 5 users
You should see 4 TYPED records, one for each user except user #2 (Brian), who was CDC-deleted
You should have the latest data for each user in the typed final table:
* User #1 (Evan) has the latest data (age=39)
* User #3 (Edward) has a null age [+ error] due to that age being un-typable
* User #4 (Joe) has a null age & no errors
* User #5 (Cynthia) has one entry despite the multiple inserts, with the latest entry (age=42)
*/
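-- Sketch queries to eyeball the expectations above (counts assume the three syncs ran as written):
SELECT COUNT(*) AS raw_count FROM testing_evan_2052.users_raw; -- expect 5
SELECT COUNT(*) AS typed_count FROM testing_evan_2052.users; -- expect 4
SELECT `id`, `first_name`, `age`, `_airbyte_meta` FROM testing_evan_2052.users ORDER BY `id`;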
SELECT CURRENT_TIMESTAMP();