Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(sql): make compilers usable with a base install #9766

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
4748dd7
refactor(sql): make compilers usable with a base install
cpcloud Aug 4, 2024
e19f239
chore: fix geospatial casting
cpcloud Aug 4, 2024
dcba80c
chore: bigquery fixup
cpcloud Aug 4, 2024
de04b33
chore: clean up udfs
cpcloud Aug 4, 2024
e61f764
chore: move udf impls to compilers and out of backends
cpcloud Aug 4, 2024
ce302ce
chore: move udf impls to compilers and out of backends
cpcloud Aug 4, 2024
d75de4d
chore: more udf compilation churn
cpcloud Aug 4, 2024
ca2c7d0
chore: fix pandas/polars/dask
cpcloud Aug 4, 2024
dce7e6a
chore: fix sql compilers
cpcloud Aug 4, 2024
032bb80
chore: more udf churn
cpcloud Aug 4, 2024
78291fa
chore(mssql): simplify non-finite value access
cpcloud Aug 5, 2024
5295c20
chore(mssql): silence linter warnings about unused kwargs
cpcloud Aug 5, 2024
509035b
chore(mssql): call the right method
cpcloud Aug 5, 2024
8ecf511
chore: remove dead `_to_sqlglot`
cpcloud Aug 5, 2024
65f483e
chore: clean up signature of `to_sqlglot`
cpcloud Aug 5, 2024
4cf502a
chore: rename methods on flink backend
cpcloud Aug 5, 2024
cd73de1
chore: clean up error messages for registering udfs
cpcloud Aug 5, 2024
8d0cbc0
chore(snowflake): support python 3.11
cpcloud Aug 5, 2024
3ecc093
chore(bigquery): use the correct property name
cpcloud Aug 5, 2024
eb11df1
tests(bigquery): remove useless snapshot tests from system tests
cpcloud Aug 5, 2024
4c4dd58
chore: remove useless kwargs from to_sqlglot
cpcloud Aug 5, 2024
d5892c8
chore: avoid passing execution-related arguments into compile
cpcloud Aug 5, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions ibis/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,6 @@ def load_backend(name: str) -> BaseBackend:
# - compile
# - has_operation
# - _from_url
# - _to_sqlglot
#
# We also copy over the docstring from `do_connect` to the proxy `connect`
# method, since that's where all the backend-specific kwargs are currently
Expand All @@ -121,7 +120,7 @@ def connect(*args, **kwargs):
proxy.has_operation = backend.has_operation
proxy.name = name
proxy._from_url = backend._from_url
proxy._to_sqlglot = backend._to_sqlglot

# Add any additional methods that should be exposed at the top level
for attr in getattr(backend, "_top_level_methods", ()):
setattr(proxy, attr, getattr(backend, attr))
Expand Down
13 changes: 0 additions & 13 deletions ibis/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1032,14 +1032,9 @@ def _register_in_memory_table(self, op: ops.InMemoryTable):

def _run_pre_execute_hooks(self, expr: ir.Expr) -> None:
"""Backend-specific hooks to run before an expression is executed."""
self._define_udf_translation_rules(expr)
self._register_udfs(expr)
self._register_in_memory_tables(expr)

def _define_udf_translation_rules(self, expr: ir.Expr):
if self.supports_python_udfs:
raise NotImplementedError(self.name)

def compile(
self,
expr: ir.Expr,
Expand All @@ -1048,14 +1043,6 @@ def compile(
"""Compile an expression."""
return self.compiler.to_sql(expr, params=params)

def _to_sqlglot(self, expr: ir.Expr, **kwargs) -> sg.exp.Expression:
"""Convert an Ibis expression to a sqlglot expression.

Called by `ibis.to_sql`; gives the backend an opportunity to generate
nicer SQL for human consumption.
"""
raise NotImplementedError(f"Backend '{self.name}' backend doesn't support SQL")

def execute(self, expr: ir.Expr) -> Any:
"""Execute an expression."""

Expand Down
157 changes: 19 additions & 138 deletions ibis/backends/bigquery/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from pydata_google_auth import cache

import ibis
import ibis.backends.sql.compilers as sc
import ibis.common.exceptions as com
import ibis.expr.operations as ops
import ibis.expr.schema as sch
Expand All @@ -32,9 +33,7 @@
schema_from_bigquery_table,
)
from ibis.backends.bigquery.datatypes import BigQuerySchema
from ibis.backends.bigquery.udf.core import PythonToJavaScriptTranslator
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import BigQueryCompiler
from ibis.backends.sql.datatypes import BigQueryType

if TYPE_CHECKING:
Expand Down Expand Up @@ -150,7 +149,7 @@ def _force_quote_table(table: sge.Table) -> sge.Table:

class Backend(SQLBackend, CanCreateDatabase, CanCreateSchema):
name = "bigquery"
compiler = BigQueryCompiler()
compiler = sc.bigquery.compiler
supports_in_memory_tables = True
supports_python_udfs = False

Expand Down Expand Up @@ -652,68 +651,6 @@ def _get_schema_using_query(self, query: str) -> sch.Schema:
)
return BigQuerySchema.to_ibis(job.schema)

