Alternate: Add NCHS mortality geo aggregation at the HHS and nation levels #1258

Open · wants to merge 7 commits into main

1 change: 1 addition & 0 deletions nchs_mortality/.pylintrc
@@ -4,6 +4,7 @@
disable=logging-format-interpolation,
too-many-locals,
too-many-arguments,
fixme,
# Allow pytest functions to be part of a class.
no-self-use,
# Allow pytest classes to have one test.
11 changes: 8 additions & 3 deletions nchs_mortality/delphi_nchs_mortality/constants.py
@@ -1,7 +1,8 @@
"""Registry for constants."""
# global constants
PERCENT_EXPECTED = "percent_of_expected_deaths"
METRICS = [
"covid_19_deaths", "total_deaths", "percent_of_expected_deaths",
"covid_19_deaths", "total_deaths", PERCENT_EXPECTED,
"pneumonia_deaths", "pneumonia_and_covid_19_deaths", "influenza_deaths",
"pneumonia_influenza_or_covid_19_deaths"
]
@@ -14,7 +15,7 @@
SENSOR_NAME_MAP = {
"covid_19_deaths": "deaths_covid_incidence",
"total_deaths": "deaths_allcause_incidence",
"percent_of_expected_deaths": "deaths_percent_of_expected",
PERCENT_EXPECTED: "deaths_percent_of_expected",
"pneumonia_deaths": "deaths_pneumonia_notflu_incidence",
"pneumonia_and_covid_19_deaths": "deaths_covid_and_pneumonia_notflu_incidence",
"influenza_deaths": "deaths_flu_incidence",
@@ -25,7 +26,11 @@
"prop"
]
INCIDENCE_BASE = 100000
GEO_RES = "state"
GEO_RES = [
"nation",
"hhs",
"state"
]
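
`GEO_RES` becomes a list so the runner can loop over all three geo levels. As a side note, here is a minimal sketch of how these constants combine into exported signal names (trimmed copies of the constants above, purely illustrative):

```python
from itertools import product

# Trimmed copies of the constants above, for illustration only.
SENSOR_NAME_MAP = {"covid_19_deaths": "deaths_covid_incidence"}
SENSORS = ["num", "prop"]
GEO_RES = ["nation", "hhs", "state"]

# One exported signal per (metric, geo, sensor) combination; the name
# mirrors the "_".join logic in run.py below.
for metric, geo, sensor in product(SENSOR_NAME_MAP, GEO_RES, SENSORS):
    print(geo, "_".join([SENSOR_NAME_MAP[metric], sensor]))
# nation deaths_covid_incidence_num
# nation deaths_covid_incidence_prop
# ... hhs and state follow the same pattern
```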

# this is necessary as a delimiter in the f-string expressions we use to
# construct detailed error reports
142 changes: 83 additions & 59 deletions nchs_mortality/delphi_nchs_mortality/run.py
@@ -7,13 +7,15 @@
import time
from datetime import datetime, date, timedelta
from typing import Dict, Any
from itertools import product

import numpy as np
from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv
from delphi_utils import S3ArchiveDiffer, get_structured_logger, create_export_csv, GeoMapper

from .archive_diffs import arch_diffs
from .constants import (METRICS, SENSOR_NAME_MAP,
SENSORS, INCIDENCE_BASE, GEO_RES)
SENSORS, INCIDENCE_BASE, GEO_RES,
PERCENT_EXPECTED)
from .pull import pull_nchs_mortality_data


@@ -42,13 +44,14 @@ def run_module(params: Dict[str, Any]):
__name__, filename=params["common"].get("log_filename"),
log_exceptions=params["common"].get("log_exceptions", True))
export_start_date = params["indicator"]["export_start_date"]
if export_start_date == "latest": # Find the previous Saturday
if export_start_date == "latest": # Find the previous Saturday
export_start_date = date.today() - timedelta(
days=date.today().weekday() + 2)
days=date.today().weekday() + 2)
export_start_date = export_start_date.strftime('%Y-%m-%d')
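
One property worth noting about the "latest" branch: subtracting `weekday() + 2` days always lands on a Saturday, because `weekday()` runs from 0 (Monday) to 6 (Sunday). A quick self-contained check (the example date is arbitrary):

```python
from datetime import date, timedelta

today = date(2021, 9, 1)  # a Wednesday, chosen arbitrarily
start = today - timedelta(days=today.weekday() + 2)
assert start.weekday() == 5  # Saturday: 2021-08-28
```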
daily_export_dir = params["common"]["daily_export_dir"]
token = params["indicator"]["token"]
test_file = params["indicator"].get("test_file", None)
gmpr = GeoMapper()

if "archive" in params:
daily_arch_diff = S3ArchiveDiffer(
@@ -59,57 +62,46 @@

stats = []
df_pull = pull_nchs_mortality_data(token, test_file)
for metric in METRICS:
if metric == 'percent_of_expected_deaths':
logger.info("Generating signal and exporting to CSV",
metric = metric)
df = df_pull.copy()
df["val"] = df[metric]
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join([SENSOR_NAME_MAP[metric]])
dates = create_export_csv(
df,
geo_res=GEO_RES,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
weekly_dates=True
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))
else:
for sensor in SENSORS:
logger.info("Generating signal and exporting to CSV",
metric = metric,
sensor = sensor)
df = df_pull.copy()
if sensor == "num":
df["val"] = df[metric]
else:
df["val"] = df[metric] / df["population"] * INCIDENCE_BASE
df["se"] = np.nan
df["sample_size"] = np.nan
df = df[~df["val"].isnull()]
sensor_name = "_".join([SENSOR_NAME_MAP[metric], sensor])
dates = create_export_csv(
df,
geo_res=GEO_RES,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
weekly_dates=True
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))

for metric, geo, sensor in product(METRICS, GEO_RES, SENSORS):
is_percent = metric == PERCENT_EXPECTED
if is_percent and sensor == 'prop':
continue

logger.info("Generating signal and exporting to CSV",
metric=metric,
sensor=sensor)

sensor_name = [SENSOR_NAME_MAP[metric]]
if not is_percent:
sensor_name.append(sensor)
sensor_name = "_".join(sensor_name)

df = _safe_copy_df(df_pull, metric)

if geo in ["hhs", "nation"]:
df = _map_from_state(df, geo, gmpr, weighted=is_percent)

if sensor == "prop":
# unreachable when is_percent: the (PERCENT_EXPECTED, "prop") combination is skipped above
df["val"] = df["val"] / df["population"] * INCIDENCE_BASE

dates = create_export_csv(
df,
geo_res=geo,
export_dir=daily_export_dir,
start_date=datetime.strptime(export_start_date, "%Y-%m-%d"),
sensor=sensor_name,
weekly_dates=True
)
if len(dates) > 0:
stats.append((max(dates), len(dates)))
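
The single `product()` loop replaces the nested metric/sensor loops from the old code, with the one invalid combination (the percentage metric has no "prop" variant) skipped up front. A toy enumeration with abridged constants shows what gets generated:

```python
from itertools import product

METRICS = ["covid_19_deaths", "percent_of_expected_deaths"]  # abridged
GEO_RES = ["nation", "hhs", "state"]
SENSORS = ["num", "prop"]
PERCENT_EXPECTED = "percent_of_expected_deaths"

combos = [(m, g, s) for m, g, s in product(METRICS, GEO_RES, SENSORS)
          if not (m == PERCENT_EXPECTED and s == "prop")]
print(len(combos))  # 9: 2 * 3 * 2 = 12 combinations minus the 3 skipped
```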

# Weekly run of archive utility on Monday
# - Does not upload to S3, that is handled by daily run of archive utility
# - Exports issues into receiving for the API
# Daily run of archiving utility
# - Uploads changed files to S3
# - Does not export any issues into receiving
if "archive" in params:
arch_diffs(params, daily_arch_diff, logger)

@@ -119,7 +111,39 @@ def run_module(params: Dict[str, Any]):
max_lag_in_days = min_max_date and (datetime.now() - min_max_date).days
formatted_min_max_date = min_max_date and min_max_date.strftime("%Y-%m-%d")
logger.info("Completed indicator run",
elapsed_time_in_seconds = elapsed_time_in_seconds,
csv_export_count = csv_export_count,
max_lag_in_days = max_lag_in_days,
oldest_final_export_date = formatted_min_max_date)
elapsed_time_in_seconds=elapsed_time_in_seconds,
csv_export_count=csv_export_count,
max_lag_in_days=max_lag_in_days,
oldest_final_export_date=formatted_min_max_date)


def _safe_copy_df(df, metric_col_name):
"""Create a copy of the given df, and drop rows where the metric is nan."""
df_copy = df.copy()
df_copy["se"] = np.nan
df_copy["sample_size"] = np.nan
df_copy["val"] = df_copy[metric_col_name]
return df_copy[~df_copy["val"].isnull()]
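
For clarity, what the helper produces on a hypothetical two-row frame (the values are made up):

```python
import numpy as np
import pandas as pd

raw = pd.DataFrame({"geo_id": ["pa", "nj"],
                    "covid_19_deaths": [5.0, np.nan]})
out = _safe_copy_df(raw, "covid_19_deaths")
# Only the "pa" row survives, with val=5.0 and se/sample_size set to NaN;
# the original frame is left untouched.
```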


def _map_from_state(df, geo, gmpr, weighted=False):
"""Map from state_id to another given geocode.

The weighted flag is used when aggregating metrics that come as percentages
rather than raw counts and therefore must be population-weighted when
combined.
"""
# TODO - this first mapping from state_id to state_code is necessary because
# the GeoMapper does not currently support going directly from state_id to hhs or
# nation. See issue #1255
df = gmpr.replace_geocode(
df, "state_id", "state_code", from_col="geo_id", date_col="timestamp")
if weighted:
df["weight"] = df["population"]
df = gmpr.replace_geocode(
df, "state_code", geo, data_cols=["val"], date_col="timestamp").rename(
columns={geo: "geo_id"})
if weighted:
df["val"] = df["val"] / df["population"]

return df
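
The weighted path leans on a GeoMapper detail worth spelling out: when a "weight" column is present, `replace_geocode` multiplies the data columns by it before summing, so the state rows arrive at the coarser level as sum(val_i * pop_i); dividing by the aggregated population then gives a population-weighted mean. The arithmetic in isolation, with made-up numbers:

```python
import pandas as pd

# Two states in the same HHS region reporting percent_of_expected_deaths;
# the values and populations are invented for illustration.
states = pd.DataFrame({
    "geo_id": ["pa", "nj"],
    "val": [110.0, 90.0],               # percentages, not counts
    "population": [12_800_000, 8_900_000],
})

# What the weighted aggregation computes: sum(val * pop) / sum(pop).
weighted_mean = ((states["val"] * states["population"]).sum()
                 / states["population"].sum())
print(round(weighted_mean, 1))  # 101.8, between the two state values
```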
20 changes: 16 additions & 4 deletions nchs_mortality/tests/test_run.py
@@ -36,17 +36,29 @@ def test_output_files_exist(self, run_as_module, date):
'deaths_pneumonia_or_flu_or_covid_incidence']
sensors = ["num", "prop"]

expected_files = []
expected_files_nation = []
expected_files_state = []
expected_files_hhs = []
for d in dates:
for metric in metrics:
if metric == "deaths_percent_of_expected":
expected_files += ["weekly_" + d + "_state_" \
expected_files_nation += ["weekly_" + d + "_nation_" \
+ metric + ".csv"]
expected_files_state += ["weekly_" + d + "_state_" \
+ metric + ".csv"]
expected_files_hhs += ["weekly_" + d + "_hhs_" \
+ metric + ".csv"]
else:
for sensor in sensors:
expected_files += ["weekly_" + d + "_state_" \
expected_files_nation += ["weekly_" + d + "_nation_" \
+ metric + "_" + sensor + ".csv"]
expected_files_state += ["weekly_" + d + "_state_" \
+ metric + "_" + sensor + ".csv"]
expected_files_hhs += ["weekly_" + d + "_hhs_" \
+ metric + "_" + sensor + ".csv"]
assert set(expected_files).issubset(set(csv_files))
assert set(expected_files_nation).issubset(set(csv_files))
assert set(expected_files_state).issubset(set(csv_files))
assert set(expected_files_hhs).issubset(set(csv_files))
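
Since the three per-geo lists follow one pattern, the setup could be collapsed; a sketch assuming the same `dates`/`metrics`/`sensors` fixtures defined earlier in the test:

```python
from itertools import product

expected = {geo: [] for geo in ["nation", "state", "hhs"]}
for d, metric, geo in product(dates, metrics, expected):
    if metric == "deaths_percent_of_expected":
        expected[geo].append(f"weekly_{d}_{geo}_{metric}.csv")
    else:
        for sensor in sensors:
            expected[geo].append(f"weekly_{d}_{geo}_{metric}_{sensor}.csv")
```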

@pytest.mark.parametrize("date", ["2020-09-14", "2020-09-18"])
def test_output_file_format(self, run_as_module, date):