Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/issue179/processor actors #208

Merged
merged 35 commits into from
Nov 9, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
c83cb4b
feat: Add ProcessorActor an LibvirtProcessorActor based on LibvirtMap…
roda82 Jul 26, 2023
f5f5acb
refactor: Clean code
roda82 Jul 26, 2023
7b5f867
refactor: Move UnknownMessageTypeException to exception.py
roda82 Jul 27, 2023
a05a650
feat: Add K8sMonitorActor and K8sProcessorActor
roda82 Aug 17, 2023
dc192b9
test: Add some test on generator regarding K8s Processors
roda82 Aug 18, 2023
075ddd9
test: Add test for binding regarding K8s Processors, add Monitor Gene…
roda82 Aug 21, 2023
7f32c84
feat: Add parsers for bindings and processors
roda82 Aug 22, 2023
01aca03
feat: Add monitors generation from a dictionary of processors
roda82 Aug 22, 2023
b873a43
test: Add test for monitors generation from a processors dictionary
roda82 Aug 22, 2023
341effe
refactor: Add new exceptions type for BindingManger
roda82 Aug 23, 2023
06899b8
feat: Add validation for processors and bindings as well ass related …
roda82 Aug 23, 2023
c7580d7
fix: Correct subparser k8s processor arguments types and names
roda82 Aug 24, 2023
ae20d2b
refactor: Separate pre-processors from post-processors, adapt process…
roda82 Sep 26, 2023
61d1e7f
refactor: Remove useless code
roda82 Sep 26, 2023
e6b2d38
refactor: Remove useless code
roda82 Sep 28, 2023
4e53ad2
refactor: Correct problem related to variable initialisation on K8sMo…
roda82 Oct 19, 2023
1bf7ce1
feat: Add api_key and host as paremeters of K8sPreProcessorActor for …
roda82 Oct 20, 2023
dfc9bf0
style: Update some debug messages
roda82 Oct 20, 2023
49c53e1
refactor: Use multiprocessing.Manager for dealing with Metadata Cache…
roda82 Oct 30, 2023
0623dc7
refactor: Readd BindingManager as it is required to create binding be…
roda82 Oct 31, 2023
4348722
refactor: Improve logger manangement in processors
roda82 Oct 31, 2023
a068652
fix: Correct issue related to parametres names in K8s configuration
roda82 Oct 31, 2023
767790c
refactor: Add support to BackendSupervisor to consider pre-processors…
roda82 Nov 2, 2023
2c35cb9
docs: Update BackendSupervisor documentation
roda82 Nov 6, 2023
8866117
refactor: Remove unnecessary code
roda82 Nov 6, 2023
a9e232a
refactor: Remove Report Modifiers fonctionality. They are replaced by…
roda82 Nov 6, 2023
37cb3b8
fix: Add kubernetes dependency
roda82 Nov 8, 2023
11c02bd
build: Define kuberntes as a group and Add it to all-platforms group
roda82 Nov 9, 2023
7070123
refactor: Skip some libvirt tests and remove some of them
roda82 Nov 9, 2023
4edebcd
refactor: Skip some libvirt tests
roda82 Nov 9, 2023
9b428aa
refactor: Remove some libvirt tests
roda82 Nov 9, 2023
3b26eab
refactor: Remove unnecessary code
roda82 Nov 9, 2023
0225dd5
refactor: Improve code quality
roda82 Nov 9, 2023
9d672c9
refactor: Improve code quality
roda82 Nov 9, 2023
f4a7765
refactor: Improve code quality
roda82 Nov 9, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions powerapi/actor/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,8 @@
import traceback
import setproctitle

from powerapi.exception import PowerAPIExceptionWithMessage
from powerapi.exception import PowerAPIExceptionWithMessage, UnknownMessageTypeException
from powerapi.message import PoisonPillMessage
from powerapi.message import UnknownMessageTypeException
from powerapi.handler import HandlerException

