Skip to content

Commit

Permalink
feat: optimize and refactor BAN API response parsing and insertion workflows
Browse files Browse the repository at this point in the history
  • Loading branch information
loicguillois committed Jan 27, 2025
1 parent 1dbbe72 commit 0a7a5fe
Show file tree
Hide file tree
Showing 7 changed files with 174 additions and 130 deletions.
6 changes: 4 additions & 2 deletions .talismanrc
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,13 @@ fileignoreconfig:
- filename: README.md
checksum: df312ccb4c75fc4c2441a1f7f2c7817ee98ffb3065c78d5d7d6addf6ab129176
- filename: analytics/dagster/src/assets/populate_housings_ban_addresses.py
checksum: 08c9deed34bb3cce9e77601a1db4d9e8ca8acc5862c8ee7f73f6164c30a45946
checksum: 66b41821bccc209598ed3d082e5666102edf52ae854b41db3f0b3fe3640657b7
- filename: analytics/dagster/src/assets/populate_owners_ban_addresses.py
checksum: b068d130431a26be58c78e4f14346be53d318110c4020c7f7212069cbd190bf2
checksum: 6d33b062918f2957e659ddf9f63413fe273ab88b34ba31932d8b9cfda996a1f1
- filename: analytics/dagster/src/resources/ban_config.py
checksum: 034c6924978983da0ca5897bb06b64598a5a813dc93d1d9e8f8a62da952d4d22
- filename: analytics/dagster/src/resources/database_resources.py
checksum: 12fb6c30e1a0378c39cd1da759ec1ece28bda86ea6353c3ea0076c2d94da682e
- filename: frontend/.env.example
checksum: 7e2a5ff197c49ff9f715b3d189da7282bdb40de53ea49735e9f183ece19168fc
- filename: frontend/src/components/Draft/DraftSender.tsx
Expand Down
124 changes: 64 additions & 60 deletions analytics/dagster/src/assets/populate_housings_ban_addresses.py
Original file line number Diff line number Diff line change
@@ -1,35 +1,33 @@
from dagster import asset, MetadataValue, AssetExecutionContext
from dagster import asset, MetadataValue, AssetExecutionContext, resource, op
import requests
import pandas as pd
import psycopg2
from io import StringIO
from sqlalchemy import create_engine

@asset(description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.", required_resource_keys={"ban_config"})
@asset(
description="Return housing records from `fast_housing` that have no matching entry in `ban_addresses`.",
required_resource_keys={"psycopg2_connection"}
)
def housings_without_address(context: AssetExecutionContext):
config = context.resources.ban_config

query = """
SELECT fh.id as housing_id, array_to_string(fh.address_dgfip, ' ') as address_dgfip, fh.geo_code
FROM fast_housing fh
LEFT JOIN ban_addresses ba ON fh.id = ba.ref_id
WHERE ba.ref_id IS NULL;
"""

conn = psycopg2.connect(
dbname=config.db_name,
user=config.db_user,
password=config.db_password,
host=config.db_host,
port=config.db_port,
)
df = pd.read_sql(query, conn)
conn.close()
try:
with context.resources.psycopg2_connection as conn:
df = pd.read_sql(query, conn)
except Exception as e:
context.log.error(f"Error executing query: {e}")
raise

return df


@asset(description="Write housing records without addresses to a CSV file, log the file path and a preview, then return the file path as metadata.", required_resource_keys={"ban_config"})
@asset(
description="Write housing records without addresses to a CSV file, log the file path and a preview, then return the file path as metadata.",
required_resource_keys={"ban_config"}
)
def create_csv_from_housings(context: AssetExecutionContext, housings_without_address):
config = context.resources.ban_config

Expand All @@ -48,8 +46,10 @@ def create_csv_from_housings(context: AssetExecutionContext, housings_without_ad
"metadata": {"file_path": MetadataValue.text(csv_file_path)}
}


@asset(deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.", required_resource_keys={"ban_config"})
@asset(
deps=[create_csv_from_housings], description="Send the local CSV file to the BAN address API for address processing. Raises an exception if the request fails.",
required_resource_keys={"ban_config"}
)
def send_csv_to_api(context: AssetExecutionContext):
config = context.resources.ban_config

Expand All @@ -65,55 +65,59 @@ def send_csv_to_api(context: AssetExecutionContext):
raise Exception(f"API request failed with status code {response.status_code}")


@asset(description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.", required_resource_keys={"ban_config"})
@asset(
description="Parse the CSV response from the BAN address API, insert valid addresses into the `ban_addresses` table, log a preview and any failed results, then return the total number of inserted records as metadata.",
required_resource_keys={"sqlalchemy_engine"}
)
def parse_api_response_and_insert_housing_addresses(context: AssetExecutionContext, send_csv_to_api):
config = context.resources.ban_config

api_df = pd.read_csv(StringIO(send_csv_to_api))

filtered_df = api_df[api_df['result_status'] == 'ok']
failed_rows = api_df[api_df['result_status'] != 'ok']
context.log.warning(f"Number of housings with failed API results: {len(failed_rows)}")
if not failed_rows.empty:
context.log.warning(f"Failed rows preview:\n{failed_rows.head(5)}")

filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x)

engine = create_engine(f'postgresql://{config.db_user}:{config.db_password}@{config.db_host}:{config.db_port}/{config.db_name}')

filtered_df.to_sql(
'ban_addresses',
engine,
if_exists='append',
index=False,
columns=[
'housing_id',
'result_housenumber',
'result_label',
'result_street',
'result_postcode',
'result_city',
'latitude',
'longitude',
'result_score',
'result_id',
'address_kind'
],
dtype={
'housing_id': 'INTEGER',
'result_housenumber': 'TEXT',
'result_label': 'TEXT',
'result_street': 'TEXT',
'result_postcode': 'TEXT',
'result_city': 'TEXT',
'latitude': 'FLOAT',
'longitude': 'FLOAT',
'result_score': 'FLOAT',
'result_id': 'TEXT',
'address_kind': 'TEXT'
}
)

context.log.info(f"{len(api_df)} records inserted successfully.")
filtered_df['address_kind'] = "Housing"
engine = context.resources.sqlalchemy_engine

with engine.begin() as connection:
filtered_df.to_sql(
'ban_addresses',
connection,
if_exists='append',
index=False,
columns=[
'housing_id',
'result_housenumber',
'result_label',
'result_street',
'result_postcode',
'result_city',
'latitude',
'longitude',
'result_score',
'result_id',
'address_kind'
],
dtype={
'housing_id': 'INTEGER',
'result_housenumber': 'TEXT',
'result_label': 'TEXT',
'result_street': 'TEXT',
'result_postcode': 'TEXT',
'result_city': 'TEXT',
'latitude': 'FLOAT',
'longitude': 'FLOAT',
'result_score': 'FLOAT',
'result_id': 'TEXT',
'address_kind': 'TEXT'
}
)

context.log.info(f"{len(filtered_df)} valid records inserted successfully.")

return {
"metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")}
"metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")}
}
116 changes: 59 additions & 57 deletions analytics/dagster/src/assets/populate_owners_ban_addresses.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from dagster import asset, Config, MetadataValue, AssetExecutionContext, Output
from dagster import asset, MetadataValue, AssetExecutionContext, Output, op
import requests
import pandas as pd
import psycopg2
from io import StringIO

