Docs: Extend layout placeholder params for filesystem destinations #1220

Merged · 22 commits · Apr 17, 2024

Changes from 9 commits
23 changes: 17 additions & 6 deletions docs/tools/check_embedded_snippets.py
@@ -1,11 +1,19 @@
"""
Walks through all markdown files, finds all code snippets, and checks whether they are parseable.
"""
from typing import List, Dict, Optional
import os
import ast
import json
import subprocess
import argparse

import os, ast, json, yaml, tomlkit, subprocess, argparse # noqa: I251
from dataclasses import dataclass
from concurrent.futures import ThreadPoolExecutor
from textwrap import dedent
from typing import List, Dict, Optional

import tomlkit
import yaml

import dlt.cli.echo as fmt

@@ -293,10 +301,13 @@ def typecheck_snippets(snippets: List[Snippet], verbose: bool) -> None:

# these stages are python only
python_snippets = [s for s in filtered_snippets if s.language == "py"]
if args.command in ["lint", "full"]:
lint_snippets(python_snippets, args.verbose)
if ENABLE_MYPY and args.command in ["typecheck", "full"]:
typecheck_snippets(python_snippets, args.verbose)
with ThreadPoolExecutor() as runner:
futures = []
if args.command in ["lint", "full"]:
futures.append(runner.submit(lint_snippets, python_snippets, args.verbose))

if ENABLE_MYPY and args.command in ["typecheck", "full"]:
futures.append(runner.submit(typecheck_snippets, python_snippets, args.verbose))

# unlink lint_me file
if os.path.exists(LINT_FILE):
112 changes: 110 additions & 2 deletions docs/website/docs/dlt-ecosystem/destinations/filesystem.md
@@ -123,7 +123,7 @@ To pass any additional arguments to `fsspec`, you may supply `kwargs` and `client_kwargs`

```toml
[destination.filesystem]
kwargs = '{"use_ssl": true}'
kwargs = '{"use_ssl": true, "auto_mkdir": true}'
client_kwargs = '{"verify": "public.crt"}'
```

@@ -220,11 +220,63 @@ Current default layout: **`{table_name}/{load_id}.{file_id}.{ext}`**
> 💡 Note that the default layout format has changed from `{schema_name}.{table_name}.{load_id}.{file_id}.{ext}` to `{table_name}/{load_id}.{file_id}.{ext}` in dlt 0.3.12. You can revert to the old layout by setting the old value in your toml file.
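
For example, a minimal `config.toml` entry to revert to the pre-0.3.12 layout might look like this:

```toml
[destination.filesystem]
layout = "{schema_name}.{table_name}.{load_id}.{file_id}.{ext}"  # layout used by dlt 0.3.11 and earlier
```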


### Available layout placeholders

#### Standard placeholders

* `schema_name` - the name of the schema
* `table_name` - the name of the table
* `load_id` - the id of the load package from which the file comes
* `file_id` - the id of the file
* `ext` - the format (extension) of the file, e.g. `jsonl` or `parquet`
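
As a quick illustration, the default layout combines these placeholders into a path along these lines (the table name and ids below are made up; the substitution itself is done internally by dlt):

```py
# Illustrative only: how the default layout resolves, with made-up values
layout = "{table_name}/{load_id}.{file_id}.{ext}"

path = layout.format(
    table_name="players",
    load_id="1712520123.123456",
    file_id="a1b2c3",
    ext="jsonl",
)
print(path)  # players/1712520123.123456.a1b2c3.jsonl
```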

#### Date and time placeholders
> 💡 Keep in mind that all date and time placeholder values are lowercased.

* Years
  * `YYYY` - 2024, 2025
  * `Y` - 2024, 2025
* Months
  * `MMMM` - January, February, March
  * `MMM` - Jan, Feb, Mar
  * `MM` - 01, 02, 03
  * `M` - 1, 2, 3
* Days of month
  * `DD` - 01, 02
  * `D` - 1, 2
* Hours (24h format)
  * `HH` - 00, 01, 02...23
  * `H` - 0, 1, 2...23
* Minutes
  * `mm` - 00, 01, 02...59
  * `m` - 0, 1, 2...59
* Days of week
  * `dddd` - Monday, Tuesday, Wednesday
  * `ddd` - Mon, Tue, Wed
  * `dd` - Mo, Tu, We
  * `d` - 0-6
* `Q` - quarter of the year: 1, 2, 3, 4
* `timestamp` - timestamp from `pendulum.DateTime`
* `load_package_timestamp` - timestamp of the [load package](../../general-usage/destination-tables.md#load-packages-and-load-ids)

You can change the file name format by providing the layout setting for the filesystem destination like so:
```toml
[destination.filesystem]
layout="{table_name}/{load_id}.{file_id}.{ext}" # current preconfigured naming scheme
# layout="{schema_name}.{table_name}.{load_id}.{file_id}.{ext}" # naming scheme in dlt 0.3.11 and earlier

# More examples
# With timestamp
# layout = "{table_name}/{timestamp}/{load_id}.{file_id}.{ext}"

# With timestamp of the load package
# layout = "{table_name}/{load_package_timestamp}/{load_id}.{file_id}.{ext}"

# Parquet-like layout (note: it is not compatible with the internal datetime of the parquet file)
# layout = "{table_name}/year={YYYY}/month={MM}/day={DD}/{load_id}.{file_id}.{ext}"

# Custom placeholders
# extra_placeholders = { "owner" = "admin", "department" = "finance" }
# layout = "{table_name}/{owner}/{department}/{load_id}.{file_id}.{ext}"
```

A few things to know when specifying your filename layout:
@@ -241,6 +293,62 @@ Please note:
- `dlt` will not dump the current schema content to the bucket
- `dlt` will mark complete loads by creating an empty file that corresponds to `_dlt_loads` table. For example, if `chess._dlt_loads.1685299832` file is present in dataset folders, you can be sure that all files for the load package `1685299832` are completely loaded
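
A minimal sketch of how you could check for such a marker file with `fsspec` (the bucket, dataset, schema name, and load id below are hypothetical):

```py
import fsspec

# assumes the s3fs package is installed and credentials are configured
fs = fsspec.filesystem("s3")

# look for the empty marker file dlt writes for the _dlt_loads table
matches = fs.glob("my-bucket/my_dataset/**/chess._dlt_loads.1685299832")
if matches:
    print("load package 1685299832 is completely loaded")
```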


### Advanced layout configuration

The filesystem destination lets you configure an advanced layout and additional placeholders via `config.toml` or when initializing the destination via the factory.

> 💡 For deeply nested layouts you might want to enable automatic directory creation for the local filesystem destination by specifying `kwargs = '{"auto_mkdir": true}'`.

#### Via the configuration
```toml
layout = "{table_name}/{test_placeholer}/{YYYY}-{MM}-{DD}/{ddd}/{mm}/{load_id}.{file_id}.{ext}"
extra_placeholders = { "test_placeholder" = "test" }
current_datetime="2024-04-14T00:00:00"
```
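
With this configuration, the layout above would resolve to something along these lines (a rough sketch using `pendulum` formatting; the table name, load id, and file id are made up, and the actual substitution is performed internally by dlt):

```py
import pendulum

# the fixed datetime from the configuration above; 2024-04-14 is a Sunday
dt = pendulum.datetime(2024, 4, 14)

path = "{table_name}/{test_placeholder}/{YYYY}-{MM}-{DD}/{ddd}/{mm}/{load_id}.{file_id}.{ext}".format(
    table_name="my_table",
    test_placeholder="test",
    YYYY=dt.format("YYYY"),
    MM=dt.format("MM"),
    DD=dt.format("DD"),
    ddd=dt.format("ddd").lower(),  # placeholder values are lowercased
    mm=dt.format("mm"),
    load_id="1712520123.123456",
    file_id="a1b2c3",
    ext="jsonl",
)
print(path)  # my_table/test/2024-04-14/sun/00/1712520123.123456.a1b2c3.jsonl
```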

#### Via the factory
You can override the configuration options by initializing the filesystem destination directly and passing it to the pipeline.
It is also possible to provide callbacks as extra placeholder values; each callback should accept the parameters below and return a string:

1. `schema_name`
2. `table_name`
3. `load_id`
4. `file_id`
5. `ext`

`current_datetime` can also be a callback; it should return an instance of `pendulum.DateTime`.

```py
import dlt
import pendulum

from dlt.destinations import filesystem


def placeholder_callback(
    schema_name: str,
    table_name: str,
    load_id: str,
    file_id: str,
    ext: str,
) -> str:
    return "value"


def get_current_datetime() -> pendulum.DateTime:
    return pendulum.now()


pipeline = dlt.pipeline(
    pipeline_name="data_things",
    destination=filesystem(
        layout="{table_name}/{placeholder_x}/{timestamp}/{load_id}.{file_id}.{ext}",
        current_datetime=pendulum.now(),
        # current_datetime=get_current_datetime,  # or pass the callback instead
        extra_placeholders={
            "test_placeholder": "test",
            "placeholder_x": placeholder_callback,
        },
    ),
)
```
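
A hypothetical run call that would then write files using this layout (the sample data and table name are illustrative):

```py
load_info = pipeline.run(
    [{"id": 1, "name": "example"}],
    table_name="data_things",
)
print(load_info)
```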


## Supported file formats
You can choose the following file formats:
* [jsonl](../file-formats/jsonl.md) is used by default