diff --git a/docs/source/tutorial.rst b/docs/source/tutorial.rst index f0b01d9..6ea7d95 100644 --- a/docs/source/tutorial.rst +++ b/docs/source/tutorial.rst @@ -10,8 +10,7 @@ This tutorial gives an overview of the basic functions of mango agents and conta parts building a scenario of two PV plants, operated by their respective agents being directed by a remote controller. -Each part comes with a standalone executable file. Subsequent parts either extend the functionality or simplify -some concept in the previous part. +Subsequent parts either extend the functionality or simplify some concept in the previous part. As a whole, this tutorial covers: - container and agent creation @@ -26,8 +25,6 @@ As a whole, this tutorial covers: 1. Setup and Message Passing ***************************** -Corresponding file: `v1_basic_setup_and_message_passing.py` - For your first mango tutorial, you will learn the fundamentals of creating mango agents and containers as well as making them communicate with each other. @@ -37,101 +34,122 @@ This example covers: - basic message passing - clean shutdown of containers -.. raw:: html - -
- step by step - First, we want to create two simple agents and have the container send a message to one of them. An agent is created by defining a class that inherits from the base Agent class of mango. -Every agent must implement the ``handle_message`` method to which incoming messages are forwarded by the container. +Every agent must implement the :meth:`mango.Agent.handle_message` method to which incoming messages are forwarded by the container. -.. code-block:: python +.. testcode:: from mango import Agent class PVAgent(Agent): - def __init__(self, container): - super().__init__(container) - print(f"Hello I am a PV agent! My id is {self.aid}.") + def __init__(self): + super().__init__() + print("Hello I am a PV agent!") def handle_message(self, content, meta): print(f"Received message with content: {content} and meta {meta}.") + PVAgent() + +.. testoutput:: + + Hello I am a PV agent! + Now we are ready to instantiate our system. mango is fundamentally built on top of asyncio and a lot of its functions are provided as coroutines. This means, practically every mango executable file will implement some variation of this pattern: -.. code-block:: python +.. testcode:: import asyncio async def main(): + print("This will run in the asyncio loop.") # do whatever here - if __name__ == "__main__": - asyncio.run(main()) + asyncio.run(main()) + +.. testoutput:: + + This will run in the asyncio loop. -First, we create the container. A container is created via the ``mango.create_container`` coroutine which requires at least -the address of the container as a parameter. +First, we create the container. A tcp container is created via the :meth:`mango.create_tcp_container` function which requires at least +the address of the container as a parameter. Other container types available by using :meth:`mango.create_mqtt_container` and :meth:`mango.create_ec_container`. +For this tutorial we will cover the tcp container. -.. code-block:: python +.. testcode:: + + from mango import create_tcp_container PV_CONTAINER_ADDRESS = ("localhost", 5555) - # defaults to tcp connection - pv_container = await create_container(addr=PV_CONTAINER_ADDRESS) + pv_container = create_tcp_container(addr=PV_CONTAINER_ADDRESS) + + print(pv_container.addr) +.. testoutput:: -Now we can create our agents. Agents always live inside a container and this container must be passed to their constructor. + ('localhost', 5555) -.. code-block:: python +Now we can create our agents. Agents always live inside a container and therefore need to be registered to the container. + +.. testcode:: # agents always live inside a container - pv_agent_0 = PVAgent(pv_container) - pv_agent_1 = PVAgent(pv_container) - -For now, our agents are purely passive entities. To make them do something, we need to send them a message. Messages are -passed by the container via the ``send_message`` function always at least expects some content and a target address. -To send a message directly to an agent, we also need to provide its agent id which is set by the container when the agent -is created. - -.. code-block:: python - - # we can now send a simple message to an agent and observe that it is received: - # Note that as of now agent IDs are set automatically as agent0, agent1, ... in order of instantiation. - await pv_container.send_message( - "Hello, this is a simple message.", - receiver_addr=PV_CONTAINER_ADDRESS, - receiver_id="agent0", - ) + async def main(): + pv_agent_0 = pv_container.register(PVAgent()) + pv_agent_1 = pv_container.register(PVAgent()) -Finally, you should always cleanly shut down your containers before your program terminates. + print(pv_agent_1.addr) -.. code-block:: python + asyncio.run(main()) - # don't forget to properly shut down containers at the end of your program - # otherwise you will get an asyncio.exceptions.CancelledError - await pv_container.shutdown() +.. testoutput:: -This concludes the first part of our tutorial. If you run this code, you should receive the following output: + Hello I am a PV agent! + Hello I am a PV agent! + AgentAddress(protocol_addr=('localhost', 5555), aid='agent1') - | Hello I am a PV agent! My id is agent0. - | Hello I am a PV agent! My id is agent1. - | Received message with content: Hello, this is a simple message. and meta {'network_protocol': 'tcp', 'priority': 0}. +For now, our agents and containers are purely passive entities. First, we need to activate the container to start +the tcp server and its internal asynchronous behavior. In mango this can be done with :meth:`mango.activate` and the `async with` syntax. +Second, we need to send a message from one agent to the other. Messages are passed by the container via the :meth:`mango.Agent.send_message` +function always at least expects some content and a target agent address. To send a message directly to an agent, we also need to provide +its agent id which is set by the container when the agent is created. The address of the container and the aid +is wrapped in the :class:`mango.AgentAddress` class and can be retrieved with :meth:`mango.Agent.addr`. +.. testcode:: -.. raw:: html + from mango import activate + + # agents always live inside a container + async def main(): + pv_agent_0 = pv_container.register(PVAgent()) + pv_agent_1 = pv_container.register(PVAgent()) + + async with activate(pv_container) as c: + # we can now send a simple message to an agent and observe that it is received: + # Note that as of now agent IDs are set automatically as agent0, agent1, ... + # in order of instantiation. + await pv_agent_0.send_message( + "Hello, this is a simple message.", + receiver_addr=pv_agent_1.addr + ) + + asyncio.run(main()) + +.. testoutput:: + + Hello I am a PV agent! + Hello I am a PV agent! + Received message with content: Hello, this is a simple message. and meta {'sender_id': 'agent2', 'sender_addr': ('localhost', 5555), 'receiver_id': 'agent3', 'network_protocol': 'tcp', 'priority': 0}. -
********************************* 2. Messaging between Containers ********************************* -Corresponding file: `v2_inter_container_messaging_and_basic_functionality.py` - In the previous example, you learned how to create mango agents and containers and how to send basic messages between them. In this example, you expand upon this. We introduce a controller agent that asks the current feed_in of our PV agents and subsequently limits the output of both to their minimum. @@ -142,11 +160,6 @@ This example covers: - setting custom agent ids - use of metadata -.. raw:: html - -
- step by step - First, we define our controller Agent. To ensure it can message the pv agents we pass that information directly to it in the constructor. The control agent will send out messages to each pv agent, await their replies and act according to that information. To handle this, we also add some control structures to the @@ -154,26 +167,42 @@ constructor that we will later use to keep track of which agents have already an As an additional feature, we will make it possible to manually set the agent of our agents by. -.. code-block:: python +.. testcode:: + + from mango import Agent, addr class ControllerAgent(Agent): - def __init__(self, container, known_agents, suggested_aid=None): - super().__init__(container, suggested_aid=suggested_aid) + def __init__(self, known_agents): + super().__init__() + self.known_agents = known_agents self.reported_feed_ins = [] self.reported_acks = 0 self.reports_done = None self.acks_done = None -Next, we set up its ``handle_message`` function. The controller needs to distinguish between two message types: + print(ControllerAgent([addr("protocol_addr", "aid")]).known_agents) + +.. testoutput:: + + [AgentAddress(protocol_addr='protocol_addr', aid='aid')] + +Next, we set up its :meth:`mango.Agent.handle_message` function. The controller needs to distinguish between two message types: The replies to feed_in requests and later the acknowledgements that a new maximum feed_in was set by a pv agent. -We use the assign the key ``performative``of the metadata of the message to do this. We set the ``performative`` field to ``inform`` -for feed_in replies and to ``accept_proposal`` for feed_in change acknowledgements. +We assign the key `performative` of the metadata of the message to do this. We set the `performative` entry to `inform` +for feed_in replies and to `accept_proposal` for feed_in change acknowledgements. -.. code-block:: python +.. testcode:: class ControllerAgent(Agent): - """...""" + def __init__(self, known_agents): + super().__init__() + + self.known_agents = known_agents + self.reported_feed_ins = [] + self.reported_acks = 0 + self.reports_done = None + self.acks_done = None def handle_message(self, content, meta): performative = meta['performative'] @@ -198,65 +227,63 @@ for feed_in replies and to ``accept_proposal`` for feed_in change acknowledgemen if self.acks_done is not None: self.acks_done.set_result(True) -We do the same for our PV agents. We will also enable user defined agent ids here. +We do the same for our PV agents. + +.. testcode:: + + from mango import sender_addr -.. code-block:: python + PV_FEED_IN = { + "PV Agent 0": 2.0, + "PV Agent 1": 1.0, + } class PVAgent(Agent): - def __init__(self, container, suggested_aid=None): - super().__init__(container, suggested_aid=suggested_aid) + def __init__(self): + super().__init__() + self.max_feed_in = -1 def handle_message(self, content, meta): performative = meta["performative"] - sender_addr = meta["sender_addr"] - sender_id = meta["sender_id"] + sender = sender_addr(meta) if performative == Performatives.request: # ask_feed_in message - self.handle_ask_feed_in(sender_addr, sender_id) + self.handle_ask_feed_in(sender) elif performative == Performatives.propose: # set_max_feed_in message - self.handle_set_feed_in_max(content, sender_addr, sender_id) + self.handle_set_feed_in_max(content, sender) else: print(f"{self.aid}: Received an unexpected message with content {content} and meta {meta}") - -When a PV agent receives a request from the controller, it immediately answers. Note two important changes to the first -example here: First, within our message handling methods we can not ``await send_message`` directly -because ``handle_message`` is not a coroutine. Instead, we pass ``send_message`` as a task to the scheduler to be -executed at once via the ``schedule_instant_task`` method. -Second, we set ``meta`` to contain the typing information of our message. - -.. code-block:: python - - class PVAgent(Agent): - """...""" - - def handle_ask_feed_in(self, sender_addr, sender_id): + def handle_ask_feed_in(self, sender): reported_feed_in = PV_FEED_IN[self.aid] # PV_FEED_IN must be defined at the top content = reported_feed_in - meta = {"sender_addr": self.addr, "sender_id": self.aid, - "performative": Performatives.inform} - self.schedule_instant_message( content=content, - receiver_addr=sender_addr, - receiver_id=sender_id, - **meta, + receiver_addr=sender, + performative=Performatives.inform ) - def handle_set_feed_in_max(self, max_feed_in, sender_addr, sender_id): + def handle_set_feed_in_max(self, max_feed_in, sender): self.max_feed_in = float(max_feed_in) print(f"{self.aid}: Limiting my feed_in to {max_feed_in}") + self.schedule_instant_message( content=None, - receiver_addr=sender_addr, - receiver_id=sender_id, + receiver_addr=sender, performative=Performatives.accept_proposal, ) + +When a PV agent receives a request from the controller, it immediately answers. Note two important changes to the first +example here: First, within our message handling methods we can not ``await send_message`` directly +because ``handle_message`` is not a coroutine. Instead, we pass ``send_message`` as a task to the scheduler to be +executed at once via the ``schedule_instant_task`` method. +Second, we set ``meta`` to contain the typing information of our message. + Now both of our agents can handle their respective messages. The last thing to do is make the controller actually perform its active actions. We do this by implementing a ``run`` function with the following control flow: - send a feed_in request to each known pv agent @@ -266,10 +293,40 @@ perform its active actions. We do this by implementing a ``run`` function with t - again, wait for all pv agents to reply - terminate -.. code-block:: python +.. testcode:: class ControllerAgent(Agent): - """...""" + def __init__(self, known_agents): + super().__init__() + + self.known_agents = known_agents + self.reported_feed_ins = [] + self.reported_acks = 0 + self.reports_done = None + self.acks_done = None + + def handle_message(self, content, meta): + performative = meta['performative'] + if performative == Performatives.inform: + # feed_in_reply message + self.handle_feed_in_reply(content) + elif performative == Performatives.accept_proposal: + # set_max_ack message + self.handle_set_max_ack() + else: + print(f"{self.aid}: Received an unexpected message with content {content} and meta {meta}") + + def handle_feed_in_reply(self, feed_in_value): + self.reported_feed_ins.append(float(feed_in_value)) + if len(self.reported_feed_ins) == len(self.known_agents): + if self.reports_done is not None: + self.reports_done.set_result(True) + + def handle_set_max_ack(self): + self.reported_acks += 1 + if self.reported_acks == len(self.known_agents): + if self.acks_done is not None: + self.acks_done.set_result(True) async def run(self): # we define an asyncio future to await replies from all known pv agents: @@ -277,34 +334,30 @@ perform its active actions. We do this by implementing a ``run`` function with t self.acks_done = asyncio.Future() # ask pv agent feed-ins - for addr, aid in self.known_agents: - content = None - meta = {"sender_addr": self.addr, "sender_id": self.aid, - "performative": Performatives.request} + for addr in self.known_agents: self.schedule_instant_message( - content=content, + content=None, receiver_addr=addr, - receiver_id=aid, - **meta, + performative=Performatives.request ) # wait for both pv agents to answer await self.reports_done + # deterministic output + self.reported_feed_ins.sort() + # limit both pv agents to the smaller ones feed-in print(f"{self.aid}: received feed_ins: {self.reported_feed_ins}") min_feed_in = min(self.reported_feed_ins) - for addr, aid in self.known_agents: + for addr in self.known_agents: content = min_feed_in - meta = {"sender_addr": self.addr, "sender_id": self.aid, - "performative": Performatives.propose} self.schedule_instant_message( content=content, receiver_addr=addr, - receiver_id=aid, - **meta, + performative=Performatives.propose ) # wait for both pv agents to acknowledge the change @@ -312,7 +365,9 @@ perform its active actions. We do this by implementing a ``run`` function with t Lastly, we call all relevant instantiations and the run function within our main coroutine: -.. code-block:: python +.. testcode:: + + from mango import create_tcp_container, activate, Performatives PV_CONTAINER_ADDRESS = ("localhost", 5555) CONTROLLER_CONTAINER_ADDRESS = ("localhost", 5556) @@ -322,49 +377,39 @@ Lastly, we call all relevant instantiations and the run function within our main } async def main(): - pv_container = await create_container(addr=PV_CONTAINER_ADDRESS) - controller_container = await create_container(addr=CONTROLLER_CONTAINER_ADDRESS) + pv_container = create_tcp_container(addr=PV_CONTAINER_ADDRESS) + controller_container = create_tcp_container(addr=CONTROLLER_CONTAINER_ADDRESS) # agents always live inside a container - pv_agent_0 = PVAgent(pv_container, suggested_aid='PV Agent 0') - pv_agent_1 = PVAgent(pv_container, suggested_aid='PV Agent 1') + pv_agent_0 = pv_container.register(PVAgent(), suggested_aid='PV Agent 0') + pv_agent_1 = pv_container.register(PVAgent(), suggested_aid='PV Agent 1') # We pass info of the pv agents addresses to the controller here directly. # In reality, we would use some kind of discovery mechanism for this. known_agents = [ - (PV_CONTAINER_ADDRESS, pv_agent_0.aid), - (PV_CONTAINER_ADDRESS, pv_agent_1.aid), + pv_agent_0.addr, + pv_agent_1.addr, ] - controller_agent = ControllerAgent(controller_container, known_agents, suggested_aid='Controller') + controller_agent = controller_container.register(ControllerAgent(known_agents), suggested_aid='Controller') - # the only active component in this setup - await controller_agent.run() + async with activate(pv_container, controller_container) as cl: + # the only active component in this setup + await controller_agent.run() - # always properly shut down your containers - await pv_container.shutdown() - await controller_container.shutdown() + asyncio.run(main()) - if __name__ == "__main__": - asyncio.run(main()) +.. testoutput:: -This concludes the second part of our tutorial. If you run this code you should receive the following output: + Controller: received feed_ins: [1.0, 2.0] + PV Agent 0: Limiting my feed_in to 1.0 + PV Agent 1: Limiting my feed_in to 1.0 - | Controller: received feed_ins: [2.0, 1.0] - | PV Agent 0: Limiting my feed_in to 1.0 - | PV Agent 1: Limiting my feed_in to 1.0 - - -.. raw:: html - -
******************************************* 3. Using Codecs to simplify Message Types ******************************************* -Corresponding file: `v3_codecs_and_typing.py` - In example 2, you created some basic agent functionality and established inter-container communication. Message types were distinguished by the performative field of the meta information. This approach is tedious and prone to error. A better way is to use dedicated message objects and using their types to distinguish @@ -381,70 +426,76 @@ This example covers: - codec basics - the json_serializable decorator -.. raw:: html - -
- step by step - We want to use the types of custom message objects as the new mechanism for message typing. We define these -as simple data classes. For simple classes like this, we can use the ``json_serializable`` decorator to +as simple data classes. For simple classes like this, we can use the :meth:`mango.json_serializable`` decorator to provide us with the serialization functionality. -.. code-block:: python +.. testcode:: - import mango.messages.codecs as codecs + from mango import json_serializable from dataclasses import dataclass - @codecs.json_serializable + @json_serializable @dataclass class AskFeedInMsg: pass - @codecs.json_serializable + @json_serializable @dataclass class FeedInReplyMsg: feed_in: int - @codecs.json_serializable + @json_serializable @dataclass class SetMaxFeedInMsg: max_feed_in: int - @codecs.json_serializable + @json_serializable @dataclass class MaxFeedInAck: pass Next, we need to create a codec, make our message objects known to it, and pass it to our containers. -.. code-block:: python +.. testcode:: + + from mango import JSON + + PV_CONTAINER_ADDRESS = ("localhost", 5555) + CONTROLLER_CONTAINER_ADDRESS = ("localhost", 5556) - my_codec = codecs.JSON() + my_codec = JSON() my_codec.add_serializer(*AskFeedInMsg.__serializer__()) my_codec.add_serializer(*SetMaxFeedInMsg.__serializer__()) my_codec.add_serializer(*FeedInReplyMsg.__serializer__()) my_codec.add_serializer(*MaxFeedInAck.__serializer__()) - pv_container = await create_container(addr=PV_CONTAINER_ADDRESS, codec=my_codec) + pv_container = create_tcp_container(addr=PV_CONTAINER_ADDRESS, codec=my_codec) - controller_container = await create_container( + controller_container = create_tcp_container( addr=CONTROLLER_CONTAINER_ADDRESS, codec=my_codec ) Any time the content of a message matches one of these types now the corresponding serialize and deserialize functions are called. Of course, you can also create your own serialization and deserialization functions with -more sophisticated behaviours and pass them to the codec. For more details refer to the ``codecs`` section of +more sophisticated behaviours and pass them to the codec. For more details refer to the :doc:`codecs` section of the documentation. With this, the message handling in our agent classes can be simplified: -.. code-block:: python +.. testcode:: class ControllerAgent(Agent): - """...""" + def __init__(self, known_agents): + super().__init__() + self.known_agents = known_agents + self.reported_feed_ins = [] + self.reported_acks = 0 + self.reports_done = None + self.acks_done = None def handle_message(self, content, meta): if isinstance(content, FeedInReplyMsg): @@ -454,39 +505,125 @@ With this, the message handling in our agent classes can be simplified: else: print(f"{self.aid}: Received a message of unknown type {type(content)}") + def handle_feed_in_reply(self, feed_in_value): + self.reported_feed_ins.append(float(feed_in_value)) + if len(self.reported_feed_ins) == len(self.known_agents): + if self.reports_done is not None: + self.reports_done.set_result(True) + + def handle_set_max_ack(self): + self.reported_acks += 1 + if self.reported_acks == len(self.known_agents): + if self.acks_done is not None: + self.acks_done.set_result(True) + + async def run(self): + # we define an asyncio future to await replies from all known pv agents: + self.reports_done = asyncio.Future() + self.acks_done = asyncio.Future() + + # ask pv agent feed-ins + for addr in self.known_agents: + msg = AskFeedInMsg() + + # alternatively we could call send_acl_message here directly and await it + self.schedule_instant_message( + content=msg, + receiver_addr=addr, + ) + + # wait for both pv agents to answer + await self.reports_done + + # deterministic output + self.reported_feed_ins.sort() + + # limit both pv agents to the smaller ones feed-in + print(f"{self.aid}: received feed_ins: {self.reported_feed_ins}") + min_feed_in = min(self.reported_feed_ins) + + for addr in self.known_agents: + msg = SetMaxFeedInMsg(min_feed_in) + + # alternatively we could call send_acl_message here directly and await it + self.schedule_instant_message( + content=msg, + receiver_addr=addr + ) + + # wait for both pv agents to acknowledge the change + await self.acks_done class PVAgent(Agent): - """...""" + def __init__(self): + super().__init__() + + self.max_feed_in = -1 def handle_message(self, content, meta): - sender_addr = meta["sender_addr"] - sender_id = meta["sender_id"] + sender = sender_addr(meta) if isinstance(content, AskFeedInMsg): - self.handle_ask_feed_in(sender_addr, sender_id) + self.handle_ask_feed_in(sender) elif isinstance(content, SetMaxFeedInMsg): - self.handle_set_feed_in_max(content.max_feed_in, sender_addr, sender_id) + self.handle_set_feed_in_max(content.max_feed_in, sender) else: print(f"{self.aid}: Received a message of unknown type {type(content)}") + def handle_ask_feed_in(self, sender_addr): + reported_feed_in = PV_FEED_IN[self.aid] # PV_FEED_IN must be defined at the top + msg = FeedInReplyMsg(reported_feed_in) + + self.schedule_instant_message( + content=msg, + receiver_addr=sender_addr + ) -This concludes the third part of our tutorial. If you run the code, -you should receive the same output as in part 2: + def handle_set_feed_in_max(self, max_feed_in, sender_addr): + self.max_feed_in = float(max_feed_in) + print(f"{self.aid}: Limiting my feed_in to {max_feed_in}") + msg = MaxFeedInAck() - | Controller: received feed_ins: [2.0, 1.0] - | PV Agent 0: Limiting my feed_in to 1.0 - | PV Agent 1: Limiting my feed_in to 1.0 + self.schedule_instant_message( + content=msg, + receiver_addr=sender_addr, + ) -.. raw:: html +.. testcode:: -
+ async def main(): + # agents always live inside a container + pv_agent_0 = pv_container.register(PVAgent(), suggested_aid='PV Agent 0') + pv_agent_1 = pv_container.register(PVAgent(), suggested_aid='PV Agent 1') + + # We pass info of the pv agents addresses to the controller here directly. + # In reality, we would use some kind of discovery mechanism for this. + known_agents = [ + pv_agent_0.addr, + pv_agent_1.addr, + ] + + controller_agent = controller_container.register(ControllerAgent(known_agents), + suggested_aid='Controller') + + async with activate(pv_container, controller_container) as cl: + # the only active component in this setup + await controller_agent.run() + + asyncio.run(main()) + +.. testoutput:: + + Controller: received feed_ins: [1.0, 2.0] + PV Agent 0: Limiting my feed_in to 1.0 + PV Agent 1: Limiting my feed_in to 1.0 + +This concludes the third part of our tutorial. ************************* 4. Scheduling and Roles ************************* -Corresponding file: `v4_scheduling_and_roles.py` - In example 3, you restructured your code to use codecs for easier handling of typed message objects. Now it is time to expand the functionality of our controller. In addition to setting the maximum feed_in of the pv agents, the controller should now also periodically check if the pv agents are still reachable. @@ -499,43 +636,65 @@ With the introduction of this task, we know have different responsibilities for responsibilities we can use the role API. The idea of using roles is to divide the functionality of an agent by responsibility in a structured way. -A role is a python object that can be assigned to a RoleAgent. The two main functions each role implements are: +A role is a python object that can be assigned to a RoleAgent. There are several lifecycle functions each role may implement: - __init__ - where you do the initial object setup - - setup - which is called when the role is assigned to an agent + - :meth:`mango.Role.setup` - which is called when the role is assigned to an agent + - :meth:`mango.Role.on_start` - which is called when the container is started + - :meth:`mango.Role.on_ready` - which is called when are activated -This distinction is relevant because only within `setup` the RoleContext (i.e. access to the parent agent and container) exist. -Thus, things like message handlers that require container knowledge are introduced there. +This distinction is relevant because not all features exist after construction with __init__. Most of the time +you want to implement :meth:`mango.Role.on_ready` for actions like message sending, or scheduling, because only +since this point you can be sure that all relevant container are started and the agent the role belongs to has been registered. +However, the setup of the role itself should be done in :meth:`mango.Role.setup`. This example covers: - role API basics - scheduling and periodic tasks -.. raw:: html - -
- step by step - -The key part of defining roles are their ``__init__`` and ``setup`` methods. The first is called to create the role object. -The second is called when the role is assigned to an agent. In our case, the main change is that the previous distinction -of message types within ``handle_message`` is now done by subscribing to the corresponding message type to tell the agent -it should forward these messages to this role. -The ``subscribe_message`` method expects, besides the role and a handle method, a message condition function. +The key part of defining roles are their `__init__`, `setup`, and `on_ready` methods. +The first is called to create the role object. The second is called when the role is assigned to +an agent. While the third is called when all containers are started using :meth:`mango.activate`. +In our case, the main change is that the previous distinction of message types within `handle_message` is now done +by subscribing to the corresponding message type to tell the agent it should forward these messages +to this role. +The :meth:`mango.Role.subscribe_message` method expects, besides the role and a handle method, a message condition function. The idea of the condition function is to allow to define a condition filtering incoming messages. Another idea is that sending messages from the role is now done via its context with the method: -``self.context.send_message``. +`self.context.send_message`. -We first create the ``Ping`` role, which has to periodically send out its messages. +We first create the `Ping` role, which has to periodically send out its messages. We can use mango's scheduling API to handle -this for us via the ``schedule_periodic_tasks`` function. This takes a coroutine to execute and a time +this for us via the :meth:`mango.RoleContext.schedule_periodic_tasks` function. This takes a coroutine to execute and a time interval. Whenever the time interval runs out the coroutine is triggered. With the scheduling API you can also run tasks at specific times. For a full overview we refer to the documentation. -.. code-block:: python +.. testcode:: + + import asyncio + from dataclasses import dataclass + + from mango import sender_addr, Role, RoleAgent, JSON, create_tcp_container, json_serializable, agent_composed_of + + PV_CONTAINER_ADDRESS = ("localhost", 5555) + CONTROLLER_CONTAINER_ADDRESS = ("localhost", 5556) + PV_FEED_IN = { + "PV Agent 0": 2.0, + "PV Agent 1": 1.0, + } + + @json_serializable + @dataclass + class Ping: + ping_id: int - from mango import Role + @json_serializable + @dataclass + class Pong: + pong_id: int class PingRole(Role): def __init__(self, ping_recipients, time_between_pings): + super().__init__() self.ping_recipients = ping_recipients self.time_between_pings = time_between_pings self.ping_counter = 0 @@ -546,20 +705,17 @@ also run tasks at specific times. For a full overview we refer to the documentat self, self.handle_pong, lambda content, meta: isinstance(content, Pong) ) - # this task is automatically executed every "time_between_pings" seconds + def on_ready(self): self.context.schedule_periodic_task(self.send_pings, self.time_between_pings) async def send_pings(self): - for addr, aid in self.ping_recipients: + for addr in self.ping_recipients: ping_id = self.ping_counter msg = Ping(ping_id) - meta = {"sender_addr": self.context.addr, "sender_id": self.context.aid} await self.context.send_message( msg, receiver_addr=addr, - receiver_id=aid, - **meta, ) self.expected_pongs.append(ping_id) self.ping_counter += 1 @@ -574,42 +730,100 @@ also run tasks at specific times. For a full overview we refer to the documentat print( f"Pong {self.context.aid}: Received an unexpected pong with ID: {content.pong_id}" ) + print(Ping(1).ping_id) + print(Pong(1).pong_id) + print(PingRole(["addr"], 1).ping_recipients) + +.. testoutput:: + 1 + 1 + ['addr'] The ControllerRole now covers the former responsibilities of the controller: -.. code-block:: python +.. testcode:: class ControllerRole(Role): - def __init__(self, known_agents): - super().__init__() - self.known_agents = known_agents - self.reported_feed_ins = [] - self.reported_acks = 0 - self.reports_done = None - self.acks_done = None - - def setup(self): - self.context.subscribe_message( - self, - self.handle_feed_in_reply, - lambda content, meta: isinstance(content, FeedInReplyMsg), - ) + def __init__(self, known_agents): + super().__init__() + self.known_agents = known_agents + self.reported_feed_ins = [] + self.reported_acks = 0 + self.reports_done = None + self.acks_done = None - self.context.subscribe_message( - self, - self.handle_set_max_ack, - lambda content, meta: isinstance(content, MaxFeedInAck), - ) + def setup(self): + self.context.subscribe_message( + self, + self.handle_feed_in_reply, + lambda content, meta: isinstance(content, FeedInReplyMsg), + ) + + self.context.subscribe_message( + self, + self.handle_set_max_ack, + lambda content, meta: isinstance(content, MaxFeedInAck), + ) + + def on_ready(self): + self.context.schedule_instant_task(self.run()) + + def handle_feed_in_reply(self, content, meta): + feed_in_value = float(content.feed_in) + + self.reported_feed_ins.append(feed_in_value) + if len(self.reported_feed_ins) == len(self.known_agents): + if self.reports_done is not None: + self.reports_done.set_result(True) + + def handle_set_max_ack(self, content, meta): + self.reported_acks += 1 + if self.reported_acks == len(self.known_agents): + if self.acks_done is not None: + self.acks_done.set_result(True) + + async def run(self): + # we define an asyncio future to await replies from all known pv agents: + self.reports_done = asyncio.Future() + self.acks_done = asyncio.Future() + + # ask pv agent feed-ins + for addr in self.known_agents: + msg = AskFeedInMsg() + + await self.context.send_message( + content=msg, + receiver_addr=addr + ) + + # wait for both pv agents to answer + await self.reports_done + + # limit both pv agents to the smaller ones feed-in + print(f"Controller received feed_ins: {self.reported_feed_ins}") + min_feed_in = min(self.reported_feed_ins) + + for addr in self.known_agents: + msg = SetMaxFeedInMsg(min_feed_in) + + await self.context.send_message( + content=msg, + receiver_addr=addr, + ) + + # wait for both pv agents to acknowledge the change + await self.acks_done - self.context.schedule_instant_task(self.run()) + print(ControllerRole([]).known_agents) -The methods ``handle_feed_in_reply``, ``handle_set_max_ack`` and ``run`` are also part of this role and -remain unchanged. +.. testoutput:: + + [] The ``Pong`` role is associated with the PV Agents and purely reactive. -.. code-block:: python +.. testcode:: class PongRole(Role): def setup(self): @@ -619,27 +833,30 @@ The ``Pong`` role is associated with the PV Agents and purely reactive. def handle_ping(self, content, meta): ping_id = content.ping_id - sender_addr = meta["sender_addr"] - sender_id = meta["sender_id"] answer = Pong(ping_id) print(f"Ping {self.context.aid}: Received a ping with ID: {ping_id}") # message sending from roles is done via the RoleContext - self.context.schedule_message( - answer, - receiver_addr=sender_addr, - receiver_id=sender_id, + self.context.schedule_instant_message( + answer, + receiver_addr=sender_addr(meta) ) + print(type(PongRole())) + +.. testoutput:: + + Since the PV Agent is purely reactive, its other functionality stays basically unchanged and is simply moved to the PVRole. -.. code-block:: python +.. testcode:: class PVRole(Role): def __init__(self): + super().__init__() self.max_feed_in = -1 def setup(self): @@ -655,75 +872,90 @@ unchanged and is simply moved to the PVRole. ) def handle_ask_feed_in(self, content, meta): - """...""" + reported_feed_in = PV_FEED_IN[ + self.context.aid + ] + msg = FeedInReplyMsg(reported_feed_in) + self.context.schedule_instant_message( content=msg, - receiver_addr=sender_addr, - receiver_id=sender_id, + receiver_addr=sender_addr(meta) ) - def handle_set_feed_in_max(self, content, meta): - """...""" + max_feed_in = float(content.max_feed_in) + self.max_feed_in = max_feed_in + print(f"{self.context.aid}: Limiting my feed_in to {max_feed_in}") + + msg = MaxFeedInAck() + self.context.schedule_instant_message( content=msg, - receiver_addr=sender_addr, - receiver_id=sender_id, + receiver_addr=sender_addr(meta), ) + print(PVRole().max_feed_in) +.. testoutput:: -The definition of the agent classes itself now simply boils down to assigning it all the roles it has: - -.. code-block:: python - - from mango import RoleAgent + -1 - class PVAgent(RoleAgent): - def __init__(self, container): - super().__init__(container) - self.add_role(PongRole()) - self.add_role(PVRole()) +The definition of the agent classes itself now simply boils down to using the function :meth:`mango.agent_composed_of`. +The following shows the fully rewriten PV/Controller example featuring the newly introduced Ping function. - class ControllerAgent(RoleAgent): - def __init__(self, container, known_agents): - super().__init__(container) - self.add_role(PingRole(known_agents, 2)) - self.add_role(ControllerRole(known_agents)) +.. testcode:: + async def main(): + my_codec = JSON() + my_codec.add_serializer(*AskFeedInMsg.__serializer__()) + my_codec.add_serializer(*SetMaxFeedInMsg.__serializer__()) + my_codec.add_serializer(*FeedInReplyMsg.__serializer__()) + my_codec.add_serializer(*MaxFeedInAck.__serializer__()) -This concludes the last part of our tutorial. -If you want to run the code, you don't need to await the run method of the controller anymore, -since everything now happens automatically within the roles. -In your ``main``, you can replace the line: - -.. code-block:: python - - await controller_agent.run() - -with the following line: - -.. code-block:: python + # dont forget to add our new serializers + my_codec.add_serializer(*Ping.__serializer__()) + my_codec.add_serializer(*Pong.__serializer__()) - await asyncio.sleep(5) + pv_container = create_tcp_container(addr=PV_CONTAINER_ADDRESS, codec=my_codec) -If you then run this code, you should receive the following output: + controller_container = create_tcp_container( + addr=CONTROLLER_CONTAINER_ADDRESS, codec=my_codec + ) - | Ping PV Agent 0: Received a ping with ID: 0 - | Ping PV Agent 1: Received a ping with ID: 1 - | Pong Controller: Received an expected pong with ID: 0 - | Pong Controller: Received an expected pong with ID: 1 - | Controller received feed_ins: [2.0, 1.0] - | PV Agent 0: Limiting my feed_in to 1.0 - | PV Agent 1: Limiting my feed_in to 1.0 - | Ping PV Agent 0: Received a ping with ID: 2 - | Ping PV Agent 1: Received a ping with ID: 3 - | Pong Controller: Received an expected pong with ID: 2 - | Pong Controller: Received an expected pong with ID: 3 - | Ping PV Agent 0: Received a ping with ID: 4 - | Ping PV Agent 1: Received a ping with ID: 5 - | Pong Controller: Received an expected pong with ID: 4 - | Pong Controller: Received an expected pong with ID: 5 + pv_agent_0 = agent_composed_of(PongRole(), PVRole(), + register_in=pv_container, + suggested_aid="PV Agent 0") + pv_agent_1 = agent_composed_of(PongRole(), PVRole(), + register_in=pv_container, + suggested_aid="PV Agent 1") -.. raw:: html + known_agents = [ + pv_agent_0.addr, + pv_agent_1.addr, + ] -
+ controller_agent = agent_composed_of(PingRole(known_agents, 2), ControllerRole(known_agents), + register_in=pv_container, suggested_aid="Controller") + + async with activate(controller_container, pv_container) as cl: + # no more run call since everything now happens automatically within the roles + await asyncio.sleep(5) + + asyncio.run(main()) + +.. testoutput:: + + Ping PV Agent 0: Received a ping with ID: 0 + Ping PV Agent 1: Received a ping with ID: 1 + Pong Controller: Received an expected pong with ID: 0 + Pong Controller: Received an expected pong with ID: 1 + Controller received feed_ins: [2.0, 1.0] + PV Agent 0: Limiting my feed_in to 1.0 + PV Agent 1: Limiting my feed_in to 1.0 + Ping PV Agent 0: Received a ping with ID: 2 + Ping PV Agent 1: Received a ping with ID: 3 + Pong Controller: Received an expected pong with ID: 2 + Pong Controller: Received an expected pong with ID: 3 + Ping PV Agent 0: Received a ping with ID: 4 + Ping PV Agent 1: Received a ping with ID: 5 + Pong Controller: Received an expected pong with ID: 4 + Pong Controller: Received an expected pong with ID: 5 diff --git a/mango/__init__.py b/mango/__init__.py index c9d99e6..e9fa5e2 100644 --- a/mango/__init__.py +++ b/mango/__init__.py @@ -1,4 +1,4 @@ -from .messages.message import create_acl +from .messages.message import create_acl, Performatives from .agent.core import Agent, AgentAddress from .agent.role import Role, RoleAgent, RoleContext from .container.factory import ( @@ -17,3 +17,4 @@ ) from .util.distributed_clock import DistributedClockAgent, DistributedClockManager from .util.clock import ExternalClock +from .messages.codecs import json_serializable, JSON, FastJSON, PROTOBUF diff --git a/mango/container/core.py b/mango/container/core.py index 2c37bdd..25da073 100644 --- a/mango/container/core.py +++ b/mango/container/core.py @@ -46,7 +46,6 @@ def __init__( self._aid_counter: int = 0 # counter for aids self.running: bool = False # True until self.shutdown() is called - self._no_agents_running: asyncio.Future = None # inbox for all incoming messages self.inbox: asyncio.Queue = None @@ -118,8 +117,6 @@ def register(self, agent: Agent, suggested_aid: str = None): :return The agent ID """ - if not self._no_agents_running or self._no_agents_running.done(): - self._no_agents_running = asyncio.Future() aid = self._reserve_aid(suggested_aid) self._agents[aid] = agent agent._do_register(self, aid) @@ -154,8 +151,6 @@ def deregister(self, aid): :return: """ del self._agents[aid] - if len(self._agents) == 0: - self._no_agents_running.set_result(True) @abstractmethod async def send_message( @@ -308,10 +303,6 @@ def dispatch_to_agent_process(self, pid: int, coro_func, *args): async def start(self): self.running: bool = True # True until self.shutdown() is called - self._no_agents_running: asyncio.Future = asyncio.Future() - self._no_agents_running.set_result( - True - ) # signals that currently no agent lives in this container # inbox for all incoming messages self.inbox: asyncio.Queue = asyncio.Queue() diff --git a/mango/express/api.py b/mango/express/api.py index aeb57c7..11d5441 100644 --- a/mango/express/api.py +++ b/mango/express/api.py @@ -210,7 +210,9 @@ class ComposedAgent(RoleAgent): pass -def agent_composed_of(*roles: Role, register_in: None | Container) -> ComposedAgent: +def agent_composed_of( + *roles: Role, register_in: None | Container = None, suggested_aid: None | str = None +) -> ComposedAgent: """ Create an agent composed of the given `roles`. If a container is provided, the created agent is automatically registered with the container `register_in`. @@ -219,6 +221,8 @@ def agent_composed_of(*roles: Role, register_in: None | Container) -> ComposedAg :param register_in: container in which the created agent is registered, if provided :type register_in: None | Container + :param suggested_aid: the suggested aid for registration + :type suggested_aid: str :return: the composed agent :rtype: ComposedAgent """ @@ -226,7 +230,7 @@ def agent_composed_of(*roles: Role, register_in: None | Container) -> ComposedAg for role in roles: agent.add_role(role) if register_in is not None: - register_in.register(agent) + register_in.register(agent, suggested_aid=suggested_aid) return agent