Skip to content

Commit

Permalink
Merge pull request #194 from Urban-Analytics/better-opencl-seeding
Browse files Browse the repository at this point in the history
OpenCL has same initial case seeding logic
  • Loading branch information
nickmalleson authored Nov 2, 2020
2 parents c3101a3 + 3a23688 commit 0812347
Show file tree
Hide file tree
Showing 9 changed files with 369 additions and 99 deletions.
204 changes: 168 additions & 36 deletions experiments/analyse_opencl_results.ipynb

Large diffs are not rendered by default.

44 changes: 44 additions & 0 deletions experiments/functions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# This is a workaround to allow multiprocessing.Pool to work in the pf_experiments_plots notebook.
# The function called by pool.map ('count_wiggles') needs to be defined in this separate file and imported.
# https://stackoverflow.com/questions/41385708/multiprocessing-example-giving-attributeerror/42383397
import os
import multiprocessing
import itertools # TEMP

from microsim.opencl.ramp.snapshot import Snapshot
from microsim.opencl.ramp.simulator import Simulator
from microsim.opencl.ramp.run import run_headless


def run_opencl_model_multiprocess(*args):
#*al_i, l_snapshot_filepath, l_params, l_opencl_dir, l_num_seed_days, l_use_gpu):
try:
with multiprocessing.Pool(processes=int(os.cpu_count())) as pool:
results = pool.starmap(_run_opencl_model, zip(*args))
#results = itertools.starmap(_run_opencl_model, zip(*args))
return results

finally: # Make sure they get closed (shouldn't be necessary)
pool.close()


def _run_opencl_model(i, iterations, snapshot_filepath, params, opencl_dir, num_seed_days, use_gpu,
store_detailed_counts=True):

# load snapshot
snapshot = Snapshot.load_full_snapshot(path=snapshot_filepath)

# set params
snapshot.update_params(params)

# set the random seed of the model for each repetition, otherwise it is completely deterministic
snapshot.seed_prngs(i)

# Create a simulator and upload the snapshot data to the OpenCL device
simulator = Simulator(snapshot, opencl_dir=opencl_dir, gpu=use_gpu, num_seed_days=num_seed_days)
simulator.upload_all(snapshot.buffers)

