Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve write operations to be closer to target-postgres #13

Merged
merged 1 commit into from
Dec 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
.idea
.venv*
*.egg-info
.coverage*
coverage.xml
build
dist
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

## In progress
- Add support for container types `ARRAY`, `OBJECT`, and `FLOAT_VECTOR`.
- Improve write operations to be closer to `target-postgres`.

## 2023-12-08 v0.0.1
- Make it work. It can run the canonical Meltano GitHub -> DB example.
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,25 @@ LIMIT
```


## Write Strategy

Meltano's `target-postgres` uses a temporary table to receive data first, and
then updates the effective target table with that information.

CrateDB's `target-cratedb` offers the possibility to also write directly into
the target table, yielding speed improvements, which may be important in certain
situations.

The environment variable `MELTANO_CRATEDB_STRATEGY_DIRECT` controls the behavior.

- `MELTANO_CRATEDB_STRATEGY_DIRECT=true`: Directly write to the target table.
- `MELTANO_CRATEDB_STRATEGY_DIRECT=false`: Use a temporary table to stage updates.

Note: The current default value is `true`, effectively bypassing the native
way Meltano handles database updates. The reason is that the vanilla approach
does not yet satisfy all test cases.


## Vector Store Support

In order to support CrateDB's vector store feature, i.e. its `FLOAT_VECTOR`
Expand Down
5 changes: 4 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -266,4 +266,7 @@ release = [
{ cmd = "twine upload dist/*" },
]

test = { cmd = "pytest" }
test = [
{ shell = "MELTANO_CRATEDB_STRATEGY_DIRECT=true pytest" },
{ shell = "MELTANO_CRATEDB_STRATEGY_DIRECT=false pytest" },
]
122 changes: 100 additions & 22 deletions target_cratedb/sinks.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
"""CrateDB target sink class, which handles writing streams."""
import datetime
import os
import time
from typing import List, Optional, Union

import sqlalchemy as sa
from pendulum import now
from sqlalchemy.util import asbool
from target_postgres.sinks import PostgresSink

from target_cratedb.connector import CrateDBConnector

MELTANO_CRATEDB_STRATEGY_DIRECT = asbool(os.getenv("MELTANO_CRATEDB_STRATEGY_DIRECT", "true"))


class CrateDBSink(PostgresSink):
"""CrateDB target sink class."""
Expand All @@ -18,6 +22,15 @@ class CrateDBSink(PostgresSink):
soft_delete_column_name = "__sdc_deleted_at"
version_column_name = "__sdc_table_version"

def __init__(self, *args, **kwargs):
"""Initialize SQL Sink. See super class for more details."""
super().__init__(*args, **kwargs)

# Whether to use the Meltano standard strategy, looping the data
# through a temporary table, or whether to directly apply the DML
# operations on the target table.
self.strategy_direct = MELTANO_CRATEDB_STRATEGY_DIRECT

# Record processing

def _add_sdc_metadata_to_record(
Expand Down Expand Up @@ -112,7 +125,9 @@ def process_batch(self, context: dict) -> None:
Args:
context: Stream partition or context dictionary.
"""
# Use one connection so we do this all in a single transaction

# The PostgreSQL adapter uses only one connection, so we do this all in a single transaction.
# The CrateDB adapter will use a separate connection, to make `REFRESH TABLE ...` work.
with self.connector._connect() as connection, connection.begin():
# Check structure of table
table: sa.Table = self.connector.prepare_table(
Expand All @@ -122,21 +137,30 @@ def process_batch(self, context: dict) -> None:
as_temp_table=False,
connection=connection,
)
# Insert into table
self.bulk_insert_records(
table=table,
schema=self.schema,
primary_keys=self.key_properties,
records=context["records"],
connection=connection,
)
# FIXME: Upserts do not work yet.
"""

# Insert directly into target table.
# This can be used as a surrogate if the regular temptable-upsert
# procedure doesn't work, or isn't applicable for performance reasons.
if self.strategy_direct:
self.bulk_insert_records(
table=table,
schema=self.schema,
primary_keys=self.key_properties,
records=context["records"],
connection=connection,
)
self.refresh_table(table)
return

# Create a temp table (Creates from the table above)
# CrateDB: Need to pre-compute full-qualified table name, and quoted variant,
# for satisfying both Meltano, and for running a `REFRESH TABLE`.
temp_full_table_name = f"{self.schema_name}.{self.temp_table_name}"
temp_table: sa.Table = self.connector.copy_table_structure(
full_table_name=self.temp_table_name,
full_table_name=temp_full_table_name,
from_table=table,
as_temp_table=True,
# CrateDB does not provide temporary tables.
as_temp_table=False,
connection=connection,
)
# Insert into temp table
Expand All @@ -147,6 +171,11 @@ def process_batch(self, context: dict) -> None:
records=context["records"],
connection=connection,
)

# Run a new "transaction" to synchronize write operations.
with self.connector._connect() as connection:
self.refresh_table(temp_table)

# Merge data from Temp table to main table
self.upsert(
from_table=temp_table,
Expand All @@ -157,14 +186,15 @@ def process_batch(self, context: dict) -> None:
)
# Drop temp table
self.connector.drop_table(table=temp_table, connection=connection)
"""

def upsertX(
self.refresh_table(table)

def upsert(
self,
from_table: sa.Table,
to_table: sa.Table,
schema: dict,
join_keys: List[sa.Column],
join_keys: List[str],
connection: sa.engine.Connection,
) -> Optional[int]:
"""Merge upsert data from one table to another.
Expand All @@ -181,24 +211,24 @@ def upsertX(
report number of records affected/inserted.

"""

if self.append_only is True:
# Insert
select_stmt = sa.select(from_table.columns).select_from(from_table)
insert_stmt = to_table.insert().from_select(names=list(from_table.columns), select=select_stmt)
connection.execute(insert_stmt)
else:
join_predicates = []
to_table_key: sa.Column
for key in join_keys:
from_table_key: sa.Column = from_table.columns[key] # type: ignore[call-overload]
to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload]
join_predicates.append(from_table_key == to_table_key) # type: ignore[call-overload]
from_table_key: sa.Column = from_table.columns[key]
to_table_key = to_table.columns[key]
join_predicates.append(from_table_key == to_table_key)

join_condition = sa.and_(*join_predicates)

where_predicates = []
for key in join_keys:
to_table_key: sa.Column = to_table.columns[key] # type: ignore[call-overload,no-redef]
to_table_key = to_table.columns[key]
where_predicates.append(to_table_key.is_(None))
where_condition = sa.and_(*where_predicates)

Expand All @@ -212,18 +242,50 @@ def upsertX(
connection.execute(insert_stmt)

# Update
# CrateDB does not support `UPDATE ... FROM` statements.
# https://github.com/crate/crate/issues/15204
"""
where_condition = join_condition
update_columns = {}
for column_name in self.schema["properties"].keys():
from_table_column: sa.Column = from_table.columns[column_name]
to_table_column: sa.Column = to_table.columns[column_name]
# Prevent: `Updating a primary key is not supported`
# For CrateDB, skip updating primary key columns. Otherwise, CrateDB
# will fail like `ColumnValidationException[Validation failed for code:
# Updating a primary key is not supported]`.
if to_table_column.primary_key:
continue
update_columns[to_table_column] = from_table_column

update_stmt = sa.update(to_table).where(where_condition).values(update_columns)
connection.execute(update_stmt)
"""

# Update, Python-emulated
to_table_pks = to_table.primary_key.columns
from_table_pks = from_table.primary_key.columns

where_condition = join_condition
select_stmt = sa.select(from_table).where(where_condition)
cursor = connection.execute(select_stmt)
for record in cursor.fetchall():
record_dict = record._asdict()
update_where_clauses = []
for from_table_pk, to_table_pk in zip(from_table_pks, to_table_pks):
# Get primary key name and value from record.
pk_name = from_table_pk.name
pk_value = record_dict[pk_name]

# CrateDB: Need to omit primary keys from record.
# ColumnValidationException[Validation failed for id: Updating a primary key is not supported]
del record_dict[pk_name]

# Build up where clauses for UPDATE statement.
update_where_clauses.append(to_table_pk == pk_value)

update_where_condition = sa.and_(*update_where_clauses)
update_stmt = sa.update(to_table).values(record_dict).where(update_where_condition)
connection.execute(update_stmt)

return None

Expand Down Expand Up @@ -269,6 +331,7 @@ def activate_version(self, new_version: int) -> None:
f'OR "{self.version_column_name}" IS NULL'
)
)
self.refresh_table(self.full_table_name)
return

if not self.connector.column_exists(
Expand Down Expand Up @@ -296,6 +359,8 @@ def activate_version(self, new_version: int) -> None:
)
connection.execute(query)

self.refresh_table(self.full_table_name)

def generate_insert_statement(
self,
full_table_name: str,
Expand All @@ -314,3 +379,16 @@ def generate_insert_statement(
metadata = sa.MetaData(schema=self.schema_name)
table = sa.Table(full_table_name, metadata, *columns)
return sa.insert(table)

def refresh_table(self, table: Union[sa.Table, str]):
"""
Synchronize write operations on CrateDB.
"""
with self.connector._connect() as connection:
if isinstance(table, sa.Table):
table_full = f'"{table.schema}"."{table.name}"'
elif isinstance(table, str):
table_full = table
else:
raise TypeError(f"Unknown type for `table`: {table}")
connection.exec_driver_sql(f"REFRESH TABLE {table_full};")
Loading