fix: optimize tables/schema operations (#57)
Closes #29

- disable altering for now until we optimize it better. This was also
done in
[target-postgres](https://github.com/MeltanoLabs/target-postgres/blob/bfb80d328076acc440ddc98f3f9b14ecb381d0ce/target_postgres/connector.py#L19).
I opened #58 to track adding support back in once it is efficient.
- caches table columns and schemas within a sink object. If the schema or
key properties change for a stream, a new sink object is created, so this
is safe to cache and does not need to be invalidated.
- I liked that the `prepare_schema` method was used with `IF NOT EXISTS` as a
quick way to avoid those weird column casing and reserved word errors, but
it's slow to let it constantly retry creating the schema, so I removed that
by fixing the `schema_exists` logic to not accidentally try to recreate the
schema again. This required schema names to be conformed before passing them
to the `prepare_schema` method.
- UPDATE: there was an issue with the way schema messages were processed
that caused GitHub streams to constantly re-initialize new sinks and
drain old ones. For the Squared CI test it was doing this 150+ times for
one day's worth of data. I put in a fix here by overriding `get_sink` to
remove `_sdc_` columns before comparing, because the existing sink schema
has already been post-processed at that point and the new incoming schema
has not (see the sketch below). This should be pushed down to the SDK, but
for now it got my CI tests back down to the original transferwise timing.
cc @kgpayne
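
To make the diffing problem concrete, here is a minimal, hypothetical sketch
(the `id`/`name` columns are made up for illustration): the existing sink's
schema has already had `_sdc_` metadata columns added, while the incoming
SCHEMA message has not, so a naive equality check always reports a change.

```python
# Incoming schema from a SCHEMA message (hypothetical stream).
incoming_schema = {
    "type": "object",
    "properties": {
        "id": {"type": "integer"},
        "name": {"type": ["string", "null"]},
    },
}

# What the existing sink holds after the target post-processed it.
existing_schema = {
    "type": "object",
    "properties": {
        **incoming_schema["properties"],
        "_sdc_extracted_at": {"type": ["string", "null"], "format": "date-time"},
        "_sdc_batched_at": {"type": ["string", "null"], "format": "date-time"},
    },
}

print(existing_schema == incoming_schema)  # False -> old behavior drained and recreated the sink

# Strip the metadata columns before comparing, as the get_sink override below does.
cleaned = dict(existing_schema)
cleaned["properties"] = {
    k: v for k, v in existing_schema["properties"].items() if not k.startswith("_sdc_")
}
print(cleaned == incoming_schema)  # True -> the existing sink is reused
```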
pnadolny13 authored Jun 14, 2023
1 parent 13cec6e commit 5693b6f
Showing 3 changed files with 117 additions and 12 deletions.
52 changes: 42 additions & 10 deletions target_snowflake/connector.py
@@ -1,5 +1,5 @@
from operator import contains, eq
from typing import Sequence, Tuple, cast
from typing import Sequence, Tuple, cast, Union, List, Dict

import snowflake.sqlalchemy.custom_types as sct
import sqlalchemy
@@ -42,10 +42,37 @@ class SnowflakeConnector(SQLConnector):

    allow_column_add: bool = True  # Whether ADD COLUMN is supported.
    allow_column_rename: bool = True  # Whether RENAME COLUMN is supported.
    allow_column_alter: bool = True  # Whether altering column types is supported.
    allow_column_alter: bool = False  # Whether altering column types is supported.
    allow_merge_upsert: bool = False  # Whether MERGE UPSERT is supported.
    allow_temp_tables: bool = True  # Whether temp tables are supported.

    def __init__(self, *args, **kwargs) -> None:
        self.table_cache: dict = {}
        self.schema_cache: dict = {}
        super().__init__(*args, **kwargs)

    def get_table_columns(
        self,
        full_table_name: str,
        column_names: Union[List[str], None] = None,
    ) -> Dict[str, sqlalchemy.Column]:
        """Return a list of table columns.

        Args:
            full_table_name: Fully qualified table name.
            column_names: A list of column names to filter to.

        Returns:
            An ordered list of column objects.
        """
        # Cache these columns because they're frequently used.
        if full_table_name in self.table_cache:
            return self.table_cache[full_table_name]
        else:
            parsed_columns = super().get_table_columns(full_table_name, column_names)
            self.table_cache[full_table_name] = parsed_columns
            return parsed_columns

def get_sqlalchemy_url(self, config: dict) -> str:
"""Generates a SQLAlchemy URL for Snowflake.
@@ -189,14 +216,19 @@ def to_sql_type(jsonschema_type: dict) -> sqlalchemy.types.TypeEngine:

        return cast(sqlalchemy.types.TypeEngine, target_type)

    def prepare_schema(self, schema_name: str) -> None:
        """Create the target database schema.

        Args:
            schema_name: The target schema name.
        """
        with self._connect() as conn:
            conn.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_name}")
    def schema_exists(self, schema_name: str) -> bool:
        if schema_name in self.schema_cache:
            return True
        else:
            schema_names = sqlalchemy.inspect(self._engine).get_schema_names()
            self.schema_cache = schema_names
            formatter = SnowflakeIdentifierPreparer(SnowflakeDialect())
            # Make quoted schema names upper case because we create them that way,
            # and the metadata that SQLAlchemy returns is case insensitive only for
            # non-quoted schema names, so these would otherwise look like they don't
            # exist yet.
            if '"' in formatter.format_collation(schema_name):
                schema_name = schema_name.upper()
            return schema_name in schema_names

    # Custom SQL get methods

28 changes: 27 additions & 1 deletion target_snowflake/sinks.py
@@ -61,6 +61,27 @@ def database_name(self) -> t.Optional[str]:
    def table_name(self) -> str:
        return super().table_name.upper()

    def setup(self) -> None:
        """Set up Sink.

        This method is called on Sink creation, and creates the required Schema and
        Table entities in the target database.
        """
        if self.schema_name:
            # Needed to conform schema name
            self.connector.prepare_schema(
                self.conform_name(
                    self.schema_name,
                    object_type="schema"
                ),
            )
        self.connector.prepare_table(
            full_table_name=self.full_table_name,
            schema=self.conform_schema(self.schema),
            primary_keys=self.key_properties,
            as_temp_table=False,
        )

    def conform_name(
        self,
        name: str,
@@ -151,7 +172,12 @@ def insert_batch_files_via_internal_stage(
        sync_id = f"{self.stream_name}-{uuid4()}"
        file_format = f'{self.database_name}.{self.schema_name}."{sync_id}"'
        self.connector.put_batches_to_stage(sync_id=sync_id, files=files)
        self.connector.prepare_schema(schema_name=self.schema_name)
        self.connector.prepare_schema(
            self.conform_name(
                self.schema_name,
                object_type="schema"
            ),
        )
        self.connector.create_file_format(file_format=file_format)

        if self.key_properties:
49 changes: 48 additions & 1 deletion target_snowflake/target.py
@@ -6,7 +6,7 @@
from singer_sdk.target_base import SQLTarget

from target_snowflake.sinks import SnowflakeSink

from singer_sdk.sinks import Sink

class TargetSnowflake(SQLTarget):
"""Target for Snowflake."""
@@ -69,6 +69,53 @@ class TargetSnowflake(SQLTarget):

    default_sink_class = SnowflakeSink

    def get_sink(
        self,
        stream_name: str,
        *,
        record: dict | None = None,
        schema: dict | None = None,
        key_properties: list[str] | None = None,
    ) -> Sink:
        _ = record  # Custom implementations may use record in sink selection.
        if schema is None:
            self._assert_sink_exists(stream_name)
            return self._sinks_active[stream_name]

        existing_sink = self._sinks_active.get(stream_name, None)
        if not existing_sink:
            return self.add_sink(stream_name, schema, key_properties)

        # Diffing was not accounting for metadata columns added by the target.
        # The existing schema has the columns but the original from the SCHEMA
        # message does not.
        clean_existing_schema = existing_sink.schema.copy()
        if self.config.get("add_record_metadata", True):
            existing_props_copy = existing_sink.schema["properties"].copy()
            for col in {
                "_sdc_extracted_at",
                "_sdc_received_at",
                "_sdc_batched_at",
                "_sdc_deleted_at",
                "_sdc_sequence",
                "_sdc_table_version",
            }:
                existing_props_copy.pop(col, None)
            clean_existing_schema["properties"] = existing_props_copy
        if (
            clean_existing_schema != schema
            or existing_sink.key_properties != key_properties
        ):
            self.logger.info(
                "Schema or key properties for '%s' stream have changed. "
                "Initializing a new '%s' sink...",
                stream_name,
                stream_name,
            )
            self._sinks_to_clear.append(self._sinks_active.pop(stream_name))
            return self.add_sink(stream_name, schema, key_properties)

        return existing_sink

if __name__ == "__main__":
    TargetSnowflake.cli()
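
The `_sdc_` columns stripped in `get_sink` above only exist when the SDK's
`add_record_metadata` setting is enabled, which is its default. A minimal,
hypothetical config sketch (connection values are placeholders) showing where
that setting lives:

```python
# Hypothetical target-snowflake config; account/user/database values are placeholders.
config = {
    "account": "my_account",
    "user": "my_user",
    "database": "MY_DATABASE",
    "add_record_metadata": True,  # default: add _sdc_* metadata columns to each record
}
```

When `add_record_metadata` is disabled, `get_sink` skips the metadata-column
stripping and compares the incoming schema against the sink's schema directly.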
