Commit 4837f9d: Nullable entries
keegansmith21 committed Jun 21, 2024
1 parent 702ac88 commit 4837f9d
Showing 3 changed files with 226 additions and 288 deletions.
@@ -1,7 +1,7 @@
 [
   {
     "description": "ISBN13 of the book.",
-    "mode": "REQUIRED",
+    "mode": "NULLABLE",
     "name": "ISBN13",
     "type": "STRING"
   },
@@ -13,7 +13,7 @@
   },
   {
     "description": "Country of sale",
-    "mode": "REQUIRED",
+    "mode": "NULLABLE",
     "name": "Country",
     "type": "STRING"
   },
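Making these fields NULLABLE means rows with a missing ISBN13 or Country no longer fail the BigQuery load. A minimal sketch of the effect, assuming the google-cloud-bigquery client with default credentials (the table id is hypothetical):

from google.cloud import bigquery

client = bigquery.Client()  # assumes a default project and credentials
schema = [
    bigquery.SchemaField("ISBN13", "STRING", mode="NULLABLE", description="ISBN13 of the book."),
    bigquery.SchemaField("Country", "STRING", mode="NULLABLE", description="Country of sale"),
]
job_config = bigquery.LoadJobConfig(schema=schema)
# Under the previous REQUIRED mode this row would be rejected; as NULLABLE it loads.
rows = [{"ISBN13": None, "Country": "GB"}]
client.load_table_from_json(rows, "my-project.my_dataset.ucl_sales", job_config=job_config).result()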
dags/oaebu_workflows/ucl_sales_telescope/ucl_sales_telescope.py: 184 changes (72 additions & 112 deletions)
@@ -196,25 +196,6 @@ def _transform(release: dict, **context) -> None:
         )
         set_task_state(success, context["ti"].task_id, release=release)

-    @task()
-    def _data_integrity_check(release: dict, **context) -> None:
-        """Checks that the transformed data is valid"""
-
-        release = UclSalesRelease.from_dict(release)
-        # Download files from GCS
-        success = gcs_download_blob(
-            bucket_name=cloud_workspace.transform_bucket,
-            blob_name=release.transform_blob_name,
-            file_path=release.transform_path,
-        )
-        if not success:
-            raise FileNotFoundError(f"Error downloading file: {release.download_blob_name}")
-
-        data = load_jsonl(release.transform_path)
-        success = data_integrity_check(data, current_date=release.partition_date)
-        if not success:
-            raise RuntimeError("Data integrity check failed. Check logs for information")
-
     @task()
     def _bq_load(release: dict, **context) -> None:
         """Loads the transformed data into BigQuery"""
@@ -276,7 +257,6 @@ def _cleanup_workflow(release: dict, **context) -> None:
     xcom_release = _make_release()
     task_download = _download(xcom_release)
     task_transform = _transform(xcom_release)
-    task_data_integrity_check = _data_integrity_check(xcom_release)
     task_bq_load = _bq_load(xcom_release)
     task_add_new_dataset_releases = _add_new_dataset_releases(xcom_release)
     task_cleanup_workflow = _cleanup_workflow(xcom_release)
@@ -286,7 +266,6 @@
         >> xcom_release
         >> task_download
         >> task_transform
-        >> task_data_integrity_check
         >> task_bq_load
         >> task_add_new_dataset_releases
         >> task_cleanup_workflow
@@ -328,24 +307,6 @@ def drop_empty_rows(data: List[List]) -> List[List]:
     return data


-def fill_empty_dates(data: List[dict], fill_date: pendulum.DateTime) -> List[dict]:
-    """Finds empty year/month values and populates them with the appropriate value from the fill date
-    :param data: The data to fill if empty
-    :param fill_date: The date object. Empty year/month values will be filled as a string using this.
-    """
-
-    for row in data:
-        try:
-            if not row["Year"]:
-                row["Year"] = str(fill_date.year)
-            if not row["Month"]:
-                row["Month"] = str(fill_date.month)
-        except KeyError:
-            logging.warn("No 'Year' or 'Month' key found in row")
-    return data
-
-
 def make_release(dag_id: str, context: dict) -> UclSalesRelease:
     """Creates a new ucl discovery release instance
@@ -396,22 +357,58 @@ def download(
     return sheet_contents


-def transform(data: List[List], sheet_date: pendulum.DateTime) -> List[dict]:
-    """Transforms the ucl sales data.
-    :param data: The UCL sales data
-    :return: The transformed data
+def clean_row(row: dict, fill_date: pendulum.DateTime) -> dict:
+    """Cleans a row of data.
+    :param row: The row to clean
+    :param fill_date: The date object. Empty year/month values will be filled as a string using this.
+    :return: The cleaned row
     """

-    data[0] = [h.strip().lower() for h in data[0]]  # The first row is the header
-    data = drop_duplicate_headings(data)
-    data = drop_empty_rows(data)
-
-    # Convert to list of dicts format
-    converted_data = []
-    for row in data[1:]:
-        converted_data.append(dict(zip(data[0], row)))
+    def _isbn_check(isbn):
+        if not isbn.startswith("978") or len(isbn) != 13:
+            return False
+        return True
+    # Nullify invalid ISBNs
+    nullified = []
+    if not _isbn_check(row["ISBN13"]):
+        row["ISBN13"] = None
+        nullified.append("ISBN13")
+
+    # Attempt publication date formatting, but it's not essential
+    try:
+        row["Publication_Date"] = pendulum.parse(row["Publication_Date"]).format("YYYY-MM-DD")
+    except (pendulum.parsing.exceptions.ParserError, ValueError):  # ValueError is raised for an empty string
+        row["Publication_Date"] = None
+        nullified.append("Publication_Date")
+
+    # Clean the Sale_Type so that it has a consistent format
+    row["Sale_Type"] = row["Sale_Type"].strip().lower()
+    if row["Sale_Type"] not in ("paid", "return", "free"):
+        row["Sale_Type"] = None
+        nullified.append("Sale_Type")
+
+    # Add our own year/month if they do not exist in the row
+    if not row["Year"]:
+        row["Year"] = str(fill_date.year)
+        nullified.append("Year")
+    if not row["Month"]:
+        row["Month"] = str(fill_date.month)
+        nullified.append("Month")
+
+    if nullified:
+        logging.warn(f"The following entries were invalid for this row and have been amended/nullified: {nullified}")
+
+    return row
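A quick sketch of how the new helper behaves on a sample row; the values are illustrative, and the keys assume the mapped headings produced by convert_headings:

row = {
    "ISBN13": "1234567890123",   # does not start with "978", so it is nullified
    "Year": "2024",
    "Month": "",                 # empty, so it is filled from fill_date
    "Sale_Type": " Paid ",       # stripped and lower-cased
    "Publication_Date": "2024-01-15",
}
cleaned = clean_row(row, pendulum.datetime(2024, 5, 1))
# cleaned == {"ISBN13": None, "Year": "2024", "Month": "5",
#             "Sale_Type": "paid", "Publication_Date": "2024-01-15"}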


+def convert_headings(data: List[dict]) -> List[dict]:
+    """Converts the headings for the input data to our desired headings
+    :param data: The data to convert
+    :return: The data with its headings converted
+    """
     # Check that all required headings are present
     headings_mapping = {
         "isbn": "ISBN13",
         # ... further mapping entries collapsed in the diff view
         "book": "Title",
         "pub date": "Publication_Date",
     }
-    for row in converted_data:
+    for row in data:
         if not all(h in row.keys() for h in headings_mapping.keys()):
             raise ValueError(f"Invalid header found: {row.keys()}")

     # Map the headings to our desired values
-    for i, row in enumerate(converted_data):
-        converted_data[i] = {v: row[k] for k, v in headings_mapping.items()}
+    for i, row in enumerate(data):
+        data[i] = {v: row[k] for k, v in headings_mapping.items()}

-    # Lower case the sale type
-    for row in converted_data:
-        row["Sale_Type"] = row["Sale_Type"].strip().lower()
+    return data
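For illustration, the renaming step works like this; this uses a simplified two-entry mapping, since the real headings_mapping has more entries that are collapsed in the diff above:

mapping = {"isbn": "ISBN13", "book": "Title"}
rows = [{"isbn": "9781111111111", "book": "An Example Title"}]
rows = [{v: r[k] for k, v in mapping.items()} for r in rows]
# rows == [{"ISBN13": "9781111111111", "Title": "An Example Title"}]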


+def transform(data: List[List], sheet_date: pendulum.DateTime) -> List[dict]:
+    """Transforms the ucl sales data.
+    :param data: The UCL sales data
+    :return: The transformed data
+    """
+
+    data[0] = [h.strip().lower() for h in data[0]]  # The first row is the header
+    data = drop_duplicate_headings(data)
+    data = drop_empty_rows(data)
+
+    # Convert to list of dicts format
+    converted_data = []
+    for row in data[1:]:
+        converted_data.append(dict(zip(data[0], row)))
+
-    # Fill empty year/month entries
-    fill_empty_dates(converted_data, sheet_date)
+    # Change headings to our desired format
+    converted_data = convert_headings(converted_data)

     transformed = []
     for row in converted_data:
+        row = clean_row(row, sheet_date)
+
         # Make the release date partition based on each row's year/month
         release_date = pendulum.datetime(year=int(row["Year"]), month=int(row["Month"]), day=1).end_of("month")
         add_partition_date([row], partition_date=release_date, partition_field="release_date")

         # Add the sheet month as a field for book-keeping
         row["sheet_month"] = sheet_date.format("YYYYMM")

-        # Attempt publication date formatting but it's not essential
-        try:
-            print(row)
-            row["Publication_Date"] = pendulum.parse(row["Publication_Date"]).format("YYYY-MM-DD")
-        except pendulum.parsing.exceptions.ParserError:
-            row["Publication_Date"] = None
         transformed.append(row)

     return transformed
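Each row's partition date is now derived from its own year/month (the end of that sale month) rather than from the sheet date. A small worked example of the date arithmetic:

release_date = pendulum.datetime(year=2024, month=5, day=1).end_of("month")
print(release_date.to_date_string())  # 2024-05-31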


-def data_integrity_check(data: List[dict], current_date: pendulum.DateTime) -> bool:
-    """Checks that the ucl sales data is valid
-    :param data: The data as a list of dictionaries.
-    :param current_date: The date to compare the data to. No date in the data should be after this.
-    :return: True/False depending on if the check passed/failed.
-    """
-
-    def _isbn_check(isbn):
-        if not isbn.startswith("978") or len(isbn) != 13:
-            return False
-        return True
-
-    is_valid = True
-
-    # Empty check
-    if len(data) < 1:
-        logging.warn("Empty dataset supplied!")
-        is_valid = False
-
-    # Date check
-    dates = [pendulum.parse(r["release_date"]) for r in data]
-    if not all([r <= current_date for r in dates]):
-        logging.warn("Found sale month in the future!")
-        is_valid = False
-
-    # Sale type check
-    sale_types = [r["Sale_Type"] for r in data]
-    if not all([st in ("paid", "return", "free") for st in sale_types]):
-        logging.warn("Not all sale types one of 'paid', 'return', 'free'")
-        is_valid = False
-
-    # ISBN check
-    isbns = [r["ISBN13"] for r in data]
-    if not all([_isbn_check(i) for i in isbns]):
-        logging.warn("Invalid ISBN found in data")
-        is_valid = False
-
-    # # Quantity check
-    # qtys = [int(r["Quantity"]) for r in data]
-    # if not all([q >= 0 for q in qtys]):
-    #     logging.warn("Negative Quantity found in data")
-    #     is_valid = False
-
-    return is_valid