From a807a10c96eec6abee18ac2de992d46edc725ceb Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Tue, 5 Nov 2024 12:52:20 -0500 Subject: [PATCH 01/20] scrape geoportal for shn, add flag on stops table --- .../dags/scrape_state_geoportal/METADATA.yml | 19 +++ .../state_highway_network.yml | 7 ++ airflow/plugins/operators/__init__.py | 1 + .../operators/scrape_state_geoportal.py | 119 ++++++++++++++++++ .../gtfs_schedule_latest/dim_stops_latest.sql | 25 +++- 5 files changed, 170 insertions(+), 1 deletion(-) create mode 100644 airflow/dags/scrape_state_geoportal/METADATA.yml create mode 100644 airflow/dags/scrape_state_geoportal/state_highway_network.yml create mode 100644 airflow/plugins/operators/scrape_state_geoportal.py diff --git a/airflow/dags/scrape_state_geoportal/METADATA.yml b/airflow/dags/scrape_state_geoportal/METADATA.yml new file mode 100644 index 0000000000..95ec8d3742 --- /dev/null +++ b/airflow/dags/scrape_state_geoportal/METADATA.yml @@ -0,0 +1,19 @@ +description: "Scrape State Highway Network from State Geoportal" +schedule_interval: "0 4 1 * *" # 4am UTC first day of every month +tags: + - all_gusty_features +default_args: + owner: airflow + depends_on_past: False + catchup: False + start_date: "2024-09-15" + email: + - "hello@calitp.org" + email_on_failure: True + email_on_retry: False + retries: 1 + retry_delay: !timedelta 'minutes: 2' + concurrency: 50 + #sla: !timedelta 'hours: 2' +wait_for_defaults: + timeout: 3600 diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml new file mode 100644 index 0000000000..d8800faa97 --- /dev/null +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -0,0 +1,7 @@ +operator: operators.NtdDataProductAPIOperator + +product: 'state_highway_network' +root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' +endpoint_id: 'CHhighway/SHN_Lines/FeatureServer/0/' +query: 'query?where=1%3D1&outFields=*&outSR=4326&f=' +file_format: 'geojson' diff --git a/airflow/plugins/operators/__init__.py b/airflow/plugins/operators/__init__.py index 209be114fd..78a7178186 100644 --- a/airflow/plugins/operators/__init__.py +++ b/airflow/plugins/operators/__init__.py @@ -9,3 +9,4 @@ from operators.pod_operator import PodOperator from operators.scrape_ntd_api import NtdDataProductAPIOperator from operators.scrape_ntd_xlsx import NtdDataProductXLSXOperator +from operators.scrape_state_geoportal import StateGeoportalAPIOperator diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py new file mode 100644 index 0000000000..34b05a7c08 --- /dev/null +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -0,0 +1,119 @@ +import gzip +import logging +from typing import ClassVar, List # , Optional + +import pandas as pd # type: ignore +import pendulum +import requests +from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore +from pydantic import HttpUrl, parse_obj_as + +from airflow.models import BaseOperator # type: ignore + +API_BUCKET = "gs://calitp-state-geoportal-scrape" +# API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] + + +class StateGeoportalAPIExtract(PartitionedGCSArtifact): + bucket: ClassVar[str] + product: str + execution_ts: pendulum.DateTime = pendulum.now() + dt: pendulum.Date = execution_ts.date() + root_url: str + endpoint_id: str + query: str + file_format: str + partition_names: ClassVar[List[str]] = ["dt", 
"execution_ts"] + + @property + def table(self) -> str: + return self.product + + @property + def filename(self) -> str: + return self.table + + class Config: + arbitrary_types_allowed = True + + def fetch_from_state_geoportal(self): + """ """ + + logging.info(f"Downloading state geoportal data for {self.product}.") + + try: + url = self.root_url + self.endpoint_id + self.query + self.file_format + + validated_url = parse_obj_as(HttpUrl, url) + + response = requests.get(validated_url).content + + if response is None or len(response) == 0: + logging.info( + f"There is no data to download for {self.product}. Ending pipeline." + ) + + pass + else: + logging.info( + f"Downloaded {self.product} data with {len(response)} rows!" + ) + + return response + + except requests.exceptions.RequestException as e: + logging.info(f"An error occurred: {e}") + + raise + + +class JSONExtract(StateGeoportalAPIExtract): + bucket = API_BUCKET + + +class StateGeoportalAPIOperator(BaseOperator): + template_fields = ("product", "root_url", "endpoint_id", "query", "file_format") + + def __init__( + self, + product, + root_url, + endpoint_id, + query, + file_format, + **kwargs, + ): + self.product = product + self.root_url = root_url + self.endpoint_id = endpoint_id + self.query = query + self.file_format = file_format + + """An operator that extracts and saves GEOJSON data from the State Geoportal + and saves it as one GEOJSONL file, hive-partitioned by date in Google Cloud + """ + + # Save JSONL files to the bucket + self.extract = JSONExtract( + product=self.product, + root_url=self.root_url, + endpoint_id=self.endpoint_id, + query=self.query, + file_format=self.file_format, + filename=f"{self.product}.geojsonl.gz", + ) + + super().__init__(**kwargs) + + def execute(self, **kwargs): + api_content = self.extract.fetch_from_state_geoportal() + + decode_api_content = api_content.decode("utf-8") + + df = pd.read_json(decode_api_content) + + self.gzipped_content = gzip.compress( + df.to_json(orient="records", lines=True).encode() + ) + + self.extract.save_content(fs=get_fs(), content=self.gzipped_content) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 8c964acb31..63c4e44587 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -4,6 +4,29 @@ dim_stops_latest AS ( table_name = ref('dim_stops'), clean_table_name = 'dim_stops' ) }} +), + +buffer_geometry_table AS ( + SELECT + ST_BUFFER(geometry, + 30.48) AS buffer_geometry + FROM {{ ref('state_highway_network') }} +), + + +current_stops AS ( + SELECT + pt_geom, + key + FROM {{ ref('dim_stops_latest') }} ) -SELECT * FROM dim_stops_latest + +SELECT + current_stops.* +FROM buffer_geometry_table, current_stops +WHERE ST_DWITHIN( + buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) + + +-- SELECT * FROM dim_stops_latest From e8f3ad9d67982d437846666e1e43fb60176bc096 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Wed, 6 Nov 2024 10:36:49 -0500 Subject: [PATCH 02/20] successful initial scrape of geoportal endpoint --- .../dags/scrape_state_geoportal/state_highway_network.yml | 4 ++-- airflow/plugins/operators/scrape_state_geoportal.py | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml index d8800faa97..4bf2af433f 100644 
--- a/airflow/dags/scrape_state_geoportal/state_highway_network.yml +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -1,7 +1,7 @@ -operator: operators.NtdDataProductAPIOperator +operator: operators.StateGeoportalAPIOperator product: 'state_highway_network' root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' endpoint_id: 'CHhighway/SHN_Lines/FeatureServer/0/' query: 'query?where=1%3D1&outFields=*&outSR=4326&f=' -file_format: 'geojson' +file_format: 'json' diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 34b05a7c08..d742104b2a 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -89,8 +89,8 @@ def __init__( self.query = query self.file_format = file_format - """An operator that extracts and saves GEOJSON data from the State Geoportal - and saves it as one GEOJSONL file, hive-partitioned by date in Google Cloud + """An operator that extracts and saves JSON data from the State Geoportal + and saves it as one JSONL file, hive-partitioned by date in Google Cloud """ # Save JSONL files to the bucket @@ -100,7 +100,7 @@ def __init__( endpoint_id=self.endpoint_id, query=self.query, file_format=self.file_format, - filename=f"{self.product}.geojsonl.gz", + filename=f"{self.product}.jsonl.gz", ) super().__init__(**kwargs) @@ -110,7 +110,7 @@ def execute(self, **kwargs): decode_api_content = api_content.decode("utf-8") - df = pd.read_json(decode_api_content) + df = pd.read_json(decode_api_content, lines=True) self.gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode() From 44fa45fc7dede4f7573631586a38d934a71b99a9 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 7 Nov 2024 18:04:44 -0500 Subject: [PATCH 03/20] functioning json format and external table creation, need to modify coordinates --- .../state_geoportal/state_highway_network.yml | 14 +++++++ .../state_highway_network.yml | 2 +- .../operators/scrape_state_geoportal.py | 39 ++++++++++++++++++- 3 files changed, 52 insertions(+), 3 deletions(-) create mode 100644 airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml diff --git a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml new file mode 100644 index 0000000000..9ad27493f0 --- /dev/null +++ b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml @@ -0,0 +1,14 @@ +operator: operators.ExternalTable +bucket: gs://calitp-state-geoportal-scrape +source_objects: + - "state_highway_network/*.jsonl.gz" +source_format: NEWLINE_DELIMITED_JSON +json_extension: GEOJSON +use_bq_client: true +hive_options: + mode: CUSTOM + require_partition_filter: false + source_uri_prefix: "state_highway_network/{dt:DATE}/{execution_ts:TIMESTAMP}" +destination_project_dataset_table: "external_state_geoportal.state_highway_network" +prefix_bucket: false +post_hook: SELECT * FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network LIMIT 1; diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml index 4bf2af433f..338430fc42 100644 --- a/airflow/dags/scrape_state_geoportal/state_highway_network.yml +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -4,4 +4,4 @@ product: 'state_highway_network' root_url: 
'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' endpoint_id: 'CHhighway/SHN_Lines/FeatureServer/0/' query: 'query?where=1%3D1&outFields=*&outSR=4326&f=' -file_format: 'json' +file_format: 'geojson' diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index d742104b2a..2fe4e3bee3 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,4 +1,5 @@ import gzip +import json import logging from typing import ClassVar, List # , Optional @@ -108,9 +109,43 @@ def __init__( def execute(self, **kwargs): api_content = self.extract.fetch_from_state_geoportal() - decode_api_content = api_content.decode("utf-8") + decoded_api_content = api_content.decode("utf-8") - df = pd.read_json(decode_api_content, lines=True) + data = json.loads(decoded_api_content) + + values = data.get("features") + + flattened_data_list = [] + + for record in values: + # Copy the original dictionary to avoid modifying it + flattened_item = record.copy() + + # Flatten the "geometry" and "properties" dictionaries + flattened_item.update(flattened_item.pop("geometry")) + flattened_item.update(flattened_item.pop("properties")) + + # Add the flattened dictionary to the new list + flattened_data_list.append(flattened_item) + + # json_string = json.dumps(flattened_data_list) + + for item in flattened_data_list: + # Check if 'coordinates' exists in the dictionary + if "coordinates" in item: + # Create a GeoJSON structure + geojson = { + "type": "LineString", # Assuming LineString geometry for this example + "coordinates": item["coordinates"], + } + + # Convert to a GeoJSON string + geojson_string = json.dumps(geojson) + + # Replace the 'coordinates' value with the GeoJSON string in the dictionary + item["coordinates"] = geojson_string + + df = pd.DataFrame(flattened_data_list) self.gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode() From ac47b74afe558056c578496f6c0b2ff1d10c5b7e Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Tue, 12 Nov 2024 11:05:11 -0500 Subject: [PATCH 04/20] shn external tables --- .../state_geoportal/state_highway_network.yml | 23 ++- .../operators/scrape_state_geoportal.py | 145 ++++++++++++------ 2 files changed, 119 insertions(+), 49 deletions(-) diff --git a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml index 9ad27493f0..f419f01615 100644 --- a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml +++ b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml @@ -1,14 +1,29 @@ operator: operators.ExternalTable bucket: gs://calitp-state-geoportal-scrape source_objects: - - "state_highway_network/*.jsonl.gz" + - "state_highway_network_data/*.jsonl.gz" source_format: NEWLINE_DELIMITED_JSON -json_extension: GEOJSON use_bq_client: true hive_options: mode: CUSTOM require_partition_filter: false - source_uri_prefix: "state_highway_network/{dt:DATE}/{execution_ts:TIMESTAMP}" + source_uri_prefix: "state_highway_network_data/{dt:DATE}/{execution_ts:TIMESTAMP}/" destination_project_dataset_table: "external_state_geoportal.state_highway_network" prefix_bucket: false -post_hook: SELECT * FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network LIMIT 1; +post_hook: | + SELECT * + FROM `{{ get_project_id() }}`.external_state_geoportal.state_highway_network + LIMIT 1; 
+schema_fields: + - name: Route + type: INTEGER + - name: County + type: STRING + - name: District + type: INTEGER + - name: RouteType + type: STRING + - name: Direction + type: STRING + - name: wkt_coordinates + type: GEOGRAPHY diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 2fe4e3bee3..c4afc0c37a 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,5 +1,6 @@ import gzip -import json + +# import json import logging from typing import ClassVar, List # , Optional @@ -7,10 +8,13 @@ import pendulum import requests from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore -from pydantic import HttpUrl, parse_obj_as from airflow.models import BaseOperator # type: ignore +# from pydantic import HttpUrl, parse_obj_as + + +# API_BUCKET = "gs://calitp-state-highway-network-stops" API_BUCKET = "gs://calitp-state-geoportal-scrape" # API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] @@ -43,13 +47,43 @@ def fetch_from_state_geoportal(self): logging.info(f"Downloading state geoportal data for {self.product}.") try: - url = self.root_url + self.endpoint_id + self.query + self.file_format + # url = self.root_url + self.endpoint_id + self.query + self.file_format + + # validated_url = parse_obj_as(HttpUrl, url) + + # response = requests.get(validated_url).content + + # Set up the parameters for the request + url = "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHhighway/SHN_Lines/FeatureServer/0/query" + params = { + "where": "1=1", # You can change this to filter data + "outFields": "*", # Specify the fields to return + "f": "geojson", # Format of the response + "resultRecordCount": 2000, # Maximum number of rows per request + } - validated_url = parse_obj_as(HttpUrl, url) + all_features = [] # To store all retrieved rows + offset = 0 - response = requests.get(validated_url).content + while True: + # Update the resultOffset for each request + params["resultOffset"] = offset - if response is None or len(response) == 0: + # Make the request + response = requests.get(url, params=params) + data = response.json() + + # Break the loop if there are no more features + if "features" not in data or not data["features"]: + break + + # Append the retrieved features + all_features.extend(data["features"]) + + # Increment the offset + offset += params["resultRecordCount"] + + if all_features is None or len(all_features) == 0: logging.info( f"There is no data to download for {self.product}. Ending pipeline." ) @@ -57,10 +91,10 @@ def fetch_from_state_geoportal(self): pass else: logging.info( - f"Downloaded {self.product} data with {len(response)} rows!" + f"Downloaded {self.product} data with {len(all_features)} rows!" 
) - return response + return all_features except requests.exceptions.RequestException as e: logging.info(f"An error occurred: {e}") @@ -68,6 +102,23 @@ def fetch_from_state_geoportal(self): raise +# # Function to convert coordinates to WKT format +def to_wkt(geometry_type, coordinates): + if geometry_type == "LineString": + # Format as a LineString + coords_str = ", ".join([f"{lng} {lat}" for lng, lat in coordinates]) + return f"LINESTRING({coords_str})" + elif geometry_type == "MultiLineString": + # Format as a MultiLineString + multiline_coords_str = ", ".join( + f"({', '.join([f'{lng} {lat}' for lng, lat in line])})" + for line in coordinates + ) + return f"MULTILINESTRING({multiline_coords_str})" + else: + return None + + class JSONExtract(StateGeoportalAPIExtract): bucket = API_BUCKET @@ -96,12 +147,12 @@ def __init__( # Save JSONL files to the bucket self.extract = JSONExtract( - product=self.product, + product=f"{self.product}_data", root_url=self.root_url, endpoint_id=self.endpoint_id, query=self.query, file_format=self.file_format, - filename=f"{self.product}.jsonl.gz", + filename=f"{self.product}_stops.jsonl.gz", ) super().__init__(**kwargs) @@ -109,43 +160,47 @@ def __init__( def execute(self, **kwargs): api_content = self.extract.fetch_from_state_geoportal() - decoded_api_content = api_content.decode("utf-8") - - data = json.loads(decoded_api_content) - - values = data.get("features") - - flattened_data_list = [] - - for record in values: - # Copy the original dictionary to avoid modifying it - flattened_item = record.copy() - - # Flatten the "geometry" and "properties" dictionaries - flattened_item.update(flattened_item.pop("geometry")) - flattened_item.update(flattened_item.pop("properties")) - - # Add the flattened dictionary to the new list - flattened_data_list.append(flattened_item) - - # json_string = json.dumps(flattened_data_list) - - for item in flattened_data_list: - # Check if 'coordinates' exists in the dictionary - if "coordinates" in item: - # Create a GeoJSON structure - geojson = { - "type": "LineString", # Assuming LineString geometry for this example - "coordinates": item["coordinates"], - } - - # Convert to a GeoJSON string - geojson_string = json.dumps(geojson) + df = pd.json_normalize(api_content) + + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + df = df.rename( + columns={ + "properties.Route": "Route", + "properties.County": "County", + "properties.District": "District", + "properties.RouteType": "RouteType", + "properties.Direction": "Direction", + "geometry.type": "type", + "geometry.coordinates": "coordinates", + } + ) - # Replace the 'coordinates' value with the GeoJSON string in the dictionary - item["coordinates"] = geojson_string + # Apply function to create new column with WKT format + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) - df = pd.DataFrame(flattened_data_list) + df = df[ + [ + "Route", + "County", + "District", + "RouteType", + "Direction", + "wkt_coordinates", + ] + ] self.gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode() From 0af175908240237f8d7da6fe78dc3b8cdebbc095 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Tue, 12 Nov 2024 11:45:40 -0500 Subject: [PATCH 05/20] add initial source, staging, and docs for state geoportal scrape --- .../gtfs_schedule_latest/dim_stops_latest.sql | 25 +---------- 
.../models/staging/state_geoportal/_src.yml | 9 ++++ .../state_geoportal/_stg_state_geoportal.yml | 4 ++ ...geoportal__state_highway_network_stops.sql | 43 +++++++++++++++++++ 4 files changed, 57 insertions(+), 24 deletions(-) create mode 100644 warehouse/models/staging/state_geoportal/_src.yml create mode 100644 warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml create mode 100644 warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 63c4e44587..8c964acb31 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -4,29 +4,6 @@ dim_stops_latest AS ( table_name = ref('dim_stops'), clean_table_name = 'dim_stops' ) }} -), - -buffer_geometry_table AS ( - SELECT - ST_BUFFER(geometry, - 30.48) AS buffer_geometry - FROM {{ ref('state_highway_network') }} -), - - -current_stops AS ( - SELECT - pt_geom, - key - FROM {{ ref('dim_stops_latest') }} ) - -SELECT - current_stops.* -FROM buffer_geometry_table, current_stops -WHERE ST_DWITHIN( - buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) - - --- SELECT * FROM dim_stops_latest +SELECT * FROM dim_stops_latest diff --git a/warehouse/models/staging/state_geoportal/_src.yml b/warehouse/models/staging/state_geoportal/_src.yml new file mode 100644 index 0000000000..2e2d8dc2de --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_src.yml @@ -0,0 +1,9 @@ +version: 2 + +sources: + - name: external_state_geoportal + description: Data tables scraped from state geoportal. + database: "{{ env_var('DBT_SOURCE_DATABASE', var('SOURCE_DATABASE')) }}" + schema: external_state_geoportal + tables: + - name: state_highway_network diff --git a/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml new file mode 100644 index 0000000000..7e12ae93f5 --- /dev/null +++ b/warehouse/models/staging/state_geoportal/_stg_state_geoportal.yml @@ -0,0 +1,4 @@ +version: 2 + +models: + - name: stg_state_geoportal__state_highway_network_stops diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql new file mode 100644 index 0000000000..0d046abd9f --- /dev/null +++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql @@ -0,0 +1,43 @@ +WITH external_state_geoportal__state_highway_network AS ( + SELECT * + FROM + {{ source('external_state_geoportal', 'state_highway_network') }} +), + +get_latest_extract AS( + + SELECT * + FROM external_state_geoportal__state_highway_network + -- we pull the whole table every month in the pipeline, so this gets only the latest extract + QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1 +), + +buffer_geometry_table AS ( + SELECT + ST_BUFFER(wkt_coordinates, + 30.48) AS buffer_geometry + FROM get_latest_extract +), + +current_stops AS ( + SELECT + pt_geom, + key + FROM {{ ref('dim_stops_latest') }} +), + + +stops_on_shn AS ( + SELECT + current_stops.* + FROM buffer_geometry_table, current_stops + WHERE ST_DWITHIN( + buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) +), + +stg_state_geoportal__state_highway_network_stops AS ( + SELECT * + FROM stops_on_shn +) + 
+SELECT * FROM stg_state_geoportal__state_highway_network_stops From df234a4d1c8f485bc262ee2d5d5fdab5912b08c7 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Tue, 12 Nov 2024 12:02:53 -0500 Subject: [PATCH 06/20] add flag for shn to dim_stops_latest --- .../gtfs_schedule_latest/dim_stops_latest.sql | 43 ++++++++++++++++++- ...geoportal__state_highway_network_stops.sql | 33 ++------------ 2 files changed, 45 insertions(+), 31 deletions(-) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 8c964acb31..c8e34b5abd 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -4,6 +4,47 @@ dim_stops_latest AS ( table_name = ref('dim_stops'), clean_table_name = 'dim_stops' ) }} +), + +stg_state_geoportal__state_highway_network_stops AS ( +SELECT * FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }} +), + + +buffer_geometry_table AS ( + SELECT + ST_BUFFER(wkt_coordinates, + 30.48) AS buffer_geometry + FROM stg_state_geoportal__state_highway_network_stops +), + +current_stops AS ( + SELECT + pt_geom, + key + FROM dim_stops_latest +), + + +stops_on_shn AS ( + SELECT + current_stops.* + FROM buffer_geometry_table, current_stops + WHERE ST_DWITHIN( + buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) +), + +dim_stops_latest_with_shn_boolean AS ( + +SELECT + dim_stops_latest.*, + IF(stops_on_shn.key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest +FROM + dim_stops_latest +LEFT JOIN + stops_on_shn +ON + dim_stops_latest.key = stops_on_shn.key ) -SELECT * FROM dim_stops_latest +SELECT * FROM dim_stops_latest_with_shn_boolean diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql index 0d046abd9f..ff67f185df 100644 --- a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql +++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql @@ -1,43 +1,16 @@ WITH external_state_geoportal__state_highway_network AS ( SELECT * FROM - {{ source('external_state_geoportal', 'state_highway_network') }} + `cal-itp-data-infra-staging.external_state_geoportal.state_highway_network` + --{{ source('external_state_geoportal', 'state_highway_network') }} ), -get_latest_extract AS( +stg_state_geoportal__state_highway_network_stops AS( SELECT * FROM external_state_geoportal__state_highway_network -- we pull the whole table every month in the pipeline, so this gets only the latest extract QUALIFY DENSE_RANK() OVER (ORDER BY execution_ts DESC) = 1 -), - -buffer_geometry_table AS ( - SELECT - ST_BUFFER(wkt_coordinates, - 30.48) AS buffer_geometry - FROM get_latest_extract -), - -current_stops AS ( - SELECT - pt_geom, - key - FROM {{ ref('dim_stops_latest') }} -), - - -stops_on_shn AS ( - SELECT - current_stops.* - FROM buffer_geometry_table, current_stops - WHERE ST_DWITHIN( - buffer_geometry_table.buffer_geometry,current_stops.pt_geom, 0) -), - -stg_state_geoportal__state_highway_network_stops AS ( - SELECT * - FROM stops_on_shn ) SELECT * FROM stg_state_geoportal__state_highway_network_stops From 68d3fac72bd9d6edbb9f628d3ee45d4883dbfa28 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Tue, 12 Nov 2024 12:04:16 -0500 Subject: [PATCH 07/20] readd templating for buckets and 
remove hardcoded schema
---
 airflow/plugins/operators/scrape_state_geoportal.py | 2 --
 .../stg_state_geoportal__state_highway_network_stops.sql | 4 ++--
 2 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py
index c4afc0c37a..ebd86f1866 100644
--- a/airflow/plugins/operators/scrape_state_geoportal.py
+++ b/airflow/plugins/operators/scrape_state_geoportal.py
@@ -13,8 +13,6 @@
 from airflow.models import BaseOperator  # type: ignore
 
 # from pydantic import HttpUrl, parse_obj_as
 
-
-# API_BUCKET = "gs://calitp-state-highway-network-stops"
 API_BUCKET = "gs://calitp-state-geoportal-scrape"
 # API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"]
 
diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql
index ff67f185df..6fd2950a37 100644
--- a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql
+++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql
@@ -1,8 +1,8 @@
 WITH external_state_geoportal__state_highway_network AS (
     SELECT *
     FROM
-        `cal-itp-data-infra-staging.external_state_geoportal.state_highway_network`
-        --{{ source('external_state_geoportal', 'state_highway_network') }}
+        --`cal-itp-data-infra-staging.external_state_geoportal.state_highway_network`
+        {{ source('external_state_geoportal', 'state_highway_network') }}
 ),
 
 stg_state_geoportal__state_highway_network_stops AS(

From 6a9064c8cfdc28ad1e927e10ad6b987c3ef5c28d Mon Sep 17 00:00:00 2001
From: Charles Costanzo
Date: Wed, 13 Nov 2024 10:55:44 -0500
Subject: [PATCH 08/20] refactor operator to be more agnostic for data inputs

---
 .../state_highway_network.yml                 | 10 ++-
 .../operators/scrape_state_geoportal.py       | 78 +++++++++++--------
 2 files changed, 52 insertions(+), 36 deletions(-)

diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml
index 338430fc42..e57234cc35 100644
--- a/airflow/dags/scrape_state_geoportal/state_highway_network.yml
+++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml
@@ -1,7 +1,9 @@
 operator: operators.StateGeoportalAPIOperator
 
-product: 'state_highway_network'
 root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/'
-endpoint_id: 'CHhighway/SHN_Lines/FeatureServer/0/'
-query: 'query?where=1%3D1&outFields=*&outSR=4326&f='
-file_format: 'geojson'
+service: "CHhighway/SHN_Lines/FeatureServer/0/query"
+product: 'state_highway_network'
+where: "1=1" # You can change this to filter data
+outFields: "*" # Specify the fields to return
+f: "geojson" # Format of the response
+resultRecordCount: 2000
diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py
index ebd86f1866..f164ab0684 100644
--- a/airflow/plugins/operators/scrape_state_geoportal.py
+++ b/airflow/plugins/operators/scrape_state_geoportal.py
@@ -8,25 +8,27 @@
 import pendulum
 import requests
 from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs  # type: ignore
+from pydantic import HttpUrl, parse_obj_as
 
 from airflow.models import BaseOperator  # type: ignore
 
-# from pydantic import HttpUrl, parse_obj_as
-
+# switch before merge
 API_BUCKET = "gs://calitp-state-geoportal-scrape"
 # API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"]
 
 
 class 
StateGeoportalAPIExtract(PartitionedGCSArtifact): bucket: ClassVar[str] - product: str execution_ts: pendulum.DateTime = pendulum.now() dt: pendulum.Date = execution_ts.date() - root_url: str - endpoint_id: str - query: str - file_format: str partition_names: ClassVar[List[str]] = ["dt", "execution_ts"] + root_url: str + service: str + product: str + where: str + outFields: str + f: str + resultRecordCount: int @property def table(self) -> str: @@ -45,19 +47,15 @@ def fetch_from_state_geoportal(self): logging.info(f"Downloading state geoportal data for {self.product}.") try: - # url = self.root_url + self.endpoint_id + self.query + self.file_format - - # validated_url = parse_obj_as(HttpUrl, url) - - # response = requests.get(validated_url).content - # Set up the parameters for the request - url = "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/CHhighway/SHN_Lines/FeatureServer/0/query" + url = self.root_url + self.service + validated_url = parse_obj_as(HttpUrl, url) + params = { - "where": "1=1", # You can change this to filter data - "outFields": "*", # Specify the fields to return - "f": "geojson", # Format of the response - "resultRecordCount": 2000, # Maximum number of rows per request + "where": self.where, # You can change this to filter data + "outFields": self.outFields, # Specify the fields to return + "f": self.f, # Format of the response + "resultRecordCount": self.resultRecordCount, # Maximum number of rows per request } all_features = [] # To store all retrieved rows @@ -68,7 +66,8 @@ def fetch_from_state_geoportal(self): params["resultOffset"] = offset # Make the request - response = requests.get(url, params=params) + response = requests.get(validated_url, params=params) + # response = requests.get(url, params=params) data = response.json() # Break the loop if there are no more features @@ -101,6 +100,7 @@ def fetch_from_state_geoportal(self): # # Function to convert coordinates to WKT format +# break out into own parse operator afterwards, which then reads the raw data in the bucket, and def to_wkt(geometry_type, coordinates): if geometry_type == "LineString": # Format as a LineString @@ -122,22 +122,34 @@ class JSONExtract(StateGeoportalAPIExtract): class StateGeoportalAPIOperator(BaseOperator): - template_fields = ("product", "root_url", "endpoint_id", "query", "file_format") + template_fields = ( + "root_url", + "service", + "product", + "where", + "outFields", + "f", + "resultRecordCount", + ) def __init__( self, - product, root_url, - endpoint_id, - query, - file_format, + service, + product, + where, + outFields, + f, + resultRecordCount, **kwargs, ): - self.product = product self.root_url = root_url - self.endpoint_id = endpoint_id - self.query = query - self.file_format = file_format + self.service = service + self.product = product + self.where = where + self.outFields = outFields + self.f = f + self.resultRecordCount = resultRecordCount """An operator that extracts and saves JSON data from the State Geoportal and saves it as one JSONL file, hive-partitioned by date in Google Cloud @@ -145,11 +157,13 @@ def __init__( # Save JSONL files to the bucket self.extract = JSONExtract( - product=f"{self.product}_data", root_url=self.root_url, - endpoint_id=self.endpoint_id, - query=self.query, - file_format=self.file_format, + service=self.service, + product=f"{self.product}_data", + where=self.where, + outFields=self.outFields, + f=self.f, + resultRecordCount=self.resultRecordCount, filename=f"{self.product}_stops.jsonl.gz", ) From 
386abf83f29ab3eef086cd9190fd35002ae3969c Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Wed, 13 Nov 2024 11:33:44 -0500 Subject: [PATCH 09/20] add handling for state_highway_network table --- .../state_geoportal/state_highway_network.yml | 4 +- .../operators/scrape_state_geoportal.py | 42 +++++++------------ 2 files changed, 16 insertions(+), 30 deletions(-) diff --git a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml index f419f01615..9dd396fcdf 100644 --- a/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml +++ b/airflow/dags/create_external_tables/state_geoportal/state_highway_network.yml @@ -1,13 +1,13 @@ operator: operators.ExternalTable bucket: gs://calitp-state-geoportal-scrape source_objects: - - "state_highway_network_data/*.jsonl.gz" + - "state_highway_network_geodata/*.jsonl.gz" source_format: NEWLINE_DELIMITED_JSON use_bq_client: true hive_options: mode: CUSTOM require_partition_filter: false - source_uri_prefix: "state_highway_network_data/{dt:DATE}/{execution_ts:TIMESTAMP}/" + source_uri_prefix: "state_highway_network_geodata/{dt:DATE}/{execution_ts:TIMESTAMP}/" destination_project_dataset_table: "external_state_geoportal.state_highway_network" prefix_bucket: false post_hook: | diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index f164ab0684..0278f1bd51 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,6 +1,4 @@ import gzip - -# import json import logging from typing import ClassVar, List # , Optional @@ -100,7 +98,6 @@ def fetch_from_state_geoportal(self): # # Function to convert coordinates to WKT format -# break out into own parse operator afterwards, which then reads the raw data in the bucket, and def to_wkt(geometry_type, coordinates): if geometry_type == "LineString": # Format as a LineString @@ -159,12 +156,12 @@ def __init__( self.extract = JSONExtract( root_url=self.root_url, service=self.service, - product=f"{self.product}_data", + product=f"{self.product}_geodata", where=self.where, outFields=self.outFields, f=self.f, resultRecordCount=self.resultRecordCount, - filename=f"{self.product}_stops.jsonl.gz", + filename=f"{self.product}_geodata.jsonl.gz", ) super().__init__(**kwargs) @@ -174,20 +171,9 @@ def execute(self, **kwargs): df = pd.json_normalize(api_content) - df = df[ - [ - "properties.Route", - "properties.County", - "properties.District", - "properties.RouteType", - "properties.Direction", - "geometry.type", - "geometry.coordinates", - ] - ] - - df = df.rename( - columns={ + if self.product == "state_highway_network": + # Select and rename columns + columns = { "properties.Route": "Route", "properties.County": "County", "properties.District": "District", @@ -196,15 +182,15 @@ def execute(self, **kwargs): "geometry.type": "type", "geometry.coordinates": "coordinates", } - ) + df = df[list(columns.keys())].rename(columns=columns) - # Apply function to create new column with WKT format - df["wkt_coordinates"] = df.apply( - lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 - ) + # Create new column with WKT format + df["wkt_coordinates"] = df.apply( + lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 + ) - df = df[ - [ + # Select final columns for output + final_columns = [ "Route", "County", "District", @@ -212,10 +198,10 @@ def execute(self, 
**kwargs): "Direction", "wkt_coordinates", ] - ] + df = df[final_columns] + # Compress the DataFrame content and save it self.gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode() ) - self.extract.save_content(fs=get_fs(), content=self.gzipped_content) From f9b3615ca2fbef3fceee831105d07f325e1597b4 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Wed, 13 Nov 2024 11:34:37 -0500 Subject: [PATCH 10/20] fix comment --- airflow/plugins/operators/scrape_state_geoportal.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 0278f1bd51..0088818d99 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -10,7 +10,7 @@ from airflow.models import BaseOperator # type: ignore -# switch before merge +# create composer env variable and switch before merge API_BUCKET = "gs://calitp-state-geoportal-scrape" # API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] From 0dd22f788d339589b1fc0118424ea3ecb799a838 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Wed, 13 Nov 2024 11:43:05 -0500 Subject: [PATCH 11/20] remove local variables and comments to allow for a production merge when ready --- airflow/plugins/operators/scrape_state_geoportal.py | 7 +++---- .../stg_state_geoportal__state_highway_network_stops.sql | 1 - 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 0088818d99..9a5cfd0531 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,6 +1,7 @@ import gzip import logging -from typing import ClassVar, List # , Optional +import os +from typing import ClassVar, List import pandas as pd # type: ignore import pendulum @@ -10,9 +11,7 @@ from airflow.models import BaseOperator # type: ignore -# create composer env variable and switch before merge -API_BUCKET = "gs://calitp-state-geoportal-scrape" -# API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] +API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] class StateGeoportalAPIExtract(PartitionedGCSArtifact): diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql index 6fd2950a37..d59b2446b9 100644 --- a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql +++ b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql @@ -1,7 +1,6 @@ WITH external_state_geoportal__state_highway_network AS ( SELECT * FROM - --`cal-itp-data-infra-staging.external_state_geoportal.state_highway_network` {{ source('external_state_geoportal', 'state_highway_network') }} ), From 32becb7a78c12d3da65c9622868b7dcd5900f59c Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 14 Nov 2024 11:52:35 -0500 Subject: [PATCH 12/20] use stop_id for calculations --- .../mart/gtfs_schedule_latest/dim_stops_latest.sql | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index c8e34b5abd..d373641890 100644 --- 
a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -7,7 +7,9 @@ dim_stops_latest AS ( ), stg_state_geoportal__state_highway_network_stops AS ( -SELECT * FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }} +SELECT * +-- FROM `cal-itp-data-infra-staging.external_state_geoportal.stg_state_geoportal__state_highway_network_stops` +FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }} ), @@ -21,7 +23,8 @@ buffer_geometry_table AS ( current_stops AS ( SELECT pt_geom, - key + stop_id + --key FROM dim_stops_latest ), @@ -38,13 +41,15 @@ dim_stops_latest_with_shn_boolean AS ( SELECT dim_stops_latest.*, - IF(stops_on_shn.key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest + IF(stops_on_shn.stop_id IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest + -- IF(stops_on_shn.key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest FROM dim_stops_latest LEFT JOIN stops_on_shn ON - dim_stops_latest.key = stops_on_shn.key + dim_stops_latest.stop_id = stops_on_shn.stop_id + -- dim_stops_latest.key = stops_on_shn.key ) SELECT * FROM dim_stops_latest_with_shn_boolean From 45019d133c2c2a52c1af3fb951c5e427c9e5e502 Mon Sep 17 00:00:00 2001 From: Charlie Costanzo Date: Mon, 18 Nov 2024 13:12:03 -0500 Subject: [PATCH 13/20] Apply suggestions from code review Co-authored-by: Mjumbe Poe --- .../state_highway_network.yml | 3 +- .../operators/scrape_state_geoportal.py | 56 +++++++++++++------ .../gtfs_schedule_latest/dim_stops_latest.sql | 1 - 3 files changed, 41 insertions(+), 19 deletions(-) diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml index e57234cc35..5612468e18 100644 --- a/airflow/dags/scrape_state_geoportal/state_highway_network.yml +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -1,7 +1,8 @@ operator: operators.StateGeoportalAPIOperator root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' -service: "CHhighway/SHN_Lines/FeatureServer/0/query" +service: "CHhighway/SHN_Lines" +layer: "0" product: 'state_highway_network' where: "1=1" # You can change this to filter data outFields: "*" # Specify the fields to return diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 9a5cfd0531..be40ea5e6c 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -19,12 +19,35 @@ class StateGeoportalAPIExtract(PartitionedGCSArtifact): execution_ts: pendulum.DateTime = pendulum.now() dt: pendulum.Date = execution_ts.date() partition_names: ClassVar[List[str]] = ["dt", "execution_ts"] + + # The name to be used in the data warehouse to refer to the data + # product. + product: str + + # The root of the ArcGIS services. As of Nov 2024, this should + # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/". root_url: str + + # The name of the service being requested. In the feature service's + # URL, this will be everything between the root and "/FeatureServer". + # Don't include a leading or trailing slash. service: str - product: str - where: str - outFields: str - f: str + + # The layer to query. This will usually be "0", so that is the + # default. + layer: str = "0" + + # The query filter. By default, all rows will be returned from the + # service. 
Refer to the ArcGIS documentation for syntax: + # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters + where: str = "1=1" + + # A comma-separated list of fields to include in the results. Use + # "*" (the default) to include all fields. + outFields: str = "*" + + # The number of records to request for each API call (the operator + # will request all data from the layer in batches of this size). resultRecordCount: int @property @@ -45,14 +68,14 @@ def fetch_from_state_geoportal(self): try: # Set up the parameters for the request - url = self.root_url + self.service + url = f'{self.root_url}/{self.service}/FeatureServer/{self.layer}/query' validated_url = parse_obj_as(HttpUrl, url) params = { - "where": self.where, # You can change this to filter data - "outFields": self.outFields, # Specify the fields to return - "f": self.f, # Format of the response - "resultRecordCount": self.resultRecordCount, # Maximum number of rows per request + "where": self.where, + "outFields": self.outFields, + "f": "geojson", + "resultRecordCount": self.resultRecordCount, } all_features = [] # To store all retrieved rows @@ -64,7 +87,6 @@ def fetch_from_state_geoportal(self): # Make the request response = requests.get(validated_url, params=params) - # response = requests.get(url, params=params) data = response.json() # Break the loop if there are no more features @@ -119,32 +141,32 @@ class JSONExtract(StateGeoportalAPIExtract): class StateGeoportalAPIOperator(BaseOperator): template_fields = ( + "product", "root_url", "service", - "product", + "layer", "where", "outFields", - "f", "resultRecordCount", ) def __init__( self, + product, root_url, service, - product, + layer, where, outFields, - f, resultRecordCount, **kwargs, ): + self.product = product self.root_url = root_url self.service = service - self.product = product + self.layer = layer self.where = where self.outFields = outFields - self.f = f self.resultRecordCount = resultRecordCount """An operator that extracts and saves JSON data from the State Geoportal @@ -158,7 +180,7 @@ def __init__( product=f"{self.product}_geodata", where=self.where, outFields=self.outFields, - f=self.f, + layer=self.layer, resultRecordCount=self.resultRecordCount, filename=f"{self.product}_geodata.jsonl.gz", ) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index d373641890..469e46105b 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -42,7 +42,6 @@ dim_stops_latest_with_shn_boolean AS ( SELECT dim_stops_latest.*, IF(stops_on_shn.stop_id IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest - -- IF(stops_on_shn.key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest FROM dim_stops_latest LEFT JOIN From 008bc0eee0a372227a505633ddcfb40578a205f8 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Thu, 14 Nov 2024 14:35:04 -0500 Subject: [PATCH 14/20] restore source table --- .../stg_state_geoportal__state_highway_network_stops.sql | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql index d59b2446b9..a89a1075ee 100644 --- a/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql +++ 
b/warehouse/models/staging/state_geoportal/stg_state_geoportal__state_highway_network_stops.sql
@@ -1,7 +1,6 @@
 WITH external_state_geoportal__state_highway_network AS (
     SELECT *
-    FROM
-        {{ source('external_state_geoportal', 'state_highway_network') }}
+    FROM {{ source('external_state_geoportal', 'state_highway_network') }}
 ),
 
 stg_state_geoportal__state_highway_network_stops AS(

From 553682c9a217014592daf7adbec2a9771e82d366 Mon Sep 17 00:00:00 2001
From: Charles Costanzo
Date: Mon, 18 Nov 2024 13:54:05 -0500
Subject: [PATCH 15/20] remove hardcoded source, remove old comments, add comment describing buffer distance

---
 .../models/mart/gtfs_schedule_latest/dim_stops_latest.sql | 8 +++-----
 1 file changed, 3 insertions(+), 5 deletions(-)

diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql
index 469e46105b..373b707841 100644
--- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql
+++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql
@@ -7,14 +7,14 @@ dim_stops_latest AS (
 ),
 
 stg_state_geoportal__state_highway_network_stops AS (
-SELECT *
--- FROM `cal-itp-data-infra-staging.external_state_geoportal.stg_state_geoportal__state_highway_network_stops`
-FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }}
+    SELECT *
+    FROM {{ ref('stg_state_geoportal__state_highway_network_stops') }}
 ),
 
 
 buffer_geometry_table AS (
     SELECT
+        -- equal to 100ft, as requested by Uriel
         ST_BUFFER(wkt_coordinates,
             30.48) AS buffer_geometry
     FROM stg_state_geoportal__state_highway_network_stops
@@ -24,7 +24,6 @@ current_stops AS (
     SELECT
         pt_geom,
         stop_id
-        --key
     FROM dim_stops_latest
 ),
 
@@ -48,7 +47,6 @@ LEFT JOIN
     stops_on_shn
 ON
     dim_stops_latest.stop_id = stops_on_shn.stop_id
-    -- dim_stops_latest.key = stops_on_shn.key
 )
 
 SELECT * FROM dim_stops_latest_with_shn_boolean

From cf5f01e1467f1d09c2e230a1228b1caba0828417 Mon Sep 17 00:00:00 2001
From: Charles Costanzo
Date: Mon, 18 Nov 2024 13:56:27 -0500
Subject: [PATCH 16/20] remove hardcoded bucket

---
 .../operators/scrape_state_geoportal.py | 18 +++++++++---------
 1 file changed, 9 insertions(+), 9 deletions(-)

diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py
index be40ea5e6c..62e57df18c 100644
--- a/airflow/plugins/operators/scrape_state_geoportal.py
+++ b/airflow/plugins/operators/scrape_state_geoportal.py
@@ -19,33 +19,33 @@ class StateGeoportalAPIExtract(PartitionedGCSArtifact):
     execution_ts: pendulum.DateTime = pendulum.now()
     dt: pendulum.Date = execution_ts.date()
     partition_names: ClassVar[List[str]] = ["dt", "execution_ts"]
-
+
     # The name to be used in the data warehouse to refer to the data
     # product.
     product: str
-
+
     # The root of the ArcGIS services. As of Nov 2024, this should
     # be "https://caltrans-gis.dot.ca.gov/arcgis/rest/services/".
     root_url: str
-
+
     # The name of the service being requested. In the feature service's
     # URL, this will be everything between the root and "/FeatureServer".
     # Don't include a leading or trailing slash.
     service: str
-
-    # The layer to query. This will usually be "0", so that is the
+
+    # The layer to query. This will usually be "0", so that is the
     # default.
     layer: str = "0"
-
+
     # The query filter. By default, all rows will be returned from the
     # service. 
Refer to the ArcGIS documentation for syntax: # https://developers.arcgis.com/rest/services-reference/enterprise/query-feature-service-layer/#request-parameters where: str = "1=1" - + # A comma-separated list of fields to include in the results. Use # "*" (the default) to include all fields. outFields: str = "*" - + # The number of records to request for each API call (the operator # will request all data from the layer in batches of this size). resultRecordCount: int @@ -68,7 +68,7 @@ def fetch_from_state_geoportal(self): try: # Set up the parameters for the request - url = f'{self.root_url}/{self.service}/FeatureServer/{self.layer}/query' + url = f"{self.root_url}/{self.service}/FeatureServer/{self.layer}/query" validated_url = parse_obj_as(HttpUrl, url) params = { From 11d2c123450f9f02f85661a2dc7b3d9d379ff5a5 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Mon, 18 Nov 2024 15:43:41 -0500 Subject: [PATCH 17/20] refactor how we keep final columns, dynamically rename columns --- .../operators/scrape_state_geoportal.py | 40 +++++++++---------- 1 file changed, 18 insertions(+), 22 deletions(-) diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 62e57df18c..3e9387d88c 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -193,34 +193,30 @@ def execute(self, **kwargs): df = pd.json_normalize(api_content) if self.product == "state_highway_network": - # Select and rename columns - columns = { - "properties.Route": "Route", - "properties.County": "County", - "properties.District": "District", - "properties.RouteType": "RouteType", - "properties.Direction": "Direction", - "geometry.type": "type", - "geometry.coordinates": "coordinates", - } - df = df[list(columns.keys())].rename(columns=columns) + # Select columns to keep + df = df[ + [ + "properties.Route", + "properties.County", + "properties.District", + "properties.RouteType", + "properties.Direction", + "geometry.type", + "geometry.coordinates", + ] + ] + + # Dynamically create a mapping by removing known prefixes + columns = {col: col.split(".")[-1] for col in df.columns} + + # Rename columns using the dynamically created mapping + df = df.rename(columns=columns) # Create new column with WKT format df["wkt_coordinates"] = df.apply( lambda row: to_wkt(row["type"], row["coordinates"]), axis=1 ) - # Select final columns for output - final_columns = [ - "Route", - "County", - "District", - "RouteType", - "Direction", - "wkt_coordinates", - ] - df = df[final_columns] - # Compress the DataFrame content and save it self.gzipped_content = gzip.compress( df.to_json(orient="records", lines=True).encode() From 791ffef1cc1b2f5286d87247751da0b3f0e90d43 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Wed, 20 Nov 2024 13:43:21 -0500 Subject: [PATCH 18/20] restore _gtfs_key over stop_id in shn geo calculation --- .../models/mart/gtfs_schedule_latest/dim_stops_latest.sql | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql index 373b707841..58b648a57b 100644 --- a/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql +++ b/warehouse/models/mart/gtfs_schedule_latest/dim_stops_latest.sql @@ -23,7 +23,7 @@ buffer_geometry_table AS ( current_stops AS ( SELECT pt_geom, - stop_id + _gtfs_key FROM dim_stops_latest ), @@ -40,13 +40,13 @@ 
dim_stops_latest_with_shn_boolean AS ( SELECT dim_stops_latest.*, - IF(stops_on_shn.stop_id IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest + IF(stops_on_shn._gtfs_key IS NOT NULL, TRUE, FALSE) AS exists_in_dim_stops_latest FROM dim_stops_latest LEFT JOIN stops_on_shn ON - dim_stops_latest.stop_id = stops_on_shn.stop_id + dim_stops_latest._gtfs_key = stops_on_shn._gtfs_key ) SELECT * FROM dim_stops_latest_with_shn_boolean From 2bcea5481c933aefe71c58423c009ba3e9df0c49 Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Fri, 22 Nov 2024 13:48:35 -0500 Subject: [PATCH 19/20] checking before refactor --- .../state_highway_network.yml | 3 --- .../operators/scrape_state_geoportal.py | 18 ++++++------------ 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/airflow/dags/scrape_state_geoportal/state_highway_network.yml b/airflow/dags/scrape_state_geoportal/state_highway_network.yml index 5612468e18..d5be284196 100644 --- a/airflow/dags/scrape_state_geoportal/state_highway_network.yml +++ b/airflow/dags/scrape_state_geoportal/state_highway_network.yml @@ -4,7 +4,4 @@ root_url: 'https://caltrans-gis.dot.ca.gov/arcgis/rest/services/' service: "CHhighway/SHN_Lines" layer: "0" product: 'state_highway_network' -where: "1=1" # You can change this to filter data -outFields: "*" # Specify the fields to return -f: "geojson" # Format of the response resultRecordCount: 2000 diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 3e9387d88c..4d94619d0b 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,6 +1,7 @@ import gzip import logging -import os + +# import os from typing import ClassVar, List import pandas as pd # type: ignore @@ -11,7 +12,8 @@ from airflow.models import BaseOperator # type: ignore -API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] +API_BUCKET = "gs://calitp-state-geoportal-scrape" +# API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] class StateGeoportalAPIExtract(PartitionedGCSArtifact): @@ -86,7 +88,7 @@ def fetch_from_state_geoportal(self): params["resultOffset"] = offset # Make the request - response = requests.get(validated_url, params=params) + response = requests.get(validated_url, params=params).raise_for_status() data = response.json() # Break the loop if there are no more features @@ -145,8 +147,6 @@ class StateGeoportalAPIOperator(BaseOperator): "root_url", "service", "layer", - "where", - "outFields", "resultRecordCount", ) @@ -156,8 +156,6 @@ def __init__( root_url, service, layer, - where, - outFields, resultRecordCount, **kwargs, ): @@ -165,8 +163,6 @@ def __init__( self.root_url = root_url self.service = service self.layer = layer - self.where = where - self.outFields = outFields self.resultRecordCount = resultRecordCount """An operator that extracts and saves JSON data from the State Geoportal @@ -178,8 +174,6 @@ def __init__( root_url=self.root_url, service=self.service, product=f"{self.product}_geodata", - where=self.where, - outFields=self.outFields, layer=self.layer, resultRecordCount=self.resultRecordCount, filename=f"{self.product}_geodata.jsonl.gz", @@ -193,7 +187,7 @@ def execute(self, **kwargs): df = pd.json_normalize(api_content) if self.product == "state_highway_network": - # Select columns to keep + # Select columns to keep, have to be explicit because there are duplicate values after normalizing df = df[ [ "properties.Route", From 
0ca4a89c311d9da726e150017b39610a2523cb3a Mon Sep 17 00:00:00 2001 From: Charles Costanzo Date: Fri, 22 Nov 2024 14:08:34 -0500 Subject: [PATCH 20/20] revisions based on Mjumbe's review --- airflow/plugins/operators/scrape_state_geoportal.py | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/airflow/plugins/operators/scrape_state_geoportal.py b/airflow/plugins/operators/scrape_state_geoportal.py index 4d94619d0b..a538df5495 100644 --- a/airflow/plugins/operators/scrape_state_geoportal.py +++ b/airflow/plugins/operators/scrape_state_geoportal.py @@ -1,7 +1,6 @@ import gzip import logging - -# import os +import os from typing import ClassVar, List import pandas as pd # type: ignore @@ -12,8 +11,7 @@ from airflow.models import BaseOperator # type: ignore -API_BUCKET = "gs://calitp-state-geoportal-scrape" -# API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] +API_BUCKET = os.environ["CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS"] class StateGeoportalAPIExtract(PartitionedGCSArtifact): @@ -88,7 +86,8 @@ def fetch_from_state_geoportal(self): params["resultOffset"] = offset # Make the request - response = requests.get(validated_url, params=params).raise_for_status() + response = requests.get(validated_url, params=params) + response.raise_for_status() data = response.json() # Break the loop if there are no more features @@ -187,7 +186,7 @@ def execute(self, **kwargs): df = pd.json_normalize(api_content) if self.product == "state_highway_network": - # Select columns to keep, have to be explicit because there are duplicate values after normalizing + # Select columns to keep, have to be explicit before renaming because there are duplicate values after normalizing df = df[ [ "properties.Route",