from .socket_interface import SocketInterface
Expand Down Expand Up @@ -93,7 +92,7 @@ def __init__(self, name, level_logger=logging.WARNING, timeout=None):
:param str name: unique name that will be used to indentify the actor
processus
:param int level_logger: Define the level of the logger
:param int timeout: if define, do something if no msg is recv every
:param int timeout: if defined, do something if no msg is recv every
timeout (in ms)
"""
multiprocessing.Process.__init__(self, name=name)
Expand Down Expand Up @@ -229,7 +228,7 @@ def _initial_behaviour(self):
handler = self.state.get_corresponding_handler(msg)
handler.handle_message(msg)
except UnknownMessageTypeException:
self.logger.warning("UnknowMessageTypeException: " + str(msg))
self.logger.warning("UnknownMessageTypeException: " + str(msg))
except HandlerException:
self.logger.warning("HandlerException")

Expand Down
2 changes: 1 addition & 1 deletion powerapi/actor/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

from powerapi.message import UnknownMessageTypeException
from powerapi.exception import UnknownMessageTypeException
from powerapi.actor.supervisor import Supervisor


Expand Down
13 changes: 11 additions & 2 deletions powerapi/backend_supervisor/backend_supervisor.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,10 +32,10 @@
from powerapi.actor import Supervisor
from powerapi.puller import PullerActor
from powerapi.dispatcher import DispatcherActor
from powerapi.pusher import PusherActor


class BackendSupervisor(Supervisor):

"""
Provide additional functionality to deal with actors: join
"""
Expand All @@ -55,6 +55,9 @@ def __init__(self, stream_mode):
#: (list): List of Pusher
self.pushers = []

#: (list): List of pre processors
self.pre_processors = []

def join(self):
"""
wait until all actor are terminated
Expand All @@ -65,8 +68,10 @@ def join(self):
self.pullers.append(actor)
elif isinstance(actor, DispatcherActor):
self.dispatchers.append(actor)
else:
elif isinstance(actor, PusherActor):
self.pushers.append(actor)
else:
self.pre_processors.append(actor)

if self.stream_mode:
self.join_stream_mode_on()
Expand Down Expand Up @@ -97,12 +102,16 @@ def join_stream_mode_off(self):
"""
Supervisor behaviour when stream mode is off.
- Supervisor wait the Puller death
- Supervisor wait the pre-processors death
- Supervisor wait for the dispatcher death
- Supervisor send a PoisonPill (by_data) to the Pusher
- Supervisor wait for the Pusher death
"""
for puller in self.pullers:
puller.join()
for pre_processor in self.pre_processors:
pre_processor.soft_kill()
pre_processor.join()
for dispatcher in self.dispatchers:
dispatcher.soft_kill()
dispatcher.join()
Expand Down
233 changes: 233 additions & 0 deletions powerapi/cli/binding_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
# Copyright (c) 2023, INRIA
# Copyright (c) 2023, University of Lille
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# * Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# * Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
#
# * Neither the name of the copyright holder nor the names of its
# contributors may be used to endorse or promote products derived from
# this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
# DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
# FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
# SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
# pylint: disable=R1702
from powerapi.exception import UnsupportedActorTypeException, UnexistingActorException, TargetActorAlreadyUsed
from powerapi.processor.processor_actor import ProcessorActor
from powerapi.puller import PullerActor
from powerapi.pusher import PusherActor


class BindingManager:
"""
Class for management the binding between actors during their creation process
"""

def __init__(self, actors: dict = {}):
"""
:param dict actors: Dictionary of actors to create the bindings. The name of the actor is the key
"""
if not actors:
self.actors = {}
else:
self.actors = actors

def process_bindings(self):
"""
Define bindings between self.actors according to the processors' targets.
"""
raise NotImplementedError()


class ProcessorBindingManager(BindingManager):
"""
Class for management of bindings between processor actors and others actors
"""

def __init__(self, actors: dict, processors: dict):
"""
The ProcessorBindingManager defines bindings between actors and processors
:param dict actors: Dictionary of actors with structure {<actor1_key>:actor1,<actor2_key>:actor2...}
:param dict processors: Dictionary of processors with structure {<processor1_key>:processor1,
<processor2_key>:processor2...}
"""

