-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathUsing Transaction Manager Example.py
190 lines (148 loc) · 6.07 KB
/
Using Transaction Manager Example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC ## TO DO:
# MAGIC
# MAGIC 1. Continue to add edge cases on affected tables: RESTORE TABLE, OPTIMIZE
# MAGIC 2. Ensure snapshot versions are created for tables that do not exist. If the transaction fails and the snapshot version is -1, then run a DROP TABLE IF EXISTS statement.
# COMMAND ----------
# MAGIC %pip install -r helperfunctions/requirements.txt
# COMMAND ----------
from helperfunctions.transactions import Transaction
# COMMAND ----------
# DBTITLE 1,Example SQL Transaction Block
# Multi-statement SQL transaction block executed via Transaction.execute_sql_transaction.
# Pipeline: create the iot_dashboard schema and bronze/silver tables, COPY INTO the
# bronze table from raw JSON device files, MERGE bronze into silver, OPTIMIZE + ANALYZE
# the silver table, build an hourly summary table, then TRUNCATE the bronze batch table.
# NOTE(review): statements are ';'-separated; the Transaction helper snapshots the
# managed tables so the whole block can be rolled back if any statement fails.
sqlString = """
USE CATALOG hive_metastore;
CREATE SCHEMA IF NOT EXISTS iot_dashboard;
USE SCHEMA iot_dashboard;
-- Create Tables
CREATE OR REPLACE TABLE iot_dashboard.bronze_sensors
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
timestamp TIMESTAMP,
value STRING
)
USING DELTA
TBLPROPERTIES("delta.targetFileSize"="128mb");
CREATE OR REPLACE TABLE iot_dashboard.silver_sensors
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
timestamp TIMESTAMP,
value STRING
)
USING DELTA
PARTITIONED BY (user_id)
TBLPROPERTIES("delta.targetFileSize"="128mb");
-- Statement 1 -- the load
COPY INTO iot_dashboard.bronze_sensors
FROM (SELECT
id::bigint AS Id,
device_id::integer AS device_id,
user_id::integer AS user_id,
calories_burnt::decimal(10,2) AS calories_burnt,
miles_walked::decimal(10,2) AS miles_walked,
num_steps::decimal(10,2) AS num_steps,
timestamp::timestamp AS timestamp,
value AS value -- This is a JSON object
FROM "/databricks-datasets/iot-stream/data-device/")
FILEFORMAT = json
COPY_OPTIONS('force'='true') -- 'false' -- process incrementally
--option to be incremental or always load all files
;
-- Statement 2
MERGE INTO iot_dashboard.silver_sensors AS target
USING (SELECT Id::integer,
device_id::integer,
user_id::integer,
calories_burnt::decimal,
miles_walked::decimal,
num_steps::decimal,
timestamp::timestamp,
value::string
FROM iot_dashboard.bronze_sensors) AS source
ON source.Id = target.Id
AND source.user_id = target.user_id
AND source.device_id = target.device_id
WHEN MATCHED THEN UPDATE SET
target.calories_burnt = source.calories_burnt,
target.miles_walked = source.miles_walked,
target.num_steps = source.num_steps,
target.timestamp = source.timestamp
WHEN NOT MATCHED THEN INSERT *;
OPTIMIZE iot_dashboard.silver_sensors ZORDER BY (timestamp);
-- This calculate table stats for all columns to ensure the optimizer can build the best plan
-- Statement 3
ANALYZE TABLE iot_dashboard.silver_sensors COMPUTE STATISTICS FOR ALL COLUMNS;
CREATE OR REPLACE TABLE hourly_summary_statistics
AS
SELECT user_id,
date_trunc('hour', timestamp) AS HourBucket,
AVG(num_steps)::float AS AvgNumStepsAcrossDevices,
AVG(calories_burnt)::float AS AvgCaloriesBurnedAcrossDevices,
AVG(miles_walked)::float AS AvgMilesWalkedAcrossDevices
FROM silver_sensors
GROUP BY user_id,date_trunc('hour', timestamp)
ORDER BY HourBucket;
-- Statement 4
-- Truncate bronze batch once successfully loaded
TRUNCATE TABLE bronze_sensors;
"""
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ## 3 Primary Ways to Do a Transaction
# MAGIC
# MAGIC 1. <b> SQL - selected_tables: </b> This allows the user to explicitly control which exact tables get snapshotted and rolled back - good for production where lots of jobs are running.
# MAGIC
# MAGIC 2. <b> SQL - inferred_altered_tables: </b> This uses SQLGlot to automatically find tables that would be altered by the SQL inside the transaction block, and will snapshot those tables. Great for simplicity, but should be checked in a test before moving to production.
# MAGIC
# MAGIC 3. <b> Python </b> - call the .begin_transaction() and .rollback_transaction() methods manually to manage the transaction state. This allows for more complex logic outside of a contiguous multi-statement SQL block.
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ## Method 1: SQL - selected_tables
# COMMAND ----------
# DBTITLE 1,Initialize Transaction Class - Manually Define Selected Tables
# Method 1: the caller explicitly names the tables to snapshot/roll back.
# uc_default=False -> default catalog is hive_metastore rather than Unity Catalog's main.
x = Transaction(mode="selected_tables", uc_default=False)
# COMMAND ----------
# DBTITLE 1,Execute a multi statement SQL transaction from a SQL string - Manually Defining
## This method is great because you do not need to roll back manually; it is handled for you.
## This statement auto-commits on success. If you do not want that, you can write pyspark or regular SQL outside of this method and then manually roll back.
x.execute_sql_transaction(sqlString, tables_to_manage=["hive_metastore.iot_dashboard.silver_sensors"])
# COMMAND ----------
# MAGIC %md
# MAGIC ## Method 2: SQL - inferred_altered_tables
# COMMAND ----------
# Method 2: let the helper infer (via SQL parsing) which tables the block alters and snapshot those.
y = Transaction(mode="inferred_altered_tables", uc_default=False) ## uc_default=True if you want to infer schema with main as default instead of hive_metastore.
# COMMAND ----------
## This statement auto-commits on success. If you do not want that, you can write pyspark or regular SQL outside of this method and then manually roll back.
y.execute_sql_transaction(sqlString)
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ## Method 3: Python
# MAGIC
# MAGIC Call transaction begin and rollback and do any logic in between
# COMMAND ----------
# DBTITLE 1,Begin Transaction in Python
# Method 3: manage the transaction lifecycle manually - snapshot now, run arbitrary
# logic, then roll back (or not) explicitly.
# BUG FIX: schema name was misspelled "iot_dashbaord", so the snapshot targeted a
# nonexistent table; corrected to "iot_dashboard" to match the tables used above.
x.begin_transaction(tables_to_snapshot=["hive_metastore.iot_dashboard.silver_sensors"])
# COMMAND ----------
##### Do a bunch of logic here, any logic at all
#####
# COMMAND ----------
# DBTITLE 1,Get Transaction Snapshot Info
# Inspect the current snapshot state (e.g. table versions captured at begin_transaction).
x.get_transaction_snapshot()
# COMMAND ----------
# DBTITLE 1,Manually rollback a transaction from most recent explicit snapshot for tables
### If you use the SQL execute method, it auto-commits!! So you cannot roll back once it succeeds - rollback is handled automatically. You can still use all the manual methods if you want to opt out of the automatic rollback/commit process.
x.rollback_transaction()