Skip to content

Commit

Permalink
Refactor message system
Browse files Browse the repository at this point in the history
  • Loading branch information
marcus-snx committed Aug 27, 2024
1 parent 9ca75dd commit 7415863
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 30 deletions.
22 changes: 11 additions & 11 deletions scheduler/dags/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def get_log_url(context):
return url


def parse_dbt_test_results(context):
def parse_dbt_output(context):
ti = context["task_instance"]
log_path = f"./logs/dag_id={ti.dag_id}/run_id={ti.run_id}/task_id={ti.task_id}/attempt={ti.try_number-1}.log"

Expand All @@ -42,23 +42,23 @@ def parse_dbt_test_results(context):
int, summary_match.groups()
)

summary_message = f"dbt test summary: {pass_count} passed, {warn_count} warnings, {error_count} errors, {skip_count} skipped, {total_count} total"
summary_message = f"dbt output summary: {pass_count} passed, {warn_count} warnings, {error_count} errors, {skip_count} skipped, {total_count} total"
send_discord_alert(summary_message)

if warn_count > 0 or error_count > 0:
error_warnings = re.findall(
r"((?:Failure|Warning) in test .*? \(.*?\))", dbt_test_output
r"((?:Failure|Warning) in .*? \(.*?\))", dbt_test_output
)
for test in error_warnings:
if "Failure" in test:
message = f":exclamation: {test}"
elif "Warning" in test:
message = f":warning: {test}"
for err in error_warnings:
if "Failure" in err:
message = f":exclamation: {err}"
elif "Warning" in err:
message = f":warning: {err}"
else:
message = f"{test}"
message = f"{err}"
message = message.replace("__", r"\_\_")
send_discord_alert(message)
else:
send_discord_alert("Unable to parse dbt test summary")
send_discord_alert("Unable to parse dbt summary")
else:
send_discord_alert("Unable to retrieve dbt test output")
send_discord_alert("Unable to retrieve dbt output")
37 changes: 18 additions & 19 deletions scheduler/dags/v3_etl.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import os
from datetime import datetime, timedelta

from utils import parse_dbt_test_results, send_discord_alert, get_log_url
from utils import parse_dbt_output, send_discord_alert, get_log_url

from airflow import DAG
from airflow.operators.latest_only import LatestOnlyOperator
Expand All @@ -28,24 +28,21 @@
}


def success_callback(context):
log_url = get_log_url(context)
message = f":green_circle: DAG run was **successful** | DAG: {context['dag'].dag_id} | Task: {context['task'].task_id}"

send_discord_alert(message)
send_discord_alert(f"Link: {log_url}")

parse_dbt_test_results(context)


def failure_callback(context):
log_url = get_log_url(context)
message = f":red_circle: DAG run has **failed** | DAG: {context['dag'].dag_id} | Task: {context['task'].task_id}"
def callback_dbt(mode="success", task_type="run"):
status = (
f"{task_type} was **successful**"
if mode == "success"
else f"{task_type} **failed**"
)

send_discord_alert(message)
send_discord_alert(f"Link: {log_url}")
def callback(context):
log_url = get_log_url(context)
message = f":green_circle: DAG {status} | DAG: {context['dag'].dag_id} | Task: {context['task'].task_id}"
send_discord_alert(message)
send_discord_alert(f"Link: {log_url}")
parse_dbt_output(context)

parse_dbt_test_results(context)
return callback


def create_docker_operator(
Expand Down Expand Up @@ -104,6 +101,8 @@ def create_dag(network, rpc_var, target="dev"):
image="data-transformer",
command=f"dbt run --target {target if network != 'optimism_mainnet' else target + '-op'} --select tag:{network} --profiles-dir profiles --profile synthetix",
network_env_var=rpc_var,
on_success_callback=callback_dbt(mode="success", task_type="run"),
on_failure_callback=callback_dbt(mode="fail", task_type="run"),
)

test_task_id = f"test_{version}"
Expand All @@ -114,8 +113,8 @@ def create_dag(network, rpc_var, target="dev"):
image="data-transformer",
command=f"dbt test --target {target if network != 'optimism_mainnet' else target + '-op'} --select tag:{network} --profiles-dir profiles --profile synthetix",
network_env_var=rpc_var,
on_success_callback=success_callback,
on_failure_callback=failure_callback,
on_success_callback=callback_dbt(mode="success", task_type="test"),
on_failure_callback=callback_dbt(mode="fail", task_type="test"),
)

if target == "prod":
Expand Down

0 comments on commit 7415863

Please sign in to comment.