Add publish external snowflake table function (#410)
jeppe742 authored May 7, 2024
1 parent c991799 commit 2ab19a6
Showing 2 changed files with 208 additions and 5 deletions.
107 changes: 102 additions & 5 deletions adapta/storage/database/snowflake_sql.py
@@ -1,17 +1,22 @@
"""
Snowflake Client Wrapper
"""

import os
from types import TracebackType
from typing import Optional
from typing import List, Optional

from pandas import DataFrame
import snowflake.connector
import pyarrow

from snowflake.connector.errors import DatabaseError, ProgrammingError

from adapta.logs.models import LogLevel
from adapta.logs import SemanticLogger

from adapta.storage.models.azure import AdlsGen2Path


class SnowflakeClient:
"""
@@ -62,9 +67,9 @@ def __enter__(self) -> Optional["SnowflakeClient"]:

def __exit__(
self,
exc_type: type[BaseException] | None,
exc_val: BaseException | None,
exc_tb: TracebackType | None,
exc_type: Optional[type[BaseException]] = None,
exc_val: Optional[BaseException] = None,
exc_tb: Optional[TracebackType] = None,
) -> None:
"""
Exits the context manager and closes the database connection.
@@ -77,7 +82,7 @@ def __exit__(
if exc_val is not None:
self._logger.error(f"An error occurred while closing the database connection: {exc_val}")

def query(self, query: str) -> DataFrame | None:
def query(self, query: str) -> Optional[DataFrame]:
"""
Executes the given SQL query and returns the result as a Pandas DataFrame.
Expand All @@ -91,3 +96,95 @@ def query(self, query: str) -> DataFrame | None:
except ProgrammingError as ex:
self._logger.error("Error executing query {query}", query=query, exception=ex)
return None
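
For orientation, a minimal usage sketch of the client as a context manager (the credentials and the query are placeholders, and __enter__ may return None if the connection fails, so this is a sketch rather than a definitive example):

from adapta.storage.database.snowflake_sql import SnowflakeClient

# Sketch only: user/account/warehouse values are placeholders.
with SnowflakeClient(user="my_user", account="my_account", warehouse="my_wh") as client:
    # query() returns a pandas DataFrame on success, or None when a
    # ProgrammingError was caught and logged.
    frame = client.query("select current_version() as v")
    if frame is not None:
        print(frame)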

def _get_snowflake_type(self, data_type: pyarrow.DataType) -> str:
"""Maps pyarrow type to Snowflake type"""

type_map = {
pyarrow.types.is_string: "TEXT",
pyarrow.types.is_integer: "INTEGER",
pyarrow.types.is_floating: "FLOAT",
pyarrow.types.is_timestamp: "TIMESTAMP_NTZ",
pyarrow.types.is_date: "DATE",
pyarrow.types.is_struct: "VARIANT",
pyarrow.types.is_map: "VARIANT",
pyarrow.types.is_list: "VARIANT",
pyarrow.types.is_boolean: "BOOLEAN",
pyarrow.types.is_binary: "BINARY",
}

for type_checker, snowflake_type_name in type_map.items():
if type_checker(data_type):
return snowflake_type_name

if pyarrow.types.is_decimal(data_type):
return f"DECIMAL({data_type.precision},{data_type.scale})"

raise ValueError(f"Found type {data_type}, which is currently not supported")
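
A quick illustration of how the mapping resolves a few pyarrow types (a hypothetical snippet that calls the private helper directly, purely to show the resolution rules; the empty-string credentials mirror how the tests below construct the client):

import pyarrow

client = SnowflakeClient(user="", account="", warehouse="")
assert client._get_snowflake_type(pyarrow.string()) == "TEXT"
assert client._get_snowflake_type(pyarrow.timestamp("us")) == "TIMESTAMP_NTZ"
assert client._get_snowflake_type(pyarrow.decimal128(10, 2)) == "DECIMAL(10,2)"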

def publish_external_delta_table(
self,
database: str,
schema: str,
table: str,
path: AdlsGen2Path,
table_schema: pyarrow.Schema,
partition_columns: Optional[List[str]] = None,
storage_integration: Optional[str] = None,
) -> None:
"""
Creates delta table as external table in Snowflake
:param database: name of the database, in Snowflake, to create the table
:param schema: name of the schema, in Snowflake, to create the table
:param table: name of the table to be created in Snowflake
:param path: path to the delta table in datalake
:param storage_integration: name of the storage integration to use in Snowflake. Default to the name of the storage account
"""

self.query(f"create schema if not exists {database}.{schema}")

self.query(
f"""create stage if not exists {database}.{schema}.stage_{table}
storage_integration = {storage_integration if storage_integration is not None else path.account}
url = azure://{path.account}.blob.core.windows.net/{path.container}/{path.path};"""
)

if partition_columns is not None:
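# Partition values are not stored in the parquet files themselves; each one is
# parsed out of the hive-style path segment (e.g. .../colP=value/part-0.parquet)
# exposed through metadata$filename.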
partition_expr = ",".join(partition_columns)
partition_select = [
f"\"{partition_column}\" TEXT AS (split_part(split_part(metadata$filename, '=', {2 + i}), '/', 1))"
for i, partition_column in enumerate(partition_columns)
]
else:
partition_expr = ""
partition_select = []
partition_columns = []

snowflake_columns = [
(column.name, self._get_snowflake_type(column.type))
for column in table_schema
if column.name not in partition_columns
]

columns = [
f'"{column}" {col_type} AS ($1:"{column}"::{col_type})' for column, col_type in snowflake_columns
] + partition_select

column_expr = ("," + os.linesep).join(columns)

self.query(
f"""
create or replace external table "{database}"."{schema}"."{table}"
(
{column_expr}
)
{f"partition by ({partition_expr})" if partition_expr else ""}
location={database}.{schema}.stage_{table}
auto_refresh = false
refresh_on_create=false
file_format = (type = parquet)
table_format = delta;"""
)

self.query(f'alter external table "{database}"."{schema}"."{table}" refresh;')
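
Taken together, a hedged end-to-end sketch of publishing a delta table (the database, schema, table, warehouse names, the abfss path, and the pyarrow schema are all illustrative placeholders, not values from this commit):

import pyarrow
from adapta.storage.database.snowflake_sql import SnowflakeClient
from adapta.storage.models.azure import AdlsGen2Path

# Placeholders throughout; adapt to a real account, container, and warehouse.
table_schema = pyarrow.schema([("colA", pyarrow.int64()), ("colB", pyarrow.string())])
path = AdlsGen2Path.from_hdfs_path("abfss://[email protected]/my_schema/my_table")

with SnowflakeClient(user="my_user", account="my_account", warehouse="my_wh") as client:
    client.publish_external_delta_table(
        database="my_database",
        schema="my_schema",
        table="my_table",
        path=path,
        table_schema=table_schema,
        partition_columns=None,  # e.g. ["colP"] for a hive-partitioned table
    )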
106 changes: 106 additions & 0 deletions tests/test_snowflake.py
@@ -0,0 +1,106 @@
# Copyright (c) 2023-2024. ECCO Sneaks & Data
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
import pathlib
from unittest.mock import patch, MagicMock

from deltalake import DeltaTable

from adapta.storage.database.snowflake_sql import SnowflakeClient

from adapta.storage.models.azure import AdlsGen2Path


@patch("adapta.storage.database.snowflake_sql.SnowflakeClient.query")
def test_publish_external_delta_table(
mock_query: MagicMock,
):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/delta_table"
snowflake_client = SnowflakeClient(user="", account="", warehouse="")
path = AdlsGen2Path.from_hdfs_path("abfss://[email protected]/test_schema/test_table")
delta_table = DeltaTable(test_data_path)
snowflake_client.publish_external_delta_table(
database="test_database",
schema="test_schema",
table="test_table",
path=path,
table_schema=delta_table.schema().to_pyarrow(),
)

mock_query.assert_any_call("create schema if not exists test_database.test_schema")
mock_query.assert_any_call(
"""create stage if not exists test_database.test_schema.stage_test_table
storage_integration = account
url = azure://account.blob.core.windows.net/container/test_schema/test_table;"""
)
mock_query.assert_any_call(
"""
create or replace external table "test_database"."test_schema"."test_table"
(
"A" TEXT AS ($1:"A"::TEXT),
"B" TEXT AS ($1:"B"::TEXT)
)
location=test_database.test_schema.stage_test_table
auto_refresh = false
refresh_on_create=false
file_format = (type = parquet)
table_format = delta;"""
)
mock_query.assert_any_call('alter external table "test_database"."test_schema"."test_table" refresh;')


@patch("adapta.storage.database.snowflake_sql.SnowflakeClient.query")
def test_publish_external_delta_table_partitioned(
mock_query: MagicMock,
):
test_data_path = f"{pathlib.Path(__file__).parent.resolve()}/delta_table_with_partitions"
snowflake_client = SnowflakeClient(user="", account="", warehouse="")
path = AdlsGen2Path.from_hdfs_path("abfss://[email protected]/test_schema/test_table")
delta_table = DeltaTable(test_data_path)
snowflake_client.publish_external_delta_table(
database="test_database",
schema="test_schema",
table="test_table",
path=path,
table_schema=delta_table.schema().to_pyarrow(),
partition_columns=["colP"],
)

mock_query.assert_any_call("create schema if not exists test_database.test_schema")
mock_query.assert_any_call(
"""create stage if not exists test_database.test_schema.stage_test_table
storage_integration = account
url = azure://account.blob.core.windows.net/container/test_schema/test_table;"""
)
mock_query.assert_any_call(
"""
create or replace external table "test_database"."test_schema"."test_table"
(
"colA" INTEGER AS ($1:"colA"::INTEGER),
"colB" TEXT AS ($1:"colB"::TEXT),
"colP" TEXT AS (split_part(split_part(metadata$filename, \'=\', 2), \'/\', 1))
)
partition by (colP)
location=test_database.test_schema.stage_test_table
auto_refresh = false
refresh_on_create=false
file_format = (type = parquet)
table_format = delta;"""
)
mock_query.assert_any_call('alter external table "test_database"."test_schema"."test_table" refresh;')
