Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve Left/Right/Inner Join #223

Open
wants to merge 8 commits into
base: main
Choose a base branch
from
Open

Conversation

flcong
Copy link

@flcong flcong commented Aug 23, 2021

@nils-braun
Copy link
Collaborator

Thanks @flcong - looks like a lot of work! I might need some time checking the content. Make sure to ping me if I take too long... In the meantime, maybe @rajagurunath can have a first look (although this is also new code for you, sorry!)

@codecov-commenter
Copy link

codecov-commenter commented Aug 23, 2021

Codecov Report

Merging #223 (39980fd) into main (4dab949) will not change coverage.
The diff coverage is 100.00%.

Impacted file tree graph

@@            Coverage Diff            @@
##              main      #223   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           64        64           
  Lines         2589      2594    +5     
  Branches       362       363    +1     
=========================================
+ Hits          2589      2594    +5     
Impacted Files Coverage Δ
dask_sql/physical/rel/logical/join.py 100.00% <100.00%> (ø)
dask_sql/context.py 100.00% <0.00%> (ø)
dask_sql/input_utils/hive.py 100.00% <0.00%> (ø)
dask_sql/input_utils/intake.py 100.00% <0.00%> (ø)
dask_sql/input_utils/convert.py 100.00% <0.00%> (ø)
dask_sql/input_utils/location.py 100.00% <0.00%> (ø)
dask_sql/input_utils/pandaslike.py 100.00% <0.00%> (ø)
dask_sql/physical/rel/custom/create_table.py 100.00% <0.00%> (ø)

Continue to review full report at Codecov.

Legend - Click here to learn more
Δ = absolute <relative> (impact), ø = not affected, ? = missing data
Powered by Codecov. Last update 4dab949...39980fd. Read the comment docs.

@flcong
Copy link
Author

flcong commented Aug 23, 2021

Thanks @flcong - looks like a lot of work! I might need some time checking the content. Make sure to ping me if I take too long... In the meantime, maybe @rajagurunath can have a first look (although this is also new code for you, sorry!)

No problem! First time contributing to a python package.

@rajagurunath
Copy link
Collaborator

Hi @flcong,

Thanks for fixing this issue and welcome to dask-sql and open source :)


# Left Join
querypnl = """
select a.*, b.startdate, b.enddate, b.lk_nullint, b.lk_int, b.lk_str,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tested this Query using PostgresSQL, it somewhat matches with the correct dataset specified here except for some dtype mismatch, I have one suggestion can Please you add this testing on test_postgres.py as well, where you can make use of assert_query_gives_same_result to compare your result directly with Postgres (similar to the one you specified in issue) result. let me know if you need any help.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All right. Let me check. To do the Postgre tests, I need to install docker, right?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes :), if not, a quick hack for testing purpose is you can hardcore any available postgresSQL ip address in the engine fixture of test_postgres.py.whichever is easier.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. I've added the tests in test_postgres.py. Also I don't know why but I cannot connect to the postgres container using the original test_postgres.py. So I exposed port 5432 in client.containers.run and change address to "localhost". Then, the tests work fine.

how="left",
)
# Assign pd.NA
for v in df.columns[df.columns.str.startswith(other_varpre)]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just tried running the tests and the same code in jupyter notebook , and got the following error TypeError: float() argument must be a string or a number, not 'NAType' , if you are not getting this error then it is probably due to environment mismatch error I guess, Or any other guess regarding this Error?
maybe due to setting pd.NA ? what do you think @flcong?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Which test triggers the error? It seems to be the conversion float(pd.NA)?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me, first query (left join) was failing in test_join_lricomplex
I will try to provide more context by tomorrow :)

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK. BTW, I use Python 3.8.10 and Pandas 1.3.2 to run the tests.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good catch @flcong, The Above Error was raised on pandas version 1.2.4 and working fine in pandas==1.3.2

flcong added 2 commits August 25, 2021 08:39
* Add new data
* Allow for type conversion
* Allow for specification of whether or not to check dtypes.
* Add new tests in `test_join.py` in `test_postgres.py` as well.
* Expose port 5432 for postgres container.
* Change address to "localhost"
Copy link
Collaborator

@rajagurunath rajagurunath left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe Once these changes are done, I request @nils-braun for further review since I am also no expert in the dask-sql/physical/rel/logical part.

@@ -32,6 +33,7 @@ def engine():
# get the address and create the connection
postgres.reload()
address = postgres.attrs["NetworkSettings"]["Networks"]["dask-sql"]["IPAddress"]
address = "localhost"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi @flcong, Apologies for not letting you know about this earlier, Once you have tested with a custom PostgreSQL address, please replace that address with the original docker container host address, if not Github Workflow will fail.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah. I see.

""",
["dates", "startdate", "enddate"],
force_dtype="dask",
check_dtype=True,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just curious here, Specifying check_dtype = False was not working here? Any other reason for introducing this new argument (force_dtype)?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure how assert_frame_equal works to determine if, for example, 2.0 and 2 are identical when check_dtype=False. So here I try to make it more explicit that a type cast to the dask dataframe makes the two dataframes identical even if check_dtype=True. I guess it conveys more information for developers? (Maybe just my OCD.)

Copy link
Collaborator

@nils-braun nils-braun left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ui @flcong, that was probably quite difficult to find out. Nice job. It took me quite some time to dig through the code, so I think it would be very cool if you can add a bit more documentation. Maybe you can use a small example Dataframe in the comments to show, what is actually wrong and how each step helps in solving it (e.g. the three cases you have and then the different merges later which rows they actually add). If this is too much to ask for, I can also take care of it.