@asset(description="Return owners with no BAN address or a non-validated BAN address (score < 1).", required_resource_keys={"ban_config"})
@asset(
description="Return owners with no BAN address or a non-validated BAN address (score < 1).",
required_resource_keys={"psycopg2_connection"}
)
def owners_without_address(context: AssetExecutionContext):
config = context.resources.ban_config

query = """
SELECT
o.id as owner_id,
Expand All @@ -18,19 +18,19 @@ def owners_without_address(context: AssetExecutionContext):
OR (ba.ref_id IS NOT NULL AND ba.address_kind = 'Owner' AND ba.score < 1); -- Propriétaires avec adresse non validée par Stéphanie
"""

conn = psycopg2.connect(
dbname=config.db_name,
user=config.db_user,
password=config.db_password,
host=config.db_host,
port=config.db_port,
)
df = pd.read_sql(query, conn)
conn.close()
try:
with context.resources.psycopg2_connection as conn:
df = pd.read_sql(query, conn)
except Exception as e:
context.log.error(f"Error executing query: {e}")
raise

return df

@asset(description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.", required_resource_keys={"ban_config"})
@asset(
description="Split the owners DataFrame into multiple CSV files (chunks), store them to disk, and return the file paths as metadata.",
required_resource_keys={"ban_config"}
)
def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without_address):
config = context.resources.ban_config

