materializes table schemas for empty tables (#1122)
* allows getting data tables that have seen data

* always runs empty jobs through the normalizer and marks their tables as having seen data

* defines and tracks special empty list items in the extractor

* adds a dlt mark and tests for materializing table schemas (see the usage sketch after this list)

* does not run empty files through the normalizer when the table has already seen data
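
A minimal usage sketch of the new mark (the pipeline, dataset, and table names below are illustrative, and duckdb is only used as a convenient local destination); it mirrors the new test and materializes an "empty" table with the declared column plus the dlt housekeeping columns:

    import dlt

    pipeline = dlt.pipeline(pipeline_name="schema_only", destination="duckdb", dataset_name="mydata")

    # running the marker instead of data still creates the "empty" table in the destination
    pipeline.run(
        [dlt.mark.materialize_table_schema()],
        table_name="empty",
        columns=[{"name": "id", "data_type": "bigint", "nullable": True}],
    )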
rudolfix authored Mar 21, 2024
1 parent 3990973 commit 92bf3a0
Showing 10 changed files with 203 additions and 46 deletions.
10 changes: 8 additions & 2 deletions dlt/common/schema/schema.py
@@ -553,14 +553,20 @@ def get_table_columns(
if utils.is_complete_column(v)
}

def data_tables(self, include_incomplete: bool = False) -> List[TTableSchema]:
def data_tables(
self, seen_data_only: bool = False, include_incomplete: bool = False
) -> List[TTableSchema]:
"""Gets list of all tables, that hold the loaded data. Excludes dlt tables. Excludes incomplete tables (ie. without columns)"""
return [
t
for t in self._schema_tables.values()
if not t["name"].startswith(self._dlt_tables_prefix)
and (
include_incomplete or len(self.get_table_columns(t["name"], include_incomplete)) > 0
(
include_incomplete
or len(self.get_table_columns(t["name"], include_incomplete)) > 0
)
and (not seen_data_only or utils.has_table_seen_data(t))
)
]
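
For context, a small sketch of how the new flag can be called (assuming `schema` is a dlt `Schema` instance; variable names are illustrative):

    # every complete, non-dlt table defined in the schema
    all_tables = [t["name"] for t in schema.data_tables()]
    # only tables that have actually received data in a completed load
    loaded_tables = [t["name"] for t in schema.data_tables(seen_data_only=True)]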

2 changes: 2 additions & 0 deletions dlt/extract/__init__.py
@@ -4,6 +4,7 @@
from dlt.extract.decorators import source, resource, transformer, defer
from dlt.extract.incremental import Incremental
from dlt.extract.wrappers import wrap_additional_type
from dlt.extract.extractors import materialize_schema_item

__all__ = [
"DltResource",
@@ -17,4 +18,5 @@
"defer",
"Incremental",
"wrap_additional_type",
"materialize_schema_item",
]
67 changes: 45 additions & 22 deletions dlt/extract/extract.py
@@ -244,6 +244,48 @@ def _compute_metrics(self, load_id: str, source: DltSource) -> ExtractMetrics:
"hints": clean_hints,
}

def _write_empty_files(
self, source: DltSource, extractors: Dict[TLoaderFileFormat, Extractor]
) -> None:
schema = source.schema
json_extractor = extractors["puae-jsonl"]
resources_with_items = set().union(*[e.resources_with_items for e in extractors.values()])
# find REPLACE resources that did not yield any pipe items and create empty jobs for them
# NOTE: do not include tables that have never seen data
data_tables = {t["name"]: t for t in schema.data_tables(seen_data_only=True)}
tables_by_resources = utils.group_tables_by_resource(data_tables)
for resource in source.resources.selected.values():
if resource.write_disposition != "replace" or resource.name in resources_with_items:
continue
if resource.name not in tables_by_resources:
continue
for table in tables_by_resources[resource.name]:
# we only need to write empty files for the top tables
if not table.get("parent", None):
json_extractor.write_empty_items_file(table["name"])

# collect resources that received empty materialized lists and had no items
resources_with_empty = (
set()
.union(*[e.resources_with_empty for e in extractors.values()])
.difference(resources_with_items)
)
# get all possible tables
data_tables = {t["name"]: t for t in schema.data_tables()}
tables_by_resources = utils.group_tables_by_resource(data_tables)
for resource_name in resources_with_empty:
if resource := source.resources.selected.get(resource_name):
if tables := tables_by_resources.get(resource_name):
# write empty tables
for table in tables:
# we only need to write empty files for the top tables
if not table.get("parent", None):
json_extractor.write_empty_items_file(table["name"])
else:
table_name = json_extractor._get_static_table_name(resource, None)
if table_name:
json_extractor.write_empty_items_file(table_name)

def _extract_single_source(
self,
load_id: str,
@@ -255,14 +297,11 @@ def _extract_single_source(
) -> None:
schema = source.schema
collector = self.collector
resources_with_items: Set[str] = set()
extractors: Dict[TLoaderFileFormat, Extractor] = {
"puae-jsonl": JsonLExtractor(
load_id, self.extract_storage, schema, resources_with_items, collector=collector
),
"arrow": ArrowExtractor(
load_id, self.extract_storage, schema, resources_with_items, collector=collector
load_id, self.extract_storage, schema, collector=collector
),
"arrow": ArrowExtractor(load_id, self.extract_storage, schema, collector=collector),
}
last_item_format: Optional[TLoaderFileFormat] = None

@@ -294,23 +333,7 @@ def _extract_single_source(
extractors[item_format].write_items(resource, pipe_item.item, pipe_item.meta)
last_item_format = item_format

# find defined resources that did not yield any pipeitems and create empty jobs for them
# NOTE: do not include incomplete tables. those tables have never seen data so we do not need to reset them
data_tables = {t["name"]: t for t in schema.data_tables(include_incomplete=False)}
tables_by_resources = utils.group_tables_by_resource(data_tables)
for resource in source.resources.selected.values():
if (
resource.write_disposition != "replace"
or resource.name in resources_with_items
):
continue
if resource.name not in tables_by_resources:
continue
for table in tables_by_resources[resource.name]:
# we only need to write empty files for the top tables
if not table.get("parent", None):
extractors["puae-jsonl"].write_empty_items_file(table["name"])

self._write_empty_files(source, extractors)
if left_gens > 0:
# go to 100%
collector.update("Resources", left_gens)
23 changes: 19 additions & 4 deletions dlt/extract/extractors.py
@@ -1,5 +1,5 @@
from copy import copy
from typing import Set, Dict, Any, Optional, Set
from typing import Set, Dict, Any, Optional, List

from dlt.common import logger
from dlt.common.configuration.inject import with_config
@@ -37,6 +37,17 @@
pandas = None


class MaterializedEmptyList(List[Any]):
"""A list variant that will materialize tables even if empty list was yielded"""

pass


def materialize_schema_item() -> MaterializedEmptyList:
"""Yield this to materialize schema in the destination, even if there's no data."""
return MaterializedEmptyList()


class Extractor:
file_format: TLoaderFileFormat

@@ -50,15 +61,17 @@ def __init__(
load_id: str,
storage: ExtractStorage,
schema: Schema,
resources_with_items: Set[str],
collector: Collector = NULL_COLLECTOR,
*,
_caps: DestinationCapabilitiesContext = None,
) -> None:
self.schema = schema
self.naming = schema.naming
self.collector = collector
self.resources_with_items = resources_with_items
self.resources_with_items: Set[str] = set()
"""Tracks resources that received items"""
self.resources_with_empty: Set[str] = set()
"""Track resources that received empty materialized list"""
self.load_id = load_id
self._table_contracts: Dict[str, TSchemaContractDict] = {}
self._filtered_tables: Set[str] = set()
@@ -131,6 +144,9 @@ def _write_item(
self.collector.update(table_name, inc=new_rows_count)
if new_rows_count > 0:
self.resources_with_items.add(resource_name)
else:
if isinstance(items, MaterializedEmptyList):
self.resources_with_empty.add(resource_name)

def _write_to_dynamic_table(self, resource: DltResource, items: TDataItems) -> None:
if not isinstance(items, list):
@@ -296,7 +312,6 @@ def _compute_table(self, resource: DltResource, items: TDataItems) -> TPartialTa
# issue warnings when overriding computed with arrow
for col_name, column in arrow_table["columns"].items():
if src_column := computed_table["columns"].get(col_name):
print(src_column)
for hint_name, hint in column.items():
if (src_hint := src_column.get(hint_name)) is not None:
if src_hint != hint:
41 changes: 26 additions & 15 deletions dlt/normalize/items_normalizers.py
@@ -74,7 +74,7 @@ def _filter_columns(
return row

def _normalize_chunk(
self, root_table_name: str, items: List[TDataItem], may_have_pua: bool
self, root_table_name: str, items: List[TDataItem], may_have_pua: bool, skip_write: bool
) -> TSchemaUpdate:
column_schemas = self._column_schemas
schema_update: TSchemaUpdate = {}
@@ -172,9 +172,11 @@ def _normalize_chunk(
# store row
# TODO: store all rows for particular items all together after item is fully completed
# will be useful if we implement bad data sending to a table
self.load_storage.write_data_item(
self.load_id, schema_name, table_name, row, columns
)
# we skip write when discovering schema for empty file
if not skip_write:
self.load_storage.write_data_item(
self.load_id, schema_name, table_name, row, columns
)
except StopIteration:
pass
signals.raise_if_signalled()
@@ -193,22 +195,31 @@ def __call__(
line: bytes = None
for line_no, line in enumerate(f):
items: List[TDataItem] = json.loadb(line)
partial_update = self._normalize_chunk(root_table_name, items, may_have_pua(line))
partial_update = self._normalize_chunk(
root_table_name, items, may_have_pua(line), skip_write=False
)
schema_updates.append(partial_update)
logger.debug(f"Processed {line_no} lines from file {extracted_items_file}")
if line is None and root_table_name in self.schema.tables:
# write only if table seen data before
# TODO: we should push the truncate jobs via package state
# not as empty jobs. empty jobs should be reserved for
# materializing schemas and other edge cases ie. empty parquet files
root_table = self.schema.tables[root_table_name]
if has_table_seen_data(root_table):
self.load_storage.write_empty_items_file(
self.load_id,
self.schema.name,
root_table_name,
self.schema.get_table_columns(root_table_name),
)
logger.debug(
f"No lines in file {extracted_items_file}, written empty load job file"
if not has_table_seen_data(root_table):
# if this is a new table, add normalizer columns
partial_update = self._normalize_chunk(
root_table_name, [{}], False, skip_write=True
)
schema_updates.append(partial_update)
self.load_storage.write_empty_items_file(
self.load_id,
self.schema.name,
root_table_name,
self.schema.get_table_columns(root_table_name),
)
logger.debug(
f"No lines in file {extracted_items_file}, written empty load job file"
)

return schema_updates

2 changes: 1 addition & 1 deletion dlt/normalize/normalize.py
@@ -304,7 +304,7 @@ def spool_files(
# drop evolve once for all tables that seen data
x_normalizer.pop("evolve-columns-once", None)
# mark that table have seen data only if there was data
if table_metrics[table_name].items_count > 0 and "seen-data" not in x_normalizer:
if "seen-data" not in x_normalizer:
logger.info(
f"Table {table_name} has seen data for a first time with load id {load_id}"
)
7 changes: 6 additions & 1 deletion dlt/pipeline/mark.py
@@ -1,2 +1,7 @@
"""Module with mark functions that make data to be specially processed"""
from dlt.extract import with_table_name, with_hints, make_hints
from dlt.extract import (
with_table_name,
with_hints,
make_hints,
materialize_schema_item as materialize_table_schema,
)
2 changes: 1 addition & 1 deletion tests/load/pipeline/test_pipelines.py
@@ -932,7 +932,7 @@ def table_3(make_data=False):

# load with one empty job, table 3 not created
load_info = pipeline.run(source.table_3, loader_file_format=destination_config.file_format)
assert_load_info(load_info)
assert_load_info(load_info, expected_load_packages=0)
with pytest.raises(DatabaseUndefinedRelation):
load_table_counts(pipeline, "table_3")
# print(pipeline.default_schema.to_pretty_yaml())
54 changes: 54 additions & 0 deletions tests/pipeline/test_pipeline.py
@@ -37,6 +37,7 @@
from dlt.extract.exceptions import InvalidResourceDataTypeBasic, PipeGenInvalid, SourceExhausted
from dlt.extract.extract import ExtractStorage
from dlt.extract import DltResource, DltSource
from dlt.extract.extractors import MaterializedEmptyList
from dlt.load.exceptions import LoadClientJobFailed
from dlt.pipeline.exceptions import InvalidPipelineName, PipelineNotActive, PipelineStepFailed
from dlt.pipeline.helpers import retry_load
@@ -46,6 +47,7 @@
from tests.utils import TEST_STORAGE_ROOT
from tests.extract.utils import expect_extracted_file
from tests.pipeline.utils import (
assert_data_table_counts,
assert_load_info,
airtable_emojis,
load_data_table_counts,
@@ -1857,3 +1859,55 @@ def demand_map():
schema_hashset.add(pipeline.schemas["nice_load_info_schema"].version_hash)

assert len(schema_hashset) == 1


def test_yielding_empty_list_creates_table() -> None:
pipeline = dlt.pipeline(
pipeline_name="empty_start",
destination="duckdb",
dataset_name="mydata",
)

# empty list should create empty table in the destination but with the required schema
extract_info = pipeline.extract(
[MaterializedEmptyList()],
table_name="empty",
columns=[{"name": "id", "data_type": "bigint", "nullable": True}],
)
print(extract_info)
normalize_info = pipeline.normalize()
print(normalize_info)
assert normalize_info.row_counts["empty"] == 0
load_info = pipeline.load()
# print(load_info.asstr(verbosity=3))
assert_load_info(load_info)
assert_data_table_counts(pipeline, {"empty": 0})
# make sure we have expected columns
assert set(pipeline.default_schema.tables["empty"]["columns"].keys()) == {
"id",
"_dlt_load_id",
"_dlt_id",
}

# load some data
pipeline.run([{"id": 1}], table_name="empty")
assert_data_table_counts(pipeline, {"empty": 1})

# update schema on existing table
pipeline.run(
[MaterializedEmptyList()],
table_name="empty",
columns=[{"name": "user_name", "data_type": "text", "nullable": True}],
)
assert_data_table_counts(pipeline, {"empty": 1})
assert set(pipeline.default_schema.tables["empty"]["columns"].keys()) == {
"id",
"_dlt_load_id",
"_dlt_id",
"user_name",
}
with pipeline.sql_client() as client:
with client.execute_query("SELECT id, user_name FROM empty") as cur:
rows = list(cur.fetchall())
assert len(rows) == 1
assert rows[0] == (1, None)