[filesystem] verified source #216

Closed
2 of 9 tasks
willi-mueller opened this issue Jul 17, 2023 · 2 comments
Labels
verified source (dlt source with tests and demos)

Comments

@willi-mueller
Collaborator

willi-mueller commented Jul 17, 2023

Quick source info

Current Status

  • I plan to write it

What source does/will do

This connector loads a series of files from an S3 or GCS bucket. Which files are loaded depends on a configuration that specifies a file name pattern as well as the write_disposition.

Why this is relevant

Many data providers deliver raw data or reports to bucket storage in S3 or GCS.

This connector can be the basis for additional, readily configured source connectors that import from buckets. For example, Adjust offers a free hourly CSV export that Adjust customers can configure. I plan to open a follow-up issue for an Adjust connector.

Test account / test data

  • I'd like dltHub to create a test account before we start
  • I have a test account and can share my test data or a test account with dltHub after we merge the source

Additional context

This source, like other standard readers, will be used as the first step in a data loading pipeline. In essence:

  1. it will read a specified folder and filter file names with a specified regex or glob
  2. it will yield FileItem(s) (file item data contract) - "built-in sources" - a repository of standard readers #256
  3. it will not read the files and it will not copy them locally!
  4. it will provide a helper method that, when passed a file url, returns an authenticated fsspec instance that behaves like a file object - so the next step in the pipeline can read the file, often piece by piece

All other common properties of a standard source apply (see the sketch after this list):

  • incremental load on mtime of a file (optional)
  • primary key on file url
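
A minimal sketch of such a resource, assuming fsspec for listing and dlt's incremental helper for the optional mtime-based load; the resource name and the fields on the yielded items are illustrative, not the final FileItem contract from #256:

```python
import dlt
import fsspec
from typing import Any, Dict, Iterator


@dlt.resource(primary_key="file_url", write_disposition="merge")
def filesystem_files(
    bucket_url: str = dlt.config.value,
    # optional incremental load keyed on the file's modification time
    modification_date=dlt.sources.incremental("modification_date"),
) -> Iterator[Dict[str, Any]]:
    # list objects in the bucket without downloading or opening them
    fs, _, (root,) = fsspec.get_fs_token_paths(bucket_url)
    for path in fs.find(root):
        info = fs.info(path)
        yield {
            "file_url": path,
            # the metadata key differs per backend: "mtime" locally, "LastModified" on S3
            "modification_date": info.get("mtime") or info.get("LastModified"),
            "size_in_bytes": info.get("size"),
        }
```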

dlt core code reuse

dlt-hub/dlt#626 introduces fsspec into the common module of the dlt library. Take things from there; also look at the tests for usage examples.

    • we reuse filesystem configuration
    • we reuse fsspec client factory
    • we reuse mtime dispatch

Requirements

  • implement as a dlt.resource which takes the following arguments:
      • bucket_url and credentials arguments as per ... spec
      • filename_filter, which can be a compiled regex/glob expression or a callback function
      • a flag to enable/disable incremental load (disabled by default) + start/end date - like in slack/zendesk
  • how it works internally:
      • it walks/globs the bucket_url and filters out files with the regex/glob/callback
      • yields file items in configurable pages
  • implements the following helper functions (a rough sketch follows this list):
      • when passed a file url, returns an AbstractFileSystem via client_from_config
      • a transform that can be added to the resource that will load all files using the above
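
A hedged sketch of these two helpers, using plain fsspec where the real implementation would go through the shared filesystem configuration and client factory from dlt-hub/dlt#626; fsspec_from_url and read_file_content are made-up names:

```python
from typing import Any, Dict

import fsspec
from fsspec import AbstractFileSystem


def fsspec_from_url(file_url: str, **credentials: Any) -> AbstractFileSystem:
    """Return an authenticated fsspec filesystem for the given file url.

    Placeholder for the client factory / credential handling from dlt-hub/dlt#626.
    """
    protocol = file_url.split("://", 1)[0] if "://" in file_url else "file"
    return fsspec.filesystem(protocol, **credentials)


def read_file_content(item: Dict[str, Any]) -> Dict[str, Any]:
    """Transform that attaches the file body to a yielded file item.

    A real implementation might stream instead of reading the whole file.
    """
    fs = fsspec_from_url(item["file_url"])
    with fs.open(item["file_url"], "rb") as f:
        item["content"] = f.read()
    return item


# usage: attach as a transform to the resource sketched earlier,
# so downstream steps receive the file bodies
# filesystem_files.add_map(read_file_content)
```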

Background / Use Cases

File Identifiers

Often, data deliveries come in periodically added CSV or parquet files. I've seen files, such as
report_from_acme_2023-07-14.csv, report_from_acme_2023-07-15.csv etc.

The file identifier usually has a fixed part, such as report_from_acme, as well as a variable part, such as a timestamp or a date. Thus, I think the config for this source should include:

  1. a regexp to filter files and identify the fixed part of the file identifier, to allow users to build one pipeline for report_from_acme and another for report_from_foobar.
  2. a regexp to filter files that should be loaded per pipeline invocation. E.g. if the pipeline is scheduled daily, it should load the file whose name matches the current date. Alternatively, we can somehow configure the incremental load strategy to just load what hasn't been loaded yet – without parsing the dates or timestamps in the filename. (Both regex patterns are illustrated in the sketch below.)
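
Purely illustrative, with hard-coded names: one pattern pins the fixed part of the identifier, a second narrows the match to the file for the current (UTC) date:

```python
import re
from datetime import datetime, timezone

# fixed part: everything from acme, regardless of date
acme_reports = re.compile(r"^report_from_acme_\d{4}-\d{2}-\d{2}\.csv$")

# variable part: only the file for the current (UTC) date of this scheduled run
today = datetime.now(timezone.utc).date().isoformat()
todays_report = re.compile(rf"^report_from_acme_{today}\.csv$")

files = [
    "report_from_acme_2023-07-14.csv",
    "report_from_acme_2023-07-15.csv",
    "report_from_foobar_2023-07-15.csv",
]
acme_files = [f for f in files if acme_reports.match(f)]    # one pipeline per report family
daily_load = [f for f in files if todays_report.match(f)]   # what today's invocation loads
```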

Loading method

@adrianbr suggested figuring out incremental processing of the CSV files so that we can load a 2GB file on a 256MB RAM machine, such as on GH-actions. I love the idea.

Loading procedure (sketched below):

  1. load the source files completely to disk, not memory
  2. read the source file in chunks into memory and yield line by line for a low RAM footprint
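
A rough sketch of this two-step procedure, assuming fsspec access to the bucket and the stdlib csv module; the function name and batch size are placeholders:

```python
import csv
import tempfile
from typing import Dict, Iterator, List

import fsspec


def stream_csv_rows(file_url: str, batch_size: int = 10_000) -> Iterator[List[Dict[str, str]]]:
    fs, _, (path,) = fsspec.get_fs_token_paths(file_url)
    with tempfile.TemporaryDirectory() as tmp_dir:
        local_path = f"{tmp_dir}/data.csv"
        fs.get_file(path, local_path)            # 1. download fully to disk, not memory
        with open(local_path, newline="") as f:  # 2. read back in small batches
            reader = csv.DictReader(f)
            batch: List[Dict[str, str]] = []
            for row in reader:
                batch.append(row)
                if len(batch) >= batch_size:
                    yield batch
                    batch = []
            if batch:
                yield batch
```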

Alternative:
Maybe there is a way to guess the bytes required per row so that we can download files in chunks from S3 to disk and then load incrementally to memory to parse the CSV into records.

Pandas has a read_csv() method which supports reading from S3/GCS and also supports reading in chunks. However, I would not use it because:

  1. it would add the heavy Pandas library as a dependency
  2. it might interfere with the typing done by dlt?
@rudolfix
Contributor

thanks for bringing this up! we have been researching this topic for some time; the aim is something both easy to use and very customizable. you can also take a look at dlt-hub/dlt#338

I'd break this into two dlt resources:

  1. an fsspec based resource that can scan folders and yield eligible files - both from buckets and locally. it should probably allow for incremental loading (i.e. by checking the last modified timestamp, deleting processed files, or using a daily pattern as you propose).
  • here you started to give us a kind of spec with filters etc. we should dive deeper
  2. we need a resource that takes a file url (or a list) and applies a proper reader to it (the reader can be specified explicitly or the file extension can be used to spawn the correct one). again it could use fsspec to open the file and "stream" its content. to my knowledge, fsspec is able to "stream" bucket content; I've also tested it with opening https urls and it also reads in chunks.

in dlt you can pipe data from one resource to another, so we can join (1) and (2). they can also be used separately. we plan to use (1) to feed data from files into our langchain source

You can also use (2) standalone and just pass a list of files into it
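
A hedged sketch of this split using dlt's transformer piping; the resource names and bucket path are made up for illustration, and the reader below is hard-wired to CSV:

```python
import csv

import dlt
import fsspec


@dlt.resource
def list_files(bucket_url: str = dlt.config.value):
    # (1) scan the bucket and yield eligible file urls only - no content is read here
    fs, _, (root,) = fsspec.get_fs_token_paths(bucket_url)
    protocol = fs.protocol if isinstance(fs.protocol, str) else fs.protocol[0]
    for path in fs.find(root):
        yield {"file_url": f"{protocol}://{path}"}


@dlt.transformer
def read_csv_rows(file_item):
    # (2) open each file via fsspec and stream its rows
    with fsspec.open(file_item["file_url"], "rt", newline="") as f:
        yield from csv.DictReader(f)


pipeline = dlt.pipeline(destination="duckdb", dataset_name="bucket_files")

# joined: (1) feeds file items into (2)
pipeline.run(list_files("s3://my-bucket/inbox") | read_csv_rows)

# standalone: feed (2) from a plain list of file urls instead
# pipeline.run(dlt.resource([{"file_url": "s3://my-bucket/report.csv"}], name="urls") | read_csv_rows)
```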

btw. I totally agree that the data from files should be read and yielded in batches. IMO the standard csv reader would do (https://docs.python.org/3/library/csv.html) - it should accept a file pointer from fsspec. same for parquet (pyarrow reads in chunks and uses fsspec internally). there's also the really cool ijson streaming json reader (I have a proof of concept somewhere)
we keep the dependencies clean so I'd avoid using pandas.
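
And for parquet, a hedged sketch of the same batching idea, assuming pyarrow is handed the fsspec file object (path and batch size are placeholders):

```python
import fsspec
import pyarrow.parquet as pq

# pyarrow reads the remote parquet file in record batches, so memory stays bounded
with fsspec.open("s3://bucket/report_from_acme_2023-07-15.parquet", "rb") as f:
    parquet_file = pq.ParquetFile(f)
    for batch in parquet_file.iter_batches(batch_size=10_000):
        rows = batch.to_pylist()  # yield these downstream instead of materializing the whole file
```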

next steps:

  • let's agree on the general architecture (creating one resource to process folders and one that yields file content from a list)
  • then we spec out the folder reader. I think there are a lot of interesting use cases and we can get a lot of help
  • we can start with csv - 100% agree
  • could you explain why you'd like to have parquet as a source?

btw. I think this kind of source is so fundamental that at some point we'll move it to the core library

@rudolfix
Contributor

we have an fsspec based destination implemented. I have a feeling that a lot of the code can be reused; we can refactor the core lib to make it easier. I think we could save the most time on handling credentials - it is all done, it just needs to be exposed

rudolfix added the "verified source" label (dlt source with tests and demos) Jul 31, 2023
rudolfix moved this to Planned in Verified Sources Jul 31, 2023
rudolfix changed the title from "[Object storage] verified source" to "[filesystem] verified source" Sep 12, 2023
rudolfix mentioned this issue Sep 17, 2023
rudolfix moved this from Planned to Implementation & Review in Verified Sources Oct 4, 2023
rudolfix moved this from Implementation & Review to Demos and docs in Verified Sources Oct 10, 2023
rudolfix moved this from Demos and docs to Ready for Deployment in Verified Sources Nov 3, 2023