Skip to content

Commit

Permalink
NTD: Modify API sync for column name handling, ingest safety and secu…
Browse files Browse the repository at this point in the history
…rity tables, create external tables (#3579)

* NTD: Modify API sync for column name handling, ingest safety and security tables, create external tables

* update ymls and add correct endpoints for safety and security table ingest
  • Loading branch information
charlie-costanzo authored Dec 12, 2024
1 parent eb16c3c commit ce8e861
Show file tree
Hide file tree
Showing 7 changed files with 61 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
operator: operators.ExternalTable
bucket: gs://calitp-ntd-api-products
source_objects:
- "fra_regulated_mode_major_security_events/historical/*.jsonl.gz"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
mode: CUSTOM
require_partition_filter: false
source_uri_prefix: "fra_regulated_mode_major_security_events/historical/{dt:DATE}/{execution_ts:TIMESTAMP}"
destination_project_dataset_table: "external_ntd__safety_and_security.historical__fra_regulated_mode_major_security_events"
prefix_bucket: false
post_hook: SELECT * FROM `{{ get_project_id() }}`.external_ntd__safety_and_security.historical__fra_regulated_mode_major_security_events LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
operator: operators.ExternalTable
bucket: gs://calitp-ntd-api-products
source_objects:
- "major_safety_events/historical/*.jsonl.gz"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
mode: CUSTOM
require_partition_filter: false
source_uri_prefix: "major_safety_events/historical/{dt:DATE}/{execution_ts:TIMESTAMP}"
destination_project_dataset_table: "external_ntd__safety_and_security.historical__major_safety_events"
prefix_bucket: false
post_hook: SELECT * FROM `{{ get_project_id() }}`.external_ntd__safety_and_security.historical__major_safety_events LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
operator: operators.ExternalTable
bucket: gs://calitp-ntd-api-products
source_objects:
- "monthly_modal_time_series_safety_and_service/historical/*.jsonl.gz"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
mode: CUSTOM
require_partition_filter: false
source_uri_prefix: "monthly_modal_time_series_safety_and_service/historical/{dt:DATE}/{execution_ts:TIMESTAMP}"
destination_project_dataset_table: "external_ntd__safety_and_security.historical__monthly_modal_time_series_safety_and_service"
prefix_bucket: false
post_hook: SELECT * FROM `{{ get_project_id() }}`.external_ntd__safety_and_security.historical__monthly_modal_time_series_safety_and_service LIMIT 1;
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
operator: operators.ExternalTable
bucket: gs://calitp-ntd-api-products
source_objects:
- "nonmajor_safety_and_security_events/historical/*.jsonl.gz"
source_format: NEWLINE_DELIMITED_JSON
use_bq_client: true
hive_options:
mode: CUSTOM
require_partition_filter: false
source_uri_prefix: "nonmajor_safety_and_security_events/historical/{dt:DATE}/{execution_ts:TIMESTAMP}"
destination_project_dataset_table: "external_ntd__safety_and_security.historical__nonmajor_safety_and_security_events"
prefix_bucket: false
post_hook: SELECT * FROM `{{ get_project_id() }}`.external_ntd__safety_and_security.historical__nonmajor_safety_and_security_events LIMIT 1;
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ operator: operators.NtdDataProductAPIOperator
year: 'historical'
product: 'major_safety_events'
root_url: 'https://data.transportation.gov/resource/'
endpoint_id: '9ivb-8ae9'
endpoint_id: 'urir-txqm'
file_format: '.json'
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,5 @@ operator: operators.NtdDataProductAPIOperator
year: 'historical'
product: 'monthly_modal_time_series_safety_and_service'
root_url: 'https://data.transportation.gov/resource/'
endpoint_id: '65fa-qbkf'
endpoint_id: '5ti2-5uiv'
file_format: '.json'
8 changes: 7 additions & 1 deletion airflow/plugins/operators/scrape_ntd_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@
import pandas as pd # type: ignore
import pendulum
import requests
from calitp_data_infra.storage import PartitionedGCSArtifact, get_fs # type: ignore
from calitp_data_infra.storage import ( # type: ignore
PartitionedGCSArtifact,
get_fs,
make_name_bq_safe,
)
from pydantic import HttpUrl, parse_obj_as

from airflow.models import BaseOperator # type: ignore
Expand Down Expand Up @@ -113,6 +117,8 @@ def execute(self, **kwargs):

df = pd.read_json(decode_api_content)

df = df.rename(make_name_bq_safe, axis="columns")

self.gzipped_content = gzip.compress(
df.to_json(orient="records", lines=True).encode()
)
Expand Down

0 comments on commit ce8e861

Please sign in to comment.