
Commit

support gitpythonfs fsspec implementation
- also introduces support for passing keyword arguments to fsspec; gitpythonfs is the first use case where the kwargs are needed (see the usage sketch below).
- requires a future version of dlt that supports gitpython and dynamic registration of fsspec implementations.
- requires manual creation of a git repo for testing gitpythonfs.
- if kwargs are used to construct filesystem instances, then FileItemDict must be instantiated with an existing instance of AbstractFileSystem. Instantiating FileItemDict with FileSystemCredentials will omit the fs kwargs and cause unexpected behaviour.
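
A minimal usage sketch of the new kwargs pass-through (hypothetical pipeline name and destination; assumes a dlt version where `gitpythonfs` is registered as an fsspec implementation and the test repo from GIT-SETUP.md exists):

```
import dlt
from sources.filesystem import readers

# kwargs are forwarded to the fsspec filesystem constructor, so
# gitpythonfs-specific options such as repo_path and ref can be supplied here.
csv_reader = readers(
    bucket_url="gitpythonfs://samples",
    file_glob="**/*.csv",
    kwargs={
        "repo_path": "tests/filesystem/cases/git",
        "ref": "unmodified-samples",
    },
).read_csv()

pipeline = dlt.pipeline(pipeline_name="git_files", destination="duckdb")
print(pipeline.run(csv_reader))
```
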
deanja authored and Your Name committed Feb 7, 2024
1 parent eaa1f4d commit afe8aa1
Showing 5 changed files with 180 additions and 39 deletions.
31 changes: 24 additions & 7 deletions sources/filesystem/__init__.py
@@ -2,7 +2,7 @@
from typing import Iterator, List, Optional, Tuple, Union

import dlt
from dlt.common.typing import copy_sig
from dlt.common.typing import copy_sig, DictStrAny
from dlt.sources import DltResource
from dlt.sources.filesystem import FileItem, FileItemDict, fsspec_filesystem, glob_files
from dlt.sources.credentials import FileSystemCredentials
@@ -26,6 +26,7 @@ def readers(
bucket_url: str = dlt.secrets.value,
credentials: Union[FileSystemCredentials, AbstractFileSystem] = dlt.secrets.value,
file_glob: Optional[str] = "*",
kwargs: Optional[DictStrAny] = None,
) -> Tuple[DltResource, ...]:
"""This source provides a few resources that are chunked file readers. Readers can be further parametrized before use
read_csv(chunksize, **pandas_kwargs)
@@ -38,13 +39,28 @@
file_glob (str, optional): The filter to apply to the files in glob format. by default lists all files in bucket_url non-recursively
"""
return (
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(
bucket_url,
credentials,
file_glob=file_glob,
kwargs=kwargs,
)
| dlt.transformer(name="read_csv")(_read_csv),
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(
bucket_url,
credentials,
file_glob=file_glob,
kwargs=kwargs,
)
| dlt.transformer(name="read_jsonl")(_read_jsonl),
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(
bucket_url,
credentials,
file_glob=file_glob,
kwargs=kwargs,
)
| dlt.transformer(name="read_parquet")(_read_parquet),
filesystem(bucket_url, credentials, file_glob=file_glob)
filesystem(bucket_url, credentials, file_glob=file_glob, kwargs=kwargs)
| dlt.transformer(name="read_csv_duckdb")(_read_csv_duckdb),
)

@@ -58,6 +74,7 @@ def filesystem(
file_glob: Optional[str] = "*",
files_per_page: int = DEFAULT_CHUNK_SIZE,
extract_content: bool = False,
kwargs: Optional[DictStrAny] = None,
) -> Iterator[List[FileItem]]:
"""This resource lists files in `bucket_url` using `file_glob` pattern. The files are yielded as FileItem which also
provide methods to open and read file data. It should be combined with transformers that further process (ie. load files)
@@ -76,11 +93,11 @@
if isinstance(credentials, AbstractFileSystem):
fs_client = credentials
else:
fs_client = fsspec_filesystem(bucket_url, credentials)[0]
fs_client = fsspec_filesystem(bucket_url, credentials, kwargs=kwargs)[0]

files_chunk: List[FileItem] = []
for file_model in glob_files(fs_client, bucket_url, file_glob):
file_dict = FileItemDict(file_model, credentials)
file_dict = FileItemDict(file_model, fs_client)
if extract_content:
file_dict["file_content"] = file_dict.read_bytes()
files_chunk.append(file_dict) # type: ignore
59 changes: 59 additions & 0 deletions tests/filesystem/cases/GIT-SETUP.md
@@ -0,0 +1,59 @@
# Git repo for testing

The `./git` folder contains a bare repo used for running tests for the `filesystem` dlt Source.

# Usage

For example, use it to test a pipeline that reads files using the `gitpythonfs` fsspec implementation.

The repo is not needed for regular use of dlt.

For the tests to pass, use the tag (aka `ref`) called `unmodified-samples`. Using HEAD (the default) is intended to fail tests due to modifications such as a file not having the expected file name. It allows testing of the `ref` functionality of git-based fsspec implementations.
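
In the test configuration the tag is selected through the fsspec kwargs. A minimal sketch, mirroring the gitpythonfs entry in `tests/filesystem/settings.py`:

```
from sources.filesystem import filesystem

# Pin the reader to the unmodified-samples tag via gitpythonfs-specific kwargs.
files = filesystem(
    bucket_url="gitpythonfs://samples",
    kwargs={
        "repo_path": "tests/filesystem/cases/git",
        "ref": "unmodified-samples",  # omit ref to read HEAD and exercise the failing case
    },
)
```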

Some features of the repo are intentionally different to the containing repo (eg verified-sources repo) to help prevent mistakenly testing against (or modifying!) the wrong repo:

- The default branch is `cases-master`.
- The sample files root folder is `samples`, not `tests`.

# Configuration

When to configure (i.e. build or rebuild the repo):
- When setting up an environment (CI, local dev, etc.), unless the repo is already committed in the verified-sources repo.
- After modifying any content in the `../samples` folder.

Ideally the repo will be created idempotently by a pytest fixture, `make` script or similar (a rough fixture sketch follows the manual steps below). Until then, these are the manual steps to idempotently create/recreate it:

1. Set working directory to `tests/filesystem/cases/git`
2. Check the current folder contains only `.git`, e.g. with `ls -a`. It's also ok if the current folder is empty.
3. Delete `.git` and all its contents, i.e. delete the repo: `rm -rf .git`
4. Make a fresh repo using:

```
git init
git checkout -b cases-master
```

5. Copy in the folder `../../samples`. ie samples folder and all its contents. eg `cp -r ../../samples .`
6. Put some objects in the repo:

```
git add --all
git commit -m "add standard sample files for tests"
git tag -a unmodified-samples -m "The sample test files with no modifications"
git mv samples/sample.txt samples/sample_renamed.txt
git commit -m "rename sample.txt to make tests fail"
```

7. Delete all working files except `.git`, e.g. with `rm -rf samples`. (ToDo: that's still not officially a bare repo. Use `git clone --bare path/to/repo` instead. Maybe create the repo in a temp folder, then bare-clone it into the `cases/git` folder and discard the temp folder.)
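
A rough sketch of what an idempotent pytest fixture could look like (untested; assumes GitPython is installed and a git identity is configured; it mirrors the manual steps above and leaves the working tree in place rather than producing a true bare clone):

```
import shutil
from pathlib import Path

import pytest
from git import Repo  # GitPython

REPO_DIR = Path("tests/filesystem/cases/git")
SAMPLES_DIR = Path("tests/filesystem/samples")


@pytest.fixture(scope="session")
def cases_git_repo() -> Path:
    """Idempotently (re)create the git repo used for gitpythonfs tests."""
    # wipe any previous repo and start fresh
    shutil.rmtree(REPO_DIR, ignore_errors=True)
    REPO_DIR.mkdir(parents=True, exist_ok=True)

    repo = Repo.init(REPO_DIR)
    repo.git.checkout("-b", "cases-master")

    # copy in the shared sample files and commit them
    shutil.copytree(SAMPLES_DIR, REPO_DIR / "samples")
    repo.git.add(all=True)
    repo.git.commit(m="add standard sample files for tests")
    repo.git.tag("unmodified-samples", a=True, m="The sample test files with no modifications")

    # modify HEAD so that tests pinned to the tag pass but tests run against HEAD fail
    repo.git.mv("samples/sample.txt", "samples/sample_renamed.txt")
    repo.git.commit(m="rename sample.txt to make tests fail")
    return REPO_DIR
```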

# Developing

Note that at least one IDE - VSCode - does not recognise this repo in the Explorer and Source Control tabs, likely because it is a repo inside another repo.

If you are considering committing the repo to its containing repo - eg the verified-sources repo - consider the effects on the size of the containing repo:

`du -sh ./.git/*`

That's about the same size as the samples folder itself. BUT consider that the largest item, `./.git/objects`, might change often if the repo is regenerated. So it might be better to ensure the repo is gitignored (so meta!) and "build" it in each environment where needed.

17 changes: 12 additions & 5 deletions tests/filesystem/settings.py
@@ -1,10 +1,17 @@
import os

TESTS_BUCKET_URLS = [
os.path.abspath("tests/filesystem/samples"),
"s3://dlt-ci-test-bucket/standard_source/samples",
"gs://ci-test-bucket/standard_source/samples",
"az://dlt-ci-test-bucket/standard_source/samples",
FACTORY_ARGS = [
{"bucket_url": os.path.abspath("tests/filesystem/samples")},
{"bucket_url": "s3://dlt-ci-test-bucket/standard_source/samples"},
{"bucket_url": "gs://ci-test-bucket/standard_source/samples"},
{"bucket_url": "az://dlt-ci-test-bucket/standard_source/samples"},
{
"bucket_url": "gitpythonfs://samples",
"kwargs": {
"repo_path": "tests/filesystem/cases/git",
"ref": "unmodified-samples",
},
}
]

GLOB_RESULTS = [
106 changes: 79 additions & 27 deletions tests/filesystem/test_filesystem.py
@@ -18,22 +18,32 @@
assert_query_data,
TEST_STORAGE_ROOT,
)
from tests.filesystem.utils import unpack_factory_args

from .settings import GLOB_RESULTS, TESTS_BUCKET_URLS
from .settings import GLOB_RESULTS, FACTORY_ARGS


@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
@pytest.mark.parametrize("factory_args", FACTORY_ARGS)
@pytest.mark.parametrize("glob_params", GLOB_RESULTS)
def test_file_list(bucket_url: str, glob_params: Dict[str, Any]) -> None:
def test_file_list(factory_args: Dict[str, Any], glob_params: Dict[str, Any]) -> None:
bucket_url, kwargs = unpack_factory_args(factory_args)

@dlt.transformer
def bypass(items) -> str:
return items

# we just pass the glob parameter to the resource if it is not None
# we only pass the glob parameter to the resource if it is not None
if file_glob := glob_params["glob"]:
filesystem_res = filesystem(bucket_url=bucket_url, file_glob=file_glob) | bypass
filesystem_res = (
filesystem(
bucket_url=bucket_url,
file_glob=file_glob,
kwargs=kwargs,
)
| bypass
)
else:
filesystem_res = filesystem(bucket_url=bucket_url) | bypass
filesystem_res = filesystem(bucket_url=bucket_url, kwargs=kwargs) | bypass

all_files = list(filesystem_res)
file_count = len(all_files)
@@ -43,8 +53,12 @@ def bypass(items) -> str:


@pytest.mark.parametrize("extract_content", [True, False])
@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
def test_load_content_resources(bucket_url: str, extract_content: bool) -> None:
@pytest.mark.parametrize("factory_args", FACTORY_ARGS)
def test_load_content_resources(
factory_args: Dict[str, Any], extract_content: bool
) -> None:
bucket_url, kwargs = unpack_factory_args(factory_args)

@dlt.transformer
def assert_sample_content(items: List[FileItem]):
# expect just one file
Expand All @@ -65,6 +79,7 @@ def assert_sample_content(items: List[FileItem]):
bucket_url=bucket_url,
file_glob="sample.txt",
extract_content=extract_content,
kwargs=kwargs,
)
| assert_sample_content
)
@@ -83,7 +98,11 @@ def assert_csv_file(item: FileItem):
# print(item)
return item

nested_file = filesystem(bucket_url, file_glob="met_csv/A801/A881_20230920.csv")
nested_file = filesystem(
bucket_url,
file_glob="met_csv/A801/A881_20230920.csv",
kwargs=kwargs,
)

assert len(list(nested_file | assert_csv_file)) == 1

@@ -101,10 +120,12 @@ def test_fsspec_as_credentials():
print(list(gs_resource))


@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
def test_csv_transformers(bucket_url: str) -> None:
@pytest.mark.parametrize("factory_args", FACTORY_ARGS)
def test_csv_transformers(factory_args: Dict[str, Any]) -> None:
from sources.filesystem_pipeline import read_csv

bucket_url, kwargs = unpack_factory_args(factory_args)

pipeline = dlt.pipeline(
pipeline_name="file_data",
destination="duckdb",
Expand All @@ -114,7 +135,12 @@ def test_csv_transformers(bucket_url: str) -> None:

# load all csvs merging data on a date column
met_files = (
filesystem(bucket_url=bucket_url, file_glob="met_csv/A801/*.csv") | read_csv()
filesystem(
bucket_url=bucket_url,
file_glob="met_csv/A801/*.csv",
kwargs=kwargs,
)
| read_csv()
)
met_files.apply_hints(write_disposition="merge", merge_key="date")
load_info = pipeline.run(met_files.with_name("met_csv"))
@@ -126,7 +152,12 @@
# load the other folder that contains data for the same day + one other day
# the previous data will be replaced
met_files = (
filesystem(bucket_url=bucket_url, file_glob="met_csv/A803/*.csv") | read_csv()
filesystem(
bucket_url=bucket_url,
file_glob="met_csv/A803/*.csv",
kwargs=kwargs,
)
| read_csv()
)
met_files.apply_hints(write_disposition="merge", merge_key="date")
load_info = pipeline.run(met_files.with_name("met_csv"))
@@ -139,16 +170,23 @@ def test_csv_transformers(bucket_url: str) -> None:
assert load_table_counts(pipeline, "met_csv") == {"met_csv": 48}


@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
def test_standard_readers(bucket_url: str) -> None:
@pytest.mark.parametrize("factory_args", FACTORY_ARGS)
def test_standard_readers(factory_args: Dict[str, Any]) -> None:
bucket_url, kwargs = unpack_factory_args(factory_args)

# extract pipes with standard readers
jsonl_reader = readers(bucket_url, file_glob="**/*.jsonl").read_jsonl()
parquet_reader = readers(bucket_url, file_glob="**/*.parquet").read_parquet()
# also read zipped csvs
csv_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv(
jsonl_reader = readers(
bucket_url, file_glob="**/*.jsonl", kwargs=kwargs
).read_jsonl()
parquet_reader = readers(
bucket_url, file_glob="**/*.parquet", kwargs=kwargs
).read_parquet()
csv_reader = readers(bucket_url, file_glob="**/*.csv*", kwargs=kwargs).read_csv(
float_precision="high"
)
csv_duckdb_reader = readers(bucket_url, file_glob="**/*.csv*").read_csv_duckdb()
csv_duckdb_reader = readers(
bucket_url, file_glob="**/*.csv*", kwargs=kwargs
).read_csv_duckdb()

# a step that copies files into test storage
def _copy(item: FileItemDict):
Expand All @@ -161,7 +199,7 @@ def _copy(item: FileItemDict):
# return file item unchanged
return item

downloader = filesystem(bucket_url, file_glob="**").add_map(_copy)
downloader = filesystem(bucket_url, file_glob="**", kwargs=kwargs).add_map(_copy)

# load in single pipeline
pipeline = dlt.pipeline(
@@ -200,12 +238,14 @@
# print(pipeline.default_schema.to_pretty_yaml())


@pytest.mark.parametrize("bucket_url", TESTS_BUCKET_URLS)
def test_incremental_load(bucket_url: str) -> None:
@pytest.mark.parametrize("factory_args", FACTORY_ARGS)
def test_incremental_load(factory_args: Dict[str, Any]) -> None:
@dlt.transformer
def bypass(items) -> str:
return items

bucket_url, kwargs = unpack_factory_args(factory_args)

pipeline = dlt.pipeline(
pipeline_name="file_data",
destination="duckdb",
Expand All @@ -214,7 +254,11 @@ def bypass(items) -> str:
)

# Load all files
all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
all_files = filesystem(
bucket_url=bucket_url,
file_glob="csv/*",
kwargs=kwargs,
)
# add incremental on modification time
all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
load_info = pipeline.run((all_files | bypass).with_name("csv_files"))
@@ -225,7 +269,11 @@ def bypass(items) -> str:
assert table_counts["csv_files"] == 4

# load again
all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
all_files = filesystem(
bucket_url=bucket_url,
file_glob="csv/*",
kwargs=kwargs,
)
all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
load_info = pipeline.run((all_files | bypass).with_name("csv_files"))
# nothing into csv_files
@@ -234,7 +282,11 @@ def bypass(items) -> str:
assert table_counts["csv_files"] == 4

# load again into different table
all_files = filesystem(bucket_url=bucket_url, file_glob="csv/*")
all_files = filesystem(
bucket_url=bucket_url,
file_glob="csv/*",
kwargs=kwargs,
)
all_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
load_info = pipeline.run((all_files | bypass).with_name("csv_files_2"))
assert_load_info(load_info)
@@ -243,7 +295,7 @@

def test_file_chunking() -> None:
resource = filesystem(
bucket_url=TESTS_BUCKET_URLS[0],
bucket_url=FACTORY_ARGS[0]["bucket_url"],
file_glob="*/*.csv",
files_per_page=2,
)
6 changes: 6 additions & 0 deletions tests/filesystem/utils.py
@@ -0,0 +1,6 @@
from typing import Any, Dict, List


def unpack_factory_args(factory_args: Dict[str, Any]) -> List[Any]:
"""Unpacks filesystem factory arguments from pytest parameters."""
return [factory_args.get(k) for k in ("bucket_url", "kwargs")]