print(f"Running simulation {i+1}.")
summary, final_state = run_headless(simulator, snapshot, iterations, quiet=True,
store_detailed_counts=store_detailed_counts)
return summary, final_state
6 changes: 1 addition & 5 deletions microsim/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,13 @@ def run_opencl_model(individuals_df, activity_locations, time_activity_multiplie
if disease_params["improve_health"]:
print("Switching to healthier population")
snapshot.switch_to_healthier_population()

# seed initial infections using GAM initial cases
snapshot.seed_initial_infections(num_seed_days=disease_params["seed_days"])

if initialise:
print("Have finished initialising model. -init flag is set so not running it. Exitting")
return

run_mode = "GUI" if use_gui else "headless"
print(f"\nRunning OpenCL model in {run_mode} mode")
run_opencl(snapshot, iterations, data_dir, use_gui, use_gpu, quiet=False)
run_opencl(snapshot, iterations, data_dir, use_gui, use_gpu, num_seed_days=disease_params["seed_days"], quiet=False)


def run_python_model(individuals_df, activity_locations_df, time_activity_multiplier, msim_args, iterations,
Expand Down
40 changes: 40 additions & 0 deletions microsim/opencl/ramp/initial_cases.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
import pandas as pd
import numpy as np
import os


class InitialCases:
def __init__(self, area_codes, not_home_probs, data_dir="microsim/opencl/data/"):
"""
This class loads the initial cases data for seeding infections in the model.
Once the data is loaded it selects the people from higher risk area codes who
spend more time outside of their home.
"""

# load initial case data
self.initial_cases = pd.read_csv(os.path.join(data_dir, "devon_initial_cases.csv"))

msoa_risks_df = pd.read_csv(os.path.join(data_dir, "msoas.csv"), usecols=[1, 2])

# combine into a single dataframe to allow easy filtering based on high risk area codes and
# not home probabilities
people_df = pd.DataFrame({"area_code": area_codes,
"not_home_prob": not_home_probs})
people_df = people_df.merge(msoa_risks_df, on="area_code")

# get people_ids for people in high risk MSOAs and high not home probability
self.high_risk_ids = np.where((people_df["risk"] == "High") & (people_df["not_home_prob"] > 0.3))[0]

def get_seed_people_ids_for_day(self, day):
"""Randomly choose a given number of people ids from the high risk people"""

num_cases = self.initial_cases.loc[day, "num_cases"]
if num_cases > self.high_risk_ids.shape[0]: # if there aren't enough high risk individuals then return all of them
return self.high_risk_ids

selected_ids = np.random.choice(self.high_risk_ids, num_cases, replace=False)

# remove people from high_risk_ids so they are not chosen again
self.high_risk_ids = np.setdiff1d(self.high_risk_ids, selected_ids)

return selected_ids
4 changes: 2 additions & 2 deletions microsim/opencl/ramp/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from microsim.opencl.ramp.disease_statuses import DiseaseStatus


def run_opencl(snapshot, iterations=100, data_dir="./data", use_gui=True, use_gpu=False, quiet=False):
def run_opencl(snapshot, iterations=100, data_dir="./data", use_gui=True, use_gpu=False, num_seed_days=5, quiet=False):
"""
Entry point for running the OpenCL simulation either with the UI or in headless mode.
NB: in order to write output data for the OpenCL dashboard you must run in headless mode.
Expand All @@ -20,7 +20,7 @@ def run_opencl(snapshot, iterations=100, data_dir="./data", use_gui=True, use_gp
print(f"\nSnapshot Size:\t{int(snapshot.num_bytes() / 1000000)} MB\n")

# Create a simulator and upload the snapshot data to the OpenCL device
simulator = Simulator(snapshot, use_gpu)
simulator = Simulator(snapshot, use_gpu, num_seed_days=num_seed_days)
simulator.upload_all(snapshot.buffers)
if not quiet:
print(f"Platform:\t{simulator.platform_name()}\nDevice:\t\t{simulator.device_name()}\n")
Expand Down
40 changes: 38 additions & 2 deletions microsim/opencl/ramp/simulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from microsim.opencl.ramp.kernels import Kernels
from microsim.opencl.ramp.params import Params
from microsim.opencl.ramp.snapshot import Snapshot
from microsim.opencl.ramp.initial_cases import InitialCases


class Simulator:
Expand All @@ -14,7 +15,7 @@ class Simulator:
and a step() method to execute the kernels to calculate one timestep of the model.
"""

def __init__(self, snapshot, gpu=True, kernel_dir="microsim/opencl/ramp/kernels/"):
def __init__(self, snapshot, gpu=True, opencl_dir="microsim/opencl/", num_seed_days=5):
"""Initialise OpenCL context, kernels, and buffers for the simulator.
Args:
Expand Down Expand Up @@ -63,6 +64,8 @@ def __init__(self, snapshot, gpu=True, kernel_dir="microsim/opencl/ramp/kernels/
params=cl.Buffer(ctx, cl.mem_flags.READ_WRITE, Params().num_bytes()),
)

kernel_dir = os.path.join(opencl_dir, "ramp/kernels/")

# Load the OpenCL kernel programs
with open(os.path.join(kernel_dir, "ramp_ua.cl")) as f:
program = cl.Program(ctx, f.read())
Expand Down Expand Up @@ -106,10 +109,16 @@ def __init__(self, snapshot, gpu=True, kernel_dir="microsim/opencl/ramp/kernels/
self.platform = platform
self.ctx = ctx
self.queue = queue


self.start_snapshot = snapshot
self.buffers = buffers
self.kernels = kernels

data_dir = os.path.join(opencl_dir, "data/")
self.initial_cases = InitialCases(snapshot.area_codes, snapshot.not_home_probs, data_dir)

self.num_seed_days = num_seed_days

def platform_name(self):
"""The name of the OpenCL platform being used for simulation."""
return self.platform.get_info(cl.platform_info.NAME)
Expand Down Expand Up @@ -152,6 +161,13 @@ def download_all(self, host_buffers):
self.download(name, getattr(host_buffers, name))

def step(self):
"""Choose whether to run the normal step function or the one for initial case seeding"""
if self.time < self.num_seed_days:
self.step_with_seeding()
else:
self.step_all_kernels()

def step_all_kernels(self):
"""Runs each kernel in order and updates the time. Blocks until complete."""
reset_event = cl.enqueue_nd_range_kernel(
self.queue, self.kernels.places_reset, (self.nplaces,), None)
Expand All @@ -175,3 +191,23 @@ def step_kernel(self, name):
event.wait()
else:
raise ValueError("No kernel with name {}".format(name))

def step_with_seeding(self):
"""For initial case seeding: sets a number of people infected based on the initial cases data, then runs only
the kernel which updates people statuses."""
max_hazard_val = np.finfo(np.float32).max

people_hazards = np.zeros(self.npeople, dtype=np.float32)

initial_case_ids = self.initial_cases.get_seed_people_ids_for_day(self.time)

# set hazard to maximum float val, so these people will have infection_prob=1
# and will transition to exposed state
people_hazards[initial_case_ids] = max_hazard_val

self.upload("people_hazards", people_hazards)

# run only the update statuses kernel so that people transition through disease states
self.step_kernel("people_update_statuses")
self.time += np.uint32(1)

40 changes: 2 additions & 38 deletions microsim/opencl/ramp/snapshot.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
import numpy as np
import pandas as pd
import os

from microsim.opencl.ramp.buffers import Buffers
from microsim.opencl.ramp.params import Params
from microsim.opencl.ramp.disease_statuses import DiseaseStatus


class Snapshot:
Expand Down Expand Up @@ -35,7 +32,7 @@ def zeros(cls, nplaces, npeople, nslots):
npeople = np.uint32(npeople)
nslots = np.uint32(nslots)
time = np.uint32(0)
area_codes = np.full("", npeople)
area_codes = np.full(npeople, "E02004129")
not_home_probs = np.zeros(npeople).astype(np.float32)

lockdown_multipliers = np.ones(100)
Expand Down Expand Up @@ -71,7 +68,7 @@ def random(cls, nplaces, npeople, nslots, lat=50.7, lon=-3.5):
npeople = np.uint32(npeople)
nslots = np.uint32(nslots)
time = np.uint32(0)
area_codes = np.full("", npeople)
area_codes = np.full(npeople, "E02004129")
not_home_probs = np.random.rand(npeople).astype(np.float32)

lockdown_multipliers = np.ones(100)
Expand Down Expand Up @@ -152,39 +149,6 @@ def from_arrays(cls, people_ages, people_obesity, people_cvd, people_diabetes, p

return cls(nplaces, npeople, nslots, time, area_codes, not_home_probs, lockdown_multipliers, buffers)

def seed_initial_infections(self, num_seed_days=5, data_dir="microsim/opencl/data/"):
"""
Seeds initial infections by assigning initial cases based on the GAM assigned cases data.
The cases for the first num_seed_days days are all seeded at once, eg. they are in the snapshot before the
simulation is run.
Initial cases are assigned to people from higher risk area codes who spend more time outside of their home.
"""

# load initial case data
initial_cases = pd.read_csv(os.path.join(data_dir, "devon_initial_cases.csv"))
num_cases = initial_cases.loc[:num_seed_days - 1, "num_cases"].sum()

msoa_risks_df = pd.read_csv(os.path.join(data_dir, "msoas.csv"), usecols=[1, 2])

# combine into a single dataframe to allow easy filtering based on high risk area codes and
# not home probabilities
people_df = pd.DataFrame({"area_code": self.area_codes, "not_home_prob": self.not_home_probs})
people_df = people_df.merge(msoa_risks_df, on="area_code")

# get people_ids for people in high risk MSOAs and high not home probability
high_risk_ids = np.where((people_df["risk"] == "High") & (people_df["not_home_prob"] > 0.3))[0]

# randomly choose a given number of cases from the high risk people ids.
initial_case_ids = np.random.choice(high_risk_ids, num_cases)

# Assign initial cases by updating people_statuses buffer
self.buffers.people_statuses[initial_case_ids] = DiseaseStatus.Exposed.value

# Set initial transition times to 1
self.buffers.people_transition_times[initial_case_ids] = 1

self.time = num_seed_days

def update_params(self, new_params):
try:
self.buffers.params[:] = new_params.asarray()
Expand Down
16 changes: 0 additions & 16 deletions tests/opencl/test_snapshots.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,22 +74,6 @@ def test_load_existing_snapshot():
assert np.all(np.isclose(expected_people_flows, loaded_snapshot.buffers.people_baseline_flows))


def test_seed_initial_infections():
# Load initial snapshot generated from the SnapshotConverter test
snapshot = Snapshot.load_full_snapshot("tests/opencl/test_snapshot.npz")

# assert that no people are infected before seeding
assert not snapshot.buffers.people_statuses.any()

snapshot.seed_initial_infections(num_seed_days=1)

expected_num_infections = 1 # since there is 1 person in a high risk area with not_home_prob > 0.3

num_people_infected = np.count_nonzero(snapshot.buffers.people_statuses)

assert num_people_infected == expected_num_infections


def test_seed_prngs():
snapshot = Snapshot.random(nplaces=50, npeople=100, nslots=5)
prngs_before = np.zeros(400, dtype=np.uint32)
Expand Down
74 changes: 74 additions & 0 deletions tests/opencl/test_update_statuses_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -566,3 +566,77 @@ def test_infection_transition_times_distribution(visualize=False):
ax.hist(expected_samples, bins=50, range=[0, 60])
plt.title("Expected Samples")
plt.show()


def test_seed_initial_infections_all_high_risk():
npeople = 200
snapshot = Snapshot.random(nplaces, npeople, nslots)

# set all people as high risk
snapshot.area_codes = np.full(npeople, "E02004143") # high risk area code
snapshot.not_home_probs = np.full(npeople, 0.8)

num_seed_days = 5
simulator = Simulator(snapshot, gpu=False, num_seed_days=num_seed_days)
simulator.upload_all(snapshot.buffers)

people_statuses_before = np.zeros(npeople, dtype=np.uint32)
simulator.download("people_statuses", people_statuses_before)
# assert that no people are infected before seeding
assert not people_statuses_before.any()

# run one step with seeding and check number of infections
simulator.step_with_seeding()

people_statuses_after = np.zeros(snapshot.npeople, dtype=np.uint32)
simulator.download("people_statuses", people_statuses_after)

expected_num_infections = 37 # taken from devon_initial_cases.csv file

num_people_infected = np.count_nonzero(people_statuses_after)

assert num_people_infected == expected_num_infections

# run another step with seeding and check number of infections
simulator.step_with_seeding()

people_statuses_after = np.zeros(snapshot.npeople, dtype=np.uint32)
simulator.download("people_statuses", people_statuses_after)

expected_num_infections += 38 # taken from devon_initial_cases.csv file

num_people_infected = np.count_nonzero(people_statuses_after)

assert num_people_infected == expected_num_infections


def test_seed_initial_infections_some_low_risk():
npeople = 200
snapshot = Snapshot.random(nplaces, npeople, nslots)

# set all people as high risk
snapshot.area_codes = np.full(npeople, "E02004187") # low risk area code
snapshot.area_codes[1:4] = "E02004143" # high risk area code
snapshot.not_home_probs = np.full(npeople, 0.0)
snapshot.not_home_probs[1:4] = 0.8

num_seed_days = 5
simulator = Simulator(snapshot, gpu=False, num_seed_days=num_seed_days)
simulator.upload_all(snapshot.buffers)

people_statuses_before = np.zeros(npeople, dtype=np.uint32)
simulator.download("people_statuses", people_statuses_before)
# assert that no people are infected before seeding
assert not people_statuses_before.any()

# run one step with seeding and check number of infections
simulator.step_with_seeding()

people_statuses_after = np.zeros(snapshot.npeople, dtype=np.uint32)
simulator.download("people_statuses", people_statuses_after)

expected_num_infections = 3 # taken from devon_initial_cases.csv file

num_people_infected = np.count_nonzero(people_statuses_after)

assert num_people_infected == expected_num_infections

0 comments on commit 0812347

Please sign in to comment.