
Pyarrow direct loading #679

Merged: 30 commits merged into devel from sthor/pyarrow-load on Oct 16, 2023

Conversation

steinitzu (Collaborator)

Continuation of #662


rudolfix (Collaborator)

@sh-rp please take a look
@steinitzu it looks good, the code structure is OK. What we still need:

  • a pipeline test where we load Arrow tables and pandas frames to all destinations that support parquet (a minimal sketch follows below)
  • documentation: let's add it to "Verified Sources" under Arrow Table / Pandas
  • I will still do the review
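
A minimal sketch of what such a pipeline test could exercise, assuming the duckdb destination and hypothetical table names; only `dlt.pipeline` and `pipeline.run` from the public API are used here:

    import dlt
    import pandas as pd
    import pyarrow as pa

    # the same rows as a pandas frame and as an Arrow table
    df = pd.DataFrame({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    arrow_table = pa.Table.from_pandas(df)

    # duckdb is used for illustration; the test would parametrize over all
    # destinations that support the parquet loader file format
    pipeline = dlt.pipeline(pipeline_name="arrow_load_test", destination="duckdb", dataset_name="arrow_data")
    pipeline.run(arrow_table, table_name="items_arrow")
    pipeline.run(df, table_name="items_pandas")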

rudolfix (Collaborator) left a comment

this is good!

  • we need the tests I mentioned in the review
  • please also test what happens when we add incremental or any other filter/map function
  • I think it is trivial to modify ItemTransform to deconstruct a Table or pandas frame into rows and make incremental work. Of course this destroys the table. Maybe there's a way to "rescue" filter functions, e.g. by filtering the Arrow table with a Python lambda (a rough sketch follows below).
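
A rough sketch of the "filter by a python lambda" idea from the last point, assuming the user-supplied predicate takes a row dict; the rows are materialized once to build a boolean mask, but the table itself stays intact:

    import pyarrow as pa

    def filter_table(tbl: pa.Table, predicate) -> pa.Table:
        # evaluate the Python predicate per row to build a boolean mask,
        # then filter the Arrow table without deconstructing it into rows
        mask = pa.array([predicate(row) for row in tbl.to_pylist()])
        return tbl.filter(mask)

    tbl = pa.table({"id": [1, 2, 3], "value": ["a", "b", "c"]})
    filtered = filter_table(tbl, lambda row: row["id"] > 1)  # keeps rows 2 and 3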

Review comments on:
  dlt/common/storages/normalize_storage.py
  dlt/extract/extract.py
  dlt/normalize/items_normalizers.py
  dlt/normalize/normalize.py
sh-rp (Collaborator) commented Oct 10, 2023

Nice PR! One thing that came to my mind: with the way it is implemented now we lose the ability to create variant columns. If we try to load an Arrow table that has a different datatype on a column, it will now fail when updating the schema in the extract phase instead of creating a variant. It seems we could solve this with rename_columns in Arrow (a rough sketch follows below), but that would mean having some kind of variant-detection step in the extract phase, or alternatively pushing the schema update downstream into the normalize phase (which might be nice anyway to keep it similar to the way it works now). Also, should we give users the option to extract complex fields in parquet into subtables?

PS: Ok, if we push this to the normalizer, then we lose the ability to just copy those files to the load stage..
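
A rough sketch of the rename_columns idea, assuming a hypothetical variant naming scheme and a caller that already knows the expected type; the detection and the suffix are illustrative only, not the dlt convention:

    import pyarrow as pa

    def divert_to_variant(tbl: pa.Table, column: str, expected_type: pa.DataType) -> pa.Table:
        # if the incoming column type differs from the schema, rename it so it
        # would land in a separate (hypothetical) variant column instead of failing
        if tbl.schema.field(column).type == expected_type:
            return tbl
        new_names = [
            f"{name}__v_{tbl.schema.field(name).type}" if name == column else name
            for name in tbl.column_names
        ]
        return tbl.rename_columns(new_names)

    tbl = pa.table({"id": pa.array(["1", "2"], type=pa.string())})
    tbl = divert_to_variant(tbl, "id", pa.int64())  # column becomes "id__v_string"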

steinitzu (Collaborator, Author)

> this is good!
>
> * we need tests I mentioned in review
> * please also test what happens if we add incremental or any other filter/map function
> * I think it is trivial to modify `ItemTransform` to deconstruct Table or Panda into rows and make incremental working. this of course will destroy the table. maybe there's a way to "rescue" filter functions? maybe you can filter arrow table by a python lambda

I think we could do incremental efficiently with min/max functions on the arrow table directly, but custom aggregate functions look complicated in arrow.
Maybe with pandas we can make it work generally (hoping the arrow -> pandas -> arrow conversion is not destructive).
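
A minimal sketch of the min/max idea, assuming an ascending cursor column; pyarrow.compute does the comparison and the aggregation directly on the Arrow data, with no conversion to rows:

    import pyarrow as pa
    import pyarrow.compute as pc

    def incremental_filter(tbl: pa.Table, cursor_column: str, last_value):
        # keep only rows newer than the last seen cursor value ...
        if last_value is not None:
            tbl = tbl.filter(pc.greater(tbl[cursor_column], last_value))
        # ... and compute the new high watermark on the Arrow column itself
        new_last_value = pc.max(tbl[cursor_column]).as_py() if tbl.num_rows else last_value
        return tbl, new_last_value

    tbl = pa.table({"updated_at": [1, 2, 3, 4]})
    tbl, last = incremental_filter(tbl, "updated_at", last_value=2)  # 2 rows kept, last == 4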

rudolfix (Collaborator)

@sh-rp good points! But in most cases people want Arrow tables to have a strict schema (and typically the data is already well defined).

I like the idea of extracting the structs into separate tables. We could do that in a parquet normalizer, but of course the tables would need to be rewritten - most probably using duckdb as the engine. Btw, that would be FAST compared to our standard normalizer.
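
A rough sketch of how that could look with duckdb as the engine, assuming a parquet file with a struct column named `info` (both names are hypothetical); whether and where this runs is exactly the open question above:

    import duckdb

    con = duckdb.connect()
    # expand the struct fields into their own columns, keeping the parent key,
    # so they could be written out as a child table ...
    child = con.sql("SELECT id, UNNEST(info) FROM read_parquet('items.parquet')").arrow()
    # ... and rewrite the parent table without the struct column
    parent = con.sql("SELECT * EXCLUDE (info) FROM read_parquet('items.parquet')").arrow()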

rudolfix (Collaborator) left a comment

added some comments on incremental

Review comments on:
  dlt/extract/incremental/__init__.py
  dlt/extract/incremental/transform.py

# Filter out all rows which have cursor value equal to last value
# and unique id exists in state
tbl = tbl.append_column("_dlt_index", pa.array(range(tbl.num_rows)))
rudolfix (Collaborator):

I think you can create the index after you filter the rows in the next line

unique_values = [(i, uq_val) for i, uq_val in unique_values if uq_val not in incremental_state['unique_hashes']]
keep_idx = pa.array(i for i, _ in unique_values)
# Filter the table
tbl = tbl.filter(pa.compute.is_in(tbl["_dlt_index"], keep_idx))
rudolfix (Collaborator):

  1. this will keep only the records with unique values and remove all the records that are "newer" than the unique ones, right? because keep_idx is only built from unique values. maybe you should use is_not_in? (a rough sketch follows below)
  2. also, adding the index could be deferred to the moment we actually need to remove something below (and maybe the primary key could be used for that if present)
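
A rough sketch of point 1, reusing `tbl`, `unique_values` and `incremental_state` from the snippet above; pyarrow.compute has no is_not_in, so the is_in mask is inverted instead:

    import pyarrow as pa
    import pyarrow.compute as pc

    # collect the indices of rows whose unique hash was already seen in state ...
    remove_idx = pa.array(
        [i for i, uq_val in unique_values if uq_val in incremental_state["unique_hashes"]],
        type=pa.int64(),
    )
    # ... and keep every other row by inverting the membership mask
    tbl = tbl.filter(pc.invert(pc.is_in(tbl["_dlt_index"], value_set=remove_idx)))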

rudolfix (Collaborator) left a comment

looks good! I assume this is still WIP (some tests and docs mostly).
Also, you detect arrow tables in lists, but IMO when you write to tables in the extractor you assume that items are always actual objects, not lists. If I'm not right you can ignore this.

Also, in the arrow incremental IMO you delete too many records, but that will come out in the tests.

amazing work!

Review comments on:
  dlt/extract/incremental/__init__.py
  dlt/normalize/normalize.py
steinitzu (Collaborator, Author)

> looks good! I assume that is still WIP (some tests and docs mostly) also you detect arrow tables in lists but IMO when you write to tables in extractor you assume that items are always actual objects not lists. if I'm not right you can ignore it.
>
> also in arrow incremental IMO you delete too many records but that will come out in the tests
>
> amazing work!

Yep, tests and docs are coming; this needs more thorough testing. I'm hoping I can parametrize the current incremental tests easily to run with all formats, that would be best.

> also you detect arrow tables in lists but IMO when you write to tables in extractor you assume that items are always actual objects not lists

You might be right. This needs tests with lists as well.
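
A rough sketch of what that parametrization could look like, with a hypothetical helper that re-emits the same test rows in each item format:

    import pytest
    import pandas as pd
    import pyarrow as pa

    def to_item_format(rows, item_format):
        # hypothetical helper: same rows as python objects, a pandas frame or an Arrow table
        if item_format == "object":
            return rows
        if item_format == "pandas":
            return pd.DataFrame(rows)
        return pa.Table.from_pylist(rows)

    @pytest.mark.parametrize("item_format", ["object", "pandas", "arrow"])
    def test_incremental_any_format(item_format):
        items = to_item_format([{"id": 1, "updated_at": 1}, {"id": 2, "updated_at": 2}], item_format)
        # ... run the existing incremental assertions against `items`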

rudolfix previously approved these changes Oct 15, 2023
rudolfix (Collaborator) left a comment

LGTM! thx for incremental tests!
this needs to be fixed though

 Redshift cannot load TIME columns from parquet files. Switch to direct INSERT file format or convert `datetime.time` objects in your data to `str` or `datetime.datetime`

Also Athena has the same problem
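
A minimal sketch of the suggested workaround, assuming the offending column is an Arrow time column named `t`; it is rewritten as ISO strings before the data reaches the pipeline:

    import datetime
    import pyarrow as pa

    tbl = pa.table({"t": pa.array([datetime.time(12, 30), datetime.time(8, 15)])})

    # Redshift (and Athena) cannot load TIME from parquet, so replace the column
    # with its ISO-formatted string representation before loading
    idx = tbl.column_names.index("t")
    as_str = pa.array([v.isoformat() if v is not None else None for v in tbl["t"].to_pylist()])
    tbl = tbl.set_column(idx, "t", as_str)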

rudolfix marked this pull request as ready for review on October 16, 2023, 15:40
steinitzu (Collaborator, Author)

> LGTM! thx for incremental tests! this needs to be fixed though
>
>  Redshift cannot load TIME columns from parquet files. Switch to direct INSERT file format or convert `datetime.time` objects in your data to `str` or `datetime.datetime`

Should work now! Had a typo in this check b9f7aaf

rudolfix (Collaborator)

> LGTM! thx for incremental tests! this needs to be fixed though
>
>  Redshift cannot load TIME columns from parquet files. Switch to direct INSERT file format or convert `datetime.time` objects in your data to `str` or `datetime.datetime`
>
> Should work now! Had a typo in this check b9f7aaf

@steinitzu now there are some problems with types, i.e. binary values are returned as hex strings on Redshift, which is also the case in other tests...

rudolfix (Collaborator) left a comment

so good!

rudolfix merged commit d3db284 into devel on Oct 16, 2023
35 of 39 checks passed
rudolfix deleted the sthor/pyarrow-load branch on October 16, 2023, 21:59