Expand Down Expand Up @@ -62,7 +62,10 @@ def create_csv_chunks_from_owners(context: AssetExecutionContext, owners_without
return Output(value=file_paths, metadata={"file_paths": MetadataValue.text(", ".join(file_paths))})


@asset(description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.", required_resource_keys={"ban_config"})
@asset(
description="Send each CSV chunk to the BAN address API, aggregate valid responses into a single CSV, and return the path to the aggregated CSV file.",
required_resource_keys={"ban_config"}
)
def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_from_owners):
config = context.resources.ban_config

Expand All @@ -88,33 +91,47 @@ def send_csv_chunks_to_api(context: AssetExecutionContext, create_csv_chunks_fro

return aggregated_file_path

@asset(description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.", required_resource_keys={"ban_config"})
@asset(
description="Parse the aggregated CSV from the BAN address API, insert valid owners' addresses into `ban_addresses`, and return the count of processed records.",
required_resource_keys={"psycopg2_connection"}
)
def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContext, send_csv_chunks_to_api):
config = context.resources.ban_config

api_df = pd.read_csv(send_csv_chunks_to_api)

conn = psycopg2.connect(
dbname=config.db_name,
user=config.db_user,
password=config.db_password,
host=config.db_host,
port=config.db_port,
)
cursor = conn.cursor()

filtered_df = api_df[api_df['result_status'] == 'ok']
failed_rows = api_df[api_df['result_status'] != 'ok']
context.log.warning(f"Number of owners with failed API results: {len(failed_rows)}")

for _, row in filtered_df.iterrows():
# L'API BAN renvoie des valeurs NaN pour les champs vides. Par exemple pour les lieux-dits il n'y a pas de numéro de rue ni de rue
row = row.apply(lambda x: None if pd.isna(x) else x)

cursor.execute(
"""
filtered_df = filtered_df.applymap(lambda x: None if pd.isna(x) else x)
filtered_df['address_kind'] = "Owner"

with context.resources.psycopg2_connection as conn:
with conn.cursor() as cursor:
cursor.execute("""
CREATE TEMP TABLE temp_ban_addresses (
ref_id TEXT,
house_number TEXT,
address TEXT,
street TEXT,
postal_code TEXT,
city TEXT,
latitude FLOAT,
longitude FLOAT,
score FLOAT,
ban_id TEXT,
address_kind TEXT
);
""")

buffer = StringIO()
filtered_df.to_csv(buffer, sep='\t', header=False, index=False)
buffer.seek(0)
cursor.copy_from(buffer, 'temp_ban_addresses', sep='\t')

cursor.execute("""
INSERT INTO ban_addresses (ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
SELECT ref_id, house_number, address, street, postal_code, city, latitude, longitude, score, ban_id, address_kind
FROM temp_ban_addresses
ON CONFLICT (ref_id, address_kind)
DO UPDATE SET
house_number = EXCLUDED.house_number,
Expand All @@ -126,28 +143,13 @@ def parse_api_response_and_insert_owners_addresses(context: AssetExecutionContex
longitude = EXCLUDED.longitude,
score = EXCLUDED.score,
ban_id = EXCLUDED.ban_id;
""",
(
row['owner_id'],
row['result_housenumber'],
row['result_label'],
row['result_street'],
row['result_postcode'],
row['result_city'],
row['latitude'],
row['longitude'],
row['result_score'],
row['result_id'],
"Owner"
),
)

conn.commit()
cursor.close()
conn.close()

context.log.info(f"{len(api_df)} records inserted successfully.")
""")
cursor.execute("DROP TABLE temp_ban_addresses;")

conn.commit()

context.log.info(f"{len(filtered_df)} valid records inserted successfully.")

return {
"metadata": {"num_records": MetadataValue.text(f"{len(api_df)} records inserted")}
"metadata": {"num_records": MetadataValue.text(f"{len(filtered_df)} records inserted")}
}
3 changes: 3 additions & 0 deletions analytics/dagster/src/jobs/housings_ban_addresses_job.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dagster import job
from dagster.src.assets.populate_owners_ban_addresses import housings_without_address
from dagster.src.resources.ban_config import ban_config_resource
from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource

@job(
resource_defs={
"ban_config": ban_config_resource,
"sqlalchemy_engine": sqlalchemy_engine_resource,
"postgres": postgres_resource,
}
)
def housings_ban_addresses_job():
Expand Down
3 changes: 3 additions & 0 deletions analytics/dagster/src/jobs/owners_ban_addresses_job.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
from dagster import job
from dagster.src.assets.populate_owners_ban_addresses import owners_without_address
from dagster.src.resources.ban_config import ban_config_resource
from dagster.src.resources import sqlalchemy_engine_resource, postgres_resource

@job(
resource_defs={
"ban_config": ban_config_resource,
"sqlalchemy_engine": sqlalchemy_engine_resource,
"postgres": postgres_resource,
}
)
def owners_ban_addresses_job():
Expand Down
11 changes: 0 additions & 11 deletions analytics/dagster/src/resources/ban_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,6 @@
from dagster import resource

class BANConfig(BaseSettings):
db_name: str = Field("isoprod", env="DB_NAME")
db_user: str = Field("postgres", env="DB_USER")
db_password: str = Field("postgres", env="DB_PASSWORD")
db_host: str = Field("localhost", env="DB_HOST")
db_port: str = Field("5432", env="DB_PORT")

api_url: str = Field("https://api-adresse.data.gouv.fr/search/csv/", env="BAN_API_URL")
csv_file_path: str = Field("temp_csv", env="CSV_FILE_PATH")

Expand All @@ -28,11 +22,6 @@ class Config:

@resource(
config_schema={
"db_name": str,
"db_user": str,
"db_password": str,
"db_host": str,
"db_port": str,
"api_url": str,
"csv_file_path": str,
"chunk_size": int,
Expand Down
Loading

0 comments on commit 0a7a5fe

Please sign in to comment.