Changes towards an engine that could help in Neurolang #1

Open: wants to merge 39 commits into base branch main.

Commits (39)
7e32181
Debugging improvements and added an implementation for logical minus
demianw Feb 7, 2021
d6c7e6a
Adding some new optimisations
demianw Feb 7, 2021
9ac75bb
Squashed commit of the following:
demianw Feb 8, 2021
8c5ec9e
Test for except
demianw Feb 8, 2021
3a93a11
Added Intersect support
jonasrenault Feb 9, 2021
78fc316
Merge branch 'various_changes' of github.com:NeuroLang/dask-sql into …
jonasrenault Feb 9, 2021
83ddf05
JoinToMultiJoinRule inserted to reorganise joins
demianw Feb 9, 2021
7f44ece
Merge branch 'various_changes' of https://github.com/NeuroLang/dask-s…
demianw Feb 9, 2021
1640990
WIP
demianw Feb 9, 2021
ddf1551
Adding missing import
jonasrenault Feb 11, 2021
5c13175
Rewriting assign method to use rename as much as possible
jonasrenault Feb 12, 2021
f1a5646
Merge branch 'various_changes' of https://github.com/NeuroLang/dask-s…
demianw Feb 12, 2021
391acbf
Rewrite inner-join to merge on already existing columns instead of ad…
jonasrenault Feb 12, 2021
1662031
Merge branch 'various_changes' of https://github.com/NeuroLang/dask-s…
demianw Feb 12, 2021
3feaa01
Commenting the inplace join method which I'd added as it seems to not …
jonasrenault Mar 2, 2021
4cfd39c
Trying to make aggregations work on multi col params. Untested
jonasrenault Mar 9, 2021
905b62d
Merge branch 'various_changes' of https://github.com/NeuroLang/dask-s…
demianw Mar 9, 2021
be11cd6
Better and cleaner rule management in Calcite app
jonasrenault Mar 9, 2021
4739ccf
Merge branch 'various_changes' of https://github.com/NeuroLang/dask-s…
demianw Mar 9, 2021
48be680
Optimizations and extension of join implementation
demianw Mar 10, 2021
6b7b78e
Improved log and added one more project rule
demianw Mar 10, 2021
98a3a9d
Cleaning up rules for optimizer
jonasrenault Mar 11, 2021
6e35be5
adding a Materializer class to add RelOptMaterializations to the planner
jonasrenault Mar 16, 2021
70b922f
Managing materialized views
jonasrenault Mar 16, 2021
6269fbc
Added a HepPlanner for materialized view optimisation
jonasrenault Mar 16, 2021
465fa03
Adding a rule to manage filter on join expressions
jonasrenault Mar 17, 2021
2e7f79e
Changing behaviour of aggregate group_by to get it to work with multi…
jonasrenault Mar 17, 2021
9b73509
Adding return_type info to aggregate apply calls
jonasrenault Mar 19, 2021
e54e406
Rewriting of aggregate to handle both GroupBy-aggregations and GroupB…
jonasrenault Mar 19, 2021
5865355
Added a rule to push filter expr out of join conditions
jonasrenault Mar 24, 2021
6285992
Remove JOIN_PUSH_TRANSITIVE_PREDICATES rule
jonasrenault Mar 25, 2021
8798026
Raise an AssertionError when join condition is a constant to treat it…
jonasrenault Apr 6, 2021
01427b6
Remove Project_filter_transpose rule that was causing some errors in …
jonasrenault Apr 13, 2021
f8e05d8
Refactor aggregate to call drop_duplicate when we're doing a groupby …
jonasrenault Apr 14, 2021
4dc898e
Remove Project_join_transpose rules that were causing issues in one …
jonasrenault Apr 15, 2021
d01dbd5
Add missing dependency for distributed
jonasrenault Jun 21, 2021
31e475d
Update Calcite version to 1.27
jonasrenault Sep 7, 2021
64b8a8c
Update pandas version
jonasrenault Oct 4, 2021
e78363c
Add a persist call when applying multiple aggregates so that they are…
jonasrenault Oct 27, 2021
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
@@ -84,7 +84,7 @@ jobs:
run: |
# explicitly install docker, fugue and sqlalchemy package
conda install sqlalchemy psycopg2 -c conda-forge
pip install docker fugue
pip install docker "fugue<=0.5.0"
if: matrix.os == 'ubuntu-latest'
- name: Install Java (again) and test with pytest
shell: bash -l {0}
31 changes: 25 additions & 6 deletions dask_sql/context.py
@@ -68,6 +68,8 @@ def __init__(self):
"""
# Storage for the registered tables
self.tables = {}
# Storage for the registered views
self.views = {}
# Storage for the registered functions
self.functions: Dict[str, Callable] = {}
self.function_list: List[FunctionDescription] = []
@@ -86,6 +88,8 @@ def __init__(self):
RelConverter.add_plugin_class(logical.LogicalSortPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalTableScanPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalUnionPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalIntersectPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalMinusPlugin, replace=False)
RelConverter.add_plugin_class(logical.LogicalValuesPlugin, replace=False)
RelConverter.add_plugin_class(logical.SamplePlugin, replace=False)
RelConverter.add_plugin_class(custom.AnalyzeTablePlugin, replace=False)
@@ -116,6 +120,7 @@ def create_table(
input_table: InputType,
format: str = None,
persist: bool = True,
sql: str = None,
**kwargs,
):
"""
@@ -191,6 +196,8 @@
**kwargs,
)
self.tables[table_name.lower()] = dc
if sql is not None:
self.views[table_name.lower()] = sql

def register_dask_table(self, df: dd.DataFrame, name: str):
"""
@@ -448,11 +455,17 @@ def _prepare_schema(self):
logger.warning("No tables are registered.")

for name, dc in self.tables.items():
table = DaskTable(name)
df = dc.df
logger.debug(
f"Adding table '{name}' to schema with columns: {list(df.columns)}"
)
if name in self.views:
table = DaskTable(name, self.views[name])
logger.debug(
f"Adding materialied table '{name}' to schema with columns: {list(df.columns)}"
)
else:
table = DaskTable(name)
logger.debug(
f"Adding table '{name}' to schema with columns: {list(df.columns)}"
)
for column in df.columns:
data_type = df[column].dtype
sql_data_type = python_to_sql_type(data_type)
@@ -513,8 +526,14 @@ def _get_ral(self, sql):
else:
validatedSqlNode = generator.getValidatedNode(sqlNode)
nonOptimizedRelNode = generator.getRelationalAlgebra(validatedSqlNode)
rel_string_non_op = str(generator.getRelationalAlgebraString(nonOptimizedRelNode))
rel_non_op_count = rel_string_non_op.count('\n')
rel = generator.getOptimizedRelationalAlgebra(nonOptimizedRelNode)
rel_string = str(generator.getRelationalAlgebraString(rel))
logger.debug(
f"Non optimised query plan: {rel_non_op_count} ops\n "
f"{rel_string_non_op}"
)
except (ValidationException, SqlParseException) as e:
logger.debug(f"Original exception raised by Java:\n {e}")
# We do not want to re-raise an exception here
@@ -544,8 +563,8 @@ def _get_ral(self, sql):
"Not extracting output column names as the SQL is not a SELECT call"
)
select_names = None

logger.debug(f"Extracted relational algebra:\n {rel_string}")
br = '\n'
logger.debug(f"Extracted relational algebra {rel_string.count(br)} ops:\n {rel_string}")
return rel, select_names, rel_string

def _to_sql_string(self, s: "org.apache.calcite.sql.SqlNode", default_dialect=None):
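The new `views` dict paired with the optional `sql` argument to `create_table` is simple bookkeeping: when a table is registered together with its defining SQL, its name is also recorded so `_prepare_schema` can later hand the SQL to Calcite as a materialized view. A minimal sketch of that logic with plain dicts (`MiniContext` is a hypothetical stand-in; the real `Context.create_table` stores a `DataContainer` and takes many more arguments):

```python
# Hypothetical stand-in for dask_sql.Context, showing only the new
# view bookkeeping added in this PR: tables registered with a `sql`
# string are additionally tracked in `views`.
class MiniContext:
    def __init__(self):
        self.tables = {}  # table name -> registered data
        self.views = {}   # table name -> defining SQL string

    def create_table(self, table_name, input_table, sql=None):
        # Names are lower-cased on registration, as in the diff above.
        self.tables[table_name.lower()] = input_table
        if sql is not None:
            self.views[table_name.lower()] = sql

ctx = MiniContext()
ctx.create_table("T", [1, 2, 3], sql="SELECT a FROM base WHERE a > 0")
```

In `_prepare_schema`, the `name in self.views` check then decides whether the `DaskTable` is built with the stored SQL (so the planner can treat it as a materialization) or without it.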
32 changes: 23 additions & 9 deletions dask_sql/datacontainer.py
@@ -42,7 +42,8 @@ def _copy(self) -> ColumnContainer:
Internal function to copy this container
"""
return ColumnContainer(
self._frontend_columns.copy(), self._frontend_backend_mapping.copy()
self._frontend_columns.copy(),
self._frontend_backend_mapping.copy(),
)

def limit_to(self, fields: List[str]) -> ColumnContainer:
@@ -137,7 +138,9 @@ def make_unique(self, prefix="col"):
where <number> is the column index.
"""
return self.rename(
columns={str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)}
columns={
str(col): f"{prefix}_{i}" for i, col in enumerate(self.columns)
}
)


@@ -166,11 +169,22 @@ def assign(self) -> dd.DataFrame:
a dataframe which has the columns specified in the
stored ColumnContainer.
"""
df = self.df.assign(
**{
col_from: self.df[col_to]
for col_from, col_to in self.column_container.mapping()
if col_from in self.column_container.columns
}
)
# We rename as many cols as possible because renaming is much more
# efficient than assigning.

renames = {}
assigns = {}
for col_from, col_to in self.column_container.mapping():
if col_from in self.column_container.columns:
if (
len(renames) < len(self.df.columns)
and col_to not in renames
and (col_from not in self.df.columns or col_from == col_to)
):
renames[col_to] = col_from
else:
assigns[col_from] = self.df[col_to]
df = self.df.rename(columns=renames)
if len(assigns) > 0:
df = df.assign(**assigns)
return df[self.column_container.columns]
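The rewritten `assign` prefers `rename` (a cheap metadata-only operation) over `assign` (which materializes new columns) when mapping backend column names back to frontend names. A rough pandas sketch of the selection logic, simplified from the diff above (the real code also caps the number of renames at the frame's column count and works on dask dataframes):

```python
import pandas as pd

def remap_columns(df, mapping, output_columns):
    """Return df restricted to output_columns, where mapping is a list of
    (frontend_name, backend_name) pairs. Prefer renaming over assigning,
    since a rename does not copy any data."""
    renames = {}  # backend name -> frontend name
    assigns = {}  # frontend name -> backend column data
    for col_from, col_to in mapping:
        if col_from not in output_columns:
            continue
        if (
            col_to not in renames  # backend column not already renamed
            and (col_from not in df.columns or col_from == col_to)
        ):
            renames[col_to] = col_from
        else:
            # Backend column needed under a second name: fall back to assign.
            assigns[col_from] = df[col_to]
    out = df.rename(columns=renames)
    if assigns:
        out = out.assign(**assigns)
    return out[output_columns]

df = pd.DataFrame({"a": [1, 2], "b": [3, 4]})
# Backend column "a" is exposed twice: "x" via rename, "y" via assign.
res = remap_columns(df, [("x", "a"), ("y", "a")], ["x", "y"])
```

Only when the same backend column must appear under several frontend names (or a rename would clobber an existing column) does the code pay for an `assign`.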
5 changes: 4 additions & 1 deletion dask_sql/physical/rel/convert.py
@@ -1,4 +1,5 @@
import logging
import time

import dask.dataframe as dd

@@ -53,6 +54,8 @@ def convert(
logger.debug(
f"Processing REL {rel} using {plugin_instance.__class__.__name__}..."
)
start_time = time.perf_counter()
df = plugin_instance.convert(rel, context=context)
logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)}")
elapsed_time = time.perf_counter() - start_time
logger.debug(f"Processed REL {rel} into {LoggableDataFrame(df)} ({elapsed_time}s)")
return df
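The added instrumentation is the standard `time.perf_counter` pattern: take a timestamp before the plugin call, subtract it afterwards, and log the difference alongside the result. A self-contained sketch of the same pattern (`timed_convert` is illustrative, not part of dask-sql):

```python
import logging
import time

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)

def timed_convert(fn, *args, **kwargs):
    # Same pattern as the plugin dispatch above: measure the wall-clock
    # time of one conversion step and log it next to the result.
    start_time = time.perf_counter()
    result = fn(*args, **kwargs)
    elapsed_time = time.perf_counter() - start_time
    logger.debug(f"Processed {fn.__name__} ({elapsed_time:.6f}s)")
    return result

total = timed_convert(sum, [1, 2, 3])
```

`perf_counter` is the right clock here: it is monotonic and has the highest available resolution, unlike `time.time`, which can jump with system clock adjustments.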
4 changes: 4 additions & 0 deletions dask_sql/physical/rel/logical/__init__.py
@@ -6,6 +6,8 @@
from .sort import LogicalSortPlugin
from .table_scan import LogicalTableScanPlugin
from .union import LogicalUnionPlugin
from .intersect import LogicalIntersectPlugin
from .minus import LogicalMinusPlugin
from .values import LogicalValuesPlugin

__all__ = [
@@ -16,6 +18,8 @@
LogicalSortPlugin,
LogicalTableScanPlugin,
LogicalUnionPlugin,
LogicalIntersectPlugin,
LogicalMinusPlugin,
LogicalValuesPlugin,
SamplePlugin,
]
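The two new plugins add `INTERSECT` and `EXCEPT` (logical minus) support to the relational-algebra converter. As a rough pandas sketch of the set semantics these operators must implement (the real plugins operate on dask dataframes through the plugin machinery; this is only an illustration of the expected results):

```python
import pandas as pd

def sql_except(left, right):
    # SQL EXCEPT: distinct rows of `left` that do not occur in `right`.
    # A left merge with indicator=True tags each row's provenance.
    merged = left.drop_duplicates().merge(
        right.drop_duplicates(), how="left", indicator=True
    )
    return merged[merged["_merge"] == "left_only"].drop(columns="_merge")

def sql_intersect(left, right):
    # SQL INTERSECT: distinct rows present in both inputs.
    return left.drop_duplicates().merge(right.drop_duplicates(), how="inner")

left = pd.DataFrame({"a": [1, 2, 2, 3]})
right = pd.DataFrame({"a": [2]})
minus = sql_except(left, right)
both = sql_intersect(left, right)
```

Note that both operators deduplicate their inputs first: SQL `EXCEPT` and `INTERSECT` (without `ALL`) are set operations, which is why the duplicated `2` in `left` contributes only one row.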