Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/development' into improvement-er…
Browse files Browse the repository at this point in the history
…ror-handling
  • Loading branch information
rcschrg committed Nov 22, 2023
2 parents 7a71c64 + d16eb09 commit 3bffe5f
Show file tree
Hide file tree
Showing 9 changed files with 45 additions and 52 deletions.
2 changes: 1 addition & 1 deletion docs/source/agents-container.rst
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ mango container

In mango, agents live in a ``container``. The container is responsible for everything network related of the agent.
This includes in particular sending and receiving of messages, but also message distribution to the correct agent or
(de-)serialization of messages.
serialization and deserialization of messages.
Container also help to to speed up message exchange between agents that run on the same physical hardware,
as data that is exchanged between such agents will not have to be sent through the network.

Expand Down
2 changes: 1 addition & 1 deletion docs/source/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
# -- Project information -----------------------------------------------------

project = 'mango'
copyright = '2021, mango team'
copyright = '2023, mango team'
author = 'mango team'

# The full version, including alpha/beta/rc tags
Expand Down
2 changes: 1 addition & 1 deletion docs/source/impressum.rst
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ DE 811582102

**Verantwortlich im Sinne der Presse**

| Dr. Christoph Mayer (Bereichsleiter)
| Dr. Ing. Jürgen Meister (Bereichsleiter)
| OFFIS e.V.
| Escherweg 2
| 26121 Oldenburg
Expand Down
2 changes: 1 addition & 1 deletion docs/source/legals.rst
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ DE 811582102

**Responsible in the sense of press law**

| Dr. Christoph Mayer (Director)
| Dr. Ing. Jürgen Meister (Director)
| OFFIS e.V.
| Escherweg 2
| 26121 Oldenburg
Expand Down
28 changes: 28 additions & 0 deletions docs/source/scheduling.rst
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,12 @@ The core of this API is the scheduler, which is part of every agent. To schedule
- Executes a coroutine periodically with a static delay between the cycles
* - ConditionalScheduledTask
- Executes the coroutine when a specified condition evaluates to True
* - AwaitingTask
- Execute a given coroutine after another given coroutine has been awaited
* - RecurrentScheduledTask
- Will get executed periodically with a specified delay



Furthermore there are convenience methods to get rid of the class imports when using these types of tasks.

Expand Down Expand Up @@ -56,6 +62,28 @@ Dispatch Tasks to other Process
As asyncio does not provide real parallelism to utilize multiple cores and agents may have tasks, which need a lot computational power, the need to dispatch certain tasks to other processes appear. Handling inter process communication manually is quite exhausting and having multiple process pools across different roles or agents leads to inefficient resource allocations. As a result mango offers a way to dispatch tasks, based on coroutine-functions, to other processes, managed by the framework.

Analogues to the normal API there are two different ways, first you create a ScheduledProcessTask and call ``schedule_process_task``, second you invoke the convnience methods with "process" in the name. These methods exists on any Agent, the RoleContext and the Scheduler.
In mango the following process tasks are available:

.. list-table:: Available ProcessTasks
:widths: 30 70
:header-rows: 1

* - Class
- Description
* - ScheduledProcessTask
- Marks a ScheduledTask as process compatible
* - TimestampScheduledProcessTask
- Timestamp based one-shot task
* - AwaitingProcessTask
- Await a coroutine, then execute another
* - InstantScheduledProcessTask
- One-shot task, which will get executed instantly
* - PeriodicScheduledProcessTask
- Executes a coroutine periodically with a static delay between the cycles
* - RecurrentScheduledProcessTask
- Will get executed periodically with a specified delay
* - ConditionalProcessTask
- Will get executed as soon as the given condition is fulfilled

.. code-block:: python3
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def handle_ask_feed_in(self, sender_addr, sender_id):
content = reported_feed_in

