Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Moved common SQL handler methods of common-sql-provider into dedicated module #43747

Open
wants to merge 21 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
8978b5d
refactor: Moved handlers out of sql module into dedicated handlers mo…
davidblain-infrabel Nov 6, 2024
a94c988
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 6, 2024
5248212
refactor: Renamed handlers.pyi
davidblain-infrabel Nov 6, 2024
56b7bd3
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 6, 2024
6689ba3
refactor: Added handlers python-module under hooks of provider.yaml
davidblain-infrabel Nov 6, 2024
4100c1b
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 7, 2024
621781e
refactor: Bumped sql common provider to version 1.19.1
davidblain-infrabel Nov 7, 2024
0222e71
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 7, 2024
70ec910
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 7, 2024
711a509
refactor: Use cached connection property instead of get_connection in…
davidblain-infrabel Nov 8, 2024
1dd088e
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 8, 2024
c1a4dbf
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 10, 2024
24b5d22
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 11, 2024
81b863f
Update providers/src/airflow/providers/common/sql/provider.yaml
potiuk Nov 12, 2024
378c2e5
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 12, 2024
c8a945d
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 12, 2024
a932b63
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 12, 2024
ce9a914
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 12, 2024
fc239af
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 12, 2024
39fc423
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 12, 2024
d7325ce
Merge branch 'main' into feature/move-common-sql-handlers-to-dedicate…
dabla Nov 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
73 changes: 73 additions & 0 deletions providers/src/airflow/providers/common/sql/hooks/handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from typing import Iterable


def return_single_query_results(sql: str | Iterable[str], return_last: bool, split_statements: bool):
"""
Determine when results of single query only should be returned.

For compatibility reasons, the behaviour of the DBAPIHook is somewhat confusing.
In some cases, when multiple queries are run, the return value will be an iterable (list) of results
-- one for each query. However, in other cases, when single query is run, the return value will be just
the result of that single query without wrapping the results in a list.

The cases when single query results are returned without wrapping them in a list are as follows:

a) sql is string and ``return_last`` is True (regardless what ``split_statements`` value is)
b) sql is string and ``split_statements`` is False

In all other cases, the results are wrapped in a list, even if there is only one statement to process.
In particular, the return value will be a list of query results in the following circumstances:

a) when ``sql`` is an iterable of string statements (regardless what ``return_last`` value is)
b) when ``sql`` is string, ``split_statements`` is True and ``return_last`` is False

:param sql: sql to run (either string or list of strings)
:param return_last: whether last statement output should only be returned
:param split_statements: whether to split string statements.
:return: True if the hook should return single query results
"""
return isinstance(sql, str) and (return_last or not split_statements)


def fetch_all_handler(cursor) -> list[tuple] | None:
"""Return results for DbApiHook.run()."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
return cursor.fetchall()
else:
return None


def fetch_one_handler(cursor) -> list[tuple] | None:
"""Return first result for DbApiHook.run()."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
return cursor.fetchone()
else:
return None
38 changes: 38 additions & 0 deletions providers/src/airflow/providers/common/sql/hooks/handlers.pyi
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#
# This is automatically generated stub for the `common.sql` provider
#
# This file is generated automatically by the `update-common-sql-api stubs` pre-commit
# and the .pyi file represents part of the "public" API that the
# `common.sql` provider exposes to other providers.
#
# Any, potentially breaking change in the stubs will require deliberate manual action from the contributor
# making a change to the `common.sql` provider. Those stubs are also used by MyPy automatically when checking
# if only public API of the common.sql provider is used by all the other providers.
#
# You can read more in the README_API.md file
#
"""
Definition of the public interface for airflow.providers.common.sql.hooks.handlers
isort:skip_file
"""
from typing import Iterable

def return_single_query_results(sql: str | Iterable[str], return_last: bool, split_statements: bool): ...
def fetch_all_handler(cursor) -> list[tuple] | None: ...
def fetch_one_handler(cursor) -> list[tuple] | None: ...
60 changes: 17 additions & 43 deletions providers/src/airflow/providers/common/sql/hooks/sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,60 +60,32 @@

T = TypeVar("T")
SQL_PLACEHOLDERS = frozenset({"%s", "?"})
WARNING_MESSAGE = """Import of {} from the 'airflow.providers.common.sql.hooks' module is deprecated and will
be removed in the future. Please import it from 'airflow.providers.common.sql.hooks.handlers'."""


def return_single_query_results(sql: str | Iterable[str], return_last: bool, split_statements: bool):
"""
Determine when results of single query only should be returned.
warnings.warn(WARNING_MESSAGE.format("return_single_query_results"), DeprecationWarning, stacklevel=2)

For compatibility reasons, the behaviour of the DBAPIHook is somewhat confusing.
In some cases, when multiple queries are run, the return value will be an iterable (list) of results
-- one for each query. However, in other cases, when single query is run, the return value will be just
the result of that single query without wrapping the results in a list.
from airflow.providers.common.sql.hooks import handlers

The cases when single query results are returned without wrapping them in a list are as follows:
return handlers.return_single_query_results(sql, return_last, split_statements)

a) sql is string and ``return_last`` is True (regardless what ``split_statements`` value is)
b) sql is string and ``split_statements`` is False

In all other cases, the results are wrapped in a list, even if there is only one statement to process.
In particular, the return value will be a list of query results in the following circumstances:
def fetch_all_handler(cursor) -> list[tuple] | None:
warnings.warn(WARNING_MESSAGE.format("fetch_all_handler"), DeprecationWarning, stacklevel=2)

a) when ``sql`` is an iterable of string statements (regardless what ``return_last`` value is)
b) when ``sql`` is string, ``split_statements`` is True and ``return_last`` is False
from airflow.providers.common.sql.hooks import handlers

:param sql: sql to run (either string or list of strings)
:param return_last: whether last statement output should only be returned
:param split_statements: whether to split string statements.
:return: True if the hook should return single query results
"""
return isinstance(sql, str) and (return_last or not split_statements)
return handlers.fetch_all_handler(cursor)


