Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test switching functionality #16

Merged
merged 10 commits into from
Jun 28, 2024
41 changes: 41 additions & 0 deletions crcsim/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,23 @@ def choose_tests(self):
self.diagnostic_test = self.params["diagnostic_test"]
self.surveillance_test = self.params["surveillance_test"]

if self.params["use_variable_routine_test"]:
# If the simulation is using variable routine tests, then we do not pick
# a single routine test for each person. Instead, we will refer to the
# routine_testing_year and routine_test_by_year parameters to determine
# which test to use each year. This allows a person to switch tests during
# their lifetime. We still assign the routine_test attribute here to
# avoid errors from yearly actions that expect a person to have a routine
# test attribute.
starting_test = self.params["routine_test_by_year"][0]
self.routine_test = starting_test
self.out.add_routine_test_chosen(
person_id=self.id,
test_name=starting_test,
time=self.scheduler.time,
)
return

# Choose the routine test based on proportions specified in
# parameters file.
#
Expand Down Expand Up @@ -903,6 +920,7 @@ def choose_tests(self):
self.out.add_routine_test_chosen(
person_id=self.id,
test_name=test,
time=self.scheduler.time,
)
break

Expand Down Expand Up @@ -1576,6 +1594,29 @@ def handle_ongoing_treatment(self, message="Ongoing treatment"):
)

def handle_yearly_actions(self, message="Conduct yearly actions"):
if self.params["use_variable_routine_test"]:
# If the simulation is using variable routine tests, then the parameters
# specify a single routine test that every person in the simulation will
# use for each testing year. This allows a person to switch tests during
# their lifetime. In this case, we assign the routine test for each year
# rather than choosing a single routine test at initiatilization.
#
# Indices 0 and -1 of self.params["routine_testing_year"] safely return
# the min and max testing years, because crcsim.parameters raises an
# error if this parameter is not sorted in increasing order.
rjmorris marked this conversation as resolved.
Show resolved Hide resolved
if (
self.scheduler.time >= self.params["routine_testing_year"][0]
and self.scheduler.time <= self.params["routine_testing_year"][-1]
):
self.routine_test = self.params["variable_routine_test"](
self.scheduler.time
)
self.out.add_routine_test_chosen(
person_id=self.id,
test_name=self.routine_test,
time=self.scheduler.time,
)

self.do_tests()

