Commit a86cd8d

Merge remote-tracking branch 'origin/devel' into 1322-lancedb-usage-example-docs

Pipboyguy committed May 16, 2024
2 parents 4803a12 + 5b0afa4

Showing 16 changed files with 306 additions and 68 deletions.
3 changes: 3 additions & 0 deletions dlt/common/data_writers/buffered.py
@@ -55,7 +55,10 @@ def __init__(
self.closed_files: List[DataWriterMetrics] = [] # all fully processed files
# buffered items must be less than max items in file
self.buffer_max_items = min(buffer_max_items, file_max_items or buffer_max_items)
# Explicitly configured max size supersedes destination limit
self.file_max_bytes = file_max_bytes
if self.file_max_bytes is None and _caps:
self.file_max_bytes = _caps.recommended_file_size
self.file_max_items = file_max_items
# the open function is either gzip.open or open
self.open = (
2 changes: 2 additions & 0 deletions dlt/common/destination/capabilities.py
@@ -29,6 +29,8 @@ class DestinationCapabilitiesContext(ContainerInjectableContext):

preferred_loader_file_format: TLoaderFileFormat = None
supported_loader_file_formats: Sequence[TLoaderFileFormat] = None
recommended_file_size: Optional[int] = None
"""Recommended file size in bytes when writing extract/load files"""
preferred_staging_file_format: Optional[TLoaderFileFormat] = None
supported_staging_file_formats: Sequence[TLoaderFileFormat] = None
escape_identifier: Callable[[str], str] = None
2 changes: 2 additions & 0 deletions dlt/destinations/impl/bigquery/__init__.py
@@ -12,6 +12,8 @@ def capabilities() -> DestinationCapabilitiesContext:
caps.supported_loader_file_formats = ["jsonl", "parquet"]
caps.preferred_staging_file_format = "parquet"
caps.supported_staging_file_formats = ["parquet", "jsonl"]
# BigQuery's limit is 4 GB, but leave large headroom since the buffered writer does not preemptively check the size
caps.recommended_file_size = int(1024 * 1024 * 1024)
caps.escape_identifier = escape_bigquery_identifier
caps.escape_literal = None
caps.format_datetime_literal = format_bigquery_datetime_literal
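To illustrate how the two new settings interact, here is a minimal sketch of the precedence rule, based only on what the diff shows: an explicitly configured `file_max_bytes` supersedes the destination's `recommended_file_size` (1 GiB for BigQuery above). The helper name `resolve_file_max_bytes` is illustrative and not part of dlt's API.

```py
from typing import Optional

BIGQUERY_RECOMMENDED_FILE_SIZE = 1024 * 1024 * 1024  # 1 GiB, as set in the capabilities above

def resolve_file_max_bytes(
    configured_file_max_bytes: Optional[int],
    recommended_file_size: Optional[int],
) -> Optional[int]:
    # an explicitly configured max size supersedes the destination's recommendation
    if configured_file_max_bytes is not None:
        return configured_file_max_bytes
    return recommended_file_size

# no explicit limit -> fall back to the destination recommendation
assert resolve_file_max_bytes(None, BIGQUERY_RECOMMENDED_FILE_SIZE) == 1024**3
# explicit limit wins, e.g. rotate files at 64 MiB
assert resolve_file_max_bytes(64 * 1024 * 1024, BIGQUERY_RECOMMENDED_FILE_SIZE) == 64 * 1024 * 1024
```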
3 changes: 2 additions & 1 deletion dlt/destinations/impl/mssql/__init__.py
@@ -17,7 +17,8 @@ def capabilities() -> DestinationCapabilitiesContext:
# https://learn.microsoft.com/en-us/sql/sql-server/maximum-capacity-specifications-for-sql-server?view=sql-server-ver16&redirectedfrom=MSDN
caps.max_identifier_length = 128
caps.max_column_identifier_length = 128
caps.max_query_length = 4 * 1024 * 64 * 1024
# A SQL query can be a varchar(max), but is documented as limited to 65,536 * network packet size
caps.max_query_length = 65536 * 10
caps.is_max_query_length_in_bytes = True
caps.max_text_data_type_length = 2**30 - 1
caps.is_max_text_data_type_length_in_bytes = False
3 changes: 3 additions & 0 deletions dlt/load/configuration.py
@@ -15,6 +15,9 @@ class LoaderConfiguration(PoolRunnerConfiguration):
raise_on_max_retries: int = 5
"""When gt 0 will raise when job reaches raise_on_max_retries"""
_load_storage_config: LoadStorageConfiguration = None
truncate_staging_dataset: bool = False
"""If set to `True`, the staging dataset will be truncated after loading the data"""

def on_resolved(self) -> None:
self.pool_type = "none" if self.workers == 1 else "thread"
35 changes: 34 additions & 1 deletion dlt/load/load.py
@@ -53,7 +53,7 @@
LoadClientUnsupportedWriteDisposition,
LoadClientUnsupportedFileFormats,
)
from dlt.load.utils import get_completed_table_chain, init_client
from dlt.load.utils import _extend_tables_with_table_chain, get_completed_table_chain, init_client


class Load(Runnable[Executor], WithStepInfo[LoadMetrics, LoadInfo]):
@@ -348,6 +348,8 @@ def complete_package(self, load_id: str, schema: Schema, aborted: bool = False)
)
):
job_client.complete_load(load_id)
self._maybe_truncate_staging_dataset(schema, job_client)

self.load_storage.complete_load_package(load_id, aborted)
# collect package info
self._loaded_packages.append(self.load_storage.get_load_package_info(load_id))
@@ -490,6 +492,37 @@ def run(self, pool: Optional[Executor]) -> TRunMetrics:

return TRunMetrics(False, len(self.load_storage.list_normalized_packages()))

def _maybe_truncate_staging_dataset(self, schema: Schema, job_client: JobClientBase) -> None:
"""
Truncate the staging dataset if one is in use and the configuration requests truncation.
Args:
schema (Schema): Schema to use for the staging dataset.
job_client (JobClientBase): Job client to use for the staging dataset.
"""
if not (
isinstance(job_client, WithStagingDataset) and self.config.truncate_staging_dataset
):
return

data_tables = schema.data_table_names()
tables = _extend_tables_with_table_chain(
schema, data_tables, data_tables, job_client.should_load_data_to_staging_dataset
)

try:
with self.get_destination_client(schema) as client:
with client.with_staging_dataset(): # type: ignore
client.initialize_storage(truncate_tables=tables)

except Exception as exc:
logger.warning(
f"Staging dataset truncate failed due to the following error: {exc}"
" However, data integrity is not affected."
)

def get_step_info(
self,
pipeline: SupportsPipeline,
1 change: 1 addition & 0 deletions dlt/pipeline/pipeline.py
@@ -554,6 +554,7 @@ def load(
with signals.delayed_signals():
runner.run_pool(load_step.config, load_step)
info: LoadInfo = self._get_step_info(load_step)

self.first_run = False
return info
except Exception as l_ex:
2 changes: 1 addition & 1 deletion docs/website/docs/dlt-ecosystem/verified-sources/slack.md
@@ -70,7 +70,7 @@ To get started with your data pipeline, follow these steps:

[This command](../../reference/command-line-interface) will initialize
[the pipeline example](https://github.com/dlt-hub/verified-sources/blob/master/sources/slack_pipeline.py)
with Google Sheets as the [source](../../general-usage/source) and
with Slack as the [source](../../general-usage/source) and
[duckdb](../destinations/duckdb.md) as the [destination](../destinations).

1. If you'd like to use a different destination, simply replace `duckdb` with the name of your
70 changes: 68 additions & 2 deletions docs/website/docs/general-usage/http/rest-client.md
@@ -385,7 +385,7 @@ class PostBodyPaginator(BasePaginator):

# Add the cursor to the request body
request.json["cursor"] = self.cursor

client = RESTClient(
base_url="https://api.example.com",
paginator=PostBodyPaginator()
@@ -527,4 +527,70 @@ from dlt.sources.helpers.rest_client import paginate

for page in paginate("https://api.example.com/posts"):
print(page)
```

## Troubleshooting

### `RESTClient.get()` and `RESTClient.post()` methods

These methods work similarly to the [get()](https://docs.python-requests.org/en/latest/api/#requests.get) and [post()](https://docs.python-requests.org/en/latest/api/#requests.post) functions
from the Requests library. They return a [Response](https://docs.python-requests.org/en/latest/api/#requests.Response) object that contains the response data.
You can inspect the `Response` object to get the `response.status_code`, `response.headers`, and `response.content`. For example:

```py
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

client = RESTClient(base_url="https://api.example.com")
response = client.get("/posts", auth=BearerTokenAuth(token="your_access_token"))

print(response.status_code)
print(response.headers)
print(response.content)
```

### `RESTClient.paginate()`

Debugging `paginate()` is trickier because it's a generator function that yields [`PageData`](#pagedata) objects. Here are several ways to debug the `paginate()` method:

1. Enable [logging](../../running-in-production/running.md#set-the-log-level-and-format) to see detailed information about the HTTP requests:

```bash
RUNTIME__LOG_LEVEL=INFO python my_script.py
```

2. Use the [`PageData`](#pagedata) instance to inspect the [request](https://docs.python-requests.org/en/latest/api/#requests.Request)
and [response](https://docs.python-requests.org/en/latest/api/#requests.Response) objects:

```py
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.paginators import JSONResponsePaginator

client = RESTClient(
base_url="https://api.example.com",
paginator=JSONResponsePaginator(next_url_path="pagination.next")
)

for page in client.paginate("/posts"):
print(page.request)
print(page.response)
```

3. Use the `hooks` parameter to add custom response handlers to the `paginate()` method:

```py
from dlt.sources.helpers.rest_client import RESTClient
from dlt.sources.helpers.rest_client.auth import BearerTokenAuth

client = RESTClient(base_url="https://api.example.com")

def response_hook(response, **kwargs):
print(response.status_code)
print(f"Content: {response.content}")
print(f"Request: {response.request.body}")
# Or import pdb; pdb.set_trace() to debug

for page in client.paginate(
"/posts",
auth=BearerTokenAuth(token="your_access_token"),
hooks={"response": [response_hook]}
):
print(page)
```
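
As an illustration (not part of the documented example), a response hook can also be used to fail fast: `raise_for_status()` turns 4xx/5xx responses into exceptions, aborting pagination as soon as the API returns an error. This sketch assumes the `client` defined in the earlier snippets.

```py
def raise_on_error(response, **kwargs):
    # requests passes the Response (plus extra kwargs) to response hooks;
    # raise_for_status() raises for 4xx/5xx status codes
    response.raise_for_status()

for page in client.paginate("/posts", hooks={"response": [raise_on_error]}):
    print(page)
```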
6 changes: 6 additions & 0 deletions docs/website/docs/running-in-production/running.md
@@ -108,6 +108,12 @@ behind. In `config.toml`:
load.delete_completed_jobs=true
```

Also, by default, `dlt` leaves data in the staging dataset, which is used during merge and replace loads for deduplication. To clear it, put the following line in `config.toml`:

```toml
load.truncate_staging_dataset=true
```
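
The same flag can presumably also be set through dlt's standard configuration resolution instead of `config.toml`, e.g. as an environment variable — a sketch, assuming the usual `SECTION__KEY` naming applies to this option:

```py
import os

# assumed equivalent of load.truncate_staging_dataset=true in config.toml;
# set before running the pipeline so the load step picks it up
os.environ["LOAD__TRUNCATE_STAGING_DATASET"] = "true"
```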

## Using slack to send messages

`dlt` provides basic support for sending slack messages. You can configure Slack incoming hook via