def fetch_all_handler(cursor) -> list[tuple] | None:
"""Return results for DbApiHook.run()."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
return cursor.fetchall()
else:
return None
def fetch_one_handler(cursor) -> list[tuple] | None:
warnings.warn(WARNING_MESSAGE.format("fetch_one_handler"), DeprecationWarning, stacklevel=2)

from airflow.providers.common.sql.hooks import handlers

def fetch_one_handler(cursor) -> list[tuple] | None:
"""Return first result for DbApiHook.run()."""
if not hasattr(cursor, "description"):
raise RuntimeError(
"The database we interact with does not support DBAPI 2.0. Use operator and "
"handlers that are specifically designed for your database."
)
if cursor.description is not None:
return cursor.fetchone()
else:
return None
return handlers.fetch_one_handler(cursor)


class ConnectorProtocol(Protocol):
Expand Down Expand Up @@ -191,6 +163,7 @@ def get_conn_id(self) -> str:

@cached_property
def placeholder(self) -> str:
"""Return SQL placeholder."""
placeholder = self.connection_extra.get("placeholder")
if placeholder:
if placeholder in SQL_PLACEHOLDERS:
Expand Down Expand Up @@ -248,8 +221,9 @@ def get_uri(self) -> str:

:return: the extracted uri.
"""
conn = self.get_connection(self.get_conn_id())
conn.schema = self.__schema or conn.schema
conn = self.connection
if self.__schema:
conn.schema = self.__schema
return conn.get_uri()

@property
Expand Down
2 changes: 2 additions & 0 deletions providers/src/airflow/providers/common/sql/provider.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ state: ready
source-date-epoch: 1730012422
# note that those versions are maintained by release manager - do not update them manually
versions:
- 1.19.1
potiuk marked this conversation as resolved.
Show resolved Hide resolved
potiuk marked this conversation as resolved.
Show resolved Hide resolved
- 1.19.0
- 1.18.0
- 1.17.1
Expand Down Expand Up @@ -94,6 +95,7 @@ operators:
hooks:
- integration-name: Common SQL
python-modules:
- airflow.providers.common.sql.hooks.handlers
- airflow.providers.common.sql.hooks.sql

sensors:
Expand Down
57 changes: 57 additions & 0 deletions providers/tests/common/sql/hooks/test_handlers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import annotations

from unittest.mock import MagicMock

from airflow.providers.common.sql.hooks.handlers import (
fetch_all_handler,
fetch_one_handler,
return_single_query_results,
)


class TestHandlers:
def test_return_single_query_results(self):
assert return_single_query_results("SELECT 1", return_last=True, split_statements=False)
assert return_single_query_results("SELECT 1", return_last=False, split_statements=False)
assert return_single_query_results(["SELECT 1"], return_last=True, split_statements=False) is False
assert return_single_query_results(["SELECT 1"], return_last=False, split_statements=False) is False
assert return_single_query_results("SELECT 1", return_last=False, split_statements=True) is False
assert return_single_query_results(["SELECT 1"], return_last=False, split_statements=True) is False
assert return_single_query_results(["SELECT 1"], return_last=True, split_statements=True) is False

def test_fetch_all_handler(self):
cursor = MagicMock()
cursor.description = [("col1", "int"), ("col2", "string")]
cursor.fetchall.return_value = [(1, "hello")]

assert fetch_all_handler(cursor) == [(1, "hello")]

cursor.description = None
assert fetch_all_handler(cursor) is None

def test_fetch_one_handler(self):
cursor = MagicMock()
cursor.description = [("col1", "int")]
cursor.fetchone.return_value = 1

assert fetch_one_handler(cursor) == (1)

cursor.description = None
assert fetch_one_handler(cursor) is None
10 changes: 10 additions & 0 deletions providers/tests/common/sql/hooks/test_sql.py
Original file line number Diff line number Diff line change
Expand Up @@ -277,3 +277,13 @@ def test_placeholder_multiple_times_and_make_sure_connection_is_only_invoked_onc
for _ in range(10):
assert dbapi_hook.placeholder == "%s"
assert dbapi_hook.connection_invocations == 1

@pytest.mark.db_test
def test_uri(self):
dbapi_hook = mock_hook(DbApiHook)
assert dbapi_hook.get_uri() == "//login:password@host:1234/schema"

@pytest.mark.db_test
def test_uri_with_schema(self):
dbapi_hook = mock_hook(DbApiHook, conn_params={"schema": "other_schema"})
assert dbapi_hook.get_uri() == "//login:password@host:1234/other_schema"