fix bugs, reformat, rename vars
AstrakhantsevaAA committed May 31, 2024
1 parent f456f19 commit 2f309bb
Showing 1 changed file: docs/examples/postgres_to_postgres/postgres_to_postgres.py (67 additions, 46 deletions)
@@ -5,13 +5,13 @@
keywords: [connector x, pyarrow, zero copy, duckdb, postgres, initial load]
---
This example shows you how to export and import data from Postgres to Postgres in a fast way with ConnectorX and DuckDB,
since the default export would generate `INSERT` statements during the normalization phase, which is very slow for large tables.
Since this is an initial load, we first create a separate timestamped schema and then replace the existing schema with the new one.
:::note
This approach is tested and works well for an initial load (`--replace`); however, the incremental load (`--merge`) might need some adjustments (loading dlt's load tables, setting up the first run after an initial
load, etc.).
:::
@@ -20,15 +20,16 @@
- How to get Arrow tables from [ConnectorX](https://github.com/sfu-db/connector-x) and yield them in chunks (a short sketch follows this list).
- That merge and incremental loads work with arrow tables.
- How to use DuckDB for a speedy normalization.
- How to use `argparse` to turn your pipeline script into a CLI.
- How to work with the `ConnectionStringCredentials` spec.
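For a rough picture of the chunked-read idea behind the `read_sql_x_chunked` helper used below, here is a minimal, hypothetical sketch. It assumes ConnectorX's `cx.read_sql(..., return_type="arrow")` and a plain `LIMIT`/`OFFSET` loop; it illustrates the technique and is not the helper's exact implementation:
```py
import connectorx as cx

def read_chunked_sketch(conn_str: str, query: str, chunk_size: int = 1_000_000):
    # count the rows once so we know how many LIMIT/OFFSET slices to fetch
    count_table = cx.read_sql(conn_str, f"SELECT count(*) FROM ({query}) q", return_type="arrow")
    total_rows = count_table.column(0)[0].as_py()
    # yield each slice as a pyarrow.Table; dlt consumes Arrow tables directly
    for offset in range(0, total_rows, chunk_size):
        yield cx.read_sql(conn_str, f"{query} LIMIT {chunk_size} OFFSET {offset}", return_type="arrow")
```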
Be aware that you need to define the database credentials in `.dlt/secrets.toml` or as dlt environment variables, and adjust the table names ("table_1" and "table_2"). A minimal sketch of how such credentials resolve through `ConnectionStringCredentials` follows:
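A connection string is the native form of the `ConnectionStringCredentials` spec, and its parts become attributes you can read back. The host, user, and database names below are placeholders, not values from this example:
```py
from dlt.sources.credentials import ConnectionStringCredentials

# dlt fills the same spec from `.dlt/secrets.toml` or from env vars
# such as SOURCES__POSTGRES__CREDENTIALS / DESTINATION__POSTGRES__CREDENTIALS
creds = ConnectionStringCredentials("postgresql://loader:<password>@localhost:5432/source_db")
print(creds.drivername, creds.username, creds.host, creds.port, creds.database)
```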
Install `dlt` with the `duckdb` extra, plus `connectorx`, `pyarrow`, the Postgres adapter, and a progress-bar tool:
```sh
pip install dlt[duckdb] connectorx pyarrow psycopg2-binary alive-progress
```
Run the example:
@@ -41,9 +42,8 @@
[Slack](https://dlthub-community.slack.com/archives/C04DQA7JJN6/p1711579390028279?thread_ts=1711477727.553279&cid=C04DQA7JJN60)
As well as with installing DuckDB (see [issue
here](https://github.com/duckdb/duckdb/issues/8035#issuecomment-2020803032)), that's why I manually installed the `postgres_scanner.duckdb_extension` in my Dockerfile to load the data into Postgres.
:::
"""

import argparse
@@ -53,16 +53,13 @@
import connectorx as cx
import duckdb
import psycopg2

import dlt
from dlt.sources.credentials import ConnectionStringCredentials

CHUNKSIZE = int(
os.getenv("CHUNKSIZE", 1000000)
) # 1 million rows works well with 1 GiB of RAM (if there is no parallelism)
connection_timeout = 15


def read_sql_x_chunked(conn_str: str, query: str, chunk_size: int = CHUNKSIZE):
@@ -83,9 +80,9 @@

@dlt.source(max_table_nesting=0)
def pg_resource_chunked(
table_name: str,
primary_key: list[str],
schema_name: str,
order_date: str,
load_type: str = "merge",
columns: str = "*",
@@ -94,14 +91,14 @@
],
):
print(
f"dlt.resource write_dispostion: `{load_type}` -- ",
f"connection string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}"
f"dlt.resource write_disposition: `{load_type}` -- ",
f"connection string: postgresql://{credentials.username}:*****@{credentials.host}:{credentials.host}/{credentials.database}",
)

query = f"SELECT {columns} FROM {table_name} ORDER BY {order_date}" # Needed to have an idempotent query
query = f"SELECT {columns} FROM {schema_name}.{table_name} ORDER BY {order_date}" # Needed to have an idempotent query

source = dlt.resource(
name=table_name,
table_name=table_name,
write_disposition=load_type, # use `replace` for initial load, `merge` for incremental
primary_key=primary_key,
@@ -119,11 +116,11 @@
return source


def table_desc(table_name, pk, schema_name, order_date, columns="*"):
return {
"name": name,
"pk": pk,
"table_name": table_name,
"pk": pk,
"schema_name": schema_name,
"order_date": order_date,
"columns": columns,
}
@@ -138,9 +135,13 @@
parser.add_argument("--merge", action="store_true", help="Run delta load")
args = parser.parse_args()

source_schema_name = "example_data_1"
target_schema_name = "example_data_2"
pipeline_name = "loading_postgres_to_postgres"

tables = [
table_desc("table_1", ["pk"], f"{schema_name}.table_1", "updated_at"),
table_desc("table_2", ["pk"], f"{schema_name}.table_2", "updated_at"),
table_desc("table_1", ["pk"], source_schema_name, "updated_at"),
table_desc("table_2", ["pk"], source_schema_name, "updated_at"),
]

if args.replace:
@@ -155,9 +156,9 @@
for table in tables:
resources.append(
pg_resource_chunked(
table["name"],
table["pk"],
table["table_name"],
table["pk"],
table["schema_name"],
table["order_date"],
load_type=load_type,
columns=table["columns"],
@@ -168,15 +169,15 @@
pipeline = dlt.pipeline(
pipeline_name=pipeline_name,
destination="duckdb",
dataset_name=target_schema_name,
full_refresh=True,
progress="alive_progress",
)
else:
pipeline = dlt.pipeline(
pipeline_name=pipeline_name,
destination="postgres",
dataset_name=target_schema_name,
full_refresh=False,
) # full_refresh=False

@@ -191,7 +192,9 @@
# 2. normalize
print("##################################### START NORMALIZATION ########")
if load_type == "replace":
info = pipeline.normalize(
workers=2, loader_file_format="parquet"
) # https://dlthub.com/docs/blog/dlt-arrow-loading
else:
info = pipeline.normalize()

@@ -209,51 +212,69 @@
# 4. Load DuckDB local database into Postgres
print("##################################### START DUCKDB LOAD ########")
conn = duckdb.connect(f"{pipeline_name}.duckdb")
conn.install_extension("./postgres_scanner.duckdb_extension") # duckdb_extension is downloaded/installed manually here, only needed if `LOAD/INSTALL postgres` throws an error
conn.install_extension(
"./postgres_scanner.duckdb_extension"
) # duckdb_extension is downloaded/installed manually here, only needed if `LOAD/INSTALL postgres` throws an error
conn.sql("LOAD postgres;")
# select generated timestamp schema
timestamped_schema = conn.sql(f"""select distinct table_schema from information_schema.tables
where table_schema like '{dataset_name}%'
timestamped_schema = conn.sql(
f"""select distinct table_schema from information_schema.tables
where table_schema like '{target_schema_name}%'
and table_schema NOT LIKE '%_staging'
order by table_schema desc""").fetchone()[0] # type: ignore
order by table_schema desc"""
).fetchone()[0] # type: ignore
print(f"timestamped_schema: {timestamped_schema}")

target_credentials = ConnectionStringCredentials(dlt.secrets["destination.postgres.credentials"])
# connect to destination (timestamped schema)
conn.sql(
f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);")
f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);"
)
conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")

for table in tables:
print(f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['name']} TO Postgres {timestamped_schema}.{table['name']}")
print(
f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['table_name']} TO Postgres {timestamped_schema}.{table['table_name']}"
)

conn.sql(f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['name']} AS SELECT * FROM {timestamped_schema}.{table['name']};")
conn.sql(f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['name']};").show()
conn.sql(
f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['table_name']} AS SELECT * FROM {timestamped_schema}.{table['table_name']};"
)
conn.sql(
f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['table_name']};"
).show()

print(f"--Time elapsed: {datetime.now() - startTime}")
print("##################################### FINISHED ########")

# 5. Cleanup and rename Schema
print("##################################### RENAME Schema and CLEANUP ########")
print(
"##################################### RENAME Schema and CLEANUP ########"
)
try:
con_hd = psycopg2.connect(
dbname=target_credentials.database,
user=target_credentials.username,
password=target_credentials.password,
host=target_credentials.host,
port=target_credentials.port,
)
con_hd.autocommit = True
print("Connected to HD-DB: " + target_credentials.host + ", DB: " + target_credentials.username)
print(
"Connected to HD-DB: "
+ target_credentials.host
+ ", DB: "
+ target_credentials.database
)
except psycopg2.Error as exc:
print(f"Unable to connect to HD-database! {exc}")
raise

with con_hd.cursor() as cur:
# Drop existing target_schema_name
print(f"Drop existing {target_schema_name}")
cur.execute(f"DROP SCHEMA IF EXISTS {target_schema_name} CASCADE;")
# Rename timestamped-target_schema_name to target_schema_name
print(f"Going to rename schema {timestamped_schema} to {target_schema_name}")
cur.execute(f"ALTER SCHEMA {timestamped_schema} RENAME TO {target_schema_name};")

con_hd.close()
