From 59f5c2244b41f38e6abd426c6928bd3459111451 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=84=B6=E7=84=B6?= <52230092+zhuoran-cheng@users.noreply.github.com> Date: Fri, 20 Aug 2021 13:31:09 -0400 Subject: [PATCH 1/5] NCHS data available at HHS, nation level --- .../delphi_nchs_mortality/constants.py | 6 +++++- nchs_mortality/delphi_nchs_mortality/run.py | 7 ++++--- nchs_mortality/tests/test_run.py | 20 +++++++++++++++---- 3 files changed, 25 insertions(+), 8 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/constants.py b/nchs_mortality/delphi_nchs_mortality/constants.py index 164b84307..783227369 100644 --- a/nchs_mortality/delphi_nchs_mortality/constants.py +++ b/nchs_mortality/delphi_nchs_mortality/constants.py @@ -25,7 +25,11 @@ "prop" ] INCIDENCE_BASE = 100000 -GEO_RES = "state" +GEO_RES = [ + "nation", + "hhs", + "state" +] # this is necessary as a delimiter in the f-string expressions we use to # construct detailed error reports diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index fa0226fcb..57b9da95c 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -7,6 +7,7 @@ import time from datetime import datetime, date, timedelta from typing import Dict, Any +from itertools import product import numpy as np from delphi_utils import S3ArchiveDiffer, get_structured_logger @@ -60,7 +61,7 @@ def run_module(params: Dict[str, Any]): stats = [] df_pull = pull_nchs_mortality_data(token, test_file) - for metric in METRICS: + for metric,geo in product(METRICS,GEO_RES): if metric == 'percent_of_expected_deaths': print(metric) df = df_pull.copy() @@ -71,7 +72,7 @@ def run_module(params: Dict[str, Any]): sensor_name = "_".join([SENSOR_NAME_MAP[metric]]) dates = export_csv( df, - geo_name=GEO_RES, + geo_name=geo, export_dir=daily_export_dir, start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), sensor=sensor_name, @@ -92,7 +93,7 @@ def run_module(params: Dict[str, Any]): sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) dates = export_csv( df, - geo_name=GEO_RES, + geo_name=geo, export_dir=daily_export_dir, start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), sensor=sensor_name, diff --git a/nchs_mortality/tests/test_run.py b/nchs_mortality/tests/test_run.py index 36dba6698..e8f6f7a0c 100644 --- a/nchs_mortality/tests/test_run.py +++ b/nchs_mortality/tests/test_run.py @@ -36,17 +36,29 @@ def test_output_files_exist(self, run_as_module, date): 'deaths_pneumonia_or_flu_or_covid_incidence'] sensors = ["num", "prop"] - expected_files = [] + expected_files_nation = [] + expected_files_state=[] + expected_files_hhs=[] for d in dates: for metric in metrics: if metric == "deaths_percent_of_expected": - expected_files += ["weekly_" + d + "_state_" \ + expected_files_nation += ["weekly_" + d + "_nation_" \ + + metric + ".csv"] + expected_files_state += ["weekly_" + d + "_state_" \ + + metric + ".csv"] + expected_files_hhs += ["weekly_" + d + "_hhs_" \ + metric + ".csv"] else: for sensor in sensors: - expected_files += ["weekly_" + d + "_state_" \ + expected_files_nation += ["weekly_" + d + "_nation_" \ + + metric + "_" + sensor + ".csv"] + expected_files_state += ["weekly_" + d + "_state_" \ + + metric + "_" + sensor + ".csv"] + expected_files_hhs += ["weekly_" + d + "_hhs_" \ + metric + "_" + sensor + ".csv"] - assert set(expected_files).issubset(set(csv_files)) + assert set(expected_files_nation).issubset(set(csv_files)) + assert 
set(expected_files_state).issubset(set(csv_files)) + assert set(expected_files_hhs).issubset(set(csv_files)) @pytest.mark.parametrize("date", ["2020-09-14", "2020-09-18"]) def test_output_file_format(self, run_as_module, date): From 70bd67028e6d4de40937261f9878c5518b028275 Mon Sep 17 00:00:00 2001 From: alexcoda Date: Mon, 13 Sep 2021 19:41:44 -0700 Subject: [PATCH 2/5] Aggregate nchs mortality data at the hhs and nation levels --- nchs_mortality/delphi_nchs_mortality/run.py | 51 ++++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index 57b9da95c..c01845393 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -10,7 +10,7 @@ from itertools import product import numpy as np -from delphi_utils import S3ArchiveDiffer, get_structured_logger +from delphi_utils import S3ArchiveDiffer, get_structured_logger, GeoMapper from .archive_diffs import arch_diffs from .constants import (METRICS, SENSOR_NAME_MAP, @@ -44,13 +44,14 @@ def run_module(params: Dict[str, Any]): __name__, filename=params["common"].get("log_filename"), log_exceptions=params["common"].get("log_exceptions", True)) export_start_date = params["indicator"]["export_start_date"] - if export_start_date == "latest": # Find the previous Saturday + if export_start_date == "latest": # Find the previous Saturday export_start_date = date.today() - timedelta( - days=date.today().weekday() + 2) + days=date.today().weekday() + 2) export_start_date = export_start_date.strftime('%Y-%m-%d') daily_export_dir = params["common"]["daily_export_dir"] token = params["indicator"]["token"] test_file = params["indicator"].get("test_file", None) + gmpr = GeoMapper() if "archive" in params: daily_arch_diff = S3ArchiveDiffer( @@ -61,7 +62,7 @@ def run_module(params: Dict[str, Any]): stats = [] df_pull = pull_nchs_mortality_data(token, test_file) - for metric,geo in product(METRICS,GEO_RES): + for metric, geo in product(METRICS, GEO_RES): if metric == 'percent_of_expected_deaths': print(metric) df = df_pull.copy() @@ -70,6 +71,17 @@ def run_module(params: Dict[str, Any]): df["sample_size"] = np.nan df = df[~df["val"].isnull()] sensor_name = "_".join([SENSOR_NAME_MAP[metric]]) + + if geo in ["hhs", "nation"]: + df = gmpr.replace_geocode( + df, "state_id", "state_code", from_col="geo_id", date_col="timestamp") + # Weight by population when aggregating across geocodes + df["weight"] = df["population"] + df = gmpr.replace_geocode( + df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename( + columns={geo: "geo_id"}) + df["val"] = df["val"] / df["population"] + dates = export_csv( df, geo_name=geo, @@ -83,14 +95,19 @@ def run_module(params: Dict[str, Any]): for sensor in SENSORS: print(metric, sensor) df = df_pull.copy() + df["se"] = np.nan + df["sample_size"] = np.nan + sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) + if geo in ["hhs", "nation"]: + df = gmpr.replace_geocode( + df, "state_id", "state_code", from_col="geo_id", date_col="timestamp") + df = gmpr.replace_geocode( + df, "state_code", geo, date_col="timestamp").rename(columns={geo: "geo_id"}) if sensor == "num": df["val"] = df[metric] else: df["val"] = df[metric] / df["population"] * INCIDENCE_BASE - df["se"] = np.nan - df["sample_size"] = np.nan df = df[~df["val"].isnull()] - sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) dates = export_csv( df, geo_name=geo, @@ -101,12 +118,12 @@ def 
run_module(params: Dict[str, Any]): if len(dates) > 0: stats.append((max(dates), len(dates))) -# Weekly run of archive utility on Monday -# - Does not upload to S3, that is handled by daily run of archive utility -# - Exports issues into receiving for the API -# Daily run of archiving utility -# - Uploads changed files to S3 -# - Does not export any issues into receiving + # Weekly run of archive utility on Monday + # - Does not upload to S3, that is handled by daily run of archive utility + # - Exports issues into receiving for the API + # Daily run of archiving utility + # - Uploads changed files to S3 + # - Does not export any issues into receiving if "archive" in params: arch_diffs(params, daily_arch_diff) @@ -116,7 +133,7 @@ def run_module(params: Dict[str, Any]): max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d") logger.info("Completed indicator run", - elapsed_time_in_seconds = elapsed_time_in_seconds, - csv_export_count = csv_export_count, - max_lag_in_days = max_lag_in_days, - oldest_final_export_date = formatted_min_max_date) + elapsed_time_in_seconds=elapsed_time_in_seconds, + csv_export_count=csv_export_count, + max_lag_in_days=max_lag_in_days, + oldest_final_export_date=formatted_min_max_date) From ed6e1a4e8dff17f9a06a324a85592b832a25d834 Mon Sep 17 00:00:00 2001 From: alexcoda Date: Mon, 13 Sep 2021 19:54:34 -0700 Subject: [PATCH 3/5] pylint disable --- nchs_mortality/delphi_nchs_mortality/run.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index c01845393..247874c36 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -19,7 +19,7 @@ from .pull import pull_nchs_mortality_data -def run_module(params: Dict[str, Any]): +def run_module(params: Dict[str, Any]): # pylint: disable=too-many-branches, too-many-statements """Run module for processing NCHS mortality data. The `params` argument is expected to have the following structure: From b41849628a57ddf3b0b8119cbc7c6d4d4690f5f3 Mon Sep 17 00:00:00 2001 From: alexcoda Date: Thu, 16 Sep 2021 19:08:41 -0700 Subject: [PATCH 4/5] Pull out repeated functionality into helper methods --- nchs_mortality/.pylintrc | 1 + nchs_mortality/delphi_nchs_mortality/run.py | 128 +++++++++++--------- 2 files changed, 74 insertions(+), 55 deletions(-) diff --git a/nchs_mortality/.pylintrc b/nchs_mortality/.pylintrc index f30837c7e..02339e190 100644 --- a/nchs_mortality/.pylintrc +++ b/nchs_mortality/.pylintrc @@ -4,6 +4,7 @@ disable=logging-format-interpolation, too-many-locals, too-many-arguments, + fixme, # Allow pytest functions to be part of a class. no-self-use, # Allow pytest classes to have one test. diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index 247874c36..b7a841a3e 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -19,7 +19,7 @@ from .pull import pull_nchs_mortality_data -def run_module(params: Dict[str, Any]): # pylint: disable=too-many-branches, too-many-statements +def run_module(params: Dict[str, Any]): """Run module for processing NCHS mortality data. 
The `params` argument is expected to have the following structure: @@ -62,61 +62,47 @@ def run_module(params: Dict[str, Any]): # pylint: disable=too-many-branches, to stats = [] df_pull = pull_nchs_mortality_data(token, test_file) - for metric, geo in product(METRICS, GEO_RES): + for metric, geo, sensor, in product(METRICS, GEO_RES, SENSORS): if metric == 'percent_of_expected_deaths': - print(metric) - df = df_pull.copy() - df["val"] = df[metric] - df["se"] = np.nan - df["sample_size"] = np.nan - df = df[~df["val"].isnull()] - sensor_name = "_".join([SENSOR_NAME_MAP[metric]]) - - if geo in ["hhs", "nation"]: - df = gmpr.replace_geocode( - df, "state_id", "state_code", from_col="geo_id", date_col="timestamp") - # Weight by population when aggregating across geocodes - df["weight"] = df["population"] - df = gmpr.replace_geocode( - df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename( - columns={geo: "geo_id"}) - df["val"] = df["val"] / df["population"] - - dates = export_csv( - df, - geo_name=geo, - export_dir=daily_export_dir, - start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), - sensor=sensor_name, - ) - if len(dates) > 0: - stats.append((max(dates), len(dates))) - else: - for sensor in SENSORS: - print(metric, sensor) - df = df_pull.copy() - df["se"] = np.nan - df["sample_size"] = np.nan - sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) - if geo in ["hhs", "nation"]: - df = gmpr.replace_geocode( - df, "state_id", "state_code", from_col="geo_id", date_col="timestamp") - df = gmpr.replace_geocode( - df, "state_code", geo, date_col="timestamp").rename(columns={geo: "geo_id"}) - if sensor == "num": - df["val"] = df[metric] - else: - df["val"] = df[metric] / df["population"] * INCIDENCE_BASE - df = df[~df["val"].isnull()] - dates = export_csv( - df, - geo_name=geo, - export_dir=daily_export_dir, - start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), - sensor=sensor_name, - ) - if len(dates) > 0: - stats.append((max(dates), len(dates))) + continue + print(metric, sensor) + sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) + df = _safe_copy_df(df_pull, metric) + + if geo in ["hhs", "nation"]: + df = _map_from_state(df, geo, gmpr) + + if sensor == "prop": + df["val"] = df["val"] / df["population"] * INCIDENCE_BASE + + dates = export_csv( + df, + geo_name=geo, + export_dir=daily_export_dir, + start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), + sensor=sensor_name, + ) + if len(dates) > 0: + stats.append((max(dates), len(dates))) + + for geo in GEO_RES: + metric = 'percent_of_expected_deaths' + print(metric) + sensor_name = "_".join([SENSOR_NAME_MAP[metric]]) + df = _safe_copy_df(df_pull, metric) + + if geo in ["hhs", "nation"]: + df = _map_from_state(df, geo, gmpr, weighted=True) + + dates = export_csv( + df, + geo_name=geo, + export_dir=daily_export_dir, + start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), + sensor=sensor_name, + ) + if len(dates) > 0: + stats.append((max(dates), len(dates))) # Weekly run of archive utility on Monday # - Does not upload to S3, that is handled by daily run of archive utility @@ -137,3 +123,35 @@ def run_module(params: Dict[str, Any]): # pylint: disable=too-many-branches, to csv_export_count=csv_export_count, max_lag_in_days=max_lag_in_days, oldest_final_export_date=formatted_min_max_date) + + +def _safe_copy_df(df, metric_col_name): + """Create a copy of the given df, and drop rows where the metric is nan.""" + df_copy = df.copy() + df_copy["se"] = np.nan + df_copy["sample_size"] = np.nan + 
df_copy["val"] = df_copy[metric_col_name] + return df_copy[~df_copy["val"].isnull()] + + +def _map_from_state(df, geo, gmpr, weighted=False): + """Map from state_id to another given geocode. + + The weighted flag is used when aggregating metrics which come as percentages + rather than raw counts, and therefore need to be weighted by population when + combining. + """ + # TODO - this first mapping from state_id to state_code is necessary because + # the GeoMapper does not currently support going directly from state_id to hhs or + # nation. See issue #1255 + df = gmpr.replace_geocode( + df, "state_id", "state_code", from_col="geo_id", date_col="timestamp") + if weighted: + df["weight"] = df["population"] + df = gmpr.replace_geocode( + df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename( + columns={geo: "geo_id"}) + if weighted: + df["val"] = df["val"] / df["population"] + + return df From eeff5bb173f8906a7c230cc017a91409538425f4 Mon Sep 17 00:00:00 2001 From: Kathryn M Mazaitis Date: Mon, 20 Sep 2021 10:52:52 -0400 Subject: [PATCH 5/5] small reorg to reduce duplication --- .../delphi_nchs_mortality/constants.py | 5 ++- nchs_mortality/delphi_nchs_mortality/run.py | 37 +++++++------------ 2 files changed, 16 insertions(+), 26 deletions(-) diff --git a/nchs_mortality/delphi_nchs_mortality/constants.py b/nchs_mortality/delphi_nchs_mortality/constants.py index 783227369..b2603d14e 100644 --- a/nchs_mortality/delphi_nchs_mortality/constants.py +++ b/nchs_mortality/delphi_nchs_mortality/constants.py @@ -1,7 +1,8 @@ """Registry for constants.""" # global constants +PERCENT_EXPECTED = "percent_of_expected_deaths" METRICS = [ - "covid_19_deaths", "total_deaths", "percent_of_expected_deaths", + "covid_19_deaths", "total_deaths", PERCENT_EXPECTED, "pneumonia_deaths", "pneumonia_and_covid_19_deaths", "influenza_deaths", "pneumonia_influenza_or_covid_19_deaths" ] @@ -14,7 +15,7 @@ SENSOR_NAME_MAP = { "covid_19_deaths": "deaths_covid_incidence", "total_deaths": "deaths_allcause_incidence", - "percent_of_expected_deaths": "deaths_percent_of_expected", + PERCENT_EXPECTED: "deaths_percent_of_expected", "pneumonia_deaths": "deaths_pneumonia_notflu_incidence", "pneumonia_and_covid_19_deaths": "deaths_covid_and_pneumonia_notflu_incidence", "influenza_deaths": "deaths_flu_incidence", diff --git a/nchs_mortality/delphi_nchs_mortality/run.py b/nchs_mortality/delphi_nchs_mortality/run.py index b7a841a3e..2172d2f68 100644 --- a/nchs_mortality/delphi_nchs_mortality/run.py +++ b/nchs_mortality/delphi_nchs_mortality/run.py @@ -14,7 +14,8 @@ from .archive_diffs import arch_diffs from .constants import (METRICS, SENSOR_NAME_MAP, - SENSORS, INCIDENCE_BASE, GEO_RES) + SENSORS, INCIDENCE_BASE, GEO_RES, + PERCENT_EXPECTED) from .export import export_csv from .pull import pull_nchs_mortality_data @@ -63,16 +64,23 @@ def run_module(params: Dict[str, Any]): stats = [] df_pull = pull_nchs_mortality_data(token, test_file) for metric, geo, sensor, in product(METRICS, GEO_RES, SENSORS): - if metric == 'percent_of_expected_deaths': + is_percent = metric == PERCENT_EXPECTED + if is_percent and sensor == 'prop': continue - print(metric, sensor) - sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor]) + + sensor_name = [SENSOR_NAME_MAP[metric]] + if not is_percent: + sensor_name.append(sensor) + print(sensor_name) + sensor_name = "_".join(sensor_name) + df = _safe_copy_df(df_pull, metric) if geo in ["hhs", "nation"]: - df = _map_from_state(df, geo, gmpr) + df = _map_from_state(df, geo, gmpr, weighted=is_percent) if 
sensor == "prop": + # never encountered when is_percent df["val"] = df["val"] / df["population"] * INCIDENCE_BASE dates = export_csv( @@ -85,25 +93,6 @@ def run_module(params: Dict[str, Any]): if len(dates) > 0: stats.append((max(dates), len(dates))) - for geo in GEO_RES: - metric = 'percent_of_expected_deaths' - print(metric) - sensor_name = "_".join([SENSOR_NAME_MAP[metric]]) - df = _safe_copy_df(df_pull, metric) - - if geo in ["hhs", "nation"]: - df = _map_from_state(df, geo, gmpr, weighted=True) - - dates = export_csv( - df, - geo_name=geo, - export_dir=daily_export_dir, - start_date=datetime.strptime(export_start_date, "%Y-%m-%d"), - sensor=sensor_name, - ) - if len(dates) > 0: - stats.append((max(dates), len(dates))) - # Weekly run of archive utility on Monday # - Does not upload to S3, that is handled by daily run of archive utility # - Exports issues into receiving for the API