Skip to content

Commit d5c6a4b

Browse files
ohriteerikamov
andcommitted
Create Kuba sync and external table (#4083)
* Create Kuba bucket on staging * Add KubaHook to communicate with Proxima API * Add KubaToGCS operator * Add sync_kuba dag * Add production kuba bucket * Fix lint complaints * Fix terraform Kuba bucket complaint * Initialize KubaToGCSOperator and fix misspelled name * Update setting on yml file to fix creation of Kuba external table --------- Co-authored-by: Erika Pacheco <[email protected]>
1 parent ac9d450 commit d5c6a4b

File tree

23 files changed

+1288
-3
lines changed

23 files changed

+1288
-3
lines changed

airflow/.test.env

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED=gs://calitp-staging-pytest
2222
CALITP_BUCKET__GTFS_SCHEDULE_UNZIPPED_HOURLY=gs://calitp-staging-pytest
2323
CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION=gs://calitp-staging-pytest
2424
CALITP_BUCKET__GTFS_SCHEDULE_VALIDATION_HOURLY=gs://calitp-staging-pytest
25+
CALITP_BUCKET__KUBA=gs://calitp-staging-pytest
2526
CALITP_BUCKET__LITTLEPAY_PARSED=gs://calitp-staging-pytest
2627
CALITP_BUCKET__LITTLEPAY_PARSED_V3=gs://calitp-staging-pytest
2728
CALITP_BUCKET__LITTLEPAY_RAW=gs://calitp-staging-pytest
Lines changed: 200 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,200 @@
1+
operator: operators.ExternalTable
2+
bucket: "{{ env_var('CALITP_BUCKET__KUBA') }}"
3+
post_hook: |
4+
SELECT *
5+
FROM `{{ env_var('GOOGLE_CLOUD_PROJECT') }}`.external_kuba.device_properties
6+
LIMIT 1;
7+
source_objects:
8+
- "device_properties/*.jsonl.gz"
9+
destination_project_dataset_table: "external_kuba.device_properties"
10+
source_format: NEWLINE_DELIMITED_JSON
11+
use_bq_client: true
12+
hive_options:
13+
mode: CUSTOM
14+
require_partition_filter: false
15+
source_uri_prefix: "device_properties/{dt:DATE}/{ts:TIMESTAMP}/"
16+
schema_fields:
17+
- name: device
18+
type: RECORD
19+
fields:
20+
- name: fo_device_logical_id
21+
type: STRING
22+
- name: fo_device_type
23+
type: STRING
24+
- name: fo_device_type_model
25+
type: STRING
26+
- name: fo_device_serial_number
27+
type: STRING
28+
- name: fo_device_description
29+
type: STRING
30+
- name: fo_device_location_id
31+
type: STRING
32+
- name: fo_device_location
33+
type: STRING
34+
- name: fo_device_last_connection
35+
type: STRING
36+
- name: device_replicator_info
37+
type: RECORD
38+
fields:
39+
- name: software_version
40+
type: STRING
41+
- name: software_last_connection
42+
type: STRING
43+
- name: cd_version
44+
type: STRING
45+
- name: cd_last_connection
46+
type: STRING
47+
- name: dataset_version
48+
type: STRING
49+
- name: dataset_last_connection
50+
type: STRING
51+
- name: denylist_version
52+
type: STRING
53+
- name: denylist_last_connection
54+
type: STRING
55+
- name: acceptlist_version
56+
type: STRING
57+
- name: acceptlist_last_connection
58+
type: STRING
59+
- name: binlist_version
60+
type: STRING
61+
- name: binlist_last_connection
62+
type: STRING
63+
- name: asset_last_connection
64+
type: STRING
65+
- name: monitoring_last_connection
66+
type: STRING
67+
- name: ud_last_transaction_time
68+
type: STRING
69+
- name: device_monitor_info
70+
type: RECORD
71+
fields:
72+
- name: application__isdisabled
73+
type: STRING
74+
- name: application__isinservice
75+
type: STRING
76+
- name: application__servicestatus
77+
type: RECORD
78+
fields:
79+
- name: Rows
80+
type: RECORD
81+
mode: REPEATED
82+
fields:
83+
- name: Id
84+
type: STRING
85+
- name: gps__position
86+
type: RECORD
87+
fields:
88+
- name: altitude
89+
type: STRING
90+
- name: dateTime
91+
type: STRING
92+
- name: direction
93+
type: STRING
94+
- name: gpsFix
95+
type: STRING
96+
- name: groundSpeed
97+
type: STRING
98+
- name: hasAltitude
99+
type: STRING
100+
- name: hasDateTime
101+
type: STRING
102+
- name: hasDirection
103+
type: STRING
104+
- name: hasGpsFix
105+
type: STRING
106+
- name: hasGroundSpeed
107+
type: STRING
108+
- name: hasLatitude
109+
type: STRING
110+
- name: hasLongitude
111+
type: STRING
112+
- name: latitude
113+
type: STRING
114+
- name: longitude
115+
type: STRING
116+
- name: numberSatelites
117+
type: STRING
118+
- name: properties
119+
type: RECORD
120+
fields:
121+
- name: Id
122+
type: STRING
123+
- name: location__location__servicestatus
124+
type: STRING
125+
- name: location__location__source
126+
type: STRING
127+
- name: os__uptime
128+
type: STRING
129+
- name: smartmedium__emv3000__batterylevel
130+
type: STRING
131+
- name: ngwifi__gps__apistatus
132+
type: RECORD
133+
fields:
134+
- name: status
135+
type: STRING
136+
- name: ngwifi__gps__position
137+
type: RECORD
138+
fields:
139+
- name: altitude
140+
type: STRING
141+
- name: fix
142+
type: STRING
143+
- name: heading
144+
type: STRING
145+
- name: latitude
146+
type: STRING
147+
- name: longitude
148+
type: STRING
149+
- name: speed
150+
type: STRING
151+
- name: success
152+
type: STRING
153+
- name: timestamp
154+
type: STRING
155+
- name: location__location__info
156+
type: RECORD
157+
fields:
158+
- name: dataid
159+
type: STRING
160+
- name: dataversion
161+
type: STRING
162+
- name: locationProviderSource
163+
type: STRING
164+
- name: type
165+
type: STRING
166+
- name: properties
167+
type: RECORD
168+
fields:
169+
- name: Id
170+
type: STRING
171+
- name: current
172+
type: RECORD
173+
fields:
174+
- name: zoneinfo
175+
type: RECORD
176+
fields:
177+
- name: name
178+
type: STRING
179+
- name: reference
180+
type: STRING
181+
- name: stopinfo
182+
type: RECORD
183+
fields:
184+
- name: abbreviation
185+
type: STRING
186+
- name: farematrixreference
187+
type: STRING
188+
- name: name
189+
type: STRING
190+
- name: reference
191+
type: STRING
192+
- name: shortname
193+
type: STRING
194+
- name: type
195+
type: STRING
196+
- name: properties
197+
type: RECORD
198+
fields:
199+
- name: Id
200+
type: STRING
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
description: "Capture the Kuba API to GCS"
2+
schedule_interval: "0 0 * * *"
3+
tags:
4+
- all_gusty_features
5+
default_args:
6+
owner: airflow
7+
depends_on_past: False
8+
start_date: "2025-07-14"
9+
catchup: False
10+
email:
11+
12+
email_on_failure: True
13+
pool: default_pool
14+
concurrency: 50
15+
wait_for_defaults:
16+
timeout: 3600
17+
latest_only: True

airflow/dags/sync_kuba/README.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
# `sync_kuba`
2+
3+
Type: [Now / Scheduled](https://docs.calitp.org/data-infra/airflow/dags-maintenance.html)
4+
5+
This DAG orchestrates the syncing of raw Kuba data.
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
operator: operators.KubaToGCSOperator
2+
3+
product: device_properties
4+
endpoint: monitoring/deviceproperties/v1/ForLocations/all
5+
parameters:
6+
location_type: 1
7+
bucket: "{{ env_var('CALITP_BUCKET__KUBA') }}"

airflow/docker-compose.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -109,6 +109,7 @@ x-airflow-common:
109109
CALITP_BUCKET__NTD_XLSX_DATA_PRODUCTS__CLEAN: "gs://calitp-staging-ntd-xlsx-products-clean"
110110
CALITP_BUCKET__NTD_REPORT_VALIDATION: "gs://calitp-staging-ntd-report-validation"
111111
CALITP_BUCKET__STATE_GEOPORTAL_DATA_PRODUCTS: "gs://calitp-staging-state-geoportal-scrape"
112+
CALITP_BUCKET__KUBA: "gs://calitp-staging-kuba"
112113

113114
DBT_TARGET: staging_service_account
114115

airflow/plugins/hooks/kuba_hook.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from airflow.hooks.base import BaseHook
2+
from airflow.hooks.http_hook import HttpHook
3+
4+
5+
class KubaAuthenticationError(AssertionError):
    """Raised when the Kuba authenticate endpoint reports an error.

    Subclasses ``AssertionError`` because authentication failures previously
    surfaced through a bare ``assert``; existing ``except AssertionError``
    handlers therefore keep working.
    """


class KubaHook(BaseHook):
    """Airflow hook that authenticates against the Kuba API and issues requests.

    Credentials are read from the Airflow connection named by
    ``http_conn_id``: ``login`` is sent as ``UserName``, ``password`` as
    ``Password`` and ``schema`` (converted to ``int``) as
    ``OperatorIdentifier``. Authentication happens eagerly in ``__init__``;
    the returned session id is then sent as a ``session-id`` cookie by
    ``get_headers``.
    """

    _http_conn_id: str
    _method: str
    # None until ``_get_connection`` has authenticated successfully.
    _session_id: "str | None"

    def __init__(self, method: str = "GET", http_conn_id: str = "kuba_default"):
        """Store the request settings and authenticate immediately.

        Args:
            method: HTTP method used by ``run`` for data requests.
            http_conn_id: Airflow connection id holding the Kuba host and
                credentials.

        Raises:
            KubaAuthenticationError: If the authenticate endpoint returns a
                non-null ``Error``.
        """
        super().__init__()
        self._http_conn_id = http_conn_id
        self._method = method
        self._session_id = None
        self._get_connection()

    def _get_connection(self):
        """Authenticate against Kuba and remember the returned session id.

        Raises:
            KubaAuthenticationError: If the response's ``Error`` field is not
                ``None``.
        """
        conn = self.get_connection(self._http_conn_id)
        data = {
            "UserName": conn.login,
            "Password": conn.password,
            "OperatorIdentifier": int(conn.schema),
        }
        headers = {"Content-Type": "application/json"}
        # Authentication is always a POST, regardless of ``self._method``.
        kuba_api = HttpHook(method="POST", http_conn_id=self._http_conn_id)
        authenticate_path = "monitoring/authenticate/v1"
        result = kuba_api.run(
            endpoint=authenticate_path, headers=headers, json=data
        ).json()

        # Raise explicitly instead of using ``assert``: assertions are removed
        # under ``python -O``, which would let a rejected login slip through
        # and fail later with an unrelated error.
        error = result["Error"]
        if error is not None:
            raise KubaAuthenticationError(
                f"Kuba authentication failed for connection "
                f"'{self._http_conn_id}': {error}"
            )
        self._session_id = result["SessionId"]

    def get_headers(self):
        """Return the default JSON headers carrying the session cookie."""
        return {
            "Content-Type": "application/json",
            "Accept": "application/json",
            "Cookie": f"session-id={self._session_id}",
        }

    def run(self, endpoint, data=None, headers=None):
        """Call ``endpoint`` with the configured method and return the parsed JSON.

        Args:
            endpoint: Path passed to ``HttpHook.run`` as ``endpoint``.
            data: Optional payload forwarded to ``HttpHook.run`` as ``data``.
            headers: Optional headers; defaults to ``get_headers()`` so the
                session cookie is included.

        Returns:
            The decoded JSON body of the response.
        """
        if headers is None:
            headers = self.get_headers()

        kuba_api = HttpHook(method=self._method, http_conn_id=self._http_conn_id)
        response = kuba_api.run(endpoint=endpoint, headers=headers, data=data)
        return response.json()

airflow/plugins/operators/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from operators.external_table import ExternalTable
99
from operators.gtfs_csv_to_jsonl import GtfsGcsToJsonlOperator
1010
from operators.gtfs_csv_to_jsonl_hourly import GtfsGcsToJsonlOperatorHourly
11+
from operators.kuba_to_gcs_operator import KubaToGCSOperator
1112
from operators.littlepay_raw_sync import LittlepayRawSync
1213
from operators.littlepay_raw_sync_feed_v3 import LittlepayRawSyncV3
1314
from operators.littlepay_to_jsonl import LittlepayToJSONL

0 commit comments

Comments
 (0)