---
title: Readers Source & Filesystem
description: dlt verified source for Readers Source & Filesystem
keywords: [readers source and filesystem, filesystem, readers source]
---

# Readers Source & Filesystem

or [book a call](https://calendar.app.google/kiLhuMsWKpZUpfho6) with our support engineer Adrian.
:::

This verified source easily streams files from s3, gcs, azure, or local filesystem using the reader
source. Currently the following reader sources are supported:

- read_csv (with Pandas)
- read_jsonl
- read_parquet (with pyarrow)

Additionally, it can read Excel files with a standalone transformer and copy files locally.

Sources and resources that can be loaded using this verified source are:

| Name       | Description                                           |
|------------|-------------------------------------------------------|
| readers    | Provides chunked file reader resources                |
| filesystem | Lists files in `bucket_url` using `file_glob` pattern |

We advise that you give each resource a
[specific name](https://dlthub.com/docs/general-usage/resource#duplicate-and-rename-resources)
before loading with `pipeline.run`. This will make sure that data goes to a table with the name you
want and that each pipeline uses a
[separate state for incremental loading](https://dlthub.com/docs/general-usage/state#read-and-write-pipeline-state-in-a-resource).
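
For example, a minimal sketch (the pipeline name, bucket path, and table name below are
illustrative, and `readers` is assumed to be imported from the verified source module created by
`dlt init`):

```python
import dlt
from filesystem import readers  # assumed import from the verified source folder

pipeline = dlt.pipeline(
    pipeline_name="csv_pipeline", destination="duckdb", dataset_name="files"
)

# give the reader resource an explicit name so the data lands in the "team_csv" table
# and incremental state is tracked separately for this resource
team_csv = readers(bucket_url="s3://my-bucket/teams/", file_glob="*.csv").read_csv()
pipeline.run(team_csv.with_name("team_csv"))
```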

### Standalone filesystem
Use the
[standalone filesystem](https://dlthub.com/docs/general-usage/resource#declare-a-standalone-resource)
resource to list files in s3, gcs, and azure buckets. This allows you to customize file readers or
manage files using fsspec. For details, see the
[fsspec documentation](https://filesystem-spec.readthedocs.io/en/latest/index.html). The filesystem
ensures consistent file representation across bucket types and offers methods to access and read
data. You can quickly build pipelines to:

- Extract text from PDFs
- Stream large file content directly from buckets
- Copy files locally.
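
For instance, here is a minimal sketch of the first case, extracting text from PDFs. It assumes the
`PyPDF2` package, a placeholder bucket URL, and that `filesystem` is imported from the verified
source module created by `dlt init`:

```python
import dlt
from PyPDF2 import PdfReader

from filesystem import filesystem  # assumed import from the verified source folder


@dlt.transformer(standalone=True)
def extract_pdf_text(items):
    # `items` is a page (list) of FileItem dicts yielded by the filesystem resource
    for file_item in items:
        with file_item.open() as f:
            reader = PdfReader(f)
            for page_no, page in enumerate(reader.pages):
                yield {
                    "file_name": file_item["file_name"],
                    "page_no": page_no,
                    "text": page.extract_text(),
                }


pipeline = dlt.pipeline(
    pipeline_name="pdf_to_duckdb", destination="duckdb", dataset_name="pdf_text"
)
pdf_pages = filesystem(bucket_url="s3://my-bucket/docs/", file_glob="**/*.pdf") | extract_pdf_text()
pipeline.run(pdf_pages.with_name("pdf_pages"))
```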

## Setup Guide

### Grab credentials

This source can access various bucket types, including:

- AWS S3
- GCS Cloud Storage
- Azure Blob Storage

To access these, you'll need secret credentials obtained as follows:

#### AWS S3 credentials

To get AWS keys for S3 access:

1. Access IAM in AWS Console.
1. Select "Users", choose a user, and open "Security credentials".
1. Click "Create access key" for AWS ID and Secret Key.

For more info, see
[AWS official documentation](https://docs.aws.amazon.com/IAM/latest/UserGuide/id_credentials_access-keys.html).
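
Once you have the key pair, it can go into `.dlt/secrets.toml`; a minimal sketch, assuming dlt's
standard AWS credential field names:

```toml
[sources.filesystem.credentials]
aws_access_key_id = "Please set me up!"
aws_secret_access_key = "Please set me up!"
```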

#### GCS Cloud Storage credentials

To get GCS cloud storage access:

1. Log in to [console.cloud.google.com](http://console.cloud.google.com/).
1. Create a [service account](https://cloud.google.com/iam/docs/service-accounts-create#creating).
1. Enable "Google Analytics API"; see [Google's guide](https://support.google.com/googleapi/answer/6158841?hl=en).
1. In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" > "ADD KEY" > "CREATE" to get a JSON credential file.
1. Enable "Google Analytics API"; see
[Google's guide](https://support.google.com/googleapi/answer/6158841?hl=en).
1. In IAM & Admin > Service Accounts, find your account, click the three-dot menu > "Manage Keys" >
"ADD KEY" > "CREATE" to get a JSON credential file.
> Grant the service account appropriate permissions for cloud storage access.
For more info, see how to
[create a service account](https://support.google.com/a/answer/7378726?hl=en).

#### Azure Blob Storage credentials

To obtain Azure blob storage access:
1. Click "Settings" > "Access keys".
1. View account name and two keys (primary/secondary). Keep keys confidential.

For more info, see
[Azure official documentation](https://learn.microsoft.com/en-us/azure/storage/common/storage-account-keys-manage?tabs=azure-portal).
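
These values can then go into `.dlt/secrets.toml`; a minimal sketch, assuming dlt's standard Azure
credential field names:

```toml
[sources.filesystem.credentials]
azure_storage_account_name = "Please set me up!"
azure_storage_account_key = "Please set me up!"
```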

### Initialize the verified source

To initialize the verified source and a pipeline example, run `dlt init filesystem duckdb` (replace
`duckdb` with the [destination](../destinations/) of your choice).

### Add credentials

1. In the `.dlt` folder, open `secrets.toml` and add the credentials for the bucket you want to read
   from, for example for GCS:

   ```toml
   [sources.filesystem.credentials]
   private_key="Please set me up!"
   project_id="Please set me up!"
   ```

1. Finally, enter credentials for your chosen destination as per the [docs](../destinations/).

1. You can pass the bucket URL and glob pattern or use "config.toml". For local filesystems, use
   `file://` or skip the scheme.

```toml
[sources.filesystem]
bucket_url="~/Documents/csv_files/"
file_glob="*"
```

For remote file systems you need to add the scheme; it is used to derive the protocol being used,
for example:

```toml
[sources.filesystem]
bucket_url="s3://my-bucket/csv_files/"
```

> Note: For Azure, use adlfs>=2023.9.0. Older versions mishandle globs.
## Run the pipeline
Before running the pipeline, ensure that you have installed all the necessary dependencies by
running the command:

```bash
pip install -r requirements.txt
```

1. Install optional modules:

   - For AWS S3: `pip install s3fs`

1. Once your pipeline has run, make sure that everything loaded as expected with:

   ```bash
   dlt pipeline <pipeline_name> show
   ```

   For example, the `pipeline_name` for the above pipeline example is `standard_filesystem`; you may
   also use any custom name instead.

For more information, read the [Walkthrough: Run a pipeline](../../walkthroughs/run-a-pipeline).

## Sources and resources

### Source `readers`
This source provides resources that are chunked file readers. You can customize these readers
optionally; the resources provided are:

- `read_csv(chunksize, **pandas_kwargs)`
- `read_jsonl(chunksize)`
- `read_parquet(chunksize)`
```python
@dlt.source(_impl_cls=ReadersSource, spec=FilesystemConfigurationResource)
def readers(
    bucket_url: str = dlt.secrets.value,
    credentials: FileSystemCredentials = dlt.secrets.value,
    file_glob: Optional[str] = "*",
) -> Tuple[DltResource, ...]:
```
`bucket_url`: The url to the bucket.<br>
`credentials`: The credentials to the filesystem of fsspec `AbstractFilesystem` instance.<br>
`file_glob`: Glob filter for files; defaults to non-recursive listing in the bucket.<br>
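
As a usage sketch (assuming `BUCKET_URL` and a configured `pipeline`, as in the examples further
below), you pick one of the reader resources by attribute and run it:

```python
# select the CSV reader from the source and stream the files in chunks of 1000 rows
csv_rows = readers(bucket_url=BUCKET_URL, file_glob="*.csv").read_csv(chunksize=1000)
pipeline.run(csv_rows.with_name("csv_rows"))
```
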
### Resource `filesystem`
This resource enumerates files in `bucket_url` using the `file_glob` pattern. Files are returned as
`FileItem` with methods to access data. Pair with transformers for further processing.
```python
@dlt.resource(
    primary_key="file_url", spec=FilesystemConfigurationResource, standalone=True
)
def filesystem(
    bucket_url: str = dlt.secrets.value,
    credentials: FileSystemCredentials = dlt.secrets.value,
    file_glob: Optional[str] = "*",
    files_per_page: int = 100,
    extract_content: bool = False,
) -> Iterator[List[FileItem]]:
```
`bucket_url`: URL of the bucket.<br>
`credentials`: Filesystem credentials of `AbstractFilesystem` instance.<br>
`file_glob`: File filter in glob format. Defaults to listing all non-recursive files in bucket_url.<br>
`files_per_page`: Number of files processed at once (default: 100).<br>
`extract_content`: If true, the content of the file will be read and returned in the resource (default: False).<br>
## Filesystem Integration and Data Extraction Guide
### Filesystem Usage
- The filesystem tool lists files in a chosen bucket based on a glob pattern and returns file
  details (as FileInfo) in adjustable page sizes.
- The resource is designed to work with transform functions and transformers for custom extract
  pipelines.

To load data into a specific table (instead of the default filesystem table), see the snippet below:
```python
@dlt.transformer(standalone=True)
def read_csv(items, chunksize: int = 15) -> Iterator[TDataItems]:
    """Reads csv file with Pandas chunk by chunk."""
    ...

# pipe the listed csv files into the reader and load them to the "met_csv" table
met_files = filesystem(bucket_url=BUCKET_URL, file_glob="met_csv/A801/*.csv") | read_csv()
pipeline.run(met_files.with_name("met_csv"))
```

### FileItem Representation
- All dlt sources/resources that yield files follow the FileItem contract.
- File content is typically not loaded; instead, full file info and methods to access content are
  available.
- Users can request an authenticated fsspec `AbstractFilesystem` instance.
#### FileItem Fields:
`file_url` - Complete URL of the file; also the primary key (e.g., file://).<br>
`file_name` - Name or relative path of the file from the bucket_url.<br>
`mime_type` - File's mime type; sourced from the bucket provider or inferred from its extension.<br>
`modification_date` - File's last modification time (format: pendulum.DateTime).<br>
`size_in_bytes` - File size.<br>
`file_content` - Content, provided upon request.<br>

> 📌 Note: When using a nested or recursive glob pattern, `file_name` will include the file's path.
> For instance, using the resource:
> `filesystem("az://dlt-ci-test-bucket/standard_source/samples", file_glob="met_csv/A801/*.csv")`
> will produce file names relative to the /standard_source/samples path, such as
> `met_csv/A801/A881_20230920.csv`.

### File Manipulation

FileItem, backed by a dictionary implementation, offers these helper methods:

- `read_bytes()`: Returns the file content as bytes.
- `open()`: Provides a file object when opened.
- `filesystem`: Gives access to an authorized `AbstractFilesystem` with standard fsspec methods.
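
A minimal sketch of these helpers in a transformer (it assumes `BUCKET_URL`, a configured
`pipeline`, and the `filesystem` resource from the earlier examples; the table and column names are
illustrative):

```python
@dlt.transformer(standalone=True)
def file_stats(items):
    # `items` is a page (list) of FileItem dicts produced by the filesystem resource
    for file_item in items:
        # dict-style access to metadata fields; read_bytes() pulls the payload on demand
        content = file_item.read_bytes()
        # file_item.open() would give a file object instead of reading everything into memory
        yield {
            "file_name": file_item["file_name"],
            "declared_size": file_item["size_in_bytes"],
            "actual_size": len(content),
        }

stats = filesystem(bucket_url=BUCKET_URL, file_glob="**/*.txt") | file_stats()
pipeline.run(stats.with_name("txt_file_stats"))
```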

## Customization

### Create your own pipeline

If you wish to create your own pipelines, you can leverage source and resource methods from this
verified source.

1. Configure the pipeline by specifying the pipeline name, destination, and dataset as follows:

   ```python
   pipeline = dlt.pipeline(
       pipeline_name="standard_filesystem",  # Use a custom name if desired
       destination="duckdb",  # Choose the appropriate destination (e.g., duckdb)
       dataset_name="filesystem_data"  # Use a custom name if desired
   )
   ```

1. To read and load CSV files, replace the placeholder for `BUCKET_URL` with the appropriate path to
   your bucket or local destination.

```python
BUCKET_URL = "YOUR_BUCKET_PATH_HERE"
met_files = readers(
    bucket_url=BUCKET_URL, file_glob="met_csv/A801/*.csv"
).read_csv()
# tell dlt to merge on date so a rerun updates the previous day's records
met_files.apply_hints(write_disposition="merge", merge_key="date")
# load to the "met_csv" table
load_info = pipeline.run(met_files.with_name("met_csv"))
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```
> The `file_glob` parameter targets all CSVs in the "met_csv/A801" directory.<br>
> The `print(pipeline.last_trace.last_normalize_info)` line displays the data normalization details
> from the pipeline's last trace.<br>
> 📌 Note: If you have a default bucket URL set in ".dlt/config.toml", you can omit the bucket_url
> parameter.

When rerun the next day, this pipeline updates both new and the previous day's records.

1. To load only new CSV files:

```python
# This configuration will only consider new csv files
new_files = filesystem(bucket_url=BUCKET_URL, file_glob="met_csv/A801/*.csv")
# add incremental loading on the file modification time, so only new files are read
new_files.apply_hints(incremental=dlt.sources.incremental("modification_date"))
load_info = pipeline.run((new_files | read_csv()).with_name("csv_files"))
print(load_info)
```

1. To read and load Parquet and JSONL from a bucket:

```python
BUCKET_URL = "Please set me up!" #path of the bucket url or local destination
jsonl_reader = readers(BUCKET_URL, file_glob="**/*.jsonl").read_jsonl(
    chunksize=10000
)
# PARQUET reading
parquet_reader = readers(BUCKET_URL, file_glob="**/*.parquet").read_parquet()
# load both resources into their own tables
load_info = pipeline.run(
    [
        jsonl_reader.with_name("jsonl_data"),
        parquet_reader.with_name("parquet_data"),
    ]
)
print(load_info)
print(pipeline.last_trace.last_normalize_info)
```
> `file_glob`: Specifies file pattern; reads all JSONL and Parquet files across directories.<br>
> `chunksize`: Set to 10,000; data read in chunks of 10,000 records each.<br>
> `print(pipeline.last_trace.last_normalize_info)` displays the data normalization details from
> the pipeline's last trace.<br>

1. To set up a pipeline that reads from an Excel file using a standalone transformer:
```python
# Define a standalone transformer to read data from an Excel file.
@dlt.transformer(standalone=True)
def read_excel(
    items: Iterator[FileItemDict], sheet_name: str
) -> Iterator[TDataItems]:
    # Import the required pandas library.
    import pandas as pd

    # Iterate through each file item, open it and yield the sheet as dict records.
    for file_obj in items:
        with file_obj.open() as file:
            yield pd.read_excel(file, sheet_name).to_dict(orient="records")

# Pipe the listed Excel file through the transformer, reading a sheet by name.
example_xls = filesystem(
    bucket_url=BUCKET_URL, file_glob="**/example.xlsx"
) | read_excel("example_sheet")

# Execute the pipeline and load the extracted data.
load_info = pipeline.run(example_xls.with_name("example_xls_data"))
# Print the loading information.
print(load_info)
```
> The code loads data from "example.xlsx" into the destination.
1. To copy files locally adding a step in the filesystem resource and then load the listing to the DB:
1. To copy files locally, add a step to the filesystem resource and then load the listing to the
   DB:
```python
def copy_files_resource(local_folder: str) -> None:
    """Copy files locally, then load the listing of downloaded files to the destination."""
    import os

    # add a step that copies each listed file into the local folder
    def _copy(item: FileItemDict) -> FileItemDict:
        dest_file = os.path.join(local_folder, item["file_name"])
        os.makedirs(os.path.dirname(dest_file), exist_ok=True)
        # write the remote file content locally; read_bytes() fetches the payload
        with open(dest_file, "wb") as f:
            f.write(item.read_bytes())
        return item

    # use a recursive glob pattern and add the copy step to the resource
    downloader = filesystem(bucket_url=BUCKET_URL, file_glob="**").add_map(_copy)

    # load the file listing to the "listing" table
    load_info = pipeline.run(downloader.with_name("listing"))
    # pretty print the information on data that was loaded
    print(load_info)
    print(pipeline.last_trace.last_normalize_info)
```
