Upgrade datafusion #867
I took a stab at some of those changes to Hopefully that's helpful and not stepping on your toes.
We'll likely still need the trait using FromPyArrow::from_pyarrow_bound
d58ae94 to fc182d1
row_number was converted to a UDF in DataFusion v42 (apache/datafusion#12030). This specific functionality needs to be added back in.
Implicit defaults for trailing optional arguments have been deprecated in pyo3 v0.22.0 (PyO3/pyo3#4078).
fc182d1 to 4945661
Co-authored-by: Michael J Ward <[email protected]>
b6e4645 to afcc9f1
The using
Alright - I've narrowed down the bug. I'm fairly certain this is upstream but haven't reproduced it in Rust yet. I have a commit here that re-implements upstream The thing to notice in this log is that I think the simplified version should be any df
.with_column("foo", <normal expr>)
.with_column("bar", <window expr>)
adding column: "total_value" with expr: PyExpr { expr: WindowFunction(WindowFunction { fun: AggregateUDF(AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable } } }), args: [Column(Column { relation: None, name: "value" })], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, null_treatment: None }) }
window_func_exprs: [WindowFunction(WindowFunction { fun: AggregateUDF(AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable } } }), args: [Column(Column { relation: None, name: "value" })], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, null_treatment: None })]
col_exists: true, window_func: true
plan: WindowAggr: windowExpr=[[sum(value) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]]
  Aggregate: groupBy=[[?table?.ps_partkey]], aggr=[[sum(value) AS value]]
    Projection: ?table?.n_nationkey, ?table?.n_name, ?table?.s_suppkey, ?table?.s_nationkey, ?table?.ps_supplycost, ?table?.ps_availqty, ?table?.ps_suppkey, ?table?.ps_partkey, ?table?.ps_supplycost * ?table?.ps_availqty AS value
      Inner Join: ?table?.s_suppkey = ?table?.ps_suppkey
        Inner Join: ?table?.n_nationkey = ?table?.s_nationkey
          Filter: ?table?.n_name = Utf8("GERMANY")
            Projection: ?table?.n_nationkey, ?table?.n_name
              TableScan: ?table?
          Projection: ?table?.s_suppkey, ?table?.s_nationkey
            TableScan: ?table?
        Projection: ?table?.ps_supplycost, ?table?.ps_availqty, ?table?.ps_suppkey, ?table?.ps_partkey
          TableScan: ?table?
qualifier: Some(Bare { table: "?table?" }), field: Field { name: "ps_partkey", data_type: Int64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }
adding column
qualifier: None, field: Field { name: "value", data_type: Decimal128(36, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
adding window function with alias
qualifier: None, field: Field { name: "sum(value) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Decimal128(38, 2), nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }
adding window function with alias
col exists - not pushing Alias(Alias { expr: WindowFunction(WindowFunction { fun: AggregateUDF(AggregateUDF { inner: Sum { signature: Signature { type_signature: UserDefined, volatility: Immutable } } }), args: [Column(Column { relation: None, name: "value" })], partition_by: [], order_by: [], window_frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)), is_causal: false }, null_treatment: None }), relation: None, name: "total_value" })
Traceback (most recent call last):
  File "/home/mike/workspace/datafusion-python/dev/examples/tpch/q11_important_stock_identification.py", line 70, in <module>
    df = df.with_column(
         ^^^^^^^^^^^^^^^
  File "/home/mike/workspace/datafusion-python/dev/python/datafusion/dataframe.py", line 164, in with_column
    return DataFrame(self.df.with_column(name, expr.expr))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Exception: Error during planning: Projections require unique expression names but the expression "value AS total_value" at position 1 and "sum(value) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS total_value" at position 2 have the same name. Consider aliasing ("AS") one of them.
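For anyone following along, the planning error above comes from a rule that every expression in a projection must produce a distinct output name. Below is a rough plain-Python sketch of that kind of check, not DataFusion's actual implementation. The helper `check_unique_projection_names` and the `(display_text, output_name)` tuples are hypothetical, made up for illustration; the expression strings are taken from the log.

```python
from typing import Dict, List, Tuple


def check_unique_projection_names(exprs: List[Tuple[str, str]]) -> None:
    """Raise ValueError if two projection expressions share an output name.

    Hypothetical helper for illustration only: each entry is
    (display_text, output_name), and positions are 0-based to match
    the numbering seen in the error message from the log.
    """
    seen: Dict[str, Tuple[int, str]] = {}  # output name -> (position, display text)
    for position, (display, name) in enumerate(exprs):
        if name in seen:
            first_pos, first_display = seen[name]
            raise ValueError(
                "Projections require unique expression names but the expression "
                f'"{first_display}" at position {first_pos} and "{display}" '
                f"at position {position} have the same name. "
                'Consider aliasing ("AS") one of them.'
            )
        seen[name] = (position, display)


# The projection reported in the log: the existing "value" column is aliased
# to "total_value" and the window expression is aliased to the same name.
WINDOW = "sum(value) ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING"
failing_projection = [
    ("ps_partkey", "ps_partkey"),
    ("value AS total_value", "total_value"),
    (f"{WINDOW} AS total_value", "total_value"),
]

# What one would presumably expect instead: "value" keeps its own name.
expected_projection = [
    ("ps_partkey", "ps_partkey"),
    ("value", "value"),
    (f"{WINDOW} AS total_value", "total_value"),
]
```

Calling `check_unique_projection_names(failing_projection)` raises with positions 1 and 2, as in the traceback, while `expected_projection` passes. The second list is my assumption about the intended result, based on the log showing `value` being aliased to `total_value`.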
Verified and reported the upstream bug:
Great! How do we want to proceed with respect to this PR: wait for a fix to land upstream, or can we work around it?
I think we can keep working on this PR. I don't think we want to merge it into
@emgeee, Tim's PR was just merged. Updating the commit reference for the
DataFusion 42.0 should be available now, so I think we can update Cargo.toml and get this reviewed and merged.
Just a couple of small things, but otherwise looks good. Actually, I think it was my commit that introduced them, so let me know if you want me to touch them up.
Which issue does this PR close?
Closes #870.
Rationale for this change
What changes are included in this PR?
This PR contains the changes required to update to DataFusion v42, Arrow v53, and pyo3 v0.22.
Are there any user-facing changes?