diff --git a/README.md b/README.md
index 532e648..8c91697 100644
--- a/README.md
+++ b/README.md
@@ -127,6 +127,7 @@ Please use Gitlab Flow, otherwise you cannot pass dockerhub CI
  * KLAVIYO_LIST_ID: Create from https://www.klaviyo.com/lists
  * KLAVIYO_CAMPAIGN_ID: Create from https://www.klaviyo.com/campaigns
  * kktix_events_endpoint: url path of kktix's `hosting_events`, ask @gtb for details!
+ * kktix_only_not_ended_events: decide whether to only retrieve events that have not ended yet
 
 ### CI/CD
 
diff --git a/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py b/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py
new file mode 100644
index 0000000..612be4d
--- /dev/null
+++ b/dags/ods/kktix_ticket_orders/kktix_backfill_dag.py
@@ -0,0 +1,41 @@
+"""
+Backfill KKTIX's data and load it to BigQuery every 5 mins
+"""
+from datetime import datetime, timedelta
+
+from airflow import DAG
+from airflow.operators.python_operator import PythonOperator
+from ods.kktix_ticket_orders.udfs import bigquery_loader, kktix_api
+
+DEFAULT_ARGS = {
+    "owner": "bc842017@gmail.com",
+    "depends_on_past": False,
+    "start_date": datetime(2022, 10, 30),
+    "retries": 2,
+    "retry_delay": timedelta(minutes=5),
+    "on_failure_callback": lambda x: "Need to send notification to Discord!",
+}
+dag = DAG(
+    "KKTIX_TICKET_BACKFILL_ORDERS_V1",
+    default_args=DEFAULT_ARGS,
+    schedule_interval="*/5 * * * *",
+    max_active_runs=1,
+    catchup=True,
+)
+with dag:
+    CREATE_TABLE_IF_NEEDED = PythonOperator(
+        task_id="CREATE_TABLE_IF_NEEDED",
+        python_callable=bigquery_loader.create_table_if_needed,
+    )
+
+    GET_ATTENDEE_INFOS = PythonOperator(
+        task_id="GET_ATTENDEE_INFOS",
+        python_callable=kktix_api.main,
+        provide_context=True,
+        op_kwargs={"backfill": True},
+    )
+
+    CREATE_TABLE_IF_NEEDED >> GET_ATTENDEE_INFOS
+
+if __name__ == "__main__":
+    dag.cli()
diff --git a/dags/ods/kktix_ticket_orders/sqls/create_table.sql b/dags/ods/kktix_ticket_orders/sqls/create_table.sql
index a0a5394..9c7a883 100644
--- a/dags/ods/kktix_ticket_orders/sqls/create_table.sql
+++ b/dags/ods/kktix_ticket_orders/sqls/create_table.sql
@@ -2,5 +2,6 @@ CREATE TABLE IF NOT EXISTS `{}`
 (
     ID INT64 NOT NULL,
     NAME STRING NOT NULL,
-    ATTENDEE_INFO STRING NOT NULL
+    ATTENDEE_INFO STRING NOT NULL,
+    REFUNDED BOOL DEFAULT FALSE
 );
diff --git a/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py b/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py
index f306aa0..9fdd429 100644
--- a/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py
+++ b/dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py
@@ -3,7 +3,7 @@
 
 from google.cloud import bigquery
 
-TABLE = f"{os.getenv('BIGQUERY_PROJECT')}.ods.ods_kktix_attendeeId_datetime"
+TABLE = f"{os.getenv('BIGQUERY_PROJECT')}.ods.ods_kktix_attendeeId_datetime_copy2"
 # since backfill would insert duplicate records, we need this dedupe to make it idempotent
 DEDUPE_SQL = f"""
 CREATE OR REPLACE TABLE
diff --git a/dags/ods/kktix_ticket_orders/udfs/kktix_api.py b/dags/ods/kktix_ticket_orders/udfs/kktix_api.py
index ea4f97e..d9f01ff 100644
--- a/dags/ods/kktix_ticket_orders/udfs/kktix_api.py
+++ b/dags/ods/kktix_ticket_orders/udfs/kktix_api.py
@@ -27,7 +27,10 @@ def main(**context):
     ts_datetime_obj = parse(context["ts"])
     year = ts_datetime_obj.year
     timestamp = ts_datetime_obj.timestamp()
-    event_raw_data_array = _extract(year=year, timestamp=timestamp,)
+    is_backfill = context["backfill"]
+    event_raw_data_array = _extract(
+        year=year, timestamp=timestamp, backfill=is_backfill
+    )
     transformed_event_raw_data_array = kktix_transformer.transform(
         copy.deepcopy(event_raw_data_array)
     )
@@ -40,7 +43,7 @@ def main(**context):
     )
 
 
-def _extract(year: int, timestamp: float) -> List[Dict]:
+def _extract(year: int, timestamp: float, backfill: bool) -> List[Dict]:
     """
     get data from KKTIX's API
     1. condition_filter_callb: use this callbacl to filter out unwanted event!
@@ -54,7 +57,7 @@ def _extract(year: int, timestamp: float) -> List[Dict]:
     event_metadatas = get_event_metadatas(condition_filter_callback)
     for event_metadata in event_metadatas:
         event_id = event_metadata["id"]
-        for attendee_info in get_attendee_infos(event_id, timestamp):
+        for attendee_info in get_attendee_infos(event_id, timestamp, backfill):
             event_raw_data_array.append(
                 {
                     "id": event_id,
@@ -65,13 +68,13 @@ def _extract(year: int, timestamp: float) -> List[Dict]:
     return event_raw_data_array
 
 
-def get_attendee_infos(event_id: int, timestamp: float) -> List:
+def get_attendee_infos(event_id: int, timestamp: float, backfill: bool) -> List:
     """
     it's a public wrapper for people to get attendee infos!
     """
     attendance_book_id = _get_attendance_book_id(event_id)
     attendee_ids = _get_attendee_ids(event_id, attendance_book_id)
-    attendee_infos = _get_attendee_infos(event_id, attendee_ids, timestamp)
+    attendee_infos = _get_attendee_infos(event_id, attendee_ids, timestamp, backfill)
     return attendee_infos
 
 
@@ -80,7 +83,7 @@ def get_event_metadatas(condition_filter: Callable) -> List[Dict]:
     Fetch all the ongoing events
     """
     event_list_resp = HTTP_HOOK.run_with_advanced_retry(
-        endpoint=f"{Variable.get('kktix_events_endpoint')}?only_not_ended_event=true",
+        endpoint=f"{Variable.get('kktix_events_endpoint')}?only_not_ended_event={Variable.get('kktix_only_not_ended_events')}",
         _retry_args=RETRY_ARGS,
     ).json()
     event_metadatas: List[dict] = []
@@ -116,7 +119,7 @@ def _get_attendee_ids(event_id: int, attendance_book_id: int) -> List[int]:
 
 
 def _get_attendee_infos(
-    event_id: int, attendee_ids: List[int], timestamp: float
+    event_id: int, attendee_ids: List[int], timestamp: float, backfill: bool
 ) -> List:
     """
     get attendee infos, e.g. email, phonenumber, name and etc
@@ -127,12 +130,15 @@ def _get_attendee_infos(
             endpoint=f"{Variable.get('kktix_events_endpoint')}/{event_id}/attendees/{attendee_id}",
             _retry_args=RETRY_ARGS,
         ).json()
-        if not attendee_info["is_paid"]:
-            continue
-        if (
-            timestamp
-            < attendee_info["updated_at"]
-            < timestamp + SCHEDULE_INTERVAL_SECONDS
-        ):
+        if backfill:
             attendee_infos.append(attendee_info)
+        else:
+            if not attendee_info["is_paid"]:
+                continue
+            if (
+                timestamp
+                < attendee_info["updated_at"]
+                < timestamp + SCHEDULE_INTERVAL_SECONDS
+            ):
+                attendee_infos.append(attendee_info)
     return attendee_infos
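
Note (not part of the patch above): the new `backfill` flag only changes the attendee-filtering rule in `_get_attendee_infos` — a backfill run (op_kwargs={"backfill": True} in the new DAG) keeps every attendee record and relies on DEDUPE_SQL in bigquery_loader.py for idempotency, while a normal run keeps only paid attendees whose `updated_at` falls inside the current schedule interval. A minimal standalone sketch of that rule; `filter_attendees` and the SCHEDULE_INTERVAL_SECONDS value are illustrative assumptions, not code from this repo:

from typing import Dict, List

SCHEDULE_INTERVAL_SECONDS = 300  # assumption: mirrors the "*/5 * * * *" schedule above

def filter_attendees(
    attendee_infos: List[Dict], timestamp: float, backfill: bool
) -> List[Dict]:
    # Backfill: keep every record; the downstream dedupe step handles duplicates.
    if backfill:
        return list(attendee_infos)
    # Incremental run: keep only paid attendees updated within this schedule interval.
    return [
        info
        for info in attendee_infos
        if info["is_paid"]
        and timestamp < info["updated_at"] < timestamp + SCHEDULE_INTERVAL_SECONDS
    ]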