def _to_sqlglot(
self,
expr: ir.Expr,
limit: str | None = None,
params: Mapping[ir.Expr, Any] | None = None,
**kwargs,
) -> Any:
"""Compile an Ibis expression.

Parameters
----------
expr
Ibis expression
limit
For expressions yielding result sets; retrieve at most this number
of values/rows. Overrides any limit already set on the expression.
params
Named unbound parameters
kwargs
Keyword arguments passed to the compiler

Returns
-------
Any
The output of compilation. The type of this value depends on the
backend.

"""
self._define_udf_translation_rules(expr)
sql = super()._to_sqlglot(expr, limit=limit, params=params, **kwargs)

table_expr = expr.as_table()
geocols = [
name for name, typ in table_expr.schema().items() if typ.is_geospatial()
]

query = sql.transform(
_qualify_memtable,
dataset=getattr(self._session_dataset, "dataset_id", None),
project=getattr(self._session_dataset, "project", None),
).transform(_remove_null_ordering_from_unsupported_window)

if not geocols:
return query

# if there are any geospatial columns, we have to convert them to WKB,
# so interactive mode knows how to display them
#
# by default bigquery returns data to python as WKT, and there's really
# no point in supporting both if we don't need to.
compiler = self.compiler
quoted = compiler.quoted
f = compiler.f
return sg.select(
sge.Star(
replace=[
f.st_asbinary(sg.column(col, quoted=quoted)).as_(col, quoted=quoted)
for col in geocols
]
)
).from_(query.subquery())

