From 84649c56dcb530a050b26449e04c6c27a5232a32 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Wed, 14 Aug 2019 09:55:40 +0200 Subject: [PATCH 01/24] setup: Fixing the name of the package --- setup.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/setup.py b/setup.py index 50f3faea0..f1155c354 100644 --- a/setup.py +++ b/setup.py @@ -7,8 +7,10 @@ install_requires=[ 'numpy' ], - description='Multi-Robot System (MRS) components for allocating ' - 'and executing tasks with temporal constraints and uncertain durations', + description='Multi-Robot System (MRS) components for performing' + 'Multi-Robot Task Allocation (MRTA) and executing' + 'tasks with temporal constraints and uncertain ' + 'durations', author='Angela Enriquez Gomez', author_email='angela.enriquez@smail.inf.h-brs.de', package_dir={'': '.'} From 057f46612ab8a0907a49edc6cae54ae44560a8eb Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Wed, 14 Aug 2019 09:55:40 +0200 Subject: [PATCH 02/24] setup: Fixing the name of the package --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f1155c354..5131bdd56 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,7 @@ from setuptools import setup -setup(name='mrs', +setup(name='mrta', version='0.1.0', install_requires=[ 'numpy' From 9f3be42c1f9003ac788323884bfbf111e31a14b0 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Wed, 14 Aug 2019 16:34:59 +0200 Subject: [PATCH 03/24] dispatcher: Using the flag delayed from Task status --- mrs/task_execution/dispatching/dispatcher.py | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatching/dispatcher.py index 126b86c15..92b496e9c 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatching/dispatcher.py @@ -44,15 +44,22 @@ def check_earliest_task_status(self, task): self.recompute_timetable(task) self.scheduler.reset_schedule(self.timetable) - elif task.status.status == TaskStatus.DELAYED and self.corrective_measure == 're-schedule': + elif task.status.status == TaskStatus.ONGOING and task.status.delayed: + self.apply_corrective_measure(task) + + elif task.status.status == TaskStatus.SCHEDULED and self.time_to_dispatch(): + self.dispatch(task) + + def apply_corrective_measure(self, task): + if self.corrective_measure == 're-schedule': self.recompute_timetable(task) - elif task.status.status == TaskStatus.DELAYED and self.corrective_measure == 're-allocate': + elif self.corrective_measure == 're-allocate': self.scheduler.reset_schedule(self.timetable) self.request_reallocation(task) - elif task.status.status == TaskStatus.SCHEDULED and self.time_to_dispatch(): - self.dispatch(task) + else: + logging.debug("Not applying corrective measure") def get_earliest_task(self): task_id = self.timetable.get_earliest_task_id() From ecf44cc44d4d67ecd72dfc3ee20334be8618d2f9 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Wed, 14 Aug 2019 17:51:50 +0200 Subject: [PATCH 04/24] readme: Updating instructions --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index fa7c9f67c..dae5d4026 100644 --- a/README.md +++ b/README.md @@ -71,7 +71,7 @@ pip3 install -r requirements.txt Add the task_allocation to your `PYTHONPATH` by running: ``` -sudo pip3 install -e . +pip3 install --user -e . ``` Go to `/allocation` and run in a terminal From 030305f83bc80a517b2afe5fb8f2b1700bca2c2f Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Wed, 14 Aug 2019 17:52:37 +0200 Subject: [PATCH 05/24] Removing dependency to mrta_datasets Adding the task struct to this repo --- mrs/config/task_factory.py | 2 +- mrs/task.py | 103 +++++++++++++++++++ mrs/task_execution/dispatching/dispatcher.py | 2 +- mrs/task_execution/dispatching/scheduler.py | 2 +- mrs/utils/datasets.py | 21 ++++ 5 files changed, 127 insertions(+), 3 deletions(-) create mode 100644 mrs/task.py diff --git a/mrs/config/task_factory.py b/mrs/config/task_factory.py index 52fb44025..1b78b47da 100644 --- a/mrs/config/task_factory.py +++ b/mrs/config/task_factory.py @@ -20,7 +20,7 @@ def initialize(self): ropod_task_cls = getattr(import_module('ropod.structs.task'), 'Task') self.register_task_cls('ropod_task', ropod_task_cls) - generic_task_cls = getattr(import_module('dataset_lib.task'), 'Task') + generic_task_cls = getattr(import_module('mrs.task'), 'Task') self.register_task_cls('generic_task', generic_task_cls) task_request_cls = getattr(import_module('ropod.structs.task'), 'TaskRequest') diff --git a/mrs/task.py b/mrs/task.py new file mode 100644 index 000000000..7905068d2 --- /dev/null +++ b/mrs/task.py @@ -0,0 +1,103 @@ +from mrs.utils.uuid import generate_uuid +from mrs.utils.datasets import flatten_dict + + +class Task(object): + + def __init__(self, id='', + earliest_start_time=-1, + latest_start_time=-1, + start_pose_name='', + finish_pose_name='', + hard_constraints=True, + estimated_duration=-1): + + if not id: + self.id = generate_uuid() + else: + self.id = id + + self.earliest_start_time = earliest_start_time + self.latest_start_time = latest_start_time + self.start_pose_name = start_pose_name + self.finish_pose_name = finish_pose_name + self.hard_constraints = hard_constraints + + # Used by the dataset generator + self.estimated_duration = estimated_duration + self.status = TaskStatus(id) + + def to_dict(self): + task_dict = dict() + task_dict['id'] = self.id + task_dict['earliest_start_time'] = self.earliest_start_time + task_dict['latest_start_time'] = self.latest_start_time + task_dict['start_pose_name'] = self.start_pose_name + task_dict['finish_pose_name'] = self.finish_pose_name + task_dict['hard_constraints'] = self.hard_constraints + task_dict['estimated_duration'] = self.estimated_duration + task_dict['status'] = self.status.to_dict() + return task_dict + + @staticmethod + def from_dict(task_dict): + task = Task() + task.id = task_dict['id'] + task.earliest_start_time = task_dict['earliest_start_time'] + task.latest_start_time = task_dict['latest_start_time'] + task.start_pose_name = task_dict['start_pose_name'] + task.finish_pose_name = task_dict['finish_pose_name'] + task.hard_constraints = task_dict['hard_constraints'] + task.estimated_duration = task_dict['estimated_duration'] + task.status = TaskStatus.from_dict(task_dict['status']) + return task + + @staticmethod + def to_csv(task_dict): + """ Prepares dict to be written to a csv + :return: dict + """ + to_csv_dict = flatten_dict(task_dict) + + return to_csv_dict + + +class TaskStatus(object): + UNALLOCATED = 1 + ALLOCATED = 2 + SCHEDULED = 3 # Task is ready to be dispatched + SHIPPED = 4 # The task was sent to the robot + ONGOING = 5 + COMPLETED = 6 + ABORTED = 7 # Aborted by the system, not by the user + FAILED = 8 # Execution failed + CANCELED = 9 # Canceled before execution starts + PREEMPTED = 10 # Canceled during execution + + def __init__(self, task_id=''): + self.task_id = task_id + self.delayed = False + self.status = self.UNALLOCATED + + def to_dict(self): + task_dict = dict() + task_dict['task_id'] = self.task_id + task_dict['status'] = self.status + return task_dict + + @staticmethod + def from_dict(status_dict): + task_id = status_dict['task_id'] + status = TaskStatus(task_id) + status.task_id = task_id + status.status = status_dict['status'] + return status + + @staticmethod + def to_csv(status_dict): + """ Prepares dict to be written to a csv + :return: dict + """ + # The dictionary is already flat and ready to be exported + return status_dict + diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatching/dispatcher.py index 92b496e9c..7e0984ecb 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatching/dispatcher.py @@ -8,7 +8,7 @@ from stn.stp import STP from mrs.exceptions.task_allocation import NoSTPSolution from mrs.exceptions.task_execution import InconsistentSchedule -from dataset_lib.task import TaskStatus +from mrs.task import TaskStatus class Dispatcher(object): diff --git a/mrs/task_execution/dispatching/scheduler.py b/mrs/task_execution/dispatching/scheduler.py index 58c85dcca..434a38b95 100644 --- a/mrs/task_execution/dispatching/scheduler.py +++ b/mrs/task_execution/dispatching/scheduler.py @@ -1,6 +1,6 @@ import logging from mrs.exceptions.task_execution import InconsistentSchedule -from dataset_lib.task import TaskStatus +from mrs.task import TaskStatus class Scheduler(object): diff --git a/mrs/utils/datasets.py b/mrs/utils/datasets.py index 6c99c7358..ccc84e4cd 100644 --- a/mrs/utils/datasets.py +++ b/mrs/utils/datasets.py @@ -30,3 +30,24 @@ def load_yaml_dataset(dataset_path): tasks.append(task) return tasks + +def flatten_dict(dict_input): + """ Returns a dictionary without nested dictionaries + + :param dict_input: nested dictionary + :return: flattened dictionary + + """ + flattened_dict = dict() + + for key, value in dict_input.items(): + if isinstance(value, dict): + new_keys = sorted(value.keys()) + for new_key in new_keys: + entry = {key + '_' + new_key: value[new_key]} + flattened_dict.update(entry) + else: + entry = {key: value} + flattened_dict.update(entry) + + return flattened_dict From e8d94a94bfd0f2eb8a4397b774ef4ae58deb778a Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Wed, 14 Aug 2019 17:53:03 +0200 Subject: [PATCH 06/24] setup: Adding all modules in the repository --- setup.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 5131bdd56..f6cfba073 100644 --- a/setup.py +++ b/setup.py @@ -2,7 +2,9 @@ from setuptools import setup -setup(name='mrta', +setup(name='mrs', + packages=['mrs', 'mrs.config', 'mrs.utils', 'mrs.exceptions', 'mrs.task_allocation', + 'mrs.task_execution', 'mrs.task_execution.dispatching'], version='0.1.0', install_requires=[ 'numpy' From 08c34aee411d7d695e91d9261301434e037ea2da Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Thu, 15 Aug 2019 08:57:48 +0200 Subject: [PATCH 07/24] setup: Adding dependency link to mrta_stn repo --- setup.py | 1 + 1 file changed, 1 insertion(+) diff --git a/setup.py b/setup.py index f6cfba073..c2a0569ec 100644 --- a/setup.py +++ b/setup.py @@ -9,6 +9,7 @@ install_requires=[ 'numpy' ], + dependency_links=['git+https://github.com/anenriquez/mrta_stn.git#egg=stn'], description='Multi-Robot System (MRS) components for performing' 'Multi-Robot Task Allocation (MRTA) and executing' 'tasks with temporal constraints and uncertain ' From 71b2764229dd60ffc4b532042d3ec0e65f6fcdd3 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Thu, 15 Aug 2019 09:11:56 +0200 Subject: [PATCH 08/24] Adding mrta_stn to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index b26e81fba..b52087382 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ numpy +git+https://github.com/anenriquez/mrta_stn.git From d475e3b02386837f305e9f555e303bdf88b6c6d7 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Thu, 15 Aug 2019 10:31:13 +0200 Subject: [PATCH 09/24] setup: Do not adding mrta_stn in setup --- setup.py | 1 - 1 file changed, 1 deletion(-) diff --git a/setup.py b/setup.py index c2a0569ec..f6cfba073 100644 --- a/setup.py +++ b/setup.py @@ -9,7 +9,6 @@ install_requires=[ 'numpy' ], - dependency_links=['git+https://github.com/anenriquez/mrta_stn.git#egg=stn'], description='Multi-Robot System (MRS) components for performing' 'Multi-Robot Task Allocation (MRTA) and executing' 'tasks with temporal constraints and uncertain ' From 7c06f0d7a244ed34c2f9c870e92e338e326c07b2 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Thu, 15 Aug 2019 10:55:49 +0200 Subject: [PATCH 10/24] auctioneer: Fixing TaskStatus import importing from mrs and not from dataset_lib --- mrs/task_allocation/auctioneer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index 711adee14..38a408edd 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -8,7 +8,7 @@ from stn.stp import STP from mrs.exceptions.task_allocation import NoAllocation from mrs.exceptions.task_allocation import AlternativeTimeSlot -from dataset_lib.task import TaskStatus +from mrs.task import TaskStatus """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file From 21037bdfc7ec8b94b31c1749d971a3f180be34f8 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Thu, 15 Aug 2019 12:21:27 +0200 Subject: [PATCH 11/24] bidder: If the timetable is empty, the number of tasks is 0 --- mrs/task_allocation/bidder.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 5235ff828..19b88522a 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -105,7 +105,10 @@ def insert_task(self, task, round_id): best_bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) tasks = self.timetable.get_tasks() - n_tasks = len(tasks) + if tasks: + n_tasks = len(tasks) + else: + n_tasks = 0 # Add task to the STN from position 1 onwards (position 0 is reserved for the zero_timepoint) for position in range(1, n_tasks+2): From 2d422c29115657665ad03645edc163fee8bc7d86 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Fri, 16 Aug 2019 07:47:00 +0200 Subject: [PATCH 12/24] robot: terminating robot with KeyboardInterrupt --- mrs/robot.py | 11 +++-------- 1 file changed, 3 insertions(+), 8 deletions(-) diff --git a/mrs/robot.py b/mrs/robot.py index fcb836776..94b08221c 100644 --- a/mrs/robot.py +++ b/mrs/robot.py @@ -16,14 +16,9 @@ def __init__(self, api, bidder, **kwargs): self.bidder = bidder self.dispatcher = kwargs.get('dispatcher') - def start_components(self): - self.bidder.api.start() - if self.dispatcher is not None: - self.dispatcher.api.start() - def run(self): try: - self.start_components() + self.api.start() while True: self.bidder.api.run() if self.dispatcher is not None: @@ -31,8 +26,8 @@ def run(self): time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - logging.info("Terminating %s proxy ...", robot_id) - self.bidder.api.shutdown() + logging.info("Terminating %s robot ...") + self.api.shutdown() logging.info("Exiting...") From beb6bac07b751e977731e203543eb4c6b02d14bc Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Fri, 16 Aug 2019 07:50:04 +0200 Subject: [PATCH 13/24] Adding db_interface with methods to read and write to the db --- mrs/db_interface.py | 61 ++++++++++++++++++++ mrs/task_allocation/auctioneer.py | 19 +++--- mrs/task_allocation/bidder.py | 7 ++- mrs/task_execution/dispatching/dispatcher.py | 23 +++----- mrs/task_execution/dispatching/scheduler.py | 14 ++--- mrs/timetable.py | 8 --- 6 files changed, 86 insertions(+), 46 deletions(-) create mode 100644 mrs/db_interface.py diff --git a/mrs/db_interface.py b/mrs/db_interface.py new file mode 100644 index 000000000..68664667b --- /dev/null +++ b/mrs/db_interface.py @@ -0,0 +1,61 @@ +import logging + +from mrs.timetable import Timetable + + +class DBInterface(object): + def __init__(self, ccu_store): + self.ccu_store = ccu_store + + def add_task(self, task): + """Saves the given task to a database as a new document under the "tasks" collection. + """ + collection = self.ccu_store.db['tasks'] + dict_task = task.to_dict() + self.ccu_store.unique_insert(collection, dict_task, 'id', task.id) + + def update_task(self, task): + """ Updates the given task under the "tasks" collection + """ + collection = self.ccu_store.db['tasks'] + task_dict = task.to_dict() + collection.replace_one({'id': task.id}, task_dict) + + def update_task_status(self, task, status): + task.status.status = status + logging.debug("Updating task status to %s", task.status.status) + self.update_task(task) + + def get_task(self, task_id): + """Returns a task dictionary representing the task with the given id. + """ + collection = self.ccu_store.db['tasks'] + task_dict = collection.find_one({'id': task_id}) + return task_dict + + def add_timetable(self, timetable): + """ + Saves the given timetable under the "timetables" collection + """ + collection = self.ccu_store.db['timetables'] + robot_id = timetable.robot_id + timetable_dict = timetable.to_dict() + + self.ccu_store.unique_insert(collection, timetable_dict, 'robot_id', robot_id) + + def update_timetable(self, timetable): + """ Updates the given timetable under the "timetables" collection + """ + collection = self.ccu_store.db['timetables'] + timetable_dict = timetable.to_dict() + robot_id = timetable.robot_id + collection.replace_one({'robot_id': robot_id}, timetable_dict) + + def get_timetable(self, robot_id, stp): + collection = self.ccu_store.db['timetables'] + timetable_dict = collection.find_one({'robot_id': robot_id}) + + if timetable_dict is None: + return + timetable = Timetable.from_dict(timetable_dict, stp) + return timetable diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index 38a408edd..d98bcf669 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -9,6 +9,7 @@ from mrs.exceptions.task_allocation import NoAllocation from mrs.exceptions.task_allocation import AlternativeTimeSlot from mrs.task import TaskStatus +from mrs.db_interface import DBInterface """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -23,7 +24,7 @@ def __init__(self, robot_ids, ccu_store, api, stp_solver, task_cls, allocation_m logging.debug("Starting Auctioneer") self.robot_ids = robot_ids - self.ccu_store = ccu_store + self.db_interface = DBInterface(ccu_store) self.api = api self.allocation_method = allocation_method self.round_time = timedelta(seconds=round_time) @@ -37,7 +38,7 @@ def __init__(self, robot_ids, ccu_store, api, stp_solver, task_cls, allocation_m for robot_id in robot_ids: timetable = Timetable(self.stp, robot_id) self.timetables[robot_id] = timetable - self.ccu_store.add_timetable(timetable) + self.db_interface.add_timetable(timetable) self.tasks_to_allocate = dict() self.allocations = list() @@ -81,19 +82,13 @@ def process_allocation(self, round_result): logging.debug("Allocation: %s", allocation) logging.debug("Tasks to allocate %s", self.tasks_to_allocate) - self.update_task_status(task, TaskStatus.ALLOCATED) + self.db_interface.update_task_status(task, TaskStatus.ALLOCATED) self.update_timetable(robot_id, task, position) return allocation - def update_task_status(self, task, status): - task.status.status = status - logging.debug("Updating task status to %s", task.status.status) - self.ccu_store.update_task(task) - logging.debug('Tasks saved') - def update_timetable(self, robot_id, task, position): - timetable = Timetable.get_timetable(self.ccu_store, robot_id, self.stp) + timetable = self.db_interface.get_timetable(robot_id, self.stp) timetable.add_task_to_stn(task, position) timetable.solve_stp() @@ -103,7 +98,7 @@ def update_timetable(self, robot_id, task, position): pass self.timetables.update({robot_id: timetable}) - self.ccu_store.update_timetable(timetable) + self.db_interface.update_timetable(timetable) logging.debug("STN robot %s: %s", robot_id, timetable.stn) logging.debug("Dispatchable graph robot %s: %s", robot_id, timetable.dispatchable_graph) @@ -120,7 +115,7 @@ def process_alternative_allocation(self, exception): def add_task(self, task): self.tasks_to_allocate[task.id] = task - self.ccu_store.add_task(task) + self.db_interface.add_task(task) def allocate(self, tasks): if isinstance(tasks, list): diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 19b88522a..187407456 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -8,6 +8,7 @@ from mrs.task_allocation.bidding_rule import BiddingRule from mrs.timetable import Timetable from mrs.exceptions.task_allocation import NoSTPSolution +from mrs.db_interface import DBInterface """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -19,7 +20,7 @@ class Bidder(object): def __init__(self, robot_id, ccu_store, api, task_cls, bidding_rule, allocation_method, auctioneer, **kwargs): self.id = robot_id - self.ccu_store = ccu_store + self.db_interface = DBInterface(ccu_store) self.api = api @@ -35,7 +36,7 @@ def __init__(self, robot_id, ccu_store, api, task_cls, bidding_rule, allocation_ self.logger.debug("Starting robot %s", self.id) self.stp = STP(robustness) - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) self.bid_placed = Bid() @@ -50,7 +51,7 @@ def task_announcement_cb(self, msg): self.logger.debug("Robot %s received TASK-ANNOUNCEMENT", self.id) round_id = msg['payload']['round_id'] received_tasks = msg['payload']['tasks'] - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) self.compute_bids(received_tasks, round_id) def allocation_cb(self, msg): diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatching/dispatcher.py index 7e0984ecb..5eff2fdc9 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatching/dispatcher.py @@ -4,18 +4,18 @@ from ropod.utils.timestamp import TimeStamp as ts from mrs.task_execution.dispatching.scheduler import Scheduler -from mrs.timetable import Timetable from stn.stp import STP from mrs.exceptions.task_allocation import NoSTPSolution from mrs.exceptions.task_execution import InconsistentSchedule from mrs.task import TaskStatus +from mrs.db_interface import DBInterface class Dispatcher(object): def __init__(self, robot_id, ccu_store, task_cls, stp_solver, corrective_measure, freeze_window, api, auctioneer): self.id = robot_id - self.ccu_store = ccu_store + self.db_interface = DBInterface(ccu_store) self.task_cls = task_cls self.stp = STP(stp_solver) self.stp_solver = stp_solver @@ -26,10 +26,10 @@ def __init__(self, robot_id, ccu_store, task_cls, stp_solver, corrective_measure self.scheduler = Scheduler(ccu_store, self.stp) - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) def run(self): - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) if self.timetable is not None: task = self.get_earliest_task() if task is not None: @@ -64,7 +64,7 @@ def apply_corrective_measure(self, task): def get_earliest_task(self): task_id = self.timetable.get_earliest_task_id() if task_id: - task_dict = self.ccu_store.get_task(task_id) + task_dict = self.db_interface.get_task(task_id) task = self.task_cls.from_dict(task_dict) return task @@ -84,7 +84,7 @@ def schedule_task(self, task): if self.corrective_measure == 're-allocate': self.request_reallocation(task) - self.timetable = Timetable.get_timetable(self.ccu_store, self.id, self.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) def recompute_timetable(self, task): try: @@ -95,14 +95,9 @@ def recompute_timetable(self, task): except NoSTPSolution: logging.exception("The stp solver could not solve the problem") - self.update_task_status(task, TaskStatus.FAILED) + self.db_interface.update_task_status(task, TaskStatus.FAILED) self.timetable.remove_task() - def update_task_status(self, task, status): - task.status.status = status - logging.debug("Updating task status to %s", task.status.status) - self.ccu_store.update_task(task) - def time_to_dispatch(self): current_time = ts.get_time_stamp() if current_time < self.scheduler.navigation_start_time: @@ -126,13 +121,13 @@ def dispatch(self, task): task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' task_msg['payload']['task'] = task.to_dict() - self.update_task_status(task, TaskStatus.SHIPPED) + self.db_interface.update_task_status(task, TaskStatus.SHIPPED) self.timetable.remove_task() self.api.publish(task_msg, groups=['ROPOD']) def request_reallocation(self, task): - self.update_task_status(task, TaskStatus.ABORTED) # ABORTED + self.update_task_status(task, TaskStatus.UNALLOCATED) # ABORTED task_msg = dict() task_msg['header'] = dict() task_msg['payload'] = dict() diff --git a/mrs/task_execution/dispatching/scheduler.py b/mrs/task_execution/dispatching/scheduler.py index 434a38b95..91774def3 100644 --- a/mrs/task_execution/dispatching/scheduler.py +++ b/mrs/task_execution/dispatching/scheduler.py @@ -1,12 +1,13 @@ import logging from mrs.exceptions.task_execution import InconsistentSchedule from mrs.task import TaskStatus +from mrs.db_interface import DBInterface class Scheduler(object): def __init__(self, ccu_store, stp): - self.ccu_store = ccu_store + self.db_interface = DBInterface(ccu_store) self.stp = stp self.navigation_start_time = -float('inf') # of scheduled task @@ -32,8 +33,8 @@ def assign_timepoint(self, task, navigation_start, timetable): print("Schedule: ", timetable.schedule) - self.ccu_store.update_timetable(timetable) - self.update_task_status(task, TaskStatus.SCHEDULED) + self.db_interface.update_timetable(timetable) + self.db_interface.update_task_status(task, TaskStatus.SCHEDULED) self.navigation_start_time = navigation_start else: @@ -42,11 +43,6 @@ def assign_timepoint(self, task, navigation_start, timetable): def reallocate(self): pass - def update_task_status(self, task, status): - task.status.status = status - logging.debug("Updating task status to %s", task.status.status) - self.ccu_store.update_task(task) - def reset_schedule(self, timetable): timetable.remove_task() - self.ccu_store.update_timetable(timetable) + self.db_interface.update_timetable(timetable) diff --git a/mrs/timetable.py b/mrs/timetable.py index aad187036..210886f8a 100644 --- a/mrs/timetable.py +++ b/mrs/timetable.py @@ -25,14 +25,6 @@ def __init__(self, stp, robot_id): self.dispatchable_graph = stp.get_stn() self.schedule = stp.get_stn() - @staticmethod - def get_timetable(ccu_store, id, stp): - timetable_dict = ccu_store.get_timetable(id) - if timetable_dict is None: - return - timetable = Timetable.from_dict(timetable_dict, stp) - return timetable - def solve_stp(self): """ Computes the dispatchable graph and robustness metric from the given stn From 994a96755154fefbe1c941c2f5de7ce8a7f6b585 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Fri, 16 Aug 2019 07:50:54 +0200 Subject: [PATCH 14/24] Adding task_monitor and task_allocator task_allocator has an auctioneer and a task_monitor --- config/config.yaml | 11 +++++--- mrs/task_allocator.py | 38 ++++++++++++++++++++++++++ mrs/task_execution/task_monitor.py | 43 ++++++++++++++++++++++++++++++ 3 files changed, 89 insertions(+), 3 deletions(-) create mode 100644 mrs/task_allocator.py create mode 100644 mrs/task_execution/task_monitor.py diff --git a/config/config.yaml b/config/config.yaml index c984d9596..6a2dfad01 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -21,6 +21,8 @@ plugins: task_type: generic_task corrective_measure: re-allocate freeze_window: 300 # seconds + task_monitor: + task_type: ropod_task robot_proxy: api: @@ -70,6 +72,7 @@ api: - TASK-ALLOCATION message_types: # Types of messages the node will listen to. Messages not listed will be ignored - TASK + - TASK-PROGRESS - BID - FINISH-ROUND acknowledge: false @@ -82,11 +85,13 @@ api: method: shout callbacks: - msg_type: 'TASK' - component: '.task_cb' + component: 'auctioneer.task_cb' - msg_type: 'BID' - component: '.bid_cb' + component: 'auctioneer.bid_cb' - msg_type: 'FINISH-ROUND' - component: '.finish_round_cb' + component: 'auctioneer.finish_round_cb' + - msg_type: 'TASK-PROGRESS' + component: 'task_monitor.task_progress_cb' logger: version: 1 diff --git a/mrs/task_allocator.py b/mrs/task_allocator.py new file mode 100644 index 000000000..f09dbe4c4 --- /dev/null +++ b/mrs/task_allocator.py @@ -0,0 +1,38 @@ +import logging +import time + +from fleet_management.config.loader import Config, register_api_callbacks + + +class TaskAllocator(object): + def __init__(self, config_file=None): + self.logger = logging.getLogger('mrs') + self.logger.info("Starting MRS...") + self.config = Config(config_file, initialize=True) + self.config.configure_logger() + self.ccu_store = self.config.ccu_store + self.api = self.config.api + + self.auctioneer = self.config.configure_task_allocator(self.ccu_store) + self.task_monitor = self.config.configure_task_monitor(self.ccu_store) + + register_api_callbacks(self, self.api) + + def run(self): + try: + self.api.start() + while True: + self.api.run() + self.auctioneer.run() + self.task_monitor.run() + time.sleep(0.5) + except (KeyboardInterrupt, SystemExit): + self.logger.info("Terminating %s auctioneer ...") + self.api.shutdown() + logging.info("Exiting...") + + +if __name__ == '__main__': + config_file_path = '../config/config.yaml' + task_allocator = TaskAllocator(config_file_path) + task_allocator.run() diff --git a/mrs/task_execution/task_monitor.py b/mrs/task_execution/task_monitor.py new file mode 100644 index 000000000..dfd52bb7d --- /dev/null +++ b/mrs/task_execution/task_monitor.py @@ -0,0 +1,43 @@ +import logging +import time + +from mrs.db_interface import DBInterface +from mrs.task import TaskStatus + + +class TaskMonitor(object): + def __init__(self, ccu_store, task_cls, api): + self.db_interface = DBInterface(ccu_store) + self.task_cls = task_cls + self.api = api + + def run(self): + pass + + def task_progress_cb(self, msg): + task_id = msg["payload"]["taskId"] + robot_id = msg["payload"]["robotId"] + task_status = msg["payload"]["status"]["taskStatus"] + + logging.debug("Robot %s received TASK-PROGRESS msg of task %s from %s ", task_id, robot_id) + + task_dict = self.db_interface.get_task(task_id) + task = self.task_cls.from_dict(task_dict) + + if task_status == TaskStatus.COMPLETED or \ + task_status == TaskStatus.CANCELED or \ + task_status == TaskStatus.FAILED or \ + task_status == TaskStatus.PREEMPTED: + self.archieve_task(task) + + elif task_status == TaskStatus.ONGOING: + self.check_execution_progress(task) + + def archieve_task(self, task): + # TODO: Update timetable + pass + + def check_execution_progress(self, task): + # TODO: check schedule consistency + pass + From 0cb279369ad8c78a4bed3df1975e2e34ffaee850 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Sat, 17 Aug 2019 13:56:11 +0200 Subject: [PATCH 15/24] requirements: installing mrta_stn with egg=stn --- requirements.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index b52087382..8e34e5b85 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ numpy -git+https://github.com/anenriquez/mrta_stn.git +git+https://github.com/anenriquez/mrta_stn.git@egg=stn From 95a76f01e7b919ecb12b3421352572dc2cbe5b82 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Sun, 18 Aug 2019 14:09:04 +0200 Subject: [PATCH 16/24] Using the msg factory from ropod_common Creating structs for the messages --- config/config.yaml | 8 +- mrs/config/task_factory.py | 2 +- mrs/db_interface.py | 2 +- mrs/robot.py | 4 +- mrs/structs/__init__.py | 0 mrs/structs/allocation.py | 47 +++++++++ mrs/structs/bid.py | 74 +++++++++++++ mrs/{ => structs}/task.py | 0 mrs/{ => structs}/timetable.py | 0 mrs/task_allocation/auctioneer.py | 46 +++------ mrs/task_allocation/bid.py | 103 ------------------- mrs/task_allocation/bidder.py | 57 +++++----- mrs/task_allocation/round.py | 8 +- mrs/task_allocator.py | 5 +- mrs/task_execution/dispatching/dispatcher.py | 4 +- mrs/task_execution/dispatching/scheduler.py | 2 +- mrs/task_execution/task_monitor.py | 3 +- mrs/utils/models.py | 47 +++++++++ tests/allocation_test.py | 2 +- tests/dispatching_test.py | 2 +- tests/fms_integration_test.py | 2 +- 21 files changed, 234 insertions(+), 184 deletions(-) create mode 100644 mrs/structs/__init__.py create mode 100644 mrs/structs/allocation.py create mode 100644 mrs/structs/bid.py rename mrs/{ => structs}/task.py (100%) rename mrs/{ => structs}/timetable.py (100%) delete mode 100644 mrs/task_allocation/bid.py create mode 100644 mrs/utils/models.py diff --git a/config/config.yaml b/config/config.yaml index 6a2dfad01..423e4bb14 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -42,16 +42,16 @@ robot_proxy: acknowledge: false publish: task: - groups: ['ROPOD', 'TASK-ALLOCATION'] # Optional, if not present will shout to all groups msg_type: 'TASK' + groups: ['ROPOD', 'TASK-ALLOCATION'] # Optional, if not present will shout to all groups method: shout bid: - groups: ['TASK-ALLOCATION'] msg_type: 'BID' + groups: ['TASK-ALLOCATION'] method: whisper finish-round: - groups: ['TASK-ALLOCATION'] msg_type: 'FINISH-ROUND' + groups: ['TASK-ALLOCATION'] method: shout callbacks: - msg_type: 'TASK-ANNOUNCEMENT' @@ -78,9 +78,11 @@ api: acknowledge: false publish: task-announcement: + msg_type: 'TASK-ANNOUNCEMENT' groups: ['TASK-ALLOCATION'] method: shout allocation: + msg_type: 'ALLOCATION' groups: ['TASK-ALLOCATION'] method: shout callbacks: diff --git a/mrs/config/task_factory.py b/mrs/config/task_factory.py index 1b78b47da..09edac075 100644 --- a/mrs/config/task_factory.py +++ b/mrs/config/task_factory.py @@ -20,7 +20,7 @@ def initialize(self): ropod_task_cls = getattr(import_module('ropod.structs.task'), 'Task') self.register_task_cls('ropod_task', ropod_task_cls) - generic_task_cls = getattr(import_module('mrs.task'), 'Task') + generic_task_cls = getattr(import_module('mrs.structs.task'), 'Task') self.register_task_cls('generic_task', generic_task_cls) task_request_cls = getattr(import_module('ropod.structs.task'), 'TaskRequest') diff --git a/mrs/db_interface.py b/mrs/db_interface.py index 68664667b..69bb020d0 100644 --- a/mrs/db_interface.py +++ b/mrs/db_interface.py @@ -1,6 +1,6 @@ import logging -from mrs.timetable import Timetable +from mrs.structs.timetable import Timetable class DBInterface(object): diff --git a/mrs/robot.py b/mrs/robot.py index 94b08221c..a432ffe52 100644 --- a/mrs/robot.py +++ b/mrs/robot.py @@ -33,7 +33,7 @@ def run(self): if __name__ == '__main__': - from fleet_management.config.loader import Config, register_api_callbacks + from fleet_management.config.loader import Config config_file_path = '../config/config.yaml' config = Config(config_file_path, initialize=False) @@ -47,7 +47,7 @@ def run(self): robot = config.configure_robot_proxy(robot_id, ccu_store, dispatcher=True) - register_api_callbacks(robot, robot.api) + robot.api.register_callbacks(robot) robot.run() diff --git a/mrs/structs/__init__.py b/mrs/structs/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/mrs/structs/allocation.py b/mrs/structs/allocation.py new file mode 100644 index 000000000..cec440c8c --- /dev/null +++ b/mrs/structs/allocation.py @@ -0,0 +1,47 @@ +from ropod.utils.uuid import generate_uuid + + +class TaskAnnouncement(object): + def __init__(self, tasks=[], round_id=''): + """ + Struct with a list of tasks + :param tasks: list of tasks + """ + self.tasks = tasks + if not round_id: + self.round_id = generate_uuid() + else: + self.round_id = round_id + + def to_dict(self): + task_annoucement_dict = dict() + task_annoucement_dict['tasks'] = dict() + + for task in self.tasks: + task_annoucement_dict['tasks'][task.id] = task.to_dict() + + task_annoucement_dict['round_id'] = self.round_id + + return task_annoucement_dict + + +class Allocation(object): + def __init__(self, task_id, robot_id): + self.task_id = task_id + self.robot_id = robot_id + + def to_dict(self): + allocation_dict = dict() + allocation_dict['task_id'] = self.task_id + allocation_dict['robot_id'] = self.robot_id + return allocation_dict + + +class FinishRound(object): + def __init__(self, robot_id): + self.robot_id = robot_id + + def to_dict(self): + finish_round = dict() + finish_round['robot_id'] = self.robot_id + return finish_round diff --git a/mrs/structs/bid.py b/mrs/structs/bid.py new file mode 100644 index 000000000..a87fc370f --- /dev/null +++ b/mrs/structs/bid.py @@ -0,0 +1,74 @@ +import logging +from mrs.utils.uuid import generate_uuid +from mrs.structs.task import Task + + +class Bid(object): + def __init__(self, bidding_rule=None, robot_id='', round_id='', task=Task(), timetable=None, + **kwargs): + self.bidding_rule = bidding_rule + self.robot_id = robot_id + self.round_id = round_id + self.task = task + self.timetable = timetable + self.cost = kwargs.get('cost', float('inf')) + self.position = kwargs.get('position', 0) + self.hard_constraints = kwargs.get('hard_constraints', True) + self.alternative_start_time = None + + def __repr__(self): + return str(self.to_dict()) + + def __lt__(self, other): + if other is None: + return False + return self.cost < other.cost + + def __eq__(self, other): + if other is None: + return False + return self.cost == other.cost + + def compute_cost(self, position): + dispatchable_graph = self.timetable.dispatchable_graph + robustness_metric = self.timetable.robustness_metric + self.position = position + + if self.task.hard_constraints: + self.cost = self.bidding_rule.compute_bid_cost(dispatchable_graph, robustness_metric) + + else: # soft constraints + navigation_start_time = dispatchable_graph.get_task_navigation_start_time(self.task.id) + logging.debug("Navigation start time: %s", navigation_start_time) + self.cost = abs(navigation_start_time - self.task.earliest_start_time) + alternative_start_time = navigation_start_time + self.hard_constraints = False + self.alternative_start_time = alternative_start_time + + logging.debug("Cost: %s", self.cost) + + def to_dict(self): + bid_dict = dict() + bid_dict['cost'] = self.cost + bid_dict['robot_id'] = self.robot_id + bid_dict['task_id'] = self.task.id + bid_dict['position'] = self.position + bid_dict['round_id'] = self.round_id + bid_dict['hard_constraints'] = self.hard_constraints + bid_dict['alternative_start_time'] = self.alternative_start_time + return bid_dict + + @classmethod + def from_dict(cls, bid_dict): + bid = cls() + bid.cost = bid_dict['cost'] + bid.robot_id = bid_dict['robot_id'] + bid.task_id = bid_dict['task_id'] + bid.position = bid_dict['position'] + bid.round_id = bid_dict['round_id'] + bid.hard_constraints = bid_dict['hard_constraints'] + bid.alternative_start_time = bid_dict['alternative_start_time'] + return bid + + + diff --git a/mrs/task.py b/mrs/structs/task.py similarity index 100% rename from mrs/task.py rename to mrs/structs/task.py diff --git a/mrs/timetable.py b/mrs/structs/timetable.py similarity index 100% rename from mrs/timetable.py rename to mrs/structs/timetable.py diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index d98bcf669..0b4a07eec 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -4,12 +4,13 @@ import logging.config from datetime import timedelta from mrs.task_allocation.round import Round -from mrs.timetable import Timetable +from mrs.structs.timetable import Timetable from stn.stp import STP from mrs.exceptions.task_allocation import NoAllocation from mrs.exceptions.task_allocation import AlternativeTimeSlot -from mrs.task import TaskStatus +from mrs.structs.task import TaskStatus from mrs.db_interface import DBInterface +from mrs.structs.allocation import TaskAnnouncement, Allocation """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -138,25 +139,16 @@ def announce_task(self): logging.info("Starting round: %s", self.round.id) logging.info("Number of tasks to allocate: %s", len(self.tasks_to_allocate)) - # Create task announcement message that contains all unallocated tasks - task_announcement = dict() - task_announcement['header'] = dict() - task_announcement['payload'] = dict() - task_announcement['header']['type'] = 'TASK-ANNOUNCEMENT' - task_announcement['header']['metamodel'] = 'ropod-msg-schema.json' - task_announcement['header']['msgId'] = str(uuid.uuid4()) - task_announcement['header']['timestamp'] = int(round(time.time()) * 1000) - task_announcement['payload']['metamodel'] = 'ropod-task-announcement-schema.json' - task_announcement['payload']['round_id'] = self.round.id - task_announcement['payload']['tasks'] = dict() - - for task_id, task in self.tasks_to_allocate.items(): - task_announcement['payload']['tasks'][task.id] = task.to_dict() + tasks = list(self.tasks_to_allocate.values()) + task_annoucement = TaskAnnouncement(tasks, self.round.id) + msg = self.api.create_message(task_annoucement) + + logging.debug('task annoucement msg: %s', msg) logging.debug("Auctioneer announces tasks %s", [task_id for task_id, task in self.tasks_to_allocate.items()]) self.round.start() - self.api.publish(task_announcement, groups=['TASK-ALLOCATION']) + self.api.publish(msg, groups=['TASK-ALLOCATION']) def task_cb(self, msg): task_dict = msg['payload']['task'] @@ -164,28 +156,16 @@ def task_cb(self, msg): self.add_task(task) def bid_cb(self, msg): - bid = msg['payload']['bid'] + bid = msg['payload'] self.round.process_bid(bid) def finish_round_cb(self, msg): self.round.finish() def announce_winner(self, task_id, robot_id): - - allocation = dict() - allocation['header'] = dict() - allocation['payload'] = dict() - allocation['header']['type'] = 'ALLOCATION' - allocation['header']['metamodel'] = 'ropod-msg-schema.json' - allocation['header']['msgId'] = str(uuid.uuid4()) - allocation['header']['timestamp'] = int(round(time.time()) * 1000) - - allocation['payload']['metamodel'] = 'ropod-mrs-schema.json' - allocation['payload']['task_id'] = task_id - allocation['payload']['winner_id'] = robot_id - - logging.debug("Accouncing winner...") - self.api.publish(allocation, groups=['TASK-ALLOCATION']) + allocation = Allocation(task_id, robot_id) + msg = self.api.create_message(allocation) + self.api.publish(msg, groups=['TASK-ALLOCATION']) def get_task_schedule(self, task_id, robot_id): # For now, returning the start navigation time from the dispatchable graph diff --git a/mrs/task_allocation/bid.py b/mrs/task_allocation/bid.py deleted file mode 100644 index bac24fe39..000000000 --- a/mrs/task_allocation/bid.py +++ /dev/null @@ -1,103 +0,0 @@ -import logging -from mrs.utils.uuid import generate_uuid - - -class Bid(object): - - def __init__(self, bidding_rule=None, robot_id='', round_id='', task=None, timetable=None, cost=float('inf')): - self.bidding_rule = bidding_rule - self.robot_id = robot_id - self.round_id = round_id - self.task = task - self.cost = cost - self.timetable = timetable - if not task: - task_id = generate_uuid() - else: - task_id = task.id - self.msg = BidMsg(cost, robot_id, task_id) - - def __repr__(self): - return str(self.msg.to_dict()) - - def __lt__(self, other): - if other is None: - return False - return self.cost < other.cost - - def __eq__(self, other): - if other is None: - return False - return self.cost == other.cost - - def compute_cost(self, position): - dispatchable_graph = self.timetable.dispatchable_graph - robustness_metric = self.timetable.robustness_metric - - if self.task.hard_constraints: - self.cost = self.bidding_rule.compute_bid_cost(dispatchable_graph, robustness_metric) - self.msg = BidMsg(self.cost, self.robot_id, self.task.id, position, self.round_id) - - else: # soft constraints - navigation_start_time = dispatchable_graph.get_task_navigation_start_time(self.task.id) - logging.debug("Navigation start time: %s", navigation_start_time) - self.cost = abs(navigation_start_time - self.task.earliest_start_time) - alternative_start_time = navigation_start_time - self.msg = BidMsg(self.cost, self.robot_id, self.task.id, position, self.round_id, - hard_constraints=False, alternative_start_time=alternative_start_time) - - logging.debug("Cost: %s", self.cost) - - -class BidMsg(object): - def __init__(self, cost=float('inf'), robot_id='', task_id='', position=0, - round_id='', **kwargs): - self.cost = cost - self.robot_id = robot_id - self.task_id = task_id - self.position = position - self.round_id = round_id - - self.hard_constraints = kwargs.get('hard_constraints', True) - self.alternative_start_time = kwargs.get('alternative_start_time') - - def __repr__(self): - return str(self.to_dict()) - - def __lt__(self, other): - if other is None: - return False - return self.cost < other.cost - - def __eq__(self, other): - if other is None: - return False - return self.cost == other.cost - - def to_dict(self): - bid_msg_dict = dict() - bid_msg_dict['cost'] = self.cost - bid_msg_dict['robot_id'] = self.robot_id - bid_msg_dict['task_id'] = self.task_id - bid_msg_dict['position'] = self.position - bid_msg_dict['round_id'] = self.round_id - bid_msg_dict['hard_constraints'] = self.hard_constraints - bid_msg_dict['alternative_start_time'] = self.alternative_start_time - return bid_msg_dict - - @classmethod - def from_dict(cls, bid_msg_dict): - bid_msg = cls() - bid_msg.cost = bid_msg_dict['cost'] - bid_msg.robot_id = bid_msg_dict['robot_id'] - bid_msg.task_id = bid_msg_dict['task_id'] - bid_msg.position = bid_msg_dict['position'] - bid_msg.round_id = bid_msg_dict['round_id'] - bid_msg.hard_constraints = bid_msg_dict['hard_constraints'] - bid_msg.alternative_start_time = bid_msg_dict['alternative_start_time'] - return bid_msg - - - - - diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 187407456..7ca66934e 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -4,11 +4,11 @@ import logging.config from stn.stp import STP -from mrs.task_allocation.bid import Bid +from mrs.structs.bid import Bid from mrs.task_allocation.bidding_rule import BiddingRule -from mrs.timetable import Timetable from mrs.exceptions.task_allocation import NoSTPSolution from mrs.db_interface import DBInterface +from mrs.structs.allocation import FinishRound """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -57,7 +57,7 @@ def task_announcement_cb(self, msg): def allocation_cb(self, msg): self.logger.debug("Robot %s received ALLOCATION", self.id) task_id = msg['payload']['task_id'] - winner_id = msg['payload']['winner_id'] + winner_id = msg['payload']['robot_id'] if winner_id == self.id: self.allocate_to_robot(task_id) @@ -172,23 +172,26 @@ def send_bid(self, bid): :param round_id: :return: """ - self.logger.debug("Bid %s", bid.msg.to_dict()) - - bid_msg = dict() - bid_msg['header'] = dict() - bid_msg['payload'] = dict() - bid_msg['header']['type'] = 'BID' - bid_msg['header']['metamodel'] = 'ropod-msg-schema.json' - bid_msg['header']['msgId'] = str(uuid.uuid4()) - bid_msg['header']['timestamp'] = int(round(time.time()) * 1000) - - bid_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - bid_msg['payload']['bid'] = bid.msg.to_dict() + self.logger.debug("Bid %s", bid.to_dict()) + + # bid_msg = dict() + # bid_msg['header'] = dict() + # bid_msg['payload'] = dict() + # bid_msg['header']['type'] = 'BID' + # bid_msg['header']['metamodel'] = 'ropod-msg-schema.json' + # bid_msg['header']['msgId'] = str(uuid.uuid4()) + # bid_msg['header']['timestamp'] = int(round(time.time()) * 1000) + # + # bid_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + # bid_msg['payload']['bid'] = bid.to_dict() + msg = self.api.create_message(bid) + + self.logger.debug("Bid msg %s", msg) tasks = [task for task in bid.timetable.get_tasks()] self.logger.info("Round %s: robod_id %s bids %s for task %s and tasks %s", bid.round_id, self.id, bid.cost, bid.task.id, tasks) - self.api.publish(bid_msg, peer=self.auctioneer) + self.api.publish(msg, peer=self.auctioneer) def allocate_to_robot(self, task_id): @@ -204,16 +207,18 @@ def allocate_to_robot(self, task_id): self.logger.debug("Tasks allocated to robot %s:%s", self.id, tasks) def send_finish_round(self): - close_msg = dict() - close_msg['header'] = dict() - close_msg['payload'] = dict() - close_msg['header']['type'] = 'FINISH-ROUND' - close_msg['header']['metamodel'] = 'ropod-msg-schema.json' - close_msg['header']['msgId'] = str(uuid.uuid4()) - close_msg['header']['timestamp'] = int(round(time.time()) * 1000) - close_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - close_msg['payload']['robot_id'] = self.id + # close_msg = dict() + # close_msg['header'] = dict() + # close_msg['payload'] = dict() + # close_msg['header']['type'] = 'FINISH-ROUND' + # close_msg['header']['metamodel'] = 'ropod-msg-schema.json' + # close_msg['header']['msgId'] = str(uuid.uuid4()) + # close_msg['header']['timestamp'] = int(round(time.time()) * 1000) + # close_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + # close_msg['payload']['robot_id'] = self.id + finish_round = FinishRound(self.id) + msg = self.api.create_message(finish_round) self.logger.info("Robot %s sends close round msg ", self.id) - self.api.publish(close_msg) + self.api.publish(msg, groups=['TASK-ALLOCATION']) diff --git a/mrs/task_allocation/round.py b/mrs/task_allocation/round.py index 987afc4ed..66d6a73e5 100644 --- a/mrs/task_allocation/round.py +++ b/mrs/task_allocation/round.py @@ -1,7 +1,7 @@ from mrs.utils.uuid import generate_uuid import logging from ropod.utils.timestamp import TimeStamp as ts -from mrs.task_allocation.bid import BidMsg +from mrs.structs.bid import Bid import copy from mrs.exceptions.task_allocation import NoAllocation from mrs.exceptions.task_allocation import AlternativeTimeSlot @@ -47,8 +47,8 @@ def start(self): self.finished = False self.opened = True - def process_bid(self, bid_dict): - bid = BidMsg.from_dict(bid_dict) + def process_bid(self, bid_msg): + bid = Bid.from_dict(bid_msg) logging.debug("Processing bid from robot %s, cost: %s", bid.robot_id, bid.cost) @@ -144,7 +144,7 @@ def elect_winner(self): value - list of robots assigned to the task """ - lowest_bid = BidMsg() + lowest_bid = Bid() for task_id, bid in self.received_bids.items(): if bid < lowest_bid: diff --git a/mrs/task_allocator.py b/mrs/task_allocator.py index f09dbe4c4..a098160ee 100644 --- a/mrs/task_allocator.py +++ b/mrs/task_allocator.py @@ -1,7 +1,7 @@ import logging import time -from fleet_management.config.loader import Config, register_api_callbacks +from fleet_management.config.loader import Config class TaskAllocator(object): @@ -16,7 +16,7 @@ def __init__(self, config_file=None): self.auctioneer = self.config.configure_task_allocator(self.ccu_store) self.task_monitor = self.config.configure_task_monitor(self.ccu_store) - register_api_callbacks(self, self.api) + self.api.register_callbacks(self) def run(self): try: @@ -24,7 +24,6 @@ def run(self): while True: self.api.run() self.auctioneer.run() - self.task_monitor.run() time.sleep(0.5) except (KeyboardInterrupt, SystemExit): self.logger.info("Terminating %s auctioneer ...") diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatching/dispatcher.py index 5eff2fdc9..b0dafc1a8 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatching/dispatcher.py @@ -7,7 +7,7 @@ from stn.stp import STP from mrs.exceptions.task_allocation import NoSTPSolution from mrs.exceptions.task_execution import InconsistentSchedule -from mrs.task import TaskStatus +from mrs.structs.task import TaskStatus from mrs.db_interface import DBInterface @@ -127,7 +127,7 @@ def dispatch(self, task): self.api.publish(task_msg, groups=['ROPOD']) def request_reallocation(self, task): - self.update_task_status(task, TaskStatus.UNALLOCATED) # ABORTED + self.db_interface.update_task_status(task, TaskStatus.UNALLOCATED) # ABORTED task_msg = dict() task_msg['header'] = dict() task_msg['payload'] = dict() diff --git a/mrs/task_execution/dispatching/scheduler.py b/mrs/task_execution/dispatching/scheduler.py index 91774def3..cc7f902af 100644 --- a/mrs/task_execution/dispatching/scheduler.py +++ b/mrs/task_execution/dispatching/scheduler.py @@ -1,6 +1,6 @@ import logging from mrs.exceptions.task_execution import InconsistentSchedule -from mrs.task import TaskStatus +from mrs.structs.task import TaskStatus from mrs.db_interface import DBInterface diff --git a/mrs/task_execution/task_monitor.py b/mrs/task_execution/task_monitor.py index dfd52bb7d..ad1bce90d 100644 --- a/mrs/task_execution/task_monitor.py +++ b/mrs/task_execution/task_monitor.py @@ -1,8 +1,7 @@ import logging -import time from mrs.db_interface import DBInterface -from mrs.task import TaskStatus +from mrs.structs.task import TaskStatus class TaskMonitor(object): diff --git a/mrs/utils/models.py b/mrs/utils/models.py new file mode 100644 index 000000000..e5d37d160 --- /dev/null +++ b/mrs/utils/models.py @@ -0,0 +1,47 @@ +from ropod.utils.timestamp import TimeStamp as ts +from ropod.utils.uuid import generate_uuid +meta_model_template = 'ropod-%s-schema.json' +from mrs.structs.allocation import Tasks + + +class MessageFactory(object): + # def __init__(self): + # self._msgs = {} + # + # def register_msg(self, msg_type, msg_struct): + # self._msgs[msg_type] = msg_struct + # + # def get_msg_struct(self, msg_type): + # msg_struct = self._msgs.get(msg_type) + # if not msg_struct: + # raise ValueError(msg_type) + # return msg_struct + + def create_msg(self, msg_type, contents_dict, recipients=[]): + # msg_struct = self.get_msg_struct(msg_type) + # msg = msg_struct(**contents) + + msg = self.get_header(msg_type, recipients=recipients) + payload = self.get_payload(contents_dict, msg_type.lower()) + msg.update(payload) + return msg + + + @staticmethod + def get_header(msg_type, meta_model='msg', recipients=[]): + if recipients is not None and not isinstance(recipients, list): + raise Exception("Recipients must be a list of strings") + + return {"header": {'type': msg_type, + 'metamodel': 'ropod-%s-schema.json' % meta_model, + 'msgId': generate_uuid(), + 'timestamp': ts.get_time_stamp(), + 'receiverIds': recipients}} + + @staticmethod + def get_payload(contents_dict, model): + # payload = contents.to_dict() + metamodel = meta_model_template % model + contents_dict.update(metamodel=metamodel) + return {"payload": contents_dict} + diff --git a/tests/allocation_test.py b/tests/allocation_test.py index 11fd09614..58b189d2f 100644 --- a/tests/allocation_test.py +++ b/tests/allocation_test.py @@ -50,7 +50,7 @@ def receive_msg_cb(self, msg_content): if msg['header']['type'] == 'ALLOCATION': self.logger.debug("Received allocation message") task_id = msg['payload']['task_id'] - winner_id = msg['payload']['winner_id'] + winner_id = msg['payload']['robot_id'] allocation = (task_id, [winner_id]) self.allocations.append(allocation) logging.debug("Receiving allocation %s", allocation) diff --git a/tests/dispatching_test.py b/tests/dispatching_test.py index 364d66252..9088c759e 100644 --- a/tests/dispatching_test.py +++ b/tests/dispatching_test.py @@ -4,7 +4,7 @@ from fleet_management.config.loader import Config from mrs.utils.datasets import load_yaml_dataset -from mrs.timetable import Timetable +from mrs.structs.timetable import Timetable class TestDispatcher(object): diff --git a/tests/fms_integration_test.py b/tests/fms_integration_test.py index 70a2738dd..07db40ea1 100644 --- a/tests/fms_integration_test.py +++ b/tests/fms_integration_test.py @@ -3,7 +3,7 @@ from fleet_management.config.loader import Config from mrs.utils.datasets import load_yaml_dataset -from mrs.timetable import Timetable +from mrs.structs.timetable import Timetable class TaskAllocator(object): From ff13d6a5403b77e2e973e075f999b9b1ad54298f Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Mon, 19 Aug 2019 07:22:38 +0200 Subject: [PATCH 17/24] db_interface: Adding methods get_tasks and remove_task update_task and update_timetable add an entry if the entry does not exist in the db --- mrs/db_interface.py | 33 +++++++++++++++++++++++++++++++-- 1 file changed, 31 insertions(+), 2 deletions(-) diff --git a/mrs/db_interface.py b/mrs/db_interface.py index 69bb020d0..b1625d0cd 100644 --- a/mrs/db_interface.py +++ b/mrs/db_interface.py @@ -19,7 +19,29 @@ def update_task(self, task): """ collection = self.ccu_store.db['tasks'] task_dict = task.to_dict() - collection.replace_one({'id': task.id}, task_dict) + + found_dict = collection.find_one({'id': task_dict['id']}) + + if found_dict is None: + collection.insert(task_dict) + else: + collection.replace_one({'id': task.id}, task_dict) + + def get_tasks(self): + """ Returns a dictionary with the tasks in the "tasks" collection + + """ + collection = self.ccu_store.db['tasks'] + tasks_dict = dict() + for task in collection.find(): + tasks_dict[task['id']] = task + return tasks_dict + + def remove_task(self, task_id): + """ Removes task with task_id from the collection "tasks" + """ + collection = self.ccu_store.db['tasks'] + collection.delete_one({'id': task_id}) def update_task_status(self, task, status): task.status.status = status @@ -49,7 +71,13 @@ def update_timetable(self, timetable): collection = self.ccu_store.db['timetables'] timetable_dict = timetable.to_dict() robot_id = timetable.robot_id - collection.replace_one({'robot_id': robot_id}, timetable_dict) + + found_dict = collection.find_one({'robot_id': robot_id}) + + if found_dict is None: + collection.insert(timetable_dict) + else: + collection.replace_one({'robot_id': robot_id}, timetable_dict) def get_timetable(self, robot_id, stp): collection = self.ccu_store.db['timetables'] @@ -59,3 +87,4 @@ def get_timetable(self, robot_id, stp): return timetable = Timetable.from_dict(timetable_dict, stp) return timetable + From 7494a134416eb3c95b6210f8a5d2487db42e8094 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Mon, 19 Aug 2019 07:27:27 +0200 Subject: [PATCH 18/24] Components (auctioneer, bidder, dispatcher) read the timetable from the ccu_store when initialized allocation_test resets the timetables and tasks in the ccu_store --- mrs/structs/task.py | 4 ++- mrs/task_allocation/auctioneer.py | 7 +++-- mrs/task_allocation/bidder.py | 26 ++++-------------- mrs/task_execution/dispatching/dispatcher.py | 6 +++- requirements.txt | 2 +- tests/allocation_test.py | 29 ++++++++++++++++++-- tests/data/non_overlapping_1.yaml | 5 ++++ 7 files changed, 52 insertions(+), 27 deletions(-) diff --git a/mrs/structs/task.py b/mrs/structs/task.py index 7905068d2..8db3b52a1 100644 --- a/mrs/structs/task.py +++ b/mrs/structs/task.py @@ -76,13 +76,14 @@ class TaskStatus(object): def __init__(self, task_id=''): self.task_id = task_id - self.delayed = False self.status = self.UNALLOCATED + self.delayed = False def to_dict(self): task_dict = dict() task_dict['task_id'] = self.task_id task_dict['status'] = self.status + task_dict['delayed'] = self.delayed return task_dict @staticmethod @@ -91,6 +92,7 @@ def from_dict(status_dict): status = TaskStatus(task_id) status.task_id = task_id status.status = status_dict['status'] + status.status = status_dict['delayed'] return status @staticmethod diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index 0b4a07eec..012c7a0e8 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -37,9 +37,12 @@ def __init__(self, robot_ids, ccu_store, api, stp_solver, task_cls, allocation_m # TODO: Inititalize the timetables in the loader? and read the timetables here self.timetables = dict() for robot_id in robot_ids: - timetable = Timetable(self.stp, robot_id) + timetable = self.db_interface.get_timetable(robot_id, self.stp) + if timetable is None: + timetable = Timetable(self.stp, robot_id) + self.db_interface.add_timetable(timetable) + self.timetables[robot_id] = timetable - self.db_interface.add_timetable(timetable) self.tasks_to_allocate = dict() self.allocations = list() diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 7ca66934e..1d00ad143 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -9,6 +9,7 @@ from mrs.exceptions.task_allocation import NoSTPSolution from mrs.db_interface import DBInterface from mrs.structs.allocation import FinishRound +from mrs.structs.timetable import Timetable """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -36,7 +37,10 @@ def __init__(self, robot_id, ccu_store, api, task_cls, bidding_rule, allocation_ self.logger.debug("Starting robot %s", self.id) self.stp = STP(robustness) - self.timetable = self.db_interface.get_timetable(self.id, self.stp) + timetable = self.db_interface.get_timetable(self.id, self.stp) + if timetable is None: + timetable = Timetable(self.stp, robot_id) + self.timetable = timetable self.bid_placed = Bid() @@ -174,16 +178,6 @@ def send_bid(self, bid): """ self.logger.debug("Bid %s", bid.to_dict()) - # bid_msg = dict() - # bid_msg['header'] = dict() - # bid_msg['payload'] = dict() - # bid_msg['header']['type'] = 'BID' - # bid_msg['header']['metamodel'] = 'ropod-msg-schema.json' - # bid_msg['header']['msgId'] = str(uuid.uuid4()) - # bid_msg['header']['timestamp'] = int(round(time.time()) * 1000) - # - # bid_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - # bid_msg['payload']['bid'] = bid.to_dict() msg = self.api.create_message(bid) self.logger.debug("Bid msg %s", msg) @@ -197,6 +191,7 @@ def allocate_to_robot(self, task_id): # Update the timetable self.timetable = copy.deepcopy(self.bid_placed.timetable) + self.db_interface.update_timetable(self.timetable) self.logger.info("Robot %s allocated task %s", self.id, task_id) self.logger.debug("STN %s", self.timetable.stn) @@ -207,15 +202,6 @@ def allocate_to_robot(self, task_id): self.logger.debug("Tasks allocated to robot %s:%s", self.id, tasks) def send_finish_round(self): - # close_msg = dict() - # close_msg['header'] = dict() - # close_msg['payload'] = dict() - # close_msg['header']['type'] = 'FINISH-ROUND' - # close_msg['header']['metamodel'] = 'ropod-msg-schema.json' - # close_msg['header']['msgId'] = str(uuid.uuid4()) - # close_msg['header']['timestamp'] = int(round(time.time()) * 1000) - # close_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' - # close_msg['payload']['robot_id'] = self.id finish_round = FinishRound(self.id) msg = self.api.create_message(finish_round) diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatching/dispatcher.py index b0dafc1a8..95720f674 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatching/dispatcher.py @@ -9,6 +9,7 @@ from mrs.exceptions.task_execution import InconsistentSchedule from mrs.structs.task import TaskStatus from mrs.db_interface import DBInterface +from mrs.structs.timetable import Timetable class Dispatcher(object): @@ -26,7 +27,10 @@ def __init__(self, robot_id, ccu_store, task_cls, stp_solver, corrective_measure self.scheduler = Scheduler(ccu_store, self.stp) - self.timetable = self.db_interface.get_timetable(self.id, self.stp) + timetable = self.db_interface.get_timetable(self.id, self.stp) + if timetable is None: + timetable = Timetable(self.stp, robot_id) + self.timetable = timetable def run(self): self.timetable = self.db_interface.get_timetable(self.id, self.stp) diff --git a/requirements.txt b/requirements.txt index 8e34e5b85..f921b6792 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ numpy -git+https://github.com/anenriquez/mrta_stn.git@egg=stn +git+https://github.com/anenriquez/mrta_stn.git#egg=stn diff --git a/tests/allocation_test.py b/tests/allocation_test.py index 58b189d2f..7dbd444ae 100644 --- a/tests/allocation_test.py +++ b/tests/allocation_test.py @@ -1,9 +1,14 @@ +import logging import time import uuid -import logging -from ropod.pyre_communicator.base_class import RopodPyre from fleet_management.config.loader import Config +from fleet_management.db.ccu_store import CCUStore +from ropod.pyre_communicator.base_class import RopodPyre +from stn.stp import STP + +from mrs.db_interface import DBInterface +from mrs.structs.timetable import Timetable from mrs.utils.datasets import load_yaml_dataset @@ -15,13 +20,31 @@ def __init__(self, config_file): super().__init__(zyre_config, acknowledge=False) config = Config(config_file, initialize=False) + ccu_store = CCUStore('ropod_ccu_store') + self.db_interface = DBInterface(ccu_store) allocator_config = config.config_params.get("plugins").get("task_allocation") + stp_solver = allocator_config.get('bidding_rule').get('robustness') + self.stp = STP(stp_solver) + self.robot_ids = config.config_params.get('resources').get('fleet') + self.auctioneer_name = allocator_config.get('auctioneer') self.allocations = list() self.n_tasks = 0 self.terminated = False + def reset_timetables(self): + logging.info("Resetting timetables") + for robot_id in self.robot_ids: + timetable = Timetable(self.stp, robot_id) + self.db_interface.update_timetable(timetable) + + def reset_tasks(self): + logging.info("Resetting tasks") + tasks_dict = self.db_interface.get_tasks() + for task_id, task_info in tasks_dict.items(): + self.db_interface.remove_task(task_id) + def allocate(self, tasks): self.n_tasks = len(tasks) for task in tasks: @@ -76,6 +99,8 @@ def check_termination_test(self): try: time.sleep(5) + test.reset_timetables() + test.reset_tasks() test.allocate(tasks) start_time = time.time() while not test.terminated and start_time + timeout_duration > time.time(): diff --git a/tests/data/non_overlapping_1.yaml b/tests/data/non_overlapping_1.yaml index daec77133..36a638643 100644 --- a/tests/data/non_overlapping_1.yaml +++ b/tests/data/non_overlapping_1.yaml @@ -15,6 +15,7 @@ tasks: status: status: 1 task_id: 002767db-01f3-4e0c-af4e-52c4d81bd713 + delayed: false 173d27eb-d0a4-4b94-986b-1e9b8eca0579: earliest_start_time: 3197.08 estimated_duration: 6.84 @@ -26,6 +27,7 @@ tasks: status: status: 1 task_id: 173d27eb-d0a4-4b94-986b-1e9b8eca0579 + delayed: false 4076f78b-16d8-4057-a1ad-d16d239764fa: earliest_start_time: 479.33 estimated_duration: 2.74 @@ -37,6 +39,7 @@ tasks: status: status: 1 task_id: 002767db-01f3-4e0c-af4e-52c4d81bd713 + delayed: false 6d17963b-10b5-4143-b279-84d8e93aaaa3: earliest_start_time: 3642.88 estimated_duration: 2.3 @@ -48,6 +51,7 @@ tasks: status: status: 1 task_id: 6d17963b-10b5-4143-b279-84d8e93aaaa3 + delayed: false 73ace543-260e-474a-9112-3abd470ed5b5: earliest_start_time: 2503.91 estimated_duration: 8.24 @@ -59,4 +63,5 @@ tasks: status: status: 1 task_id: 73ace543-260e-474a-9112-3abd470ed5b5 + delayed: false upper_bound: 300 From 89558de8de760f60cc3a53b291fba31d95dd2e22 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Mon, 19 Aug 2019 07:56:16 +0200 Subject: [PATCH 19/24] Using task_allocator and allocation_test with docker_compose and travis --- .travis.yml | 1 + README.md | 14 ++++++++++---- docker-compose.yml | 12 +++++++++++- 3 files changed, 22 insertions(+), 5 deletions(-) diff --git a/.travis.yml b/.travis.yml index 9e2495959..007860e7f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -11,6 +11,7 @@ before_install: script: - docker-compose up -d robot + - docker-compose up -d task_allocator - docker-compose up --exit-code-from task_allocation_test after_script: - docker stop $(docker ps -aq) diff --git a/README.md b/README.md index dae5d4026..ad1675607 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,8 @@ docker-compose build task_allocation_test docker-compose up -d robot +docker-compose up -d task_allocator + docker-compose up task_allocation_test @@ -60,8 +62,6 @@ Install the repositories - [ropod_common](https://github.com/ropod-project/ropod_common) -- [mrta_datasets](https://github.com/anenriquez/mrta_datasets.git ) - Get the requirements: ``` @@ -74,15 +74,21 @@ Add the task_allocation to your `PYTHONPATH` by running: pip3 install --user -e . ``` -Go to `/allocation` and run in a terminal +Go to `/mrs` and run in a terminal ``` python3 robot.py ropod_001 ``` +Run in another terminal + +``` +python3 task_allocator.py +``` + Go to `/tests` and run test in another terminal ``` -python3 task_allocator.py three_tasks.csv +python3 allocation_test.py ``` ## References diff --git a/docker-compose.yml b/docker-compose.yml index ed531fe8f..992ab65fc 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -10,16 +10,26 @@ services: tty: true stdin_open: true + task_allocator: + container_name: task-allocator + image: ropod-mrs + working_dir: /mrta/mrs/ + command: ["python3", "task_allocator.py"] + network_mode: "host" + tty: true + stdin_open: true + task_allocation_test: build: . container_name: task-mrs-test image: ropod-mrs working_dir: /mrta/tests/ - command: ["python3", "fms_integration_test.py"] + command: ["python3", "allocation_test.py"] network_mode: "host" tty: true stdin_open: true depends_on: - robot + - task_allocator From eb1d97121b6e413489d2a9b0c81f613b7fcc8cdb Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Tue, 20 Aug 2019 10:32:39 +0200 Subject: [PATCH 20/24] The robot and the auctioneer have different databases The allocation_test resets the timetables and tasks in the ccu_store and sends messages to the robot to reset its timetables and tasks Fixing logging The robot has a robot comon class for common variables The dispatcher is not launched --- config/config.yaml | 79 +++++++++--- config/logging.yaml | 18 --- mrs/db_interface.py | 27 ++-- mrs/robot.py | 74 ++++++++--- mrs/structs/bid.py | 5 - mrs/structs/status.py | 40 ++++++ mrs/structs/task.py | 43 +------ mrs/structs/timetable.py | 6 + mrs/task_allocation/auctioneer.py | 96 ++++++++------ mrs/task_allocation/bidder.py | 121 +++++++----------- mrs/task_allocation/round.py | 24 ++-- mrs/task_allocator.py | 8 +- .../{dispatching => }/dispatcher.py | 25 ++-- mrs/task_execution/dispatching/__init__.py | 0 mrs/task_execution/schedule_monitor.py | 6 + .../{dispatching => }/scheduler.py | 7 +- mrs/utils/config_logger.py | 9 -- mrs/utils/models.py | 47 ------- mrs/utils/uuid.py | 7 - tests/allocation_test.py | 48 ++++++- tests/fms_integration_test.py | 45 ++++++- 21 files changed, 399 insertions(+), 336 deletions(-) delete mode 100644 config/logging.yaml create mode 100644 mrs/structs/status.py rename mrs/task_execution/{dispatching => }/dispatcher.py (91%) delete mode 100644 mrs/task_execution/dispatching/__init__.py create mode 100644 mrs/task_execution/schedule_monitor.py rename mrs/task_execution/{dispatching => }/scheduler.py (94%) delete mode 100644 mrs/utils/config_logger.py delete mode 100644 mrs/utils/models.py delete mode 100644 mrs/utils/uuid.py diff --git a/config/config.yaml b/config/config.yaml index 423e4bb14..c815fece5 100644 --- a/config/config.yaml +++ b/config/config.yaml @@ -2,6 +2,9 @@ version: 2 ccu_store: db_name: ropod_ccu_store port: 27017 +robot_store: + db_name: ropod_store + port: 27017 resources: fleet: - ropod_001 @@ -10,21 +13,27 @@ resources: plugins: task_allocation: - bidding_rule: - robustness: srea - temporal: completion_time + # Assuming we can have different types of tasks, + # specify from where to import the Task and TaskStatus classes + task_type: + class: 'mrs.structs.task' allocation_method: tessi - round_time: 15 # seconds robot_proxies: true - auctioneer: fms_zyre_api # This is completely Zyre dependent + stp_solver: srea + auctioneer: + round_time: 15 # seconds alternative_timeslots: True - task_type: generic_task - corrective_measure: re-allocate + dispatcher: freeze_window: 300 # seconds - task_monitor: - task_type: ropod_task robot_proxy: + bidder: + bidding_rule: + robustness: srea # has to be the same as the stp_solver + temporal: completion_time + auctioneer_name: fms_zyre_api # This is completely Zyre dependent + schedule_monitor: + corrective_measure: re-allocate api: version: 0.1.0 middleware: @@ -39,43 +48,54 @@ robot_proxy: message_types: # Types of messages the node will listen to. Messages not listed will be ignored - TASK-ANNOUNCEMENT - ALLOCATION + - TIMETABLE + - DELETE-TASK + debug_msgs: false acknowledge: false publish: task: - msg_type: 'TASK' groups: ['ROPOD', 'TASK-ALLOCATION'] # Optional, if not present will shout to all groups + msg_type: 'TASK' method: shout bid: - msg_type: 'BID' groups: ['TASK-ALLOCATION'] + msg_type: 'BID' method: whisper finish-round: - msg_type: 'FINISH-ROUND' groups: ['TASK-ALLOCATION'] + msg_type: 'FINISH-ROUND' method: shout callbacks: - msg_type: 'TASK-ANNOUNCEMENT' component: 'bidder.task_announcement_cb' - msg_type: 'ALLOCATION' component: 'bidder.allocation_cb' - + - msg_type: 'TIMETABLE' + component: '.timetable_cb' + - msg_type: 'DELETE-TASK' + component: '.delete_task_cb' api: version: 0.1.0 middleware: - zyre + #- rest + #- ros zyre: zyre_node: node_name: fms_zyre_api interface: null groups: + - ROPOD - TASK-ALLOCATION message_types: # Types of messages the node will listen to. Messages not listed will be ignored - - TASK - TASK-PROGRESS - BID - FINISH-ROUND + - ALLOCATE-TASK acknowledge: false + debug_messages: + - 'TASK-REQUEST' publish: task-announcement: msg_type: 'TASK-ANNOUNCEMENT' @@ -86,20 +106,37 @@ api: groups: ['TASK-ALLOCATION'] method: shout callbacks: - - msg_type: 'TASK' - component: 'auctioneer.task_cb' + - msg_type: 'ALLOCATE-TASK' + component: 'auctioneer.allocate_task_cb' - msg_type: 'BID' component: 'auctioneer.bid_cb' - msg_type: 'FINISH-ROUND' component: 'auctioneer.finish_round_cb' - - msg_type: 'TASK-PROGRESS' - component: 'task_monitor.task_progress_cb' + rest: + server: + ip: 127.0.0.1 + port: 8081 + routes: + - path: '/number' + resource: + module: 'fleet_management.api.rest.resources' + class: 'RandomGenerator' + ros: + publishers: + - topic: '/fms/task' + msg_type: Task + msg_module: ropod_ros_msgs.msg + subscribers: + - topic: '/fms/task_request' + msg_type: TaskRequest + msg_module: ropod_ros_msgs.msg + callback: task_cb logger: version: 1 formatters: default: - format: '[%(levelname)-5.5s] %(asctime)s [%(name)-25.25s] %(message)s' + format: '[%(levelname)-5.5s] %(asctime)s [%(name)-35.35s] %(message)s' datefmt: '%Y-%m-%d %H:%M:%S' handlers: console: @@ -135,6 +172,8 @@ logger: handlers: [console] fms: level: DEBUG + mrs: + level: DEBUG root: level: DEBUG - handlers: [console, file] + handlers: [console, file] \ No newline at end of file diff --git a/config/logging.yaml b/config/logging.yaml deleted file mode 100644 index 83d468186..000000000 --- a/config/logging.yaml +++ /dev/null @@ -1,18 +0,0 @@ -version: 1 -formatters: - default: - format: '[%(levelname)-5.5s] %(asctime)s [%(name)-25.25s] %(message)s' - datefmt: '%Y-%m-%d %H:%M:%S' - -handlers: - console: - class: logging.StreamHandler - level: DEBUG - formatter: default - stream: ext://sys.stdout -loggers: - allocation: - level: DEBUG -root: - level: DEBUG - handlers: [console] diff --git a/mrs/db_interface.py b/mrs/db_interface.py index b1625d0cd..b1d8051ab 100644 --- a/mrs/db_interface.py +++ b/mrs/db_interface.py @@ -1,23 +1,21 @@ -import logging - from mrs.structs.timetable import Timetable class DBInterface(object): - def __init__(self, ccu_store): - self.ccu_store = ccu_store + def __init__(self, store): + self.store = store def add_task(self, task): """Saves the given task to a database as a new document under the "tasks" collection. """ - collection = self.ccu_store.db['tasks'] + collection = self.store.db['tasks'] dict_task = task.to_dict() - self.ccu_store.unique_insert(collection, dict_task, 'id', task.id) + self.store.unique_insert(collection, dict_task, 'id', task.id) def update_task(self, task): """ Updates the given task under the "tasks" collection """ - collection = self.ccu_store.db['tasks'] + collection = self.store.db['tasks'] task_dict = task.to_dict() found_dict = collection.find_one({'id': task_dict['id']}) @@ -31,7 +29,7 @@ def get_tasks(self): """ Returns a dictionary with the tasks in the "tasks" collection """ - collection = self.ccu_store.db['tasks'] + collection = self.store.db['tasks'] tasks_dict = dict() for task in collection.find(): tasks_dict[task['id']] = task @@ -40,18 +38,17 @@ def get_tasks(self): def remove_task(self, task_id): """ Removes task with task_id from the collection "tasks" """ - collection = self.ccu_store.db['tasks'] + collection = self.store.db['tasks'] collection.delete_one({'id': task_id}) def update_task_status(self, task, status): task.status.status = status - logging.debug("Updating task status to %s", task.status.status) self.update_task(task) def get_task(self, task_id): """Returns a task dictionary representing the task with the given id. """ - collection = self.ccu_store.db['tasks'] + collection = self.store.db['tasks'] task_dict = collection.find_one({'id': task_id}) return task_dict @@ -59,16 +56,16 @@ def add_timetable(self, timetable): """ Saves the given timetable under the "timetables" collection """ - collection = self.ccu_store.db['timetables'] + collection = self.store.db['timetables'] robot_id = timetable.robot_id timetable_dict = timetable.to_dict() - self.ccu_store.unique_insert(collection, timetable_dict, 'robot_id', robot_id) + self.store.unique_insert(collection, timetable_dict, 'robot_id', robot_id) def update_timetable(self, timetable): """ Updates the given timetable under the "timetables" collection """ - collection = self.ccu_store.db['timetables'] + collection = self.store.db['timetables'] timetable_dict = timetable.to_dict() robot_id = timetable.robot_id @@ -80,7 +77,7 @@ def update_timetable(self, timetable): collection.replace_one({'robot_id': robot_id}, timetable_dict) def get_timetable(self, robot_id, stp): - collection = self.ccu_store.db['timetables'] + collection = self.store.db['timetables'] timetable_dict = collection.find_one({'robot_id': robot_id}) if timetable_dict is None: diff --git a/mrs/robot.py b/mrs/robot.py index a432ffe52..ac4cb8acc 100644 --- a/mrs/robot.py +++ b/mrs/robot.py @@ -1,34 +1,69 @@ import argparse -import time import logging +import time +from importlib import import_module -""" Includes: - - bidder - - dispatcher - - monitor -""" +from stn.stp import STP +from mrs.db_interface import DBInterface +from mrs.structs.timetable import Timetable +from mrs.task_allocation.bidder import Bidder +from mrs.task_execution.schedule_monitor import ScheduleMonitor -class Robot(object): - def __init__(self, api, bidder, **kwargs): +class RobotCommon(object): + def __init__(self, robot_id, api, robot_store, stp_solver, task_type): + + self.id = robot_id self.api = api - self.bidder = bidder - self.dispatcher = kwargs.get('dispatcher') + self.db_interface = DBInterface(robot_store) + self.stp = STP(stp_solver) + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + + self.timetable = Timetable.get_timetable(self.db_interface, self.id, self.stp) + self.db_interface.update_timetable(self.timetable) + + +class Robot(object): + + def __init__(self, robot_common_config, bidder_config, **kwargs): + + self.common = RobotCommon(**robot_common_config) + + self.bidder = Bidder(self.common, bidder_config) + + schedule_monitor_config = kwargs.get("schedule_monitor_config") + if schedule_monitor_config: + self.schedule_monitor = ScheduleMonitor(self.common, schedule_monitor_config) + + self.logger = logging.getLogger('mrs.robot.%s' % self.common.id) + self.logger.info("Robot %s initialized", self.common.id) + + def timetable_cb(self, msg): + robot_id = msg['payload']['timetable']['robot_id'] + if robot_id == self.common.id: + timetable_dict = msg['payload']['timetable'] + self.logger.debug("Robot %s received timetable msg", self.common.id) + timetable = Timetable.from_dict(timetable_dict, self.common.stp) + self.common.db_interface.update_timetable(timetable) + + def delete_task_cb(self, msg): + task_dict = msg['payload']['task'] + task = self.common.task_cls.from_dict(task_dict) + self.logger.debug("Deleting task %s ", task.id) + self.common.db_interface.remove_task(task.id) def run(self): try: - self.api.start() + self.common.api.start() while True: - self.bidder.api.run() - if self.dispatcher is not None: - self.dispatcher.run() time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - logging.info("Terminating %s robot ...") - self.api.shutdown() - logging.info("Exiting...") + self.logger.info("Terminating %s robot ...", self.common.id) + self.common.api.shutdown() + self.logger.info("Exiting...") if __name__ == '__main__': @@ -38,16 +73,15 @@ def run(self): config_file_path = '../config/config.yaml' config = Config(config_file_path, initialize=False) config.configure_logger() - ccu_store = config.configure_ccu_store() parser = argparse.ArgumentParser() parser.add_argument('robot_id', type=str, help='example: ropod_001') args = parser.parse_args() robot_id = args.robot_id - robot = config.configure_robot_proxy(robot_id, ccu_store, dispatcher=True) + robot = config.configure_robot_proxy(robot_id) - robot.api.register_callbacks(robot) + robot.common.api.register_callbacks(robot) robot.run() diff --git a/mrs/structs/bid.py b/mrs/structs/bid.py index a87fc370f..1080fff5d 100644 --- a/mrs/structs/bid.py +++ b/mrs/structs/bid.py @@ -1,5 +1,3 @@ -import logging -from mrs.utils.uuid import generate_uuid from mrs.structs.task import Task @@ -39,14 +37,11 @@ def compute_cost(self, position): else: # soft constraints navigation_start_time = dispatchable_graph.get_task_navigation_start_time(self.task.id) - logging.debug("Navigation start time: %s", navigation_start_time) self.cost = abs(navigation_start_time - self.task.earliest_start_time) alternative_start_time = navigation_start_time self.hard_constraints = False self.alternative_start_time = alternative_start_time - logging.debug("Cost: %s", self.cost) - def to_dict(self): bid_dict = dict() bid_dict['cost'] = self.cost diff --git a/mrs/structs/status.py b/mrs/structs/status.py new file mode 100644 index 000000000..9a3cae084 --- /dev/null +++ b/mrs/structs/status.py @@ -0,0 +1,40 @@ +class TaskStatus(object): + UNALLOCATED = 1 + ALLOCATED = 2 + SCHEDULED = 3 # Task is ready to be dispatched + SHIPPED = 4 # The task was sent to the robot + ONGOING = 5 + COMPLETED = 6 + ABORTED = 7 # Aborted by the system, not by the user + FAILED = 8 # Execution failed + CANCELED = 9 # Canceled before execution starts + PREEMPTED = 10 # Canceled during execution + + def __init__(self, task_id=''): + self.task_id = task_id + self.status = self.UNALLOCATED + self.delayed = False + + def to_dict(self): + task_dict = dict() + task_dict['task_id'] = self.task_id + task_dict['status'] = self.status + task_dict['delayed'] = self.delayed + return task_dict + + @staticmethod + def from_dict(status_dict): + task_id = status_dict['task_id'] + status = TaskStatus(task_id) + status.task_id = task_id + status.status = status_dict['status'] + status.delayed = status_dict['delayed'] + return status + + @staticmethod + def to_csv(status_dict): + """ Prepares dict to be written to a csv + :return: dict + """ + # The dictionary is already flat and ready to be exported + return status_dict diff --git a/mrs/structs/task.py b/mrs/structs/task.py index 8db3b52a1..fb5ed0b21 100644 --- a/mrs/structs/task.py +++ b/mrs/structs/task.py @@ -1,5 +1,6 @@ -from mrs.utils.uuid import generate_uuid +from ropod.utils.uuid import generate_uuid from mrs.utils.datasets import flatten_dict +from mrs.structs.status import TaskStatus class Task(object): @@ -62,44 +63,4 @@ def to_csv(task_dict): return to_csv_dict -class TaskStatus(object): - UNALLOCATED = 1 - ALLOCATED = 2 - SCHEDULED = 3 # Task is ready to be dispatched - SHIPPED = 4 # The task was sent to the robot - ONGOING = 5 - COMPLETED = 6 - ABORTED = 7 # Aborted by the system, not by the user - FAILED = 8 # Execution failed - CANCELED = 9 # Canceled before execution starts - PREEMPTED = 10 # Canceled during execution - - def __init__(self, task_id=''): - self.task_id = task_id - self.status = self.UNALLOCATED - self.delayed = False - - def to_dict(self): - task_dict = dict() - task_dict['task_id'] = self.task_id - task_dict['status'] = self.status - task_dict['delayed'] = self.delayed - return task_dict - - @staticmethod - def from_dict(status_dict): - task_id = status_dict['task_id'] - status = TaskStatus(task_id) - status.task_id = task_id - status.status = status_dict['status'] - status.status = status_dict['delayed'] - return status - - @staticmethod - def to_csv(status_dict): - """ Prepares dict to be written to a csv - :return: dict - """ - # The dictionary is already flat and ready to be exported - return status_dict diff --git a/mrs/structs/timetable.py b/mrs/structs/timetable.py index 210886f8a..3ead38295 100644 --- a/mrs/structs/timetable.py +++ b/mrs/structs/timetable.py @@ -123,6 +123,12 @@ def from_dict(timetable_dict, stp): return timetable + @staticmethod + def get_timetable(db_interface, robot_id, stp): + timetable = db_interface.get_timetable(robot_id, stp) + if timetable is None: + timetable = Timetable(stp, robot_id) + return timetable diff --git a/mrs/task_allocation/auctioneer.py b/mrs/task_allocation/auctioneer.py index 012c7a0e8..798ea4aa4 100644 --- a/mrs/task_allocation/auctioneer.py +++ b/mrs/task_allocation/auctioneer.py @@ -1,16 +1,17 @@ -import uuid -import time import logging -import logging.config +import time from datetime import timedelta -from mrs.task_allocation.round import Round -from mrs.structs.timetable import Timetable +from importlib import import_module + from stn.stp import STP -from mrs.exceptions.task_allocation import NoAllocation -from mrs.exceptions.task_allocation import AlternativeTimeSlot -from mrs.structs.task import TaskStatus + from mrs.db_interface import DBInterface +from mrs.exceptions.task_allocation import AlternativeTimeSlot +from mrs.exceptions.task_allocation import NoAllocation from mrs.structs.allocation import TaskAnnouncement, Allocation +from mrs.structs.task import TaskStatus +from mrs.structs.timetable import Timetable +from mrs.task_allocation.round import Round """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -19,29 +20,29 @@ class Auctioneer(object): - def __init__(self, robot_ids, ccu_store, api, stp_solver, task_cls, allocation_method, round_time=5, - **kwargs): + def __init__(self, robot_ids, ccu_store, api, stp_solver, + task_type, allocation_method, round_time=5, **kwargs): - logging.debug("Starting Auctioneer") + self.logger = logging.getLogger("mrs.auctioneer") self.robot_ids = robot_ids self.db_interface = DBInterface(ccu_store) self.api = api + self.stp = STP(stp_solver) + self.allocation_method = allocation_method self.round_time = timedelta(seconds=round_time) self.alternative_timeslots = kwargs.get('alternative_timeslots', False) - self.stp = STP(stp_solver) - self.task_cls = task_cls + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + self.logger.debug("Auctioneer started") # TODO: Inititalize the timetables in the loader? and read the timetables here self.timetables = dict() for robot_id in robot_ids: - timetable = self.db_interface.get_timetable(robot_id, self.stp) - if timetable is None: - timetable = Timetable(self.stp, robot_id) - self.db_interface.add_timetable(timetable) - + timetable = Timetable.get_timetable(self.db_interface, robot_id, self.stp) + self.db_interface.add_timetable(timetable) self.timetables[robot_id] = timetable self.tasks_to_allocate = dict() @@ -55,6 +56,13 @@ def __str__(self): to_print += "Groups {}".format(self.api.interfaces[0].groups()) return to_print + def check_db(self): + tasks_dict = self.db_interface.get_tasks() + for task_id, task_dict in tasks_dict.items(): + task = self.task_cls.from_dict(task_dict) + if task.status == TaskStatus.UNALLOCATED: + self.add_task(task) + def run(self): if self.tasks_to_allocate and self.round.finished: self.announce_task() @@ -68,7 +76,7 @@ def run(self): self.announce_winner(allocated_task, robot_id) except NoAllocation as exception: - logging.exception("No mrs made in round %s ", exception.round_id) + self.logger.exception("No mrs made in round %s ", exception.round_id) self.round.finish() except AlternativeTimeSlot as exception: @@ -83,9 +91,10 @@ def process_allocation(self, round_result): self.allocations.append(allocation) self.tasks_to_allocate = tasks_to_allocate - logging.debug("Allocation: %s", allocation) - logging.debug("Tasks to allocate %s", self.tasks_to_allocate) + self.logger.debug("Allocation: %s", allocation) + self.logger.debug("Tasks to allocate %s", self.tasks_to_allocate) + self.logger.debug("Updating task status to ALLOCATED") self.db_interface.update_task_status(task, TaskStatus.ALLOCATED) self.update_timetable(robot_id, task, position) @@ -104,14 +113,14 @@ def update_timetable(self, robot_id, task, position): self.timetables.update({robot_id: timetable}) self.db_interface.update_timetable(timetable) - logging.debug("STN robot %s: %s", robot_id, timetable.stn) - logging.debug("Dispatchable graph robot %s: %s", robot_id, timetable.dispatchable_graph) + self.logger.debug("STN robot %s: %s", robot_id, timetable.stn) + self.logger.debug("Dispatchable graph robot %s: %s", robot_id, timetable.dispatchable_graph) def process_alternative_allocation(self, exception): task_id = exception.task_id robot_id = exception.robot_id alternative_start_time = exception.alternative_start_time - logging.exception("Alternative timeslot for task %s: robot %s, alternative start time: %s ", task_id, robot_id, + self.logger.exception("Alternative timeslot for task %s: robot %s, alternative start time: %s ", task_id, robot_id, alternative_start_time) alternative_allocation = (task_id, [robot_id], alternative_start_time) @@ -119,16 +128,16 @@ def process_alternative_allocation(self, exception): def add_task(self, task): self.tasks_to_allocate[task.id] = task - self.db_interface.add_task(task) + self.db_interface.update_task(task) def allocate(self, tasks): if isinstance(tasks, list): for task in tasks: self.add_task(task) - logging.debug('Auctioneer received a list of tasks') + self.logger.debug('Auctioneer received a list of tasks') else: self.add_task(tasks) - logging.debug('Auctioneer received one task') + self.logger.debug('Auctioneer received one task') def announce_task(self): @@ -139,21 +148,27 @@ def announce_task(self): self.round = Round(**round_) - logging.info("Starting round: %s", self.round.id) - logging.info("Number of tasks to allocate: %s", len(self.tasks_to_allocate)) + self.logger.info("Starting round: %s", self.round.id) + self.logger.info("Number of tasks to allocate: %s", len(self.tasks_to_allocate)) tasks = list(self.tasks_to_allocate.values()) task_annoucement = TaskAnnouncement(tasks, self.round.id) msg = self.api.create_message(task_annoucement) - logging.debug('task annoucement msg: %s', msg) + self.logger.debug('task annoucement msg: %s', msg) - logging.debug("Auctioneer announces tasks %s", [task_id for task_id, task in self.tasks_to_allocate.items()]) + self.logger.debug("Auctioneer announces tasks %s", [task_id for task_id, task in self.tasks_to_allocate.items()]) self.round.start() self.api.publish(msg, groups=['TASK-ALLOCATION']) - def task_cb(self, msg): + def send_timetable(self, robot_id): + timetable = Timetable.get_timetable(self.db_interface, robot_id, self.stp) + msg = self.api.create_message(timetable) + self.api.publish(msg) + + def allocate_task_cb(self, msg): + self.logger.debug("Task received") task_dict = msg['payload']['task'] task = self.task_cls.from_dict(task_dict) self.add_task(task) @@ -179,7 +194,7 @@ def get_task_schedule(self, task_id, robot_id): start_time = timetable.dispatchable_graph.get_task_navigation_start_time(task_id) - logging.debug("Start time of task %s: %s", task_id, start_time) + self.logger.debug("Start time of task %s: %s", task_id, start_time) task_schedule['start_time'] = start_time task_schedule['finish_time'] = -1 # This info is not available here. @@ -189,23 +204,22 @@ def get_task_schedule(self, task_id, robot_id): if __name__ == '__main__': - from fleet_management.config.loader import Config, register_api_callbacks + from fleet_management.config.loader import Config config_file_path = '../../config/config.yaml' config = Config(config_file_path, initialize=True) - auctioneer = config.configure_task_allocator(config.ccu_store) + auctioneer = config.configure_auctioneer(config.ccu_store) time.sleep(5) - register_api_callbacks(auctioneer, auctioneer.api) - - auctioneer.api.start() + auctioneer.api.register_callbacks(auctioneer) try: + auctioneer.api.start() while True: - auctioneer.api.run() auctioneer.run() + auctioneer.api.run() time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - logging.info("Terminating %s auctioneer ...") + print("Terminating auctioneer ...") auctioneer.api.shutdown() - logging.info("Exiting...") + print("Exiting...") diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 1d00ad143..969b7a0c3 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -1,15 +1,11 @@ import copy -import uuid -import time -import logging.config +import logging -from stn.stp import STP -from mrs.structs.bid import Bid -from mrs.task_allocation.bidding_rule import BiddingRule from mrs.exceptions.task_allocation import NoSTPSolution -from mrs.db_interface import DBInterface from mrs.structs.allocation import FinishRound -from mrs.structs.timetable import Timetable +from mrs.structs.bid import Bid +from mrs.structs.task import TaskStatus +from mrs.task_allocation.bidding_rule import BiddingRule """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file @@ -18,52 +14,32 @@ class Bidder(object): - def __init__(self, robot_id, ccu_store, api, task_cls, bidding_rule, allocation_method, auctioneer, **kwargs): - - self.id = robot_id - self.db_interface = DBInterface(ccu_store) + def __init__(self, robot_common, bidder_config): - self.api = api + self.common = robot_common + self.logger = logging.getLogger('mrs.bidder.%s' % self.common.id) - robustness = bidding_rule.get('robustness') - temporal = bidding_rule.get('temporal') + robustness = bidder_config.get('bidding_rule').get('robustness') + temporal = bidder_config.get('bidding_rule').get('temporal') self.bidding_rule = BiddingRule(robustness, temporal) - - self.task_cls = task_cls - self.allocation_method = allocation_method - self.auctioneer = auctioneer - - self.logger = logging.getLogger('mrs.robot.%s' % self.id) - self.logger.debug("Starting robot %s", self.id) - - self.stp = STP(robustness) - timetable = self.db_interface.get_timetable(self.id, self.stp) - if timetable is None: - timetable = Timetable(self.stp, robot_id) - self.timetable = timetable - + self.auctioneer_name = bidder_config.get("auctioneer_name") self.bid_placed = Bid() - def __str__(self): - to_print = "" - to_print += "Robot {}".format(self.id) - to_print += '\n' - to_print += "Groups {}".format(self.api.zyre.groups()) - return to_print + self.logger.debug("Bidder initialized %s", self.common.id) def task_announcement_cb(self, msg): - self.logger.debug("Robot %s received TASK-ANNOUNCEMENT", self.id) + self.logger.debug("Robot %s received TASK-ANNOUNCEMENT", self.common.id) round_id = msg['payload']['round_id'] received_tasks = msg['payload']['tasks'] - self.timetable = self.db_interface.get_timetable(self.id, self.stp) + self.common.timetable = self.common.db_interface.get_timetable(self.common.id, self.common.stp) self.compute_bids(received_tasks, round_id) def allocation_cb(self, msg): - self.logger.debug("Robot %s received ALLOCATION", self.id) + self.logger.debug("Robot %s received ALLOCATION", self.common.id) task_id = msg['payload']['task_id'] winner_id = msg['payload']['robot_id'] - if winner_id == self.id: + if winner_id == self.common.id: self.allocate_to_robot(task_id) self.send_finish_round() @@ -72,7 +48,8 @@ def compute_bids(self, received_tasks, round_id): no_bids = list() for task_id, task_info in received_tasks.items(): - task = self.task_cls.from_dict(task_info) + task = self.common.task_cls.from_dict(task_info) + self.common.db_interface.update_task(task) self.logger.debug("Computing bid of task %s", task.id) # Insert task in each possible position of the stn and @@ -98,7 +75,7 @@ def send_bids(self, bid, no_bids): :param no_bids: list of no bids """ if bid: - self.logger.debug("Robot %s placed bid %s", self.id, self.bid_placed) + self.logger.debug("Robot %s placed bid %s", self.common.id, self.bid_placed) self.send_bid(bid) if no_bids: @@ -107,9 +84,9 @@ def send_bids(self, bid, no_bids): self.send_bid(no_bid) def insert_task(self, task, round_id): - best_bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) + best_bid = Bid(self.bidding_rule, self.common.id, round_id, task, self.common.timetable) - tasks = self.timetable.get_tasks() + tasks = self.common.timetable.get_tasks() if tasks: n_tasks = len(tasks) else: @@ -119,23 +96,24 @@ def insert_task(self, task, round_id): for position in range(1, n_tasks+2): # TODO check if the robot can make it to the task, if not, return - self.logger.debug("Schedule: %s", self.timetable.schedule) - if position == 1 and self.timetable.is_scheduled(): + self.logger.debug("Schedule: %s", self.common.timetable.schedule) + if position == 1 and self.common.timetable.is_scheduled(): self.logger.debug("Not adding task in position %s", position) continue self.logger.debug("Adding task %s in position %s", task.id, position) - self.timetable.add_task_to_stn(task, position) + self.common.timetable.add_task_to_stn(task, position) try: - self.timetable.solve_stp() + self.common.timetable.solve_stp() - self.logger.debug("STN %s: ", self.timetable.stn) - self.logger.debug("Dispatchable graph %s: ", self.timetable.dispatchable_graph) - self.logger.debug("Robustness Metric %s: ", self.timetable.robustness_metric) + self.logger.debug("STN %s: ", self.common.timetable.stn) + self.logger.debug("Dispatchable graph %s: ", self.common.timetable.dispatchable_graph) + self.logger.debug("Robustness Metric %s: ", self.common.timetable.robustness_metric) - bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) + bid = Bid(self.bidding_rule, self.common.id, round_id, task, self.common.timetable) bid.compute_cost(position) + self.logger.debug("Cost: %s", bid.cost) if bid < best_bid or (bid == best_bid and bid.task.id < best_bid.task.id): best_bid = copy.deepcopy(bid) @@ -145,7 +123,7 @@ def insert_task(self, task, round_id): " task %s in position %s", task.id, position) # Restore schedule for the next iteration - self.timetable.remove_task_from_stn(position) + self.common.timetable.remove_task_from_stn(position) self.logger.debug("Best bid for task %s: %s", task.id, best_bid) @@ -170,41 +148,38 @@ def get_smallest_bid(bids): return smallest_bid def send_bid(self, bid): - """ Creates bid_msg and sends it to the auctioneer + """ Creates bid_msg and sends it to the auctioneer_name :param bid: :param round_id: :return: """ - self.logger.debug("Bid %s", bid.to_dict()) - - msg = self.api.create_message(bid) - - self.logger.debug("Bid msg %s", msg) - + msg = self.common.api.create_message(bid) tasks = [task for task in bid.timetable.get_tasks()] - self.logger.info("Round %s: robod_id %s bids %s for task %s and tasks %s", bid.round_id, self.id, bid.cost, bid.task.id, tasks) - self.api.publish(msg, peer=self.auctioneer) + self.logger.info("Round %s: robod_id %s bids %s for task %s and tasks %s", bid.round_id, self.common.id, bid.cost, bid.task.id, tasks) + self.common.api.publish(msg, peer=self.auctioneer_name) def allocate_to_robot(self, task_id): - # Update the timetable - self.timetable = copy.deepcopy(self.bid_placed.timetable) - self.db_interface.update_timetable(self.timetable) + self.common.timetable = copy.deepcopy(self.bid_placed.timetable) + self.common.db_interface.update_timetable(self.common.timetable) + task_dict = self.common.db_interface.get_task(task_id) + task = self.common.task_cls.from_dict(task_dict) + self.common.db_interface.update_task_status(task, TaskStatus.ALLOCATED) - self.logger.info("Robot %s allocated task %s", self.id, task_id) - self.logger.debug("STN %s", self.timetable.stn) - self.logger.debug("Dispatchable graph %s", self.timetable.dispatchable_graph) + self.logger.info("Robot %s allocated task %s", self.common.id, task_id) + self.logger.debug("STN %s", self.common.timetable.stn) + self.logger.debug("Dispatchable graph %s", self.common.timetable.dispatchable_graph) - tasks = [task for task in self.timetable.get_tasks()] + tasks = [task for task in self.common.timetable.get_tasks()] - self.logger.debug("Tasks allocated to robot %s:%s", self.id, tasks) + self.logger.debug("Tasks allocated to robot %s:%s", self.common.id, tasks) def send_finish_round(self): - finish_round = FinishRound(self.id) - msg = self.api.create_message(finish_round) + finish_round = FinishRound(self.common.id) + msg = self.common.api.create_message(finish_round) - self.logger.info("Robot %s sends close round msg ", self.id) - self.api.publish(msg, groups=['TASK-ALLOCATION']) + self.logger.info("Robot %s sends close round msg ", self.common.id) + self.common.api.publish(msg, groups=['TASK-ALLOCATION']) diff --git a/mrs/task_allocation/round.py b/mrs/task_allocation/round.py index 66d6a73e5..b337272c6 100644 --- a/mrs/task_allocation/round.py +++ b/mrs/task_allocation/round.py @@ -1,16 +1,20 @@ -from mrs.utils.uuid import generate_uuid +import copy import logging + from ropod.utils.timestamp import TimeStamp as ts -from mrs.structs.bid import Bid -import copy -from mrs.exceptions.task_allocation import NoAllocation +from ropod.utils.uuid import generate_uuid + from mrs.exceptions.task_allocation import AlternativeTimeSlot +from mrs.exceptions.task_allocation import NoAllocation +from mrs.structs.bid import Bid class Round(object): def __init__(self, **kwargs): + self.logger = logging.getLogger('mrs.auctioneer.round') + self.tasks_to_allocate = kwargs.get('tasks_to_allocate', dict()) self.round_time = kwargs.get('round_time', 0) self.n_robots = kwargs.get('n_robots', 0) @@ -41,7 +45,7 @@ def start(self): """ open_time = ts.get_time_stamp() self.closure_time = ts.get_time_stamp(self.round_time) - logging.debug("Round opened at %s and will close at %s", + self.logger.debug("Round opened at %s and will close at %s", open_time, self.closure_time) self.finished = False @@ -50,7 +54,7 @@ def start(self): def process_bid(self, bid_msg): bid = Bid.from_dict(bid_msg) - logging.debug("Processing bid from robot %s, cost: %s", bid.robot_id, bid.cost) + self.logger.debug("Processing bid from robot %s, cost: %s", bid.robot_id, bid.cost) if bid.cost != float('inf'): # Process a bid @@ -83,7 +87,7 @@ def time_to_close(self): if current_time < self.closure_time: return False - logging.debug("Closing round at %s", current_time) + self.logger.debug("Closing round at %s", current_time) self.opened = False return True @@ -117,12 +121,12 @@ def get_result(self): return round_result except NoAllocation: - logging.exception("No mrs made in round %s ", self.id) + self.logger.exception("No mrs made in round %s ", self.id) raise NoAllocation(self.id) def finish(self): self.finished = True - logging.debug("Round finished") + self.logger.debug("Round finished") def set_soft_constraints(self): """ If the number of no-bids for a task is equal to the number of robots, @@ -134,7 +138,7 @@ def set_soft_constraints(self): task = self.tasks_to_allocate.get(task_id) task.hard_constraints = False self.tasks_to_allocate.update({task_id: task}) - logging.debug("Setting soft constraints for task %s", task_id) + self.logger.debug("Setting soft constraints for task %s", task_id) def elect_winner(self): """ Elects the winner of the round diff --git a/mrs/task_allocator.py b/mrs/task_allocator.py index a098160ee..b9e00aa0f 100644 --- a/mrs/task_allocator.py +++ b/mrs/task_allocator.py @@ -13,20 +13,18 @@ def __init__(self, config_file=None): self.ccu_store = self.config.ccu_store self.api = self.config.api - self.auctioneer = self.config.configure_task_allocator(self.ccu_store) - self.task_monitor = self.config.configure_task_monitor(self.ccu_store) - + self.auctioneer = self.config.configure_auctioneer(self.ccu_store) self.api.register_callbacks(self) def run(self): try: self.api.start() while True: - self.api.run() self.auctioneer.run() + self.api.run() time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - self.logger.info("Terminating %s auctioneer ...") + self.logger.info("Terminating task allocator ...") self.api.shutdown() logging.info("Exiting...") diff --git a/mrs/task_execution/dispatching/dispatcher.py b/mrs/task_execution/dispatcher.py similarity index 91% rename from mrs/task_execution/dispatching/dispatcher.py rename to mrs/task_execution/dispatcher.py index 95720f674..0223fd6be 100644 --- a/mrs/task_execution/dispatching/dispatcher.py +++ b/mrs/task_execution/dispatcher.py @@ -1,31 +1,38 @@ import logging -import uuid import time +import uuid from ropod.utils.timestamp import TimeStamp as ts -from mrs.task_execution.dispatching.scheduler import Scheduler from stn.stp import STP +from importlib import import_module + +from mrs.db_interface import DBInterface from mrs.exceptions.task_allocation import NoSTPSolution from mrs.exceptions.task_execution import InconsistentSchedule from mrs.structs.task import TaskStatus -from mrs.db_interface import DBInterface from mrs.structs.timetable import Timetable +from mrs.task_execution.scheduler import Scheduler class Dispatcher(object): - def __init__(self, robot_id, ccu_store, task_cls, stp_solver, corrective_measure, freeze_window, api, auctioneer): + def __init__(self, robot_id, api, robot_store, task_type, + stp_solver, corrective_measure, freeze_window): + self.id = robot_id - self.db_interface = DBInterface(ccu_store) - self.task_cls = task_cls + self.api = api + self.db_interface = DBInterface(robot_store) + + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + self.stp = STP(stp_solver) self.stp_solver = stp_solver + self.corrective_measure = corrective_measure self.freeze_window = freeze_window - self.api = api - self.auctioneer = auctioneer - self.scheduler = Scheduler(ccu_store, self.stp) + self.scheduler = Scheduler(robot_store, self.stp) timetable = self.db_interface.get_timetable(self.id, self.stp) if timetable is None: diff --git a/mrs/task_execution/dispatching/__init__.py b/mrs/task_execution/dispatching/__init__.py deleted file mode 100644 index e69de29bb..000000000 diff --git a/mrs/task_execution/schedule_monitor.py b/mrs/task_execution/schedule_monitor.py new file mode 100644 index 000000000..cd6709baf --- /dev/null +++ b/mrs/task_execution/schedule_monitor.py @@ -0,0 +1,6 @@ +from mrs.db_interface import DBInterface + + +class ScheduleMonitor(object): + def __init__(self, robot_common, schedule_monitor_config): + self.common = robot_common diff --git a/mrs/task_execution/dispatching/scheduler.py b/mrs/task_execution/scheduler.py similarity index 94% rename from mrs/task_execution/dispatching/scheduler.py rename to mrs/task_execution/scheduler.py index cc7f902af..e4a4f8dde 100644 --- a/mrs/task_execution/dispatching/scheduler.py +++ b/mrs/task_execution/scheduler.py @@ -1,13 +1,14 @@ import logging + +from mrs.db_interface import DBInterface from mrs.exceptions.task_execution import InconsistentSchedule from mrs.structs.task import TaskStatus -from mrs.db_interface import DBInterface class Scheduler(object): - def __init__(self, ccu_store, stp): - self.db_interface = DBInterface(ccu_store) + def __init__(self, robot_store, stp): + self.db_interface = DBInterface(robot_store) self.stp = stp self.navigation_start_time = -float('inf') # of scheduled task diff --git a/mrs/utils/config_logger.py b/mrs/utils/config_logger.py deleted file mode 100644 index 7fa1c8411..000000000 --- a/mrs/utils/config_logger.py +++ /dev/null @@ -1,9 +0,0 @@ -import logging.config -import yaml - - -def config_logger(logging_file): - - with open(logging_file) as f: - log_config = yaml.safe_load(f) - logging.config.dictConfig(log_config) diff --git a/mrs/utils/models.py b/mrs/utils/models.py deleted file mode 100644 index e5d37d160..000000000 --- a/mrs/utils/models.py +++ /dev/null @@ -1,47 +0,0 @@ -from ropod.utils.timestamp import TimeStamp as ts -from ropod.utils.uuid import generate_uuid -meta_model_template = 'ropod-%s-schema.json' -from mrs.structs.allocation import Tasks - - -class MessageFactory(object): - # def __init__(self): - # self._msgs = {} - # - # def register_msg(self, msg_type, msg_struct): - # self._msgs[msg_type] = msg_struct - # - # def get_msg_struct(self, msg_type): - # msg_struct = self._msgs.get(msg_type) - # if not msg_struct: - # raise ValueError(msg_type) - # return msg_struct - - def create_msg(self, msg_type, contents_dict, recipients=[]): - # msg_struct = self.get_msg_struct(msg_type) - # msg = msg_struct(**contents) - - msg = self.get_header(msg_type, recipients=recipients) - payload = self.get_payload(contents_dict, msg_type.lower()) - msg.update(payload) - return msg - - - @staticmethod - def get_header(msg_type, meta_model='msg', recipients=[]): - if recipients is not None and not isinstance(recipients, list): - raise Exception("Recipients must be a list of strings") - - return {"header": {'type': msg_type, - 'metamodel': 'ropod-%s-schema.json' % meta_model, - 'msgId': generate_uuid(), - 'timestamp': ts.get_time_stamp(), - 'receiverIds': recipients}} - - @staticmethod - def get_payload(contents_dict, model): - # payload = contents.to_dict() - metamodel = meta_model_template % model - contents_dict.update(metamodel=metamodel) - return {"payload": contents_dict} - diff --git a/mrs/utils/uuid.py b/mrs/utils/uuid.py deleted file mode 100644 index 7fb3ef347..000000000 --- a/mrs/utils/uuid.py +++ /dev/null @@ -1,7 +0,0 @@ -import uuid - - -def generate_uuid(): - """ Returns a string containing a random uuid - """ - return str(uuid.uuid4()) diff --git a/tests/allocation_test.py b/tests/allocation_test.py index 7dbd444ae..fef12a68b 100644 --- a/tests/allocation_test.py +++ b/tests/allocation_test.py @@ -7,6 +7,9 @@ from ropod.pyre_communicator.base_class import RopodPyre from stn.stp import STP +from ropod.utils.timestamp import TimeStamp as ts +from ropod.utils.uuid import generate_uuid + from mrs.db_interface import DBInterface from mrs.structs.timetable import Timetable from mrs.utils.datasets import load_yaml_dataset @@ -24,11 +27,12 @@ def __init__(self, config_file): self.db_interface = DBInterface(ccu_store) allocator_config = config.config_params.get("plugins").get("task_allocation") - stp_solver = allocator_config.get('bidding_rule').get('robustness') + robot_proxy = config.config_params.get("robot_proxy") + stp_solver = allocator_config.get('stp_solver') self.stp = STP(stp_solver) self.robot_ids = config.config_params.get('resources').get('fleet') - self.auctioneer_name = allocator_config.get('auctioneer') + self.auctioneer_name = robot_proxy.get("bidder").get("auctioneer_name") self.allocations = list() self.n_tasks = 0 self.terminated = False @@ -38,12 +42,28 @@ def reset_timetables(self): for robot_id in self.robot_ids: timetable = Timetable(self.stp, robot_id) self.db_interface.update_timetable(timetable) + self.send_timetable(timetable, robot_id) def reset_tasks(self): logging.info("Resetting tasks") tasks_dict = self.db_interface.get_tasks() - for task_id, task_info in tasks_dict.items(): + for task_id, task_dict in tasks_dict.items(): self.db_interface.remove_task(task_id) + self.request_task_delete(task_dict) + + def send_timetable(self, timetable, robot_id): + logging.debug("Sending timetable to %s", robot_id) + timetable_msg = dict() + timetable_msg['header'] = dict() + timetable_msg['payload'] = dict() + timetable_msg['header']['type'] = 'TIMETABLE' + timetable_msg['header']['metamodel'] = 'ropod-msg-schema.json' + timetable_msg['header']['msgId'] = generate_uuid() + timetable_msg['header']['timestamp'] = ts.get_time_stamp() + + timetable_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + timetable_msg['payload']['timetable'] = timetable.to_dict() + self.shout(timetable_msg) def allocate(self, tasks): self.n_tasks = len(tasks) @@ -55,16 +75,31 @@ def request_allocation(self, task): task_msg = dict() task_msg['header'] = dict() task_msg['payload'] = dict() - task_msg['header']['type'] = 'TASK' + task_msg['header']['type'] = 'ALLOCATE-TASK' task_msg['header']['metamodel'] = 'ropod-msg-schema.json' - task_msg['header']['msgId'] = str(uuid.uuid4()) - task_msg['header']['timestamp'] = int(round(time.time()) * 1000) + task_msg['header']['msgId'] = generate_uuid() + task_msg['header']['timestamp'] = ts.get_time_stamp() task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' task_msg['payload']['task'] = task.to_dict() self.whisper(task_msg, peer=self.auctioneer_name) + def request_task_delete(self, task_dict): + logging.debug("Requesting delete of task %s", task_dict['id']) + task_msg = dict() + task_msg['header'] = dict() + task_msg['payload'] = dict() + task_msg['header']['type'] = 'DELETE-TASK' + task_msg['header']['metamodel'] = 'ropod-msg-schema.json' + task_msg['header']['msgId'] = generate_uuid() + task_msg['header']['timestamp'] = ts.get_time_stamp() + + task_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + task_msg['payload']['task'] = task_dict + + self.shout(task_msg) + def receive_msg_cb(self, msg_content): msg = self.convert_zyre_msg_to_dict(msg_content) if msg is None: @@ -101,6 +136,7 @@ def check_termination_test(self): time.sleep(5) test.reset_timetables() test.reset_tasks() + time.sleep(5) test.allocate(tasks) start_time = time.time() while not test.terminated and start_time + timeout_duration > time.time(): diff --git a/tests/fms_integration_test.py b/tests/fms_integration_test.py index 07db40ea1..8eb930ebf 100644 --- a/tests/fms_integration_test.py +++ b/tests/fms_integration_test.py @@ -4,17 +4,25 @@ from fleet_management.config.loader import Config from mrs.utils.datasets import load_yaml_dataset from mrs.structs.timetable import Timetable +from mrs.db_interface import DBInterface +from ropod.pyre_communicator.base_class import RopodPyre -class TaskAllocator(object): +class TaskAllocator(RopodPyre): def __init__(self, config_file=None): + zyre_config = {'node_name': 'task_request_test', + 'groups': ['TASK-ALLOCATION'], + 'message_types': ['TASK', 'ALLOCATION']} + super().__init__(zyre_config, acknowledge=False) + self.logger = logging.getLogger('test') config = Config(config_file, initialize=True) config.configure_logger() - self.ccu_store = config.ccu_store - - self.auctioneer = config.configure_task_allocator(self.ccu_store) + ccu_store = config.ccu_store + self.db_interface = DBInterface(ccu_store) + self.auctioneer = config.configure_auctioneer(ccu_store) + self.auctioneer.api.register_callbacks(self.auctioneer) self.allocated_tasks = dict() self.test_terminated = False @@ -26,18 +34,39 @@ def reset_timetables(self): for robot_id in self.auctioneer.robot_ids: timetable = Timetable(self.auctioneer.stp, robot_id) - self.ccu_store.update_timetable(timetable) + self.db_interface.update_timetable(timetable) + self.send_timetable(timetable, robot_id) + + def reset_tasks(self): + self.logger.info("Resetting tasks") + tasks_dict = self.db_interface.get_tasks() + for task_id, task_info in tasks_dict.items(): + self.db_interface.remove_task(task_id) + + def send_timetable(self, timetable, robot_id): + self.logger.debug("Sending timetable to %s", robot_id) + timetable_msg = dict() + timetable_msg['header'] = dict() + timetable_msg['payload'] = dict() + timetable_msg['header']['type'] = 'TIMETABLE' + timetable_msg['header']['metamodel'] = 'ropod-msg-schema.json' + timetable_msg['header']['msgId'] = generate_uuid() + timetable_msg['header']['timestamp'] = ts.get_time_stamp() + + timetable_msg['payload']['metamodel'] = 'ropod-bid_round-schema.json' + timetable_msg['payload']['timetable'] = timetable.to_dict() + self.shout(timetable_msg) def get_robots_for_task(self, tasks): """ Adds a task or list of tasks to the list of tasks_to_allocate - in the auctioneer + in the auctioneer_name :param tasks: list of tasks to allocate """ self.auctioneer.allocate(tasks) def get_allocation(self): - """ Gets the allocation of a task when the auctioneer terminates an + """ Gets the allocation of a task when the auctioneer_name terminates an allocation round """ @@ -56,6 +85,8 @@ def run(self): timeout_duration = 300 # 5 minutes try: self.auctioneer.api.start() + self.reset_timetables() + self.reset_tasks() start_time = time.time() while not self.test_terminated and start_time + timeout_duration > time.time(): self.auctioneer.api.run() From 47e88da77555d7fec796ca0c3d3cf19c2591ddfd Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Tue, 20 Aug 2019 11:29:44 +0200 Subject: [PATCH 21/24] setup: deleting mrs.task_execution.dispatching package --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index f6cfba073..332895519 100644 --- a/setup.py +++ b/setup.py @@ -4,7 +4,7 @@ setup(name='mrs', packages=['mrs', 'mrs.config', 'mrs.utils', 'mrs.exceptions', 'mrs.task_allocation', - 'mrs.task_execution', 'mrs.task_execution.dispatching'], + 'mrs.task_execution'], version='0.1.0', install_requires=[ 'numpy' From d081fb4fe3758c17564092f9e7cb04c5e6a8b8c2 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Tue, 20 Aug 2019 13:45:19 +0200 Subject: [PATCH 22/24] setup: Adding package mrs.structs --- setup.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/setup.py b/setup.py index 332895519..fc23be32a 100644 --- a/setup.py +++ b/setup.py @@ -3,7 +3,7 @@ from setuptools import setup setup(name='mrs', - packages=['mrs', 'mrs.config', 'mrs.utils', 'mrs.exceptions', 'mrs.task_allocation', + packages=['mrs', 'mrs.config', 'mrs.structs', 'mrs.utils', 'mrs.exceptions', 'mrs.task_allocation', 'mrs.task_execution'], version='0.1.0', install_requires=[ From 460ca09739cf2b1a907d1722270f8493d6483224 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Tue, 20 Aug 2019 18:00:57 +0200 Subject: [PATCH 23/24] robot: robot, bidder and schedule_monitor inherit from RobotBase --- mrs/robot.py | 55 ++++++----------- mrs/robot_base.py | 25 ++++++++ mrs/task_allocation/bidder.py | 82 +++++++++++++------------- mrs/task_execution/schedule_monitor.py | 7 ++- 4 files changed, 88 insertions(+), 81 deletions(-) create mode 100644 mrs/robot_base.py diff --git a/mrs/robot.py b/mrs/robot.py index ac4cb8acc..69df67b35 100644 --- a/mrs/robot.py +++ b/mrs/robot.py @@ -1,68 +1,49 @@ import argparse import logging import time -from importlib import import_module -from stn.stp import STP - -from mrs.db_interface import DBInterface +from mrs.robot_base import RobotBase from mrs.structs.timetable import Timetable from mrs.task_allocation.bidder import Bidder from mrs.task_execution.schedule_monitor import ScheduleMonitor -class RobotCommon(object): - def __init__(self, robot_id, api, robot_store, stp_solver, task_type): - - self.id = robot_id - self.api = api - self.db_interface = DBInterface(robot_store) - self.stp = STP(stp_solver) - task_class_path = task_type.get('class', 'mrs.structs.task') - self.task_cls = getattr(import_module(task_class_path), 'Task') - - self.timetable = Timetable.get_timetable(self.db_interface, self.id, self.stp) - self.db_interface.update_timetable(self.timetable) - - -class Robot(object): - - def __init__(self, robot_common_config, bidder_config, **kwargs): - - self.common = RobotCommon(**robot_common_config) +class Robot(RobotBase): + def __init__(self, robot_config, bidder_config, **kwargs): + super().__init__(**robot_config) - self.bidder = Bidder(self.common, bidder_config) + self.bidder = Bidder(robot_config, bidder_config) schedule_monitor_config = kwargs.get("schedule_monitor_config") if schedule_monitor_config: - self.schedule_monitor = ScheduleMonitor(self.common, schedule_monitor_config) + self.schedule_monitor = ScheduleMonitor(robot_config, schedule_monitor_config) - self.logger = logging.getLogger('mrs.robot.%s' % self.common.id) - self.logger.info("Robot %s initialized", self.common.id) + self.logger = logging.getLogger('mrs.robot.%s' % self.id) + self.logger.info("Robot %s initialized", self.id) def timetable_cb(self, msg): robot_id = msg['payload']['timetable']['robot_id'] - if robot_id == self.common.id: + if robot_id == self.id: timetable_dict = msg['payload']['timetable'] - self.logger.debug("Robot %s received timetable msg", self.common.id) - timetable = Timetable.from_dict(timetable_dict, self.common.stp) - self.common.db_interface.update_timetable(timetable) + self.logger.debug("Robot %s received timetable msg", self.id) + timetable = Timetable.from_dict(timetable_dict, self.stp) + self.db_interface.update_timetable(timetable) def delete_task_cb(self, msg): task_dict = msg['payload']['task'] - task = self.common.task_cls.from_dict(task_dict) + task = self.task_cls.from_dict(task_dict) self.logger.debug("Deleting task %s ", task.id) - self.common.db_interface.remove_task(task.id) + self.db_interface.remove_task(task.id) def run(self): try: - self.common.api.start() + self.api.start() while True: time.sleep(0.5) except (KeyboardInterrupt, SystemExit): - self.logger.info("Terminating %s robot ...", self.common.id) - self.common.api.shutdown() + self.logger.info("Terminating %s robot ...", self.id) + self.api.shutdown() self.logger.info("Exiting...") @@ -81,7 +62,7 @@ def run(self): robot = config.configure_robot_proxy(robot_id) - robot.common.api.register_callbacks(robot) + robot.api.register_callbacks(robot) robot.run() diff --git a/mrs/robot_base.py b/mrs/robot_base.py new file mode 100644 index 000000000..bbafdc11f --- /dev/null +++ b/mrs/robot_base.py @@ -0,0 +1,25 @@ +import logging +from importlib import import_module + +from stn.stp import STP + +from mrs.db_interface import DBInterface +from mrs.structs.timetable import Timetable + + +class RobotBase(object): + def __init__(self, robot_id, api, robot_store, stp_solver, task_type): + + self.id = robot_id + self.api = api + self.db_interface = DBInterface(robot_store) + self.stp = STP(stp_solver) + task_class_path = task_type.get('class', 'mrs.structs.task') + self.task_cls = getattr(import_module(task_class_path), 'Task') + + self.timetable = Timetable.get_timetable(self.db_interface, self.id, self.stp) + self.db_interface.update_timetable(self.timetable) + + self.logger = logging.getLogger('mrs.robot_base.%s' % self.id) + self.logger.info("Robot base %s initialized", self.id) + diff --git a/mrs/task_allocation/bidder.py b/mrs/task_allocation/bidder.py index 969b7a0c3..a7bf4493c 100644 --- a/mrs/task_allocation/bidder.py +++ b/mrs/task_allocation/bidder.py @@ -6,18 +6,18 @@ from mrs.structs.bid import Bid from mrs.structs.task import TaskStatus from mrs.task_allocation.bidding_rule import BiddingRule +from mrs.robot_base import RobotBase """ Implements a variation of the the TeSSI algorithm using the bidding_rule specified in the config file """ -class Bidder(object): +class Bidder(RobotBase): - def __init__(self, robot_common, bidder_config): - - self.common = robot_common - self.logger = logging.getLogger('mrs.bidder.%s' % self.common.id) + def __init__(self, robot_config, bidder_config): + super().__init__(**robot_config) + self.logger = logging.getLogger('mrs.bidder.%s' % self.id) robustness = bidder_config.get('bidding_rule').get('robustness') temporal = bidder_config.get('bidding_rule').get('temporal') @@ -25,21 +25,21 @@ def __init__(self, robot_common, bidder_config): self.auctioneer_name = bidder_config.get("auctioneer_name") self.bid_placed = Bid() - self.logger.debug("Bidder initialized %s", self.common.id) + self.logger.debug("Bidder initialized %s", self.id) def task_announcement_cb(self, msg): - self.logger.debug("Robot %s received TASK-ANNOUNCEMENT", self.common.id) + self.logger.debug("Robot %s received TASK-ANNOUNCEMENT", self.id) round_id = msg['payload']['round_id'] received_tasks = msg['payload']['tasks'] - self.common.timetable = self.common.db_interface.get_timetable(self.common.id, self.common.stp) + self.timetable = self.db_interface.get_timetable(self.id, self.stp) self.compute_bids(received_tasks, round_id) def allocation_cb(self, msg): - self.logger.debug("Robot %s received ALLOCATION", self.common.id) + self.logger.debug("Robot %s received ALLOCATION", self.id) task_id = msg['payload']['task_id'] winner_id = msg['payload']['robot_id'] - if winner_id == self.common.id: + if winner_id == self.id: self.allocate_to_robot(task_id) self.send_finish_round() @@ -48,8 +48,8 @@ def compute_bids(self, received_tasks, round_id): no_bids = list() for task_id, task_info in received_tasks.items(): - task = self.common.task_cls.from_dict(task_info) - self.common.db_interface.update_task(task) + task = self.task_cls.from_dict(task_info) + self.db_interface.update_task(task) self.logger.debug("Computing bid of task %s", task.id) # Insert task in each possible position of the stn and @@ -75,7 +75,7 @@ def send_bids(self, bid, no_bids): :param no_bids: list of no bids """ if bid: - self.logger.debug("Robot %s placed bid %s", self.common.id, self.bid_placed) + self.logger.debug("Robot %s placed bid %s", self.id, self.bid_placed) self.send_bid(bid) if no_bids: @@ -84,9 +84,9 @@ def send_bids(self, bid, no_bids): self.send_bid(no_bid) def insert_task(self, task, round_id): - best_bid = Bid(self.bidding_rule, self.common.id, round_id, task, self.common.timetable) + best_bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) - tasks = self.common.timetable.get_tasks() + tasks = self.timetable.get_tasks() if tasks: n_tasks = len(tasks) else: @@ -96,22 +96,22 @@ def insert_task(self, task, round_id): for position in range(1, n_tasks+2): # TODO check if the robot can make it to the task, if not, return - self.logger.debug("Schedule: %s", self.common.timetable.schedule) - if position == 1 and self.common.timetable.is_scheduled(): + self.logger.debug("Schedule: %s", self.timetable.schedule) + if position == 1 and self.timetable.is_scheduled(): self.logger.debug("Not adding task in position %s", position) continue self.logger.debug("Adding task %s in position %s", task.id, position) - self.common.timetable.add_task_to_stn(task, position) + self.timetable.add_task_to_stn(task, position) try: - self.common.timetable.solve_stp() + self.timetable.solve_stp() - self.logger.debug("STN %s: ", self.common.timetable.stn) - self.logger.debug("Dispatchable graph %s: ", self.common.timetable.dispatchable_graph) - self.logger.debug("Robustness Metric %s: ", self.common.timetable.robustness_metric) + self.logger.debug("STN %s: ", self.timetable.stn) + self.logger.debug("Dispatchable graph %s: ", self.timetable.dispatchable_graph) + self.logger.debug("Robustness Metric %s: ", self.timetable.robustness_metric) - bid = Bid(self.bidding_rule, self.common.id, round_id, task, self.common.timetable) + bid = Bid(self.bidding_rule, self.id, round_id, task, self.timetable) bid.compute_cost(position) self.logger.debug("Cost: %s", bid.cost) @@ -123,7 +123,7 @@ def insert_task(self, task, round_id): " task %s in position %s", task.id, position) # Restore schedule for the next iteration - self.common.timetable.remove_task_from_stn(position) + self.timetable.remove_task_from_stn(position) self.logger.debug("Best bid for task %s: %s", task.id, best_bid) @@ -154,32 +154,32 @@ def send_bid(self, bid): :param round_id: :return: """ - msg = self.common.api.create_message(bid) + msg = self.api.create_message(bid) tasks = [task for task in bid.timetable.get_tasks()] - self.logger.info("Round %s: robod_id %s bids %s for task %s and tasks %s", bid.round_id, self.common.id, bid.cost, bid.task.id, tasks) - self.common.api.publish(msg, peer=self.auctioneer_name) + self.logger.info("Round %s: robod_id %s bids %s for task %s and tasks %s", bid.round_id, self.id, bid.cost, bid.task.id, tasks) + self.api.publish(msg, peer=self.auctioneer_name) def allocate_to_robot(self, task_id): - self.common.timetable = copy.deepcopy(self.bid_placed.timetable) - self.common.db_interface.update_timetable(self.common.timetable) - task_dict = self.common.db_interface.get_task(task_id) - task = self.common.task_cls.from_dict(task_dict) - self.common.db_interface.update_task_status(task, TaskStatus.ALLOCATED) + self.timetable = copy.deepcopy(self.bid_placed.timetable) + self.db_interface.update_timetable(self.timetable) + task_dict = self.db_interface.get_task(task_id) + task = self.task_cls.from_dict(task_dict) + self.db_interface.update_task_status(task, TaskStatus.ALLOCATED) - self.logger.info("Robot %s allocated task %s", self.common.id, task_id) - self.logger.debug("STN %s", self.common.timetable.stn) - self.logger.debug("Dispatchable graph %s", self.common.timetable.dispatchable_graph) + self.logger.info("Robot %s allocated task %s", self.id, task_id) + self.logger.debug("STN %s", self.timetable.stn) + self.logger.debug("Dispatchable graph %s", self.timetable.dispatchable_graph) - tasks = [task for task in self.common.timetable.get_tasks()] + tasks = [task for task in self.timetable.get_tasks()] - self.logger.debug("Tasks allocated to robot %s:%s", self.common.id, tasks) + self.logger.debug("Tasks allocated to robot %s:%s", self.id, tasks) def send_finish_round(self): - finish_round = FinishRound(self.common.id) - msg = self.common.api.create_message(finish_round) + finish_round = FinishRound(self.id) + msg = self.api.create_message(finish_round) - self.logger.info("Robot %s sends close round msg ", self.common.id) - self.common.api.publish(msg, groups=['TASK-ALLOCATION']) + self.logger.info("Robot %s sends close round msg ", self.id) + self.api.publish(msg, groups=['TASK-ALLOCATION']) diff --git a/mrs/task_execution/schedule_monitor.py b/mrs/task_execution/schedule_monitor.py index cd6709baf..f63c5c1c5 100644 --- a/mrs/task_execution/schedule_monitor.py +++ b/mrs/task_execution/schedule_monitor.py @@ -1,6 +1,7 @@ from mrs.db_interface import DBInterface +from mrs.robot_base import RobotBase -class ScheduleMonitor(object): - def __init__(self, robot_common, schedule_monitor_config): - self.common = robot_common +class ScheduleMonitor(RobotBase): + def __init__(self, robot_config, schedule_monitor_config): + super().__init__(**robot_config) From fb72d56a6b9274fedbf543b70cb3ae9cadefac29 Mon Sep 17 00:00:00 2001 From: Angela Enriquez Date: Tue, 20 Aug 2019 18:52:06 +0200 Subject: [PATCH 24/24] robot_base: removing logger --- mrs/robot_base.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/mrs/robot_base.py b/mrs/robot_base.py index bbafdc11f..b8c7aa7ff 100644 --- a/mrs/robot_base.py +++ b/mrs/robot_base.py @@ -1,4 +1,3 @@ -import logging from importlib import import_module from stn.stp import STP @@ -20,6 +19,4 @@ def __init__(self, robot_id, api, robot_store, stp_solver, task_type): self.timetable = Timetable.get_timetable(self.db_interface, self.id, self.stp) self.db_interface.update_timetable(self.timetable) - self.logger = logging.getLogger('mrs.robot_base.%s' % self.id) - self.logger.info("Robot base %s initialized", self.id)