Skip to content

Commit

Permalink
Merge pull request #147 from nokia/command_termination_on_timeout
Browse files Browse the repository at this point in the history
Command termination on timeout
  • Loading branch information
Ernold11 authored Apr 17, 2019
2 parents 8ed8408 + e85415e commit 3b92199
Show file tree
Hide file tree
Showing 13 changed files with 241 additions and 75 deletions.
19 changes: 12 additions & 7 deletions moler/cmd/commandtextualgeneric.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@

from moler.cmd import RegexHelper
from moler.command import Command
from moler.exceptions import CommandTimeout
from threading import Lock


@six.add_metaclass(abc.ABCMeta)
Expand All @@ -39,6 +39,9 @@ def __init__(self, connection, prompt=None, newline_chars=None, runner=None):
self.__command_string = None # String representing command on device
self._cmd_escaped = None # Escaped regular expression string with command
super(CommandTextualGeneric, self).__init__(connection=connection, runner=runner)
self.terminating_timeout = 3.0 # value for terminating command if it timeouts. Set positive value for command
# if they can do anything if timeout. Set 0 for command if it cannot do
# anything if timeout.
self.current_ret = dict() # Placeholder for result as-it-grows, before final write into self._result
self._cmd_output_started = False # If false parsing is not passed to command
self._regex_helper = RegexHelper() # Object to regular expression matching
Expand All @@ -57,6 +60,7 @@ def __init__(self, connection, prompt=None, newline_chars=None, runner=None):
self._concatenate_before_command_starts = True # Set True to concatenate all strings from connection before
# command starts, False to split lines on every new line char
self._stored_exception = None # Exception stored before it is passed to base class when command is done.
self._lock_is_done = Lock()

if not self._newline_chars:
self._newline_chars = CommandTextualGeneric._default_newline_chars
Expand Down Expand Up @@ -103,11 +107,12 @@ def _is_done(self):

@_is_done.setter
def _is_done(self, value):
if self._stored_exception:
exception = self._stored_exception
self._stored_exception = None
super(CommandTextualGeneric, self).set_exception(exception=exception)
super(CommandTextualGeneric, self.__class__)._is_done.fset(self, value)
with self._lock_is_done:
if self._stored_exception:
exception = self._stored_exception
self._stored_exception = None
super(CommandTextualGeneric, self)._set_exception_without_done(exception=exception)
super(CommandTextualGeneric, self.__class__)._is_done.fset(self, value)

@staticmethod
def _calculate_prompt(prompt):
Expand Down Expand Up @@ -240,7 +245,7 @@ def set_exception(self, exception):
:param exception: An exception object to set.
:return: None.
"""
if self.done() or not self.wait_for_prompt_on_exception or isinstance(exception, CommandTimeout):
if self.done() or not self.wait_for_prompt_on_exception:
super(CommandTextualGeneric, self).set_exception(exception=exception)
else:
if self._stored_exception is None:
Expand Down
23 changes: 23 additions & 0 deletions moler/command_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,19 @@ def dequeue_running_on_connection(connection_observer):
scheduler = CommandScheduler._get_scheduler()
scheduler._remove_command(cmd=connection_observer)

@staticmethod
def is_waiting_for_execution(connection_observer):
"""
Checks if connection_observer waits in queue before passed to runner.
:param connection_observer: ConnectionObserver object.
:return: True if connection_observer waits in queue and False if it does not wait.
"""
if connection_observer.is_command:
scheduler = CommandScheduler._get_scheduler()
return scheduler._does_it_wait_in_queue(cmd=connection_observer)
return False

# internal methods and variables

_conn_lock = threading.Lock()
Expand Down Expand Up @@ -85,6 +98,7 @@ def _add_command_to_connection(self, cmd, wait_for_slot=True):
timeout=cmd.timeout,
kind="scheduler.await_done",
passed_time=time.time() - start_time))
cmd.set_end_of_life()
self._remove_command(cmd=cmd)
return False

Expand Down Expand Up @@ -179,6 +193,15 @@ def _add_command_to_queue(self, cmd):
with lock:
conn_atr['queue'].append(cmd)

def _does_it_wait_in_queue(self, cmd):
connection = cmd.connection
lock = self._lock_for_connection(connection)
conn_atr = self._locks[connection]
with lock:
if cmd in conn_atr['queue']:
return True
return False

def _submit(self, connection_observer):
"""
Submits a connection_observer object (command or observer) in the runner.
Expand Down
52 changes: 43 additions & 9 deletions moler/connection_observer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,19 @@ def __init__(self, connection=None, runner=None):
self.runner = runner if runner else get_runner()
self._future = None
self.start_time = 0.0 # means epoch: 1970-01-01 00:00:00
self.__timeout = 7 # default
self.__timeout = 7.0 # default
self.terminating_timeout = 0.0 # value for terminating connection_observer when it timeouts. Set positive value
# for command if they can do anything if timeout. Set 0 for observer or command
# if it cannot do anything if timeout.
self.device_logger = logging.getLogger('moler.{}'.format(self.get_logger_name()))
self.logger = logging.getLogger('moler.connection.{}'.format(self.get_logger_name()))

