From 0dfe3b65e7912a37b0be77ff8f4ad02610085963 Mon Sep 17 00:00:00 2001
From: Charlie Costanzo
Date: Tue, 26 Nov 2024 12:38:13 -0500
Subject: [PATCH] Scrape geoportal for shn, add SHN flag on stops table (#3529)

* scrape geoportal for shn, add flag on stops table
* successful initial scrape of geoportal endpoint
* functioning json format and external table creation, need to modify coordinates
* shn external tables
* add initial source, staging, and docs for state geoportal scrape
* add flag for shn to dim_stops_latest
* re-add templating for buckets and remove hardcoded schema
* refactor operator to be more agnostic for data inputs
* add handling for state_highway_network table
* fix comment
* remove local variables and comments to allow for a production merge when ready
* use stop_id for calculations
* Apply suggestions from code review

Co-authored-by: Mjumbe Poe

* restore source table
* remove hardcoded source, remove old comments, add comment describing buffer distance
* remove hardcoded bucket
* refactor how we keep final columns, dynamically rename columns
* restore _gtfs_key over stop_id in shn geo calculation
* checking before refactor
* revisions based on Mjumbe's review

---------

Co-authored-by: Mjumbe Poe
---
 .../state_geoportal/state_highway_network.yml |  29 +++
 .../dags/scrape_state_geoportal/METADATA.yml  |  19 ++
 .../state_highway_network.yml                 |   7 +
 airflow/plugins/operators/__init__.py         |   1 +
 .../operators/scrape_state_geoportal.py       | 217 ++++++++++++++++++
 .../gtfs_schedule_latest/dim_stops_latest.sql |  45 +++-
 .../models/staging/state_geoportal/_src.yml   |   9 +
 .../state_geoportal/_stg_state_geoportal.yml  |   4 +
 ...geoportal__state_highway_network_stops.sql |  14 ++
 9 files changed, 344 insertions(+), 1 deletion(-)
 create mode 100644 airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml
 create mode 100644 airflow/dags/scrape_state_geoportal/METADATA.yml
 create mode 100644 airflow/dags/scrape_state_geoportal/state_highway_network.yml
 create mode 100644 airflow/plugins/operators/scrape_state_geoportal.py
 create mode 100644 warehouse/models/staging/state_geoportal/_src.yml
 create mode 100644 warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml
 create mode 100644 warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql

diff --git a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml
new file mode 100644
index 0000000000..9dd396fcdf
--- /dev/null
+++ b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml
@@ -0,0 +1,29 @@
+operator: operators.ExternalTable
+bucket: gs://calitp-state-geoportal-scrape
+source_objects:
+  - "state_highway_network_geodata/*.jsonl.gz"
+source_format: NEWLINE_DELIMITED_JSON
+use_bq_client: true
+hive_options:
+  mode: CUSTOM
+  require_partition_filter: false
+  source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/"
+destination_project_dataset_table: "external_state_geoportal.state_highway_network"
+prefix_bucket: false
+post_hook: |
+  SELECT *
+  FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network
+  LIMIT 1;
+schema_fields:
+  - name: Route
+    type: INTEGER
+  - name: County
+    type: STRING
+  - name: District
+    type: INTEGER
+  - name: RouteType
+    type: STRING
+  - name: Direction
+    type: STRING
+  - name: wkt_coordinates
+    type: GEOGRAPHY
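Note on the layout this external table expects: the scraper added below writes one gzipped JSONL file per run, hive-partitioned by "dt" and "execution_ts". A minimal sketch of an object path the source_uri_prefix above would match; the exact layout is determined by calitp_data_infra's PartitionedGCSArtifact, so the key=value partition format shown here is an assumption for illustration:

    import pendulum

    bucket = "gs://calitp-state-geoportal-scrape"
    table = "state_highway_network_geodata"
    ts = pendulum.now("UTC")

    # e.g. gs://calitp-state-geoportal-scrape/state_highway_network_geodata/
    #      dt=2024-11-26/execution_ts=2024-11-26T17:38:13+00:00/state_highway_network_geodata.jsonl.gz
    path = (
        f"{bucket}/{table}/dt={ts.to_date_string()}/"
        f"execution_ts={ts.isoformat()}/{table}.jsonl.gz"
    )
    print(path)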
diff --git a/airflow/dags/scrape_state_geoportal/METADATA.yml b/airflow/dags/scrape_state_geoportal/METADATA.yml
new file mode 100644
index 0000000000..95ec8d3742
--- /dev/null
+++ b/airflow/dags/scrape_state_geoportal/METADATA.yml
@@ -0,0 +1,19 @@
+description: "Scrape State Highway Network from State Geoportal"
+schedule_interval: "0 4 1 * *" # 4am UTC first day of every month
+tags:
+  - all_gusty_features
+default_args:
+  owner: airflow
+  depends_on_past: False
+  catchup: False
+  start_date: "2024-09-15"
+  email:
+    - "hello@calitp.org"
+  email_on_failure: True
+  email_on_retry: False
+  retries: 1
+  retry_delay: !timedelta 'minutes: 2'
+  concurrency: 50
+  #sla: !timedelta 'hours: 2'
+wait_for_defaults:
+  timeout: 3600
diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml
new file mode 100644
index 0000000000..d5be284196
--- /dev/null
+++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml
@@ -0,0 +1,7 @@
+operator: operators.StateGeoportalAPIOperator
+
+root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/'
+service: "CHhighway/SHN_Lines"
+layer: "0"
+product: 'state_highway_network'
+resultRecordCount: 2000
diff --git a/airflow/plugins/operators/__init__.py b/airflow/plugins/operators/__init__.py
index 209be114fd..78a7178186 100644
--- a/airflow/plugins/operators/__init__.py
+++ b/airflow/plugins/operators/__init__.py
@@ -9,3 +9,4 @@ from operators.pod_operator import PodOperator
 from operators.scrape_ntd_api import NtdDataProductAPIOperator
 from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator
+from operators.scrape_state_geoportal import StateGeoportalAPIOperator
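The operator exported above works by paging through an ArcGIS FeatureServer "query" endpoint. A standalone sketch of that request loop, using the endpoint and parameters from the DAG config above (the explicit timeout is an illustrative addition, not present in the operator):

    import requests

    url = (
        "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/"
        "CHhighway/SHN_Lines/FeatureServer/0/query"
    )
    params = {
        "where": "1=1",             # no filter: return every row
        "outFields": "*",           # return every field
        "f": "geojson",             # respond with GeoJSON features
        "resultRecordCount": 2000,  # page size
        "resultOffset": 0,
    }
    features = []
    while True:
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        batch = response.json().get("features", [])
        if not batch:
            break
        features.extend(batch)
        params["resultOffset"] += params["resultRecordCount"]
    print(f"Fetched {len(features)} features")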
diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py
new file mode 100644
index 0000000000..a538df5495
--- /dev/null
+++ b/airflow/plugins/operators/scrape_state_geoportal.py
@@ -0,0 +1,217 @@
+import gzip
+import logging
+import os
+from typing import ClassVar, List
+
+import pandas as pd  # type: ignore
+import pendulum
+import requests
+from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs  # type: ignore
+from pydantic import HttpUrl, parse_obj_as
+
+from airflow.models import BaseOperator  # type: ignore
+
+API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"]
+
+
+class StateGeoportalAPIExtract(PartitionedGCSArtifact):
+    bucket: ClassVar[str]
+    execution_ts: pendulum.DateTime = pendulum.now()
+    dt: pendulum.Date = execution_ts.date()
+    partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]
+
+    # The name to be used in the data warehouse to refer to the data
+    # product.
+    product: str
+
+    # The root of the ArcGIS services. As of Nov 2024, this should
+    # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/".
+    root_url: str
+
+    # The name of the service being requested. In the feature service's
+    # URL, this will be everything between the root and "/FeatureServer".
+    # Don't include a leading or trailing slash.
+    service: str
+
+    # The layer to query. This will usually be "0", so that is the
+    # default.
+    layer: str = "0"
+
+    # The query filter. By default, all rows will be returned from the
+    # service. Refer to the ArcGIS documentation for syntax:
+    # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters
+    where: str = "1=1"
+
+    # A comma-separated list of fields to include in the results. Use
+    # "*" (the default) to include all fields.
+    outFields: str = "*"
+
+    # The number of records to request for each API call (the operator
+    # will request all data from the layer in batches of this size).
+    resultRecordCount: int
+
+    @property
+    def table(self) -> str:
+        return self.product
+
+    @property
+    def filename(self) -> str:
+        return self.table
+
+    class Config:
+        arbitrary_types_allowed = True
+
+    def fetch_from_state_geoportal(self):
+        """Fetch all features from the configured service layer, paging
+        through results until the API returns no more rows."""
+
+        logging.info(f"Downloading state geoportal data for {self.product}.")
+
+        try:
+            # Set up the parameters for the request
+            url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query"
+            validated_url = parse_obj_as(HttpUrl, url)
+
+            params = {
+                "where": self.where,
+                "outFields": self.outFields,
+                "f": "geojson",
+                "resultRecordCount": self.resultRecordCount,
+            }
+
+            all_features = []  # To store all retrieved rows
+            offset = 0
+
+            while True:
+                # Update the resultOffset for each request
+                params["resultOffset"] = offset
+
+                # Make the request
+                response = requests.get(validated_url, params=params)
+                response.raise_for_status()
+                data = response.json()
+
+                # Break the loop if there are no more features
+                if "features" not in data or not data["features"]:
+                    break
+
+                # Append the retrieved features
+                all_features.extend(data["features"])
+
+                # Increment the offset
+                offset += params["resultRecordCount"]
+
+            if not all_features:
+                logging.info(
+                    f"There is no data to download for {self.product}. Ending pipeline."
+                )
+                return []
+
+            logging.info(
+                f"Downloaded {self.product} data with {len(all_features)} rows!"
+            )
+
+            return all_features
+
+        except requests.exceptions.RequestException as e:
+            logging.info(f"An error occurred: {e}")
+
+            raise
+
+
+# Function to convert coordinates to WKT format
+def to_wkt(geometry_type, coordinates):
+    if geometry_type == "LineString":
+        # Format as a LineString
+        coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates])
+        return f"LINESTRING({coords_str})"
+    elif geometry_type == "MultiLineString":
+        # Format as a MultiLineString
+        multiline_coords_str = ", ".join(
+            f"({', '.join([f'{lng} {lat}' for lng, lat in line])})"
+            for line in coordinates
+        )
+        return f"MULTILINESTRING({multiline_coords_str})"
+    else:
+        return None
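+
+# Example of to_wkt output (illustrative coordinates, not real SHN data):
+#
+#   to_wkt("LineString", [[-121.49, 38.57], [-121.51, 38.58]])
+#   -> "LINESTRING(-121.49 38.57, -121.51 38.58)"
+#
+#   to_wkt("MultiLineString", [[[-121.49, 38.57], [-121.51, 38.58]],
+#                              [[-122.41, 37.77], [-122.42, 37.78]]])
+#   -> "MULTILINESTRING((-121.49 38.57, -121.51 38.58), (-122.41 37.77, -122.42 37.78))"
+#
+# Any other geometry type returns None, so such rows would get a null
+# wkt_coordinates value downstream.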
+ "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Dynamically create a mapping by removing known prefixes + columns = {col: col.split(".")[-1] for col in df.columns} + + # Rename columns using the dynamically created mapping + df = df.rename(columns=columns) + + # Create new column with WKT format + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) + + # Compress the DataFrame content and save it + self.gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + self.extract.save_content(fs=get_fs(), content=self.gzipped_content) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 8c964acb31..58b648a57b 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -4,6 +4,49 @@ dim_stops_latest AS ( table_name = ref('dim_stops'), clean_table_name = 'dim_stops' ) }} +), + +stg_state_geoportal__state_highway_network_stops AS ( + SELECT * + FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }} +), + + +buffer_geometry_table AS ( + SELECT + -- equal to 100ft, as requested by Uriel + ST_BUFFER(wkt_coordinates, + 30.48) AS buffer_geometry + FROM stg_state_geoportal__state_highway_network_stops +), + +current_stops AS ( + SELECT + pt_geom, + _gtfs_key + FROM dim_stops_latest +), + + +stops_on_shn AS ( + SELECT + current_stops.* + FROM buffer_geometry_table, current_stops + WHERE ST_DWITHIN( + buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) +), + +dim_stops_latest_with_shn_boolean AS ( + +SELECT + dim_stops_latest.*, + IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest +FROM + dim_stops_latest +LEFT JOIN + stops_on_shn +ON + dim_stops_latest._gtfs_key = stops_on_shn._gtfs_key ) -SELECT * FROM dim_stops_latest +SELECT * FROM dim_stops_latest_with_shn_boolean diff --git a/warehouse/models/staging/state_geoportal/_src.yml b/warehouse/models/staging/state_geoportal/_src.yml new file mode 100644 index 0000000000..2e2d8dc2de --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_src.yml @@ -0,0 +1,9 @@ +version: 2 + +sources: + - name: external_state_geoportal + description: Data tables scraped from state geoportal. 
+ database: "{{ env_var('DBT_SOURCE_DATABASE', var('SOURCE_DATABASE')) }}" + schema: external_state_geoportal + tables: + - name: state_highway_network diff --git a/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml new file mode 100644 index 0000000000..7e12ae93f5 --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml @@ -0,0 +1,4 @@ +version: 2 + +models: + - name: stg_state_geoportal__state_highway_network_stops diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql new file mode 100644 index 0000000000..a89a1075ee --- /dev/null +++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql @@ -0,0 +1,14 @@ +WITH external_state_geoportal__state_highway_network AS ( + SELECT * + FROM {{ source('external_state_geoportal', 'state_highway_network') }} +), + +stg_state_geoportal__state_highway_network_stops AS( + + SELECT * + FROM external_state_geoportal__state_highway_network + -- we pull the whole table every month in the pipeline, so this gets only the latest extract + QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1 +) + +SELECT * FROM stg_state_geoportal__state_highway_network_stops