-
Notifications
You must be signed in to change notification settings - Fork 30
/
Copy path08 Insert or Merge Query Results or View into Delta Tables using Databricks SQL.sql
151 lines (130 loc) · 4.46 KB
/
08 Insert or Merge Query Results or View into Delta Tables using Databricks SQL.sql
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
SHOW DATABASES;
USE lms_silver; SHOW TABLES;
USE lms_bronze; SHOW TABLES;
CREATE DATABASE IF NOT EXISTS lms_silver;
DROP TABLE IF EXISTS lms_silver.instructors;
CREATE TABLE IF NOT EXISTS lms_silver.instructors (
instructor_id INT,
instructor_name STRING,
bl_created_ts TIMESTAMP,
sl_updated_ts TIMESTAMP
);
DROP TABLE IF EXISTS lms_bronze.course_catalog;
CREATE TABLE IF NOT EXISTS lms_bronze.course_catalog (
table_name STRING,
record STRING,
bl_created_ts TIMESTAMP
);
SELECT input_file_name(), table_name, record
FROM JSON.`dbfs:/FileStore/lms_dl/course_catalog` AS course_catalog;
COPY INTO lms_bronze.course_catalog
FROM (
SELECT table_name,
record,
current_timestamp AS bl_created_ts
FROM 'dbfs:/FileStore/lms_dl/course_catalog'
) FILEFORMAT = JSON
FILES = ('part-00000.json')
COPY_OPTIONS ('force' = 'true');
SELECT * FROM lms_bronze.course_catalog;
INSERT INTO lms_silver.instructors
SELECT record.*,
bl_created_ts,
current_timestamp AS sl_updated_ts
FROM (
SELECT from_json(record, 'struct<instructor_id:INT, instructor_name:STRING>') AS record,
bl_created_ts
FROM lms_bronze.course_catalog
WHERE table_name = 'instructors'
);
SELECT * FROM lms_silver.instructors;
SELECT input_file_name(), table_name, record
FROM JSON.`dbfs:/FileStore/lms_dl/course_catalog` AS course_catalog;
COPY INTO lms_bronze.course_catalog
FROM (
SELECT table_name,
record,
current_timestamp AS bl_created_ts
FROM 'dbfs:/FileStore/lms_dl/course_catalog'
) FILEFORMAT = JSON
FILES = ('part-00001.json');
SELECT *
FROM lms_bronze.course_catalog
WHERE table_name = 'instructors'
ORDER BY bl_created_ts;
SELECT max(bl_created_ts) FROM lms_bronze.course_catalog;
SELECT record.*,
bl_created_ts
FROM (
SELECT from_json(record, 'struct<instructor_id:INT, instructor_name:STRING>') AS record,
bl_created_ts
FROM lms_bronze.course_catalog
WHERE table_name = 'instructors'
AND bl_created_ts >= (SELECT max(bl_created_ts) FROM lms_bronze.course_catalog)
);
-- MERGE INTO <table_name>
-- USING <source_data_expression>
-- ON <join_condition>
-- WHEN MATCHED THEN
-- UPDATE SET
-- tgt_col1 = src_col1,
-- tgt_col2 = src_col2
-- WHEN NOT MATCHED THEN
-- INSERT ()
-- VALUES
WITH course_catalog_cte AS (
SELECT record.*,
bl_created_ts,
current_timestamp AS sl_updated_ts
FROM (
SELECT from_json(record, 'struct<instructor_id:INT, instructor_name:STRING>') AS record,
bl_created_ts
FROM lms_bronze.course_catalog
WHERE table_name = 'instructors'
AND bl_created_ts >= (SELECT max(bl_created_ts) FROM lms_bronze.course_catalog)
)
) SELECT * FROM course_catalog_cte;
WITH course_catalog_cte AS (
SELECT record.*,
bl_created_ts,
current_timestamp AS sl_updated_ts
FROM (
SELECT from_json(record, 'struct<instructor_id:INT, instructor_name:STRING>') AS record,
bl_created_ts
FROM lms_bronze.course_catalog
WHERE table_name = 'instructors'
AND bl_created_ts >= (SELECT max(bl_created_ts) FROM lms_bronze.course_catalog)
)
) MERGE INTO lms_silver.instructors AS i
USING course_catalog_cte AS cc
ON i.instructor_id = cc.instructor_id
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;
SELECT * FROM lms_silver.instructors;
WITH course_catalog_cte AS (
SELECT record.*,
bl_created_ts,
current_timestamp AS sl_updated_ts
FROM (
SELECT from_json(record, 'struct<instructor_id:INT, instructor_name:STRING>') AS record,
bl_created_ts
FROM lms_bronze.course_catalog
WHERE table_name = 'instructors'
AND bl_created_ts >= (SELECT max(bl_created_ts) FROM lms_bronze.course_catalog)
)
) MERGE INTO lms_silver.instructors AS i
USING course_catalog_cte AS cc
ON i.instructor_id = cc.instructor_id
WHEN MATCHED THEN
UPDATE SET
i.instructor_name = cc.instructor_name,
i.bl_created_ts = cc.bl_created_ts,
i.sl_updated_ts = cc.sl_updated_ts
WHEN NOT MATCHED THEN
INSERT
(instructor_id, instructor_name, bl_created_ts, sl_updated_ts)
VALUES
(cc.instructor_id, cc.instructor_name, cc.bl_created_ts, cc.sl_updated_ts);
SELECT * FROM lms_bronze.course_catalog
WHERE table_name = 'courses'
ORDER BY bl_created_ts;