self.in_terminating = False # Set True if ConnectionObserver object is just after __timeout but it can do
# something during terminating_timeout. False if the ConnectionObserver object runs
# during normal timeout. For Runners only!
self.was_on_timeout_called = False # Set True if method on_timeout was called. False otherwise. For Runners
# only!

def __str__(self):
return '{}(id:{})'.format(self.__class__.__name__, instance_id(self))

Expand All @@ -76,9 +85,11 @@ def __call__(self, timeout=None, *args, **kwargs):
or you may delegate blocking call execution to separate thread,
see: https://pymotw.com/3/asyncio/executors.html
"""
started_observer = self.start(timeout, *args, **kwargs)
if started_observer:
return started_observer.await_done(*args, **kwargs)
self.start(timeout, *args, **kwargs)
# started_observer = self.start(timeout, *args, **kwargs)
# if started_observer:
# return started_observer.await_done(*args, **kwargs)
return self.await_done()
# TODO: raise ConnectionObserverFailedToStart

@property
Expand Down Expand Up @@ -200,7 +211,7 @@ def await_done(self, timeout=None):
with exception_stored_if_not_main_thread(self):
if not self._is_running:
raise ConnectionObserverNotStarted(self)

# check if already is running
self.runner.wait_for(connection_observer=self, connection_observer_future=self._future, timeout=timeout)
return self.result()

Expand All @@ -209,10 +220,18 @@ def cancel(self):
# TODO: call cancel on runner to stop background run of connection-observer
if self.cancelled() or self.done():
return False
self._is_done = True
self._is_cancelled = True
self._is_done = True
return True

def set_end_of_life(self):
"""
Set end of life of object. Dedicated for runners only!
:return: None
"""
self._is_done = True

def cancelled(self):
"""Return True if the connection-observer has been cancelled."""
return self._is_cancelled
Expand All @@ -231,8 +250,8 @@ def set_result(self, result):
"""Should be used to set final result"""
if self.done():
raise ResultAlreadySet(self)
self._is_done = True
self._result = result
self._is_done = True

@abstractmethod
def data_received(self, data):
Expand All @@ -243,13 +262,28 @@ def data_received(self, data):
pass

def set_exception(self, exception):
"""Should be used to indicate some failure during observation"""
"""
Should be used to indicate some failure during observation.
:param exception: Exception to set
:return: None
"""
self._set_exception_without_done(exception)
self._is_done = True

def _set_exception_without_done(self, exception):
"""
Should be used to indicate some failure during observation. This method does not finish connection observer
object!
:param exception: exception to set
:return: None
"""
if self._is_done:
self._log(logging.WARNING,
"Trial to set exception {!r} on already done {}".format(exception, self),
levels_to_go_up=2)
return
self._is_done = True
ConnectionObserver._change_unraised_exception(new_exception=exception, observer=self)
self._log(logging.INFO,
"{}.{} has set exception {!r}".format(self.__class__.__module__, self, exception),
Expand Down
Loading

0 comments on commit 3b92199

Please sign in to comment.