Skip to content

Commit

Permalink
Deleted unused functions; uncommented the normalization and loading steps
Browse files Browse the repository at this point in the history
  • Loading branch information
AstrakhantsevaAA committed May 31, 2024
1 parent 478f0f3 commit f456f19
Showing 1 changed file with 76 additions and 120 deletions.
196 changes: 76 additions & 120 deletions docs/examples/postgres_to_postgres/postgres_to_postgres.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,25 +25,15 @@
Be aware that you need to define the database credentials in `.dlt/secrets.toml` and adjust the tables names (table1 and table2):
Install `dlt` with `duckdb` as extra:
Install `dlt` with `duckdb` as extra, also `connectorx`, Postgres adapter and progress bar tool:
```sh
pip install duckdb
```
Install `connectorx` and `pyarrow`:
```sh
pip install connectorx pyarrow
```
Install Postgres adapter for Python:
```sh
pip install psycopg2
pip install duckdb connectorx psycopg2-binary alive-progress
```
Run the example:
```sh
postgres_to_postgres.py --replace
python postgres_to_postgres.py --replace
```
:::warning
Expand All @@ -61,7 +51,8 @@
from datetime import datetime

import connectorx as cx

import duckdb
import psycopg2
import dlt
from dlt.sources.credentials import ConnectionStringCredentials

Expand Down Expand Up @@ -138,43 +129,7 @@ def table_desc(name, pk, table_name, order_date, columns="*"):
}


def fetch_schema_tables(cursor, schema_name):
    """Return the names of all tables in the given schema.

    Args:
        cursor: an open DB-API cursor (psycopg2) on the source database.
        schema_name: name of the schema to inspect.

    Returns:
        list[str]: table names found in ``information_schema.tables``.
    """
    # Use a bound parameter instead of f-string interpolation so an odd or
    # malicious schema name cannot inject SQL (psycopg2 `%s` placeholder).
    query = """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s
    """
    cursor.execute(query, (schema_name,))
    return [row[0] for row in cursor.fetchall()]


if __name__ == "__main__":
import psycopg2
import dlt

postgres_creds = dlt.secrets['sources.postgres.credentials']
try:
connection = psycopg2.connect(postgres_creds)
cursor = connection.cursor()
print("Connected to the PostgreSQL database successfully!")

tables = fetch_schema_tables(cursor, 'example_data')

# Print the table names
print("Tables in the database:")
for table in tables:
print(table)

# Close the cursor and connection
cursor.close()
connection.close()

except psycopg2.Error as error:
print(f"Error: {error}")

