From ec131fbc19338f46271f3db9f2d0b6677510f03a Mon Sep 17 00:00:00 2001
From: Rodrigo Cunha
Date: Tue, 23 Jul 2024 20:01:28 -0300
Subject: [PATCH 1/2] initial commit

---
 queries/dbt_project.yml                          |   4 +
 queries/dev/utils.py                             |  13 +++
 .../models/gtfs/staging/shapes_geom_gtfs2.py     | 103 ++++++++++++++++++
 3 files changed, 120 insertions(+)
 create mode 100644 queries/models/gtfs/staging/shapes_geom_gtfs2.py

diff --git a/queries/dbt_project.yml b/queries/dbt_project.yml
index 4ae666c1..8a278793 100644
--- a/queries/dbt_project.yml
+++ b/queries/dbt_project.yml
@@ -252,6 +252,10 @@ models:
       +materialized: incremental
       +incremental_strategy: insert_overwrite
       +schema: gtfs
+      staging:
+        +materialized: incremental
+        +incremental_strategy: insert_overwrite
+        +schema: gtfs_staging
     indicadores_continuados_egp:
       +materialized: view
       +schema: indicadores_continuados_egp
diff --git a/queries/dev/utils.py b/queries/dev/utils.py
index db021da6..b7331d20 100644
--- a/queries/dev/utils.py
+++ b/queries/dev/utils.py
@@ -4,6 +4,7 @@
 # from datetime import datetime as dt
 # from datetime import timedelta
 from typing import Dict, List, Union
+import requests
 
 # import pandas as pd
 