I had some smaller comments in the code, but there is also a more general issue I would like to discuss here (and maybe it is just me being too naive): let me recap the problem. When we do a LEFT or RIGHT join with a filter condition, it happens that we get rid of all NAN-rows in the other Dataframe (and therefore the JOIN turns into an inner join) - I did understand this part. In my naive understanding (please correct me) the solution should be rather easy: if in the resulting Dataframe in the end a certain line from the left (or right) Dataframe is missing and we are doing a LEFT (or RIGHT) join, we still need to keep it. Shouldn't here a very simple join with the initial left (or right) Dataframe before the merge and on one of your newly created index columns be enough here? Your solution might also work (I did not think through all corner cases, but looks good so far) but it is quite complex and involves multiple merges (which are shuffles in the Dask world). Am I too simple?

# could be duplicates. (Yeah. It may be better to inform users that
# index will break. After all, it is expected to be broken since the
# number of rows changes.
df = df.assign(uniqid=1)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I did not understand why this is needed, as you are grouping and joining later anyways). Probably I am just too stupid to see it, but maybe it is wise adding more documentation on why you do it.
(also, I am not 100% sure if this does not trigger a calculation, as Dask needs to know about the divisions. But I did not check)

@@ -92,12 +95,32 @@ def convert(
# 4. dask can only merge on the same column names.
# We therefore create new columns on purpose, which have a distinct name.
assert len(lhs_on) == len(rhs_on)
# Add two columns (1,2,...) to keep track of observations in left and
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes as well as the index-reset further down are only used when there is a filter condition. I do not expect this to be the default. Therefore I think it would be nice if we do not touch the "normal" use case and do not introduce another performance drawback for the "normal" user. I think a simple if filter_condition and some comment should be enough, or what do you think?

@@ -177,8 +218,91 @@ def merge_single_partitions(lhs_partition, rhs_partition):
for rex in filter_condition
],
)
logger.debug(f"Additionally applying filter {filter_condition}")
df = filter_or_scalar(df, filter_condition)
# Three cases to deal with inequality conditions (left join as an example):
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add more documentation here? From the PR context I know that we are dealing with complex join conditions, which consist of an equality and an in-equality join (but there could be more than just inequalities), but someone just reading the code will not know that. Just describe the setting you are dealing with and the actual problem with the naive implementation before going into the details.

@nils-braun
Copy link
Collaborator

Am I too simple?
Just to make my point clear: if you replace your changes in join.py inside the filter_condition check just with

logger.debug(f"Additionally applying filter {filter_condition}")
df = filter_or_scalar(df, filter_condition)

# make sure we recover any lost rows in case of left, right or outer joins
if join_type in ["left", "outer"]:
    df = df.merge(df_lhs_renamed, on=list(df_lhs_renamed.columns), how="right")
elif join_type in ["right", "outer"]:
    df = df.merge(df_rhs_renamed, on=list(df_rhs_renamed.columns), how="right")

df = df.drop(columns=["left_idx", "right_idx"])

your tests still succeed

@@ -86,6 +86,88 @@ def datetime_table():
)


@pytest.fixture
def user_table_lk():
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I love those tests, they are super cool because they seem like coming from a real use-case, which is absolute brilliant.

However, can we also have a very simple one with just like 3-4 lines and two columns (e.g. the one I used in my comments)? This makes debugging much easier than skimming though multiple lines which (because the columns are so wide) even span a lot of space in the editor. I can also take care of this if you want!

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah. I realize that. I simplified the new tests.

flcong added 2 commits August 30, 2021 10:09
* Roll back to previous `join.py` but add lines to merge unmatched columns.
* Fix a bug in `merge_single_partitions` where the returned dataframe has an extra column `"common"` that triggers metadata mismatch in the added lines of merging.
* Since int columns will be converted to float if there are unmatched rows, I use `check_dtype=False` but still use nullable int in the assumed correct table.
@flcong
Copy link
Author

flcong commented Aug 30, 2021

Am I too simple?
Just to make my point clear: if you replace your changes in join.py inside the filter_condition check just with

logger.debug(f"Additionally applying filter {filter_condition}")
df = filter_or_scalar(df, filter_condition)

# make sure we recover any lost rows in case of left, right or outer joins
if join_type in ["left", "outer"]:
    df = df.merge(df_lhs_renamed, on=list(df_lhs_renamed.columns), how="right")
elif join_type in ["right", "outer"]:
    df = df.merge(df_rhs_renamed, on=list(df_rhs_renamed.columns), how="right")

df = df.drop(columns=["left_idx", "right_idx"])

your tests still succeed

That's brilliant! I didn't think of that. I rolled back join.py to the previous version and added the lines you mentioned.

One caveat is that in this case, if any input column has type int and there are unmatched rows, np.nan will be added and the column type will be cast to float, instead of nullable int. Since we don't know whether or not there are unmatched rows without triggering compute() (I guess), we have to either leave it as is (users may want to type-cast float into nullable int later) or we cast all int columns to nullable int (which may not be good either). What do you think?

By the way, I also find a bug in the merge_single_partitions function in join.py. (I don't know how you quote code in the conversation XD.) In the returned dataframe, there is an extra column "common" which causes a metadata mismatch for left join when there are only inequality conditions (e.g. my time-series table tests), so I dropped the column.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Left Join becomes Inner Join for inequality conditions
4 participants