# Input Handling
parser = argparse.ArgumentParser(
description="Run specific functions in the script."
Expand All @@ -184,8 +139,8 @@ def fetch_schema_tables(cursor, schema_name):
args = parser.parse_args()

tables = [
table_desc("table1", ["pk"], f"{schema_name}.table1", "updated_at"),
table_desc("table2", ["pk"], f"{schema_name}.table2", "updated_at"),
table_desc("table_1", ["pk"], f"{schema_name}.table_1", "updated_at"),
table_desc("table_2", ["pk"], f"{schema_name}.table_2", "updated_at"),
]

if args.replace:
Expand Down Expand Up @@ -233,71 +188,72 @@ def fetch_schema_tables(cursor, schema_name):
pipeline.extract(resources)
print(f"--Time elapsed: {datetime.now() - startTime}")

# # 2. normalize
# print("##################################### START NORMALIZATION ########")
# if load_type == "replace":
# info = pipeline.normalize(workers=2, loader_file_format="parquet") # https://dlthub.com/docs/blog/dlt-arrow-loading
# else:
# info = pipeline.normalize()
#
# print(info)
# print(pipeline.last_trace.last_normalize_info)
# print(f"--Time elapsed: {datetime.now() - startTime}")
#
# # 3. load
# print("##################################### START LOAD ########")
# info = pipeline.load()
# print(info)
# print(f"--Time elapsed: {datetime.now() - startTime}")
#
# if load_type == "replace":
# # 4. Load DuckDB local database into Postgres
# print("##################################### START DUCKDB LOAD ########")
# conn = duckdb.connect(f"{pipeline_name}.duckdb")
# conn.install_extension("./postgres_scanner.duckdb_extension") # duckdb_extension is downloaded/installed manually here, only needed if `LOAD/INSTALL postgres` throws an error
# conn.sql("LOAD postgres;")
# # select generated timestamp schema
# timestamped_schema = conn.sql(f"""select distinct table_schema from information_schema.tables
# where table_schema like '{dataset_name}%'
# and table_schema NOT LIKE '%_staging'
# order by table_schema desc""").fetchone()[0] # type: ignore
# print(f"timestamped_schema: {timestamped_schema}")
#
# # connect to destination (timestamped schema)
# conn.sql(
# f"ATTACH 'dbname={POSTGRES_DATABASE_TARGET} user={POSTGRES_USERNAME_TARGET} password={POSTGRES_PASSWORD_TARGET} host={POSTGRES_HOST_TARGET} port={POSTGRES_PORT_TARGET}' AS pg_db (TYPE postgres);")
# conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")
#
# for table in tables:
# print(f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['name']} TO Postgres {timestamped_schema}.{table['name']}")
#
# conn.sql(f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['name']} AS SELECT * FROM {timestamped_schema}.{table['name']};")
# conn.sql(f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['name']};").show()
#
# print(f"--Time elapsed: {datetime.now() - startTime}")
# print("##################################### FINISHED ########")
#
# # 5. Cleanup and rename Schema
# print("##################################### RENAME Schema and CLEANUP ########")
# try:
# con_hd = psycopg2.connect(
# dbname=POSTGRES_DATABASE_TARGET,
# user=POSTGRES_USERNAME_TARGET,
# password=POSTGRES_PASSWORD_TARGET,
# host=POSTGRES_HOST_TARGET,
# port=POSTGRES_PORT_TARGET
# )
# con_hd.autocommit = True
# print("Connected to HD-DB: " + POSTGRES_HOST_TARGET + ", DB: " + POSTGRES_USERNAME_TARGET)
# except:
# print("Unable to connect to HD-database!")
#
# with con_hd.cursor() as cur:
# # Drop existing lzn
# print(f"Drop existing lzn")
# cur.execute(f"DROP SCHEMA IF EXISTS lzn CASCADE;")
# # Rename timestamped-lzn to lzn
# print(f"Going to rename schema {timestamped_schema} to lzn")
# cur.execute(f"ALTER SCHEMA {timestamped_schema} RENAME TO lzn;")
#
# con_hd.close()
# 2. normalize the extracted data into load packages
print("##################################### START NORMALIZATION ########")
if load_type == "replace":
    # Parquet + multiple workers speeds up full reloads, see
    # https://dlthub.com/docs/blog/dlt-arrow-loading
    info = pipeline.normalize(workers=2, loader_file_format="parquet")
else:
    info = pipeline.normalize()

print(info)
print(pipeline.last_trace.last_normalize_info)
print(f"--Time elapsed: {datetime.now() - startTime}")

# 3. load the normalized packages into the local DuckDB destination
print("##################################### START LOAD ########")
info = pipeline.load()
print(info)
print(f"--Time elapsed: {datetime.now() - startTime}")

if load_type == "replace":
    # 4. Copy the DuckDB local database into Postgres
    print("##################################### START DUCKDB LOAD ########")
    conn = duckdb.connect(f"{pipeline_name}.duckdb")
    # duckdb_extension is downloaded/installed manually here, only needed
    # if `LOAD/INSTALL postgres` throws an error
    conn.install_extension("./postgres_scanner.duckdb_extension")
    conn.sql("LOAD postgres;")
    # select the newest generated timestamped schema, skipping staging schemas
    timestamped_schema = conn.sql(f"""select distinct table_schema from information_schema.tables
                 where table_schema like '{dataset_name}%'
                 and table_schema NOT LIKE '%_staging'
                 order by table_schema desc""").fetchone()[0]  # type: ignore
    print(f"timestamped_schema: {timestamped_schema}")

    target_credentials = dlt.secrets["sources.postgres.credentials"]
    # connect DuckDB to the destination Postgres (timestamped schema)
    conn.sql(
        f"ATTACH 'dbname={target_credentials.database} user={target_credentials.username} password={target_credentials.password} host={target_credentials.host} port={target_credentials.port}' AS pg_db (TYPE postgres);"
    )
    conn.sql(f"CREATE SCHEMA IF NOT EXISTS pg_db.{timestamped_schema};")

    for table in tables:
        print(
            f"LOAD DuckDB -> Postgres: table: {timestamped_schema}.{table['name']} TO Postgres {timestamped_schema}.{table['name']}"
        )
        conn.sql(
            f"CREATE OR REPLACE TABLE pg_db.{timestamped_schema}.{table['name']} AS SELECT * FROM {timestamped_schema}.{table['name']};"
        )
        conn.sql(f"SELECT count(*) as count FROM pg_db.{timestamped_schema}.{table['name']};").show()

    print(f"--Time elapsed: {datetime.now() - startTime}")
    print("##################################### FINISHED ########")

    # 5. Cleanup and rename schema: promote the fresh timestamped schema to `lzn`
    print("##################################### RENAME Schema and CLEANUP ########")
    try:
        con_hd = psycopg2.connect(
            dbname=target_credentials.database,
            user=target_credentials.username,
            password=target_credentials.password,
            host=target_credentials.host,
            port=target_credentials.port,
        )
        con_hd.autocommit = True
        print("Connected to HD-DB: " + target_credentials.host + ", DB: " + target_credentials.username)
    except psycopg2.Error as error:
        # Bug fix: the original bare `except:` only printed and fell through,
        # after which `con_hd` was unbound and the `with` below crashed with
        # NameError. Abort cleanly instead, preserving the original message.
        print("Unable to connect to HD-database!")
        raise SystemExit(1) from error

    with con_hd.cursor() as cur:
        # Drop the previously promoted schema, then rename the fresh one in place.
        print("Drop existing lzn")
        cur.execute("DROP SCHEMA IF EXISTS lzn CASCADE;")
        print(f"Going to rename schema {timestamped_schema} to lzn")
        cur.execute(f"ALTER SCHEMA {timestamped_schema} RENAME TO lzn;")

    con_hd.close()

0 comments on commit f456f19

Please sign in to comment.