Skip to content

Commit

Permalink
Improve write operations to be closer to target-postgres
Browse files Browse the repository at this point in the history
Meltano's `target-postgres` uses a temporary table to receive data
first, and then update the effective target table with information from
that.

CrateDB's `target-cratedb` additonally offers the possibility to also
write directly into the target table, yielding speed improvements, which
may be important in certain situations.

The environment variable `MELTANO_CRATEDB_STRATEGY_DIRECT` controls that
behavior.

- `MELTANO_CRATEDB_STRATEGY_DIRECT=true`: Directly write to the target
  table.
- `MELTANO_CRATEDB_STRATEGY_DIRECT=false`: Use a temporary table to
  stage updates.

Note: The current default value is `true`, effectively short-cutting the
native way of how Meltano handles database updates. The reason is that
the vanilla way does not satisfy all test cases, yet.
  • Loading branch information
amotl committed Dec 21, 2023
1 parent 14cf7c2 commit 9db4bed
Show file tree
Hide file tree
Showing 6 changed files with 147 additions and 36 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.idea
.venv*
*.egg-info
.coverage*
coverage.xml
build
dist
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
- Improve write operations to be closer to `target-postgres`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ LIMIT
```


## Write Strategy

Meltano's `target-postgres` uses a temporary table to receive data first, and
then update the effective target table with information from that.

CrateDB's `target-cratedb` offers the possibility to also write directly into
the target table, yielding speed improvements, which may be important in certain
situations.

The environment variable `MELTANO_CRATEDB_STRATEGY_DIRECT` controls the behavior.

- `MELTANO_CRATEDB_STRATEGY_DIRECT=true`: Directly write to the target table.
- `MELTANO_CRATEDB_STRATEGY_DIRECT=false`: Use a temporary table to stage updates.

Note: The current default value is `true`, effectively short-cutting the native
way of how Meltano handles database updates. The reason is that the vanilla way
does not satisfy all test cases, yet.


## Vector Store Support

In order to support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,7 @@ release = [
{ cmd = "twine upload dist/*" },
]

test = { cmd = "pytest" }
test = [
{ shell = "MELTANO_CRATEDB_STRATEGY_DIRECT=true pytest" },
{ shell = "MELTANO_CRATEDB_STRATEGY_DIRECT=false pytest" },
]
122 changes: 100 additions & 22 deletions target_cratedb/sinks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
"""CrateDB target sink class, which handles writing streams."""
import datetime
import os
import time
from typing import List, Optional, Union

import sqlalchemy as sa
from pendulum import now
from sqlalchemy.util import asbool
from target_postgres.sinks import PostgresSink

from target_cratedb.connector import CrateDBConnector

MELTANO_CRATEDB_STRATEGY_DIRECT = asbool(os.getenv("MELTANO_CRATEDB_STRATEGY_DIRECT", "true"))


class CrateDBSink(PostgresSink):
"""CrateDB target sink class."""
Expand All @@ -18,6 +22,15 @@ class CrateDBSink(PostgresSink):
soft_delete_column_name = "__sdc_deleted_at"
version_column_name = "__sdc_table_version"

def __init__(self, *args, **kwargs):
"""Initialize SQL Sink. See super class for more details."""
super().__init__(*args, **kwargs)

# Whether to use the Meltano standard strategy, looping the data
# through a temporary table, or whether to directly apply the DML
# operations on the target table.
self.strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT

# Record processing

def _add_sdc_metadata_to_record(
Expand Down Expand Up @@ -112,7 +125,9 @@ def process_batch(self, context: dict) -> None:
Args:
context: Stream partition or context dictionary.
"""
# Use one connection so we do this all in a single transaction

# The PostgreSQL adapter uses only one connection, so we do this all in a single transaction.
# The CrateDB adapter will use a separate connection, to make `REFRESH TABLE ...` work.
with self.connector._connect() as connection, connection.begin():
# Check structure of table
table: sa.Table = self.connector.prepare_table(
Expand All @@ -122,21 +137,30 @@ def process_batch(self, context: dict) -> None:
as_temp_table=False,
connection=connection,
)
# Insert into table
self.bulk_insert_records(
table=table,
schema=self.schema,
primary_keys=self.key_properties,
records=context["records"],
connection=connection,
)
# FIXME: Upserts do not work yet.
"""

# Insert directly into target table.
# This can be used as a surrogate if the regular temptable-upsert
# procedure doesn't work, or isn't applicable for performance reasons.
if self.strategy_direct:
self.bulk_insert_records(
table=table,
schema=self.schema,
primary_keys=self.key_properties,
records=context["records"],
connection=connection,
)
self.refresh_table(table)
return

# Create a temp table (Creates from the table above)
# CrateDB: Need to pre-compute full-qualified table name, and quoted variant,
# for satisfying both Meltano, and for running a `REFRESH TABLE`.
temp_full_table_name = f"{self.schema_name}.{self.temp_table_name}"
temp_table: sa.Table = self.connector.copy_table_structure(
full_table_name=self.temp_table_name,
full_table_name=temp_full_table_name,
from_table=table,
as_temp_table=True,
# CrateDB does not provide temporary tables.
as_temp_table=False,
connection=connection,
)
# Insert into temp table
Expand All @@ -147,6 +171,11 @@ def process_batch(self, context: dict) -> None:
records=context["records"],
connection=connection,
)

# Run a new "transaction" to synchronize write operations.
with self.connector._connect() as connection:
self.refresh_table(temp_table)

# Merge data from Temp table to main table
self.upsert(
from_table=temp_table,
Expand All @@ -157,14 +186,15 @@ def process_batch(self, context: dict) -> None:
)
# Drop temp table
self.connector.drop_table(table=temp_table, connection=connection)
"""

def upsertX(
self.refresh_table(table)

def upsert(
self,
from_table: sa.Table,
to_table: sa.Table,
schema: dict,
join_keys: List[sa.Column],
join_keys: List[str],
connection: sa.engine.Connection,
) -> Optional[int]:
"""Merge upsert data from one table to another.
Expand All @@ -181,24 +211,24 @@ def upsertX(
report number of records affected/inserted.
"""

if self.append_only is True:
# Insert
select_stmt = sa.select(from_table.columns).select_from(from_table)
insert_stmt = to_table.insert().from_select(names=list(from_table.columns), select=select_stmt)
connection.execute(insert_stmt)
else:
join_predicates = []
to_table_key: sa.Column
for key in join_keys:
from_table_key: sa.Column = from_table.columns[key] # type: ignore[call-overload]
to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload]
join_predicates.append(from_table_key == to_table_key) # type: ignore[call-overload]
from_table_key: sa.Column = from_table.columns[key]
to_table_key = to_table.columns[key]
join_predicates.append(from_table_key == to_table_key)

join_condition = sa.and_(*join_predicates)

where_predicates = []
for key in join_keys:
to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload,no-redef]
to_table_key = to_table.columns[key]
where_predicates.append(to_table_key.is_(None))
where_condition = sa.and_(*where_predicates)

Expand All @@ -212,18 +242,50 @@ def upsertX(
connection.execute(insert_stmt)

# Update
# CrateDB does not support `UPDATE ... FROM` statements.
# https://github.com/crate/crate/issues/15204
"""
where_condition = join_condition
update_columns = {}
for column_name in self.schema["properties"].keys():
from_table_column: sa.Column = from_table.columns[column_name]
to_table_column: sa.Column = to_table.columns[column_name]
# Prevent: `Updating a primary key is not supported`
# For CrateDB, skip updating primary key columns. Otherwise, CrateDB
# will fail like `ColumnValidationException[Validation failed for code:
# Updating a primary key is not supported]`.
if to_table_column.primary_key:
continue
update_columns[to_table_column] = from_table_column
update_stmt = sa.update(to_table).where(where_condition).values(update_columns)
connection.execute(update_stmt)
"""

# Update, Python-emulated
to_table_pks = to_table.primary_key.columns
from_table_pks = from_table.primary_key.columns

where_condition = join_condition
select_stmt = sa.select(from_table).where(where_condition)
cursor = connection.execute(select_stmt)
for record in cursor.fetchall():
record_dict = record._asdict()
update_where_clauses = []
for from_table_pk, to_table_pk in zip(from_table_pks, to_table_pks):
# Get primary key name and value from record.
pk_name = from_table_pk.name
pk_value = record_dict[pk_name]

# CrateDB: Need to omit primary keys from record.
# ColumnValidationException[Validation failed for id: Updating a primary key is not supported]
del record_dict[pk_name]

# Build up where clauses for UPDATE statement.
update_where_clauses.append(to_table_pk == pk_value)

update_where_condition = sa.and_(*update_where_clauses)
update_stmt = sa.update(to_table).values(record_dict).where(update_where_condition)
connection.execute(update_stmt)

return None

Expand Down Expand Up @@ -269,6 +331,7 @@ def activate_version(self, new_version: int) -> None:
f'OR "{self.version_column_name}" IS NULL'
)
)
self.refresh_table(self.full_table_name)
return

if not self.connector.column_exists(
Expand Down Expand Up @@ -296,6 +359,8 @@ def activate_version(self, new_version: int) -> None:
)
connection.execute(query)

self.refresh_table(self.full_table_name)

def generate_insert_statement(
self,
full_table_name: str,
Expand All @@ -314,3 +379,16 @@ def generate_insert_statement(
metadata = sa.MetaData(schema=self.schema_name)
table = sa.Table(full_table_name, metadata, *columns)
return sa.insert(table)

def refresh_table(self, table: Union[sa.Table, str]):
"""
Synchronize write operations on CrateDB.
"""
with self.connector._connect() as connection:
if isinstance(table, sa.Table):
table_full = f'"{table.schema}"."{table.name}"'
elif isinstance(table, str):
table_full = table
else:
raise TypeError(f"Unknown type for `table`: {table}")
connection.exec_driver_sql(f"REFRESH TABLE {table_full};")
Loading

0 comments on commit 9db4bed

Please sign in to comment.