Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(agent): unify treatment of output and other events #332

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 5 additions & 4 deletions agent/testflinger_agent/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,21 +13,22 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>

from .client import TestflingerClient
from .runner import OutputEvent


class LiveOutputHandler:
def __init__(self, client: TestflingerClient, job_id: str):
self.client = client
self.job_id = job_id

def __call__(self, data: str):
self.client.post_live_output(self.job_id, data)
def __call__(self, event: OutputEvent):
self.client.post_live_output(self.job_id, event.output)


class LogUpdateHandler:
def __init__(self, log_file: str):
self.log_file = log_file

def __call__(self, data: str):
def __call__(self, event: OutputEvent):
with open(self.log_file, "a") as log:
log.write(data)
log.write(event.output)
10 changes: 4 additions & 6 deletions agent/testflinger_agent/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import time

from testflinger_agent.errors import TFServerError
from .runner import CommandRunner, RunnerEvents
from .runner import CommandRunner, OutputEvent
from .handlers import LiveOutputHandler, LogUpdateHandler
from .stop_condition_checkers import (
JobCancelledChecker,
Expand Down Expand Up @@ -85,8 +85,8 @@ def run_test_phase(self, phase, rundir):
runner = CommandRunner(cwd=rundir, env=self.client.config)
output_log_handler = LogUpdateHandler(output_log)
live_output_handler = LiveOutputHandler(self.client, self.job_id)
runner.register_output_handler(output_log_handler)
runner.register_output_handler(live_output_handler)
runner.subscribe_event(OutputEvent, output_log_handler)
runner.subscribe_event(OutputEvent, live_output_handler)

# Reserve phase uses a separate timeout handler
if phase != "reserve":
Expand All @@ -101,9 +101,7 @@ def run_test_phase(self, phase, rundir):
self.get_output_timeout()
)
runner.register_stop_condition_checker(output_timeout_checker)
runner.subscribe_event(
RunnerEvents.OUTPUT_RECEIVED, output_timeout_checker.update
)
runner.subscribe_event(OutputEvent, output_timeout_checker.update)

# Do not allow cancellation during provision for safety reasons
if phase != "provision":
Expand Down
65 changes: 39 additions & 26 deletions agent/testflinger_agent/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,60 +22,72 @@
import time

from collections import defaultdict
from enum import Enum
from typing import Callable, List, Optional, Tuple
from typing import Callable, List, Optional, Tuple, Type

from testflinger_common.enums import TestEvent

logger = logging.getLogger(__name__)

OutputHandlerType = Callable[[str], None]
StopConditionType = Callable[[], Optional[str]]


class RunnerEvents(Enum):
class RunnerEvent:
"""
Runner events that can be subscribed to.
"""

OUTPUT_RECEIVED = "output_received"
@classmethod
def event_name(cls):
"""
Return a unique event name (the fully qualified class name) to use
as a key in dictionaries.
"""
return f"{cls.__module__}.{cls.__qualname__}"


# Any event handler expects to be called with the event as an argument
RunnerEventHandlerType = Callable[[RunnerEvent], None]


class OutputEvent(RunnerEvent):
"""
A type of event corresponding to the generation of output during a test.
The output is stored in the corresponding `output` attribute.
"""

def __init__(self, output: str):
self.output = output


class CommandRunner:
"""
Run a command and handle output and stop conditions.
Run a command and handle events and stop conditions.

There are also events that can be subscribed to for notifications. The
known event types are defined in RunnerEvents.
There are also events that can be subscribed to for notifications.
"""

def __init__(self, cwd: Optional[str], env: Optional[dict]):
self.output_handlers: List[OutputHandlerType] = []
self.stop_condition_checkers: List[StopConditionType] = []
self.process: Optional[subprocess.Popen] = None
self.cwd = cwd
self.env = os.environ.copy()
# a mapping of event names to lists of registered event handlers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# a mapping of event names to lists of registered event handlers
# A mapping of event names to lists of registered event handlers

self.events = defaultdict(list)
if env:
self.env.update(
{k: str(v) for k, v in env.items() if isinstance(v, str)}
)

def register_output_handler(self, handler: OutputHandlerType):
self.output_handlers.append(handler)

