Skip to content

Commit

Permalink
feat: allow multiple targets and robot control; minor fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
HeitorMelo committed Jan 3, 2025
1 parent b8e1a5c commit ebf4478
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 67 deletions.
7 changes: 4 additions & 3 deletions agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,17 @@
from utils.ssl.Navigation import Navigation
from utils.Point import Point
from utils.ssl.base_agent import BaseAgent
from itertools import permutations

class ExampleAgent(BaseAgent):
def __init__(self, id=0, yellow=False):
super().__init__(id, yellow)

def decision(self):
if self.target is None:
if len(self.targets) == 0:
return
target_velocity, target_angle_velocity = Navigation.goToPoint(self.robot, self.target)

target_velocity, target_angle_velocity = Navigation.goToPoint(self.robot, self.targets[0])
self.set_vel(target_velocity)
self.set_angle_vel(target_angle_velocity)

Expand Down
11 changes: 6 additions & 5 deletions random_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,17 @@
import random

class RandomAgent(BaseAgent):
def __init__(self, id=0, yellow=False):
def __init__(self, id=0, yellow=False, vel_mult=0.3):
super().__init__(id, yellow)
self.vel_mult = vel_mult

def decision(self):
if self.target is None:
if len(self.targets) == 0:
return

target_velocity, target_angle_velocity= Navigation.goToPoint(self.robot, self.target)

vel_mult = 0.3 #random.uniform(0.2, 0.6)
target_velocity, target_angle_velocity = Navigation.goToPoint(self.robot, self.targets[0])

vel_mult = self.vel_mult #random.uniform(0.2, 0.6)
target_velocity = Point(target_velocity.x * vel_mult, target_velocity.y * vel_mult)
self.set_vel(target_velocity)
self.set_angle_vel(target_angle_velocity)
Expand Down
133 changes: 78 additions & 55 deletions sslenv.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@
from rsoccer_gym.Entities import Ball, Frame, Robot
from rsoccer_gym.ssl.ssl_gym_base import SSLBaseEnv
from rsoccer_gym.Utils import KDTree
from utils.ssl.Navigation import Navigation
from utils.Point import Point
from utils.FixedQueue import FixedQueue
from utils.ssl.small_field import SSLHRenderField
from agent import ExampleAgent
from random_agent import RandomAgent
Expand All @@ -27,15 +27,22 @@ def __init__(self, render_mode="human"):
self.observation_space = Box(low=-self.field.length/2,\
high=self.field.length/2,shape=(n_obs, ))

self.target = Point(0,0)
self.targets = []
self.min_dist = 0.18
self.all_points = []
self.robot_path = []
self.agent = ExampleAgent(0, False) ## Um único agente por enquanto
self.all_points = FixedQueue(5)
self.robots_paths = [FixedQueue(40) for i in range(11)]

self.max_rounds = 10
self.rounds = self.max_rounds - 1 ## because of the first round
self.max_targets = 7
self.targets_per_round = 1

self.myAgents = {0: ExampleAgent(0, False)}
self.blue_agents = {i: RandomAgent(i, False) for i in range(1, 11)}
self.yellow_agents = {i: RandomAgent(i, True) for i in range(0, 11)}

self.gen_target_prob = 0.003

if field == 2:
self.field_renderer = SSLHRenderField()
self.window_size = self.field_renderer.window_size
Expand All @@ -45,52 +52,66 @@ def _frame_to_observations(self):
return np.array([ball.x, ball.y, robot.x, robot.y])

def _get_commands(self, actions):
robot = self.frame.robots_blue[0]
robot_pos = Point(x=robot.x,
y=robot.y)

current_target = self.target
self.all_points.append(current_target)
self.robot_path.append(robot_pos)

# Keep only the last M target points
max_target_points = 300
if len(self.all_points) > max_target_points:
self.all_points.pop(0)

for target in self.targets:
if target not in self.all_points:
self.all_points.push(target)
# Visible path drawing control
max_path_length = 300
if len(self.robot_path) > max_path_length:
self.robot_path.pop(0)
for i in self.myAgents:
self.robots_paths[i].push(Point(self.frame.robots_blue[i].x, self.frame.robots_blue[i].y))

