Skip to content

Commit

Permalink
Improve write operations to be closer to target-postgres
Browse files Browse the repository at this point in the history
It is not there yet, but there is a mechanism now to use the vanilla
"indirect" strategy by defining an environment variable

  MELTANO_CRATEDB_STRATEGY_DIRECT=false

The last step of the `upsert` procedure uses an `UPDATE ... FROM`
statement, which needs to be emulated per Python code, because CrateDB
does not provide that feature yet.
  • Loading branch information
amotl committed Dec 19, 2023
1 parent 74296f7 commit de6a7b5
Show file tree
Hide file tree
Showing 6 changed files with 136 additions and 32 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.idea
.venv*
*.egg-info
.coverage*
coverage.xml
build
dist
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
- Improve write operations to be closer to `target-postgres`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ LIMIT
```


## Write Strategy

Meltano's `target-postgres` uses a temporary table to receive data first, and
then update the effective target table with information from that.

CrateDB's `target-cratedb` offers the possibility to also write directly into
the target table, yielding speed improvements, which may be important in certain
situations.

The environment variable `MELTANO_CRATEDB_STRATEGY_DIRECT` controls the behavior.

- `MELTANO_CRATEDB_STRATEGY_DIRECT=true`: Directly write to the target table.
- `MELTANO_CRATEDB_STRATEGY_DIRECT=false`: Use a temporary table to stage updates.

Note: The current default value is `true`, effectively short-cutting the native
way of how Meltano handles database updates. The reason is that the vanilla way
does not satisfy all test cases, yet.


## Vector Store Support

In order to support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -260,4 +260,7 @@ release = [
{ cmd = "twine upload dist/*" },
]

test = { cmd = "pytest" }
test = [
{ shell = "MELTANO_CRATEDB_STRATEGY_DIRECT=true pytest" },
{ shell = "MELTANO_CRATEDB_STRATEGY_DIRECT=false pytest" },
]
109 changes: 87 additions & 22 deletions target_cratedb/sinks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
"""CrateDB target sink class, which handles writing streams."""
import datetime
import os
import time
from typing import List, Optional, Union

import sqlalchemy as sa
from pendulum import now
from sqlalchemy.util import asbool
from target_postgres.sinks import PostgresSink

from target_cratedb.connector import CrateDBConnector

MELTANO_CRATEDB_STRATEGY_DIRECT = asbool(os.getenv("MELTANO_CRATEDB_STRATEGY_DIRECT", "true"))


class CrateDBSink(PostgresSink):
"""CrateDB target sink class."""
Expand All @@ -18,6 +22,15 @@ class CrateDBSink(PostgresSink):
soft_delete_column_name = "__sdc_deleted_at"
version_column_name = "__sdc_table_version"

def __init__(self, *args, **kwargs):
"""Initialize SQL Sink. See super class for more details."""
super().__init__(*args, **kwargs)

# Whether to use the Meltano standard strategy, looping the data
# through a temporary table, or whether to directly apply the DML
# operations on the target table.
self.strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT

# Record processing

def _add_sdc_metadata_to_record(
Expand Down Expand Up @@ -112,7 +125,9 @@ def process_batch(self, context: dict) -> None:
Args:
context: Stream partition or context dictionary.
"""
# Use one connection so we do this all in a single transaction

# The PostgreSQL adapter uses only one connection, so we do this all in a single transaction.
# The CrateDB adapter will use a separate connection, to make `REFRESH TABLE ...` work.
with self.connector._connect() as connection, connection.begin():
# Check structure of table
table: sa.Table = self.connector.prepare_table(
Expand All @@ -122,21 +137,31 @@ def process_batch(self, context: dict) -> None:
as_temp_table=False,
connection=connection,
)
# Insert into table
self.bulk_insert_records(
table=table,
schema=self.schema,
primary_keys=self.key_properties,
records=context["records"],
connection=connection,
)
# FIXME: Upserts do not work yet.
"""

# Insert directly into target table.
# This can be used as a surrogate if the regular temptable-upsert
# procedure doesn't work, or isn't applicable for performance reasons.
if self.strategy_direct:
self.bulk_insert_records(
table=table,
schema=self.schema,
primary_keys=self.key_properties,
records=context["records"],
connection=connection,
)
to_table_name_quoted = f'"{table.schema}"."{table.name}"'
connection.exec_driver_sql(f"REFRESH TABLE {to_table_name_quoted};")
return

# Create a temp table (Creates from the table above)
# CrateDB: Need to pre-compute full-qualified table name, and quoted variant,
# for satisfying both Meltano, and for running a `REFRESH TABLE`.
temp_full_table_name = f"{self.schema_name}.{self.temp_table_name}"
temp_table: sa.Table = self.connector.copy_table_structure(
full_table_name=self.temp_table_name,
full_table_name=temp_full_table_name,
from_table=table,
as_temp_table=True,
# CrateDB does not provide temporary tables.
as_temp_table=False,
connection=connection,
)
# Insert into temp table
Expand All @@ -147,6 +172,12 @@ def process_batch(self, context: dict) -> None:
records=context["records"],
connection=connection,
)

# Run a new "transaction" to synchronize write operations.
with self.connector._connect() as connection, connection.begin():
temp_full_table_name_quoted = f'"{self.schema_name}"."{self.temp_table_name}"'
connection.execute(sa.text(f"REFRESH TABLE {temp_full_table_name_quoted};"))

# Merge data from Temp table to main table
self.upsert(
from_table=temp_table,
Expand All @@ -157,14 +188,13 @@ def process_batch(self, context: dict) -> None:
)
# Drop temp table
self.connector.drop_table(table=temp_table, connection=connection)
"""

def upsertX(
def upsert(
self,
from_table: sa.Table,
to_table: sa.Table,
schema: dict,
join_keys: List[sa.Column],
join_keys: List[str],
connection: sa.engine.Connection,
) -> Optional[int]:
"""Merge upsert data from one table to another.
Expand All @@ -181,24 +211,24 @@ def upsertX(
report number of records affected/inserted.
"""

if self.append_only is True:
# Insert
select_stmt = sa.select(from_table.columns).select_from(from_table)
insert_stmt = to_table.insert().from_select(names=list(from_table.columns), select=select_stmt)
connection.execute(insert_stmt)
else:
join_predicates = []
to_table_key: sa.Column
for key in join_keys:
from_table_key: sa.Column = from_table.columns[key] # type: ignore[call-overload]
to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload]
join_predicates.append(from_table_key == to_table_key) # type: ignore[call-overload]
from_table_key: sa.Column = from_table.columns[key]
to_table_key = to_table.columns[key]
join_predicates.append(from_table_key == to_table_key)

join_condition = sa.and_(*join_predicates)

where_predicates = []
for key in join_keys:
to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload,no-redef]
to_table_key = to_table.columns[key]
where_predicates.append(to_table_key.is_(None))
where_condition = sa.and_(*where_predicates)

Expand All @@ -212,18 +242,53 @@ def upsertX(
connection.execute(insert_stmt)

# Update
# CrateDB does not support `UPDATE ... FROM` statements.
# https://github.com/crate/crate/issues/15204
"""
where_condition = join_condition
update_columns = {}
for column_name in self.schema["properties"].keys():
from_table_column: sa.Column = from_table.columns[column_name]
to_table_column: sa.Column = to_table.columns[column_name]
# Prevent: `Updating a primary key is not supported`
# For CrateDB, skip updating primary key columns. Otherwise, CrateDB
# will fail like `ColumnValidationException[Validation failed for code:
# Updating a primary key is not supported]`.
if to_table_column.primary_key:
continue
update_columns[to_table_column] = from_table_column
update_stmt = sa.update(to_table).where(where_condition).values(update_columns)
connection.execute(update_stmt)
"""

# Update, Python-emulated
to_table_pks = to_table.primary_key.columns
from_table_pks = from_table.primary_key.columns

where_condition = join_condition
select_stmt = sa.select(from_table).where(where_condition)
cursor = connection.execute(select_stmt)
for record in cursor.fetchall():
record_dict = record._asdict()
update_where_clauses = []
for from_table_pk, to_table_pk in zip(from_table_pks, to_table_pks):
# Record up
pk_name = from_table_pk.name
pk_value = record_dict[pk_name]

# CrateDB: Need to omit primary keys from record.
# ColumnValidationException[Validation failed for id: Updating a primary key is not supported]
del record_dict[pk_name]

# Build up where condition for UPDATE statement.
update_where_clauses.append(to_table_pk == pk_value)

update_where_condition = sa.and_(*update_where_clauses)
update_stmt = sa.update(to_table).values(record_dict).where(update_where_condition)
connection.execute(update_stmt)

to_table_name_quoted = f'"{to_table.schema}"."{to_table.name}"'
connection.execute(sa.text(f"REFRESH TABLE {to_table_name_quoted};"))

return None

Expand Down
33 changes: 24 additions & 9 deletions target_cratedb/tests/test_standard_target.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
from target_postgres.tests.test_target_postgres import AssertionHelper

from target_cratedb.connector import CrateDBConnector
from target_cratedb.sinks import MELTANO_CRATEDB_STRATEGY_DIRECT
from target_cratedb.sqlalchemy.patch import polyfill_refresh_after_dml_engine
from target_cratedb.sqlalchemy.vector import FloatVector
from target_cratedb.target import TargetCrateDB
Expand Down Expand Up @@ -104,6 +105,9 @@ def initialize_database(cratedb_config):
"melty.commits",
"melty.foo",
"melty.object_mixed",
"melty.test_activate_version_hard",
"melty.test_activate_version_deletes_data_properly",
"melty.test_activate_version_soft",
"melty.test_new_array_column",
"melty.test_schema_updates",
]
Expand Down Expand Up @@ -247,6 +251,7 @@ def test_record_missing_required_property(cratedb_target):
singer_file_to_target(file_name, cratedb_target)


@pytest.mark.skipif(not MELTANO_CRATEDB_STRATEGY_DIRECT, reason="Does not work in temptable/upsert mode")
def test_camelcase(cratedb_target):
file_name = "camelcase.singer"
singer_file_to_target(file_name, cratedb_target)
Expand Down Expand Up @@ -309,7 +314,7 @@ def test_multiple_schema_messages(cratedb_target, caplog):
assert "Schema has changed for stream" not in caplog.text


@pytest.mark.skip("Upserts do not work yet")
@pytest.mark.skip("ColumnValidationException[Validation failed for id: Updating a primary key is not supported]")
def test_relational_data(cratedb_target, helper):
file_name = "user_location_data.singer"
singer_file_to_target(file_name, cratedb_target)
Expand Down Expand Up @@ -421,6 +426,7 @@ def test_array_boolean(cratedb_target, helper):
)


@pytest.mark.skipif(not MELTANO_CRATEDB_STRATEGY_DIRECT, reason="Does not work in temptable/upsert mode")
def test_array_float_vector(cratedb_target, helper):
file_name = "array_float_vector.singer"
singer_file_to_target(file_name, cratedb_target)
Expand Down Expand Up @@ -612,6 +618,9 @@ def test_activate_version_hard_delete(cratedb_config):
engine = create_engine(pg_hard_delete_true)
singer_file_to_target(file_name, pg_hard_delete_true)
with engine.connect() as connection:
# CrateDB-specific: Synchronize write operations.
# TODO: Why is it still needed? There is a polyfill which should handle that.
connection.execute(sa.text(f"REFRESH TABLE {full_table_name};"))
result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 7
with engine.connect() as connection, connection.begin():
Expand All @@ -622,7 +631,8 @@ def test_activate_version_hard_delete(cratedb_config):
result = connection.execute(
sa.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')")
)
# CrateDB-specific
# CrateDB-specific: Synchronize write operations.
# TODO: Can this case be handled transparently?
connection.execute(sa.text(f"REFRESH TABLE {full_table_name}"))
with engine.connect() as connection:
result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
Expand Down Expand Up @@ -660,7 +670,8 @@ def test_activate_version_soft_delete(cratedb_target):
result = connection.execute(
sa.text(f"INSERT INTO {full_table_name}(code, \"name\") VALUES('Manual2', 'Meltano')")
)
# CrateDB-specific
# CrateDB-specific: Synchronize write operations.
# TODO: Can this case be handled transparently?
connection.execute(sa.text(f"REFRESH TABLE {full_table_name}"))
with engine.connect() as connection:
result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
Expand Down Expand Up @@ -703,9 +714,10 @@ def test_activate_version_deletes_data_properly(cratedb_target):
result = connection.execute(
sa.text(f"INSERT INTO {full_table_name} (code, \"name\") VALUES('Manual2', 'Meltano')")
)
# CrateDB-specific
connection.execute(sa.text(f"REFRESH TABLE {full_table_name}"))
with engine.connect() as connection:
# CrateDB-specific: Synchronize write operations.
# TODO: Can this case be handled transparently?
connection.execute(sa.text(f"REFRESH TABLE {full_table_name};"))
with engine.connect() as connection, connection.begin():
result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 9

Expand All @@ -714,28 +726,31 @@ def test_activate_version_deletes_data_properly(cratedb_target):
file_name = f"{table_name}_2.singer"
singer_file_to_target(file_name, pg_hard_delete)
with engine.connect() as connection:
# CrateDB-specific
# CrateDB-specific: Synchronize write operations.
# TODO: Why is it still needed? There is a polyfill which should handle that.
connection.execute(sa.text(f"REFRESH TABLE {full_table_name}"))
result = connection.execute(sa.text(f"SELECT * FROM {full_table_name}"))
assert result.rowcount == 0


@pytest.mark.skip("Does not work yet: extraneous input " "CHAR" "")
@pytest.mark.skip('Does not work yet: extraneous input "CHAR"')
def test_reserved_keywords(cratedb_target):
"""Target should work regardless of column names
Postgres has a number of resereved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
Postgres has a number of reserved keywords listed here https://www.postgresql.org/docs/current/sql-keywords-appendix.html.
"""
file_name = "reserved_keywords.singer"
singer_file_to_target(file_name, cratedb_target)


@pytest.mark.skipif(not MELTANO_CRATEDB_STRATEGY_DIRECT, reason="Does not work in temptable/upsert mode")
def test_uppercase_stream_name_with_column_alter(cratedb_target):
"""Column Alters need to work with uppercase stream names"""
file_name = "uppercase_stream_name_with_column_alter.singer"
singer_file_to_target(file_name, cratedb_target)


@pytest.mark.skipif(not MELTANO_CRATEDB_STRATEGY_DIRECT, reason="Does not work in temptable/upsert mode")
def test_activate_version_uppercase_stream_name(cratedb_config):
"""Activate Version should work with uppercase stream names"""
file_name = "test_activate_version_uppercase_stream_name.singer"
Expand Down

0 comments on commit de6a7b5

Please sign in to comment.