Skip to content

Commit

Permalink
Merge pull request #8 from Project-Resilience/testing
Browse files Browse the repository at this point in the history
Added some unittests for parent selection, population creation, and crossover/mutation
  • Loading branch information
danyoungday authored Jan 28, 2025
2 parents e669c67 + 04c5a73 commit e761964
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 8 deletions.
2 changes: 1 addition & 1 deletion presp/evolution.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ def selection(self, sorted_population: list[Prescriptor]) -> list[Prescriptor]:
"""
idx1 = min(random.choices(range(len(sorted_population)), k=2))
idx2 = min(random.choices(range(len(sorted_population)), k=2))
return sorted_population[idx1], sorted_population[idx2]
return [sorted_population[idx1], sorted_population[idx2]]

def create_pop(self, population: list[Prescriptor]) -> list[Prescriptor]:
"""
Expand Down
7 changes: 5 additions & 2 deletions presp/prescriptor/nn.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,10 @@ def random_init(self) -> NNPrescriptor:

return candidate

def crossover(self, parents: list[Prescriptor], mutation_rate: float, mutation_factor: float) -> list[Prescriptor]:
def crossover(self,
parents: list[NNPrescriptor],
mutation_rate: float,
mutation_factor: float) -> list[NNPrescriptor]:
"""
Crossover two parents to create a child.
Take a random 50/50 choice of either parent's weights.
Expand Down Expand Up @@ -84,7 +87,7 @@ def mutate_(self, candidate: NNPrescriptor, mutation_rate: float, mutation_facto
param[mutate_mask].shape,
device=param.device,
dtype=param.dtype)
param[mutate_mask] += noise * param[mutate_mask]
param[mutate_mask] *= (1 + noise)

def load(self, path: Path) -> Prescriptor:
"""
Expand Down
2 changes: 1 addition & 1 deletion tests/dummy_evaluator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,4 @@ def update_predictor(self, elites: list[DummyPrescriptor]):
pass

def evaluate_candidate(self, candidate: DummyPrescriptor) -> np.ndarray:
return candidate.forward(None) * 10
return np.array([candidate.forward(None) * 10])
16 changes: 12 additions & 4 deletions tests/dummy_prescriptor.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ def forward(self, context):
return self.number

def save(self, path):
pass
with open(path, "w", encoding="utf-8") as file:
file.write(str(self.number))


class DummyFactory(PrescriptorFactory):
Expand All @@ -35,13 +36,20 @@ def __init__(self, prescriptor_cls: Type[DummyPrescriptor]):
def random_init(self) -> DummyPrescriptor:
return DummyPrescriptor()

def crossover(self, parents: list[Prescriptor], mutation_rate: float, mutation_factor: float) -> list[Prescriptor]:
def crossover(self,
parents: list[DummyPrescriptor],
mutation_rate: float,
mutation_factor: float) -> list[DummyPrescriptor]:
numb = np.mean([parent.number for parent in parents], axis=0)
if np.random.rand(1) < mutation_rate:
numb += np.random.normal(0, mutation_factor)
child = DummyPrescriptor()
child.number = numb
return [child]

def load(self, path) -> Prescriptor:
pass
def load(self, path) -> DummyPrescriptor:
with open(path, "r", encoding="utf-8") as file:
number = float(file.read())
candidate = DummyPrescriptor()
candidate.number = number
return candidate
119 changes: 119 additions & 0 deletions tests/test_evolution.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
"""
Tests the Evolution class.
"""
from pathlib import Path
import random
import shutil
import unittest

import numpy as np

from presp.evolution import Evolution
from tests.dummy_evaluator import DummyEvaluator
from tests.dummy_prescriptor import DummyPrescriptor, DummyFactory


class TestInitialPopulation(unittest.TestCase):
"""
Tests the initial population creation of the Evolution class.
"""

def setUp(self):
if Path("tests/temp").exists():
shutil.rmtree(Path("tests/temp"))
if Path("tests/seeds").exists():
shutil.rmtree(Path("tests/seeds"))

def tearDown(self):
if Path("tests/temp").exists():
shutil.rmtree(Path("tests/temp"))
if Path("tests/seeds").exists():
shutil.rmtree(Path("tests/seeds"))

def test_create_initial_population_random(self):
"""
Tests the creation of a new initial population randomly. Makes sure all of the candidates are initialized.
Also makes sure the generation is set properly.
"""
evaluator = DummyEvaluator()
factory = DummyFactory(DummyPrescriptor)
evolution = Evolution(10, 10, 0.1, 2, 0.1, 0.1, "tests/temp", None, factory, evaluator)
evolution.create_initial_population()

self.assertEqual(len(evolution.population), 10)
for candidate in evolution.population:
self.assertTrue(candidate.metrics is not None)
self.assertTrue(candidate.cand_id is not None)
self.assertTrue(candidate.outcomes is not None)
self.assertTrue(candidate.distance is not None)
self.assertTrue(candidate.rank is not None)

self.assertEqual(evolution.generation, 2)