# Check if the robot is close to the target
for j in range(len(self.targets) - 1, -1, -1):
for i in self.myAgents:
if Point(self.frame.robots_blue[i].x, self.frame.robots_blue[i].y).dist_to(self.targets[j]) < self.min_dist:
self.targets.pop(j)
break

# Finish the phase and increase the number of targets for the next phase
if self.rounds == 0:
self.rounds = self.max_rounds
if self.targets_per_round < self.max_targets:
self.targets_per_round += 1
self.blue_agents.pop(len(self.myAgents))
self.myAgents[len(self.myAgents)] = ExampleAgent(len(self.myAgents), False)

# Generate new targets
if len(self.targets) == 0:
for i in range(self.targets_per_round):
self.targets.append(Point(self.x(), self.y()))
self.rounds -= 1

if robot_pos.dist_to(self.target) < self.min_dist:
self.target = Point(x=self.x(), y=self.y())
opponents = {id: robot for id, robot in self.frame.robots_yellow.items()}
for i in range(0, self.n_robots_blue):
if (i != robot.id):
opponents[i + self.n_robots_yellow] = self.frame.robots_blue[i]

obstacles = {id: robot for id, robot in self.frame.robots_blue.items()}
for i in range(0, self.n_robots_yellow):
obstacles[i + self.n_robots_blue] = self.frame.robots_yellow[i]
teammates = {id: self.frame.robots_blue[id] for id in self.myAgents.keys()}

remove_self = lambda robots, selfId: {id: robot for id, robot in robots.items() if id != selfId}

action = self.agent.step(robot, opponents, self.target)
myActions = []
for i in self.myAgents.keys():
action = self.myAgents[i].step(self.frame.robots_blue[i], remove_self(obstacles, i), teammates, self.targets)
myActions.append(action)

others_actions = []
for i in range(1, self.n_robots_blue):
if random.uniform(0.0, 1.0) < 0.003:
random_target = Point(x=self.x(), y=self.y())
else:
random_target = None
for i in self.blue_agents.keys():
random_target = []
if random.uniform(0.0, 1.0) < self.gen_target_prob:
random_target.append(Point(x=self.x(), y=self.y()))


others_actions.append(self.blue_agents[i].step(self.frame.robots_blue[i], obstacles, dict(), random_target, True))

others_actions.append(self.blue_agents[i].step(self.frame.robots_blue[i], opponents, random_target))
for i in self.yellow_agents.keys():
random_target = []
if random.uniform(0.0, 1.0) < self.gen_target_prob:
random_target.append(Point(x=self.x(), y=self.y()))

for i in range(0, self.n_robots_yellow):
if random.uniform(0.0, 1.0) < 0.003:
random_target = Point(x=self.x(), y=self.y())
else:
random_target = None
others_actions.append(self.yellow_agents[i].step(self.frame.robots_yellow[i], obstacles, dict(), random_target, True))

others_actions.append(self.yellow_agents[i].step(self.frame.robots_yellow[i], opponents, random_target))

return [action] + others_actions
return myActions + others_actions

def _calculate_reward_and_done(self):
if self.frame.ball.x > self.field.length / 2 \
Expand All @@ -117,7 +138,7 @@ def theta():

pos_frame.robots_blue[0] = Robot(x=self.x(), y=self.y(), theta=theta())

self.target = Point(x=self.x(), y=self.y())
self.targets = [Point(x=self.x(), y=self.y())]

places = KDTree()
places.insert((pos_frame.ball.x, pos_frame.ball.y))
Expand Down Expand Up @@ -151,21 +172,23 @@ def pos_transform(pos_x, pos_y):

super()._render()

self.draw_target(
self.window_surface,
pos_transform,
self.target,
(255, 0, 255),
)

if len(self.all_points) > 1:
my_path = [pos_transform(*p) for p in self.all_points[:-1]]
for target in self.targets:
self.draw_target(
self.window_surface,
pos_transform,
target,
(255, 0, 255),
)

