docs: add delta lake documentation #238
base: main
Conversation
Cool!
| " df.to_pandas(),\n", | ||
| " mode=\"overwrite\"\n", | ||
| ")" | ||
| "write_deltalake(table_path, df.to_pandas(), mode=\"overwrite\")" |
I think that write_deltalake() supports ArrowArrayStreamExportable input (which our df is!), so you can just do:
write_deltalake(table_path, df, ...)

https://delta-io.github.io/delta-rs/api/delta_writer/#write-to-delta-tables
Got this error when I changed from df.to_pandas() => df:
ValueError Traceback (most recent call last)
Cell In[7], line 6
2 df = sd.sql(
3 "select name, continent, ST_AsText(geometry) as geometry_wkt from countries"
4 )
5 table_path = "[/tmp/delta_with_wkt](http://localhost:8889/tmp/delta_with_wkt)"
----> 6 write_deltalake(table_path, df, mode="overwrite")
File /opt/miniconda3/lib/python3.12/site-packages/deltalake/writer.py:306, in write_deltalake(table_or_uri, data, schema, partition_by, mode, file_options, max_partitions, max_open_files, max_rows_per_file, min_rows_per_group, max_rows_per_group, name, description, configuration, schema_mode, storage_options, partition_filters, predicate, target_file_size, large_dtypes, engine, writer_properties, custom_metadata, post_commithook_properties)
300 data, schema = _convert_data_and_schema(
301 data=data,
302 schema=schema,
303 conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
304 )
305 data = RecordBatchReader.from_batches(schema, (batch for batch in data))
--> 306 write_deltalake_rust(
307 table_uri=table_uri,
308 data=data,
309 partition_by=partition_by,
310 mode=mode,
311 table=table._table if table is not None else None,
312 schema_mode=schema_mode,
313 predicate=predicate,
314 target_file_size=target_file_size,
315 name=name,
316 description=description,
317 configuration=configuration,
318 storage_options=storage_options,
319 writer_properties=writer_properties,
320 custom_metadata=custom_metadata,
321 post_commithook_properties=post_commithook_properties,
322 )
323 if table:
324 table.update_incremental()
ValueError: C Data interface error: The datatype "vu" is still not supported in Rust implementation
Ah, no string view support in the version of arrow being used by delta lake. That's a shame.
It's worth trying this:

sd.sql("SET datafusion.execution.parquet.schema_force_view_types = false").execute()

...and using the df object directly. You really want to be streaming the query result into DeltaLake so that you're not limited by the size of memory.
| "dt = DeltaTable(table_path)\n", | ||
| "arrow_table = dt.to_pyarrow_table()\n", |
Probably a user will want to select columns or filter with an expression here? (One of the cool things we could do here if we integrated a delta lake table provider would be to insert this query automatically based on the information DataFusion gives us).
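For instance, a hedged sketch of a narrower read, assuming the deltalake version in use supports the columns and filters arguments on to_pyarrow_table():

```python
from deltalake import DeltaTable

dt = DeltaTable("/tmp/delta_with_wkt")

# Only materialize the columns we need, and push a simple predicate into the
# scan (DNF-style filters; exact support depends on the deltalake version).
arrow_table = dt.to_pyarrow_table(
    columns=["name", "geometry_wkt"],
    filters=[("continent", "=", "Africa")],
)
```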
Yea, for sure. For DuckDB, the read-Delta-then-filter query pattern benefits from PyArrow Datasets vs. PyArrow Tables. I can prepare a little benchmark if that'd be interesting.
I added a filtering example to the notebook to show the benefits of the geometry data type.
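A rough sketch of that DuckDB-over-Dataset pattern; the table path and column names follow the example above, and DuckDB's replacement scan picks up the local countries_ds variable by name:

```python
import duckdb
from deltalake import DeltaTable

# A pyarrow Dataset is lazy, so DuckDB can push projection and filtering into
# the scan rather than materializing the whole Delta table first.
countries_ds = DeltaTable("/tmp/delta_with_wkt").to_pyarrow_dataset()

result = duckdb.sql(
    "SELECT name, geometry_wkt FROM countries_ds WHERE continent = 'Africa'"
).arrow()
```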
Probably also worth demonstrating how you can push a query down into the DeltaLake scan using its filter argument (most real-world usage would do that instead of loading the entire table into memory and then issuing a query on it?).
One of the things you could demo is adding bbox columns on write by using ST_Xmin() and friends. When you read, you can issue a Delta Lake filter like
bbox.xmin <= -73.11 AND
bbox.ymin <= 44.03 AND
bbox.xmax >= -73.21 AND
bbox.ymax >= 43.97
...except in whatever syntax the delta lake filter argument uses. That should result in a more reasonable fetch from a large local or remote table. (Otherwise, users are better off just using Parquet because the pushdown is better).
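A hedged sketch of both halves of that idea, shown below. The flat bbox_* columns and the ST_XMin()-style function names are assumptions based on the suggestion above, and the read side uses a pyarrow dataset filter expression rather than whatever syntax the Delta Lake filter argument ultimately expects:

```python
import pyarrow.dataset as ds
from deltalake import DeltaTable, write_deltalake

# Write: add per-feature bounding-box columns so readers can prune on them.
# (String-view workaround from earlier in the thread.)
sd.sql("SET datafusion.execution.parquet.schema_force_view_types = false").execute()
df = sd.sql(
    """
    select
        name,
        continent,
        ST_AsText(geometry) as geometry_wkt,
        ST_XMin(geometry) as bbox_xmin,
        ST_YMin(geometry) as bbox_ymin,
        ST_XMax(geometry) as bbox_xmax,
        ST_YMax(geometry) as bbox_ymax
    from countries
    """
)
write_deltalake("/tmp/delta_with_bbox", df, mode="overwrite")

# Read: push the bbox predicate into the scan instead of loading the table.
dt = DeltaTable("/tmp/delta_with_bbox")
bbox_filter = (
    (ds.field("bbox_xmin") <= -73.11)
    & (ds.field("bbox_ymin") <= 44.03)
    & (ds.field("bbox_xmax") >= -73.21)
    & (ds.field("bbox_ymax") >= 43.97)
)
arrow_table = dt.to_pyarrow_dataset().to_table(filter=bbox_filter)
```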
| " df.to_pandas(),\n", | ||
| " mode=\"overwrite\"\n", | ||
| ")" | ||
| "write_deltalake(table_path, df.to_pandas(), mode=\"overwrite\")" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's worth trying this:
sd.sql("SET datafusion.execution.parquet.schema_force_view_types = false").execute()...and using the df object directly. You really want to be streaming the query result into DeltaLake so that you're not limited to the size of the memory.
| ```python
| countries.to_view("countries", True)
| df = sd.sql(
|     "select name, continent, ST_AsText(geometry) as geometry_wkt from countries"
In real life you'd probably want ST_AsBinary() (more compact for most usage)?
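For example, a hedged tweak of the snippet above that stores WKB instead of WKT, assuming the same countries view and write_deltalake() setup:

```python
from deltalake import write_deltalake

# WKB is typically much more compact than WKT for the same geometries.
df = sd.sql(
    "select name, continent, ST_AsBinary(geometry) as geometry_wkb from countries"
)
write_deltalake("/tmp/delta_with_wkb", df.to_pandas(), mode="overwrite")
```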
| "dt = DeltaTable(table_path)\n", | ||
| "arrow_table = dt.to_pyarrow_table()\n", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Probably also worth demonstrating how you can push a query down into the DeltaLake scan using its filter argument (most real world usage would do that instead of load the entire table into memory and then issue a query on it?).
One of the things you could demo is adding bbox columns on write by using ST_Xmin() and friends. When you read, you can issue a Delta Lake filter like
bbox.xmin <= -73.11 AND
bbox.ymin <= 44.03 AND
bbox.xmax >= -73.21 AND
bbox.ymax >= 43.97
...except in whatever syntax the delta lake filter argument uses. That should result in a more reasonable fetch from a large local or remote table. (Otherwise, users are better off just using Parquet because the pushdown is better).
This adds a documentation page on how to create Delta tables from SedonaDB DataFrames and how to read Delta tables back into SedonaDB DataFrames.