Skip to content

Commit

Permalink
feat(transform): implement columns pivot map function
Browse files Browse the repository at this point in the history
  • Loading branch information
IlyaFaer committed Mar 27, 2024
1 parent ab31329 commit 2ed1233
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 0 deletions.
30 changes: 30 additions & 0 deletions dlt/sources/helpers/transform.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from typing import List, Union

from dlt.common.typing import TDataItem
from dlt.extract.items import ItemTransformFunctionNoMeta

Expand All @@ -24,3 +26,31 @@ def _filter(_: TDataItem) -> bool:
return count > max_items

return _filter


def pivot(columns: Union[str, List[str]], prefix: str) -> ItemTransformFunctionNoMeta[TDataItem]:
    """Build a map function that turns positional rows into dictionaries.

    The returned function reads values from an item by position and stores
    them under ``prefix + column`` keys, in the order given by ``columns``.

    Args:
        columns (Union[str, List[str]]): a column name or a list of column
            names. The N-th name labels the value found at index N of
            every item.
        prefix (str): prefix to add to the column names.

    Returns:
        ItemTransformFunctionNoMeta[TDataItem]:
            A function to pivot columns into a dict.
    """
    if isinstance(columns, str):
        columns = [columns]

    # The output keys are the same for every item, so build them once here
    # instead of concatenating strings per item. The tuple is also a snapshot:
    # mutating the caller's list afterwards cannot affect the transformer.
    keys = tuple(prefix + col for col in columns)

    def _transformer(item: TDataItem) -> TDataItem:
        """Pivot one positional item into a dictionary.

        Args:
            item (TDataItem): a data item supporting integer indexing,
                e.g. a list of values.

        Returns:
            TDataItem: a dict with one entry per configured column. Values
            at positions beyond the configured columns are not included.

        Raises:
            IndexError: if a list-like item has fewer values than there
                are configured columns (raised by the item's own indexing).
        """
        # Index explicitly (rather than zip) so a too-short item fails loudly
        # instead of silently producing a partial row.
        return {key: item[ind] for ind, key in enumerate(keys)}

    return _transformer
22 changes: 22 additions & 0 deletions tests/pipeline/test_pipeline_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from dlt.common import pipeline as state_module
from dlt.common.utils import uniq_id
from dlt.common.destination.reference import Destination
from dlt.sources.helpers.transform import pivot

from dlt.pipeline.exceptions import PipelineStateEngineNoUpgradePathException, PipelineStepFailed
from dlt.pipeline.pipeline import Pipeline
Expand Down Expand Up @@ -486,6 +487,27 @@ def transform(item):
)


def test_transform_function_pivot() -> None:
    """Check that ``pivot`` maps positional rows to dicts with prefixed keys."""

    @dlt.resource
    def test_resource():
        # Two batches of two rows each; the expected output below has one
        # dict per inner list.
        yield [[1, 2, 3], [4, 5, 6]]
        yield [[7, 8, 9], [10, 11, 12]]

    pivoted = test_resource()
    pivoted.add_map(pivot(["a", "b", "c"], "prefix_"))

    expected = [
        {"prefix_a": 1, "prefix_b": 2, "prefix_c": 3},
        {"prefix_a": 4, "prefix_b": 5, "prefix_c": 6},
        {"prefix_a": 7, "prefix_b": 8, "prefix_c": 9},
        {"prefix_a": 10, "prefix_b": 11, "prefix_c": 12},
    ]
    assert list(pivoted) == expected


def test_migrate_pipeline_state(test_storage: FileStorage) -> None:
# test generation of version hash on migration to v3
state_v1 = load_json_case("state/state.v1")
Expand Down

0 comments on commit 2ed1233

Please sign in to comment.