if len(self.all_points) > 0:
my_path = [pos_transform(*p) for p in self.all_points]
for point in my_path:
pygame.draw.circle(self.window_surface, (255, 0, 0), point, 3)

if len(self.robot_path) > 1:
my_path = [pos_transform(*p) for p in self.robot_path]
pygame.draw.lines(self.window_surface, (255, 0, 0), False, my_path, 1)
for i in range(len(self.robots_paths)):
if len(self.robots_paths[i]) > 1:
my_path = [pos_transform(*p) for p in self.robots_paths[i]]
pygame.draw.lines(self.window_surface, (255, 0, 0), False, my_path, 1)

def draw_target(self, screen, transformer, point, color):
x, y = transformer(point.x, point.y)
Expand Down
48 changes: 48 additions & 0 deletions utils/FixedQueue.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
class FixedQueue:
def __init__(self, size):
self.size = size
self.queue = [None] * size
self.head = 0
self.tail = 0
self.count = 0

def push(self, item):
if self.count < self.size:
self.queue[self.tail] = item
self.tail = (self.tail + 1) % self.size
self.count += 1
else:
self.queue[self.tail] = item
self.tail = (self.tail + 1) % self.size
self.head = (self.head + 1) % self.size

def pop(self):
if self.count == 0:
return None
item = self.queue[self.head]
self.head = (self.head + 1) % self.size
self.count -= 1
return item

def __len__(self):
return self.count

def __getitem__(self, key):
if key < 0 or key >= self.count:
raise IndexError("Index out of range")
return self.queue[(self.head + key) % self.size]

def __iter__(self):
idx = self.head
num_items = self.count
for _ in range(num_items):
yield self.queue[idx]
idx = (idx + 1) % self.size

def __contains__(self, item):
idx = self.head
for _ in range(self.count):
if self.queue[idx] == item:
return True
idx = (idx + 1) % self.size
return False
25 changes: 21 additions & 4 deletions utils/ssl/base_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,33 @@ def __init__(self, id=0, yellow=False):
self.pos = Point(0, 0)
self.vel = Point(0, 0)
self.body_angle = float(0)
self.target = None
self.targets = []
self.yellow = yellow
self.opponents = dict()
self.teammates = dict()

self.next_vel = Point(0, 0)
self.angle_vel = float(0)

def step(self, self_robot : Robot, opponents: dict[int, Robot], target: Point = None) -> Robot:
def step(self, self_robot : Robot,
opponents: dict[int, Robot] = dict(),
teammates: dict[int, Robot] = dict(),
targets: list[Point] = [],
keep_targets=False) -> Robot:
inverter = -1

self.reset()
self.pos = Point(self_robot.x, self_robot.y * inverter)
self.vel = Point(self_robot.v_x, self_robot.v_y * inverter)
self.body_angle = self_robot.theta
if target is not None:
self.target = Point(target.x, target.y * inverter)

if len(targets) > 0:
self.targets = []
for target in targets:
self.targets.append(Point(target.x, target.y * inverter))
elif len(self.targets) == 0 or not keep_targets:
self.targets = []

self.robot = Robot(id=self_robot.id, yellow=self.yellow,
x=self.pos.x, y=self.pos.y,
v_x=self.vel.x, v_y=self.vel.y, theta=self.body_angle)
Expand All @@ -35,6 +46,12 @@ def step(self, self_robot : Robot, opponents: dict[int, Robot], target: Point =
x=robot.x, y=robot.y * inverter,
v_x=robot.v_x, v_y=robot.v_y * inverter,
theta=robot.theta)

for id, robot in teammates.items():
self.teammates[id] = Robot(id=robot.id, yellow=robot.yellow,
x=robot.x, y=robot.y * inverter,
v_x=robot.v_x, v_y=robot.v_y * inverter,
theta=robot.theta)

self.decision()
self.post_decision()
Expand Down

0 comments on commit ebf4478

Please sign in to comment.