Skip to content

Commit

Permalink
Add alembic to handle ALTER TABLE
Browse files Browse the repository at this point in the history
  • Loading branch information
steinitzu committed Sep 6, 2024
1 parent 2ff6b79 commit 8efa445
Show file tree
Hide file tree
Showing 5 changed files with 62 additions and 28 deletions.
38 changes: 38 additions & 0 deletions dlt/destinations/impl/sqlalchemy/alter_table.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
from typing import List

import sqlalchemy as sa
from alembic.runtime.migration import MigrationContext
from alembic.operations import Operations


class ListBuffer:
"""A partial implementation of string IO to use with alembic.
SQL statements are stored in a list instead of file/stdio
"""

def __init__(self) -> None:
self._buf = ""
self.sql_lines: List[str] = []

def write(self, data: str) -> None:
self._buf += data

def flush(self) -> None:
if self._buf:
self.sql_lines.append(self._buf)
self._buf = ""


class MigrationMaker:
def __init__(self, dialect: sa.engine.Dialect) -> None:
self._buf = ListBuffer()
self.ctx = MigrationContext(dialect, None, {"as_sql": True, "output_buffer": self._buf})
self.ops = Operations(self.ctx)

def add_column(self, table_name: str, column: sa.Column, schema: str) -> None:
self.ops.add_column(table_name, column, schema=schema)

def consume_statements(self) -> List[str]:
lines = self._buf.sql_lines[:]
self._buf.sql_lines.clear()
return lines
23 changes: 12 additions & 11 deletions dlt/destinations/impl/sqlalchemy/db_api_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
from dlt.destinations.typing import DBTransaction, DBApiCursor
from dlt.destinations.sql_client import SqlClientBase, DBApiCursorImpl
from dlt.destinations.impl.sqlalchemy.configuration import SqlalchemyCredentials
from dlt.destinations.impl.sqlalchemy.alter_table import MigrationMaker
from dlt.common.typing import TFun


Expand Down Expand Up @@ -97,6 +98,7 @@ class SqlalchemyClient(SqlClientBase[Connection]):
dialect: sa.engine.interfaces.Dialect
dialect_name: str
dbapi = DbApiProps # type: ignore[assignment]
migrations: Optional[MigrationMaker] = None # lazy init as needed

def __init__(
self,
Expand Down Expand Up @@ -316,17 +318,16 @@ def fully_qualified_dataset_name(self, escape: bool = True, staging: bool = Fals
raise NotImplementedError("Staging not supported")
return self.dialect.identifier_preparer.format_schema(self.dataset_name) # type: ignore[attr-defined, no-any-return]

def alter_table_add_column(self, column: sa.Column) -> None:
"""Execute an ALTER TABLE ... ADD COLUMN ... statement for the given column.
The column must be fully defined and attached to a table.
"""
# TODO: May need capability to override ALTER TABLE statement for different dialects
alter_tmpl = "ALTER TABLE {table} ADD COLUMN {column};"
statement = alter_tmpl.format(
table=self._make_qualified_table_name(self._make_qualified_table_name(column.table)), # type: ignore[arg-type]
column=self.compile_column_def(column),
)
self.execute_sql(statement)
def alter_table_add_columns(self, columns: Sequence[sa.Column]) -> None:
if not columns:
return
if self.migrations is None:
self.migrations = MigrationMaker(self.dialect)
for column in columns:
self.migrations.add_column(column.table.name, column, self.dataset_name)
statements = self.migrations.consume_statements()
for statement in statements:
self.execute_sql(statement)

def escape_column_name(self, column_name: str, escape: bool = True) -> str:
if self.dialect.requires_name_normalize: # type: ignore[attr-defined]
Expand Down
12 changes: 3 additions & 9 deletions dlt/destinations/impl/sqlalchemy/sqlalchemy_job_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ def to_db_type(self, column: TColumnSchema, table_format: TTableSchema) -> sa.ty
elif sc_t == "bool":
return sa.Boolean()
elif sc_t == "timestamp":
return self._create_date_time_type(sc_t, precision)
return self._create_date_time_type(sc_t, precision, column.get("timezone"))
elif sc_t == "bigint":
return self._db_integer_type(precision)
elif sc_t == "binary":
Expand All @@ -128,7 +128,7 @@ def to_db_type(self, column: TColumnSchema, table_format: TTableSchema) -> sa.ty
elif sc_t == "date":
return sa.Date()
elif sc_t == "time":
return self._create_date_time_type(sc_t, precision)
return self._create_date_time_type(sc_t, precision, column.get("timezone"))
raise TerminalValueError(f"Unsupported data type: {sc_t}")

def _from_db_integer_type(self, db_type: sa.Integer) -> TColumnType:
Expand Down Expand Up @@ -400,13 +400,7 @@ def update_stored_schema(
with self.sql_client.begin_transaction():
for table_obj in tables_to_create:
self.sql_client.create_table(table_obj)
for col in columns_to_add:
alter = "ALTER TABLE {} ADD COLUMN {}".format(
self.sql_client.make_qualified_table_name(col.table.name),
self.sql_client.compile_column_def(col),
)
self.sql_client.execute_sql(alter)

self.sql_client.alter_table_add_columns(columns_to_add)
self._update_schema_in_storage(self.schema)

return schema_update
Expand Down
14 changes: 7 additions & 7 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ lancedb = { version = ">=0.8.2", optional = true, markers = "python_version >= '
tantivy = { version = ">= 0.22.0", optional = true }
deltalake = { version = ">=0.19.0", optional = true }
sqlalchemy = {version = ">=1.4", optional = true}
alembic = {version = "^1.13.2", optional = true}

[tool.poetry.extras]
gcp = ["grpcio", "google-cloud-bigquery", "db-dtypes", "gcsfs"]
Expand All @@ -109,7 +110,7 @@ clickhouse = ["clickhouse-driver", "clickhouse-connect", "s3fs", "gcsfs", "adlfs
dremio = ["pyarrow"]
lancedb = ["lancedb", "pyarrow", "tantivy"]
deltalake = ["deltalake", "pyarrow"]
sqlalchemy = ["sqlalchemy"]
sqlalchemy = ["sqlalchemy", "alembic"]


[tool.poetry.scripts]
Expand Down

0 comments on commit 8efa445

Please sign in to comment.