def subscribe_event(self, event_name: RunnerEvents, handler: Callable):
def subscribe_event(
self, event_cls: Type[RunnerEvent], handler: RunnerEventHandlerType
):
"""Set a callback for an event that we want to be notified of"""
self.events[event_name].append(handler)
self.events[event_cls.event_name()].append(handler)

def post_event(self, event_name: RunnerEvents):
def post_event(self, event: RunnerEvent):
"""Post an event for subscribers to be notified of"""
for handler in self.events[event_name]:
handler()

def post_output(self, data: str):
for handler in self.output_handlers:
handler(data)
for handler in self.events[type(event).event_name()]:
handler(event)

def register_stop_condition_checker(self, checker: StopConditionType):
self.stop_condition_checkers.append(checker)
Expand All @@ -95,10 +107,11 @@ def check_and_post_output(self):
raw_output = self.process.stdout.read()
if not raw_output:
return
self.post_event(RunnerEvents.OUTPUT_RECEIVED)

output = raw_output.decode(sys.stdout.encoding, errors="replace")
self.post_output(output)
self.post_event(
OutputEvent(
raw_output.decode(sys.stdout.encoding, errors="replace")
)
)

def run_command_thread(self, cmd: str):
self.process = subprocess.Popen(
Expand Down Expand Up @@ -141,7 +154,7 @@ def run(self, cmd: str) -> Tuple[int, Optional[TestEvent], str]:

stop_event, stop_reason = self.check_stop_conditions()
if stop_event is not None:
self.post_output(f"\n{stop_reason}\n")
self.post_event(OutputEvent(f"\n{stop_reason}\n"))
self.cleanup()
break

Expand Down
3 changes: 2 additions & 1 deletion agent/testflinger_agent/stop_condition_checkers.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from typing import Optional, Tuple
from testflinger_common.enums import JobState, TestEvent
from .client import TestflingerClient
from .runner import RunnerEvent


class JobCancelledChecker:
Expand Down Expand Up @@ -59,6 +60,6 @@ def __call__(self) -> Tuple[Optional[TestEvent], str]:
)
return None, ""

def update(self):
def update(self, _: RunnerEvent):
"""Update the last output time to the current time."""
self.last_output_time = time.time()
6 changes: 3 additions & 3 deletions agent/testflinger_agent/tests/test_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import testflinger_agent
from testflinger_agent.client import TestflingerClient as _TestflingerClient
from testflinger_agent.job import TestflingerJob as _TestflingerJob
from testflinger_agent.runner import CommandRunner
from testflinger_agent.runner import CommandRunner, OutputEvent
from testflinger_agent.handlers import LogUpdateHandler
from testflinger_agent.stop_condition_checkers import (
GlobalTimeoutChecker,
Expand Down Expand Up @@ -73,7 +73,7 @@ def test_job_global_timeout(self, tmp_path):
logfile = tmp_path / "testlog"
runner = CommandRunner(tmp_path, env={})
log_handler = LogUpdateHandler(logfile)
runner.register_output_handler(log_handler)
runner.subscribe_event(OutputEvent, log_handler)
global_timeout_checker = GlobalTimeoutChecker(1)
runner.register_stop_condition_checker(global_timeout_checker)
exit_code, exit_event, exit_reason = runner.run("sleep 12")
Expand All @@ -98,7 +98,7 @@ def test_job_output_timeout(self, tmp_path):
logfile = tmp_path / "testlog"
runner = CommandRunner(tmp_path, env={})
log_handler = LogUpdateHandler(logfile)
runner.register_output_handler(log_handler)
runner.subscribe_event(OutputEvent, log_handler)
output_timeout_checker = OutputTimeoutChecker(1)
runner.register_stop_condition_checker(output_timeout_checker)
# unfortunately, we need to sleep for longer than 10 seconds here
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
)

from testflinger_common.enums import TestEvent
from testflinger_agent.runner import OutputEvent


class TestStopConditionCheckers:
Expand Down Expand Up @@ -68,5 +69,5 @@ def test_output_timeout_update(self):
checker = OutputTimeoutChecker(0.3)
for _ in range(5):
time.sleep(0.1)
checker.update()
checker.update(OutputEvent(""))
assert checker() == (None, "")