BindingManager.__init__(self, actors=actors)
if not processors:
self.processors = {}
else:
self.processors = processors

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
If it is not the case, it raises a UnexistingActorException
"""
for target_actor_name in processor.state.target_actors_names:
if target_actor_name not in self.actors:
raise UnexistingActorException(actor=target_actor_name)

def check_processors_targets_are_unique(self):
"""
Check that processors targets are unique, i.e., the same target is not related to
two different processors
"""
used_targets = []
for _, processor in self.processors.items():
for target_actor_name in processor.state.target_actors_names:
if target_actor_name in used_targets:
raise TargetActorAlreadyUsed(target_actor=target_actor_name)
else:
used_targets.append(target_actor_name)


class PreProcessorBindingManager(ProcessorBindingManager):
"""
Class for management the binding between pullers and pre-processor actors
"""

def __init__(self, pullers: dict, processors: dict):
"""
The PreProcessorBindingManager defines bindings between pullers and processors: puller->processor->dispatcher
:param dict pullers: Dictionary of actors with structure {<actor1_key>:actor1,<actor2_key>:actor2...}
:param dict processors: Dictionary of processors with structure {<processor1_key>:processor1,
<processor2_key>:processor2...}
"""

ProcessorBindingManager.__init__(self, actors=pullers, processors=processors)

def process_bindings(self):
"""
Define bindings between self.actors according to the pre-processors' targets.

"""

# Check that processors targets are unique
self.check_processors_targets_are_unique()

# For each processor, we get targets and create the binding:
# puller->processor->dispatcher
for _, processor in self.processors.items():

self.check_processor_targets(processor=processor)

for target_actor_name in processor.state.target_actors_names:

# The processor has to be between the puller and the dispatcher
# The dispatcher becomes a target of the processor

puller_actor = self.actors[target_actor_name]

# The dispatcher defines the relationship between the Formula and
# Puller
number_of_filters = len(puller_actor.state.report_filter.filters)

for index in range(number_of_filters):
# The filters define the relationship with the dispatcher
# The relationship has to be updated
current_filter = list(puller_actor.state.report_filter.filters[index])
current_filter_dispatcher = current_filter[1]
processor.add_target_actor(actor=current_filter_dispatcher)
current_filter[1] = processor
puller_actor.state.report_filter.filters[index] = tuple(current_filter)

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
If it is not the case, it raises a UnexistingActorException
It also checks that the actor is a PullerActor instance.
If it is not the case, it raises UnsupportedActorTypeException
"""
ProcessorBindingManager.check_processor_targets(self, processor=processor)

for target_actor_name in processor.state.target_actors_names:
actor = self.actors[target_actor_name]

if not isinstance(actor, PullerActor):
raise UnsupportedActorTypeException(actor_type=type(actor).__name__)


class PostProcessorBindingManager(ProcessorBindingManager):
"""
Class for management the binding between post-processor and pusher actors
"""

def __init__(self, pushers: dict, processors: dict, pullers: dict):
"""
The PostProcessorBindingManager defines bindings between processors and pushers: formula->processor->pushers
:param dict pushers: Dictionary of PusherActors with structure {<actor1_key>:actor1,<actor2_key>:actor2...}
:param dict processors: Dictionary of processors with structure {<processor1_key>:processor1,
<processor2_key>:processor2...}
"""
ProcessorBindingManager.__init__(self, actors=pushers, processors=processors)
self.pullers = pullers

def process_bindings(self):
"""
Define bindings between self.actors according to the post-processors' targets.

"""

# For each processor, we get targets and create the binding:
# formula->processor->pusher
for _, processor in self.processors.items():

self.check_processor_targets(processor=processor)

for target_actor_name in processor.state.target_actors_names:

# The processor has to be between the formula and the pusher
# The pusher becomes a target of the processor

pusher_actor = self.actors[target_actor_name]

processor.add_target_actor(actor=pusher_actor)

# We look for the pusher on each dispatcher in order to replace it by
# the processor
for _, puller in self.pullers:

for current_filter in puller.state.report_filter.filters:
dispatcher = current_filter[1]

number_of_pushers = len(dispatcher.pusher)
pusher_updated = False

for index in range(number_of_pushers):
if dispatcher.pusher[index] == pusher_actor:
dispatcher.pusher[index] = processor
pusher_updated = True
break

if pusher_updated:
dispatcher.update_state_formula_factory()

def check_processor_targets(self, processor: ProcessorActor):
"""
Check that targets of a processor exist in the dictionary of targets.
If it is not the case, it raises a UnexistingActorException
It also checks that the actor is a PusherActor instance.
If it is not the case, it raises UnsupportedActorTypeException
"""
ProcessorBindingManager.check_processor_targets(self, processor=processor)

for target_actor_name in processor.state.target_actors_names:
actor = self.actors[target_actor_name]

if not isinstance(actor, PusherActor):
raise UnsupportedActorTypeException(actor_type=type(actor).__name__)
Loading