diff --git a/.travis.yml b/.travis.yml index 9e2495959..007860e7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,7 @@ before_install: script: - docker-compose up -d robot + - docker-compose up -d task_allocator - docker-compose up --exit-code-from task_allocation_test after_script: - docker stop $(docker ps -aq) diff --git a/README.md b/README.md index fa7c9f67c..ad1675607 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ docker-compose build task_allocation_test docker-compose up -d robot +docker-compose up -d task_allocator + docker-compose up task_allocation_test @@ -60,8 +62,6 @@ Install the repositories - [ropod_common](https://github.com/ropod-project/ropod_common) -- [mrta_datasets](https://github.com/anenriquez/mrta_datasets.git ) - Get the requirements: ``` @@ -71,18 +71,24 @@ pip3 install -r requirements.txt Add the task_allocation to your `PYTHONPATH` by running: ``` -sudo pip3 install -e . +pip3 install --user -e . ``` -Go to `/allocation` and run in a terminal +Go to `/mrs` and run in a terminal ``` python3 robot.py ropod_001 ``` +Run in another terminal + +``` +python3 task_allocator.py +``` + Go to `/tests` and run test in another terminal ``` -python3 task_allocator.py three_tasks.csv +python3 allocation_test.py ``` ## References diff --git a/config/config.yaml b/config/config.yaml index c984d9596..c815fece5 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -2,6 +2,9 @@ version: 2 ccu_store: db_name: ropod_ccu_store port: 27017 +robot_store: + db_name: ropod_store + port: 27017 resources: fleet: - ropod_001 @@ -10,19 +13,27 @@ resources: plugins: task_allocation: - bidding_rule: - robustness: srea - temporal: completion_time + # Assuming we can have different types of tasks, + # specify from where to import the Task and TaskStatus classes + task_type: + class: 'mrs.structs.task' allocation_method: tessi - round_time: 15 # seconds robot_proxies: true - auctioneer: fms_zyre_api # This is completely Zyre dependent + stp_solver: srea + auctioneer: + round_time: 15 # seconds alternative_timeslots: True - task_type: generic_task - corrective_measure: re-allocate + dispatcher: freeze_window: 300 # seconds robot_proxy: + bidder: + bidding_rule: + robustness: srea # has to be the same as the stp_solver + temporal: completion_time + auctioneer_name: fms_zyre_api # This is completely Zyre dependent + schedule_monitor: + corrective_measure: re-allocate api: version: 0.1.0 middleware: @@ -37,6 +48,9 @@ robot_proxy: message_types: # Types of messages the node will listen to. Messages not listed will be ignored - TASK-ANNOUNCEMENT - ALLOCATION + - TIMETABLE + - DELETE-TASK + debug_msgs: false acknowledge: false publish: task: @@ -56,43 +70,73 @@ robot_proxy: component: 'bidder.task_announcement_cb' - msg_type: 'ALLOCATION' component: 'bidder.allocation_cb' - + - msg_type: 'TIMETABLE' + component: '.timetable_cb' + - msg_type: 'DELETE-TASK' + component: '.delete_task_cb' api: version: 0.1.0 middleware: - zyre + #- rest + #- ros zyre: zyre_node: node_name: fms_zyre_api interface: null groups: + - ROPOD - TASK-ALLOCATION message_types: # Types of messages the node will listen to. Messages not listed will be ignored - - TASK + - TASK-PROGRESS - BID - FINISH-ROUND + - ALLOCATE-TASK acknowledge: false + debug_messages: + - 'TASK-REQUEST' publish: task-announcement: + msg_type: 'TASK-ANNOUNCEMENT' groups: ['TASK-ALLOCATION'] method: shout allocation: + msg_type: 'ALLOCATION' groups: ['TASK-ALLOCATION'] method: shout callbacks: - - msg_type: 'TASK' - component: '.task_cb' + - msg_type: 'ALLOCATE-TASK' + component: 'auctioneer.allocate_task_cb' - msg_type: 'BID' - component: '.bid_cb' + component: 'auctioneer.bid_cb' - msg_type: 'FINISH-ROUND' - component: '.finish_round_cb' + component: 'auctioneer.finish_round_cb' + rest: + server: + ip: 127.0.0.1 + port: 8081 + routes: + - path: '/number' + resource: + module: 'fleet_management.api.rest.resources' + class: 'RandomGenerator' + ros: + publishers: + - topic: '/fms/task' + msg_type: Task + msg_module: ropod_ros_msgs.msg + subscribers: + - topic: '/fms/task_request' + msg_type: TaskRequest + msg_module: ropod_ros_msgs.msg + callback: task_cb logger: version: 1 formatters: default: - format: '[%(levelname)-5.5s] %(asctime)s [%(name)-25.25s] %(message)s' + format: '[%(levelname)-5.5s] %(asctime)s [%(name)-35.35s] %(message)s' datefmt: '%Y-%m-%d %H:%M:%S' handlers: console: @@ -128,6 +172,8 @@ logger: handlers: [console] fms: level: DEBUG + mrs: + level: DEBUG root: level: DEBUG - handlers: [console, file] + handlers: [console, file] \ No newline at end of file diff --git a/config/logging.yaml b/config/logging.yaml deleted file mode 100644 index 83d468186..000000000 --- a/config/logging.yaml +++ /dev/null @@ -1,18 +0,0 @@ -version: 1 -formatters: - default: - format: '[%(levelname)-5.5s] %(asctime)s [%(name)-25.25s] %(message)s' - datefmt: '%Y-%m-%d %H:%M:%S' - -handlers: - console: - class: logging.StreamHandler - level: DEBUG - formatter: default - stream: ext://sys.stdout -loggers: - allocation: - level: DEBUG -root: - level: DEBUG - handlers: [console] diff --git a/docker-compose.yml b/docker-compose.yml index ed531fe8f..992ab65fc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,16 +10,26 @@ services: tty: true stdin_open: true + task_allocator: + container_name: task-allocator + image: ropod-mrs + working_dir: /mrta/mrs/ + command: ["python3", "task_allocator.py"] + network_mode: "host" + tty: true + stdin_open: true + task_allocation_test: build: . container_name: task-mrs-test image: ropod-mrs working_dir: /mrta/tests/ - command: ["python3", "fms_integration_test.py"] + command: ["python3", "allocation_test.py"] network_mode: "host" tty: true stdin_open: true depends_on: - robot + - task_allocator diff --git a/mrs/config/task_factory.py b/mrs/config/task_factory.py index 52fb44025..09edac075 100644 --- a/mrs/config/task_factory.py +++ b/mrs/config/task_factory.py @@ -20,7 +20,7 @@ def initialize(self): ropod_task_cls = getattr(import_module('ropod.structs.task'), 'Task') self.register_task_cls('ropod_task', ropod_task_cls) - generic_task_cls = getattr(import_module('dataset_lib.task'), 'Task') + generic_task_cls = getattr(import_module('mrs.structs.task'), 'Task') self.register_task_cls('generic_task', generic_task_cls) task_request_cls = getattr(import_module('ropod.structs.task'), 'TaskRequest') diff --git a/mrs/db_interface.py b/mrs/db_interface.py new file mode 100644 index 000000000..b1d8051ab --- /dev/null +++ b/mrs/db_interface.py @@ -0,0 +1,87 @@ +from mrs.structs.timetable import Timetable + + +class DBInterface(object): + def __init__(self, store): + self.store = store + + def add_task(self, task): + """Saves the given task to a database as a new document under the "tasks" collection. + """ + collection = self.store.db['tasks'] + dict_task = task.to_dict() + self.store.unique_insert(collection, dict_task, 'id', task.id) + + def update_task(self, task): + """ Updates the given task under the "tasks" collection + """ + collection = self.store.db['tasks'] + task_dict = task.to_dict() + + found_dict = collection.find_one({'id': task_dict['id']}) + + if found_dict is None: + collection.insert(task_dict) + else: + collection.replace_one({'id': task.id}, task_dict) + + def get_tasks(self): + """ Returns a dictionary with the tasks in the "tasks" collection + + """ + collection = self.store.db['tasks'] + tasks_dict = dict() + for task in collection.find(): + tasks_dict[task['id']] = task + return tasks_dict + + def remove_task(self, task_id): + """ Removes task with task_id from the collection "tasks" + """ + collection = self.store.db['tasks'] + collection.delete_one({'id': task_id}) + + def update_task_status(self, task, status): + task.status.status = status + self.update_task(task) + + def get_task(self, task_id): + """Returns a task dictionary representing the task with the given id. + """ + collection = self.store.db['tasks'] + task_dict = collection.find_one({'id': task_id}) + return task_dict + + def add_timetable(self, timetable): + """ + Saves the given timetable under the "timetables" collection + """ + collection = self.store.db['timetables'] + robot_id = timetable.robot_id + timetable_dict = timetable.to_dict() + + self.store.unique_insert(collection, timetable_dict, 'robot_id', robot_id) + + def update_timetable(self, timetable): + """ Updates the given timetable under the "timetables" collection + """ + collection = self.store.db['timetables'] + timetable_dict = timetable.to_dict() + robot_id = timetable.robot_id + + found_dict = collection.find_one({'robot_id': robot_id}) + + if found_dict is None: + collection.insert(timetable_dict) + else: + collection.replace_one({'robot_id': robot_id}, timetable_dict) + + def get_timetable(self, robot_id, stp): + collection = self.store.db['timetables'] + timetable_dict = collection.find_one({'robot_id': robot_id}) + + if timetable_dict is None: + return + timetable = Timetable.from_dict(timetable_dict, stp) + return timetable + diff --git a/mrs/robot.py b/mrs/robot.py index fcb836776..69df67b35 100644 --- a/mrs/robot.py +++ b/mrs/robot.py @@ -1,58 +1,68 @@ import argparse -import time import logging +import time + +from mrs.robot_base import RobotBase +from mrs.structs.timetable import Timetable +from mrs.task_allocation.bidder import Bidder +from mrs.task_execution.schedule_monitor import ScheduleMonitor + + +class Robot(RobotBase): + def __init__(self, robot_config, bidder_config, **kwargs): + super().__init__(**robot_config) -""" Includes: - - bidder - - dispatcher - - monitor -""" + self.bidder = Bidder(robot_config, bidder_config) + schedule_monitor_config = kwargs.get("schedule_monitor_config") + if schedule_monitor_config: + self.schedule_monitor = ScheduleMonitor(robot_config, schedule_monitor_config) -class Robot(object): + self.logger = logging.getLogger('mrs.robot.%s' % self.id) + self.logger.info("Robot %s initialized", self.id) - def __init__(self, api, bidder, **kwargs): - self.api = api - self.bidder = bidder - self.dispatcher = kwargs.get('dispatcher') + def timetable_cb(self, msg): + robot_id = msg['payload']['timetable']['robot_id'] + if robot_id == self.id: + timetable_dict = msg['payload']['timetable'] + self.logger.debug("Robot %s received timetable msg", self.id) + timetable = Timetable.from_dict(timetable_dict, self.stp) + self.db_interface.update_timetable(timetable) - def start_components(self): - self.bidder.api.start() - if self.dispatcher is not None: - self.dispatcher.api.start() + def delete_task_cb(self, msg): + task_dict = msg['payload']['task'] + task = self.task_cls.from_dict(task_dict) + self.logger.debug("Deleting task %s ", task.id) + self.db_interface.remove_task(task.id) def run(self): try: - self.start_components() + self.api.start() while True: - self.bidder.api.run() - if self.dispatcher is not None: - self.dispatcher.run() time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - logging.info("Terminating %s proxy ...", robot_id) - self.bidder.api.shutdown() - logging.info("Exiting...") + self.logger.info("Terminating %s robot ...", self.id) + self.api.shutdown() + self.logger.info("Exiting...") if __name__ == '__main__': - from fleet_management.config.loader import Config, register_api_callbacks + from fleet_management.config.loader import Config config_file_path = '../config/config.yaml' config = Config(config_file_path, initialize=False) config.configure_logger() - ccu_store = config.configure_ccu_store() parser = argparse.ArgumentParser() parser.add_argument('robot_id', type=str, help='example: ropod_001') args = parser.parse_args() robot_id = args.robot_id - robot = config.configure_robot_proxy(robot_id, ccu_store, dispatcher=True) + robot = config.configure_robot_proxy(robot_id) - register_api_callbacks(robot, robot.api) + robot.api.register_callbacks(robot) robot.run() diff --git a/mrs/robot_base.py b/mrs/robot_base.py new file mode 100644 index 000000000..b8c7aa7ff --- /dev/null +++ b/mrs/robot_base.py @@ -0,0 +1,22 @@ +from importlib import import_module + +from stn.stp import STP + +from mrs.db_interface import DBInterface +from mrs.structs.timetable import Timetable + + +class RobotBase(object): + def __init__(self, robot_id, api, robot_store, stp_solver, task_type): + + self.id = robot_id + self.api = api + self.db_interface = DBInterface(robot_store) + self.stp = STP(stp_solver) + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + + self.timetable = Timetable.get_timetable(self.db_interface, self.id, self.stp) + self.db_interface.update_timetable(self.timetable) + + diff --git a/mrs/task_execution/dispatching/__init__.py b/mrs/structs/__init__.py similarity index 100% rename from mrs/task_execution/dispatching/__init__.py rename to mrs/structs/__init__.py diff --git a/mrs/structs/allocation.py b/mrs/structs/allocation.py new file mode 100644 index 000000000..cec440c8c --- /dev/null +++ b/mrs/structs/allocation.py @@ -0,0 +1,47 @@ +from ropod.utils.uuid import generate_uuid + + +class TaskAnnouncement(object): + def __init__(self, tasks=[], round_id=''): + """ + Struct with a list of tasks + :param tasks: list of tasks + """ + self.tasks = tasks + if not round_id: + self.round_id = generate_uuid() + else: + self.round_id = round_id + + def to_dict(self): + task_annoucement_dict = dict() + task_annoucement_dict['tasks'] = dict() + + for task in self.tasks: + task_annoucement_dict['tasks'][task.id] = task.to_dict() + + task_annoucement_dict['round_id'] = self.round_id + + return task_annoucement_dict + + +class Allocation(object): + def __init__(self, task_id, robot_id): + self.task_id = task_id + self.robot_id = robot_id + + def to_dict(self): + allocation_dict = dict() + allocation_dict['task_id'] = self.task_id + allocation_dict['robot_id'] = self.robot_id + return allocation_dict + + +class FinishRound(object): + def __init__(self, robot_id): + self.robot_id = robot_id + + def to_dict(self): + finish_round = dict() + finish_round['robot_id'] = self.robot_id + return finish_round diff --git a/mrs/structs/bid.py b/mrs/structs/bid.py new file mode 100644 index 000000000..1080fff5d --- /dev/null +++ b/mrs/structs/bid.py @@ -0,0 +1,69 @@ +from mrs.structs.task import Task + + +class Bid(object): + def __init__(self, bidding_rule=None, robot_id='', round_id='', task=Task(), timetable=None, + **kwargs): + self.bidding_rule = bidding_rule + self.robot_id = robot_id + self.round_id = round_id + self.task = task + self.timetable = timetable + self.cost = kwargs.get('cost', float('inf')) + self.position = kwargs.get('position', 0) + self.hard_constraints = kwargs.get('hard_constraints', True) + self.alternative_start_time = None + + def __repr__(self): + return str(self.to_dict()) + + def __lt__(self, other): + if other is None: + return False + return self.cost < other.cost + + def __eq__(self, other): + if other is None: + return False + return self.cost == other.cost + + def compute_cost(self, position): + dispatchable_graph = self.timetable.dispatchable_graph + robustness_metric = self.timetable.robustness_metric + self.position = position + + if self.task.hard_constraints: + self.cost = self.bidding_rule.compute_bid_cost(dispatchable_graph, robustness_metric) + + else: # soft constraints + navigation_start_time = dispatchable_graph.get_task_navigation_start_time(self.task.id) + self.cost = abs(navigation_start_time - self.task.earliest_start_time) + alternative_start_time = navigation_start_time + self.hard_constraints = False + self.alternative_start_time = alternative_start_time + + def to_dict(self): + bid_dict = dict() + bid_dict['cost'] = self.cost + bid_dict['robot_id'] = self.robot_id + bid_dict['task_id'] = self.task.id + bid_dict['position'] = self.position + bid_dict['round_id'] = self.round_id + bid_dict['hard_constraints'] = self.hard_constraints + bid_dict['alternative_start_time'] = self.alternative_start_time + return bid_dict + + @classmethod + def from_dict(cls, bid_dict): + bid = cls() + bid.cost = bid_dict['cost'] + bid.robot_id = bid_dict['robot_id'] + bid.task_id = bid_dict['task_id'] + bid.position = bid_dict['position'] + bid.round_id = bid_dict['round_id'] + bid.hard_constraints = bid_dict['hard_constraints'] + bid.alternative_start_time = bid_dict['alternative_start_time'] + return bid + + + diff --git a/mrs/structs/status.py b/mrs/structs/status.py new file mode 100644 index 000000000..9a3cae084 --- /dev/null +++ b/mrs/structs/status.py @@ -0,0 +1,40 @@ +class TaskStatus(object): + UNALLOCATED = 1 + ALLOCATED = 2 + SCHEDULED = 3 # Task is ready to be dispatched + SHIPPED = 4 # The task was sent to the robot + ONGOING = 5 + COMPLETED = 6 + ABORTED = 7 # Aborted by the system, not by the user + FAILED = 8 # Execution failed + CANCELED = 9 # Canceled before execution starts + PREEMPTED = 10 # Canceled during execution + + def __init__(self, task_id=''): + self.task_id = task_id + self.status = self.UNALLOCATED + self.delayed = False + + def to_dict(self): + task_dict = dict() + task_dict['task_id'] = self.task_id + task_dict['status'] = self.status + task_dict['delayed'] = self.delayed + return task_dict + + @staticmethod + def from_dict(status_dict): + task_id = status_dict['task_id'] + status = TaskStatus(task_id) + status.task_id = task_id + status.status = status_dict['status'] + status.delayed = status_dict['delayed'] + return status + + @staticmethod + def to_csv(status_dict): + """ Prepares dict to be written to a csv + :return: dict + """ + # The dictionary is already flat and ready to be exported + return status_dict diff --git a/mrs/structs/task.py b/mrs/structs/task.py new file mode 100644 index 000000000..fb5ed0b21 --- /dev/null +++ b/mrs/structs/task.py @@ -0,0 +1,66 @@ +from ropod.utils.uuid import generate_uuid +from mrs.utils.datasets import flatten_dict +from mrs.structs.status import TaskStatus + + +class Task(object): + + def __init__(self, id='', + earliest_start_time=-1, + latest_start_time=-1, + start_pose_name='', + finish_pose_name='', + hard_constraints=True, + estimated_duration=-1): + + if not id: + self.id = generate_uuid() + else: + self.id = id + + self.earliest_start_time = earliest_start_time + self.latest_start_time = latest_start_time + self.start_pose_name = start_pose_name + self.finish_pose_name = finish_pose_name + self.hard_constraints = hard_constraints + + # Used by the dataset generator + self.estimated_duration = estimated_duration + self.status = TaskStatus(id) + + def to_dict(self): + task_dict = dict() + task_dict['id'] = self.id + task_dict['earliest_start_time'] = self.earliest_start_time + task_dict['latest_start_time'] = self.latest_start_time + task_dict['start_pose_name'] = self.start_pose_name + task_dict['finish_pose_name'] = self.finish_pose_name + task_dict['hard_constraints'] = self.hard_constraints + task_dict['estimated_duration'] = self.estimated_duration + task_dict['status'] = self.status.to_dict() + return task_dict + + @staticmethod + def from_dict(task_dict): + task = Task() + task.id = task_dict['id'] + task.earliest_start_time = task_dict['earliest_start_time'] + task.latest_start_time = task_dict['latest_start_time'] + task.start_pose_name = task_dict['start_pose_name'] + task.finish_pose_name = task_dict['finish_pose_name'] + task.hard_constraints = task_dict['hard_constraints'] + task.estimated_duration = task_dict['estimated_duration'] + task.status = TaskStatus.from_dict(task_dict['status']) + return task + + @staticmethod + def to_csv(task_dict): + """ Prepares dict to be written to a csv + :return: dict + """ + to_csv_dict = flatten_dict(task_dict) + + return to_csv_dict + + + diff --git a/mrs/timetable.py b/mrs/structs/timetable.py similarity index 95% rename from mrs/timetable.py rename to mrs/structs/timetable.py index aad187036..3ead38295 100644 --- a/mrs/timetable.py +++ b/mrs/structs/timetable.py @@ -25,14 +25,6 @@ def __init__(self, stp, robot_id): self.dispatchable_graph = stp.get_stn() self.schedule = stp.get_stn() - @staticmethod - def get_timetable(ccu_store, id, stp): - timetable_dict = ccu_store.get_timetable(id) - if timetable_dict is None: - return - timetable = Timetable.from_dict(timetable_dict, stp) - return timetable - def solve_stp(self): """ Computes the dispatchable graph and robustness metric from the given stn @@ -131,6 +123,12 @@ def from_dict(timetable_dict, stp): return timetable + @staticmethod + def get_timetable(db_interface, robot_id, stp): + timetable = db_interface.get_timetable(robot_id, stp) + if timetable is None: + timetable = Timetable(stp, robot_id) + return timetable diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index 711adee14..798ea4aa4 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -1,14 +1,17 @@ -import uuid -import time import logging -import logging.config +import time from datetime import timedelta -from mrs.task_allocation.round import Round -from mrs.timetable import Timetable +from importlib import import_module + from stn.stp import STP -from mrs.exceptions.task_allocation import NoAllocation + +from mrs.db_interface import DBInterface from mrs.exceptions.task_allocation import AlternativeTimeSlot -from dataset_lib.task import TaskStatus +from mrs.exceptions.task_allocation import NoAllocation +from mrs.structs.allocation import TaskAnnouncement, Allocation +from mrs.structs.task import TaskStatus +from mrs.structs.timetable import Timetable +from mrs.task_allocation.round import Round """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -17,27 +20,30 @@ class Auctioneer(object): - def __init__(self, robot_ids, ccu_store, api, stp_solver, task_cls, allocation_method, round_time=5, - **kwargs): + def __init__(self, robot_ids, ccu_store, api, stp_solver, + task_type, allocation_method, round_time=5, **kwargs): - logging.debug("Starting Auctioneer") + self.logger = logging.getLogger("mrs.auctioneer") self.robot_ids = robot_ids - self.ccu_store = ccu_store + self.db_interface = DBInterface(ccu_store) self.api = api + self.stp = STP(stp_solver) + self.allocation_method = allocation_method self.round_time = timedelta(seconds=round_time) self.alternative_timeslots = kwargs.get('alternative_timeslots', False) - self.stp = STP(stp_solver) - self.task_cls = task_cls + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + self.logger.debug("Auctioneer started") # TODO: Inititalize the timetables in the loader? and read the timetables here self.timetables = dict() for robot_id in robot_ids: - timetable = Timetable(self.stp, robot_id) + timetable = Timetable.get_timetable(self.db_interface, robot_id, self.stp) + self.db_interface.add_timetable(timetable) self.timetables[robot_id] = timetable - self.ccu_store.add_timetable(timetable) self.tasks_to_allocate = dict() self.allocations = list() @@ -50,6 +56,13 @@ def __str__(self): to_print += "Groups {}".format(self.api.interfaces[0].groups()) return to_print + def check_db(self): + tasks_dict = self.db_interface.get_tasks() + for task_id, task_dict in tasks_dict.items(): + task = self.task_cls.from_dict(task_dict) + if task.status == TaskStatus.UNALLOCATED: + self.add_task(task) + def run(self): if self.tasks_to_allocate and self.round.finished: self.announce_task() @@ -63,7 +76,7 @@ def run(self): self.announce_winner(allocated_task, robot_id) except NoAllocation as exception: - logging.exception("No mrs made in round %s ", exception.round_id) + self.logger.exception("No mrs made in round %s ", exception.round_id) self.round.finish() except AlternativeTimeSlot as exception: @@ -78,22 +91,17 @@ def process_allocation(self, round_result): self.allocations.append(allocation) self.tasks_to_allocate = tasks_to_allocate - logging.debug("Allocation: %s", allocation) - logging.debug("Tasks to allocate %s", self.tasks_to_allocate) + self.logger.debug("Allocation: %s", allocation) + self.logger.debug("Tasks to allocate %s", self.tasks_to_allocate) - self.update_task_status(task, TaskStatus.ALLOCATED) + self.logger.debug("Updating task status to ALLOCATED") + self.db_interface.update_task_status(task, TaskStatus.ALLOCATED) self.update_timetable(robot_id, task, position) return allocation - def update_task_status(self, task, status): - task.status.status = status - logging.debug("Updating task status to %s", task.status.status) - self.ccu_store.update_task(task) - logging.debug('Tasks saved') - def update_timetable(self, robot_id, task, position): - timetable = Timetable.get_timetable(self.ccu_store, robot_id, self.stp) + timetable = self.db_interface.get_timetable(robot_id, self.stp) timetable.add_task_to_stn(task, position) timetable.solve_stp() @@ -103,16 +111,16 @@ def update_timetable(self, robot_id, task, position): pass self.timetables.update({robot_id: timetable}) - self.ccu_store.update_timetable(timetable) + self.db_interface.update_timetable(timetable) - logging.debug("STN robot %s: %s", robot_id, timetable.stn) - logging.debug("Dispatchable graph robot %s: %s", robot_id, timetable.dispatchable_graph) + self.logger.debug("STN robot %s: %s", robot_id, timetable.stn) + self.logger.debug("Dispatchable graph robot %s: %s", robot_id, timetable.dispatchable_graph) def process_alternative_allocation(self, exception): task_id = exception.task_id robot_id = exception.robot_id alternative_start_time = exception.alternative_start_time - logging.exception("Alternative timeslot for task %s: robot %s, alternative start time: %s ", task_id, robot_id, + self.logger.exception("Alternative timeslot for task %s: robot %s, alternative start time: %s ", task_id, robot_id, alternative_start_time) alternative_allocation = (task_id, [robot_id], alternative_start_time) @@ -120,16 +128,16 @@ def process_alternative_allocation(self, exception): def add_task(self, task): self.tasks_to_allocate[task.id] = task - self.ccu_store.add_task(task) + self.db_interface.update_task(task) def allocate(self, tasks): if isinstance(tasks, list): for task in tasks: self.add_task(task) - logging.debug('Auctioneer received a list of tasks') + self.logger.debug('Auctioneer received a list of tasks') else: self.add_task(tasks) - logging.debug('Auctioneer received one task') + self.logger.debug('Auctioneer received one task') def announce_task(self): @@ -140,57 +148,42 @@ def announce_task(self): self.round = Round(**round_) - logging.info("Starting round: %s", self.round.id) - logging.info("Number of tasks to allocate: %s", len(self.tasks_to_allocate)) + self.logger.info("Starting round: %s", self.round.id) + self.logger.info("Number of tasks to allocate: %s", len(self.tasks_to_allocate)) - # Create task announcement message that contains all unallocated tasks - task_announcement = dict() - task_announcement['header'] = dict() - task_announcement['payload'] = dict() - task_announcement['header']['type'] = 'TASK-ANNOUNCEMENT' - task_announcement['header']['metamodel'] = 'ropod-msg-schema.json' - task_announcement['header']['msgId'] = str(uuid.uuid4()) - task_announcement['header']['timestamp'] = int(round(time.time()) * 1000) - task_announcement['payload']['metamodel'] = 'ropod-task-announcement-schema.json' - task_announcement['payload']['round_id'] = self.round.id - task_announcement['payload']['tasks'] = dict() + tasks = list(self.tasks_to_allocate.values()) + task_annoucement = TaskAnnouncement(tasks, self.round.id) + msg = self.api.create_message(task_annoucement) - for task_id, task in self.tasks_to_allocate.items(): - task_announcement['payload']['tasks'][task.id] = task.to_dict() + self.logger.debug('task annoucement msg: %s', msg) - logging.debug("Auctioneer announces tasks %s", [task_id for task_id, task in self.tasks_to_allocate.items()]) + self.logger.debug("Auctioneer announces tasks %s", [task_id for task_id, task in self.tasks_to_allocate.items()]) self.round.start() - self.api.publish(task_announcement, groups=['TASK-ALLOCATION']) + self.api.publish(msg, groups=['TASK-ALLOCATION']) - def task_cb(self, msg): + def send_timetable(self, robot_id): + timetable = Timetable.get_timetable(self.db_interface, robot_id, self.stp) + msg = self.api.create_message(timetable) + self.api.publish(msg) + + def allocate_task_cb(self, msg): + self.logger.debug("Task received") task_dict = msg['payload']['task'] task = self.task_cls.from_dict(task_dict) self.add_task(task) def bid_cb(self, msg): - bid = msg['payload']['bid'] + bid = msg['payload'] self.round.process_bid(bid) def finish_round_cb(self, msg): self.round.finish() def announce_winner(self, task_id, robot_id): - - allocation = dict() - allocation['header'] = dict() - allocation['payload'] = dict() - allocation['header']['type'] = 'ALLOCATION' - allocation['header']['metamodel'] = 'ropod-msg-schema.json' - allocation['header']['msgId'] = str(uuid.uuid4()) - allocation['header']['timestamp'] = int(round(time.time()) * 1000) - - allocation['payload']['metamodel'] = 'ropod-mrs-schema.json' - allocation['payload']['task_id'] = task_id - allocation['payload']['winner_id'] = robot_id - - logging.debug("Accouncing winner...") - self.api.publish(allocation, groups=['TASK-ALLOCATION']) + allocation = Allocation(task_id, robot_id) + msg = self.api.create_message(allocation) + self.api.publish(msg, groups=['TASK-ALLOCATION']) def get_task_schedule(self, task_id, robot_id): # For now, returning the start navigation time from the dispatchable graph @@ -201,7 +194,7 @@ def get_task_schedule(self, task_id, robot_id): start_time = timetable.dispatchable_graph.get_task_navigation_start_time(task_id) - logging.debug("Start time of task %s: %s", task_id, start_time) + self.logger.debug("Start time of task %s: %s", task_id, start_time) task_schedule['start_time'] = start_time task_schedule['finish_time'] = -1 # This info is not available here. @@ -211,23 +204,22 @@ def get_task_schedule(self, task_id, robot_id): if __name__ == '__main__': - from fleet_management.config.loader import Config, register_api_callbacks + from fleet_management.config.loader import Config config_file_path = '../../config/config.yaml' config = Config(config_file_path, initialize=True) - auctioneer = config.configure_task_allocator(config.ccu_store) + auctioneer = config.configure_auctioneer(config.ccu_store) time.sleep(5) - register_api_callbacks(auctioneer, auctioneer.api) - - auctioneer.api.start() + auctioneer.api.register_callbacks(auctioneer) try: + auctioneer.api.start() while True: - auctioneer.api.run() auctioneer.run() + auctioneer.api.run() time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - logging.info("Terminating %s auctioneer ...") + print("Terminating auctioneer ...") auctioneer.api.shutdown() - logging.info("Exiting...") + print("Exiting...") diff --git a/mrs/task_allocation/bid.py b/mrs/task_allocation/bid.py deleted file mode 100644 index bac24fe39..000000000 --- a/mrs/task_allocation/bid.py +++ /dev/null @@ -1,103 +0,0 @@ -import logging -from mrs.utils.uuid import generate_uuid - - -class Bid(object): - - def __init__(self, bidding_rule=None, robot_id='', round_id='', task=None, timetable=None, cost=float('inf')): - self.bidding_rule = bidding_rule - self.robot_id = robot_id - self.round_id = round_id - self.task = task - self.cost = cost - self.timetable = timetable - if not task: - task_id = generate_uuid() - else: - task_id = task.id - self.msg = BidMsg(cost, robot_id, task_id) - - def __repr__(self): - return str(self.msg.to_dict()) - - def __lt__(self, other): - if other is None: - return False - return self.cost < other.cost - - def __eq__(self, other): - if other is None: - return False - return self.cost == other.cost - - def compute_cost(self, position): - dispatchable_graph = self.timetable.dispatchable_graph - robustness_metric = self.timetable.robustness_metric - - if self.task.hard_constraints: - self.cost = self.bidding_rule.compute_bid_cost(dispatchable_graph, robustness_metric) - self.msg = BidMsg(self.cost, self.robot_id, self.task.id, position, self.round_id) - - else: # soft constraints - navigation_start_time = dispatchable_graph.get_task_navigation_start_time(self.task.id) - logging.debug("Navigation start time: %s", navigation_start_time) - self.cost = abs(navigation_start_time - self.task.earliest_start_time) - alternative_start_time = navigation_start_time - self.msg = BidMsg(self.cost, self.robot_id, self.task.id, position, self.round_id, - hard_constraints=False, alternative_start_time=alternative_start_time) - - logging.debug("Cost: %s", self.cost) - - -class BidMsg(object): - def __init__(self, cost=float('inf'), robot_id='', task_id='', position=0, - round_id='', **kwargs): - self.cost = cost - self.robot_id = robot_id - self.task_id = task_id - self.position = position - self.round_id = round_id - - self.hard_constraints = kwargs.get('hard_constraints', True) - self.alternative_start_time = kwargs.get('alternative_start_time') - - def __repr__(self): - return str(self.to_dict()) - - def __lt__(self, other): - if other is None: - return False - return self.cost < other.cost - - def __eq__(self, other): - if other is None: - return False - return self.cost == other.cost - - def to_dict(self): - bid_msg_dict = dict() - bid_msg_dict['cost'] = self.cost - bid_msg_dict['robot_id'] = self.robot_id - bid_msg_dict['task_id'] = self.task_id - bid_msg_dict['position'] = self.position - bid_msg_dict['round_id'] = self.round_id - bid_msg_dict['hard_constraints'] = self.hard_constraints - bid_msg_dict['alternative_start_time'] = self.alternative_start_time - return bid_msg_dict - - @classmethod - def from_dict(cls, bid_msg_dict): - bid_msg = cls() - bid_msg.cost = bid_msg_dict['cost'] - bid_msg.robot_id = bid_msg_dict['robot_id'] - bid_msg.task_id = bid_msg_dict['task_id'] - bid_msg.position = bid_msg_dict['position'] - bid_msg.round_id = bid_msg_dict['round_id'] - bid_msg.hard_constraints = bid_msg_dict['hard_constraints'] - bid_msg.alternative_start_time = bid_msg_dict['alternative_start_time'] - return bid_msg - - - - - diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 5235ff828..a7bf4493c 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -1,62 +1,43 @@ import copy -import uuid -import time -import logging.config +import logging -from stn.stp import STP -from mrs.task_allocation.bid import Bid -from mrs.task_allocation.bidding_rule import BiddingRule -from mrs.timetable import Timetable from mrs.exceptions.task_allocation import NoSTPSolution +from mrs.structs.allocation import FinishRound +from mrs.structs.bid import Bid +from mrs.structs.task import TaskStatus +from mrs.task_allocation.bidding_rule import BiddingRule +from mrs.robot_base import RobotBase """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file """ -class Bidder(object): - - def __init__(self, robot_id, ccu_store, api, task_cls, bidding_rule, allocation_method, auctioneer, **kwargs): +class Bidder(RobotBase): - self.id = robot_id - self.ccu_store = ccu_store + def __init__(self, robot_config, bidder_config): + super().__init__(**robot_config) + self.logger = logging.getLogger('mrs.bidder.%s' % self.id) - self.api = api - - robustness = bidding_rule.get('robustness') - temporal = bidding_rule.get('temporal') + robustness = bidder_config.get('bidding_rule').get('robustness') + temporal = bidder_config.get('bidding_rule').get('temporal') self.bidding_rule = BiddingRule(robustness, temporal) - - self.task_cls = task_cls - self.allocation_method = allocation_method - self.auctioneer = auctioneer - - self.logger = logging.getLogger('mrs.robot.%s' % self.id) - self.logger.debug("Starting robot %s", self.id) - - self.stp = STP(robustness) - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) - + self.auctioneer_name = bidder_config.get("auctioneer_name") self.bid_placed = Bid() - def __str__(self): - to_print = "" - to_print += "Robot {}".format(self.id) - to_print += '\n' - to_print += "Groups {}".format(self.api.zyre.groups()) - return to_print + self.logger.debug("Bidder initialized %s", self.id) def task_announcement_cb(self, msg): self.logger.debug("Robot %s received TASK-ANNOUNCEMENT", self.id) round_id = msg['payload']['round_id'] received_tasks = msg['payload']['tasks'] - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) self.compute_bids(received_tasks, round_id) def allocation_cb(self, msg): self.logger.debug("Robot %s received ALLOCATION", self.id) task_id = msg['payload']['task_id'] - winner_id = msg['payload']['winner_id'] + winner_id = msg['payload']['robot_id'] if winner_id == self.id: self.allocate_to_robot(task_id) @@ -68,6 +49,7 @@ def compute_bids(self, received_tasks, round_id): for task_id, task_info in received_tasks.items(): task = self.task_cls.from_dict(task_info) + self.db_interface.update_task(task) self.logger.debug("Computing bid of task %s", task.id) # Insert task in each possible position of the stn and @@ -105,7 +87,10 @@ def insert_task(self, task, round_id): best_bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) tasks = self.timetable.get_tasks() - n_tasks = len(tasks) + if tasks: + n_tasks = len(tasks) + else: + n_tasks = 0 # Add task to the STN from position 1 onwards (position 0 is reserved for the zero_timepoint) for position in range(1, n_tasks+2): @@ -128,6 +113,7 @@ def insert_task(self, task, round_id): bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) bid.compute_cost(position) + self.logger.debug("Cost: %s", bid.cost) if bid < best_bid or (bid == best_bid and bid.task.id < best_bid.task.id): best_bid = copy.deepcopy(bid) @@ -162,34 +148,25 @@ def get_smallest_bid(bids): return smallest_bid def send_bid(self, bid): - """ Creates bid_msg and sends it to the auctioneer + """ Creates bid_msg and sends it to the auctioneer_name :param bid: :param round_id: :return: """ - self.logger.debug("Bid %s", bid.msg.to_dict()) - - bid_msg = dict() - bid_msg['header'] = dict() - bid_msg['payload'] = dict() - bid_msg['header']['type'] = 'BID' - bid_msg['header']['metamodel'] = 'ropod-msg-schema.json' - bid_msg['header']['msgId'] = str(uuid.uuid4()) - bid_msg['header']['timestamp'] = int(round(time.time()) * 1000) - - bid_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - bid_msg['payload']['bid'] = bid.msg.to_dict() - + msg = self.api.create_message(bid) tasks = [task for task in bid.timetable.get_tasks()] self.logger.info("Round %s: robod_id %s bids %s for task %s and tasks %s", bid.round_id, self.id, bid.cost, bid.task.id, tasks) - self.api.publish(bid_msg, peer=self.auctioneer) + self.api.publish(msg, peer=self.auctioneer_name) def allocate_to_robot(self, task_id): - # Update the timetable self.timetable = copy.deepcopy(self.bid_placed.timetable) + self.db_interface.update_timetable(self.timetable) + task_dict = self.db_interface.get_task(task_id) + task = self.task_cls.from_dict(task_dict) + self.db_interface.update_task_status(task, TaskStatus.ALLOCATED) self.logger.info("Robot %s allocated task %s", self.id, task_id) self.logger.debug("STN %s", self.timetable.stn) @@ -200,16 +177,9 @@ def allocate_to_robot(self, task_id): self.logger.debug("Tasks allocated to robot %s:%s", self.id, tasks) def send_finish_round(self): - close_msg = dict() - close_msg['header'] = dict() - close_msg['payload'] = dict() - close_msg['header']['type'] = 'FINISH-ROUND' - close_msg['header']['metamodel'] = 'ropod-msg-schema.json' - close_msg['header']['msgId'] = str(uuid.uuid4()) - close_msg['header']['timestamp'] = int(round(time.time()) * 1000) - close_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - close_msg['payload']['robot_id'] = self.id + finish_round = FinishRound(self.id) + msg = self.api.create_message(finish_round) self.logger.info("Robot %s sends close round msg ", self.id) - self.api.publish(close_msg) + self.api.publish(msg, groups=['TASK-ALLOCATION']) diff --git a/mrs/task_allocation/round.py b/mrs/task_allocation/round.py index 987afc4ed..b337272c6 100644 --- a/mrs/task_allocation/round.py +++ b/mrs/task_allocation/round.py @@ -1,16 +1,20 @@ -from mrs.utils.uuid import generate_uuid +import copy import logging + from ropod.utils.timestamp import TimeStamp as ts -from mrs.task_allocation.bid import BidMsg -import copy -from mrs.exceptions.task_allocation import NoAllocation +from ropod.utils.uuid import generate_uuid + from mrs.exceptions.task_allocation import AlternativeTimeSlot +from mrs.exceptions.task_allocation import NoAllocation +from mrs.structs.bid import Bid class Round(object): def __init__(self, **kwargs): + self.logger = logging.getLogger('mrs.auctioneer.round') + self.tasks_to_allocate = kwargs.get('tasks_to_allocate', dict()) self.round_time = kwargs.get('round_time', 0) self.n_robots = kwargs.get('n_robots', 0) @@ -41,16 +45,16 @@ def start(self): """ open_time = ts.get_time_stamp() self.closure_time = ts.get_time_stamp(self.round_time) - logging.debug("Round opened at %s and will close at %s", + self.logger.debug("Round opened at %s and will close at %s", open_time, self.closure_time) self.finished = False self.opened = True - def process_bid(self, bid_dict): - bid = BidMsg.from_dict(bid_dict) + def process_bid(self, bid_msg): + bid = Bid.from_dict(bid_msg) - logging.debug("Processing bid from robot %s, cost: %s", bid.robot_id, bid.cost) + self.logger.debug("Processing bid from robot %s, cost: %s", bid.robot_id, bid.cost) if bid.cost != float('inf'): # Process a bid @@ -83,7 +87,7 @@ def time_to_close(self): if current_time < self.closure_time: return False - logging.debug("Closing round at %s", current_time) + self.logger.debug("Closing round at %s", current_time) self.opened = False return True @@ -117,12 +121,12 @@ def get_result(self): return round_result except NoAllocation: - logging.exception("No mrs made in round %s ", self.id) + self.logger.exception("No mrs made in round %s ", self.id) raise NoAllocation(self.id) def finish(self): self.finished = True - logging.debug("Round finished") + self.logger.debug("Round finished") def set_soft_constraints(self): """ If the number of no-bids for a task is equal to the number of robots, @@ -134,7 +138,7 @@ def set_soft_constraints(self): task = self.tasks_to_allocate.get(task_id) task.hard_constraints = False self.tasks_to_allocate.update({task_id: task}) - logging.debug("Setting soft constraints for task %s", task_id) + self.logger.debug("Setting soft constraints for task %s", task_id) def elect_winner(self): """ Elects the winner of the round @@ -144,7 +148,7 @@ def elect_winner(self): value - list of robots assigned to the task """ - lowest_bid = BidMsg() + lowest_bid = Bid() for task_id, bid in self.received_bids.items(): if bid < lowest_bid: diff --git a/mrs/task_allocator.py b/mrs/task_allocator.py new file mode 100644 index 000000000..b9e00aa0f --- /dev/null +++ b/mrs/task_allocator.py @@ -0,0 +1,35 @@ +import logging +import time + +from fleet_management.config.loader import Config + + +class TaskAllocator(object): + def __init__(self, config_file=None): + self.logger = logging.getLogger('mrs') + self.logger.info("Starting MRS...") + self.config = Config(config_file, initialize=True) + self.config.configure_logger() + self.ccu_store = self.config.ccu_store + self.api = self.config.api + + self.auctioneer = self.config.configure_auctioneer(self.ccu_store) + self.api.register_callbacks(self) + + def run(self): + try: + self.api.start() + while True: + self.auctioneer.run() + self.api.run() + time.sleep(0.5) + except (KeyboardInterrupt, SystemExit): + self.logger.info("Terminating task allocator ...") + self.api.shutdown() + logging.info("Exiting...") + + +if __name__ == '__main__': + config_file_path = '../config/config.yaml' + task_allocator = TaskAllocator(config_file_path) + task_allocator.run() diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatcher.py similarity index 72% rename from mrs/task_execution/dispatching/dispatcher.py rename to mrs/task_execution/dispatcher.py index 126b86c15..0223fd6be 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatcher.py @@ -1,35 +1,46 @@ import logging -import uuid import time +import uuid from ropod.utils.timestamp import TimeStamp as ts -from mrs.task_execution.dispatching.scheduler import Scheduler -from mrs.timetable import Timetable from stn.stp import STP +from importlib import import_module + +from mrs.db_interface import DBInterface from mrs.exceptions.task_allocation import NoSTPSolution from mrs.exceptions.task_execution import InconsistentSchedule -from dataset_lib.task import TaskStatus +from mrs.structs.task import TaskStatus +from mrs.structs.timetable import Timetable +from mrs.task_execution.scheduler import Scheduler class Dispatcher(object): - def __init__(self, robot_id, ccu_store, task_cls, stp_solver, corrective_measure, freeze_window, api, auctioneer): + def __init__(self, robot_id, api, robot_store, task_type, + stp_solver, corrective_measure, freeze_window): + self.id = robot_id - self.ccu_store = ccu_store - self.task_cls = task_cls + self.api = api + self.db_interface = DBInterface(robot_store) + + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + self.stp = STP(stp_solver) self.stp_solver = stp_solver + self.corrective_measure = corrective_measure self.freeze_window = freeze_window - self.api = api - self.auctioneer = auctioneer - self.scheduler = Scheduler(ccu_store, self.stp) + self.scheduler = Scheduler(robot_store, self.stp) - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + timetable = self.db_interface.get_timetable(self.id, self.stp) + if timetable is None: + timetable = Timetable(self.stp, robot_id) + self.timetable = timetable def run(self): - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) if self.timetable is not None: task = self.get_earliest_task() if task is not None: @@ -44,20 +55,27 @@ def check_earliest_task_status(self, task): self.recompute_timetable(task) self.scheduler.reset_schedule(self.timetable) - elif task.status.status == TaskStatus.DELAYED and self.corrective_measure == 're-schedule': + elif task.status.status == TaskStatus.ONGOING and task.status.delayed: + self.apply_corrective_measure(task) + + elif task.status.status == TaskStatus.SCHEDULED and self.time_to_dispatch(): + self.dispatch(task) + + def apply_corrective_measure(self, task): + if self.corrective_measure == 're-schedule': self.recompute_timetable(task) - elif task.status.status == TaskStatus.DELAYED and self.corrective_measure == 're-allocate': + elif self.corrective_measure == 're-allocate': self.scheduler.reset_schedule(self.timetable) self.request_reallocation(task) - elif task.status.status == TaskStatus.SCHEDULED and self.time_to_dispatch(): - self.dispatch(task) + else: + logging.debug("Not applying corrective measure") def get_earliest_task(self): task_id = self.timetable.get_earliest_task_id() if task_id: - task_dict = self.ccu_store.get_task(task_id) + task_dict = self.db_interface.get_task(task_id) task = self.task_cls.from_dict(task_dict) return task @@ -77,7 +95,7 @@ def schedule_task(self, task): if self.corrective_measure == 're-allocate': self.request_reallocation(task) - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) def recompute_timetable(self, task): try: @@ -88,14 +106,9 @@ def recompute_timetable(self, task): except NoSTPSolution: logging.exception("The stp solver could not solve the problem") - self.update_task_status(task, TaskStatus.FAILED) + self.db_interface.update_task_status(task, TaskStatus.FAILED) self.timetable.remove_task() - def update_task_status(self, task, status): - task.status.status = status - logging.debug("Updating task status to %s", task.status.status) - self.ccu_store.update_task(task) - def time_to_dispatch(self): current_time = ts.get_time_stamp() if current_time < self.scheduler.navigation_start_time: @@ -119,13 +132,13 @@ def dispatch(self, task): task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' task_msg['payload']['task'] = task.to_dict() - self.update_task_status(task, TaskStatus.SHIPPED) + self.db_interface.update_task_status(task, TaskStatus.SHIPPED) self.timetable.remove_task() self.api.publish(task_msg, groups=['ROPOD']) def request_reallocation(self, task): - self.update_task_status(task, TaskStatus.ABORTED) # ABORTED + self.db_interface.update_task_status(task, TaskStatus.UNALLOCATED) # ABORTED task_msg = dict() task_msg['header'] = dict() task_msg['payload'] = dict() diff --git a/mrs/task_execution/schedule_monitor.py b/mrs/task_execution/schedule_monitor.py new file mode 100644 index 000000000..f63c5c1c5 --- /dev/null +++ b/mrs/task_execution/schedule_monitor.py @@ -0,0 +1,7 @@ +from mrs.db_interface import DBInterface +from mrs.robot_base import RobotBase + + +class ScheduleMonitor(RobotBase): + def __init__(self, robot_config, schedule_monitor_config): + super().__init__(**robot_config) diff --git a/mrs/task_execution/dispatching/scheduler.py b/mrs/task_execution/scheduler.py similarity index 72% rename from mrs/task_execution/dispatching/scheduler.py rename to mrs/task_execution/scheduler.py index 58c85dcca..e4a4f8dde 100644 --- a/mrs/task_execution/dispatching/scheduler.py +++ b/mrs/task_execution/scheduler.py @@ -1,12 +1,14 @@ import logging + +from mrs.db_interface import DBInterface from mrs.exceptions.task_execution import InconsistentSchedule -from dataset_lib.task import TaskStatus +from mrs.structs.task import TaskStatus class Scheduler(object): - def __init__(self, ccu_store, stp): - self.ccu_store = ccu_store + def __init__(self, robot_store, stp): + self.db_interface = DBInterface(robot_store) self.stp = stp self.navigation_start_time = -float('inf') # of scheduled task @@ -32,8 +34,8 @@ def assign_timepoint(self, task, navigation_start, timetable): print("Schedule: ", timetable.schedule) - self.ccu_store.update_timetable(timetable) - self.update_task_status(task, TaskStatus.SCHEDULED) + self.db_interface.update_timetable(timetable) + self.db_interface.update_task_status(task, TaskStatus.SCHEDULED) self.navigation_start_time = navigation_start else: @@ -42,11 +44,6 @@ def assign_timepoint(self, task, navigation_start, timetable): def reallocate(self): pass - def update_task_status(self, task, status): - task.status.status = status - logging.debug("Updating task status to %s", task.status.status) - self.ccu_store.update_task(task) - def reset_schedule(self, timetable): timetable.remove_task() - self.ccu_store.update_timetable(timetable) + self.db_interface.update_timetable(timetable) diff --git a/mrs/task_execution/task_monitor.py b/mrs/task_execution/task_monitor.py new file mode 100644 index 000000000..ad1bce90d --- /dev/null +++ b/mrs/task_execution/task_monitor.py @@ -0,0 +1,42 @@ +import logging + +from mrs.db_interface import DBInterface +from mrs.structs.task import TaskStatus + + +class TaskMonitor(object): + def __init__(self, ccu_store, task_cls, api): + self.db_interface = DBInterface(ccu_store) + self.task_cls = task_cls + self.api = api + + def run(self): + pass + + def task_progress_cb(self, msg): + task_id = msg["payload"]["taskId"] + robot_id = msg["payload"]["robotId"] + task_status = msg["payload"]["status"]["taskStatus"] + + logging.debug("Robot %s received TASK-PROGRESS msg of task %s from %s ", task_id, robot_id) + + task_dict = self.db_interface.get_task(task_id) + task = self.task_cls.from_dict(task_dict) + + if task_status == TaskStatus.COMPLETED or \ + task_status == TaskStatus.CANCELED or \ + task_status == TaskStatus.FAILED or \ + task_status == TaskStatus.PREEMPTED: + self.archieve_task(task) + + elif task_status == TaskStatus.ONGOING: + self.check_execution_progress(task) + + def archieve_task(self, task): + # TODO: Update timetable + pass + + def check_execution_progress(self, task): + # TODO: check schedule consistency + pass + diff --git a/mrs/utils/config_logger.py b/mrs/utils/config_logger.py deleted file mode 100644 index 7fa1c8411..000000000 --- a/mrs/utils/config_logger.py +++ /dev/null @@ -1,9 +0,0 @@ -import logging.config -import yaml - - -def config_logger(logging_file): - - with open(logging_file) as f: - log_config = yaml.safe_load(f) - logging.config.dictConfig(log_config) diff --git a/mrs/utils/datasets.py b/mrs/utils/datasets.py index 6c99c7358..ccc84e4cd 100644 --- a/mrs/utils/datasets.py +++ b/mrs/utils/datasets.py @@ -30,3 +30,24 @@ def load_yaml_dataset(dataset_path): tasks.append(task) return tasks + +def flatten_dict(dict_input): + """ Returns a dictionary without nested dictionaries + + :param dict_input: nested dictionary + :return: flattened dictionary + + """ + flattened_dict = dict() + + for key, value in dict_input.items(): + if isinstance(value, dict): + new_keys = sorted(value.keys()) + for new_key in new_keys: + entry = {key + '_' + new_key: value[new_key]} + flattened_dict.update(entry) + else: + entry = {key: value} + flattened_dict.update(entry) + + return flattened_dict diff --git a/mrs/utils/uuid.py b/mrs/utils/uuid.py deleted file mode 100644 index 7fb3ef347..000000000 --- a/mrs/utils/uuid.py +++ /dev/null @@ -1,7 +0,0 @@ -import uuid - - -def generate_uuid(): - """ Returns a string containing a random uuid - """ - return str(uuid.uuid4()) diff --git a/requirements.txt b/requirements.txt index b26e81fba..f921b6792 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ numpy +git+https://github.com/anenriquez/mrta_stn.git#egg=stn diff --git a/setup.py b/setup.py index 5131bdd56..fc23be32a 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,9 @@ from setuptools import setup -setup(name='mrta', +setup(name='mrs', + packages=['mrs', 'mrs.config', 'mrs.structs', 'mrs.utils', 'mrs.exceptions', 'mrs.task_allocation', + 'mrs.task_execution'], version='0.1.0', install_requires=[ 'numpy' diff --git a/tests/allocation_test.py b/tests/allocation_test.py index 11fd09614..fef12a68b 100644 --- a/tests/allocation_test.py +++ b/tests/allocation_test.py @@ -1,9 +1,17 @@ +import logging import time import uuid -import logging -from ropod.pyre_communicator.base_class import RopodPyre from fleet_management.config.loader import Config +from fleet_management.db.ccu_store import CCUStore +from ropod.pyre_communicator.base_class import RopodPyre +from stn.stp import STP + +from ropod.utils.timestamp import TimeStamp as ts +from ropod.utils.uuid import generate_uuid + +from mrs.db_interface import DBInterface +from mrs.structs.timetable import Timetable from mrs.utils.datasets import load_yaml_dataset @@ -15,13 +23,48 @@ def __init__(self, config_file): super().__init__(zyre_config, acknowledge=False) config = Config(config_file, initialize=False) + ccu_store = CCUStore('ropod_ccu_store') + self.db_interface = DBInterface(ccu_store) allocator_config = config.config_params.get("plugins").get("task_allocation") - self.auctioneer_name = allocator_config.get('auctioneer') + robot_proxy = config.config_params.get("robot_proxy") + stp_solver = allocator_config.get('stp_solver') + self.stp = STP(stp_solver) + self.robot_ids = config.config_params.get('resources').get('fleet') + + self.auctioneer_name = robot_proxy.get("bidder").get("auctioneer_name") self.allocations = list() self.n_tasks = 0 self.terminated = False + def reset_timetables(self): + logging.info("Resetting timetables") + for robot_id in self.robot_ids: + timetable = Timetable(self.stp, robot_id) + self.db_interface.update_timetable(timetable) + self.send_timetable(timetable, robot_id) + + def reset_tasks(self): + logging.info("Resetting tasks") + tasks_dict = self.db_interface.get_tasks() + for task_id, task_dict in tasks_dict.items(): + self.db_interface.remove_task(task_id) + self.request_task_delete(task_dict) + + def send_timetable(self, timetable, robot_id): + logging.debug("Sending timetable to %s", robot_id) + timetable_msg = dict() + timetable_msg['header'] = dict() + timetable_msg['payload'] = dict() + timetable_msg['header']['type'] = 'TIMETABLE' + timetable_msg['header']['metamodel'] = 'ropod-msg-schema.json' + timetable_msg['header']['msgId'] = generate_uuid() + timetable_msg['header']['timestamp'] = ts.get_time_stamp() + + timetable_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + timetable_msg['payload']['timetable'] = timetable.to_dict() + self.shout(timetable_msg) + def allocate(self, tasks): self.n_tasks = len(tasks) for task in tasks: @@ -32,16 +75,31 @@ def request_allocation(self, task): task_msg = dict() task_msg['header'] = dict() task_msg['payload'] = dict() - task_msg['header']['type'] = 'TASK' + task_msg['header']['type'] = 'ALLOCATE-TASK' task_msg['header']['metamodel'] = 'ropod-msg-schema.json' - task_msg['header']['msgId'] = str(uuid.uuid4()) - task_msg['header']['timestamp'] = int(round(time.time()) * 1000) + task_msg['header']['msgId'] = generate_uuid() + task_msg['header']['timestamp'] = ts.get_time_stamp() task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' task_msg['payload']['task'] = task.to_dict() self.whisper(task_msg, peer=self.auctioneer_name) + def request_task_delete(self, task_dict): + logging.debug("Requesting delete of task %s", task_dict['id']) + task_msg = dict() + task_msg['header'] = dict() + task_msg['payload'] = dict() + task_msg['header']['type'] = 'DELETE-TASK' + task_msg['header']['metamodel'] = 'ropod-msg-schema.json' + task_msg['header']['msgId'] = generate_uuid() + task_msg['header']['timestamp'] = ts.get_time_stamp() + + task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + task_msg['payload']['task'] = task_dict + + self.shout(task_msg) + def receive_msg_cb(self, msg_content): msg = self.convert_zyre_msg_to_dict(msg_content) if msg is None: @@ -50,7 +108,7 @@ def receive_msg_cb(self, msg_content): if msg['header']['type'] == 'ALLOCATION': self.logger.debug("Received allocation message") task_id = msg['payload']['task_id'] - winner_id = msg['payload']['winner_id'] + winner_id = msg['payload']['robot_id'] allocation = (task_id, [winner_id]) self.allocations.append(allocation) logging.debug("Receiving allocation %s", allocation) @@ -75,6 +133,9 @@ def check_termination_test(self): test.start() try: + time.sleep(5) + test.reset_timetables() + test.reset_tasks() time.sleep(5) test.allocate(tasks) start_time = time.time() diff --git a/tests/data/non_overlapping_1.yaml b/tests/data/non_overlapping_1.yaml index daec77133..36a638643 100644 --- a/tests/data/non_overlapping_1.yaml +++ b/tests/data/non_overlapping_1.yaml @@ -15,6 +15,7 @@ tasks: status: status: 1 task_id: 002767db-01f3-4e0c-af4e-52c4d81bd713 + delayed: false 173d27eb-d0a4-4b94-986b-1e9b8eca0579: earliest_start_time: 3197.08 estimated_duration: 6.84 @@ -26,6 +27,7 @@ tasks: status: status: 1 task_id: 173d27eb-d0a4-4b94-986b-1e9b8eca0579 + delayed: false 4076f78b-16d8-4057-a1ad-d16d239764fa: earliest_start_time: 479.33 estimated_duration: 2.74 @@ -37,6 +39,7 @@ tasks: status: status: 1 task_id: 002767db-01f3-4e0c-af4e-52c4d81bd713 + delayed: false 6d17963b-10b5-4143-b279-84d8e93aaaa3: earliest_start_time: 3642.88 estimated_duration: 2.3 @@ -48,6 +51,7 @@ tasks: status: status: 1 task_id: 6d17963b-10b5-4143-b279-84d8e93aaaa3 + delayed: false 73ace543-260e-474a-9112-3abd470ed5b5: earliest_start_time: 2503.91 estimated_duration: 8.24 @@ -59,4 +63,5 @@ tasks: status: status: 1 task_id: 73ace543-260e-474a-9112-3abd470ed5b5 + delayed: false upper_bound: 300 diff --git a/tests/dispatching_test.py b/tests/dispatching_test.py index 364d66252..9088c759e 100644 --- a/tests/dispatching_test.py +++ b/tests/dispatching_test.py @@ -4,7 +4,7 @@ from fleet_management.config.loader import Config from mrs.utils.datasets import load_yaml_dataset -from mrs.timetable import Timetable +from mrs.structs.timetable import Timetable class TestDispatcher(object): diff --git a/tests/fms_integration_test.py b/tests/fms_integration_test.py index 70a2738dd..8eb930ebf 100644 --- a/tests/fms_integration_test.py +++ b/tests/fms_integration_test.py @@ -3,18 +3,26 @@ from fleet_management.config.loader import Config from mrs.utils.datasets import load_yaml_dataset -from mrs.timetable import Timetable +from mrs.structs.timetable import Timetable +from mrs.db_interface import DBInterface +from ropod.pyre_communicator.base_class import RopodPyre -class TaskAllocator(object): +class TaskAllocator(RopodPyre): def __init__(self, config_file=None): + zyre_config = {'node_name': 'task_request_test', + 'groups': ['TASK-ALLOCATION'], + 'message_types': ['TASK', 'ALLOCATION']} + super().__init__(zyre_config, acknowledge=False) + self.logger = logging.getLogger('test') config = Config(config_file, initialize=True) config.configure_logger() - self.ccu_store = config.ccu_store - - self.auctioneer = config.configure_task_allocator(self.ccu_store) + ccu_store = config.ccu_store + self.db_interface = DBInterface(ccu_store) + self.auctioneer = config.configure_auctioneer(ccu_store) + self.auctioneer.api.register_callbacks(self.auctioneer) self.allocated_tasks = dict() self.test_terminated = False @@ -26,18 +34,39 @@ def reset_timetables(self): for robot_id in self.auctioneer.robot_ids: timetable = Timetable(self.auctioneer.stp, robot_id) - self.ccu_store.update_timetable(timetable) + self.db_interface.update_timetable(timetable) + self.send_timetable(timetable, robot_id) + + def reset_tasks(self): + self.logger.info("Resetting tasks") + tasks_dict = self.db_interface.get_tasks() + for task_id, task_info in tasks_dict.items(): + self.db_interface.remove_task(task_id) + + def send_timetable(self, timetable, robot_id): + self.logger.debug("Sending timetable to %s", robot_id) + timetable_msg = dict() + timetable_msg['header'] = dict() + timetable_msg['payload'] = dict() + timetable_msg['header']['type'] = 'TIMETABLE' + timetable_msg['header']['metamodel'] = 'ropod-msg-schema.json' + timetable_msg['header']['msgId'] = generate_uuid() + timetable_msg['header']['timestamp'] = ts.get_time_stamp() + + timetable_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + timetable_msg['payload']['timetable'] = timetable.to_dict() + self.shout(timetable_msg) def get_robots_for_task(self, tasks): """ Adds a task or list of tasks to the list of tasks_to_allocate - in the auctioneer + in the auctioneer_name :param tasks: list of tasks to allocate """ self.auctioneer.allocate(tasks) def get_allocation(self): - """ Gets the allocation of a task when the auctioneer terminates an + """ Gets the allocation of a task when the auctioneer_name terminates an allocation round """ @@ -56,6 +85,8 @@ def run(self): timeout_duration = 300 # 5 minutes try: self.auctioneer.api.start() + self.reset_timetables() + self.reset_tasks() start_time = time.time() while not self.test_terminated and start_time + timeout_duration > time.time(): self.auctioneer.api.run()