self.scheduler.add_event(
Expand Down
19 changes: 16 additions & 3 deletions crcsim/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -423,12 +423,25 @@ def summarize(self):

# Number of times each test was adopted for routine screening
routine_tests_chosen = self.raw_output[
self.raw_output.record_type.eq("test_chosen")
self.raw_output.record_type.eq("test_chosen") & self.raw_output.time.eq(0)
]
for rt in self.params["routine_tests"]:
rt_chosen = routine_tests_chosen[routine_tests_chosen.test_name.eq(rt)]
replication_output_row[f"{rt}_adopted"] = len(rt_chosen.index)

# Number of years each routine test was used
# (if test variable routine test was enabled in the simulation)
if self.params["use_variable_routine_test"]:
rt_years = self.raw_output[
self.raw_output.record_type.eq("test_chosen")
& self.raw_output.time.gt(0)
]
rt_years_grouped = rt_years.groupby(["test_name"]).agg(
count=("time", "count")
)
for ix, row in rt_years_grouped.iterrows():
replication_output_row[f"{ix}_available_as_routine"] = row["count"]

# Number of times each test was performed for routine screening
# and number of times per thousand unscreened and undiagnosed 40-year-olds
routine_tests = tests[tests.role.eq(str(TestingRole.ROUTINE))]
Expand Down Expand Up @@ -970,11 +983,11 @@ def compute_pop_rates(self, status_arrays: list):
"""
# Sum all of the person status arrays to get an array of counts of the number of
# people in each status for each year.
statuses: np.ndarray = sum(status_arrays)
status_array: np.ndarray = sum(status_arrays)

# Convert to DataFrame so we can index by column name
statuses = pd.DataFrame(
statuses,
status_array,
rjmorris marked this conversation as resolved.
Show resolved Hide resolved
columns=[
"alive",
"crc_death",
Expand Down
3 changes: 2 additions & 1 deletion crcsim/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -137,13 +137,14 @@ def add_expected_lifespan(self, person_id: Any, time: float):
{"record_type": "lifespan", "person_id": person_id, "time": time}
)

def add_routine_test_chosen(self, person_id: Any, test_name: str):
def add_routine_test_chosen(self, person_id: Any, test_name: str, time: float):
self.rows.append(
{
"record_type": "test_chosen",
"person_id": person_id,
"test_name": test_name,
"role": TestingRole.ROUTINE,
"time": time,
}
)

Expand Down
19 changes: 19 additions & 0 deletions crcsim/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,23 @@ def load_params(file):
y=params[f"death_rate_{race}_{sex}_rates"],
)

if params["use_variable_routine_test"]:
params["variable_routine_test"] = StepFunction(
x=params["routine_testing_year"], y=params["routine_test_by_year"]
)
for test_name, test_params in params["tests"].items():
# Indexing 0 and -1 here safely returns the min and max testing years,
# because initializing the StepFunction raises an error if the testing
# years are not sorted in increasing order.
if test_params["routine_start"] != params["routine_testing_year"][0]:
raise ValueError(
f"routine_start for {test_name} does not equal the first year"
" of routine testing specified in routine_testing_year."
)
if test_params["routine_end"] != params["routine_testing_year"][-1]:
raise ValueError(
f"routine_start for {test_name} does not equal the last year"
" of routine testing specified in routine_testing_year."
)

return params
3 changes: 3 additions & 0 deletions parameters.json
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,9 @@
"diagnostic_test": "Colonoscopy",
"surveillance_test": "Colonoscopy",
"polypectomy_proportion_lethal": 0.00002,
"use_variable_routine_test": true,
"routine_testing_year": [ 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75 ],
"routine_test_by_year": ["Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "Colonoscopy", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT", "FIT" ],

"cost_discount_age": 50,
"cost_discount_rate": 0.03,
Expand Down
218 changes: 218 additions & 0 deletions tests/test_variable_routine_testing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import json
import logging
import random
from copy import deepcopy
from pathlib import Path
from tempfile import TemporaryDirectory

import pytest

from crcsim.agent import (
Person,
PersonDiseaseMessage,
PersonTestingMessage,
PersonTreatmentMessage,
)
from crcsim.output import Output
from crcsim.parameters import StepFunction, load_params
from crcsim.scheduler import Scheduler


def test_testing_year_misalignment():
"""
Asserts that misaligned testing years in routine_testing_year and a single test's
routine start or end raises an error.
"""
# We load parameters.json here instead of using the params fixture because
# load_params has already been run in the fixture, which creates additional
# parameters of StepFunction type, and those are not JSON serializable.
with open("parameters.json", "r") as f:
params = json.load(f)
params["tests"]["Colonoscopy"]["routine_end"] = 85
with TemporaryDirectory() as tmp_dir:
tmp_path = Path(tmp_dir) / "parameters.json"
with open(tmp_path, "w") as f:
json.dump(params, f)
with pytest.raises(ValueError):
load_params(tmp_path)


@pytest.fixture(scope="module")
def params():
"""
Default parameters. Some tests use these as-is; others override some values
to test specific scenarios. These are also the current values in parameters.json,
but we specify them here so that any future changes to parameters.json don't
affect these tests.
"""
p = load_params("parameters.json")

# All test scenarios use FIT and Colonoscopy with testing from age 50 to 75.
p["routine_testing_year"] = list(range(50, 76))
p["tests"]["FIT"]["routine_start"] = 50
p["tests"]["FIT"]["routine_end"] = 75
p["tests"]["Colonoscopy"]["routine_start"] = 50
p["tests"]["Colonoscopy"]["routine_end"] = 75

# All test scenarios use 100% compliance.
p["initial_compliance_rate"] = 1.0
p["tests"]["FIT"]["compliance_rate_given_prev_compliant"] = [1.0] * 26
p["tests"]["Colonoscopy"]["compliance_rate_given_prev_compliant"] = [1.0] * 26

# We don't want any false positives for these tests, because we rely on each
# person completing a normal course of routine testing without any diagnostic
# or surveillance testing. In the PersonForTests class, we ensure that the person
# never has CRC; here we ensure that no routine test returns a false positive
# and causes a diagnostic test.
p["tests"]["FIT"]["specificity"] = 1
p["tests"]["Colonoscopy"]["specificity"] = 1

# Default test switching scenario. We directly specify variable_routine_test,
# which is normally computed from the parameters routine_testing_year and
# routine_test_by_year in load_params. We have to specify if directly here
# because we've already called load_params, so if we just change
# routine_testing_year and routine_test_by_year, variable_routine_test won't
# be recomputed, and it is the parameter which ultimately determines test
# switching behavior.
p["variable_routine_test"] = StepFunction(
p["routine_testing_year"], ["Colonoscopy"] * 11 + ["FIT"] * 15
)

return p


class PersonForTests(Person):
"""
Overrides or adds to the Person class in two ways that are crucial to these tests:

1. Overrides the start method to ensure that the person never has CRC and lives to
100, so they always complete the full course of routine testing.
2. Adds a simulate method to simulate one PersonForTests at a time without running
the main simulation on a cohort of people.

Also, for convenience, assigns attributes directly in __init__ so we don't have
to pass them at instantiation.
"""

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.scheduler = Scheduler()
self.rng = random.Random(1)
# Output class requires a file name, but we don't write to disk in these tests,
# so we give it a dummy file name.
self.out = Output(file_name="unused")
# Sex and race_ethnicity are irrelevant to this test but we need to choose an
# arbitrary value for the simulation to run.
self.sex = "female"
self.race_ethnicity = "black_non_hispanic"

def start(self):
self.choose_tests()

self.handle_disease_message(PersonDiseaseMessage.INIT)
self.handle_testing_message(PersonTestingMessage.INIT)
self.handle_treatment_message(PersonTreatmentMessage.INIT)

self.scheduler.add_event(
message="Conduct yearly actions",
delay=1,
handler=self.handle_yearly_actions,
)

# Fix lifespan at 100 for testing instead of calling self.start_life_timer()
self.expected_lifespan = 100
self.scheduler.add_event(
message=PersonDiseaseMessage.OTHER_DEATH,
handler=self.handle_disease_message,
delay=self.expected_lifespan,
)
self.out.add_expected_lifespan(
person_id=self.id,
time=self.expected_lifespan,
)

# Person.start has lesion delay functions here to add an event to the
# scheduler for the person's first lesion. We don't want any lesions for the
# test person, so that chunk is omitted here. Because the next lesion delay
# is computed when a lesion onset is handled, this results in the person
# never having a lesion.

def simulate(self):
"""
Simplified version of the simulation loop used in crcsim.__main__.
Enables us to simulate one PersonForTests at a time without running the
main simulation on a cohort of people.
"""
while not self.scheduler.is_empty():
event = self.scheduler.consume_next_event()
if not event.enabled:
continue
if event.message == "end_simulation":
logging.debug("[scheduler] ending simulation \n")
break
handler = event.handler
handler(event.message)


@pytest.mark.parametrize(
"case",
[
# Switches to FIT at age 51, but they shouldn't get a FIT test until age 60
# because they had a colonoscopy at age 50.
{
"routine_test_by_year": ["Colonoscopy"] + ["FIT"] * 25,
"expected_colonoscopies": 1,
"expected_fits": 16,
},
# Switches to FIT at age 60, and they should get a FIT test that year,
# because the last colonoscopy was at age 50.
{
"routine_test_by_year": ["Colonoscopy"] * 10 + ["FIT"] * 16,
"expected_colonoscopies": 1,
"expected_fits": 16,
},
# Gets a FIT test every year from age 50 to 59, then a colonoscopy at age 60
# and 70. They will be due for a third colonoscopy at age 80, but routine
# testing ends at age 75.
{
"routine_test_by_year": ["FIT"] * 10 + ["Colonoscopy"] * 16,
"expected_colonoscopies": 2,
"expected_fits": 10,
},
# Gets a FIT test every year from age 50 to 54, then a colonoscopy at age 55,
# then a FIT test every year from age 65 to 75 (in total, one colonoscopy and
# 16 FIT tests)
{
"routine_test_by_year": ["FIT"] * 5 + ["Colonoscopy"] * 1 + ["FIT"] * 20,
"expected_colonoscopies": 1,
"expected_fits": 16,
},
],
)
def test_switching_scenario(params, case):
"""
Asserts that the routine_test_by_year sequences parametrized in the test cases
result in the expected number of colonoscopies and FIT tests.
"""
params_ = deepcopy(params)
params_["variable_routine_test"] = StepFunction(
params["routine_testing_year"], case["routine_test_by_year"]
)

p = PersonForTests(
id=None,
sex=None,
race_ethnicity=None,
params=params_,
scheduler=None,
rng=None,
out=None,
)
p.start()
p.simulate()

tests = [row for row in p.out.rows if row["record_type"] == "test_performed"]
colonoscopies = [test for test in tests if test["test_name"] == "Colonoscopy"]
fits = [test for test in tests if test["test_name"] == "FIT"]
assert len(colonoscopies) == case["expected_colonoscopies"]
assert len(fits) == case["expected_fits"]
Loading