CrateDB: Derive from dbt-postgres. Remove Python adapter code.
amotl committed Nov 20, 2024
1 parent 926a551 commit 54f1401
Showing 3 changed files with 12 additions and 158 deletions.
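In short, roughly 158 lines of copied adapter code are replaced by a dozen lines of subclassing: each CrateDB class now derives from its dbt-postgres counterpart and keeps only CrateDB-specific overrides. A minimal sketch of the resulting pattern (illustrative only; the real files follow in the diffs below):

    # Illustrative outline of the pattern, not the verbatim file contents.
    from dbt.adapters.postgres import PostgresAdapter, PostgresColumn, PostgresRelation

    class CrateDBColumn(PostgresColumn):
        pass  # column/type handling comes from dbt-postgres unchanged

    class CrateDBRelation(PostgresRelation):
        pass  # only the identifier-length limit remains CrateDB-specific

    class CrateDBAdapter(PostgresAdapter):
        pass  # keeps the CrateDB connection manager and constraint-support matrix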
13 changes: 3 additions & 10 deletions dbt/adapters/cratedb/column.py
@@ -1,12 +1,5 @@
-from dbt.adapters.base import Column
+from dbt.adapters.postgres import PostgresColumn


-class CrateDBColumn(Column):
-    @property
-    def data_type(self):
-        # on cratedb, do not convert 'text' or 'varchar' to 'varchar()'
-        if self.dtype.lower() == "text" or (
-            self.dtype.lower() == "character varying" and self.char_size is None
-        ):
-            return self.dtype
-        return super().data_type
+class CrateDBColumn(PostgresColumn):
+    pass
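The removed data_type override is presumably identical to what PostgresColumn already implements upstream, so inheriting it should not change behaviour. A hypothetical spot check (assumes the dbt-cratedb and dbt-postgres packages are importable; the column name "name" is just an example):

    from dbt.adapters.cratedb.column import CrateDBColumn

    # "text" and unsized "character varying" should pass through unchanged,
    # rather than being rewritten to "varchar(...)".
    col = CrateDBColumn(column="name", dtype="text")
    print(col.data_type)  # expected: "text"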
52 changes: 6 additions & 46 deletions dbt/adapters/cratedb/impl.py
@@ -13,9 +13,8 @@
     CrossDbReferenceProhibitedError,
     IndexConfigError,
     IndexConfigNotDictError,
-    UnexpectedDbReferenceError,
 )
-from dbt.adapters.sql import SQLAdapter
+from dbt.adapters.postgres import PostgresAdapter
 from dbt_common.contracts.constraints import ConstraintType
 from dbt_common.dataclass_schema import ValidationError, dbtClassMixin
 from dbt_common.exceptions import DbtRuntimeError
@@ -64,19 +63,19 @@ class CrateDBConfig(AdapterConfig):
     indexes: Optional[List[CrateDBIndexConfig]] = None


-class CrateDBAdapter(SQLAdapter):
+class CrateDBAdapter(PostgresAdapter):
     Relation = CrateDBRelation
-    ConnectionManager = CrateDBConnectionManager
+    ConnectionManager = CrateDBConnectionManager  # type: ignore[assignment]
     Column = CrateDBColumn

-    AdapterSpecificConfigs = CrateDBConfig
+    AdapterSpecificConfigs = CrateDBConfig  # type: ignore[assignment]

     CONSTRAINT_SUPPORT = {
         ConstraintType.check: ConstraintSupport.ENFORCED,
         ConstraintType.not_null: ConstraintSupport.ENFORCED,
-        ConstraintType.unique: ConstraintSupport.ENFORCED,
+        ConstraintType.unique: ConstraintSupport.NOT_SUPPORTED,
         ConstraintType.primary_key: ConstraintSupport.ENFORCED,
-        ConstraintType.foreign_key: ConstraintSupport.ENFORCED,
+        ConstraintType.foreign_key: ConstraintSupport.NOT_SUPPORTED,
     }

     CATALOG_BY_RELATION_SUPPORT = True
@@ -85,20 +84,6 @@ class CrateDBAdapter(SQLAdapter):
         {Capability.SchemaMetadataByRelations: CapabilitySupport(support=Support.Full)}
     )

-    @classmethod
-    def date_function(cls):
-        return "now()"
-
-    @available
-    def verify_database(self, database):
-        if database.startswith('"'):
-            database = database.strip('"')
-        expected = self.config.credentials.database
-        if database.lower() != expected.lower():
-            raise UnexpectedDbReferenceError(self.type(), database, expected)
-        # return an empty string on success so macros can call this
-        return ""
-
     @available
     def parse_index(self, raw_index: Any) -> Optional[CrateDBIndexConfig]:
         return CrateDBIndexConfig.parse(raw_index)
@@ -131,31 +116,6 @@ def _get_catalog_schemas(self, manifest):
         except DbtRuntimeError as exc:
             raise CrossDbReferenceProhibitedError(self.type(), exc.msg)

-    def _link_cached_relations(self, manifest) -> None:
-        schemas: Set[str] = set()
-        relations_schemas = self._get_cache_schemas(manifest)
-        for relation in relations_schemas:
-            self.verify_database(relation.database)
-            schemas.add(relation.schema.lower())  # type: ignore
-
-        self._link_cached_database_relations(schemas)
-
-    def _relations_cache_for_schemas(self, manifest, cache_schemas=None):
-        super()._relations_cache_for_schemas(manifest, cache_schemas)
-        self._link_cached_relations(manifest)
-
-    def timestamp_add_sql(self, add_to: str, number: int = 1, interval: str = "hour") -> str:
-        return f"{add_to} + interval '{number} {interval}'"
-
-    def valid_incremental_strategies(self):
-        """The set of standard builtin strategies which this adapter supports out-of-the-box.
-        Not used to validate custom strategies defined by end users.
-        """
-        return ["append", "delete+insert", "merge", "microbatch"]
-
-    def debug_query(self):
-        self.execute("select 1 as id")
-
     def get_rows_different_sql(
         self,
         relation_a: BaseRelation,
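The deleted methods (date_function, verify_database, timestamp_add_sql, valid_incremental_strategies, debug_query, and the cache-linking helpers) are presumably equivalent to what PostgresAdapter already provides, so they are now inherited. The one visible behavioural change is that unique and foreign-key constraints are marked NOT_SUPPORTED. A hypothetical check, assuming the import paths taken from this diff and from dbt-postgres:

    from dbt.adapters.base.impl import ConstraintSupport
    from dbt.adapters.cratedb.impl import CrateDBAdapter
    from dbt_common.contracts.constraints import ConstraintType

    # Inherited from PostgresAdapter instead of being re-implemented here.
    print(CrateDBAdapter.date_function())  # expected: "now()"

    # dbt should warn about and skip these constraints rather than render them.
    print(CrateDBAdapter.CONSTRAINT_SUPPORT[ConstraintType.unique])       # NOT_SUPPORTED
    print(CrateDBAdapter.CONSTRAINT_SUPPORT[ConstraintType.foreign_key])  # NOT_SUPPORTED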
105 changes: 3 additions & 102 deletions dbt/adapters/cratedb/relation.py
@@ -1,111 +1,12 @@
-from dataclasses import dataclass, field
-from typing import FrozenSet, List, Optional
-
-from dbt.adapters.base.relation import BaseRelation
-from dbt.adapters.contracts.relation import RelationConfig, RelationType
-from dbt.adapters.relation_configs import (
-    RelationConfigChangeAction,
-    RelationResults,
-)
-from dbt_common.exceptions import DbtRuntimeError
+from dataclasses import dataclass
+from dbt.adapters.postgres import PostgresRelation

 from dbt.adapters.cratedb.relation_configs import (
     MAX_CHARACTERS_IN_IDENTIFIER,
-    PostgresIndexConfig,
-    PostgresIndexConfigChange,
-    PostgresMaterializedViewConfig,
-    PostgresMaterializedViewConfigChangeCollection,
 )


 @dataclass(frozen=True, eq=False, repr=False)
-class CrateDBRelation(BaseRelation):
-    renameable_relations: FrozenSet[RelationType] = field(
-        default_factory=lambda: frozenset(
-            {
-                RelationType.View,
-                RelationType.Table,
-            }
-        )
-    )
-    replaceable_relations: FrozenSet[RelationType] = field(
-        default_factory=lambda: frozenset(
-            {
-                RelationType.View,
-                RelationType.Table,
-            }
-        )
-    )
-
-    def __post_init__(self):
-        # Check for length of Postgres table/view names.
-        # Check self.type to exclude test relation identifiers
-        if (
-            self.identifier is not None
-            and self.type is not None
-            and len(self.identifier) > self.relation_max_name_length()
-        ):
-            raise DbtRuntimeError(
-                f"Relation name '{self.identifier}' "
-                f"is longer than {self.relation_max_name_length()} characters"
-            )
-
+class CrateDBRelation(PostgresRelation):
     def relation_max_name_length(self):
         return MAX_CHARACTERS_IN_IDENTIFIER
-
-    def get_materialized_view_config_change_collection(
-        self, relation_results: RelationResults, relation_config: RelationConfig
-    ) -> Optional[PostgresMaterializedViewConfigChangeCollection]:
-        config_change_collection = PostgresMaterializedViewConfigChangeCollection()
-
-        existing_materialized_view = PostgresMaterializedViewConfig.from_relation_results(
-            relation_results
-        )
-        new_materialized_view = PostgresMaterializedViewConfig.from_config(relation_config)
-
-        config_change_collection.indexes = self._get_index_config_changes(
-            existing_materialized_view.indexes, new_materialized_view.indexes
-        )
-
-        # we return `None` instead of an empty `PostgresMaterializedViewConfigChangeCollection` object
-        # so that it's easier and more extensible to check in the materialization:
-        # `core/../materializations/materialized_view.sql` :
-        # {% if configuration_changes is none %}
-        if config_change_collection.has_changes:
-            return config_change_collection
-        return None
-
-    def _get_index_config_changes(
-        self,
-        existing_indexes: FrozenSet[PostgresIndexConfig],
-        new_indexes: FrozenSet[PostgresIndexConfig],
-    ) -> List[PostgresIndexConfigChange]:
-        """
-        Get the index updates that will occur as a result of a new run
-        There are four scenarios:
-            1. Indexes are equal -> don't return these
-            2. Index is new -> create these
-            3. Index is old -> drop these
-            4. Indexes are not equal -> drop old, create new -> two actions
-        *Note:*
-            The order of the operations matters here because if the same index is dropped and recreated
-            (e.g. via --full-refresh) then we need to drop it first, then create it.
-        Returns: an ordered list of index updates in the form {"action": "drop/create", "context": <IndexConfig>}
-        """
-        drop_changes = [
-            PostgresIndexConfigChange.from_dict(
-                {"action": RelationConfigChangeAction.drop, "context": index}
-            )
-            for index in existing_indexes.difference(new_indexes)
-        ]
-        create_changes = [
-            PostgresIndexConfigChange.from_dict(
-                {"action": RelationConfigChangeAction.create, "context": index}
-            )
-            for index in new_indexes.difference(existing_indexes)
-        ]
-        return drop_changes + create_changes  # type: ignore[return-value]
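After this change CrateDBRelation keeps only the identifier-length override; the rename/replace rules, the __post_init__ length check, and the materialized-view/index change detection come from PostgresRelation. A small hypothetical usage sketch (database and identifier values are illustrative):

    from dbt.adapters.cratedb.relation import CrateDBRelation
    from dbt.adapters.cratedb.relation_configs import MAX_CHARACTERS_IN_IDENTIFIER

    rel = CrateDBRelation.create(database="crate", schema="doc", identifier="my_table")
    print(rel.render())  # roughly: "crate"."doc"."my_table"

    # The only CrateDB-specific piece left in this module.
    assert rel.relation_max_name_length() == MAX_CHARACTERS_IN_IDENTIFIER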
