diff --git a/.talismanrc b/.talismanrc index d9e2b3c12..d491e668b 100644 --- a/.talismanrc +++ b/.talismanrc @@ -4,11 +4,13 @@ fileignoreconfig: - filename: README.md checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176 - filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py - checksum: 08c9deed34bb3cce9e77601a1db4d9e8ca8acc5862c8ee7f73f6164c30a45946 + checksum: 66b41821bccc209598ed3d082e5666102edf52ae854b41db3f0b3fe3640657b7 - filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py - checksum: b068d130431a26be58c78e4f14346be53d318110c4020c7f7212069cbd190bf2 + checksum: 6d33b062918f2957e659ddf9f63413fe273ab88b34ba31932d8b9cfda996a1f1 - filename: analytics/dagster/src/resources/ban_config.py checksum: 034c6924978983da0ca5897bb06b64598a5a813dc93d1d9e8f8a62da952d4d22 +- filename: analytics/dagster/src/resources/database_resources.py + checksum: 12fb6c30e1a0378c39cd1da759ec1ece28bda86ea6353c3ea0076c2d94da682e - filename: frontend/.env.example checksum: 7e2a5ff197c49ff9f715b3d189da7282bdb40de53ea49735e9f183ece19168fc - filename: frontend/src/components/Draft/DraftSender.tsx diff --git a/analytics/dagster/src/assets/populate_housings_ban_addresses.py b/analytics/dagster/src/assets/populate_housings_ban_addresses.py index 3efbff625..79ab414a4 100644 --- a/analytics/dagster/src/assets/populate_housings_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_housings_ban_addresses.py @@ -1,14 +1,13 @@ -from dagster import asset, MetadataValue, AssetExecutionContext +from dagster import asset, MetadataValue, AssetExecutionContext, resource, op import requests import pandas as pd -import psycopg2 from io import StringIO -from sqlalchemy import create_engine -@asset(description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", required_resource_keys={"ban_config"}) +@asset( + description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", + required_resource_keys={"psycopg2_connection"} +) def housings_without_address(context: AssetExecutionContext): - config = context.resources.ban_config - query = """ SELECT fh.id as housing_id, array_to_string(fh.address_dgfip, ' ') as address_dgfip, fh.geo_code FROM fast_housing fh @@ -16,20 +15,19 @@ def housings_without_address(context: AssetExecutionContext): WHERE ba.ref_id IS NULL; """ - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - df = pd.read_sql(query, conn) - conn.close() + try: + with context.resources.psycopg2_connection as conn: + df = pd.read_sql(query, conn) + except Exception as e: + context.log.error(f"Error executing query: {e}") + raise return df - -@asset(description="Write housing records without addresses to a CSV file, log the file path and a preview, then return the file path as metadata.", required_resource_keys={"ban_config"}) +@asset( + description="Write housing records without addresses to a CSV file, log the file path and a preview, then return the file path as metadata.", + required_resource_keys={"ban_config"} +) def create_csv_from_housings(context: AssetExecutionContext, housings_without_address): config = context.resources.ban_config @@ -48,8 +46,10 @@ def create_csv_from_housings(context: AssetExecutionContext, housings_without_ad "metadata": {"file_path": MetadataValue.text(csv_file_path)} } - -@asset(deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.", required_resource_keys={"ban_config"}) +@asset( + deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.", + required_resource_keys={"ban_config"} +) def send_csv_to_api(context: AssetExecutionContext): config = context.resources.ban_config @@ -65,55 +65,59 @@ def send_csv_to_api(context: AssetExecutionContext): raise Exception(f"API request failed with status code {response.status_code}") -@asset(description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.", required_resource_keys={"ban_config"}) +@asset( + description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.", + required_resource_keys={"sqlalchemy_engine"} +) def parse_api_response_and_insert_housing_addresses(context: AssetExecutionContext, send_csv_to_api): - config = context.resources.ban_config - api_df = pd.read_csv(StringIO(send_csv_to_api)) filtered_df = api_df[api_df['result_status'] == 'ok'] failed_rows = api_df[api_df['result_status'] != 'ok'] context.log.warning(f"Number of housings with failed API results: {len(failed_rows)}") + if not failed_rows.empty: + context.log.warning(f"Failed rows preview:\n{failed_rows.head(5)}") filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x) - - engine = create_engine(f'postgresql://{config.db_user}:{config.db_password}@{config.db_host}:{config.db_port}/{config.db_name}') - - filtered_df.to_sql( - 'ban_addresses', - engine, - if_exists='append', - index=False, - columns=[ - 'housing_id', - 'result_housenumber', - 'result_label', - 'result_street', - 'result_postcode', - 'result_city', - 'latitude', - 'longitude', - 'result_score', - 'result_id', - 'address_kind' - ], - dtype={ - 'housing_id': 'INTEGER', - 'result_housenumber': 'TEXT', - 'result_label': 'TEXT', - 'result_street': 'TEXT', - 'result_postcode': 'TEXT', - 'result_city': 'TEXT', - 'latitude': 'FLOAT', - 'longitude': 'FLOAT', - 'result_score': 'FLOAT', - 'result_id': 'TEXT', - 'address_kind': 'TEXT' - } - ) - - context.log.info(f"{len(api_df)} records inserted successfully.") + filtered_df['address_kind'] = "Housing" + engine = context.resources.sqlalchemy_engine + + with engine.begin() as connection: + filtered_df.to_sql( + 'ban_addresses', + connection, + if_exists='append', + index=False, + columns=[ + 'housing_id', + 'result_housenumber', + 'result_label', + 'result_street', + 'result_postcode', + 'result_city', + 'latitude', + 'longitude', + 'result_score', + 'result_id', + 'address_kind' + ], + dtype={ + 'housing_id': 'INTEGER', + 'result_housenumber': 'TEXT', + 'result_label': 'TEXT', + 'result_street': 'TEXT', + 'result_postcode': 'TEXT', + 'result_city': 'TEXT', + 'latitude': 'FLOAT', + 'longitude': 'FLOAT', + 'result_score': 'FLOAT', + 'result_id': 'TEXT', + 'address_kind': 'TEXT' + } + ) + + context.log.info(f"{len(filtered_df)} valid records inserted successfully.") return { - "metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")} + "metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")} } diff --git a/analytics/dagster/src/assets/populate_owners_ban_addresses.py b/analytics/dagster/src/assets/populate_owners_ban_addresses.py index b80fa2b0c..c44bafb7b 100644 --- a/analytics/dagster/src/assets/populate_owners_ban_addresses.py +++ b/analytics/dagster/src/assets/populate_owners_ban_addresses.py @@ -1,13 +1,13 @@ -from dagster import asset, Config, MetadataValue, AssetExecutionContext, Output +from dagster import asset, MetadataValue, AssetExecutionContext, Output, op import requests import pandas as pd -import psycopg2 from io import StringIO -@asset(description="Return owners with no BAN address or a non-validated BAN address (score < 1).", required_resource_keys={"ban_config"}) +@asset( + description="Return owners with no BAN address or a non-validated BAN address (score < 1).", + required_resource_keys={"psycopg2_connection"} +) def owners_without_address(context: AssetExecutionContext): - config = context.resources.ban_config - query = """ SELECT o.id as owner_id, @@ -18,19 +18,19 @@ def owners_without_address(context: AssetExecutionContext): OR (ba.ref_id IS NOT NULL AND ba.address_kind = 'Owner' AND ba.score < 1); -- Propriétaires avec adresse non validée par Stéphanie """ - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - df = pd.read_sql(query, conn) - conn.close() + try: + with context.resources.psycopg2_connection as conn: + df = pd.read_sql(query, conn) + except Exception as e: + context.log.error(f"Error executing query: {e}") + raise return df -@asset(description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.", required_resource_keys={"ban_config"}) +@asset( + description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.", + required_resource_keys={"ban_config"} +) def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without_address): config = context.resources.ban_config @@ -62,7 +62,10 @@ def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without return Output(value=file_paths, metadata={"file_paths": MetadataValue.text(", ".join(file_paths))}) -@asset(description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.", required_resource_keys={"ban_config"}) +@asset( + description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.", + required_resource_keys={"ban_config"} +) def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_from_owners): config = context.resources.ban_config @@ -88,33 +91,47 @@ def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_fro return aggregated_file_path -@asset(description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.", required_resource_keys={"ban_config"}) +@asset( + description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.", + required_resource_keys={"psycopg2_connection"} +) def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContext, send_csv_chunks_to_api): - config = context.resources.ban_config - api_df = pd.read_csv(send_csv_chunks_to_api) - conn = psycopg2.connect( - dbname=config.db_name, - user=config.db_user, - password=config.db_password, - host=config.db_host, - port=config.db_port, - ) - cursor = conn.cursor() - filtered_df = api_df[api_df['result_status'] == 'ok'] failed_rows = api_df[api_df['result_status'] != 'ok'] context.log.warning(f"Number of owners with failed API results: {len(failed_rows)}") - for _, row in filtered_df.iterrows(): - # L'API BAN renvoie des valeurs NaN pour les champs vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue - row = row.apply(lambda x: None if pd.isna(x) else x) - - cursor.execute( - """ + filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x) + filtered_df['address_kind'] = "Owner" + + with context.resources.psycopg2_connection as conn: + with conn.cursor() as cursor: + cursor.execute(""" + CREATE TEMP TABLE temp_ban_addresses ( + ref_id TEXT, + house_number TEXT, + address TEXT, + street TEXT, + postal_code TEXT, + city TEXT, + latitude FLOAT, + longitude FLOAT, + score FLOAT, + ban_id TEXT, + address_kind TEXT + ); + """) + + buffer = StringIO() + filtered_df.to_csv(buffer, sep='\t', header=False, index=False) + buffer.seek(0) + cursor.copy_from(buffer, 'temp_ban_addresses', sep='\t') + + cursor.execute(""" INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind) - VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + SELECT ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind + FROM temp_ban_addresses ON CONFLICT (ref_id, address_kind) DO UPDATE SET house_number = EXCLUDED.house_number, @@ -126,28 +143,13 @@ def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContex longitude = EXCLUDED.longitude, score = EXCLUDED.score, ban_id = EXCLUDED.ban_id; - """, - ( - row['owner_id'], - row['result_housenumber'], - row['result_label'], - row['result_street'], - row['result_postcode'], - row['result_city'], - row['latitude'], - row['longitude'], - row['result_score'], - row['result_id'], - "Owner" - ), - ) - - conn.commit() - cursor.close() - conn.close() - - context.log.info(f"{len(api_df)} records inserted successfully.") + """) + cursor.execute("DROP TABLE temp_ban_addresses;") + + conn.commit() + + context.log.info(f"{len(filtered_df)} valid records inserted successfully.") return { - "metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")} + "metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")} } diff --git a/analytics/dagster/src/jobs/housings_ban_addresses_job.py b/analytics/dagster/src/jobs/housings_ban_addresses_job.py index f64a942f7..131aeb9a4 100644 --- a/analytics/dagster/src/jobs/housings_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/housings_ban_addresses_job.py @@ -1,10 +1,13 @@ from dagster import job from dagster.src.assets.populate_owners_ban_addresses import housings_without_address from dagster.src.resources.ban_config import ban_config_resource +from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource @job( resource_defs={ "ban_config": ban_config_resource, + "sqlalchemy_engine": sqlalchemy_engine_resource, + "postgres": postgres_resource, } ) def housings_ban_addresses_job(): diff --git a/analytics/dagster/src/jobs/owners_ban_addresses_job.py b/analytics/dagster/src/jobs/owners_ban_addresses_job.py index 12558323e..a412613e1 100644 --- a/analytics/dagster/src/jobs/owners_ban_addresses_job.py +++ b/analytics/dagster/src/jobs/owners_ban_addresses_job.py @@ -1,10 +1,13 @@ from dagster import job from dagster.src.assets.populate_owners_ban_addresses import owners_without_address from dagster.src.resources.ban_config import ban_config_resource +from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource @job( resource_defs={ "ban_config": ban_config_resource, + "sqlalchemy_engine": sqlalchemy_engine_resource, + "postgres": postgres_resource, } ) def owners_ban_addresses_job(): diff --git a/analytics/dagster/src/resources/ban_config.py b/analytics/dagster/src/resources/ban_config.py index c6565b368..07f35ac40 100644 --- a/analytics/dagster/src/resources/ban_config.py +++ b/analytics/dagster/src/resources/ban_config.py @@ -3,12 +3,6 @@ from dagster import resource class BANConfig(BaseSettings): - db_name: str = Field("isoprod", env="DB_NAME") - db_user: str = Field("postgres", env="DB_USER") - db_password: str = Field("postgres", env="DB_PASSWORD") - db_host: str = Field("localhost", env="DB_HOST") - db_port: str = Field("5432", env="DB_PORT") - api_url: str = Field("https://api-adresse.data.gouv.fr/search/csv/", env="BAN_API_URL") csv_file_path: str = Field("temp_csv", env="CSV_FILE_PATH") @@ -28,11 +22,6 @@ class Config: @resource( config_schema={ - "db_name": str, - "db_user": str, - "db_password": str, - "db_host": str, - "db_port": str, "api_url": str, "csv_file_path": str, "chunk_size": int, diff --git a/analytics/dagster/src/resources/database_resources.py b/analytics/dagster/src/resources/database_resources.py new file mode 100644 index 000000000..dcbe9e37e --- /dev/null +++ b/analytics/dagster/src/resources/database_resources.py @@ -0,0 +1,41 @@ +from dagster import resource +from sqlalchemy import create_engine +import psycopg2 + +@resource(config_schema={ + "db_name": str, + "db_user": str, + "db_password": str, + "db_host": str, + "db_port": int, +}) +def psycopg2_connection_resource(init_context): + config = init_context.resource_config + conn = psycopg2.connect( + dbname=config["db_name"], + user=config["db_user"], + password=config["db_password"], + host=config["db_host"], + port=config["db_port"], + ) + try: + yield conn + finally: + conn.close() + +@resource(config_schema={ + "db_name": str, + "db_user": str, + "db_password": str, + "db_host": str, + "db_port": int, +}) +def sqlalchemy_engine_resource(init_context): + config = init_context.resource_config + engine = create_engine( + f'postgresql://{config["db_user"]}:{config["db_password"]}@{config["db_host"]}:{config["db_port"]}/{config["db_name"]}' + ) + try: + yield engine + finally: + engine.dispose()