def test_create_initial_population_seeded(self):
"""
Adds some seeds to the initial population and checks they are loaded correctly.
Also checks that the rest of the population is initialized randomly.
"""
seed_dir = Path("tests/seeds")
seed_dir.mkdir()
for i in range(8):
candidate = DummyPrescriptor()
candidate.number = i
candidate.save(seed_dir / f"0_{i}.txt")

evaluator = DummyEvaluator()
factory = DummyFactory(DummyPrescriptor)
evolution = Evolution(10, 10, 0.1, 2, 0.1, 0.1, "tests/temp", "tests/seeds", factory, evaluator)
evolution.create_initial_population()

# Make sure population is the right size
self.assertEqual(len(evolution.population), 10)
cand_names = {f"0_{i}" for i in range(8)}
cand_names.update({f"1_{i}" for i in range(8, 10)})

# Check that every name is accounted for and that the number is set correctly
for candidate in evolution.population:
self.assertTrue(candidate.cand_id in cand_names)
cand_names.remove(candidate.cand_id)
if candidate.cand_id.startswith("0"):
self.assertEqual(candidate.number, int(candidate.cand_id.split("_")[1]))
else:
self.assertGreaterEqual(candidate.number, 0)
self.assertLessEqual(candidate.number, 1)

self.assertEqual(len(cand_names), 0)


class TestSelection(unittest.TestCase):
"""
Tests the default parent selection (tournament selection)
"""
def test_selection(self):
"""
Tests tournament selection. We choose the min of 2 candidates, therefore with 2 candidates the smaller one
should get picked 3/4 of the time: (0, 0), (0, 1), (1, 0) and the larger one 1/4 of the time: (1, 1).
We run this many times and see if we're within 1%.
"""
random.seed(42)
np.random.seed(42)

factory = DummyFactory(DummyPrescriptor)
evaluator = DummyEvaluator()
evolution = Evolution(10, 10, 0.1, 2, 0.1, 0.1, "tests/temp", None, factory, evaluator)

cand_0 = DummyPrescriptor()
cand_0.number = 0
cand_1 = DummyPrescriptor()
cand_1.number = 1

population = [cand_0, cand_1]
counts = {0: 0, 1: 0}
n = 100000
for _ in range(n):
parents = evolution.selection(population)
counts[parents[0].number] += 1
counts[parents[1].number] += 1

self.assertTrue(np.isclose(0.75, counts[0] / (n * 2), atol=0.01))
self.assertTrue(np.isclose(0.25, counts[1] / (n * 2), atol=0.01))
87 changes: 87 additions & 0 deletions tests/test_nn.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
"""
Tests the implementation of the neural network prescriptor.
"""
import random
import unittest

import numpy as np
import torch

from presp.prescriptor import NNPrescriptor, NNPrescriptorFactory


def init_uniform(model, c):
"""
Initializes a model uniformly by filling it with constant c.
"""
for parameter in model.parameters():
with torch.no_grad():
parameter.fill_(c)


class TestNN(unittest.TestCase):
"""
Tests the neural net prescriptor
"""

def setUp(self):
random.seed(42)
np.random.seed(42)
torch.manual_seed(42)

def test_crossover(self):
"""
Tests that we get 50/50 parameters from model 0 and model 1 after crossover.
"""
# Fill a candidate with all 0 and all 1
factory = NNPrescriptorFactory(NNPrescriptor, {"in_size": 12, "hidden_size": 64, "out_size": 1})
parent0 = factory.random_init()
parent1 = factory.random_init()
init_uniform(parent0.model, 0)
init_uniform(parent1.model, 1)

# After crossover we should have 50% 1's
count = 0
total_num = 0
n = 100
for _ in range(n):
child = factory.crossover([parent0, parent1], 0, 0)[0]
for parameter in child.model.parameters():
count += torch.sum(parameter.data).item()
total_num += parameter.numel()
# Also double check that each parameter is either a 0 or 1
self.assertTrue(torch.all(torch.logical_or(parameter.data == 0, parameter.data == 1)))

self.assertAlmostEqual(count / total_num, 0.5, places=2)

def test_mutate(self):
"""
Checks if the mean and standard deviation of the mutated parameters are correct.
We are using gaussian percent noise with std = f. If we initialize models to all be c then:
params ~ c * (1 + N(0, f)) = c * N(1, f) = N(c, f*c)
We try this with 3 sets of c and f, and test over 100,000 models to get an accurate distribution.
"""
factory = NNPrescriptorFactory(NNPrescriptor, {"in_size": 12, "hidden_size": 64, "out_size": 1})

for _ in range(3):
c = np.random.randint(0, 5)
f = np.random.randint(0, 5)

parent0 = factory.random_init()
parent1 = factory.random_init()
init_uniform(parent0.model, c)
init_uniform(parent1.model, c)

params = []
n = 100000
for _ in range(n):
child = factory.crossover([parent0, parent1], 1, f)[0]
for parameter in child.model.parameters():
params.append(parameter)

flattened = torch.cat([p.flatten() for p in params])
mean = torch.mean(flattened).item()
std = torch.std(flattened).item()

self.assertAlmostEqual(mean, c, places=2)
self.assertAlmostEqual(std, f * c, places=2)

0 comments on commit e761964

Please sign in to comment.