Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KKTIX Backfill #109

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ Please use Gitlab Flow, otherwise you cannot pass dockerhub CI
* KLAVIYO_LIST_ID: Create from https://www.klaviyo.com/lists
* KLAVIYO_CAMPAIGN_ID: Create from https://www.klaviyo.com/campaigns
* kktix_events_endpoint: url path of kktix's `hosting_events`, ask @gtb for details!
* kktix_only_not_ended_events: decide whether to only retrieve ended events
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just to clarify, this DAG is only for backfill, right?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, only for backfill :)


### CI/CD

Expand Down
41 changes: 41 additions & 0 deletions dags/ods/kktix_ticket_orders/kktix_backfill_dag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""
Ingest KKTIX's data and load them to BigQuery every 5mins
"""
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from ods.kktix_ticket_orders.udfs import bigquery_loader, kktix_api

DEFAULT_ARGS = {
"owner": "[email protected]",
"depends_on_past": False,
"start_date": datetime(2022, 10, 30),
"retries": 2,
"retry_delay": timedelta(minutes=5),
"on_failure_callback": lambda x: "Need to send notification to Discord!",
}
dag = DAG(
"KKTIX_TICKET_BACKFILL_ORDERS_V1",
default_args=DEFAULT_ARGS,
schedule_interval="*/5 * * * *",
max_active_runs=1,
catchup=True,
)
with dag:
CREATE_TABLE_IF_NEEDED = PythonOperator(
task_id="CREATE_TABLE_IF_NEEDED",
python_callable=bigquery_loader.create_table_if_needed,
)

GET_ATTENDEE_INFOS = PythonOperator(
task_id="GET_ATTENDEE_INFOS",
python_callable=kktix_api.main,
provide_context=True,
op_kwargs={"backfill": True},
)

CREATE_TABLE_IF_NEEDED >> GET_ATTENDEE_INFOS

if __name__ == "__main__":
dag.cli()
3 changes: 2 additions & 1 deletion dags/ods/kktix_ticket_orders/sqls/create_table.sql
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ CREATE TABLE IF NOT EXISTS `{}`
(
ID INT64 NOT NULL,
NAME STRING NOT NULL,
ATTENDEE_INFO STRING NOT NULL
ATTENDEE_INFO STRING NOT NULL,
REFUNDED BOOLEAN FALSE
tomatoprinx marked this conversation as resolved.
Show resolved Hide resolved
);
2 changes: 1 addition & 1 deletion dags/ods/kktix_ticket_orders/udfs/bigquery_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

from google.cloud import bigquery

TABLE = f"{os.getenv('BIGQUERY_PROJECT')}.ods.ods_kktix_attendeeId_datetime"
TABLE = f"{os.getenv('BIGQUERY_PROJECT')}.ods.ods_kktix_attendeeId_datetime_copy2"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copy2 is only testing right?

# since backfill would insert duplicate records, we need this dedupe to make it idempotent
DEDUPE_SQL = f"""
CREATE OR REPLACE TABLE
Expand Down
34 changes: 20 additions & 14 deletions dags/ods/kktix_ticket_orders/udfs/kktix_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,10 @@ def main(**context):
ts_datetime_obj = parse(context["ts"])
year = ts_datetime_obj.year
timestamp = ts_datetime_obj.timestamp()
event_raw_data_array = _extract(year=year, timestamp=timestamp,)
is_backfill = context["backfill"]
event_raw_data_array = _extract(
year=year, timestamp=timestamp, backfill=is_backfill
)
transformed_event_raw_data_array = kktix_transformer.transform(
copy.deepcopy(event_raw_data_array)
)
Expand All @@ -40,7 +43,7 @@ def main(**context):
)


def _extract(year: int, timestamp: float) -> List[Dict]:
def _extract(year: int, timestamp: float, backfill: bool) -> List[Dict]:
"""
get data from KKTIX's API
1. condition_filter_callb: use this callbacl to filter out unwanted event!
Expand All @@ -54,7 +57,7 @@ def _extract(year: int, timestamp: float) -> List[Dict]:
event_metadatas = get_event_metadatas(condition_filter_callback)
for event_metadata in event_metadatas:
event_id = event_metadata["id"]
for attendee_info in get_attendee_infos(event_id, timestamp):
for attendee_info in get_attendee_infos(event_id, timestamp, backfill):
event_raw_data_array.append(
{
"id": event_id,
Expand All @@ -65,13 +68,13 @@ def _extract(year: int, timestamp: float) -> List[Dict]:
return event_raw_data_array


def get_attendee_infos(event_id: int, timestamp: float) -> List:
def get_attendee_infos(event_id: int, timestamp: float, backfill: bool) -> List:
"""
it's a public wrapper for people to get attendee infos!
"""
attendance_book_id = _get_attendance_book_id(event_id)
attendee_ids = _get_attendee_ids(event_id, attendance_book_id)
attendee_infos = _get_attendee_infos(event_id, attendee_ids, timestamp)
attendee_infos = _get_attendee_infos(event_id, attendee_ids, timestamp, backfill)
return attendee_infos


Expand All @@ -80,7 +83,7 @@ def get_event_metadatas(condition_filter: Callable) -> List[Dict]:
Fetch all the ongoing events
"""
event_list_resp = HTTP_HOOK.run_with_advanced_retry(
endpoint=f"{Variable.get('kktix_events_endpoint')}?only_not_ended_event=true",
endpoint=f"{Variable.get('kktix_events_endpoint')}?only_not_ended_event={Variable.get('kktix_only_not_ended_events')}",
tomatoprinx marked this conversation as resolved.
Show resolved Hide resolved
_retry_args=RETRY_ARGS,
).json()
event_metadatas: List[dict] = []
Expand Down Expand Up @@ -116,7 +119,7 @@ def _get_attendee_ids(event_id: int, attendance_book_id: int) -> List[int]:


def _get_attendee_infos(
event_id: int, attendee_ids: List[int], timestamp: float
event_id: int, attendee_ids: List[int], timestamp: float, backfill: bool
) -> List:
"""
get attendee infos, e.g. email, phonenumber, name and etc
Expand All @@ -127,12 +130,15 @@ def _get_attendee_infos(
endpoint=f"{Variable.get('kktix_events_endpoint')}/{event_id}/attendees/{attendee_id}",
_retry_args=RETRY_ARGS,
).json()
if not attendee_info["is_paid"]:
continue
if (
timestamp
< attendee_info["updated_at"]
< timestamp + SCHEDULE_INTERVAL_SECONDS
):
if backfill:
attendee_infos.append(attendee_info)
else:
if not attendee_info["is_paid"]:
continue
if (
timestamp
< attendee_info["updated_at"]
< timestamp + SCHEDULE_INTERVAL_SECONDS
):
attendee_infos.append(attendee_info)
tomatoprinx marked this conversation as resolved.
Show resolved Hide resolved
return attendee_infos