# Databricks notebook source
# MAGIC %md
# MAGIC
# MAGIC ## Delta Logger - How to use
# MAGIC
# MAGIC Purpose: This notebook uses the Delta Logger library to automatically and easily log general pipeline information in one place for any data pipeline.
# MAGIC
# MAGIC All logger tables have a standard default schema DDL:
# MAGIC
# MAGIC CREATE TABLE IF NOT EXISTS {full_table_name} (
# MAGIC run_id BIGINT GENERATED BY DEFAULT AS IDENTITY,
# MAGIC batch_id STRING,
# MAGIC session_process_name STRING NOT NULL,
# MAGIC process_name STRING NOT NULL,
# MAGIC status STRING NOT NULL, -- RUNNING, FAIL, SUCCESS, STALE
# MAGIC start_timestamp TIMESTAMP NOT NULL,
# MAGIC end_timestamp TIMESTAMP,
# MAGIC duration_seconds DECIMAL,
# MAGIC duration_ms DECIMAL,
# MAGIC run_metadata STRING, -- String formatted like JSON
# MAGIC update_timestamp TIMESTAMP,
# MAGIC update_date DATE GENERATED ALWAYS AS (update_timestamp::date),
# MAGIC start_date DATE GENERATED ALWAYS AS (start_timestamp::date),
# MAGIC end_date DATE GENERATED ALWAYS AS (end_timestamp::date)
# MAGIC )
# MAGIC USING DELTA
# MAGIC
# MAGIC ## Overview
# MAGIC The Delta Logger is organized into Sessions, Processes, and Runs.
# MAGIC A session is analogous to a Spark Session: it represents one attempt at running a particular job/task, and is scoped to each initialization of the delta_logger.
# MAGIC
# MAGIC A session can then have one or many processes running inside it. This allows for nested tracking of specific actions/processes within a Databricks job/notebook. By default, if a custom process name is not provided when starting a run, session_process_name = active_process_name.
# MAGIC
# MAGIC Each process (one or many per session/batch) can then perform a run. A run is the smallest atomic unit: an ever-incrementing attempt at running a process. Runs have start times, statuses, end times, metadata, etc.
# MAGIC
# MAGIC
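# MAGIC A minimal sketch of the hierarchy, using the methods demonstrated later in this notebook (the table and child process names are illustrative):
# MAGIC
# MAGIC delta_logger = DeltaLogger(logger_table_name="main.iot_dashboard.logger")   # one session
# MAGIC delta_logger.start_run()                              # run for the session-level process
# MAGIC delta_logger.complete_run()
# MAGIC delta_logger.start_run(process_name='CHILD STEP')     # run for a child process in the same session
# MAGIC delta_logger.complete_run(process_name='CHILD STEP')
# MAGIC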
# MAGIC ## Initialize
# MAGIC delta_logger = DeltaLogger(logger_table_name="main.iot_dashboard.pipeline_logs",
# MAGIC                            session_process_name="iot_pipeline",
# MAGIC                            batch_id=None,        ## Optional - allows user to pass in a custom session batch id; by default a uuid is created as the session id.
# MAGIC                            logger_location=None, ## Optional location of the underlying table: an S3/ADLS/GCS/dbfs path.
# MAGIC                            partition_cols=None   ## Optional list of custom partition columns for the table. Allows users to customize their logger to their query and management needs.
# MAGIC                            )
# MAGIC
# MAGIC - <b>logger_table_name</b> is the logging table you want to store and reference. You can create and manage as many logger tables as you would like. If you initialize a DeltaLogger and the table does not exist, it will be created for you.
# MAGIC - <b>session_process_name</b> OPTIONAL - Users can pass a process_name into each logged event/run, or simply define it once here at the session level. The session_process_name passed here is used for the whole session and can be overridden at any time. You can also log child processes within a session by starting/completing runs with additional process names while using the session process name as the parent. An example is provided in this notebook.
# MAGIC - <b>batch_id</b> OPTIONAL - a custom id for the session; by default a uuid is generated for each session.
# MAGIC - <b>logger_location</b> OPTIONAL - default = None. An override for specifying the object storage location where the table should live. If not provided, the table is managed by default (recommended).
# MAGIC - <b>partition_cols</b> OPTIONAL - custom partition columns for the table; the defaults are described later in this notebook.
# MAGIC
# MAGIC ## Methods:
# MAGIC
# MAGIC For most methods: if process_name is not provided, the session process name is used; if the process_name cannot be found, the method will error.
# MAGIC
# MAGIC - <b> create_logger() </b> -- creates a logger table if it does not exist. This also optimizes the table, since it is used in initialization.
# MAGIC - <b> drop_logger() </b> -- drops the logger table attached to the session
# MAGIC - <b> truncate_logger() </b> -- clears an existing logger table
# MAGIC - <b> start_run(process_name: Optional, msg: Optional) </b>
# MAGIC - <b> fail_run(process_name: Optional, msg: Optional) </b>
# MAGIC - <b> complete_run(process_name: Optional, msg: Optional) </b>
# MAGIC - <b> log_run_metric(process_name: Optional, run_metrics_dict: dict) </b>
# MAGIC - <b> log_run_info(log_level = 'INFO', msg = None) </b>
# MAGIC - <b> get_last_successful_run_id(process_name: Optional)</b> -- If no previous successful run, returns -1
# MAGIC - <b> get_last_successful_run_timestamp(process_name: Optional)</b> -- If no previous successful run for the process, defaults to "1900-01-01 00:00:00"
# MAGIC - <b> get_last_run_id(process_name: Optional)</b> -- Get last run id regardless of status, if none return -1
# MAGIC - <b> get_last_run_timestamp(process_name: Optional)</b> -- Get last run timestamp , If no previous run for the process, defaults to "1900-01-01 00:00:00"
# MAGIC - <b> get_last_failed_run_id(process_name: Optional) </b>
# MAGIC - <b> get_last_failed_run_timestamp(process_name: Optional) </b>
# MAGIC - <b> clean_stale_runs(process_name: Optional) </b> -- Marks any runs in the RUNNING state without an end timestamp as "STALE" and gives them an end timestamp. This ONLY happens when a new run is created and the stale run ids are < the max existing RUNNING run id
# MAGIC - <b> optimize_log(process_name: Optional, zorderCols=["end_timestamp", "start_timestamp", "run_id"]) </b> -- Optimizes the underlying log table for a particular process name and ZORDERs by the input column list
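# MAGIC
# MAGIC For example, a minimal sketch of checking for a previous run with the getters above (using the sentinel defaults documented here, -1 and "1900-01-01 00:00:00"):
# MAGIC
# MAGIC if delta_logger.get_last_successful_run_id() == -1:
# MAGIC     print("No previous successful run -- doing a full load")
# MAGIC else:
# MAGIC     watermark = delta_logger.get_last_successful_run_timestamp()
# MAGIC     print(f"Incremental load since {watermark}")
# MAGIC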
# MAGIC ### Limitations / Considerations
# MAGIC 1. Currently supports 1 concurrent run per process_name for a given Delta table. If you want to run concurrent pipelines, create separate process names for them. This is meant to be a simple run-tracking and logging solution for EDW pipelines.
# MAGIC
# MAGIC 2. Users can pass in the fully qualified table name, use the Spark session defaults, or pass catalog and database overrides to the parameters. Pick one.
# MAGIC
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ## Design Patterns In this Example
# MAGIC
# MAGIC 1. Use for basic error handling and tracking of runs of various processes
# MAGIC 2. Use for watermarked loading patterns, i.e. creating a new run automatically pulls the most recent previous successful run and provides a "watermark" variable you can use for incremental loading. Use delta_logger.get_last_successful_run_timestamp()
# MAGIC 3. Use with the DBSQL Client and Transaction Manager together for end-to-end pipeline management
# COMMAND ----------
# DBTITLE 1,Import Logger
from helperfunctions.deltalogger import DeltaLogger
# COMMAND ----------
# DBTITLE 1,Initialize a Delta Logger (creates the referenced logger table if it does not exist)
## session_process_name - Name for the session of a notebook. By default it is the notebook path.
## session_batch_id - Id for the session. By default a uuid is generated for each delta_logger initialization (session). Can be customized to any string.
## partition_cols - Custom partition columns for the Delta table. By default the partition columns are: ['start_date', 'session_process_name', 'process_name']
## All date columns are auto-generated columns based on the timestamp columns in the table.
delta_logger = DeltaLogger(logger_table_name="main.iot_dashboard.logger")
# COMMAND ----------
# DBTITLE 1,Start A Run
## process_name - Optional additional / sub-process name within the session. By default process_name is the same as the session process name
## batch_id - Optional Batch Id
delta_logger.start_run()
# COMMAND ----------
# DBTITLE 1,Get Active Run Info
print(delta_logger.session_process_name)
print(delta_logger.active_process_name)
print(delta_logger.active_run_id)
print(delta_logger.active_run_end_ts)
print(delta_logger.active_run_start_ts)
print(delta_logger.active_run_status)
print(delta_logger.active_run_metadata)
# COMMAND ----------
# DBTITLE 1,Log Custom Named Metrics to Reference in Queries
## Note: this seems to cancel out metrics when a manual process id is defined
delta_logger.log_run_metric(run_metrics_dict={"Rows_Affected": 10000, "Percent_Success": 1})
# COMMAND ----------
# DBTITLE 1,Watermarking Example Baked into Logger with Process Run Start Times
watermark_ts = delta_logger.get_most_recent_success_run_start_time()
print(watermark_ts)
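# COMMAND ----------
# DBTITLE 1,Use the Watermark in an Incremental Read (illustrative sketch)
## A minimal sketch: the source table and timestamp column below are hypothetical --
## substitute your own incremental source.
incremental_df = (spark.table("main.iot_dashboard.raw_sensor_data")    ## hypothetical source table
                  .filter(f"ingest_timestamp > '{watermark_ts}'"))     ## hypothetical timestamp column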
# COMMAND ----------
# DBTITLE 1,Log Run Info/Messages
delta_logger.log_run_info(log_level='INFO', msg = "This step did some weird stuff")
# COMMAND ----------
delta_logger.log_run_info(log_level='WARN', msg = "This step did some weird stuff")
# COMMAND ----------
# DBTITLE 1,Complete a Run
delta_logger.complete_run()
## delta_logger.fail_run()
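# COMMAND ----------
# DBTITLE 1,Typical Error-Handling Pattern (illustrative sketch)
## A minimal sketch wrapping a pipeline step with the logger:
## fail the run on any exception, otherwise mark it complete.
delta_logger.start_run(process_name='EXAMPLE STEP')  ## illustrative process name
try:
    ## ... your pipeline logic here ...
    delta_logger.complete_run(process_name='EXAMPLE STEP')
except Exception as e:
    delta_logger.fail_run(process_name='EXAMPLE STEP', msg=str(e))
    raise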
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ### Run a custom child/sub-process run within a session
# COMMAND ----------
## Start a custom process name run within a session
## This starts a run with this sub-process and registers the process_name as the active process
delta_logger.start_run(process_name='MERGE STEP')
# COMMAND ----------
delta_logger.log_run_metric(run_metrics_dict={"Rows_Affected": 40124, "Percent_Success": 0.5})
# COMMAND ----------
delta_logger.complete_run(process_name='MERGE STEP')
# COMMAND ----------
# MAGIC %md
# MAGIC
# MAGIC ## Analyze and Use the Logs!
# COMMAND ----------
# DBTITLE 1,Select From Logger in order of events DESC
# MAGIC %sql
# MAGIC
# MAGIC SELECT *
# MAGIC FROM main.iot_dashboard.logger
# MAGIC ORDER BY run_id DESC
# COMMAND ----------
# DBTITLE 1,Analyze Custom Logged Metrics
# MAGIC %sql
# MAGIC
# MAGIC SELECT
# MAGIC session_process_name,
# MAGIC process_name,
# MAGIC date_trunc('HOUR', start_timestamp) AS EventHour,
# MAGIC AVG(run_metadata:Rows_Affected) AS AvgRowsProcessed -- We can use our custom metrics in SQL Queries and Dashboards
# MAGIC FROM main.iot_dashboard.logger
# MAGIC GROUP BY
# MAGIC session_process_name,
# MAGIC process_name,
# MAGIC date_trunc('HOUR', start_timestamp)
# MAGIC ORDER BY EventHour
# COMMAND ----------
# DBTITLE 1,Check the partition columns of this logger
delta_logger.logger_partition_cols
# COMMAND ----------
# DBTITLE 1,Check the ZORDER columns of this logger
delta_logger.logger_zorder_cols
# COMMAND ----------
# DBTITLE 1,Use the Delta Partitions/ZORDER cols To Easily Query Large Logger Table
# MAGIC %sql
# MAGIC
# MAGIC -- Using Partition Pruning
# MAGIC SELECT
# MAGIC *
# MAGIC FROM main.iot_dashboard.logger
# MAGIC WHERE start_date = '2023-11-02'::DATE
# MAGIC AND session_process_name = '/Repos/[email protected]/edw-best-practices/Using Delta Logger Example'
# MAGIC
# COMMAND ----------
# DBTITLE 1,Using the ZORDER cols to do analysis over time
# MAGIC %sql
# MAGIC
# MAGIC
# MAGIC -- Partition pruning plus ZORDER pruning on start_timestamp
# MAGIC SELECT
# MAGIC *
# MAGIC FROM main.iot_dashboard.logger
# MAGIC WHERE start_date = '2023-11-02'::DATE
# MAGIC AND session_process_name = '/Repos/[email protected]/edw-best-practices/Using Delta Logger Example'
# MAGIC AND start_timestamp BETWEEN (now() - INTERVAL 2 DAYS) AND now();
# MAGIC
# MAGIC
# MAGIC -- Partition pruning plus ZORDER pruning on run_id
# MAGIC SELECT
# MAGIC *
# MAGIC FROM main.iot_dashboard.logger
# MAGIC WHERE start_date = '2023-11-02'::DATE
# MAGIC AND session_process_name = '/Repos/[email protected]/edw-best-practices/Using Delta Logger Example'
# MAGIC AND run_id BETWEEN 1 AND 5