From 9ff7b8fdcfb79f755026f965a0a72ca46d947eb3 Mon Sep 17 00:00:00 2001 From: Kye Date: Fri, 6 Oct 2023 22:18:52 -0400 Subject: [PATCH] code quality --- example.py | 4 +- nnt.py | 26 ++++---- swarms_torch/__init__.py | 3 +- swarms_torch/ant_colony_swarm.py | 45 ++++++------- swarms_torch/cellular_transformer.py | 58 +++++++--------- swarms_torch/graph_cellular_automa.py | 48 ++++++++------ swarms_torch/ma_agent.py | 29 ++++++-- swarms_torch/neuronal_transformer.py | 91 +++++++++++-------------- swarms_torch/particle_swarm.py | 50 +++++++------- swarms_torch/queen_bee.py | 95 +++++++++++++++++---------- swarms_torch/spiral_optimization.py | 48 +++++++------- swarms_torch/transformer_pso.py | 57 ++++++++++------ tests/ant_colony.py | 28 +++++--- tests/particle_swarm.py | 19 ++++-- tests/queen_bee.py | 18 +++-- 15 files changed, 336 insertions(+), 283 deletions(-) diff --git a/example.py b/example.py index 15cc842..ccb5055 100644 --- a/example.py +++ b/example.py @@ -1,5 +1,5 @@ from swarms_torch import ParticleSwarmOptimization -#test +# test pso = ParticleSwarmOptimization(goal="Attention is all you need", n_particles=100) -pso.optimize(iterations=1000) \ No newline at end of file +pso.optimize(iterations=1000) diff --git a/nnt.py b/nnt.py index 7e08e7d..5706e31 100644 --- a/nnt.py +++ b/nnt.py @@ -4,22 +4,18 @@ x = torch.randn(1, 10) network = NNTransformer( - #transformer cells - neuron_count = 5, - - #num states - num_states = 10, - - #input dim - input_dim = 10, - - #output dim - output_dim = 10, - - #nhead - nhead = 2, + # transformer cells + neuron_count=5, + # num states + num_states=10, + # input dim + input_dim=10, + # output dim + output_dim=10, + # nhead + nhead=2, ) output = network(x) -print(output) \ No newline at end of file +print(output) diff --git a/swarms_torch/__init__.py b/swarms_torch/__init__.py index 076e771..85fc8a6 100644 --- a/swarms_torch/__init__.py +++ b/swarms_torch/__init__.py @@ -10,5 +10,4 @@ "ParticleSwarmOptimization", "AntColonyOptimization", "QueenBeeGa", - "SPO" -] \ No newline at end of file + "SPO"] diff --git a/swarms_torch/ant_colony_swarm.py b/swarms_torch/ant_colony_swarm.py index e92644c..65c7eb6 100644 --- a/swarms_torch/ant_colony_swarm.py +++ b/swarms_torch/ant_colony_swarm.py @@ -1,5 +1,6 @@ import torch + class AntColonyOptimization: """ Ant Colony Optimization @@ -27,9 +28,9 @@ class AntColonyOptimization: goal_string = "Hello ACO" aco = AntColonyOptimization(goal_string, num_iterations=1000) best_solution = aco.optimize() - + print("Best Matched String:", best_solution) - + Features to implement -------- 1. Add a stopping criterion @@ -39,14 +40,15 @@ class AntColonyOptimization: 5. Add a function to plot the best solution """ + def __init__( - self, - goal: str = None, - num_ants: int = 10000, - evaporation_rate: float = 0.1, - alpha: int = 1, - beta: int = 1, - num_iterations: int = 10010 + self, + goal: str = None, + num_ants: int = 10000, + evaporation_rate: float = 0.1, + alpha: int = 1, + beta: int = 1, + num_iterations: int = 10010, ): self.goal = torch.tensor([ord(c) for c in goal], dtype=torch.float32) self.num_ants = num_ants @@ -54,7 +56,8 @@ def __init__( self.alpha = alpha self.beta = beta self.num_iterations = num_iterations - # Pheromone levels can be initialized for different paths (architectures) + # Pheromone levels can be initialized for different paths + # (architectures) self.pheromones = torch.ones(num_ants) self.solutions = [] @@ -65,15 +68,15 @@ def fitness(self, solution): def update_pheromones(self): """Update pheromone levels""" for i, solution in enumerate(self.solutions): - self.pheromones[i] = ( - 1 - self.evaporation_rate - ) * self.pheromones[i] + self.fitness(solution) + self.pheromones[i] = (1 - self.evaporation_rate) * self.pheromones[ + i + ] + self.fitness(solution) def choose_next_path(self): """Choose the next path based on the pheromone levels""" - probabilities = ( - self.pheromones ** self.alpha - ) * ((1.0 / (1 + self.pheromones)) ** self.beta) + probabilities = (self.pheromones**self.alpha) * ( + (1.0 / (1 + self.pheromones)) ** self.beta + ) probabilities /= probabilities.sum() @@ -84,13 +87,11 @@ def optimize(self): for iteration in range(self.num_iterations): self.solutions = [] for _ in range(self.num_ants): - # This is a placeholder. Actual implementation will define how ants traverse the search space. + # This is a placeholder. Actual implementation will define how + # ants traverse the search space. solution = torch.randint( - 32, - 127, - ( - len(self.goal), - ), dtype=torch.float32) # Random characters. + 32, 127, (len(self.goal),), dtype=torch.float32 + ) # Random characters. self.solutions.append(solution) self.update_pheromones() diff --git a/swarms_torch/cellular_transformer.py b/swarms_torch/cellular_transformer.py index 1155a42..2a7dbc1 100644 --- a/swarms_torch/cellular_transformer.py +++ b/swarms_torch/cellular_transformer.py @@ -1,6 +1,7 @@ -import torch +import torch from torch import nn + class TransformerCell(nn.Module): def __init__( self, @@ -11,20 +12,17 @@ def __init__( ): super(TransformerCell, self).__init__() self.transformer = nn.Transformer( - input_dim, - nhead=nhead, - num_encoder_layers=num_layers + input_dim, nhead=nhead, num_encoder_layers=num_layers ) self.neighborhood_size = neighborhood_size - - def forward(self, x, neigbors): + def forward(self, x, neigbors): x = self.transformer(x, x) out = torch.cat([x] + neigbors, dim=0) - + return out - + class CellularSwarm(nn.Module): """ @@ -42,51 +40,39 @@ class CellularSwarm(nn.Module): input_dim (int): Input dimension nhead (int): Number of heads in the transformer cell time_steps (int): Number of time steps to run the network - + Returns: torch.Tensor: Output tensor - + Usage: >>> x = torch.randn(10, 32, 512) >>> model = CellularSwarm(cell_count=5, input_dim=512, nhead=8) >>> output = model(x) >>> print(output) - - + + """ - def __init__( - self, - cell_count, - input_dim, - nhead, - time_steps=4 - ): + + def __init__(self, cell_count, input_dim, nhead, time_steps=4): super(CellularSwarm, self).__init__() - self.cells = nn.ModuleList([ - TransformerCell(input_dim, nhead) for _ in range(cell_count) - ]) + self.cells = nn.ModuleList( + [TransformerCell(input_dim, nhead) for _ in range(cell_count)] + ) self.time_steps = time_steps - + def forward(self, x): for _ in range(self.time_steps): for i, cell in enumerate(self.cells): - #get neighboring cells states - start_idx = max( - 0, - i - cell.neighborhood_size - ) + # get neighboring cells states + start_idx = max(0, i - cell.neighborhood_size) - end_idx = min( - len(self.cells), i + cell.neighborhood_size + 1 - ) + end_idx = min(len(self.cells), i + cell.neighborhood_size + 1) neighbors = [ - self.cells[j].transformer(x, x) for j in range( - start_idx, - end_idx - ) if j != i + self.cells[j].transformer(x, x) + for j in range(start_idx, end_idx) + if j != i ] x = cell(x, neighbors) return x - diff --git a/swarms_torch/graph_cellular_automa.py b/swarms_torch/graph_cellular_automa.py index 41a13c0..e0ca114 100644 --- a/swarms_torch/graph_cellular_automa.py +++ b/swarms_torch/graph_cellular_automa.py @@ -9,9 +9,9 @@ def __init__(self, input_dim, hidden_dim, output_dim): self.mlp = nn.Sequential( nn.Linear(input_dim, hidden_dim), nn.ReLU(), - nn.Linear(hidden_dim, output_dim) + nn.Linear(hidden_dim, output_dim), ) - + def forward(self, x): return self.mlp(x) @@ -24,52 +24,60 @@ def __init__(self, input_dim, hidden_dim): nn.Linear(input_dim, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, 1), - nn.Sigmoid() # for binary classification + nn.Sigmoid(), # for binary classification ) - + def forward(self, x): return self.mlp(x) + class WeightUpdateModel(nn.Module): def __init__(self, input_dim, hidden_dim): super(WeightUpdateModel, self).__init__() self.mlp = nn.Sequential( - nn.Linear(input_dim, hidden_dim), - nn.ReLU(), - nn.Linear(hidden_dim, 1) - ) - + nn.Linear( + input_dim, hidden_dim), nn.ReLU(), nn.Linear( + hidden_dim, 1)) + def forward(self, x): return self.mlp(x) + class NDP(nn.Module): def __init__(self, embedding_dim, hidden_dim): super(NDP, self).__init__() - self.gc_automata = GraphCellularAutomata(embedding_dim, hidden_dim, embedding_dim) + self.gc_automata = GraphCellularAutomata( + embedding_dim, hidden_dim, embedding_dim + ) self.replication_model = ReplicationModel(embedding_dim, hidden_dim) - self.weight_update_model = WeightUpdateModel(2 * embedding_dim, hidden_dim) - + self.weight_update_model = WeightUpdateModel( + 2 * embedding_dim, hidden_dim) + def forward(self, node_embeddings, adjacency_matrix): # Update node embeddings using Graph Cellular Automata updated_embeddings = self.gc_automata(node_embeddings) - + # Check which nodes need to replicate replication_decisions = self.replication_model(updated_embeddings) - + # Weight update (assuming weighted network) num_nodes = node_embeddings.shape[0] edge_weights = torch.zeros((num_nodes, num_nodes)) for i in range(num_nodes): for j in range(num_nodes): - combined_embedding = torch.cat((updated_embeddings[i], updated_embeddings[j])) - - edge_weights[i, j] = self.weight_update_model(combined_embedding) - + combined_embedding = torch.cat( + (updated_embeddings[i], updated_embeddings[j]) + ) + + edge_weights[i, j] = self.weight_update_model( + combined_embedding) + return updated_embeddings, replication_decisions, edge_weights + # Usage examples embedding_dim = 16 hidden_dim = 32 @@ -77,7 +85,9 @@ def forward(self, node_embeddings, adjacency_matrix): adjacency_matrix = torch.rand((10, 10)) # Dummy adjacency matrix for 10 nodes model = NDP(embedding_dim, hidden_dim) -updated_embeddings, replication_decisions, edge_weights = model(node_embeddings, adjacency_matrix) +updated_embeddings, replication_decisions, edge_weights = model( + node_embeddings, adjacency_matrix +) print(updated_embeddings.shape) print(replication_decisions.shape) diff --git a/swarms_torch/ma_agent.py b/swarms_torch/ma_agent.py index 1da3591..a8c69d2 100644 --- a/swarms_torch/ma_agent.py +++ b/swarms_torch/ma_agent.py @@ -3,6 +3,7 @@ import torch.optim as optim import gym + class MAgent: class Agent(nn.Module): def __init__(self, input_dim, output_dim): @@ -11,7 +12,7 @@ def __init__(self, input_dim, output_dim): nn.Linear(input_dim, 128), nn.ReLU(), nn.Linear(128, output_dim), - nn.Softmax(dim=-1) + nn.Softmax(dim=-1), ) def forward(self, state): @@ -20,8 +21,14 @@ def forward(self, state): class MultiGymEnvironment: def __init__(self, env_name, num_agents): self.envs = [gym.make(env_name) for _ in range(num_agents)] - self.agents = [MAgent.Agent(self.envs[0].observation_space.shape[0], self.envs[0].action_space.n) for _ in range(num_agents)] - self.optimizers = [optim.Adam(agent.parameters()) for agent in self.agents] + self.agents = [ + MAgent.Agent( + self.envs[0].observation_space.shape[0], self.envs[0].action_space.n + ) + for _ in range(num_agents) + ] + self.optimizers = [optim.Adam(agent.parameters()) + for agent in self.agents] def step(self, agent_actions): rewards = [] @@ -37,16 +44,24 @@ def get_states(self): def train(self, epochs=1000): for epoch in range(epochs): states = self.get_states() - actions = [torch.argmax(agent(torch.FloatTensor(state))).item() for agent, state in zip(self.agents, states)] + actions = [ + torch.argmax(agent(torch.FloatTensor(state))).item() + for agent, state in zip(self.agents, states) + ] rewards = self.step(actions) - for agent, optimizer, reward in zip(self.agents, self.optimizers, rewards): - loss = -torch.log(agent(torch.FloatTensor(states))) * reward # Example loss function + for agent, optimizer, reward in zip( + self.agents, self.optimizers, rewards + ): + loss = ( + -torch.log(agent(torch.FloatTensor(states))) * reward + ) # Example loss function optimizer.zero_grad() loss.backward() optimizer.step() + # Example usage: num_agents = 5 -simulation = MAgent.MultiGymEnvironment('CartPole-v1', num_agents) +simulation = MAgent.MultiGymEnvironment("CartPole-v1", num_agents) simulation.train() diff --git a/swarms_torch/neuronal_transformer.py b/swarms_torch/neuronal_transformer.py index a0a9197..f4283c9 100644 --- a/swarms_torch/neuronal_transformer.py +++ b/swarms_torch/neuronal_transformer.py @@ -1,6 +1,6 @@ """ -Cellular neural network +Cellular neural network Architecture: - Input -> Linear -> ReLU -> Linear -> ReLU -> Output @@ -10,26 +10,28 @@ """ -import torch +import torch from torch import nn + class TransformerLayer(nn.Module): """ Transformer Layer Architecture: - Input -> Linear -> ReLU -> Linear -> ReLU -> Output - + Args: input_dim (int): Input dimension output_dim (int): Output dimension - + Returns: torch.Tensor: Output tensor Usage """ + def __init__( self, input_dim, @@ -41,40 +43,27 @@ def __init__( d_model=input_dim, nhead=nhead, ) - self.fc = nn.Linear( - input_dim, - output_dim - ) - + self.fc = nn.Linear(input_dim, output_dim) + def forward(self, x): return self.fc(self.transformer(x)) - + class Neuron(nn.Module): def __init__(self, num_states): super(Neuron, self).__init__() self.states = nn.Parameter(torch.randn(num_states)) - + + class SynapseTransformer(nn.Module): - def __init__( - self, - input_dim, - output_dim, - nhead: int - ): - super( - SynapseTransformer, - self - ).__init__() - self.transformer = TransformerLayer( - input_dim, - output_dim, - nhead - ) - + def __init__(self, input_dim, output_dim, nhead: int): + super(SynapseTransformer, self).__init__() + self.transformer = TransformerLayer(input_dim, output_dim, nhead) + def forward(self, x): return self.transformer(x) + class NNTransformer(nn.Module): """ Neural Network NNTransformer @@ -85,22 +74,22 @@ class NNTransformer(nn.Module): input_dim (int): Input dimension output_dim (int): Output dimension nhead (int): Number of heads in transformer layer - + Returns: torch.Tensor: Output tensor - + Architecture: - Input -> Linear -> ReLU -> Linear -> ReLU -> Output - Neuron states are updated after each synapse - Softmax is applied after each synapse - Layer normalization is applied after each synapse - + Usage: network = CellularNN(5, 10, 10, 10, 2) output = network(torch.randn(1, 10)) print(output) - - + + Training: network = NNTransformer(5, 10, 10, 10, 2) output = network(torch.randn(1, 10)) @@ -139,15 +128,15 @@ class NNTransformer(nn.Module): for epoch in range(epochs): # Forward pass outputs = network(x) - + # Compute loss loss = criterion(outputs, y) - + # Backward pass and optimization optimizer.zero_grad() loss.backward() optimizer.step() - + # Print loss every 100 epochs if (epoch+1) % 100 == 0: print(f"Epoch [{epoch+1}/{epochs}], Loss: {loss.item():.4f}") @@ -157,41 +146,35 @@ class NNTransformer(nn.Module): test_output = network(test_input) print(test_output) - + """ - def __init__( - self, - neuron_count, - num_states, - input_dim, - output_dim, - nhead - ): + + def __init__(self, neuron_count, num_states, input_dim, output_dim, nhead): super(NNTransformer, self).__init__() # Initialize neurons and synapses self.neurons = nn.ModuleList( - [Neuron(num_states) for _ in range(neuron_count)] - ) + [Neuron(num_states) for _ in range(neuron_count)]) self.synapses = nn.ModuleList( - [SynapseTransformer(input_dim, output_dim, nhead) for _ in range(neuron_count)] + [ + SynapseTransformer(input_dim, output_dim, nhead) + for _ in range(neuron_count) + ] ) self.norm = nn.LayerNorm(output_dim) self.softmax = nn.Softmax(dim=1) - + def forward(self, x): for neuron, synapse in zip(self.neurons[:-1], self.synapses): - #norm before synapse + # norm before synapse x = self.norm(x) - #synapse + # synapse x = synapse(x) - #softmax after synapse + # softmax after synapse x = self.softmax(x) - + neuron.states.data = x return self.neurons[-1].states - - diff --git a/swarms_torch/particle_swarm.py b/swarms_torch/particle_swarm.py index 83452c8..2bdfdc3 100644 --- a/swarms_torch/particle_swarm.py +++ b/swarms_torch/particle_swarm.py @@ -1,5 +1,6 @@ import torch + class ParticleSwarmOptimization: """ Particle Swarm Optimization @@ -46,9 +47,10 @@ class ParticleSwarmOptimization: 9. Add a function to plot the personal best weight 10. Add a function to plot the global best weight - - + + """ + def __init__( self, goal: str = None, @@ -56,7 +58,7 @@ def __init__( inertia: float = 0.5, personal_best_weight: float = 1.5, global_best_weight: float = 1.5, - dim: int = 1 + dim: int = 1, ): self.goal = torch.tensor([ord(c) for c in goal]) self.n_particles = n_particles @@ -69,14 +71,13 @@ def __init__( self.personal_best = self.particles.clone() self.global_best = self.particles[0].clone() - + def compute_fitness( self, particle, ): return 1.0 / (1.0 + torch.norm((particle - self.goal).float())) - def update( self, ): @@ -92,23 +93,31 @@ def update( if fitness > personal_best_fitness: self.personal_best[i] = self.particles[i] - + global_best_fitness = self.compute_fitness(self.global_best) if fitness > global_best_fitness: self.global_best = self.particles[i] - #update velocity - personal_attraction = self.personal_best_weight * torch.rand( - self.goal.size() - ) * (self.personal_best[i] - self.particles[i]) - - global_attraction = self.global_best_weight * torch.rand( - self.goal.size() - ) * (self.global_best - self.particles[i]) - - self.velocities[i] = self.inertia * self.velocities[i] + personal_attraction + global_attraction - + # update velocity + personal_attraction = ( + self.personal_best_weight + * torch.rand(self.goal.size()) + * (self.personal_best[i] - self.particles[i]) + ) + + global_attraction = ( + self.global_best_weight + * torch.rand(self.goal.size()) + * (self.global_best - self.particles[i]) + ) + + self.velocities[i] = ( + self.inertia * self.velocities[i] + + personal_attraction + + global_attraction + ) + # Update position self.particles[i] += self.velocities[i].int() self.particles[i].clamp_(0, 255) @@ -121,8 +130,5 @@ def optimize( for _ in range(iterations): self.update() best_particle = self.global_best - print( - "Best Particle: ", - ''.join([chr(int(i)) for i in best_particle]) - ) - + print("Best Particle: ", "".join( + [chr(int(i)) for i in best_particle])) diff --git a/swarms_torch/queen_bee.py b/swarms_torch/queen_bee.py index 867aaa8..b55393a 100644 --- a/swarms_torch/queen_bee.py +++ b/swarms_torch/queen_bee.py @@ -1,5 +1,6 @@ import torch + class QueenBeeGa: """ Queen Bee evolution for genetic algos @@ -11,7 +12,7 @@ class QueenBeeGa: Reference: --------- - https://www.researchgate.net/publication/228961729_A_Queen_Bee_GA_for_optimization + https://www.researchgate.net/publication/228961729_A_Queen_Bee_GA_for_optimization Usage ----- @@ -21,10 +22,11 @@ class QueenBeeGa: mutation_prob=0.04, strong_mutation_rate=0.1, strong_mutation_prob=0.25, - num_tournament_participants=25 + num_tournament_participants=25 ) - optimizer.run(max_generations=100) + optimizer.run(max_generations=100) """ + def __init__( self, goal: str = "Attention is all you need", @@ -32,7 +34,7 @@ def __init__( mutation_prob: float = 0.04, strong_mutation_rate: float = 0.1, strong_mutation_prob: float = 0.25, - num_tournament_participants: int = 25 + num_tournament_participants: int = 25, ): """ QueenBeeGa with params and initial configs @@ -50,7 +52,7 @@ def __init__( strong_mutation_prob: float Strong mutation probability num_tournament_participants: int - Number of tournament participants + Number of tournament participants """ self.goal = goal self.pop_size = pop_size @@ -64,24 +66,24 @@ def __init__( self.target_gene = self.encode(goal) self.strong_mutate_pool_size = strong_mutation_rate * pop_size - self.num_code_mutate = mutation_prob * self.gene_length + self.num_code_mutate = mutation_prob * self.gene_length self.strong_num_code_mutate = strong_mutation_prob * self.gene_length self.pool = torch.randint(0, 255, (pop_size, self.gene_length)) self.queen = None self.queen_fitness = None self.generation = 0 - + @staticmethod def encode(s): """Convert string to it's values""" return torch.tensor([ord(c) for c in s]) - + @staticmethod def decode(t): """Convert ASCII values tensor back to string""" return "".join([chr(i) for i in t.tolist()]) - + def run(self, max_generations: int = 1000): """ Run the queen genertic algorithm evolution @@ -97,67 +99,92 @@ def run(self, max_generations: int = 1000): self._evolve() if self._check_convergence(): pass - + def _evolve(self): """ Execute one step of the evolution process. """ - + # Sort population by fitness - fitnesses = 1. / torch.square(self.pool - self.target_gene).sum(dim=-1) + fitnesses = 1.0 / \ + torch.square(self.pool - self.target_gene).sum(dim=-1) indices = fitnesses.sort(descending=True).indices self.pool, fitnesses = self.pool[indices], fitnesses[indices] - + # Display every generation if self.queen is not None: print("queen:") print(f"{self.decode(self.queen)} ({self.queen_fitness.item():.3f})\n") for gene, fitness in zip(self.pool, fitnesses): print(f"{self.decode(gene)} ({fitness.item():.3f})") - + # If one of the children has a better fitness than queen, that child becomes the new queen # and the queen replaces the worst bee in the population if self.queen is not None and self.queen_fitness < fitnesses[0]: self.pool = torch.cat((self.pool, self.queen[None, :]), dim=0) fitnesses = torch.cat((fitnesses, self.queen_fitness[None]), dim=0) self.queen = self.queen_fitness = None - + # Separate the queen bee from the rest of the population if self.queen is None: self.queen, self.pool = self.pool[0], self.pool[1:] self.queen_fitness, fitnesses = fitnesses[0], fitnesses[1:] - + # Deterministic tournament selection - contender_ids = torch.randn((self.pop_size - 1, self.pop_size - 1)).argsort(dim=-1)[..., :self.num_tournament_participants] + contender_ids = torch.randn((self.pop_size - + 1, self.pop_size - + 1)).argsort(dim=- + 1)[..., : self.num_tournament_participants] participants, tournaments = self.pool[contender_ids], fitnesses[contender_ids] - top_winner = tournaments.topk(1, dim=-1, largest=True, sorted=False).indices + top_winner = tournaments.topk( + 1, dim=-1, largest=True, sorted=False).indices top_winner = top_winner.unsqueeze(-1).expand(-1, -1, self.gene_length) parents = participants.gather(1, top_winner).squeeze(1) - + # Cross over all chosen drones with the queen - queen_parents = self.queen.unsqueeze(0).expand(self.pop_size - 1, self.gene_length) - self.pool = torch.cat((queen_parents[:, :self.gene_midpoint], parents[:, self.gene_midpoint:]), dim=-1) - + queen_parents = self.queen.unsqueeze(0).expand( + self.pop_size - 1, self.gene_length + ) + self.pool = torch.cat( + (queen_parents[:, : self.gene_midpoint], parents[:, self.gene_midpoint:]), + dim=-1, + ) + # Mutate genes in population - mutate_mask = torch.randn(self.pool.shape).argsort(dim=-1) < self.num_code_mutate + mutate_mask = ( + torch.randn(self.pool.shape).argsort(dim=-1) < self.num_code_mutate + ) noise = torch.randint(0, 2, self.pool.shape) * 2 - 1 mutated_pool = torch.where(mutate_mask, self.pool + noise, self.pool) - - strong_mutate_mask = torch.randn(self.pool.shape).argsort(dim=-1) < self.strong_num_code_mutate + + strong_mutate_mask = ( + torch.randn( + self.pool.shape).argsort( + dim=- + 1) < self.strong_num_code_mutate) noise = torch.randint(0, 2, self.pool.shape) * 2 - 1 - strong_mutated_pool = torch.where(strong_mutate_mask, self.pool + noise, self.pool) - - strong_mutate_pool_mask = torch.randn(self.pop_size - 1).argsort(dim=-1) < self.strong_mutate_pool_size - self.pool = torch.where(strong_mutate_pool_mask[:, None], strong_mutated_pool, mutated_pool) + strong_mutated_pool = torch.where( + strong_mutate_mask, self.pool + noise, self.pool + ) + + strong_mutate_pool_mask = ( + torch.randn(self.pop_size - 1).argsort(dim=-1) + < self.strong_mutate_pool_size + ) + self.pool = torch.where( + strong_mutate_pool_mask[:, None], strong_mutated_pool, mutated_pool + ) self.pool.clamp_(0, 255) - + def _check_convergence(self): """ Check if any of the solutions has achieved the goal """ - fitnesses = 1. / torch.square(self.pool - self.target_gene).sum(dim=-1) - return (fitnesses == float('inf')).any().item() - + fitnesses = 1.0 / \ + torch.square(self.pool - self.target_gene).sum(dim=-1) + return (fitnesses == float("inf")).any().item() + + # # Usage: # optimizer = QueenBeeGa() -# optimizer.run(max_generations=100) \ No newline at end of file +# optimizer.run(max_generations=100) diff --git a/swarms_torch/spiral_optimization.py b/swarms_torch/spiral_optimization.py index 882c09f..90a034d 100644 --- a/swarms_torch/spiral_optimization.py +++ b/swarms_torch/spiral_optimization.py @@ -1,9 +1,10 @@ import torch + class SPO: """ Spiral Optimization (SPO) Algorithm in PyTorch. - + Implements the SPO algorithm for optimization towards a target string. How does it work? @@ -30,50 +31,44 @@ class SPO: 2. Add a callback function to track the progress 3. Add a function to plot the search points 4. Add a function to plot the best solution - + """ - def __init__( - self, - goal: str = None, - m: int = 10, - k_max: int = 1000 - ): + + def __init__(self, goal: str = None, m: int = 10, k_max: int = 1000): """ Initialize the SPO class. - + Args: - goal: The target string. - m: Number of search points (strings). - k_max: Maximum number of iterations. """ - self.goal = torch.tensor([ - ord(c) for c in goal - ], dtype=torch.float32) # ASCII representation + self.goal = torch.tensor( + [ord(c) for c in goal], dtype=torch.float32 + ) # ASCII representation self.m = m self.k_max = k_max self.n_dim = len(goal) - + # Initializing the search points and center randomly # Note: 32-126 is the ASCII range for all printable characters self.points = torch.randint( - 32, 127, (self.m, self.n_dim), dtype=torch.float32 - ) + 32, 127, (self.m, self.n_dim), dtype=torch.float32) self.center = torch.randint( - 32, 127, (self.n_dim,), dtype=torch.float32 - ) - + 32, 127, (self.n_dim,), dtype=torch.float32) + def _step_rate(self, k): """ Define the step rate function. - + Args: - k: Current iteration. - + Returns: Step rate for the current iteration. """ return 1 / (1 + k) - + def _update_points(self, k): """Update the search points based on the spiral model.""" r = self._step_rate(k) @@ -82,25 +77,28 @@ def _update_points(self, k): self.points[i] = self.center + r * torch.mv( R, (self.points[i] - self.center) ) - + def _update_center(self): """Find the best search point and set as the new center.""" fitnesses = torch.norm(self.points - self.goal, dim=1) best_idx = torch.argmin(fitnesses) self.center = self.points[best_idx] - + def optimize(self): """Run the optimization loop.""" for k in range(self.k_max): self._update_points(k) self._update_center() - if torch.norm(self.center - self.goal) < 1e-5: # Example convergence condition + if ( + torch.norm(self.center - self.goal) < 1e-5 + ): # Example convergence condition break def best_string(self): """Convert the best found point to its string representation""" return "".join([chr(int(c)) for c in self.center.round()]) - + + # # Example Usage # goal = "Attention is all you need" # optimizer = SPO(goal) diff --git a/swarms_torch/transformer_pso.py b/swarms_torch/transformer_pso.py index e25a1c6..da0327d 100644 --- a/swarms_torch/transformer_pso.py +++ b/swarms_torch/transformer_pso.py @@ -2,11 +2,13 @@ import torch.nn as nn from copy import deepcopy + class SimpleTransformer(nn.Module): def __init__(self, input_dim, d_model, nhead, num_layers, output_dim): super(SimpleTransformer, self).__init__() self.embedding = nn.Embedding(input_dim, d_model) - self.transformer = nn.Transformer(d_model, nhead, num_layers, num_layers) + self.transformer = nn.Transformer( + d_model, nhead, num_layers, num_layers) self.fc = nn.Linear(d_model, output_dim) def forward(self, x): @@ -14,18 +16,19 @@ def forward(self, x): x = self.transformer(x, x) return self.fc(x[-1]) + class ParticleSwarmOptimization: def __init__( - self, + self, model_constructor, # Function to create a new model instance - model_args, # Arguments for the model constructor - device, # 'cuda' or 'cpu' - criterion, - data_loader, - n_particles=10, - inertia=0.5, - personal_best_weight=1.5, - global_best_weight=1.5 + model_args, # Arguments for the model constructor + device, # 'cuda' or 'cpu' + criterion, + data_loader, + n_particles=10, + inertia=0.5, + personal_best_weight=1.5, + global_best_weight=1.5, ): self.model_constructor = model_constructor self.model_args = model_args @@ -39,9 +42,14 @@ def __init__( self.global_best_weight = global_best_weight # Representing particles using model parameters - param_size = sum(p.numel() for p in model_constructor(*model_args).parameters()) - self.particles = [self.model_constructor(*model_args).to(device) for _ in range(n_particles)] - self.velocities = [torch.zeros((param_size,)).to(device) for _ in range(n_particles)] + param_size = sum(p.numel() + for p in model_constructor(*model_args).parameters()) + self.particles = [ + self.model_constructor( + *model_args).to(device) for _ in range(n_particles)] + self.velocities = [ + torch.zeros((param_size,)).to(device) for _ in range(n_particles) + ] self.personal_best = [deepcopy(p.state_dict()) for p in self.particles] self.global_best = deepcopy(self.particles[0].state_dict()) @@ -73,24 +81,29 @@ def update(self): # Update velocities and positions for name, param in particle.named_parameters(): - delta = (self.personal_best_weight * torch.rand_like(param) * - (self.personal_best[idx][name].to(self.device) - param.data) + - self.global_best_weight * torch.rand_like(param) * - (self.global_best[name].to(self.device) - param.data)) - self.velocities[idx] += self.inertia * self.velocities[idx] + delta + delta = self.personal_best_weight * torch.rand_like(param) * ( + self.personal_best[idx][name].to(self.device) - param.data + ) + self.global_best_weight * torch.rand_like(param) * ( + self.global_best[name].to(self.device) - param.data + ) + self.velocities[idx] += self.inertia * \ + self.velocities[idx] + delta param.data += self.velocities[idx] def optimize(self, iterations=1000): for _ in range(iterations): self.update() best_particle_score = self.compute_fitness(self.global_best) - print(f"Iteration {_ + 1}/{iterations} - Best Particle Fitness: {best_particle_score}") + print( + f"Iteration {_ + 1}/{iterations} - Best Particle Fitness: {best_particle_score}" + ) def get_best_model(self): best_model = self.model_constructor(*self.model_args).to(self.device) best_model.load_state_dict(self.global_best) return best_model + # Define model and optimization parameters input_dim = 1000 d_model = 512 @@ -105,7 +118,7 @@ def get_best_model(self): pso = ParticleSwarmOptimization( SimpleTransformer, (input_dim, d_model, nhead, num_layers, output_dim), - device='cuda', # or 'cpu' + device="cuda", # or 'cpu' criterion=nn.CrossEntropyLoss(), # data_loader=your_dataloader # replace with your dataloader ) @@ -117,7 +130,9 @@ def get_best_model(self): best_model = pso.get_best_model() # Generate a random input tensor -x = torch.randint(0, input_dim, (batch_size, sequence_length)).to('cuda') # ensure it's on the same device as your model +x = torch.randint(0, input_dim, (batch_size, sequence_length)).to( + "cuda" +) # ensure it's on the same device as your model # Pass the tensor through the model output = best_model(x) diff --git a/tests/ant_colony.py b/tests/ant_colony.py index 5cc06e8..4a3dbd8 100644 --- a/tests/ant_colony.py +++ b/tests/ant_colony.py @@ -3,39 +3,46 @@ from swarms_torch import AntColonyOptimization # Import your class -class TestAntColonyOptimization(unittest.TestCase): +class TestAntColonyOptimization(unittest.TestCase): def setUp(self): - self.aco = AntColonyOptimization(goal="Hello ACO", num_ants=1000, num_iterations=10) - + self.aco = AntColonyOptimization( + goal="Hello ACO", num_ants=1000, num_iterations=10 + ) + def test_initialization(self): self.assertEqual(self.aco.goal.tolist(), [ord(c) for c in "Hello ACO"]) self.assertEqual(self.aco.pheromones.size(), torch.Size([1000])) self.assertEqual(self.aco.pheromones.tolist(), [1.0] * 1000) - + def test_fitness(self): solution = torch.tensor([ord(c) for c in "Hello ACO"], dtype=torch.float32) - self.assertEqual(self.aco.fitness(solution).item(), 0) # Should be maximum fitness - + self.assertEqual( + self.aco.fitness(solution).item(), 0 + ) # Should be maximum fitness + def test_update_pheromones(self): initial_pheromones = self.aco.pheromones.clone() - self.aco.solutions = [torch.tensor([ord(c) for c in "Hello ACO"], dtype=torch.float32) for _ in range(1000)] + self.aco.solutions = [ + torch.tensor([ord(c) for c in "Hello ACO"], dtype=torch.float32) + for _ in range(1000) + ] self.aco.update_pheromones() # After updating, pheromones should not remain the same self.assertFalse(torch.equal(initial_pheromones, self.aco.pheromones)) - + def test_choose_next_path(self): path = self.aco.choose_next_path() # Path should be an integer index within the number of ants self.assertIsInstance(path, int) self.assertGreaterEqual(path, 0) self.assertLess(path, 1000) - + def test_optimize(self): solution = self.aco.optimize() self.assertIsInstance(solution, str) # Given enough iterations and ants, the solution should approach the goal. For short runs, this might not hold. - # self.assertEqual(solution, "Hello ACO") + # self.assertEqual(solution, "Hello ACO") def test_invalid_parameters(self): with self.assertRaises(ValueError): @@ -43,5 +50,6 @@ def test_invalid_parameters(self): with self.assertRaises(ValueError): _ = AntColonyOptimization(evaporation_rate=1.5) + if __name__ == "__main__": unittest.main() diff --git a/tests/particle_swarm.py b/tests/particle_swarm.py index f4e0b82..b54ccdf 100644 --- a/tests/particle_swarm.py +++ b/tests/particle_swarm.py @@ -3,34 +3,39 @@ from swarms_torch import ParticleSwarmOptimization # Import your class here -class TestParticleSwarmOptimization(unittest.TestCase): +class TestParticleSwarmOptimization(unittest.TestCase): def setUp(self): self.pso = ParticleSwarmOptimization(goal="Hello", n_particles=10) - + def test_initialization(self): self.assertEqual(self.pso.goal.tolist(), [ord(c) for c in "Hello"]) self.assertEqual(self.pso.particles.size(), (10, 5)) self.assertEqual(self.pso.velocities.size(), (10, 5)) - + def test_compute_fitness(self): particle = torch.tensor([ord(c) for c in "Hello"]) fitness = self.pso.compute_fitness(particle) self.assertEqual(fitness.item(), 1.0) - + def test_update(self): initial_particle = self.pso.particles.clone() self.pso.update() # After updating, particles should not remain the same (in most cases) self.assertFalse(torch.equal(initial_particle, self.pso.particles)) - + def test_optimize(self): initial_best_particle = self.pso.global_best.clone() self.pso.optimize(iterations=10) # After optimization, global best should be closer to the goal - initial_distance = torch.norm((initial_best_particle - self.pso.goal).float()).item() - final_distance = torch.norm((self.pso.global_best - self.pso.goal).float()).item() + initial_distance = torch.norm( + (initial_best_particle - self.pso.goal).float() + ).item() + final_distance = torch.norm( + (self.pso.global_best - self.pso.goal).float() + ).item() self.assertLess(final_distance, initial_distance) + if __name__ == "__main__": unittest.main() diff --git a/tests/queen_bee.py b/tests/queen_bee.py index 45c93ad..74d9c8e 100644 --- a/tests/queen_bee.py +++ b/tests/queen_bee.py @@ -2,8 +2,8 @@ import torch from swarms_torch.queen_bee import QueenBeeGa # Import the class -class TestQueenBeeGa(unittest.TestCase): +class TestQueenBeeGa(unittest.TestCase): def setUp(self): self.optimizer = QueenBeeGa(goal="Hello QBGA", pop_size=50) @@ -12,31 +12,35 @@ def test_initialization(self): self.assertEqual(self.optimizer.gene_length, len("Hello QBGA")) self.assertIsNone(self.optimizer.queen) self.assertIsNone(self.optimizer.queen_fitness) - + def test_encode_decode(self): encoded = QueenBeeGa.encode("Hello") decoded = QueenBeeGa.decode(encoded) self.assertEqual(decoded, "Hello") - + def test_evolution(self): initial_population = self.optimizer.pool.clone() self.optimizer._evolve() self.assertFalse(torch.equal(initial_population, self.optimizer.pool)) - + def test_run(self): initial_population = self.optimizer.pool.clone() self.optimizer.run(max_generations=10) - self.assertNotEqual(QueenBeeGa.decode(self.optimizer.queen), QueenBeeGa.decode(initial_population[0])) - + self.assertNotEqual( + QueenBeeGa.decode(self.optimizer.queen), + QueenBeeGa.decode(initial_population[0]), + ) + def test_check_convergence(self): self.optimizer.pool = torch.stack([self.optimizer.target_gene] * 50) self.assertTrue(self.optimizer._check_convergence()) - + def test_invalid_parameters(self): with self.assertRaises(ValueError): _ = QueenBeeGa(mutation_prob=1.5) with self.assertRaises(ValueError): _ = QueenBeeGa(strong_mutation_rate=-0.5) + if __name__ == "__main__": unittest.main()