diff --git a/presp/evolution.py b/presp/evolution.py index 3f0d369..7017a06 100644 --- a/presp/evolution.py +++ b/presp/evolution.py @@ -85,7 +85,7 @@ def selection(self, sorted_population: list[Prescriptor]) -> list[Prescriptor]: """ idx1 = min(random.choices(range(len(sorted_population)), k=2)) idx2 = min(random.choices(range(len(sorted_population)), k=2)) - return sorted_population[idx1], sorted_population[idx2] + return [sorted_population[idx1], sorted_population[idx2]] def create_pop(self, population: list[Prescriptor]) -> list[Prescriptor]: """ diff --git a/presp/prescriptor/nn.py b/presp/prescriptor/nn.py index c6fcdff..9de054b 100644 --- a/presp/prescriptor/nn.py +++ b/presp/prescriptor/nn.py @@ -56,7 +56,10 @@ def random_init(self) -> NNPrescriptor: return candidate - def crossover(self, parents: list[Prescriptor], mutation_rate: float, mutation_factor: float) -> list[Prescriptor]: + def crossover(self, + parents: list[NNPrescriptor], + mutation_rate: float, + mutation_factor: float) -> list[NNPrescriptor]: """ Crossover two parents to create a child. Take a random 50/50 choice of either parent's weights. @@ -84,7 +87,7 @@ def mutate_(self, candidate: NNPrescriptor, mutation_rate: float, mutation_facto param[mutate_mask].shape, device=param.device, dtype=param.dtype) - param[mutate_mask] += noise * param[mutate_mask] + param[mutate_mask] *= (1 + noise) def load(self, path: Path) -> Prescriptor: """ diff --git a/tests/dummy_evaluator.py b/tests/dummy_evaluator.py index 75e9f04..9c6d166 100644 --- a/tests/dummy_evaluator.py +++ b/tests/dummy_evaluator.py @@ -18,4 +18,4 @@ def update_predictor(self, elites: list[DummyPrescriptor]): pass def evaluate_candidate(self, candidate: DummyPrescriptor) -> np.ndarray: - return candidate.forward(None) * 10 + return np.array([candidate.forward(None) * 10]) diff --git a/tests/dummy_prescriptor.py b/tests/dummy_prescriptor.py index fd617a8..df14fd1 100644 --- a/tests/dummy_prescriptor.py +++ b/tests/dummy_prescriptor.py @@ -21,7 +21,8 @@ def forward(self, context): return self.number def save(self, path): - pass + with open(path, "w", encoding="utf-8") as file: + file.write(str(self.number)) class DummyFactory(PrescriptorFactory): @@ -35,7 +36,10 @@ def __init__(self, prescriptor_cls: Type[DummyPrescriptor]): def random_init(self) -> DummyPrescriptor: return DummyPrescriptor() - def crossover(self, parents: list[Prescriptor], mutation_rate: float, mutation_factor: float) -> list[Prescriptor]: + def crossover(self, + parents: list[DummyPrescriptor], + mutation_rate: float, + mutation_factor: float) -> list[DummyPrescriptor]: numb = np.mean([parent.number for parent in parents], axis=0) if np.random.rand(1) < mutation_rate: numb += np.random.normal(0, mutation_factor) @@ -43,5 +47,9 @@ def crossover(self, parents: list[Prescriptor], mutation_rate: float, mutation_f child.number = numb return [child] - def load(self, path) -> Prescriptor: - pass + def load(self, path) -> DummyPrescriptor: + with open(path, "r", encoding="utf-8") as file: + number = float(file.read()) + candidate = DummyPrescriptor() + candidate.number = number + return candidate diff --git a/tests/test_evolution.py b/tests/test_evolution.py new file mode 100644 index 0000000..b2244e9 --- /dev/null +++ b/tests/test_evolution.py @@ -0,0 +1,119 @@ +""" +Tests the Evolution class. +""" +from pathlib import Path +import random +import shutil +import unittest + +import numpy as np + +from presp.evolution import Evolution +from tests.dummy_evaluator import DummyEvaluator +from tests.dummy_prescriptor import DummyPrescriptor, DummyFactory + + +class TestInitialPopulation(unittest.TestCase): + """ + Tests the initial population creation of the Evolution class. + """ + + def setUp(self): + if Path("tests/temp").exists(): + shutil.rmtree(Path("tests/temp")) + if Path("tests/seeds").exists(): + shutil.rmtree(Path("tests/seeds")) + + def tearDown(self): + if Path("tests/temp").exists(): + shutil.rmtree(Path("tests/temp")) + if Path("tests/seeds").exists(): + shutil.rmtree(Path("tests/seeds")) + + def test_create_initial_population_random(self): + """ + Tests the creation of a new initial population randomly. Makes sure all of the candidates are initialized. + Also makes sure the generation is set properly. + """ + evaluator = DummyEvaluator() + factory = DummyFactory(DummyPrescriptor) + evolution = Evolution(10, 10, 0.1, 2, 0.1, 0.1, "tests/temp", None, factory, evaluator) + evolution.create_initial_population() + + self.assertEqual(len(evolution.population), 10) + for candidate in evolution.population: + self.assertTrue(candidate.metrics is not None) + self.assertTrue(candidate.cand_id is not None) + self.assertTrue(candidate.outcomes is not None) + self.assertTrue(candidate.distance is not None) + self.assertTrue(candidate.rank is not None) + + self.assertEqual(evolution.generation, 2) + + def test_create_initial_population_seeded(self): + """ + Adds some seeds to the initial population and checks they are loaded correctly. + Also checks that the rest of the population is initialized randomly. + """ + seed_dir = Path("tests/seeds") + seed_dir.mkdir() + for i in range(8): + candidate = DummyPrescriptor() + candidate.number = i + candidate.save(seed_dir / f"0_{i}.txt") + + evaluator = DummyEvaluator() + factory = DummyFactory(DummyPrescriptor) + evolution = Evolution(10, 10, 0.1, 2, 0.1, 0.1, "tests/temp", "tests/seeds", factory, evaluator) + evolution.create_initial_population() + + # Make sure population is the right size + self.assertEqual(len(evolution.population), 10) + cand_names = {f"0_{i}" for i in range(8)} + cand_names.update({f"1_{i}" for i in range(8, 10)}) + + # Check that every name is accounted for and that the number is set correctly + for candidate in evolution.population: + self.assertTrue(candidate.cand_id in cand_names) + cand_names.remove(candidate.cand_id) + if candidate.cand_id.startswith("0"): + self.assertEqual(candidate.number, int(candidate.cand_id.split("_")[1])) + else: + self.assertGreaterEqual(candidate.number, 0) + self.assertLessEqual(candidate.number, 1) + + self.assertEqual(len(cand_names), 0) + + +class TestSelection(unittest.TestCase): + """ + Tests the default parent selection (tournament selection) + """ + def test_selection(self): + """ + Tests tournament selection. We choose the min of 2 candidates, therefore with 2 candidates the smaller one + should get picked 3/4 of the time: (0, 0), (0, 1), (1, 0) and the larger one 1/4 of the time: (1, 1). + We run this many times and see if we're within 1%. + """ + random.seed(42) + np.random.seed(42) + + factory = DummyFactory(DummyPrescriptor) + evaluator = DummyEvaluator() + evolution = Evolution(10, 10, 0.1, 2, 0.1, 0.1, "tests/temp", None, factory, evaluator) + + cand_0 = DummyPrescriptor() + cand_0.number = 0 + cand_1 = DummyPrescriptor() + cand_1.number = 1 + + population = [cand_0, cand_1] + counts = {0: 0, 1: 0} + n = 100000 + for _ in range(n): + parents = evolution.selection(population) + counts[parents[0].number] += 1 + counts[parents[1].number] += 1 + + self.assertTrue(np.isclose(0.75, counts[0] / (n * 2), atol=0.01)) + self.assertTrue(np.isclose(0.25, counts[1] / (n * 2), atol=0.01)) diff --git a/tests/test_nn.py b/tests/test_nn.py new file mode 100644 index 0000000..7f3f16f --- /dev/null +++ b/tests/test_nn.py @@ -0,0 +1,87 @@ +""" +Tests the implementation of the neural network prescriptor. +""" +import random +import unittest + +import numpy as np +import torch + +from presp.prescriptor import NNPrescriptor, NNPrescriptorFactory + + +def init_uniform(model, c): + """ + Initializes a model uniformly by filling it with constant c. + """ + for parameter in model.parameters(): + with torch.no_grad(): + parameter.fill_(c) + + +class TestNN(unittest.TestCase): + """ + Tests the neural net prescriptor + """ + + def setUp(self): + random.seed(42) + np.random.seed(42) + torch.manual_seed(42) + + def test_crossover(self): + """ + Tests that we get 50/50 parameters from model 0 and model 1 after crossover. + """ + # Fill a candidate with all 0 and all 1 + factory = NNPrescriptorFactory(NNPrescriptor, {"in_size": 12, "hidden_size": 64, "out_size": 1}) + parent0 = factory.random_init() + parent1 = factory.random_init() + init_uniform(parent0.model, 0) + init_uniform(parent1.model, 1) + + # After crossover we should have 50% 1's + count = 0 + total_num = 0 + n = 100 + for _ in range(n): + child = factory.crossover([parent0, parent1], 0, 0)[0] + for parameter in child.model.parameters(): + count += torch.sum(parameter.data).item() + total_num += parameter.numel() + # Also double check that each parameter is either a 0 or 1 + self.assertTrue(torch.all(torch.logical_or(parameter.data == 0, parameter.data == 1))) + + self.assertAlmostEqual(count / total_num, 0.5, places=2) + + def test_mutate(self): + """ + Checks if the mean and standard deviation of the mutated parameters are correct. + We are using gaussian percent noise with std = f. If we initialize models to all be c then: + params ~ c * (1 + N(0, f)) = c * N(1, f) = N(c, f*c) + We try this with 3 sets of c and f, and test over 100,000 models to get an accurate distribution. + """ + factory = NNPrescriptorFactory(NNPrescriptor, {"in_size": 12, "hidden_size": 64, "out_size": 1}) + + for _ in range(3): + c = np.random.randint(0, 5) + f = np.random.randint(0, 5) + + parent0 = factory.random_init() + parent1 = factory.random_init() + init_uniform(parent0.model, c) + init_uniform(parent1.model, c) + + params = [] + n = 100000 + for _ in range(n): + child = factory.crossover([parent0, parent1], 1, f)[0] + for parameter in child.model.parameters(): + params.append(parameter) + + flattened = torch.cat([p.flatten() for p in params]) + mean = torch.mean(flattened).item() + std = torch.std(flattened).item() + + self.assertAlmostEqual(mean, c, places=2) + self.assertAlmostEqual(std, f * c, places=2)