feat: Enhance bulk insert speed using copy command #370

Closed
wants to merge 23 commits into main from bulk-insert-copy

Changes from all commits
23 commits
a91c94b
Generate a copy statement
kinghuang Jun 13, 2024
c11a929
Bulk insert data using copy
kinghuang Jun 13, 2024
4af0c97
Remove generate_insert_statement
kinghuang Jun 13, 2024
943419b
Directly handle csv generation
kinghuang Jun 14, 2024
734c9b5
Quote table name
kinghuang Jun 14, 2024
a97ab70
Add a comment about copy_expert
kinghuang Jun 14, 2024
72f2b37
Improve csv generation
kinghuang Jun 14, 2024
e8b438c
Remove unused imports
kinghuang Jun 14, 2024
20cdb55
Improve comment about unquoted null values
kinghuang Jun 14, 2024
534a38a
Escape backslashes in array values
kinghuang Jun 28, 2024
ed4838f
Escape backslashes in string values
kinghuang Jun 28, 2024
c9f4e3f
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Jul 3, 2024
373f2a3
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Jul 17, 2024
b09b4c4
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Jul 17, 2024
dd1622a
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Jul 18, 2024
cc1c6bc
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Jul 30, 2024
4ca89fa
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Aug 14, 2024
12a85b2
Update imports
edgarrmondragon Aug 14, 2024
519d15b
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Aug 29, 2024
f325009
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Sep 3, 2024
a7d4263
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Sep 19, 2024
a99021e
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Sep 27, 2024
a11c76b
Merge branch 'main' into bulk-insert-copy
edgarrmondragon Sep 27, 2024
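Taken together, the commits above replace per-row INSERT statements with a single COPY ... FROM STDIN WITH CSV round trip. The snippet below is a minimal, standalone sketch of that pattern using psycopg2's copy_expert; the connection string, table, and column names are illustrative and not taken from the PR.

import io

import psycopg2  # driver whose cursor.copy_expert the diff below relies on

# Illustrative rows. None must become an *unquoted* empty CSV field so that
# PostgreSQL reads it as NULL rather than an empty string.
rows = [("alpha", 1), ('say "hi"', None)]

buffer = io.StringIO()
for name, value in rows:
    name_field = '"' + name.replace('"', '""') + '"'   # CSV quoting: double any quotes
    value_field = "" if value is None else str(value)  # empty and unquoted -> NULL
    buffer.write(f"{name_field},{value_field}\n")
buffer.seek(0)

# Hypothetical connection string and table.
with psycopg2.connect("dbname=example") as conn:
    with conn.cursor() as cur:
        cur.copy_expert(
            sql='copy "demo_table" ("name", "value") from stdin with csv',
            file=buffer,
        )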
114 changes: 85 additions & 29 deletions target_postgres/sinks.py
@@ -5,6 +5,7 @@
import datetime
import typing as t
import uuid
from io import StringIO

import sqlalchemy as sa
from singer_sdk.sinks import SQLSink
@@ -14,7 +15,6 @@

if t.TYPE_CHECKING:
from singer_sdk.connectors.sql import FullyQualifiedName
from sqlalchemy.sql import Executable


class PostgresSink(SQLSink):
@@ -145,35 +145,90 @@ def bulk_insert_records( # type: ignore[override]
True if table exists, False if not, None if unsure or undetectable.
"""
columns = self.column_representation(schema)
insert: str = t.cast(
str,
self.generate_insert_statement(
table.name,
columns,
),
)
self.logger.info("Inserting with SQL: %s", insert)
copy_statement: str = self.generate_copy_statement(table.name, columns)
self.logger.info("Inserting with SQL: %s", copy_statement)
# Only one record per PK, we want to take the last one
data_to_insert: list[dict[str, t.Any]] = []
data_to_insert: tuple[tuple[t.Any, ...], ...]

if self.append_only is False:
insert_records: dict[tuple, dict] = {} # pk tuple: record
copy_values: dict[tuple, tuple] = {} # pk tuple: values
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
values = tuple(record.get(column.name) for column in columns)
# No need to check for a KeyError here because the SDK already
# guarantees that all key properties exist in the record.
primary_key_tuple = tuple(record[key] for key in primary_keys)
insert_records[primary_key_tuple] = insert_record
data_to_insert = list(insert_records.values())
copy_values[primary_key_tuple] = values
data_to_insert = tuple(copy_values.values())
else:
for record in records:
insert_record = {
column.name: record.get(column.name) for column in columns
}
data_to_insert.append(insert_record)
connection.execute(insert, data_to_insert)
data_to_insert = tuple(
tuple(record.get(column.name) for column in columns)
for record in records
)

# Prepare to process the rows into csv. Use each column's bind_processor to do
# most of the work, then do the final construction of the csv rows ourselves
# to control exactly how values are converted and which ones are quoted.
column_processors = [
column.type.bind_processor(connection.dialect) or str for column in columns
]

# Make translation table for escaping in array values.
str_translate_table = str.maketrans(
{
'"': '""',
"\\": "\\\\",
}
)
array_translate_table = str.maketrans(
{
'"': '\\""',
"\\": "\\\\",
}
)

def process_column_value(data: t.Any, proc: t.Callable) -> str:
# If the data is null, return an unquoted, empty value.
# Unquoted is important here, for PostgreSQL to interpret as null.
if data is None:
return ""

# Pass the Python value through the bind_processor.
value = proc(data)

# If the value is a string, escape double-quotes as "" and return
# a quoted value.
if isinstance(value, str):
# escape double quotes as "".
return '"' + value.translate(str_translate_table) + '"'

# If the value is a list (for ARRAY), escape double-quotes as \" and return
# a quoted value in literal array format.
if isinstance(value, list):
# for each member of value, escape double quotes as \".
return (
'"{'
+ ",".join(
'""' + v.translate(array_translate_table) + '""' for v in value
)
+ '}"'
)

Contributor:
I think once the tests are passing, we're good to go here. This method of escaping arrays stands out as awkward, but I think it's necessary. I tried putting together an implementation that avoids it by using a custom csv writer dialect, but anything I come up with there is going to be even more janky.

Thoughts @visch ?
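To make the escaping concrete, here is a short trace of the diff's own translation tables applied to a sample ARRAY value; it is not new behavior, just an illustration of the two layers involved. The outer layer is CSV quoting for COPY ... WITH CSV, where embedded double quotes are doubled; the inner layer is PostgreSQL's array-literal syntax, where element quotes are escaped as \" and backslashes as \\.

# Sample ARRAY value (illustrative).
value = ['plain', 'has "quotes"', 'back\\slash']

# Same translation table as in the diff above.
array_translate_table = str.maketrans({'"': '\\""', "\\": "\\\\"})

field = (
    '"{'
    + ",".join('""' + v.translate(array_translate_table) + '""' for v in value)
    + '}"'
)
print(field)
# "{""plain"",""has \""quotes\"""",""back\\slash""}"
#
# After COPY's CSV layer un-doubles the quotes, PostgreSQL sees the array
# literal {"plain","has \"quotes\"","back\\slash"} and stores the elements
# plain, has "quotes", and back\slash.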

Member:
That makes sense. It's surprising, but if this approach is better, it works for me!

CREATE TABLE public.testing_table (
    id SERIAL PRIMARY KEY,
    jsonb_data JSONB[]
);

INSERT INTO public.testing_table --succeeds, but breaks backward compatibility
(jsonb_data)
VALUES('{"[\"string1\", \"string2\", 1, 2, 3]"}'::jsonb[]);

INSERT INTO public.testing_table --succeeds
(jsonb_data)
VALUES('{\"string1\", \"string2\", 1, 2, 3}'::jsonb[]);

INSERT INTO public.testing_table --fails
(jsonb_data)
VALUES('[\"string1\", \"string2\", 1, 2, 3]'::jsonb[]);

--ERROR: malformed array literal: "[\"string1\", \"string2\", 1, 2, 3]"
--  Detail: "[" must introduce explicitly-specified array dimensions.

INSERT INTO public.testing_table --fails
(jsonb_data)
VALUES('"[\"string1\", \"string2\", 1, 2, 3]"'::jsonb[]);

--ERROR: malformed array literal: ""[\"string1\", \"string2\", 1, 2, 3]""
--  Detail: Array value must start with "{" or dimension information.

@kinghuang

Contributor Author:
I'll check those out. The csv.QUOTE_NOTNULL and csv.QUOTE_STRINGS would both be more convenient, but they are new in Python 3.12.

Member:
@kinghuang are you interested in getting this PR to the finish line? Is there anything I can do to help?

> I'll check those out. The csv.QUOTE_NOTNULL and csv.QUOTE_STRINGS would both be more convenient, but they are new in Python 3.12.

Would it be worth using those to make the tests pass on Python 3.12? If they significantly simplify the implementation, we can think about backporting the csv library from Python 3.12+ and publishing the wheels to PyPI.
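For context (not part of this PR): on Python 3.12+, csv.QUOTE_NOTNULL quotes every field except None and writes None as an empty, unquoted field, which is exactly the NULL-vs-empty-string distinction the hand-rolled quoting above maintains. A minimal sketch:

import csv
import io

buffer = io.StringIO()
writer = csv.writer(buffer, quoting=csv.QUOTE_NOTNULL)  # requires Python 3.12+
writer.writerow(["abc", None, 'say "hi"'])
print(buffer.getvalue())
# "abc",,"say ""hi"""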


# Otherwise, return the string representation of the value.
return str(value)

buffer = StringIO()
for row in data_to_insert:
processed_row = ",".join(map(process_column_value, row, column_processors))

buffer.write(processed_row)
buffer.write("\n")
buffer.seek(0)

# Use copy_expert to run the copy statement.
# https://www.psycopg.org/docs/cursor.html#cursor.copy_expert
with connection.connection.cursor() as cur: # type: ignore[attr-defined]
cur.copy_expert(sql=copy_statement, file=buffer)

return True

def upsert(
@@ -261,23 +316,24 @@ def column_representation(
]
return columns

def generate_insert_statement(
def generate_copy_statement(
self,
full_table_name: str | FullyQualifiedName,
columns: list[sa.Column], # type: ignore[override]
) -> str | Executable:
"""Generate an insert statement for the given records.
) -> str:
"""Generate a copy statement for bulk copy.

Args:
full_table_name: the target table name.
columns: the target table columns.

Returns:
An insert statement.
A copy statement.
"""
metadata = sa.MetaData()
table = sa.Table(full_table_name, metadata, *columns)
return sa.insert(table)
columns_list = ", ".join(f'"{column.name}"' for column in columns)
sql: str = f'copy "{full_table_name}" ({columns_list}) from stdin with csv'

return sql

def conform_name(self, name: str, object_type: str | None = None) -> str:
"""Conforming names of tables, schemas, column names."""
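As a quick illustration (separate from the diff, with hypothetical table and column names), generate_copy_statement builds statements of this shape:

columns = ["id", "email"]  # illustrative column names
columns_list = ", ".join(f'"{name}"' for name in columns)
print(f'copy "users" ({columns_list}) from stdin with csv')
# copy "users" ("id", "email") from stdin with csv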