Skip to content

Commit

Permalink
Enhanced log message for source freshness errors + warns
Browse files Browse the repository at this point in the history
  • Loading branch information
austinweisgrau committed Jan 13, 2025
1 parent 82e985b commit 12833fe
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 2 deletions.
12 changes: 10 additions & 2 deletions parsons/utilities/dbt/logging.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

from parsons import Table
from parsons.databases.database_connector import DatabaseConnector
from parsons.utilities.dbt.models import EnhancedNodeResult

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -100,7 +101,7 @@ def format_command_result(
log_message += "\nError messages:\n```{}```".format(
"\n\n".join(
[
i.node.name + ": " + (i.message or "")
i.node.name + ": " + (EnhancedNodeResult.log_message(i) or "")
for i in [*manifest.errors, *manifest.fails]
]
)
Expand All @@ -109,14 +110,21 @@ def format_command_result(
# Warnings
if manifest.warnings:
log_message += "\nWarn messages:\n```{}```".format(
"\n\n".join([i.node.name + ": " + (i.message or "") for i in manifest.warnings])
"\n\n".join(
[
i.node.name + ": " + (EnhancedNodeResult.log_message(i) or "")
for i in manifest.warnings
]
)
)

# Skips
if manifest.skips:
skips = set([i.node.name for i in manifest.skips])
log_message += "\nSkipped:\n```{}```".format(", ".join(skips))

breakpoint()

return log_message

def format_result(self) -> str:
Expand Down
14 changes: 14 additions & 0 deletions parsons/utilities/dbt/models.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import collections
from dbt.contracts.graph.manifest import Manifest as dbtManifest
from dbt.contracts.results import NodeResult
from dbt.contracts.results import SourceFreshnessResult


class Manifest:
Expand Down Expand Up @@ -74,3 +75,16 @@ def total_slot_hours(self) -> float:
sum([node.adapter_response.get("slot_ms", 0) for node in self.dbt_manifest]) / 3600000
)
return result


class EnhancedNodeResult(NodeResult):
def log_message(self) -> str | None:
"""Helper method to generate message for logs."""
if isinstance(self, SourceFreshnessResult):
freshness_config = self.node.freshness
time_config = getattr(freshness_config, self.status + "_after")
result = f"No new records for {int(self.age/86400)} days, {self.status} after {time_config.count} {time_config.period}s."
else:
result = self.message

return result

0 comments on commit 12833fe

Please sign in to comment.