Backend: Fix VERA (#3864)
# Description and Motivation

The VERA DAG was having issues reading the CSV from the web.

- Checks the CSV file into the repo; not a bad idea in case VERA ever takes
down the GitHub file we were hotlinking
- Reads that file directly from `data/`, passing `usecols=` so each
demographic breakdown only parses the columns it needs (see the sketch after
this list)
- Updates test files
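
A minimal pandas sketch of the `usecols=` idea (the column names below are hypothetical placeholders, not necessarily the real VERA headers):

```python
import pandas as pd

# Hypothetical subset of columns needed for one demographic breakdown ("sex").
SEX_COLS = ["fips", "year", "female_prison_pop", "male_prison_pop"]

# Reading the checked-in file from data/ and parsing only the columns we need
# keeps memory use and parse time down on the very large source CSV.
df = pd.read_csv(
    "data/vera/incarceration_trends.csv",
    usecols=SEX_COLS,
    dtype={"fips": str},  # keep FIPS codes as strings to preserve leading zeros
)
```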

NOTE: this file is a good candidate for refactoring to polars, as the tests
now take over a minute due to the sheer size of the source CSV. This is the
tradeoff for testing the ACTUAL source data rather than a mocked version that
can drift out of sync with the source and requires upkeep.
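
If that refactor happens, a rough polars version might look something like this (an illustrative sketch only, not part of this PR):

```python
import polars as pl


def read_vera_subset(cols: list[str]) -> pl.DataFrame:
    # Lazily scan the checked-in CSV and select only the columns a given
    # demographic breakdown needs; polars can skip parsing everything else.
    return (
        pl.scan_csv("data/vera/incarceration_trends.csv")
        .select(cols)
        .collect()
    )
```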

## Has this been tested? How?

- tests passing
- runs on infra-test

## Screenshots (if appropriate)

## Types of changes

(leave all that apply)

- Refactor / chore

## New frontend preview link is below in the Netlify comment 😎
benhammondmusic authored Dec 10, 2024
1 parent 7bfbfdc commit 1318273
Showing 17 changed files with 1,879,946 additions and 303 deletions.
19 changes: 12 additions & 7 deletions airflow/dags/vera_incarceration_county.py
@@ -2,7 +2,6 @@
# pylint: disable=no-name-in-module
from airflow import DAG # type: ignore
from airflow.utils.dates import days_ago # type: ignore
from airflow.operators.dummy_operator import DummyOperator # type: ignore
import util

_VERA_WORKFLOW_ID = 'VERA_INCARCERATION_COUNTY'
@@ -36,7 +35,10 @@
)


vera_exporter_payload_race = {'dataset_name': _VERA_DATASET_NAME, 'demographic': "race_and_ethnicity"}
vera_exporter_payload_race = {
'dataset_name': _VERA_DATASET_NAME,
'demographic': "race_and_ethnicity",
}
vera_exporter_operator_race = util.create_exporter_operator(
'vera_incarceration_county_exporter_race', vera_exporter_payload_race, data_ingestion_dag
)
@@ -46,16 +48,19 @@
'vera_incarceration_county_exporter_age', vera_exporter_payload_age, data_ingestion_dag
)

vera_exporter_payload_sex = {'dataset_name': _VERA_DATASET_NAME, 'demographic': "sex"}
vera_exporter_payload_sex = {
'dataset_name': _VERA_DATASET_NAME,
'demographic': "sex",
'should_export_as_alls': True,
}
vera_exporter_operator_sex = util.create_exporter_operator(
'vera_incarceration_county_exporter_sex', vera_exporter_payload_sex, data_ingestion_dag
)

connector = DummyOperator(default_args=default_args, dag=data_ingestion_dag, task_id='connector')

# Ingestion DAG
(
[vera_bq_operator_race, vera_bq_operator_age, vera_bq_operator_sex]
>> connector
>> [vera_exporter_operator_race, vera_exporter_operator_age, vera_exporter_operator_sex]
>> vera_exporter_operator_sex
>> vera_exporter_operator_age
>> vera_exporter_operator_race
)
153,812 changes: 153,812 additions & 0 deletions data/vera/incarceration_trends.csv


80 changes: 52 additions & 28 deletions python/datasources/vera_incarceration_county.py
@@ -117,37 +117,57 @@
JUVENILE_COLS = ["female_juvenile_jail_pop", "male_juvenile_jail_pop"]
JUVENILE = "0-17"
ADULT = "18+"
GEO_COLS_TO_STANDARD = {VERA_FIPS: std_col.COUNTY_FIPS_COL, VERA_COUNTY: std_col.COUNTY_NAME_COL}

# NO AGE BREAKDOWN DATA

DATA_COLS = [
*RACE_PRISON_RAW_COLS_TO_STANDARD.keys(),
*RACE_PRISON_RATE_COLS_TO_STANDARD.keys(),
*SEX_PRISON_RAW_COLS_TO_STANDARD.keys(),
*SEX_PRISON_RATE_COLS_TO_STANDARD.keys(),
*RACE_JAIL_RAW_COLS_TO_STANDARD.keys(),
*RACE_JAIL_RATE_COLS_TO_STANDARD.keys(),
*SEX_JAIL_RAW_COLS_TO_STANDARD.keys(),
*SEX_JAIL_RATE_COLS_TO_STANDARD.keys(),
PRISON_RAW_ALL,
JAIL_RAW_ALL,
PRISON_RATE_ALL,
JAIL_RATE_ALL,
]

GEO_COLS_TO_STANDARD = {VERA_FIPS: std_col.COUNTY_FIPS_COL, VERA_COUNTY: std_col.COUNTY_NAME_COL}
def get_vera_col_types(demo_type: str):
"""
Returns a dictionary of column types for the given demo type.
The keys are also used to optimize the slow csv read by defining the usecols
"""

# NO AGE BREAKDOWN DATA
RACE_DATA_COLS = [
*RACE_PRISON_RAW_COLS_TO_STANDARD.keys(),
*RACE_PRISON_RATE_COLS_TO_STANDARD.keys(),
*RACE_JAIL_RAW_COLS_TO_STANDARD.keys(),
*RACE_JAIL_RATE_COLS_TO_STANDARD.keys(),
]

SEX_DATA_COLS = [
*SEX_PRISON_RAW_COLS_TO_STANDARD.keys(),
*SEX_PRISON_RATE_COLS_TO_STANDARD.keys(),
*SEX_JAIL_RAW_COLS_TO_STANDARD.keys(),
*SEX_JAIL_RATE_COLS_TO_STANDARD.keys(),
]

ALLS_DATA_COLS = [
PRISON_RAW_ALL,
JAIL_RAW_ALL,
PRISON_RATE_ALL,
JAIL_RATE_ALL,
*JUVENILE_COLS,
]

DATA_COLS = ALLS_DATA_COLS
if demo_type == std_col.RACE_OR_HISPANIC_COL:
DATA_COLS.extend(RACE_DATA_COLS)
if demo_type == std_col.SEX_COL:
DATA_COLS.extend(SEX_DATA_COLS)

POP_COLS = [POP_ALL, *RACE_POP_TO_STANDARD.keys(), *SEX_POP_TO_STANDARD.keys()]
POP_COLS = [POP_ALL, *RACE_POP_TO_STANDARD.keys(), *SEX_POP_TO_STANDARD.keys()]

location_col_types = {col: str for col in GEO_COLS_TO_STANDARD.keys()}
data_col_types = {col: float for col in DATA_COLS}
pop_col_types = {col: float for col in POP_COLS}
VERA_COL_TYPES = {
VERA_YEAR: str,
**location_col_types,
**data_col_types, # type: ignore
**pop_col_types, # type: ignore
}
location_col_types = {col: str for col in GEO_COLS_TO_STANDARD.keys()}
data_col_types = {col: float for col in DATA_COLS}
pop_col_types = {col: float for col in POP_COLS}
VERA_COL_TYPES = {
VERA_YEAR: str,
**location_col_types,
**data_col_types, # type: ignore
**pop_col_types, # type: ignore
}

return VERA_COL_TYPES


class VeraIncarcerationCounty(DataSource):
@@ -165,7 +185,11 @@ def upload_to_gcs(self, _, **attrs):
def write_to_bq(self, dataset, gcs_bucket, **attrs):
demo_type = self.get_attr(attrs, 'demographic')

df = gcs_to_bq_util.load_csv_as_df_from_web(BASE_VERA_URL, dtype=VERA_COL_TYPES)
vera_col_types = get_vera_col_types(demo_type)

df = gcs_to_bq_util.load_csv_as_df_from_data_dir(
"vera", 'incarceration_trends.csv', usecols=list(vera_col_types.keys()), dtype=vera_col_types
)
df = df.rename(columns={VERA_FIPS: std_col.COUNTY_FIPS_COL, VERA_YEAR: std_col.TIME_PERIOD_COL})
df = ensure_leading_zeros(df, std_col.COUNTY_FIPS_COL, 5)
df = merge_county_names(df)
