-
Notifications
You must be signed in to change notification settings - Fork 11
/
Using DBSQL Serverless Transaction Manager Example.py
136 lines (111 loc) · 4.03 KB
/
Using DBSQL Serverless Transaction Manager Example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# Databricks notebook source
# MAGIC %pip install -r helperfunctions/requirements.txt
# COMMAND ----------
from helperfunctions.dbsqltransactions import DBSQLTransactionManager
# COMMAND ----------
# DBTITLE 1,Example Inputs For Client
# Client configuration. token and host_name may stay None — the transaction
# manager will try to resolve them from the current workspace session.
# warehouse_id identifies the serverless SQL warehouse that runs the chain.
token: str | None = None ## optional
host_name: str | None = None ## optional
warehouse_id: str = "475b94ddc7cd5211"
# COMMAND ----------
# DBTITLE 1,Example Multi Statement Transaction
# One multi-statement SQL transaction, submitted as a single chain by the
# DBSQLTransactionManager below. Statements are separated by ';' and run in order:
#   - session setup: USE CATALOG hive_metastore / USE SCHEMA iot_dashboard
#   - DDL: CREATE OR REPLACE bronze_sensors and silver_sensors Delta tables
#   - Statement 1: COPY INTO bronze_sensors from the sample IoT dataset
#   - Statement 2: MERGE bronze into silver (match on Id, user_id, device_id)
#   - OPTIMIZE silver (ZORDER on timestamp), then
#   - Statement 3: ANALYZE silver + CREATE OR REPLACE hourly_summary_statistics
#   - Statement 4: TRUNCATE bronze_sensors once the load succeeded
# NOTE(review): COPY_OPTIONS('force'='true') re-processes ALL source files on
# every run (see the inline SQL comment); switch to 'false' for incremental loads.
sqlString = """
USE CATALOG hive_metastore;
CREATE SCHEMA IF NOT EXISTS iot_dashboard;
USE SCHEMA iot_dashboard;
-- Create Tables
CREATE OR REPLACE TABLE iot_dashboard.bronze_sensors
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
timestamp TIMESTAMP,
value STRING
)
USING DELTA
TBLPROPERTIES("delta.targetFileSize"="128mb");
CREATE OR REPLACE TABLE iot_dashboard.silver_sensors
(
Id BIGINT GENERATED BY DEFAULT AS IDENTITY,
device_id INT,
user_id INT,
calories_burnt DECIMAL(10,2),
miles_walked DECIMAL(10,2),
num_steps DECIMAL(10,2),
timestamp TIMESTAMP,
value STRING
)
USING DELTA
PARTITIONED BY (user_id)
TBLPROPERTIES("delta.targetFileSize"="128mb");
-- Statement 1 -- the load
COPY INTO iot_dashboard.bronze_sensors
FROM (SELECT
id::bigint AS Id,
device_id::integer AS device_id,
user_id::integer AS user_id,
calories_burnt::decimal(10,2) AS calories_burnt,
miles_walked::decimal(10,2) AS miles_walked,
num_steps::decimal(10,2) AS num_steps,
timestamp::timestamp AS timestamp,
value AS value -- This is a JSON object
FROM "/databricks-datasets/iot-stream/data-device/")
FILEFORMAT = json
COPY_OPTIONS('force'='true') -- 'false' -- process incrementally
--option to be incremental or always load all files
;
-- Statement 2
MERGE INTO iot_dashboard.silver_sensors AS target
USING (SELECT Id::integer,
device_id::integer,
user_id::integer,
calories_burnt::decimal,
miles_walked::decimal,
num_steps::decimal,
timestamp::timestamp,
value::string
FROM iot_dashboard.bronze_sensors) AS source
ON source.Id = target.Id
AND source.user_id = target.user_id
AND source.device_id = target.device_id
WHEN MATCHED THEN UPDATE SET
target.calories_burnt = source.calories_burnt,
target.miles_walked = source.miles_walked,
target.num_steps = source.num_steps,
target.timestamp = source.timestamp
WHEN NOT MATCHED THEN INSERT *;
OPTIMIZE iot_dashboard.silver_sensors ZORDER BY (timestamp);
-- This calculate table stats for all columns to ensure the optimizer can build the best plan
-- Statement 3
ANALYZE TABLE iot_dashboard.silver_sensors COMPUTE STATISTICS FOR ALL COLUMNS;
CREATE OR REPLACE TABLE hourly_summary_statistics
AS
SELECT user_id,
date_trunc('hour', timestamp) AS HourBucket,
AVG(num_steps)::float AS AvgNumStepsAcrossDevices,
AVG(calories_burnt)::float AS AvgCaloriesBurnedAcrossDevices,
AVG(miles_walked)::float AS AvgMilesWalkedAcrossDevices
FROM silver_sensors
GROUP BY user_id,date_trunc('hour', timestamp)
ORDER BY HourBucket;
-- Statement 4
-- Truncate bronze batch once successfully loaded
TRUNCATE TABLE bronze_sensors;
"""
# COMMAND ----------
# Build the transaction-manager client bound to the warehouse configured above.
# mode="inferred_altered_tables" — presumably the manager infers which tables the
# SQL chain alters (vs. the "selected_tables" mode named in the PARAMS notes
# further down); confirm against helperfunctions/dbsqltransactions.
serverless_client_t = DBSQLTransactionManager(warehouse_id = warehouse_id, mode="inferred_altered_tables") ## token=<optional>, host_name=<optional>, verbose=True for print statements and other debugging messages
# COMMAND ----------
# DBTITLE 1,Submitting the Multi Statement Transaction to Serverless SQL Warehouse
# The bare string below is in-notebook documentation of the client parameters.
# NOTE(review): it names the return_type value both "last_results" and
# "last_result" — verify the exact spelling against the helper implementation.
"""
PARAMS:
warehouse_id --> Required, the SQL warehouse to submit statements
mode -> selected_tables, inferred_altered_tables
token --> optional, will try to get one for the user
host_name --> optional, will try to infer same workspace url
execute_sql_transaction params:
return_type --> "message", "last_results". "message" will return status of query chain. "last_result" will run all statements and return the last results of the final query in the chain
"""
# Submit the whole sqlString chain to the serverless warehouse; the value bound
# to result_df depends on return_type (status message vs. last query's results).
result_df = serverless_client_t.execute_dbsql_transaction(sql_string = sqlString)