Skip to content

Commit

Permalink
Marks all places where we remove bad data
Browse files Browse the repository at this point in the history
  • Loading branch information
rudolfix authored and sh-rp committed Jan 8, 2024
1 parent 067b5d7 commit 7e3d072
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 5 deletions.
5 changes: 5 additions & 0 deletions dlt/common/libs/pydantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,7 @@ def validate_items(
if err_idx in deleted:
# already dropped
continue
# item with an error (evidence)
err_item = items[err_idx - len(deleted)]
else:
# top level error which means misalignment of list model and items
Expand Down Expand Up @@ -314,6 +315,7 @@ def validate_items(
err_item,
) from e
elif column_mode == "discard_row":
# MARK: add contract violation hook here (IDK how to pass plugins here so it is still effective)
# pop at the right index
items.pop(err_idx - len(deleted))
# store original index so we do not pop again
Expand All @@ -335,6 +337,7 @@ def validate_items(
err_item,
) from e
elif data_mode == "discard_row":
# MARK: add contract violation hook here (IDK how to pass plugins here so it is still effective)
items.pop(err_idx - len(deleted))
deleted.add(err_idx)
else:
Expand Down Expand Up @@ -372,6 +375,7 @@ def validate_item(
item,
) from e
elif column_mode == "discard_row":
# MARK: add contract violation hook here (IDK how to pass plugins here so it is still effective)
return None
raise NotImplementedError(
f"{column_mode} column mode not implemented for Pydantic validation"
Expand All @@ -389,6 +393,7 @@ def validate_item(
item,
) from e
elif data_mode == "discard_row":
# MARK: add contract violation hook here (IDK how to pass plugins here so it is still effective)
return None
raise NotImplementedError(
f"{data_mode} data mode not implemented for Pydantic validation"
Expand Down
6 changes: 5 additions & 1 deletion dlt/extract/extractors.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,12 +137,16 @@ def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems) -> N
for item in items:
table_name = self._get_dynamic_table_name(resource, item)
if table_name in self._filtered_tables:
# MARK: add contract violation hook here
continue
if table_name not in self._table_contracts or resource._table_has_other_dynamic_hints:
item = self._compute_and_update_table(resource, table_name, item)
# write to storage with inferred table name
if table_name not in self._filtered_tables:
self._write_item(table_name, resource.name, item)
else:
# MARK: add contract violation hook here
pass

def _write_to_static_table(
self, resource: DltResource, table_name: str, items: TDataItems
Expand Down Expand Up @@ -261,7 +265,7 @@ def _apply_contract_filters(
]
if removed_columns:
item = pyarrow.remove_columns(item, removed_columns)
# MARK: add contract violation hook here
# MARK: add contract violation hook here -> I'd pass the original arrow table here so before columns are modified

return item

Expand Down
9 changes: 5 additions & 4 deletions dlt/normalize/items_normalizers.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ def _filter_columns(
for name, mode in filtered_columns.items():
if name in row:
if mode == "discard_row":
# MARK: add contract violation hook here
return None
elif mode == "discard_value":
# MARK: add contract violation hook here; one concern: passing the original row would likely require making a copy, which is expensive
# WARNING: send an item only once! same item may be modified several times
row.pop(name)
return row

Expand Down Expand Up @@ -108,7 +111,7 @@ def _normalize_chunk(
# do not process empty rows
if not row:
should_descend = False
# MARK: add contract violation hook here
# NOT MARK: no contract violation here
continue

# filter columns or full rows if schema contract said so
Expand All @@ -119,7 +122,6 @@ def _normalize_chunk(
# if whole row got dropped
if not row:
should_descend = False
# MARK: add contract violation hook here
continue

# decode pua types
Expand Down Expand Up @@ -154,7 +156,7 @@ def _normalize_chunk(
if partial_table is None:
# discard migration and row
should_descend = False
# MARK: add contract violation hook here
# MARK: add contract violation hook here (full table dropped)
continue
# there's a new table or new columns in the existing table
# update schema and save the change
Expand All @@ -171,7 +173,6 @@ def _normalize_chunk(
# do not continue if new filters skipped the full row
if not row:
should_descend = False
# MARK: add contract violation hook here
continue

# get current columns schema
Expand Down

0 comments on commit 7e3d072

Please sign in to comment.