Skip to content

Commit

Permalink
feat: optimize data insertion with batch processing
Browse files Browse the repository at this point in the history
  • Loading branch information
loicguillois committed Jan 27, 2025
1 parent cea47c0 commit 1dbbe72
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 39 deletions.
2 changes: 1 addition & 1 deletion .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ fileignoreconfig:
- filename: README.md
checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176
- filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py
checksum: 4c16939cca930517d45f30128d79332784cf09c933a1510f0f02e949b1eaac87
checksum: 08c9deed34bb3cce9e77601a1db4d9e8ca8acc5862c8ee7f73f6164c30a45946
- filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py
checksum: b068d130431a26be58c78e4f14346be53d318110c4020c7f7212069cbd190bf2
- filename: analytics/dagster/src/resources/ban_config.py
Expand Down
75 changes: 37 additions & 38 deletions analytics/dagster/src/assets/populate_housings_ban_addresses.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import pandas as pd
import psycopg2
from io import StringIO

from sqlalchemy import create_engine

@asset(description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", required_resource_keys={"ban_config"})
def housings_without_address(context: AssetExecutionContext):
Expand Down Expand Up @@ -71,47 +71,46 @@ def parse_api_response_and_insert_housing_addresses(context: AssetExecutionConte

api_df = pd.read_csv(StringIO(send_csv_to_api))

conn = psycopg2.connect(
dbname=config.db_name,
user=config.db_user,
password=config.db_password,
host=config.db_host,
port=config.db_port,
)
cursor = conn.cursor()

filtered_df = api_df[api_df['result_status'] == 'ok']
failed_rows = api_df[api_df['result_status'] != 'ok']
context.log.warning(f"Number of housings with failed API results: {len(failed_rows)}")

for _, row in filtered_df.iterrows():

# L'API BAN renvoie des valeurs NaN pour les champs vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue
row = row.apply(lambda x: None if pd.isna(x) else x)

cursor.execute(
"""
INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
""",
(
row['housing_id'],
row['result_housenumber'],
row['result_label'],
row['result_street'],
row['result_postcode'],
row['result_city'],
row['latitude'],
row['longitude'],
row['result_score'],
row['result_id'],
"Housing"
),
)

conn.commit()
cursor.close()
conn.close()
# Bulk-insert the successfully geocoded rows into `ban_addresses`.
# Local import: only needed to escape the credentials placed in the database URL.
from urllib.parse import quote_plus

# `DataFrame.to_sql` writes the frame's own column names and has no `columns`
# argument, so the BAN API result fields are selected and renamed to the
# `ban_addresses` column names before the insert.
ban_columns = {
'housing_id': 'ref_id',
'result_housenumber': 'house_number',
'result_label': 'address',
'result_street': 'street',
'result_postcode': 'postal_code',
'result_city': 'city',
'latitude': 'latitude',
'longitude': 'longitude',
'result_score': 'score',
'result_id': 'ban_id',
}
rows_to_insert = filtered_df[list(ban_columns)].rename(columns=ban_columns)

# The BAN API returns NaN for empty fields (e.g. a locality has neither a house
# number nor a street); cast to object so they can be stored as None / NULL.
rows_to_insert = rows_to_insert.astype(object).where(rows_to_insert.notna(), None)

# Every row written by this asset describes a housing address.
rows_to_insert['address_kind'] = "Housing"

# Escape user and password so characters such as '@', ':' or '/' cannot break the URL.
engine = create_engine(
f'postgresql://{quote_plus(str(config.db_user))}:{quote_plus(str(config.db_password))}'
f'@{config.db_host}:{config.db_port}/{config.db_name}'
)

try:
    # The table already exists, so no `dtype` mapping is passed: it only matters
    # when pandas creates the table, and it must hold SQLAlchemy types, not strings.
    # `method='multi'` sends multi-row INSERT statements, `chunksize` bounds their size.
    rows_to_insert.to_sql(
        'ban_addresses',
        engine,
        if_exists='append',
        index=False,
        method='multi',
        chunksize=1000,
    )
finally:
    # Release the pooled connections even when the insert fails.
    engine.dispose()

# Report the rows actually written, not the API rows that include failed lookups.
context.log.info(f"{len(rows_to_insert)} records inserted successfully.")

Expand Down

0 comments on commit 1dbbe72

Please sign in to comment.