Alternative ONIX snapshot date source (#223)
Co-authored-by: keegansmith21 <[email protected]>
keegansmith21 authored Apr 12, 2024
1 parent a569823 commit ece3eef
Showing 3 changed files with 122 additions and 52 deletions.
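In short, this change moves ONIX table resolution into make_release: the release object now carries a fully-qualified onix_table_id instead of an onix_snapshot_date, and a shard-date lookup only happens when the metadata partner is sharded. The sketch below is an illustrative summary of that branching, not the committed code; resolve_onix_table_id is a hypothetical name, and the BigQuery helpers (bq_table_id, bq_sharded_table_id, bq_select_table_shard_dates) are assumed to be imported exactly as they are in onix_workflow.py.

```python
# Illustrative sketch only: the bq_* helpers are assumed in scope, as in onix_workflow.py.
import pendulum
from google.cloud.bigquery import Client


def resolve_onix_table_id(metadata_partner, cloud_workspace, snapshot_date: pendulum.DateTime, client: Client) -> str:
    """Pick the ONIX table to query, depending on whether the metadata partner is sharded."""
    if metadata_partner.sharded:
        # Sharded partner: find the most recent shard on or before the workflow snapshot date.
        source_table_id = bq_table_id(
            cloud_workspace.project_id, metadata_partner.bq_dataset_id, metadata_partner.bq_table_name
        )
        shard_dates = bq_select_table_shard_dates(table_id=source_table_id, end_date=snapshot_date, client=client)
        if not shard_dates:
            raise RuntimeError("OnixWorkflow.make_release: no ONIX releases found")
        return bq_sharded_table_id(
            cloud_workspace.project_id,
            metadata_partner.bq_dataset_id,
            metadata_partner.bq_table_name,
            shard_dates[0],  # Most recent snapshot
        )
    # Unsharded partner (e.g. the new "onix_view"): query the table directly, no shard lookup needed.
    return bq_table_id(cloud_workspace.project_id, metadata_partner.bq_dataset_id, metadata_partner.bq_table_name)
```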
13 changes: 11 additions & 2 deletions dags/oaebu_workflows/oaebu_partners.py
@@ -73,9 +73,9 @@ def __init__(self, *, partner_name: str):
class OaebuPartner:
"""Class for storing information about data sources we are using to produce oaebu intermediate tables for.
:param type_id: The dataset type id. Should be the same as its dictionary key
:param type_id: The dataset type id. Should be the same as its dictionary key.
:param bq_dataset_id: The BigQuery dataset ID Bigquery Dataset ID.
:param bq_table_name: The BigQuery table name Bigquery Table name
:param bq_table_name: The BigQuery table name.
:param isbn_field_name: Name of the field containing the ISBN.
:param title_field_name: Name of the field containing the Title.
:param sharded: whether the table is sharded or not.
@@ -179,6 +179,15 @@ def __init__(
sharded=True,
schema_path=os.path.join(schema_folder(workflow_module="onix_telescope"), "onix.json"),
),
onix_view=OaebuPartner(
type_id="onix_view",
bq_dataset_id="onix",
bq_table_name="onix",
isbn_field_name="ISBN13",
title_field_name="TitleDetails.TitleElements.TitleText",
sharded=False,
schema_path=os.path.join(schema_folder(workflow_module="onix_telescope"), "onix.json"),
),
)

OAEBU_DATA_PARTNERS = dict(
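The new onix_view entry registers an unsharded metadata source that reuses the onix dataset, field names and schema of the sharded onix partner. Mirroring the new test further down, a caller would look it up from the registry; this small usage sketch assumes only the import path already used in the tests.

```python
from oaebu_workflows.oaebu_partners import OAEBU_METADATA_PARTNERS

metadata_partner = OAEBU_METADATA_PARTNERS["onix_view"]
assert metadata_partner.sharded is False  # Unsharded: the workflow queries the onix table directly
```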
82 changes: 38 additions & 44 deletions dags/oaebu_workflows/onix_workflow/onix_workflow.py
@@ -74,23 +74,25 @@ def __init__(
dag_id: str,
run_id: str,
snapshot_date: pendulum.DateTime,
onix_snapshot_date: pendulum.DateTime,
crossref_master_snapshot_date: pendulum.DateTime,
onix_table_id: str,
):
"""
Construct the OnixWorkflow Release
:param dag_id: DAG ID.
:param release_date: The date of the partition/release
:param onix_snapshot_date: The ONIX snapshot/release date.
:param crossref_master_snapshot_date: The release date/suffix of the crossref master table
:param onix_table_id: The table ID of the onix table to use for querying
"""

super().__init__(dag_id=dag_id, run_id=run_id, snapshot_date=snapshot_date)

# Dates
self.onix_snapshot_date = onix_snapshot_date
self.crossref_master_snapshot_date = crossref_master_snapshot_date

# Onix table
self.onix_table_id = onix_table_id

# Files
self.workslookup_file_name = "worksid.jsonl.gz"
self.workslookup_errors_file_name = "worksid_errors.jsonl.gz"
@@ -183,17 +185,17 @@ def from_dict(dict_: dict):
dag_id=dict_["dag_id"],
run_id=dict_["run_id"],
snapshot_date=pendulum.from_format(dict_["snapshot_date"], "YYYY-MM-DD"),
onix_snapshot_date=pendulum.from_format(dict_["onix_snapshot_date"], "YYYY-MM-DD"),
crossref_master_snapshot_date=pendulum.from_format(dict_["crossref_master_snapshot_date"], "YYYY-MM-DD"),
onix_table_id=dict_["onix_table_id"],
)

def to_dict(self):
return {
"dag_id": self.dag_id,
"run_id": self.run_id,
"snapshot_date": self.snapshot_date.to_date_string(),
"onix_snapshot_date": self.onix_snapshot_date.to_date_string(),
"crossref_master_snapshot_date": self.crossref_master_snapshot_date.to_date_string(),
"onix_table_id": self.onix_table_id,
}


@@ -246,6 +248,7 @@ def create_dag(
:param dag_id: DAG ID.
:param cloud_workspace: The CloudWorkspace object for this DAG
:param metadata_partner: The Oaebu Metadata partner
:param bq_master_crossref_project_id: GCP project ID of crossref master data
:param bq_master_crossref_dataset_id: GCP dataset ID of crossref master data
@@ -332,42 +335,53 @@ def make_release(**context) -> dict:
:return: a dictionary representation of the OnixWorkflowRelease object.
"""

# Get ONIX release date
onix_table_id = bq_table_id(
project_id=cloud_workspace.project_id,
dataset_id=metadata_partner.bq_dataset_id,
table_id=metadata_partner.bq_table_name,
)
snapshot_date = make_snapshot_date(**context)
client = Client(project=cloud_workspace.project_id)
onix_snapshot_dates = bq_select_table_shard_dates(
table_id=onix_table_id, end_date=snapshot_date, client=client
)
if not len(onix_snapshot_dates):
raise RuntimeError("OnixWorkflow.make_release: no ONIX releases found")

onix_snapshot_date = onix_snapshot_dates[0] # Get most recent snapshot
# Get ONIX table ID
if metadata_partner.sharded:
onix_source_table_id = bq_table_id(
cloud_workspace.project_id, metadata_partner.bq_dataset_id, metadata_partner.bq_table_name
)
onix_snapshot_dates = bq_select_table_shard_dates(
table_id=onix_source_table_id, end_date=snapshot_date, client=client
)

if not len(onix_snapshot_dates):
raise RuntimeError("OnixWorkflow.make_release: no ONIX releases found")

onix_snapshot_date = onix_snapshot_dates[0] # Get most recent snapshot
onix_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
onix_snapshot_date,
)
else:
onix_table_id = bq_table_id(
cloud_workspace.project_id, metadata_partner.bq_dataset_id, metadata_partner.bq_table_name
)

# Get Crossref Metadata release date
crossref_table_id = bq_table_id(
project_id=bq_master_crossref_project_id,
dataset_id=bq_master_crossref_dataset_id,
table_id=bq_master_crossref_metadata_table_name,
bq_master_crossref_project_id, bq_master_crossref_dataset_id, bq_master_crossref_metadata_table_name
)
crossref_metadata_snapshot_dates = bq_select_table_shard_dates(
table_id=crossref_table_id, end_date=snapshot_date, client=client
)

if not len(crossref_metadata_snapshot_dates):
raise RuntimeError("OnixWorkflow.make_release: no Crossref Metadata releases found")

crossref_master_snapshot_date = crossref_metadata_snapshot_dates[0] # Get most recent snapshot

# Make the release object
return OnixWorkflowRelease(
dag_id=dag_id,
run_id=context["run_id"],
snapshot_date=snapshot_date,
onix_snapshot_date=onix_snapshot_date,
crossref_master_snapshot_date=crossref_master_snapshot_date,
onix_table_id=onix_table_id,
).to_dict()

@task()
@@ -387,14 +401,8 @@ def aggregate_works(release: dict, **context) -> None:
)

# Fetch ONIX data
sharded_onix_table = bq_sharded_table_id(
cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
release.onix_snapshot_date,
)
client = Client(project=cloud_workspace.project_id)
products = get_onix_records(sharded_onix_table, client=client)
products = get_onix_records(release.onix_table_id, client=client)

# Aggregate into works
agg = BookWorkAggregator(products)
@@ -454,12 +462,6 @@ def create_crossref_metadata_table(release: dict, **context) -> None:
description="Data from Crossref sources",
)

onix_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
release.onix_snapshot_date,
)
master_crossref_metadata_table_id = bq_sharded_table_id(
bq_master_crossref_project_id,
bq_master_crossref_dataset_id,
@@ -468,7 +470,7 @@
)
sql = render_template(
os.path.join(sql_folder(workflow_module="onix_workflow"), "crossref_metadata_filter_isbn.sql.jinja2"),
onix_table_id=onix_table_id,
onix_table_id=release.onix_table_id,
crossref_metadata_table_id=master_crossref_metadata_table_id,
)
logging.info("Creating crossref metadata table from master table")
@@ -675,14 +677,6 @@ def create_book_product_table(release: dict, **context) -> None:
for dp in data_partners
}

# Metadata table name
onix_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
release.onix_snapshot_date,
)

# ONIX WF table names
workid_table_id = bq_sharded_table_id(
cloud_workspace.project_id,
@@ -707,7 +701,7 @@
data_partners=data_partners,
)
sql = env.render(
onix_table_id=onix_table_id,
onix_table_id=release.onix_table_id,
data_partners=data_partners,
book_table_id=book_table_id,
country_table_id=country_table_id,
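With onix_table_id stored on the release, aggregate_works, create_crossref_metadata_table and create_book_product_table no longer rebuild a sharded table ID from release.onix_snapshot_date; they pass release.onix_table_id straight to the query and templating helpers. Below is a minimal round-trip sketch of the updated release payload, with made-up values shaped like the test fixtures that follow; it assumes the release can be constructed outside a running DAG (in the tests this happens inside a SandboxEnvironment).

```python
from oaebu_workflows.onix_workflow.onix_workflow import OnixWorkflowRelease

# Made-up values for illustration; onix_table_id may point at an unsharded table
# ("project.onix.onix") or a date-suffixed shard, depending on the metadata partner.
release_dict = {
    "dag_id": "onix_workflow_test",
    "run_id": "scheduled__2021-05-17T00:00:00+00:00",
    "snapshot_date": "2021-05-24",
    "onix_table_id": "my-project.onix.onix20210515",
    "crossref_master_snapshot_date": "2021-05-15",
}
release = OnixWorkflowRelease.from_dict(release_dict)
assert release.to_dict() == release_dict  # The table ID round-trips unchanged
```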
79 changes: 73 additions & 6 deletions tests/onix_workflow/test_onix_workflow.py
@@ -27,7 +27,7 @@

from oaebu_workflows.config import schema_folder as default_schema_folder
from oaebu_workflows.config import test_fixtures_folder
from oaebu_workflows.oaebu_partners import OaebuPartner, OAEBU_DATA_PARTNERS, partner_from_str
from oaebu_workflows.oaebu_partners import OaebuPartner, OAEBU_DATA_PARTNERS, OAEBU_METADATA_PARTNERS, partner_from_str
from oaebu_workflows.onix_workflow.onix_workflow import (
OnixWorkflowRelease,
CROSSREF_EVENT_URL_TEMPLATE,
@@ -141,11 +141,18 @@ def __init__(self, *args, **kwargs):
self.events_cassette = os.path.join(self.fixtures_folder, "crossref_events_request.yaml")

@patch("oaebu_workflows.onix_workflow.onix_workflow.bq_select_table_shard_dates")
def test_make_release(self, mock_sel_table_suffixes):
"""Tests that the make_release function works as intended"""
def test_make_release_sharded(self, mock_sel_table_suffixes):
"""Tests that the make_release function works as intended for a sharded metadata partner"""

# Use a different onix snapshot date for testing purposes
onix_snapshot_date = self.snapshot_date.add(days=1)
metadata_partner = OAEBU_METADATA_PARTNERS["onix"] # Sharded metadata partner
expected_onix_table = bq_sharded_table_id(
self.fake_cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
onix_snapshot_date,
)
crossref_snapshot_date = self.snapshot_date
mock_sel_table_suffixes.side_effect = [[onix_snapshot_date], [crossref_snapshot_date]]
env = SandboxEnvironment(self.gcp_project_id, self.data_location)
@@ -154,7 +161,7 @@ def test_make_release(self, mock_sel_table_suffixes):
dag_id="test_make_release",
cloud_workspace=self.fake_cloud_workspace,
data_partners=[self.fake_onix_data_partner],
metadata_partner="onix",
metadata_partner=metadata_partner,
)
with env.create_dag_run(dag, self.snapshot_date.add(days=1)):
ti = env.run_task("make_release")
@@ -180,7 +187,7 @@ def test_make_release(self, mock_sel_table_suffixes):
)

# Test that the onix and crossref snapshots are as expected
self.assertEqual(onix_snapshot_date, release.onix_snapshot_date)
self.assertEqual(expected_onix_table, release.onix_table_id)
self.assertEqual(crossref_snapshot_date, release.crossref_master_snapshot_date)

# Test for case - no ONIX releases found
@@ -195,6 +202,60 @@ def test_make_release(self, mock_sel_table_suffixes):
with self.assertRaisesRegex(RuntimeError, "Crossref"):
env.run_task("make_release")

@patch("oaebu_workflows.onix_workflow.onix_workflow.bq_select_table_shard_dates")
def test_make_release_unsharded(self, mock_sel_table_suffixes):
"""Tests that the make_release function works as intended for an unsharded metadata partner"""

# Use a different onix snapshot date for testing purposes
metadata_partner = OAEBU_METADATA_PARTNERS["onix_view"] # Unsharded metadata partner
expected_onix_table = bq_table_id(
self.fake_cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
)
crossref_snapshot_date = self.snapshot_date
mock_sel_table_suffixes.return_value = [crossref_snapshot_date]
env = SandboxEnvironment(self.gcp_project_id, self.data_location)
with env.create():
dag = create_dag(
dag_id="test_make_release",
cloud_workspace=self.fake_cloud_workspace,
data_partners=[self.fake_onix_data_partner],
metadata_partner=metadata_partner,
)
with env.create_dag_run(dag, self.snapshot_date.add(days=1)):
ti = env.run_task("make_release")
release_dict = ti.xcom_pull(task_ids="make_release", include_prior_dates=False)
release = OnixWorkflowRelease.from_dict(release_dict)
self.assertEqual(release.dag_id, dag.dag_id)

# Test release file names are as expected
self.assertEqual(release.workslookup_path, os.path.join(release.transform_folder, "worksid.jsonl.gz"))
self.assertEqual(
release.workslookup_errors_path,
os.path.join(release.transform_folder, "worksid_errors.jsonl.gz"),
)
self.assertEqual(
release.worksfamilylookup_path, os.path.join(release.transform_folder, "workfamilyid.jsonl.gz")
)
self.assertEqual(
release.crossref_metadata_path,
os.path.join(release.transform_folder, "crossref_metadata.jsonl.gz"),
)
self.assertEqual(
release.crossref_events_path, os.path.join(release.transform_folder, "crossref_events.jsonl.gz")
)

# Test that the onix table and crossref snapshots are as expected
self.assertEqual(expected_onix_table, release.onix_table_id)
self.assertEqual(crossref_snapshot_date, release.crossref_master_snapshot_date)

# Test for case - no Crossref releases found
dag.clear(task_ids=["make_release"])
mock_sel_table_suffixes.return_value = [] # No crossref releases
with self.assertRaisesRegex(RuntimeError, "Crossref"):
env.run_task("make_release")

def test_dag_load(self):
"""Test that the DAG loads"""

@@ -965,11 +1026,17 @@ def vcr_ignore_condition(request):
ti = env.run_task("make_release")
self.assertEqual(ti.state, State.SUCCESS)
release_dict = ti.xcom_pull(task_ids="make_release", include_prior_dates=False)
expected_onix_table_id = bq_sharded_table_id(
env.cloud_workspace.project_id,
metadata_partner.bq_dataset_id,
metadata_partner.bq_table_name,
partner_release_date,
)
expected_release_dict = {
"dag_id": "onix_workflow_test",
"run_id": "scheduled__2021-05-17T00:00:00+00:00",
"snapshot_date": "2021-05-24",
"onix_snapshot_date": "2021-05-15",
"onix_table_id": expected_onix_table_id,
"crossref_master_snapshot_date": "2021-05-15",
}
self.assertEqual(release_dict, expected_release_dict)
