Skip to content

Commit

Permalink
refactor: Use jsonschema_to_sql_converter
Browse files Browse the repository at this point in the history
  • Loading branch information
edgarrmondragon committed Nov 28, 2024
1 parent f11a10b commit c378fcc
Showing 1 changed file with 21 additions and 15 deletions.
36 changes: 21 additions & 15 deletions target_postgres/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,24 @@
class JSONSchemaToPostgres(JSONSchemaToSQL):
"""Convert JSON Schema types to Postgres types."""

def __init__(self, *, content_encoding: bool = True) -> None:
def __init__(self, *, content_encoding: bool = True, **kwargs):
"""Initialize the JSONSchemaToPostgres instance."""
super().__init__()
super().__init__(**kwargs)
self.content_encoding = content_encoding

@classmethod
def from_config(
cls: type[JSONSchemaToPostgres],
config: dict,
*,
max_varchar_length: int | None = None,
) -> JSONSchemaToPostgres:
"""Create a JSONSchemaToPostgres instance from a configuration."""
return cls(
content_encoding=config.get("interpret_content_encoding", False),
max_varchar_length=max_varchar_length,
)

def handle_raw_string(self, schema):
"""Handle a raw string type."""
if self.content_encoding and schema.get("contentEncoding") == "base16":
Expand All @@ -63,6 +76,8 @@ class PostgresConnector(SQLConnector):
allow_merge_upsert: bool = True # Whether MERGE UPSERT is supported.
allow_temp_tables: bool = True # Whether temp tables are supported.

jsonschema_to_sql_converter = JSONSchemaToPostgres

def __init__(self, config: dict) -> None:
"""Initialize a connector to a Postgres database.
Expand Down Expand Up @@ -100,18 +115,6 @@ def __init__(self, config: dict) -> None:
sqlalchemy_url=url.render_as_string(hide_password=False),
)

@cached_property
def interpret_content_encoding(self) -> bool:
"""Whether to interpret schema contentEncoding to set the column type.
It is an opt-in feature because it might result in data loss if the
actual data does not match the schema's advertised encoding.
Returns:
True if the feature is enabled, False otherwise.
"""
return self.config.get("interpret_content_encoding", False)

def prepare_table( # type: ignore[override] # noqa: PLR0913
self,
full_table_name: str | FullyQualifiedName,
Expand Down Expand Up @@ -258,7 +261,10 @@ def _handle_array_type(self, jsonschema: dict) -> ARRAY | JSONB:
@cached_property
def jsonschema_to_sql(self) -> JSONSchemaToSQL:
"""Return a JSONSchemaToSQL instance with custom type handling."""
to_sql = JSONSchemaToPostgres(content_encoding=self.interpret_content_encoding)
to_sql = JSONSchemaToPostgres.from_config(
self.config,
max_varchar_length=self.max_varchar_length,
)
to_sql.fallback_type = TEXT
to_sql.register_type_handler("integer", BIGINT)
to_sql.register_type_handler("object", JSONB)
Expand Down

0 comments on commit c378fcc

Please sign in to comment.