Skip to content

Commit

Permalink
Work with returned List, not Iterator
Browse files Browse the repository at this point in the history
  • Loading branch information
keegansmith21 committed Jan 9, 2025
1 parent c0d078b commit e820749
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
from airflow.exceptions import AirflowException, AirflowSkipException
from airflow.decorators import dag, task, task_group
from google.cloud.bigquery import TimePartitioningType, SourceFormat, WriteDisposition, Client
from google.cloud.bigquery.table import RowIterator
from google.cloud.bigquery.table import Row

from oaebu_workflows.oaebu_partners import OaebuPartner, partner_from_str
from observatory_platform.dataset_api import DatasetAPI, DatasetRelease
Expand Down Expand Up @@ -464,12 +464,13 @@ def _gb_early_stop(table_id: str, cloud_workspace: CloudWorkspace, logical_date:
"""

client = Client(project=cloud_workspace.project_id)
dates = get_partitions(table_id, client=client)
partition_key = "release_date"
dates = get_partitions(table_id, partition_key=partition_key, client=client)
this_run_date = logical_date.subtract(months=1).end_of("month").date()
try:
most_recent_pd = dates[0].get("release_date") # Latest release date
except StopIteration: # There are no partitions available

if not dates: # There are no partitions available
raise AirflowSkipException("No partitions available and no files required for processing. Skipping.")
most_recent_pd = dates[0].get(partition_key) # Latest release date

if most_recent_pd < this_run_date:
if logical_date.day > 4:
Expand All @@ -478,7 +479,7 @@ def _gb_early_stop(table_id: str, cloud_workspace: CloudWorkspace, logical_date:
raise AirflowSkipException("No files required for processing. Skipping.")


def get_partitions(table_id: str, partition_key: str = "release_date", client: Client = None) -> RowIterator:
def get_partitions(table_id: str, partition_key: str = "release_date", client: Client = None) -> List[Row]:
"""Queries the table and returns a list of distinct partitions
:param table_id: The fully qualified table id to query
Expand Down
16 changes: 8 additions & 8 deletions tests/google_books_telescope/test_google_books_telescope.py
Original file line number Diff line number Diff line change
Expand Up @@ -378,26 +378,26 @@ class TestGBEarlyStop(SandboxTestCase):
def test_no_releases(self, mock_get_partitions):
"""Test when data is current - should not raise any exception."""

mock_get_partitions.side_effect = [iter([])]
mock_get_partitions.side_effect = [[]]
with self.assertRaisesRegex(AirflowSkipException, "No partitions available"):
_gb_early_stop(self.table_id, self.fake_cloud_workspace, self.logical_date)

@patch("oaebu_workflows.google_books_telescope.google_books_telescope.get_partitions")
def test_matching_partitions_with_current_data(self, mock_get_partitions):
"""Test when data is current - should not raise any exception."""

row1 = Row([pendulum.datetime(2024, 2, 28)], {"release_date": 0})
row2 = Row([pendulum.datetime(2024, 1, 31)], {"release_date": 0})
mock_get_partitions.side_effect = [iter([row1, row2])]
row1 = Row([pendulum.date(2024, 2, 28)], {"release_date": 0})
row2 = Row([pendulum.date(2024, 1, 31)], {"release_date": 0})
mock_get_partitions.side_effect = [[row1, row2]]
_gb_early_stop(self.table_id, self.fake_cloud_workspace, self.logical_date)

@patch("oaebu_workflows.google_books_telescope.google_books_telescope.get_partitions")
def test_missing_data_before_fourth(self, mock_get_partitions):
"""Test when data is missing but it's before the 4th of the month."""

logical_date = pendulum.datetime(2024, 2, 3)
row = Row([pendulum.datetime(2023, 12, 31)], {"release_date": 0})
mock_get_partitions.side_effect = [iter([row])]
row = Row([pendulum.date(2023, 12, 31)], {"release_date": 0})
mock_get_partitions.side_effect = [[row]]
with self.assertRaisesRegex(AirflowSkipException, "No files required"):
_gb_early_stop(self.table_id, self.fake_cloud_workspace, logical_date)

Expand All @@ -406,7 +406,7 @@ def test_missing_data_after_fourth(self, mock_get_partitions):
"""Test when data is missing and it's after the 4th of the month."""

logical_date = pendulum.datetime(2024, 2, 5)
row = Row([pendulum.datetime(2023, 12, 31)], {"release_date": 0})
mock_get_partitions.side_effect = [iter([row])]
row = Row([pendulum.date(2023, 12, 31)], {"release_date": 0})
mock_get_partitions.side_effect = [[row]]
with self.assertRaisesRegex(AirflowException, "It's past the 4th"):
_gb_early_stop(self.table_id, self.fake_cloud_workspace, logical_date)

0 comments on commit e820749

Please sign in to comment.