Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
Signed-off-by: Anatoly Myachev <[email protected]>
  • Loading branch information
anmyachev committed May 16, 2024
1 parent ccfa3dc commit 4dbe0eb
Show file tree
Hide file tree
Showing 5 changed files with 3,125 additions and 6 deletions.
8 changes: 4 additions & 4 deletions modin/core/execution/dispatching/factories/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def prepare(cls):
_doc_io_method_template,
source="pandas DataFrame",
params="df : pandas.DataFrame",
method="utils.from_pandas",
method="io.from_pandas",
)
def _from_pandas(cls, df):
return cls.io_cls.from_pandas(df)
Expand All @@ -176,7 +176,7 @@ def _from_pandas(cls, df):
_doc_io_method_template,
source="Arrow Table",
params="at : pyarrow.Table",
method="utils.from_arrow",
method="io.from_arrow",
)
def _from_arrow(cls, at):
return cls.io_cls.from_arrow(at)
Expand All @@ -186,7 +186,7 @@ def _from_arrow(cls, at):
_doc_io_method_template,
source="a non-pandas object (dict, list, np.array etc...)",
params=_doc_io_method_all_params,
method="utils.from_non_pandas",
method="io.from_non_pandas",
)
def _from_non_pandas(cls, *args, **kwargs):
return cls.io_cls.from_non_pandas(*args, **kwargs)
Expand All @@ -196,7 +196,7 @@ def _from_non_pandas(cls, *args, **kwargs):
_doc_io_method_template,
source="a DataFrame object supporting exchange protocol `__dataframe__()`",
params=_doc_io_method_all_params,
method="utils.from_dataframe",
method="io.from_dataframe",
)
def _from_dataframe(cls, *args, **kwargs):
return cls.io_cls.from_dataframe(*args, **kwargs)
Expand Down
123 changes: 123 additions & 0 deletions modin/experimental/sql/hdk/query.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
# Licensed to Modin Development Team under one or more contributor license agreements.
# See the NOTICE file distributed with this work for additional information regarding
# copyright ownership. The Modin Development Team licenses this file to you under the
# Apache License, Version 2.0 (the "License"); you may not use this file except in
# compliance with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software distributed under
# the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific language
# governing permissions and limitations under the License.

import pyarrow as pa
from pandas.core.dtypes.common import _get_dtype

import modin.pandas as pd
from modin.experimental.core.execution.native.implementations.hdk_on_native.dataframe.utils import (
ColNameCodec,
)
from modin.experimental.core.execution.native.implementations.hdk_on_native.hdk_worker import (
HdkWorker,
)
from modin.experimental.core.storage_formats.hdk import DFAlgQueryCompiler
from modin.pandas.io import from_arrow


def hdk_query(query: str, **kwargs) -> pd.DataFrame:
"""
Execute SQL queries on the HDK backend.
DataFrames are referenced in the query by names and are
passed to this function as name=value arguments.
Here is an example of a query to three data frames:
ids = [1, 2, 3]
first_names = ["James", "Peter", "Claus"]
last_names = ["Bond", "Pan", "Santa"]
courses_names = ["Mathematics", "Physics", "Geography"]
student = pd.DataFrame({"id": ids, "first_name": first_names, "last_name": last_names})
course = pd.DataFrame({"id": ids, "course_name": courses_names})
student_course = pd.DataFrame({"student_id": ids, "course_id": [3, 2, 1]})
query = '''
SELECT
student.first_name,
student.last_name,
course.course_name
FROM student
JOIN student_course
ON student.id = student_course.student_id
JOIN course
ON course.id = student_course.course_id
ORDER BY
last_name
'''
res = hdk_query(query, student=student, course=course, student_course=student_course)
print(res)
Parameters
----------
query : str
SQL query to be executed.
**kwargs : **dict
DataFrames referenced by the query.
Returns
-------
modin.pandas.DataFrame
Execution result.
"""
if len(kwargs) > 0:
query = _build_query(query, kwargs)
table = HdkWorker().executeDML(query)
df = from_arrow(table.to_arrow())
mdf = df._query_compiler._modin_frame
schema = mdf._partitions[0][0].get().schema
# HDK returns strings as dictionary. For the proper conversion to
# Pandas, we need to replace dtypes of the corresponding columns.
if replace := [
i for i, col in enumerate(schema) if pa.types.is_dictionary(col.type)
]:
dtypes = mdf._dtypes
obj_type = _get_dtype(object)
for i in replace:
dtypes[i] = obj_type
return df


def _build_query(query: str, frames: dict) -> str:
"""
Build query to be executed.
Table and column names are mapped to the real names
using the WITH statement.
Parameters
----------
query : str
SQL query to be processed.
frames : dict
DataFrames referenced by the query.
Returns
-------
str
SQL query to be executed.
"""
alias = []
for name, df in frames.items():
assert isinstance(df._query_compiler, DFAlgQueryCompiler)
mf = df._query_compiler._modin_frame
table = mf.force_import()
alias.append("WITH " if len(alias) == 0 else "\n),\n")
alias.extend((name, " AS (\n", " SELECT\n"))

for i, col in enumerate(table.column_names):
alias.append(" " if i == 0 else ",\n ")
alias.extend(('"', col, '"', " AS ", '"', ColNameCodec.decode(col), '"'))
alias.extend(("\n FROM\n ", table.name))

alias.extend(("\n)\n", query))
return "".join(alias)
Loading

0 comments on commit 4dbe0eb

Please sign in to comment.