def raw_sql(self, query: str, params=None, page_size: int | None = None):
query_parameters = [
bigquery_param(
Expand Down Expand Up @@ -747,19 +684,25 @@ def current_database(self) -> str | None:
return self.dataset

def compile(
self, expr: ir.Expr, limit: str | None = None, params=None, **kwargs: Any
self,
expr: ir.Expr,
limit: str | None = None,
params=None,
pretty: bool = True,
**kwargs: Any,
):
"""Compile an Ibis expression to a SQL string."""
query = self._to_sqlglot(expr, limit=limit, params=params, **kwargs)
udf_sources = []
for udf_node in expr.op().find(ops.ScalarUDF):
compile_func = getattr(
self, f"_compile_{udf_node.__input_type__.name.lower()}_udf"
)
if sql := compile_func(udf_node):
udf_sources.append(sql.sql(self.name, pretty=True))

sql = ";\n".join([*udf_sources, query.sql(dialect=self.name, pretty=True)])
session_dataset = self._session_dataset
query = self.compiler.to_sqlglot(
expr,
limit=limit,
params=params,
session_dataset_id=getattr(session_dataset, "dataset_id", None),
session_project=getattr(session_dataset, "project", None),
**kwargs,
)
queries = query if isinstance(query, list) else [query]
sql = ";\n".join(query.sql(self.dialect, pretty=pretty) for query in queries)
self._log(sql)
return sql

Expand Down Expand Up @@ -1202,68 +1145,6 @@ def _clean_up_cached_table(self, name):
force=True,
)

def _get_udf_source(self, udf_node: ops.ScalarUDF):
name = type(udf_node).__name__
type_mapper = self.compiler.udf_type_mapper

body = PythonToJavaScriptTranslator(udf_node.__func__).compile()
config = udf_node.__config__
libraries = config.get("libraries", [])

signature = [
sge.ColumnDef(
this=sg.to_identifier(name, quoted=self.compiler.quoted),
kind=type_mapper.from_ibis(param.annotation.pattern.dtype),
)
for name, param in udf_node.__signature__.parameters.items()
]

lines = ['"""']

if config.get("strict", True):
lines.append('"use strict";')

lines += [
body,
"",
f"return {udf_node.__func_name__}({', '.join(udf_node.argnames)});",
'"""',
]

func = sge.Create(
kind="FUNCTION",
this=sge.UserDefinedFunction(
this=sg.to_identifier(name), expressions=signature, wrapped=True
),
# not exactly what I had in mind, but it works
#
# quoting is too simplistic to handle multiline strings
expression=sge.Var(this="\n".join(lines)),
exists=False,
properties=sge.Properties(
expressions=[
sge.TemporaryProperty(),
sge.ReturnsProperty(this=type_mapper.from_ibis(udf_node.dtype)),
sge.StabilityProperty(
this="IMMUTABLE" if config.get("determinism") else "VOLATILE"
),
sge.LanguageProperty(this=sg.to_identifier("js")),
]
+ [
sge.Property(
this=sg.to_identifier("library"),
value=self.compiler.f.array(*libraries),
)
]
* bool(libraries)
),
)

return func

def _compile_python_udf(self, udf_node: ops.ScalarUDF) -> None:
return self._get_udf_source(udf_node)

def _register_udfs(self, expr: ir.Expr) -> None:
"""No op because UDFs made with CREATE TEMPORARY FUNCTION must be followed by a query."""

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

15 changes: 1 addition & 14 deletions ibis/backends/bigquery/tests/system/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -199,11 +199,9 @@ def test_parted_column(con, kind):
assert t.columns == [expected_column, "string_col", "int_col"]


def test_cross_project_query(public, snapshot):
def test_cross_project_query(public):
table = public.table("posts_questions")
expr = table[table.tags.contains("ibis")][["title", "tags"]]
result = expr.compile()
snapshot.assert_match(result, "out.sql")
n = 5
df = expr.limit(n).execute()
assert len(df) == n
Expand All @@ -226,17 +224,6 @@ def test_exists_table_different_project(con):
assert "foobar" not in con.list_tables(database=dataset)


def test_multiple_project_queries(con, snapshot):
cpcloud marked this conversation as resolved.
Show resolved Hide resolved
so = con.table(
"posts_questions",
database=("bigquery-public-data", "stackoverflow"),
)
trips = con.table("trips", database="nyc-tlc.yellow")
join = so.join(trips, so.tags == trips.rate_code)[[so.title]]
result = join.compile()
snapshot.assert_match(result, "out.sql")


def test_multiple_project_queries_execute(con):
posts_questions = con.table(
"posts_questions", database="bigquery-public-data.stackoverflow"
Expand Down
5 changes: 4 additions & 1 deletion ibis/backends/bigquery/tests/unit/udf/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,10 @@

import pytest

from ibis.backends.bigquery.udf.core import PythonToJavaScriptTranslator, SymbolTable
from ibis.backends.sql.compilers.bigquery.udf.core import (
PythonToJavaScriptTranslator,
SymbolTable,
)


def test_symbol_table():
Expand Down
2 changes: 1 addition & 1 deletion ibis/backends/bigquery/tests/unit/udf/test_find.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import ast

from ibis.backends.bigquery.udf.find import find_names
from ibis.backends.sql.compilers.bigquery.udf.find import find_names
from ibis.util import is_iterable


Expand Down
8 changes: 4 additions & 4 deletions ibis/backends/clickhouse/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from clickhouse_connect.driver.external import ExternalData

import ibis
import ibis.backends.sql.compilers as sc
import ibis.common.exceptions as com
import ibis.config
import ibis.expr.operations as ops
Expand All @@ -26,7 +27,6 @@
from ibis.backends import BaseBackend, CanCreateDatabase
from ibis.backends.clickhouse.converter import ClickHousePandasData
from ibis.backends.sql import SQLBackend
from ibis.backends.sql.compilers import ClickHouseCompiler
from ibis.backends.sql.compilers.base import C

if TYPE_CHECKING:
Expand All @@ -44,7 +44,7 @@ def _to_memtable(v):

class Backend(SQLBackend, CanCreateDatabase):
name = "clickhouse"
compiler = ClickHouseCompiler()
compiler = sc.clickhouse.compiler

# ClickHouse itself does, but the client driver does not
supports_temporary_tables = False
Expand Down Expand Up @@ -732,7 +732,7 @@ def create_table(
expression = None

if obj is not None:
expression = self._to_sqlglot(obj)
expression = self.compiler.to_sqlglot(obj)
external_tables.update(self._collect_in_memory_tables(obj))

code = sge.Create(
Expand All @@ -759,7 +759,7 @@ def create_view(
database: str | None = None,
overwrite: bool = False,
) -> ir.Table:
expression = self._to_sqlglot(obj)
expression = self.compiler.to_sqlglot(obj)
src = sge.Create(
this=sg.table(name, db=database),
kind="VIEW",
Expand Down
Loading
Loading