Skip to content

Commit

Permalink
Fix "schema name" propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
amotl committed Dec 8, 2023
1 parent 29fccd3 commit 7061deb
Showing 1 changed file with 21 additions and 2 deletions.
23 changes: 21 additions & 2 deletions target_cratedb/sinks.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
"""CrateDB target sink class, which handles writing streams."""
import datetime
import time
from typing import List, Optional
from typing import List, Optional, Union

import sqlalchemy
from pendulum import now
from sqlalchemy import Column, bindparam, insert, select, update
from sqlalchemy import Column, Executable, MetaData, Table, bindparam, insert, select, update
from target_postgres.sinks import PostgresSink

from target_cratedb.connector import CrateDBConnector
Expand Down Expand Up @@ -296,3 +296,22 @@ def activate_version(self, new_version: int) -> None:
bindparam("version", value=new_version, type_=integer_type),
)
connection.execute(query)

def generate_insert_statement(
self,
full_table_name: str,
columns: List[Column],
) -> Union[str, Executable]:
"""Generate an insert statement for the given records.
Args:
full_table_name: the target table name.
schema: the JSON schema for the new table.
Returns:
An insert statement.
"""
# FIXME:
metadata = MetaData(schema=self.schema_name)
table = Table(full_table_name, metadata, *columns)
return insert(table)

0 comments on commit 7061deb

Please sign in to comment.