-
Notifications
You must be signed in to change notification settings - Fork 189
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add pyiceberg DataFusion e2e test #825
base: main
Are you sure you want to change the base?
Conversation
64c1473
to
3a328ed
Compare
1b03c42
to
c261ddf
Compare
import pyarrow.parquet as pq | ||
|
||
# Generate a table with various types in memory and dump to a Parquet file | ||
ctx = SessionContext() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm reluctant to let Datafusion generate these files, for two reasons:
- These files are imported into an Iceberg table and are missing things like Field-IDs etc.
- I'd rather depend on purely Spark which uses the Java SDK underneath, which is more complete than PyIceberg. The Java impl is considered the reference implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing things like Field-IDs etc.
If by that you mean PARQUET:field_id
note that pyiceberg actualy decorates the fields during table creation with those, I've just pushed an addendum to the tests that demonstrate this.
Granted I'm not sure about the state of pyiceberg compliance with the protocol.
Would you be more open to accepting this test if I switched to Java SDK for generating the Parquet files and creating the table?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I also see value in this, let's hear what others think!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally i like the idea of using pyiceberg
for testing, maybe also alongside spark. This can help facilitate cross platform interoperability,
but im not sure when we would use one versus the other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alright just an update—I've tried doing the same with pyspark but it seems it is being very conservative and either rejecting some types outright (e.g. Illegal Parquet type: INT64 (TIMESTAMP(NANOS,false))
), or coercing them on its own.
For instance if I just supply it with a Parquet file like type_test.parquet
(output of load_types_table.py
) with type hints
message arrow_schema {
optional boolean cboolean;
optional int32 cint8 (INTEGER(8,true));
optional int32 cint16 (INTEGER(16,true));
optional int32 cint32;
optional int64 cint64;
...
and do
parquet_df = spark.read.parquet("types_test.parquet")
parquet_df.writeTo("rest.default.types_test").using("iceberg").create()
the resulting data parquet files have incompatible type hints removed
message table {
optional boolean cboolean = 1;
optional int32 cint8 = 2;
optional int32 cint16 = 3;
optional int32 cint32 = 4;
optional int64 cint64 = 5;
...
TLDR: I can't seem to replicate the issue in #814 with pyspark, though it could still be used to test out the non-corner cases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but im not sure when we would use one versus the other.
I think there's definitely merit in having an integration types test like the one proposed here. The basic one can be done with pyspark, though it might be a bonus (or not) that apparently pyiceberg can hit some corner cases which pyspark can't.
(fwiw, this is something that we ran into in our dev work, so it's not a end-user-reported issue.)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
apparently pyiceberg can hit some corner cases which pyspark can't.
Interesting! curious to hear about what you guys ran into.
I think Pyspark is good for integration tests through the spark API. PyIceberg allows more granular test cases since you can work with iceberg constructs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
curious to hear about what you guys ran into.
Actually the only one so far seems to be this Iceberg-from-parquet creation discussed above—if the Parquet file has e.g. Int32 field with Int16 type hints spark will automatically coerce those in the resulting data file into Int32, whereas pyiceberg will leave it as is (and so during the scan in the actual record batches we get Int16, whilst the schema says it's Int32).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
crates/integration_tests/testdata/pyiceberg/load_types_table.py
Outdated
Show resolved
Hide resolved
import pyarrow.parquet as pq | ||
|
||
# Generate a table with various types in memory and dump to a Parquet file | ||
ctx = SessionContext() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Generally i like the idea of using pyiceberg
for testing, maybe also alongside spark. This can help facilitate cross platform interoperability,
but im not sure when we would use one versus the other.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think E2E test should be pyiceberg writes and reading with datafusion. similar to #850
other than that, generally lgtm!
# Generate a table with various types in memory and dump to a Parquet file | ||
ctx = SessionContext() | ||
ctx.sql(""" | ||
CREATE TABLE types_test ( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
lets generate these values in pyiceberg/pyarrow. it'll help us figure out the supportability of each types which is where the E2E test is useful.
Here are the supported types we use in pyiceberg's integration test
https://github.com/apache/iceberg-python/blob/acd6f5a8a19db709e835e2686b87d4db3dca254f/tests/conftest.py#L304-L349
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sounds good, I just switched to pyarrow to generate the Parquet file from which the table is constructed.
To be clear, if #850 gets accepted and merged, and if folks don't feel comfortable including pyiceberg in the testing scope for now, then this PR can just remain as an example for the issue in #813 (it still exercises it as demonstrated at the very end of the integration test).
64990ba
to
5cb32a9
Compare
5cb32a9
to
ba81d13
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM! I like this E2E test from pyiceberg writes to datafusion read.
In the future, I want to expose the IcebergTableProvider as python binding so we can also use the datafusion python library to query (#865)
|
||
FROM python:3.9-bullseye | ||
|
||
RUN pip install pyiceberg[pyarrow]==0.8 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: 0.8.1 is the latest version
identifier=f"default.types_test", | ||
schema=parquet_file.schema, | ||
) | ||
iceberg_table.append(df=parquet_file) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
iceberg_table.append(table)
also works, do we need to roundtrip writing to parquet and then reading it back?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh I thought only appending via a parquet file reproduces #813, but turns out so does appending a pyarrow table directly; nice catch!
('ctimestamptz', pa.timestamp('us', tz='UTC')), | ||
('cutf8', pa.utf8()), | ||
('cbinary', pa.binary()), | ||
]) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do we want to include other data types? including complex types? like in https://github.com/apache/iceberg-python/blob/acd6f5a8a19db709e835e2686b87d4db3dca254f/tests/conftest.py#L304-L349
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense to me, could be valuable to include list/map/struct.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM!
]; | ||
assert_batches_eq!(expected, &batches); | ||
|
||
// TODO: this isn't OK, and should be fixed with https://github.com/apache/iceberg-rust/issues/813 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
should we wait for this fix?
Besides serving as an e2e test, this also excessive quite a bit of type conversion logic.
In addition it demonstrates the issue described in #813.