@@ -62,3 +63,15 @@ def run_dbt_model(
 
     print(f"\n>>> RUNNING: {run_command}\n")
     os.system(run_command)
+
+def fetch_dataset_sha(dataset_id: str):
+    """Fetches the SHA of the latest commit that touched the dataset folder on GitHub"""
+    url = "https://api.github.com/repos/prefeitura-rio/pipelines_rj_smtr"
+    url += f"/commits?path=pipelines_rj_smtr/{dataset_id}"
+    response = requests.get(url)
+
+    if response.status_code != 200:
+        return None
+
+    dataset_version = response.json()[0]["sha"]
+    return {"version": dataset_version}
\ No newline at end of file
diff --git a/queries/models/gtfs/staging/shapes_geom_gtfs2.py b/queries/models/gtfs/staging/shapes_geom_gtfs2.py
new file mode 100644
index 00000000..33263dcb
--- /dev/null
+++ b/queries/models/gtfs/staging/shapes_geom_gtfs2.py
@@ -0,0 +1,103 @@
+import pandas as pd
+import geopandas as gpd
+import shapely
+
+def model(dbt, session):
+
+    print(dbt)
+    print(session)
+
+    dbt.config(partition_by = { 'field' :'feed_start_date',
+                'data_type' :'date',
+                'granularity': 'day' }
+            )
+    dbt.config(unique_key = ['shape_id', 'feed_start_date'])
+    dbt.config(alias = 'shapes_geom2')
+    dbt.config(packages = ["geopandas","shapely","pandas"])
+
+    shapes_df_spk = dbt.ref("shapes_gtfs")
+    feed_info_df_spk = dbt.ref("feed_info_gtfs")
+    data_versao_gtfs = dbt.config.get("data_versao_gtfs")
+    last_feed_version= dbt.config.get("data_versao_gtfs")
+    version = dbt.config.get("version")
+
+    shapes_df = shapes_df_spk.toPandas()
+    feed_info_df = feed_info_df_spk.toPandas()
+
+    # Assuming 'shapes_df' is a Spark DataFrame
+    # point_udf = pyspark.sql.functions.udf(lambda lon, lat: shapely.Point(lon, lat))
+
+    # shapes_df = shapes_df.withColumn(
+    #     'ponto_shape',
+    #     point_udf(shapes_df['shape_pt_lon'], shapes_df['shape_pt_lat'])
+    # )
+
+    # Convert to GeoDataFrame
+    shapes_df['ponto_shape'] = shapes_df[['shape_pt_lon', 'shape_pt_lat']].apply(lambda row: shapely.Point(row['shape_pt_lon'], row['shape_pt_lat']), axis=1)
+    gdf = gpd.GeoDataFrame(shapes_df, geometry='ponto_shape')
+
+    if dbt.is_incremental:
+        gdf = gdf[gdf['feed_start_date'].isin([last_feed_version, data_versao_gtfs])]
+
+    # Contents (copy, so the column added below does not write into a slice of gdf)
+    contents = gdf[['shape_id', 'ponto_shape', 'shape_pt_sequence', 'feed_start_date']].copy()
+
+    # PTS
+    contents['final_pt_sequence'] = contents.groupby(['feed_start_date', 'shape_id'])['shape_pt_sequence'].transform('max')
+    pts = contents.sort_values(by=['feed_start_date', 'shape_id', 'shape_pt_sequence'])
+
+    # Shapes: a dict key can appear only once, so the LineString builder and the
+    # 'first'/'last' endpoint reducers go together in one list of aggregations
+    shapes = pts.groupby(['shape_id', 'feed_start_date']).agg({
+        'ponto_shape': [lambda x: shapely.LineString(x.tolist()), 'first', 'last']
+    }).reset_index()
+    shapes.columns = ['shape_id', 'feed_start_date', 'shape', 'start_pt', 'end_pt']
+
+    # Shapes Half
+    def create_half_shapes(df, condition):
+        return df[df['shape_pt_sequence'] <= condition]
+
+    # group_keys=False keeps 'shape_id' and 'feed_start_date' as plain columns,
+    # so the groupbys below are not ambiguous between index levels and columns
+    half_0 = pts.groupby(['shape_id', 'feed_start_date'], group_keys=False).apply(lambda df: create_half_shapes(df, df['final_pt_sequence'].iloc[0] // 2))
+    half_1 = pts.groupby(['shape_id', 'feed_start_date'], group_keys=False).apply(lambda df: create_half_shapes(df, df['final_pt_sequence'].iloc[0] // 2 + 1))
+
+    half_0['new_shape_id'] = half_0['shape_id'] + "_0"
+    half_1['new_shape_id'] = half_1['shape_id'] + "_1"
+
+    shapes_half_0 = half_0.groupby(['shape_id', 'feed_start_date', 'new_shape_id']).agg({
+        'ponto_shape': [lambda x: shapely.LineString(x.tolist()), 'first', 'last']
+    }).reset_index()
+    shapes_half_1 = half_1.groupby(['shape_id', 'feed_start_date', 'new_shape_id']).agg({
+        'ponto_shape': [lambda x: shapely.LineString(x.tolist()), 'first', 'last']
+    }).reset_index()
+
+    shapes_half_0.columns = ['shape_id', 'feed_start_date', 'new_shape_id', 'shape', 'start_pt', 'end_pt']
+    shapes_half_1.columns = ['shape_id', 'feed_start_date', 'new_shape_id', 'shape', 'start_pt', 'end_pt']
+
+    shapes_half = pd.concat([shapes_half_0, shapes_half_1], axis=0)
+
+    # IDs
+    ids = shapes.groupby(['feed_start_date', 'shape_id']).first().reset_index()
+
+    # Union Shapes
+    union_shapes = pd.concat([ids, shapes_half], axis=0, ignore_index=True)
+
+    # start_pt/end_pt hold shapely Points; wrap them in a GeoSeries to read .x/.y
+    start_pts = gpd.GeoSeries(union_shapes['start_pt'])
+    end_pts = gpd.GeoSeries(union_shapes['end_pt'])
+    union_shapes = union_shapes[(start_pts.y.round(4) == end_pts.y.round(4)) &
+                                (start_pts.x.round(4) == end_pts.x.round(4))]
+
+    # Final Selection
+    result = union_shapes.merge(feed_info_df, on='feed_start_date')
+
+    if dbt.is_incremental:
+        result = result[result['feed_start_date'].isin([last_feed_version, data_versao_gtfs])]
+
+    result['shape_distance'] = result['shape'].apply(lambda x: round(x.length, 1))
+    result['versao_modelo'] = version
+
+    final_columns = ['feed_version', 'feed_start_date', 'feed_end_date', 'shape_id', 'shape', 'shape_distance', 'start_pt', 'end_pt', 'versao_modelo']
+    result = result[final_columns]
+
+    # session is a Spark session here (see toPandas above), so createDataFrame
+    output_df = session.createDataFrame(result)
+
+    return output_df
\ No newline at end of file
From f941b2e384e9c7910ea956b241769457a1a1d56b Mon Sep 17 00:00:00 2001
From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com>
Date: Tue, 23 Jul 2024 23:03:51 +0000
Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks

for more information, see https://pre-commit.ci
---
 queries/dev/utils.py                             |   4 +-
 .../models/gtfs/staging/shapes_geom_gtfs2.py     | 173 ++++++++++++------
 2 files changed, 115 insertions(+), 62 deletions(-)

diff --git a/queries/dev/utils.py b/queries/dev/utils.py
index b7331d20..abbc68ce 100644
--- a/queries/dev/utils.py
+++ b/queries/dev/utils.py
@@ -4,6 +4,7 @@
 # from datetime import datetime as dt
 # from datetime import timedelta
 from typing import Dict, List, Union
+
 import requests
 
 # import pandas as pd
@@ -64,6 +65,7 @@ def run_dbt_model(
 
     print(f"\n>>> RUNNING: {run_command}\n")
     os.system(run_command)
+
 def fetch_dataset_sha(dataset_id: str):
     """Fetches the SHA of the latest commit that touched the dataset folder on GitHub"""
     url = "https://api.github.com/repos/prefeitura-rio/pipelines_rj_smtr"
@@ -74,4 +76,4 @@ def fetch_dataset_sha(dataset_id: str):
     if response.status_code != 200:
         return None
 
     dataset_version = response.json()[0]["sha"]
-    return {"version": dataset_version}
\ No newline at end of file
+    return {"version": dataset_version}
diff --git a/queries/models/gtfs/staging/shapes_geom_gtfs2.py b/queries/models/gtfs/staging/shapes_geom_gtfs2.py
index 33263dcb..76ab0d8b 100644
--- a/queries/models/gtfs/staging/shapes_geom_gtfs2.py
+++ b/queries/models/gtfs/staging/shapes_geom_gtfs2.py
@@ -1,24 +1,23 @@
-import pandas as pd
+# -*- coding: utf-8 -*-
 import geopandas as gpd
+import pandas as pd
 import shapely
 
+
 def model(dbt, session):
 
     print(dbt)
     print(session)
 
-    dbt.config(partition_by = { 'field' :'feed_start_date',
-                'data_type' :'date',
-                'granularity': 'day' }
-            )
-    dbt.config(unique_key = ['shape_id', 'feed_start_date'])
-    dbt.config(alias = 'shapes_geom2')
-    dbt.config(packages = ["geopandas","shapely","pandas"])
+    dbt.config(partition_by={"field": "feed_start_date", "data_type": "date", "granularity": "day"})
+    dbt.config(unique_key=["shape_id", "feed_start_date"])
+    dbt.config(alias="shapes_geom2")
+    dbt.config(packages=["geopandas", "shapely", "pandas"])
 
     shapes_df_spk = dbt.ref("shapes_gtfs")
     feed_info_df_spk = dbt.ref("feed_info_gtfs")
     data_versao_gtfs = dbt.config.get("data_versao_gtfs")
-    last_feed_version= dbt.config.get("data_versao_gtfs")
+    last_feed_version = dbt.config.get("data_versao_gtfs")
     version = dbt.config.get("version")
 
     shapes_df = shapes_df_spk.toPandas()
@@ -33,71 +32,123 @@ def model(dbt, session):
     #     )
 
     # Convert to GeoDataFrame
-    shapes_df['ponto_shape'] = shapes_df[['shape_pt_lon', 'shape_pt_lat']].apply(lambda row: shapely.Point(row['shape_pt_lon'], row['shape_pt_lat']), axis=1)
-    gdf = gpd.GeoDataFrame(shapes_df, geometry='ponto_shape')
+    shapes_df["ponto_shape"] = shapes_df[["shape_pt_lon", "shape_pt_lat"]].apply(
+        lambda row: shapely.Point(row["shape_pt_lon"], row["shape_pt_lat"]), axis=1
+    )
+    gdf = gpd.GeoDataFrame(shapes_df, geometry="ponto_shape")
 
     if dbt.is_incremental:
-        gdf = gdf[gdf['feed_start_date'].isin([last_feed_version, data_versao_gtfs])]
+        gdf = gdf[gdf["feed_start_date"].isin([last_feed_version, data_versao_gtfs])]
 
     # Contents (copy, so the column added below does not write into a slice of gdf)
-    contents = gdf[['shape_id', 'ponto_shape', 'shape_pt_sequence', 'feed_start_date']].copy()
+    contents = gdf[["shape_id", "ponto_shape", "shape_pt_sequence", "feed_start_date"]].copy()
 
     # PTS
-    contents['final_pt_sequence'] = contents.groupby(['feed_start_date', 'shape_id'])['shape_pt_sequence'].transform('max')
-    pts = contents.sort_values(by=['feed_start_date', 'shape_id', 'shape_pt_sequence'])
+    contents["final_pt_sequence"] = contents.groupby(["feed_start_date", "shape_id"])[
+        "shape_pt_sequence"
+    ].transform("max")
+    pts = contents.sort_values(by=["feed_start_date", "shape_id", "shape_pt_sequence"])
 
     # Shapes: a dict key can appear only once, so the LineString builder and the
     # 'first'/'last' endpoint reducers go together in one list of aggregations
-    shapes = pts.groupby(['shape_id', 'feed_start_date']).agg({
-        'ponto_shape': [lambda x: shapely.LineString(x.tolist()), 'first', 'last']
-    }).reset_index()
-    shapes.columns = ['shape_id', 'feed_start_date', 'shape', 'start_pt', 'end_pt']
+    shapes = (
+        pts.groupby(["shape_id", "feed_start_date"])
+        .agg({"ponto_shape": [lambda x: shapely.LineString(x.tolist()), "first", "last"]})
+        .reset_index()
+    )
+    shapes.columns = ["shape_id", "feed_start_date", "shape", "start_pt", "end_pt"]
 
     # Shapes Half
     def create_half_shapes(df, condition):
-        return df[df['shape_pt_sequence'] <= condition]
+        return df[df["shape_pt_sequence"] <= condition]
 
     # group_keys=False keeps 'shape_id' and 'feed_start_date' as plain columns,
     # so the groupbys below are not ambiguous between index levels and columns
-    half_0 = pts.groupby(['shape_id', 'feed_start_date'], group_keys=False).apply(lambda df: create_half_shapes(df, df['final_pt_sequence'].iloc[0] // 2))
-    half_1 = pts.groupby(['shape_id', 'feed_start_date'], group_keys=False).apply(lambda df: create_half_shapes(df, df['final_pt_sequence'].iloc[0] // 2 + 1))
+    half_0 = pts.groupby(["shape_id", "feed_start_date"], group_keys=False).apply(
+        lambda df: create_half_shapes(df, df["final_pt_sequence"].iloc[0] // 2)
+    )
+    half_1 = pts.groupby(["shape_id", "feed_start_date"], group_keys=False).apply(
+        lambda df: create_half_shapes(df, df["final_pt_sequence"].iloc[0] // 2 + 1)
+    )
 
-    half_0['new_shape_id'] = half_0['shape_id'] + "_0"
-    half_1['new_shape_id'] = half_1['shape_id'] + "_1"
+    half_0["new_shape_id"] = half_0["shape_id"] + "_0"
+    half_1["new_shape_id"] = half_1["shape_id"] + "_1"
 
-    shapes_half_0 = half_0.groupby(['shape_id', 'feed_start_date', 'new_shape_id']).agg({
-        'ponto_shape': [lambda x: shapely.LineString(x.tolist()), 'first', 'last']
-    }).reset_index()
-    shapes_half_1 = half_1.groupby(['shape_id', 'feed_start_date', 'new_shape_id']).agg({
-        'ponto_shape': [lambda x: shapely.LineString(x.tolist()), 'first', 'last']
-    }).reset_index()
+    shapes_half_0 = (
+        half_0.groupby(["shape_id", "feed_start_date", "new_shape_id"])
+        .agg({"ponto_shape": [lambda x: shapely.LineString(x.tolist()), "first", "last"]})
+        .reset_index()
+    )
+    shapes_half_1 = (
+        half_1.groupby(["shape_id", "feed_start_date", "new_shape_id"])
+        .agg({"ponto_shape": [lambda x: shapely.LineString(x.tolist()), "first", "last"]})
+        .reset_index()
+    )
 
-    shapes_half_0.columns = ['shape_id', 'feed_start_date', 'new_shape_id', 'shape', 'start_pt', 'end_pt']
-    shapes_half_1.columns = ['shape_id', 'feed_start_date', 'new_shape_id', 'shape', 'start_pt', 'end_pt']
+    shapes_half_0.columns = [
+        "shape_id",
+        "feed_start_date",
+        "new_shape_id",
+        "shape",
+        "start_pt",
+        "end_pt",
+    ]
+    shapes_half_1.columns = [
+        "shape_id",
+        "feed_start_date",
+        "new_shape_id",
+        "shape",
+        "start_pt",
+        "end_pt",
+    ]
 
     shapes_half = pd.concat([shapes_half_0, shapes_half_1], axis=0)
 
"shape", + "shape_distance", + "start_pt", + "end_pt", + "versao_modelo", + ] result = result[final_columns] output_df = session.create_dataframe(result) - - return output_df \ No newline at end of file + + return output_df