diff --git a/quidel_covidtest/delphi_quidel_covidtest/backfill.py b/quidel_covidtest/delphi_quidel_covidtest/backfill.py
index 7f7aba06d..a7591c284 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/backfill.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/backfill.py
@@ -1,8 +1,12 @@
 # -*- coding: utf-8 -*-
 """Store backfill data."""
+import calendar
 import os
 import glob
-from datetime import datetime
+import re
+import shutil
+from datetime import datetime, timedelta
+from typing import Union

 import pandas as pd

@@ -11,7 +15,7 @@
 gmpr = GeoMapper()

-def store_backfill_file(df, _end_date, backfill_dir):
+def store_backfill_file(df, _end_date, backfill_dir, logger):
     """
     Store county level backfill data into backfill_dir.

@@ -59,6 +63,7 @@ def store_backfill_file(df, _end_date, backfill_dir):
                         'num_age_0_17', 'den_age_0_17']
     backfilldata = backfilldata.loc[backfilldata["time_value"] >= _start_date,
                                     selected_columns]
+    logger.info("Filtering source data", startdate=_start_date, enddate=_end_date)
     backfilldata["lag"] = [(_end_date - x).days for x in backfilldata["time_value"]]
     backfilldata["time_value"] = backfilldata.time_value.dt.strftime("%Y-%m-%d")
     backfilldata["issue_date"] = datetime.strftime(_end_date, "%Y-%m-%d")
@@ -70,19 +75,75 @@ def store_backfill_file(df, _end_date, backfill_dir):
         "state_id": "string"
     })

-    path = backfill_dir + \
-        "/quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    filename = "quidel_covidtest_as_of_%s.parquet"%datetime.strftime(_end_date, "%Y%m%d")
+    path = f"{backfill_dir}/{filename}"
     # Store intermediate file into the backfill folder
-    backfilldata.to_parquet(path, index=False)
+    try:
+        backfilldata.to_parquet(path, index=False)
+        logger.info("Stored source data in parquet", filename=filename)
+    except Exception: # pylint: disable=W0703
+        logger.info("Failed to store source data in parquet")
+    return path

-def merge_backfill_file(backfill_dir, backfill_merge_day, today,
-                        test_mode=False, check_nd=25):
+
+def merge_existing_backfill_files(backfill_dir, backfill_file, issue_date, logger):
     """
-    Merge ~4 weeks' backfill data into one file.
+    Merge an existing merged backfill file with patch data included.
+
+    This function is specifically run for patching. When the indicator fails
+    for one reason or another, there is a gap in the backfill files. The patch
+    that fills in the missing dates happens after the daily backfill files have
+    already been merged, so this function takes the merged file that is missing
+    the date, inserts the data for that date, and writes the merged file back.

-    Usually this function should merge 28 days' data into a new file so as to
-    save the reading time when running the backfill pipelines. We set a softer
-    threshold to allow flexibility in data delivery.
+    Parameters
+    ----------
+    backfill_dir : str
+        specified path to store backfill files.
+    backfill_file : str
+        specific file to add to the merged backfill file.
+    issue_date : datetime
+        the issue date of the patch data, used to locate the monthly merged file.
+ """ + new_files = glob.glob(backfill_dir + "/quidel_covidtest_*") + + def get_file_with_date(files) -> Union[str, None]: + # pylint: disable=R1716 + for filename in files: + # need to only match files with 6 digits for merged files + pattern = re.findall(r"_(\d{6,6})\.parquet", filename) + if pattern: + file_month = datetime.strptime(pattern[0], "%Y%m").replace(day=1) + end_date = (file_month + timedelta(days=32)).replace(day=1) + if issue_date >= file_month and issue_date < end_date: + return filename + # pylint: enable=R1716 + return "" + + file_name = get_file_with_date(new_files) + + if len(file_name) == 0: + logger.info("Issue date has no matching merged files", issue_date=issue_date.strftime("%Y-%m-%d")) + return + + logger.info("Adding missing date to merged file", issue_date=issue_date, + filename=backfill_file, merged_filename=file_name) + + # Start to merge files + merge_file = f"{file_name.split('.')[0]}_after_merge.parquet" + try: + shutil.copyfile(file_name, merge_file) + existing_df = pd.read_parquet(merge_file, engine="pyarrow") + df = pd.read_parquet(backfill_file, engine="pyarrow") + merged_df = pd.concat([existing_df, df]).sort_values(["time_value", "fips"]) + merged_df.to_parquet(merge_file, index=False) + os.remove(file_name) + os.rename(merge_file, file_name) + except Exception as e: # pylint: disable=W0703 + os.remove(merge_file) + logger.error(e) + return + +def merge_backfill_file(backfill_dir, today, logger, test_mode=False): + """ + Merge month's backfill data into one file. Parameters ---------- @@ -90,17 +151,12 @@ def merge_backfill_file(backfill_dir, backfill_merge_day, today, The most recent date when the raw data is received backfill_dir : str specified path to store backfill files. - backfill_merge_day: int - The day of a week that we used to merge the backfill files. e.g. 0 - is Monday. test_mode: bool - check_nd: int - The criteria of the number of unmerged files. 
-        The criteria of the number of unmerged files. Ideally, we want the
-        number to be 28, but we use a looser criteria from practical
-        considerations
     """
-    new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*")
+    previous_month = (today.replace(day=1) - timedelta(days=1)).strftime("%Y%m")
+    new_files = glob.glob(backfill_dir + f"/quidel_covidtest_as_of_{previous_month}*")
     if len(new_files) == 0: # if no any daily file is stored
+        logger.info("No new files to merge; skipping merging")
         return

     def get_date(file_link):
@@ -110,23 +166,23 @@ def get_date(file_link):
         return datetime.strptime(fn, "%Y%m%d")

     date_list = list(map(get_date, new_files))
-    earliest_date = min(date_list)
     latest_date = max(date_list)

     # Check whether to merge
     # Check the number of files that are not merged
-    if today.weekday() != backfill_merge_day or (today-earliest_date).days <= check_nd:
+    num_of_days_in_month = calendar.monthrange(latest_date.year, latest_date.month)[1]
+    if len(date_list) < num_of_days_in_month:
+        logger.info("Not enough days, skipping merging", n_file_days=len(date_list))
         return

     # Start to merge files
+    logger.info("Merging files", start_date=min(date_list), end_date=latest_date)
     pdList = []
     for fn in new_files:
         df = pd.read_parquet(fn, engine='pyarrow')
         pdList.append(df)
     merged_file = pd.concat(pdList).sort_values(["time_value", "fips"])
-    path = backfill_dir + "/quidel_covidtest_from_%s_to_%s.parquet"%(
-        datetime.strftime(earliest_date, "%Y%m%d"),
-        datetime.strftime(latest_date, "%Y%m%d"))
+    path = backfill_dir + f"/quidel_covidtest_{datetime.strftime(latest_date, '%Y%m')}.parquet"
     merged_file.to_parquet(path, index=False)

     # Delete daily files once we have the merged one.
diff --git a/quidel_covidtest/delphi_quidel_covidtest/constants.py b/quidel_covidtest/delphi_quidel_covidtest/constants.py
index 8e4d37cb2..6caf51863 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/constants.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/constants.py
@@ -1,4 +1,6 @@
 """Registry for constants."""
+from datetime import datetime
+
 # global constants
 MIN_OBS = 50  # minimum number of observations in order to compute a proportion.
 POOL_DAYS = 7  # number of days in the past (including today) to pool over
@@ -49,3 +51,5 @@
     "age_65plus",
     "age_0_17",
 ]
+
+FULL_BKFILL_START_DATE = datetime(2020, 5, 26)
diff --git a/quidel_covidtest/delphi_quidel_covidtest/patch.py b/quidel_covidtest/delphi_quidel_covidtest/patch.py
new file mode 100644
index 000000000..9b5dd1ee8
--- /dev/null
+++ b/quidel_covidtest/delphi_quidel_covidtest/patch.py
@@ -0,0 +1,77 @@
+"""
+This module is used for patching data in the delphi_quidel_covidtest package.
+
+To use this module, you need to specify the range of issue dates in params.json, like so:
+
+{
+    "common": {
+        ...
+    },
+    "validation": {
+        ...
+    },
+    "patch": {
+        "patch_dir": "/Users/minhkhuele/Desktop/delphi/covidcast-indicators/quidel_covidtest/AprilPatch",
+        "start_issue": "2024-04-20",
+        "end_issue": "2024-04-21"
+    }
+}
+
+It will generate data for that range of issue dates and store them in batch issue format:
+[name-of-patch]/issue_[issue-date]/quidel_covidtest/actual_data_file.csv
+"""
+from datetime import datetime, timedelta
+from os import makedirs
+
+from delphi_utils import get_structured_logger, read_params
+
+from .run import run_module
+from .constants import END_FROM_TODAY_MINUS
+
+def patch():
+    """
+    Run the quidel_covidtest indicator for a range of issue dates.
+
+    The range of issue dates is specified in params.json using the following keys:
+    - "patch": Only used for patching data
+        - "start_issue": str, YYYY-MM-DD format, first issue date
+        - "end_issue": str, YYYY-MM-DD format, last issue date
+        - "patch_dir": str, directory to write all issues output
+    """
+    params = read_params()
+    logger = get_structured_logger("delphi_quidel_covidtest.patch", filename=params["common"]["log_filename"])
+
+    start_issue = datetime.strptime(params["patch"]["start_issue"], "%Y-%m-%d")
+    end_issue = datetime.strptime(params["patch"]["end_issue"], "%Y-%m-%d")
+
+    logger.info(
+        "Starting patching",
+        patch_directory=params["patch"]["patch_dir"],
+        start_issue=start_issue.strftime("%Y-%m-%d"),
+        end_issue=end_issue.strftime("%Y-%m-%d"),
+    )
+    makedirs(params["patch"]["patch_dir"], exist_ok=True)
+    export_day_range = params["indicator"]["export_day_range"]
+
+    current_issue = start_issue
+
+    # exports stop END_FROM_TODAY_MINUS days before each issue date, so shrink the pull window
+    export_day_range -= END_FROM_TODAY_MINUS
+
+    while current_issue <= end_issue:
+        logger.info("Running issue", issue_date=current_issue.strftime("%Y-%m-%d"))
+
+        current_issue_yyyymmdd = current_issue.strftime("%Y%m%d")
+        current_issue_dir = f"""{params["patch"]["patch_dir"]}/issue_{current_issue_yyyymmdd}/quidel_covidtest"""
+        makedirs(current_issue_dir, exist_ok=True)
+        params["common"]["export_dir"] = current_issue_dir
+        calculated_start_date = current_issue - timedelta(export_day_range)
+        calculated_end_date = current_issue
+        params["indicator"]["pull_start_date"] = calculated_start_date.strftime("%Y-%m-%d")
+        params["indicator"]["pull_end_date"] = calculated_end_date.strftime("%Y-%m-%d")
+
+        run_module(params, logger)
+        current_issue += timedelta(days=1)
+
+if __name__ == "__main__":
+    patch()
diff --git a/quidel_covidtest/delphi_quidel_covidtest/pull.py b/quidel_covidtest/delphi_quidel_covidtest/pull.py
index 560f89456..31704c3fb 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/pull.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/pull.py
@@ -9,15 +9,17 @@
 import pandas as pd
 import numpy as np

-from .constants import AGE_GROUPS
+from .constants import AGE_GROUPS, FULL_BKFILL_START_DATE


-def get_from_s3(start_date, end_date, bucket, logger):
+def get_from_s3(params, start_date, end_date, logger):
     """
     Get raw data from aws s3 bucket.

     Args:
+        params: dict
+            read from params.json
         start_date: datetime.datetime
             pull data from file tagged with date on/after the start date
         end_date: datetime.datetime
@@ -30,6 +32,15 @@ def get_from_s3(start_date, end_date, bucket, logger):
         df: pd.DataFrame
         time_flag: datetime.datetime
     """
+    # connect aws s3 bucket
+    aws_access_key_id = params["aws_credentials"]["aws_access_key_id"]
+    aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"]
+    bucket_name = params["bucket_name"]
+
+    s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id,
+                        aws_secret_access_key=aws_secret_access_key)
+    bucket = s3.Bucket(bucket_name)
+
     time_flag = None
     selected_columns = ['SofiaSerNum', 'TestDate', 'Facility', 'City',
                         'State', 'Zip', 'PatientAge', 'Result1',
@@ -118,7 +129,7 @@ def fix_date(df, logger):
     df["timestamp"].values[mask] = df["StorageDate"].values[mask]
     return df

-def preprocess_new_data(start_date, end_date, params, test_mode, logger):
+def preprocess_new_data(start_date, end_date, params, logger):
     """
     Pull and pre-process Quidel Covid Test data.
@@ -133,8 +144,6 @@ def preprocess_new_data(start_date, end_date, params, test_mode, logger):
         pull data from file tagged with date on/before the end date
     params: dict
         read from params.json
-    test_mode: bool
-        pull raw data from s3 or not
     logger: logging.Logger
         The structured logger.
     output:
@@ -142,23 +151,8 @@ def preprocess_new_data(start_date, end_date, params, test_mode, logger):
         time_flag: datetime.date:
             the actual pull end date on which we successfully pull the data
     """
-    if test_mode:
-        test_data_dir = "./test_data/test_data.csv"
-        df, time_flag = pd.read_csv(
-            test_data_dir,
-            parse_dates=["StorageDate", "TestDate"]
-        ), datetime(2020, 8, 17)
-    else:
-        # connect aws s3 bucket
-        aws_access_key_id = params["aws_credentials"]["aws_access_key_id"]
-        aws_secret_access_key = params["aws_credentials"]["aws_secret_access_key"]
-        bucket_name = params["bucket_name"]
-
-        s3 = boto3.resource('s3', aws_access_key_id=aws_access_key_id,
-                            aws_secret_access_key=aws_secret_access_key)
-        bucket = s3.Bucket(bucket_name)
-        # Get new data from s3
-        df, time_flag = get_from_s3(start_date, end_date, bucket, logger)
+    # Get new data from s3
+    df, time_flag = get_from_s3(params, start_date, end_date, logger)

     # No new data can be pulled
     if time_flag is None:
@@ -282,13 +276,16 @@ def pull_quidel_covidtest(params, logger):
     """
     cache_dir = params["input_cache_dir"]

-    test_mode = params["test_mode"]
+    if params["pull_start_date"] == "":
+        params["pull_start_date"] = FULL_BKFILL_START_DATE

     # pull new data only that has not been ingested
     previous_df, pull_start_date = check_intermediate_file(
         cache_dir,
-        datetime.strptime(params["pull_start_date"], '%Y-%m-%d'))
+        params["pull_start_date"])
+
+    # an explicitly configured start date overrides the one from the cached file
+    if params["pull_start_date"] != FULL_BKFILL_START_DATE:
+        pull_start_date = datetime.strptime(params["pull_start_date"], '%Y-%m-%d')

     if params["pull_end_date"] == "":
         pull_end_date = datetime.today()
@@ -298,7 +295,7 @@ def pull_quidel_covidtest(params, logger):
     # Pull data from the file at 5 digit zipcode level
     # Use _end_date to check the most recent date that we received data
     df, _end_date = preprocess_new_data(
-        pull_start_date, pull_end_date, params, test_mode, logger)
+        pull_start_date, pull_end_date, params, logger)

     # Utilize previously stored data
     if previous_df is not None:
@@ -343,7 +340,7 @@ def check_export_end_date(input_export_end_date, _end_date,

 def check_export_start_date(export_start_date, export_end_date,
                             export_day_range):
     """
-    Ensure that the starte date, end date, and day range are mutually consistent.
+    Ensure that the start date, end date, and day range are mutually consistent.

     Parameters:
     export_start_date: str
@@ -359,7 +356,7 @@ def check_export_start_date(export_start_date, export_end_date,
     """
     if export_start_date == "":
-        export_start_date = datetime(2020, 5, 26)
+        export_start_date = FULL_BKFILL_START_DATE
     else:
         export_start_date = datetime.strptime(export_start_date, '%Y-%m-%d')
     # Only export data from -50 days to -5 days
diff --git a/quidel_covidtest/delphi_quidel_covidtest/run.py b/quidel_covidtest/delphi_quidel_covidtest/run.py
index e6974b6aa..e6f05a45c 100644
--- a/quidel_covidtest/delphi_quidel_covidtest/run.py
+++ b/quidel_covidtest/delphi_quidel_covidtest/run.py
@@ -92,7 +92,7 @@ def generate_and_export_for_parent_geo(geo_groups, geo_data, res_key, smooth, de
                                        remove_null_samples=True)  # for parent geo, remove null sample size
     return dates

-def run_module(params: Dict[str, Any]):
+def run_module(params: Dict[str, Any], logger=None):
     """Run the quidel_covidtest indicator.
     The `params` argument is expected to have the following structure:
@@ -117,9 +117,10 @@ def run_module(params: Dict[str, Any]):
         - "test_mode": bool, whether we are running in test mode
     """
     start_time = time.time()
-    logger = get_structured_logger(
-        __name__, filename=params["common"].get("log_filename"),
-        log_exceptions=params["common"].get("log_exceptions", True))
+    if logger is None:
+        logger = get_structured_logger(
+            __name__, filename=params["common"].get("log_filename"),
+            log_exceptions=params["common"].get("log_exceptions", True))
     stats = []
     # Log at program exit in case of an exception, otherwise after successful completion
     atexit.register(log_exit, start_time, stats, logger)
@@ -136,17 +137,14 @@ def run_module(params: Dict[str, Any]):
     # (generate files).
     if params["indicator"].get("generate_backfill_files", True):
         backfill_dir = params["indicator"]["backfill_dir"]
-        backfill_merge_day = params["indicator"]["backfill_merge_day"]
-        # Merge 4 weeks' data into one file to save runtime
-        # Notice that here we don't check the _end_date(receive date)
-        # since we always want such merging happens on a certain day of a week
-        merge_backfill_file(backfill_dir, backfill_merge_day, datetime.today())
+        # Merge a month's data into one file to save runtime
+        merge_backfill_file(backfill_dir, datetime.today(), logger)
     if _end_date is None:
         logger.info("The data is up-to-date. Currently, no new data to be ingested.")
         return
     # Store the backfill intermediate file
-    store_backfill_file(df, _end_date, backfill_dir)
+    store_backfill_file(df, _end_date, backfill_dir, logger)
     export_end_date = check_export_end_date(
         export_end_date, _end_date, END_FROM_TODAY_MINUS)
@@ -224,7 +222,8 @@ def run_module(params: Dict[str, Any]):

     # Export the cache file if the pipeline runs successfully.
     # Otherwise, don't update the cache file
-    update_cache_file(df, _end_date, cache_dir, logger)
+    if not params["common"].get("custom_run", False):
+        update_cache_file(df, _end_date, cache_dir, logger)
     # Log stats now instead of at program exit
     atexit.unregister(log_exit)
     log_exit(start_time, stats, logger)
diff --git a/quidel_covidtest/setup.py b/quidel_covidtest/setup.py
index 82c80832a..8c40e103c 100644
--- a/quidel_covidtest/setup.py
+++ b/quidel_covidtest/setup.py
@@ -7,6 +7,7 @@
     "darker[isort]~=2.1.1",
     "delphi-utils",
     "imap-tools",
+    "mock",
     "numpy",
     "openpyxl",
     "pandas",
diff --git a/quidel_covidtest/tests/conftest.py b/quidel_covidtest/tests/conftest.py
index c5ebcf630..67d4ccbee 100644
--- a/quidel_covidtest/tests/conftest.py
+++ b/quidel_covidtest/tests/conftest.py
@@ -1,16 +1,57 @@
 # -*- coding: utf-8 -*-
-
+from datetime import datetime
 from os.path import join
 import os
+from pathlib import Path
+
+import mock
+import pandas as pd
 import pytest
+from mock.mock import patch
+
+import delphi_quidel_covidtest.run
+
+TEST_DIR = Path(__file__).parent
+SOURCE_DIR = Path(__file__).parent.parent
+
+@pytest.fixture(scope='session')
+def params():
+    PARAMS = {
+        "common": {
+            "export_dir": f"{TEST_DIR}/receiving"
+        },
+        "indicator": {
+            "static_file_dir": f"{SOURCE_DIR}/static",
+            "input_cache_dir": f"{TEST_DIR}/cache",
+            "backfill_dir": f"{TEST_DIR}/backfill",
+            "backfill_merge_day": 0,
+            "export_start_date": "2020-06-30",
+            "export_end_date": "",
+            "pull_start_date": "2020-07-09",
+            "pull_end_date": "",
+            "export_day_range": 40,
+            "aws_credentials": {
+                "aws_access_key_id": "",
+                "aws_secret_access_key": ""
+            },
+            "bucket_name": "",
+            "wip_signal": "",
+            "test_mode": True
+        }
+    }
+    return PARAMS
+
+@pytest.fixture(scope="session", autouse=True)
+def mock_get_from_s3():
+    with patch("delphi_quidel_covidtest.pull.get_from_s3") as m:
+        test_data_dir = "./test_data/test_data.csv"
+        time_flag = datetime(2020, 8, 17)
+        df = pd.read_csv(
+            test_data_dir,
+            parse_dates=["StorageDate", "TestDate"]
+        )
+        m.return_value = df, time_flag
+        yield m

 @pytest.fixture(scope="session")
-def clean_receiving_dir():
-    # Clean receiving directory
-    for fname in os.listdir("receiving"):
-        if ".csv" in fname:
-            os.remove(join("receiving", fname))
-    for fname in os.listdir("cache"):
-        if ".csv" in fname:
-            os.remove(join("cache", fname))
+def run_as_module(params, mock_get_from_s3):
+    delphi_quidel_covidtest.run.run_module(params)
diff --git a/quidel_covidtest/tests/test_backfill.py b/quidel_covidtest/tests/test_backfill.py
index 27e0d01bc..9dc45ceb6 100644
--- a/quidel_covidtest/tests/test_backfill.py
+++ b/quidel_covidtest/tests/test_backfill.py
@@ -1,24 +1,20 @@
+import calendar
 import logging
 import os
 import glob
-from datetime import datetime
-
+from datetime import datetime, timedelta
+from pathlib import Path
 import pandas as pd

-from delphi_quidel_covidtest.pull import pull_quidel_covidtest
-
+from delphi_utils.logger import get_structured_logger
 from delphi_quidel_covidtest.backfill import (store_backfill_file,
-                                              merge_backfill_file)
-
-END_FROM_TODAY_MINUS = 5
-EXPORT_DAY_RANGE = 40
+                                              merge_backfill_file, merge_existing_backfill_files)

-TEST_LOGGER = logging.getLogger()
-backfill_dir="./backfill"
-
-class TestBackfill:
-
-    df, _end_date = pull_quidel_covidtest({
+TEST_PATH = Path(__file__).parent
+PARAMS = {
+    "indicator": {
+        "backfill_dir": f"{TEST_PATH}/backfill",
+        "drop_date": "2020-06-11",
         "static_file_dir": "../static",
         "input_cache_dir": "./cache",
         "export_start_date": "2020-06-30",
@@ -29,19 +25,34 @@ class TestBackfill:
         "aws_access_key_id": "",
         "aws_secret_access_key": ""
     },
-    "bucket_name": "",
-    "wip_signal": "",
-    "test_mode": True
-    }, TEST_LOGGER)
-
-    def test_store_backfill_file(self):
-
-        store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir)
+    "bucket_name": "",
+    "wip_signal": "",
+    "test_mode": True
+    },
+}
+DATA_FILEPATH = f"{PARAMS['indicator']['input_cache_dir']}/pulled_until_20200817.csv"
+backfill_dir = PARAMS["indicator"]["backfill_dir"]
+
+class TestBackfill:
+    _end_date = datetime.strptime(DATA_FILEPATH.split("_")[2].split(".")[0],
+                                  '%Y%m%d') + timedelta(days=1)
+    df = pd.read_csv(DATA_FILEPATH, sep=",", parse_dates=["timestamp"])
+
+    def cleanup(self):
+        for file in glob.glob(f"{backfill_dir}/*.parquet"):
+            os.remove(file)
+            assert file not in os.listdir(backfill_dir)
+
+    def test_store_backfill_file(self, caplog):
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
+        store_backfill_file(self.df, datetime(2020, 1, 1), backfill_dir, logger)
         fn = "quidel_covidtest_as_of_20200101.parquet"
         assert fn in os.listdir(backfill_dir)
+        assert "Stored source data in parquet" in caplog.text

         backfill_df = pd.read_parquet(backfill_dir + "/"+ fn, engine='pyarrow')
-
         selected_columns = ['time_value', 'fips', 'state_id',
                             'den_total', 'num_total',
                             'num_age_0_4', 'den_age_0_4',
@@ -51,37 +62,36 @@ def test_store_backfill_file(self):
                             'num_age_65plus', 'den_age_65plus',
                             'num_age_0_17', 'den_age_0_17',
                             'lag', 'issue_date']
-        assert set(selected_columns) == set(backfill_df.columns)
-
-        os.remove(backfill_dir + "/" + fn)
-        assert fn not in os.listdir(backfill_dir)
-
-    def test_merge_backfill_file(self):
-
-        today = datetime.today()
-        fn = "quidel_covidtest_from_20200817_to_20200820.parquet"
+        assert set(selected_columns) == set(backfill_df.columns)
+        assert fn in os.listdir(backfill_dir)
+
+        self.cleanup()
+
+    def test_merge_backfill_file(self, caplog, monkeypatch):
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
+        fn = "quidel_covidtest_202008.parquet"
         assert fn not in os.listdir(backfill_dir)

-        # Check the when no daily file stored
+        # Check when no daily file stored
         today = datetime(2020, 8, 20)
-        merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8)
+        merge_backfill_file(backfill_dir, today, logger, test_mode=True)
         assert fn not in os.listdir(backfill_dir)
+        assert "No new files to merge; skipping merging" in caplog.text

         for d in range(17, 21):
-            dropdate = datetime(2020, 8, d)
-            store_backfill_file(self.df, dropdate, backfill_dir)
-
-        # Check the when the merged file is not generated
-        today = datetime(2020, 8, 20)
-        merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=8)
-        assert fn not in os.listdir(backfill_dir)
-
+            dropdate = datetime(2020, 8, d)
+            store_backfill_file(self.df, dropdate, backfill_dir, logger)
+
         # Generate the merged file, but not delete it
-        merge_backfill_file(backfill_dir, today.weekday(), today, test_mode=True, check_nd=2)
+        today = datetime(2020, 9, 1)
+        monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
+        merge_backfill_file(backfill_dir, today, logger, test_mode=True)
         assert fn in os.listdir(backfill_dir)
+        assert "Merging files" in caplog.text

         # Read daily file
-        new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet")
+        new_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of*.parquet")
         pdList = []
         for file in new_files:
             if "from" in file:
@@ -89,9 +99,6 @@ def test_merge_backfill_file(self):
             df = pd.read_parquet(file, engine='pyarrow')
             pdList.append(df)
             os.remove(file)
-        new_files = glob.glob(backfill_dir + "/quidel_covidtest*.parquet")
-        assert len(new_files) == 1
-
         expected = pd.concat(pdList).sort_values(["time_value", "fips"])

         # Read the merged file
@@ -101,6 +108,75 @@ def test_merge_backfill_file(self):
         assert expected.shape[0] == merged.shape[0]
         assert expected.shape[1] == merged.shape[1]

-        os.remove(backfill_dir + "/" + fn)
-        assert fn not in os.listdir(backfill_dir)
+        self.cleanup()
+
+    def test_merge_existing_backfill_files(self, caplog, monkeypatch):
+        issue_date = datetime(year=2020, month=7, day=20)
+        issue_date_str = issue_date.strftime("%Y%m%d")
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
+        def prep_backfill_data():
+            # Generate backfill daily files
+            for d in range(18, 24):
+                dropdate = datetime(2020, 7, d)
+                df_part = self.df[self.df['timestamp'] == dropdate]
+                store_backfill_file(df_part, dropdate, backfill_dir, logger)
+
+            today = datetime(2020, 8, 1)
+            # creating the expected merged file
+            monkeypatch.setattr(calendar, 'monthrange', lambda x, y: (1, 4))
+            merge_backfill_file(backfill_dir, today, logger,
+                                test_mode=True)
+            original = f"{backfill_dir}/quidel_covidtest_202007.parquet"
+            os.rename(original, f"{backfill_dir}/expected.parquet")
+
+            # creating the merged backfill file without the issue date
+            issue_date_filename = f"{backfill_dir}/quidel_covidtest_as_of_{issue_date_str}.parquet"
+            os.remove(issue_date_filename)
+            merge_backfill_file(backfill_dir, today, logger,
+                                test_mode=True)
+
+            old_files = glob.glob(backfill_dir + "/quidel_covidtest_as_of_*")
+            for file in old_files:
+                os.remove(file)
+
+        prep_backfill_data()
+
+        df_to_add = self.df[self.df['timestamp'] == issue_date]
+        file_to_add = store_backfill_file(df_to_add, issue_date, backfill_dir, logger)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
+
+        assert "Adding missing date to merged file" in caplog.text
+
+        expected = pd.read_parquet(f"{backfill_dir}/expected.parquet")
+        merged = pd.read_parquet(f"{backfill_dir}/quidel_covidtest_202007.parquet")
+
+        # the patched merged file should contain exactly the expected rows
+        check = pd.concat([merged, expected]).drop_duplicates(keep=False)
+        assert len(check) == 0
+        self.cleanup()
+
+    def test_merge_existing_backfill_files_no_call(self, caplog):
+        issue_date = datetime(year=2020, month=5, day=20)
+        caplog.set_level(logging.INFO)
+        logger = get_structured_logger()
+
+        def prep_backfill_data():
+            # Generate backfill daily files
+            for d in range(18, 24):
+                dropdate = datetime(2020, 7, d)
+                df_part = self.df[self.df["timestamp"] == dropdate]
+                store_backfill_file(df_part, dropdate, backfill_dir, logger)
+
+            today = datetime(2020, 8, 1)
+            merge_backfill_file(backfill_dir, today, logger,
+                                test_mode=True)
+
+        prep_backfill_data()
+        file_to_add = store_backfill_file(self.df, issue_date, backfill_dir, logger)
+        merge_existing_backfill_files(backfill_dir, file_to_add, issue_date, logger)
+        assert "Issue date has no matching merged files" in caplog.text
+        self.cleanup()
diff --git a/quidel_covidtest/tests/test_patch.py b/quidel_covidtest/tests/test_patch.py
new file mode 100644
index 000000000..43b61ec46
--- /dev/null
+++ b/quidel_covidtest/tests/test_patch.py
@@ -0,0 +1,33 @@
+from unittest.mock import patch as mock_patch
+from delphi_quidel_covidtest.patch import patch
+import os
+import shutil
+
+class TestPatchModule:
+    def test_patch(self):
+        with mock_patch('delphi_quidel_covidtest.patch.get_structured_logger') as mock_get_structured_logger, \
+             mock_patch('delphi_quidel_covidtest.patch.read_params') as mock_read_params, \
+             mock_patch('delphi_quidel_covidtest.patch.run_module') as mock_run_module:
+
+            mock_read_params.return_value = {
+                "common": {
+                    "log_filename": "test.log"
+                },
+                "indicator": {
+                    "export_day_range": 40,
+                },
+                "patch": {
+                    "start_issue": "2021-01-01",
+                    "end_issue": "2021-01-02",
+                    "patch_dir": "./patch_dir"
+                }
+            }
+
+            patch()
+
+            assert os.path.isdir('./patch_dir')
+            assert os.path.isdir('./patch_dir/issue_20210101/quidel_covidtest')
+            assert os.path.isdir('./patch_dir/issue_20210102/quidel_covidtest')
+
+            # Clean up the created directories after the test
+            shutil.rmtree(mock_read_params.return_value["patch"]["patch_dir"])
\ No newline at end of file
diff --git a/quidel_covidtest/tests/test_pull.py b/quidel_covidtest/tests/test_pull.py
index a3436392b..8177df1a5 100644
--- a/quidel_covidtest/tests/test_pull.py
+++ b/quidel_covidtest/tests/test_pull.py
@@ -38,8 +38,7 @@ def test_fix_date(self):
                           datetime(2020, 6, 11), datetime(2020, 7, 2)])

 class TestingPullData:
-    def test_pull_quidel_covidtest(self):
-
+    def test_pull_quidel_covidtest(self, mock_get_from_s3):
         df, _ = pull_quidel_covidtest({
             "static_file_dir": "../static",
             "input_cache_dir": "./cache",
diff --git a/quidel_covidtest/tests/test_run.py b/quidel_covidtest/tests/test_run.py
index 4e15bea91..cc054dfc2 100644
--- a/quidel_covidtest/tests/test_run.py
+++ b/quidel_covidtest/tests/test_run.py
@@ -1,49 +1,25 @@
 """Tests for running the quidel covidtest indicator."""
+from itertools import product
+import os
 from os import listdir
 from os.path import join
+from pathlib import Path

 import pandas as pd
 import numpy as np

 from delphi_utils import add_prefix
 from delphi_quidel_covidtest.constants import PARENT_GEO_RESOLUTIONS, NONPARENT_GEO_RESOLUTIONS, \
-    SENSORS
-from delphi_quidel_covidtest.run import run_module
-
+    SENSORS, AGE_GROUPS

 class TestRun:
     """Tests for run_module()."""

-    PARAMS = {
-        "common": {
-            "export_dir": "./receiving"
-        },
-        "indicator": {
-            "static_file_dir": "../static",
-            "input_cache_dir": "./cache",
-            "backfill_dir": "./backfill",
-            "backfill_merge_day": 0,
-            "export_start_date": "2020-06-30",
-            "export_end_date": "",
-            "pull_start_date": "2020-07-09",
-            "pull_end_date":"",
-            "export_day_range":40,
-            "aws_credentials": {
-                "aws_access_key_id": "",
-                "aws_secret_access_key": ""
-            },
-            "bucket_name": "",
-            "wip_signal": "",
-            "test_mode": True
-        }
-    }
-
-    def test_output_files(self, clean_receiving_dir):
+    def test_output_files(self, run_as_module, params):
         """Tests that the proper files are output."""

         # Test output exists
-        run_module(self.PARAMS)
-        csv_files = [i for i in listdir("receiving") if i.endswith(".csv")]
+        csv_files = [i for i in listdir(params["common"]["export_dir"]) if i.endswith(".csv")]

         dates = [
             "20200718",
@@ -52,16 +28,20 @@ def test_output_files(self, clean_receiving_dir):
         ]
         geos = PARENT_GEO_RESOLUTIONS + NONPARENT_GEO_RESOLUTIONS
         sensors = add_prefix(SENSORS,
-                             wip_signal=self.PARAMS["indicator"]["wip_signal"],
+                             wip_signal=params["indicator"]["wip_signal"],
                              prefix="wip_")
+        full_sensor = [f"{sensor}_{age}" for sensor, age in product(sensors, AGE_GROUPS)]

         expected_files = []
         for date in dates:
-            for geo in geos:
+            for geo in PARENT_GEO_RESOLUTIONS:
                 for sensor in sensors:
                     expected_files += [date + "_" + geo + "_" + sensor + ".csv"]
+            for geo in NONPARENT_GEO_RESOLUTIONS:
+                for sensor in full_sensor:
+                    expected_files += [date + "_" + geo + "_" + sensor + ".csv"]

-        assert set(expected_files).issubset(set(csv_files))
+        assert set(expected_files) == set(csv_files)
         assert '20200721_state_covid_ag_raw_pct_positive.csv' not in csv_files
         assert '20200722_state_covid_ag_raw_pct_positive.csv' not in csv_files
@@ -105,3 +85,8 @@ def test_output_files(self, clean_receiving_dir):
             if ".csv" in fname:
                 flag = 1
         assert flag is not None
+
+        for files in Path(params["common"]["export_dir"]).glob("*.csv"):
+            os.remove(files)
+        for files in Path(params["indicator"]["input_cache_dir"]).glob("*.csv"):
+            os.remove(files)
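
Reviewer note (not part of the diff): a minimal standalone sketch of the monthly merge rules that the reworked merge_backfill_file now follows. The helper names previous_month_tag and ready_to_merge are illustrative only and do not exist in the package; they restate the two decisions the function makes: only last month's daily files are considered, and merging happens only once one daily file exists per calendar day of that month.

import calendar
from datetime import datetime, timedelta

def previous_month_tag(today):
    # merge_backfill_file() globs quidel_covidtest_as_of_<YYYYMM>*.parquet
    # for the month before the run date
    return (today.replace(day=1) - timedelta(days=1)).strftime("%Y%m")

def ready_to_merge(daily_file_dates):
    # a month is merged only when every calendar day has a daily file
    latest = max(daily_file_dates)
    return len(daily_file_dates) >= calendar.monthrange(latest.year, latest.month)[1]

print(previous_month_tag(datetime(2020, 9, 1)))  # -> "202008"
print(ready_to_merge([datetime(2020, 8, d) for d in range(1, 32)]))  # -> True

On a September 1 run, the 31 August daily files would be combined into quidel_covidtest_202008.parquet, which is also the file that merge_existing_backfill_files later locates by its six-digit year-month suffix when a missing issue date is patched in.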