acl_meta = {
"sender_addr": self.context.addr,
"sender_addr": self._context.addr,
"sender_id": self.aid,
"performative": Performatives.inform,
}
Expand All @@ -68,7 +68,7 @@ def handle_set_feed_in_max(self, max_feed_in, sender_addr, sender_id):
self.max_feed_in = float(max_feed_in)
print(f"{self.aid}: Limiting my feed_in to {max_feed_in}")
self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=None,
receiver_addr=sender_addr,
receiver_id=sender_id,
Expand Down Expand Up @@ -125,13 +125,13 @@ async def run(self):
for addr, aid in self.known_agents:
content = None
acl_meta = {
"sender_addr": self.context.addr,
"sender_addr": self._context.addr,
"sender_id": self.aid,
"performative": Performatives.request,
}
# alternatively we could call send_acl_message here directly and await it
self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=content,
receiver_addr=addr,
receiver_id=aid,
Expand All @@ -149,14 +149,14 @@ async def run(self):
for addr, aid in self.known_agents:
content = min_feed_in
acl_meta = {
"sender_addr": self.context.addr,
"sender_addr": self._context.addr,
"sender_id": self.aid,
"performative": Performatives.propose,
}

# alternatively we could call send_acl_message here directly and await it
self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=content,
receiver_addr=addr,
receiver_id=aid,
Expand Down
12 changes: 6 additions & 6 deletions examples/tutorial/v3_codecs_and_typing.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def handle_ask_feed_in(self, sender_addr, sender_id):
msg = FeedInReplyMsg(reported_feed_in)

self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=msg,
receiver_addr=sender_addr,
receiver_id=sender_id,
Expand All @@ -82,7 +82,7 @@ def handle_set_feed_in_max(self, max_feed_in, sender_addr, sender_id):
msg = MaxFeedInAck()

self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=msg,
receiver_addr=sender_addr,
receiver_id=sender_id,
Expand Down Expand Up @@ -132,11 +132,11 @@ async def run(self):
# ask pv agent feed-ins
for addr, aid in self.known_agents:
msg = AskFeedInMsg()
acl_meta = {"sender_addr": self.context.addr, "sender_id": self.aid}
acl_meta = {"sender_addr": self._context.addr, "sender_id": self.aid}

# alternatively we could call send_acl_message here directly and await it
self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=msg,
receiver_addr=addr,
receiver_id=aid,
Expand All @@ -153,11 +153,11 @@ async def run(self):

for addr, aid in self.known_agents:
msg = SetMaxFeedInMsg(min_feed_in)
acl_meta = {"sender_addr": self.context.addr, "sender_id": self.aid}
acl_meta = {"sender_addr": self._context.addr, "sender_id": self.aid}

# alternatively we could call send_acl_message here directly and await it
self.schedule_instant_task(
self.context.send_acl_message(
self._context.send_acl_message(
content=msg,
receiver_addr=addr,
receiver_id=aid,
Expand Down
35 changes: 0 additions & 35 deletions mango/agent/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,41 +224,6 @@ def schedule_conditional_task(
src=src,
)

def schedule_datetime_process_task(
self, coroutine_creator, date_time: datetime, on_stop=None, src=None
):
"""Schedule a task at specified datetime in another process.
:param coroutine_creator: coroutine_creator creating couroutine to be scheduled
:type coroutine_creator: Coroutine-creator
:param date_time: datetime defining when the task should start
:type date_time: datetime
:param src: creator of the task
:type src: Object
"""
return self._scheduler.schedule_datetime_process_task(
coroutine_creator=coroutine_creator,
date_time=date_time,
on_stop=on_stop,
src=src,
)

def schedule_datetime_task(
self, coroutine, date_time: datetime, on_stop=None, src=None
):
"""Schedule a task at specified datetime.
:param coroutine: coroutine to be scheduled
:type coroutine: Coroutine
:param date_time: datetime defining when the task should start
:type date_time: datetime
:param src: creator of the task
:type src: Object
"""
return self._scheduler.schedule_datetime_task(
coroutine=coroutine, date_time=date_time, on_stop=on_stop, src=src
)

def schedule_timestamp_task(
self, coroutine, timestamp: float, on_stop=None, src=None
):
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ packaging==20.4
paho-mqtt==1.5.1
pika==1.1.0
pluggy==0.13.1
protobuf==3.19.1
protobuf==4.24.4
py==1.9.0
pyparsing==2.4.7
pytest==6.2.5
Expand Down

0 comments on commit 3bffe5f

Please sign in to comment.