Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add implementation of SQL Except operation #135

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions dask_sql/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ def __init__(self):
RelConverter.add_plugin_class(logical.LogicalSortPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalTableScanPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalUnionPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalMinusPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalValuesPlugin, replace=False)
RelConverter.add_plugin_class(logical.SamplePlugin, replace=False)
RelConverter.add_plugin_class(custom.AnalyzeTablePlugin, replace=False)
Expand Down Expand Up @@ -515,6 +516,10 @@ def _get_ral(self, sql):
nonOptimizedRelNode = generator.getRelationalAlgebra(validatedSqlNode)
rel = generator.getOptimizedRelationalAlgebra(nonOptimizedRelNode)
rel_string = str(generator.getRelationalAlgebraString(rel))
logger.debug(
f"Non optimised query plan: \n "
f"{str(generator.getRelationalAlgebraString(nonOptimizedRelNode))}"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, makes sense to do that. Good idea.

)
except (ValidationException, SqlParseException) as e:
logger.debug(f"Original exception raised by Java:\n {e}")
# We do not want to re-raise an exception here
Expand Down
2 changes: 2 additions & 0 deletions dask_sql/physical/rel/logical/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from .sort import LogicalSortPlugin
from .table_scan import LogicalTableScanPlugin
from .union import LogicalUnionPlugin
from .minus import LogicalMinusPlugin
from .values import LogicalValuesPlugin

__all__ = [
Expand All @@ -16,6 +17,7 @@
LogicalSortPlugin,
LogicalTableScanPlugin,
LogicalUnionPlugin,
LogicalMinusPlugin,
LogicalValuesPlugin,
SamplePlugin,
]
69 changes: 69 additions & 0 deletions dask_sql/physical/rel/logical/minus.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
import dask.dataframe as dd

from dask_sql.physical.rex import RexConverter
from dask_sql.physical.rel.base import BaseRelPlugin
from dask_sql.datacontainer import DataContainer, ColumnContainer


class LogicalMinusPlugin(BaseRelPlugin):
    """
    LogicalMinus is used on EXCEPT clauses.

    It returns all rows of the first input which do not also
    appear in the second input (a set difference), implemented
    as a left anti-join of the two data frames.

    NOTE(review): SQL EXCEPT is defined on sets (duplicate result
    rows are removed); the merge below keeps duplicates from the
    first input — verify whether an explicit deduplication step
    is needed here.
    """

    class_name = "org.apache.calcite.rel.logical.LogicalMinus"

    def convert(
        self, rel: "org.apache.calcite.rel.RelNode", context: "dask_sql.Context"
    ) -> DataContainer:
        # EXCEPT always has exactly two inputs.
        first_dc, second_dc = self.assert_inputs(rel, 2, context)

        first_df = first_dc.df
        first_cc = first_dc.column_container

        second_df = second_dc.df
        second_cc = second_dc.column_container

        # For subtracting, they should have exactly the same fields,
        # so rename both inputs to the output field names.
        output_field_names = [str(x) for x in rel.getRowType().getFieldNames()]
        assert len(first_cc.columns) == len(output_field_names)
        first_cc = first_cc.rename(
            columns={
                col: output_col
                for col, output_col in zip(first_cc.columns, output_field_names)
            }
        )
        first_dc = DataContainer(first_df, first_cc)

        assert len(second_cc.columns) == len(output_field_names)
        second_cc = second_cc.rename(
            columns={
                col: output_col
                for col, output_col in zip(second_cc.columns, output_field_names)
            }
        )
        second_dc = DataContainer(second_df, second_cc)

        # To merge the two dataframes, we need to make sure the
        # columns actually have the specified names in the
        # column containers
        # Otherwise the merge won't work
        first_df = first_dc.assign()
        second_df = second_dc.assign()

        self.check_columns_from_row_type(first_df, rel.getExpectedInputRowType(0))
        self.check_columns_from_row_type(second_df, rel.getExpectedInputRowType(1))

        # A left merge with indicator=True appends a column marking each
        # row of first_df as present in "both" inputs or "left_only".
        # Keeping only the "left_only" rows (and dropping the indicator,
        # which is the last column) yields the EXCEPT result.
        df = first_df.merge(
            second_df,
            how="left",
            indicator=True,
        )

        df = df[df.iloc[:, -1] == "left_only"].iloc[:, :-1]

        cc = ColumnContainer(df.columns)
        cc = self.fix_column_to_row_type(cc, rel.getRowType())
        dc = DataContainer(df, cc)
        dc = self.fix_dtype_to_row_type(dc, rel.getRowType())
        return dc
29 changes: 29 additions & 0 deletions tests/integration/test_except.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
def test_except_empty(c):
    """EXCEPT of a table with itself must return no rows."""
    # The unused `df` fixture parameter was removed: the query only
    # references the registered table "df" through the context `c`.
    result_df = c.sql(
        """
    SELECT * FROM df
    EXCEPT
    SELECT * FROM df
    """
    )
    result_df = result_df.compute()
    assert len(result_df) == 0


def test_except_non_empty(c):
    """EXCEPT removes the subtracted literal row from the union."""
    # The `df` fixture parameter was removed: the query uses only
    # literal SELECTs and no registered table.
    result_df = c.sql(
        """
    (
        SELECT 1 as "a"
        UNION
        SELECT 2 as "a"
        UNION
        SELECT 3 as "a"
    )
    EXCEPT
    SELECT 2 as "a"
    """
    )
    result_df = result_df.compute()
    # Compare as a list: `result_df.columns == "a"` would yield a
    # boolean array instead of a single boolean.
    assert list(result_df.columns) == ["a"]
    assert set(result_df["a"]) == {1, 3}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding a test with NaNs? You can also use one of the prepared tables (e.g. user_table_nan) if it makes sense.
It might also make sense to test this functionality against sqlite - I am just scared that especially on NULL (NaN) pandas/dask and SQL have different opinions (as I have seen so often, unfortunately...)