Feature/bespoke data partners (#231)
Co-authored-by: keegansmith21 <[email protected]>
keegansmith21 and keegansmith21 authored May 24, 2024
1 parent 3c58f13 commit 026055f
Showing 90 changed files with 1,297 additions and 67 deletions.
61 changes: 51 additions & 10 deletions dags/oaebu_workflows/oaebu_partners.py
@@ -1,4 +1,3 @@
# Copyright 2020-2024 Curtin University
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -16,7 +15,8 @@
# Author: Tuan Chien, Keegan Smith

import os
from typing import Union
import inspect
from typing import Union, List, Dict
from dataclasses import dataclass

from oaebu_workflows.config import schema_folder, sql_folder
Expand Down Expand Up @@ -113,11 +113,11 @@ def __init__(
schema_directory: str,
sql_directory: str,
book_product_functions: str,
export_author: bool,
export_book_metrics: bool,
export_country: bool,
export_subject: bool,
has_metdata: bool = True,
export_author: bool = False,
export_book_metrics: bool = False,
export_country: bool = False,
export_subject: bool = False,
has_metadata: bool = True,
):
"""
Initialises the class. Also uses the DataPartnerFiles class to set up the file names.
@@ -129,7 +129,7 @@ def __init__(
:param export_book_metrics: Indicates if the partner will use the book metrics export table.
:param export_country: Indicates if the partner will use the country export table.
:param export_subject: Indicates if the partner will use the subject export tables (bic, bisac, thema).
:param has_metdata: Whether the partner has book metadata records
:param has_metadata: Whether the partner has book metadata records
"""
super().__init__(
type_id=type_id,
@@ -147,7 +147,7 @@ def __init__(
self.export_book_metrics = export_book_metrics
self.export_country = export_country
self.export_subject = export_subject
self.has_metadata = has_metdata
self.has_metadata = has_metadata
self.files = DataPartnerFiles(partner_name=self.type_id)


Expand Down Expand Up @@ -206,7 +206,7 @@ def __init__(
export_book_metrics=True,
export_country=True,
export_subject=True,
has_metdata=False,
has_metadata=False,
),
google_books_sales=DataPartner(
type_id="google_books_sales",
Expand Down Expand Up @@ -407,3 +407,44 @@ def partner_from_str(partner: Union[str, OaebuPartner], metadata_partner: bool =
raise KeyError(f"Partner not found: {partner}").with_traceback(e.__traceback__)

return partner


def create_bespoke_data_partners(partners: List[Dict]) -> List[DataPartner]:
"""Converts a list of dictionary partner descriptions to DataPartner objects
:param partners: The dictionary descriptions of the partners
:return: The list of constructed DataPartner objects
"""
return [create_bespoke_data_partner(p) for p in partners]


def create_bespoke_data_partner(partner_dict: dict) -> DataPartner:
"""Converts a dictionary description of a partner to a DataPartner object
:param partner_dict: The dictionary description of the partner. Should have all required fields as per the
DataPartner object description
:return: The constructed DataPartner object
"""

# Get the required parameters of the DataPartner class
# https://docs.python.org/3/library/inspect.html#inspect.Parameter.kind
required_params = []
dp_params = inspect.signature(DataPartner.__init__).parameters
for k, v in dp_params.items():
if k == "self": # Ignore the "self" parameter
continue
if v.default is v.empty:
required_params.append(k)

present_params = list(partner_dict.keys())
missing_params = []
for p in required_params:
if p not in present_params:
missing_params.append(p)
if missing_params:
raise NameError(f"Missing required parameters for DataPartner class: {missing_params}")

undefined_params = []
all_params = [k for k in dp_params if k != "self"]
for p in partner_dict.keys():
if p not in all_params:
undefined_params.append(p)
if undefined_params:
raise NameError(f"Unrecognised arguments supplied to DataPartner class: {undefined_params}")

return DataPartner(**partner_dict)
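
For context, here is a minimal usage sketch of the new helper. The key names mirror DataPartner attributes that appear elsewhere in this diff, but the values (and any keys not visible in the diff) are hypothetical:

    # Hypothetical dictionary description of a bespoke partner. Keys must match
    # DataPartner.__init__ parameter names: a missing required parameter or an
    # unrecognised key raises NameError, as implemented above.
    bespoke_partner = {
        "type_id": "university_press_pilot",  # hypothetical partner id
        "bq_dataset_id": "university_press",  # hypothetical dataset
        "bq_table_name": "sales",  # hypothetical table
        # ... any remaining required DataPartner fields ...
    }
    partner = create_bespoke_data_partner(bespoke_partner)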
143 changes: 87 additions & 56 deletions dags/oaebu_workflows/onix_workflow/onix_workflow.py
@@ -19,7 +19,7 @@
import logging
import os
import re
from typing import Iterable, List, Optional, Tuple, Union
from typing import Iterable, List, Optional, Tuple, Union, Dict

import jsonlines
import pendulum
@@ -35,7 +35,7 @@

from oaebu_workflows.airflow_pools import CrossrefEventsPool
from oaebu_workflows.config import oaebu_user_agent_header, schema_folder as default_schema_folder, sql_folder
from oaebu_workflows.oaebu_partners import DataPartner, OaebuPartner, partner_from_str
from oaebu_workflows.oaebu_partners import DataPartner, OaebuPartner, partner_from_str, create_bespoke_data_partners
from oaebu_workflows.onix_workflow.onix_work_aggregation import BookWorkAggregator, BookWorkFamilyAggregator
from observatory_platform.airflow.airflow import on_failure_callback
from observatory_platform.airflow.release import make_snapshot_date, set_task_state, SnapshotRelease
Expand Down Expand Up @@ -233,10 +233,10 @@ def create_dag(
bq_worksid_table_name: str = "onix_workid_isbn",
bq_worksid_error_table_name: str = "onix_workid_isbn_errors",
bq_workfamilyid_table_name: str = "onix_workfamilyid_isbn",
oaebu_intermediate_match_suffix: str = "_matched",
skip_downloading_crossref_events: bool = True,
# Run parameters
data_partners: List[Union[str, OaebuPartner]] = None,
bespoke_data_partners: List[Dict] = None,
ga3_views_field="page_views",
schema_folder: str = default_schema_folder(workflow_module="onix_workflow"),
mailto: str = "[email protected]",
Expand Down Expand Up @@ -278,11 +278,11 @@ def create_dag(
:param bq_worksid_table_name: table ID of the worksid table
:param bq_worksid_error_table_name: table ID of the worksid error table
:param bq_workfamilyid_table_name: table ID of the workfamilyid table
:param oaebu_intermediate_match_suffix: Suffix to append to intermediate tables
:param skip_downloading_crossref_events: skip fetching new data for Crossref Events, when True, falls back to using
a previous version of the table.
:param data_partners: OAEBU data sources.
:param bespoke_data_partners: Dictionary-defined OAEBU Data Partners
:param ga3_views_field: The name of the GA3 views field - should be either 'page_views' or 'unique_views'
:param schema_folder: the SQL schema path.
:param mailto: email address used to identify the user when sending requests to an API.
@@ -306,6 +306,8 @@

metadata_partner = partner_from_str(metadata_partner, metadata_partner=True)
data_partners = [partner_from_str(p) for p in data_partners]
if bespoke_data_partners:
data_partners.extend(create_bespoke_data_partners(bespoke_data_partners))

# Create pool for crossref API calls (if they don't exist)
# Pools are necessary to throttle the maximum number of requests we can make per second and avoid 429 errors
Expand Down Expand Up @@ -590,39 +592,26 @@ def create_book_table(release: dict, **context) -> None:
set_task_state(status, context["ti"].task_id, release=release)

@task_group(group_id="intermediate_tables")
def create_tasks_intermediate_tables(release: dict):
def _create_tasks_intermediate_tables(release: dict):
tasks = []
for data_partner in data_partners:
task = create_intermediate_table.override(task_id=f"intermediate_{data_partner.bq_table_name}")(
release,
orig_project_id=cloud_workspace.project_id,
orig_dataset=data_partner.bq_dataset_id,
orig_table=data_partner.bq_table_name,
orig_isbn=data_partner.isbn_field_name,
sharded=data_partner.sharded,
task = _create_intermediate_table.override(task_id=f"intermediate_{data_partner.type_id}")(
release, data_partner=data_partner
)
tasks.append(task)
chain(tasks)

@task()
def create_intermediate_table(
def _create_intermediate_table(
release: dict,
*,
orig_project_id: str,
orig_dataset: str,
orig_table: str,
orig_isbn: str,
sharded: bool,
data_partner: DataPartner,
**context,
) -> None:
"""Create an intermediate oaebu table. They are of the form datasource_matched<date>
"""Create an intermediate oaebu table. They are of the form type_id_matched<date>
:param release: Onix workflow release information.
:param orig_project_id: Project ID for the partner data.
:param orig_dataset: Dataset ID for the partner data.
:param orig_table: Table ID for the partner data.
:param orig_isbn: Name of the ISBN field in the partner data table.
:param sharded: Whether the data partner table is sharded
:param data_partner: The data partner object for the data source.
"""

release = OnixWorkflowRelease.from_dict(release)
@@ -632,44 +621,27 @@ def create_intermediate_table(
location=cloud_workspace.data_location,
description="Intermediate OAEBU Tables",
)
orig_table_id = (
bq_sharded_table_id(orig_project_id, orig_dataset, orig_table, release.snapshot_date)
if sharded
else bq_table_id(orig_project_id, orig_dataset, orig_table)
)
output_table_name = f"{orig_table}{oaebu_intermediate_match_suffix}"
template_path = os.path.join(
sql_folder(workflow_module="onix_workflow"), "assign_workid_workfamilyid.sql.jinja2"
)
output_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
bq_oaebu_intermediate_dataset,
output_table_name,
release.snapshot_date,
)
wid_table_id = bq_sharded_table_id(
bq_worksid_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
bq_onix_workflow_dataset,
bq_worksid_table_name,
release.snapshot_date,
)
wfam_table_id = bq_sharded_table_id(
bq_workfamily_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
bq_onix_workflow_dataset,
bq_workfamilyid_table_name,
release.snapshot_date,
)

# Make the table from SQL query
client = Client(project=cloud_workspace.project_id)
sql = render_template(
template_path,
orig_table_id=orig_table_id,
orig_isbn=orig_isbn,
wid_table_id=wid_table_id,
wfam_table_id=wfam_table_id,
status = create_intermediate_table(
data_partner=data_partner,
project_id=cloud_workspace.project_id,
bq_output_dataset=bq_oaebu_intermediate_dataset,
bq_worksid_table_id=bq_worksid_table_id,
bq_workfamily_table_id=bq_workfamily_table_id,
snapshot_date=release.snapshot_date,
)
status = bq_create_table_from_query(sql=sql, table_id=output_table_id, client=client)
set_task_state(status, context["ti"].task_id, release=release)

@task()
@@ -686,11 +658,11 @@ def create_book_product_table(release: dict, **context) -> None:

# Data partner table names
dp_tables = {
f"{dp.type_id}_table_id": bq_sharded_table_id(
cloud_workspace.project_id,
bq_oaebu_intermediate_dataset,
f"{dp.type_id}_matched",
release.snapshot_date,
f"{dp.type_id}_table_id": bq_intermediate_table_id(
project_id=cloud_workspace.project_id,
bq_intermediate_dataset=bq_oaebu_intermediate_dataset,
dp_type_id=dp.type_id,
snapshot_date=release.snapshot_date,
)
for dp in data_partners
}
Expand Down Expand Up @@ -1056,7 +1028,7 @@ def cleanup_workflow(release: dict, **context):
task_create_crossref_metadata_table = create_crossref_metadata_table(xcom_release)
task_create_crossref_events_table = create_crossref_events_table(xcom_release)
task_create_book_table = create_book_table(xcom_release)
task_group_create_intermediate_tables = create_tasks_intermediate_tables(xcom_release)
task_group_create_intermediate_tables = _create_tasks_intermediate_tables(xcom_release)
task_create_book_product_table = create_book_product_table(xcom_release)
task_group_create_export_tables = create_tasks_export_tables(xcom_release)
task_update_latest_export_tables = update_latest_export_tables(xcom_release)
@@ -1082,6 +1054,65 @@ def cleanup_workflow(release: dict, **context):
return onix_workflow()


def create_intermediate_table(
data_partner: DataPartner,
project_id: str,
bq_output_dataset: str,
bq_worksid_table_id: str,
bq_workfamily_table_id: str,
snapshot_date: str,
) -> bool:
"""Create an intermediate oaebu table from a data partner.
:param data_partner: The data partner object for the data source.
:param project_id: Project ID for the partner data.
:param bq_output_dataset: The name of the dataset to output the resulting table.
:param bq_worksid_table_id: The ID of the worksid table.
:param bq_workfamily_table_id: The ID of the workfamily table.
:param snapshot_date: The snapshot date to use.
:return: Whether the operation was successful
"""

if data_partner.sharded:
orig_table_id = bq_sharded_table_id(
project_id, data_partner.bq_dataset_id, data_partner.bq_table_name, snapshot_date
)
else:
orig_table_id = bq_table_id(project_id, data_partner.bq_dataset_id, data_partner.bq_table_name)
output_table_id = bq_intermediate_table_id(
project_id=project_id,
bq_intermediate_dataset=bq_output_dataset,
dp_type_id=data_partner.type_id,
snapshot_date=snapshot_date,
)
template_path = os.path.join(sql_folder(workflow_module="onix_workflow"), "assign_workid_workfamilyid.sql.jinja2")

# Make the table from SQL query
client = Client(project=project_id)
sql = render_template(
template_path,
orig_table_id=orig_table_id,
orig_isbn=data_partner.isbn_field_name,
wid_table_id=bq_worksid_table_id,
wfam_table_id=bq_workfamily_table_id,
)
status = bq_create_table_from_query(sql=sql, table_id=output_table_id, client=client)
return status
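
For illustration, a sketch of calling this helper directly, assuming GCP credentials are configured and the worksid/workfamily tables already exist for the snapshot date; all values are hypothetical:

    success = create_intermediate_table(
        data_partner=partner,  # e.g. a DataPartner built via create_bespoke_data_partner
        project_id="my-project",  # hypothetical project
        bq_output_dataset="oaebu_intermediate",
        bq_worksid_table_id="my-project.onix_workflow.onix_workid_isbn20240524",  # assumed shard naming
        bq_workfamily_table_id="my-project.onix_workflow.onix_workfamilyid_isbn20240524",  # assumed shard naming
        snapshot_date="20240524",
    )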


def bq_intermediate_table_id(project_id: str, bq_intermediate_dataset: str, dp_type_id: str, snapshot_date: str) -> str:
"""
Generates a table ID for a given data partner type ID
:param project_id: The project ID
:param bq_intermediate_dataset: The 'intermediate' dataset name
:param dp_type_id: The type ID of the data partner
:param snapshot_date: The stringified snapshot date
:return: The data partner's intermediate table ID
"""
return bq_sharded_table_id(project_id, bq_intermediate_dataset, f"{dp_type_id}_matched", snapshot_date)
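
As a concrete example, assuming bq_sharded_table_id follows the usual convention of appending the date to the table name, the call below would produce something like "my-project.oaebu_intermediate.jstor_country_matched20240524":

    table_id = bq_intermediate_table_id(
        project_id="my-project",  # hypothetical project
        bq_intermediate_dataset="oaebu_intermediate",
        dp_type_id="jstor_country",  # hypothetical partner type_id
        snapshot_date="20240524",  # assumed stringified date format
    )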


def get_doi_prefixes(dois: Iterable[str]) -> set[str]:
"""Convert DOIs to a set of unique DOI prefixes.
Expand Down
@@ -0,0 +1,14 @@
{
"fields": [
{
"mode": "NULLABLE",
"name": "value",
"type": "INTEGER",
"description": "Metric value"
}
],
"mode": "NULLABLE",
"name": "altmetrics_pilot",
"type": "RECORD",
"description": "Metrics derived from Altmetrics"
}
@@ -0,0 +1,14 @@
{
"fields": [
{
"mode": "NULLABLE",
"name": "value",
"type": "INTEGER",
"description": "Metric value"
}
],
"mode": "NULLABLE",
"name": "amazon_ltd_pilot",
"type": "RECORD",
"description": "Metrics derived from Amazon LTD"
}
@@ -0,0 +1,14 @@
{
"fields": [
{
"mode": "NULLABLE",
"name": "value",
"type": "INTEGER",
"description": "Metric value"
}
],
"mode": "NULLABLE",
"name": "amazon_pilot",
"type": "RECORD",
"description": "Metrics derived from Amazon"
}
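
For reference, each of these fragments describes a nested RECORD column named after the partner; a matching row fragment, written as a Python dict with illustrative values, would be:

    row_fragment = {"amazon_pilot": {"value": 42}}  # illustrative values only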