diff --git a/supervisor/graphutils.py b/supervisor/graphutils.py
new file mode 100644
index 000000000..af70add6b
--- /dev/null
+++ b/supervisor/graphutils.py
@@ -0,0 +1,56 @@
+from collections import defaultdict
+
+class Graph:
+    """ Class to store and analyse a directed graph.
+    """
+    def __init__(self, vertices):
+        self.graph = defaultdict(list)
+        self.V = vertices  # number of vertices
+
+    def addEdge(self, u, v):
+        self.graph[u].append(v)
+
+    def cyclic(self):
+        """ Return True if the directed graph has a cycle.
+        The graph is stored in self.graph as a dictionary mapping each
+        vertex to an iterable of neighbouring vertices. For example:
+
+        >>> g = Graph(3)
+        >>> g.addEdge(1, 2); g.addEdge(2, 3); g.addEdge(3, 1)
+        >>> g.cyclic()
+        True
+
+        """
+        path = set()
+        visited = set()
+
+        def visit(vertex):
+            if vertex in visited:
+                return False
+            visited.add(vertex)
+            path.add(vertex)
+            for neighbour in self.graph.get(vertex, ()):
+                if neighbour in path or visit(neighbour):
+                    return True
+            path.remove(vertex)
+            return False
+
+        return any(visit(v) for v in self.graph)
+
+    def connected(self, start_node, end_node):
+        """ Return True if end_node is reachable from start_node, i.e.
+        start_node comes before end_node in the dependency order.
+        """
+        visited = set()
+        return self._explore_graph_from(start_node, end_node, visited)
+
+    def _explore_graph_from(self, start_node, end_node, visited):
+        """ Depth-first search recording every node reachable from
+        start_node. The visited guard keeps each node from being expanded
+        twice, which would otherwise recurse forever on a cyclic graph.
+        """
+        for neighbour in self.graph.get(start_node, ()):
+            if neighbour not in visited:
+                visited.add(neighbour)
+                self._explore_graph_from(neighbour, end_node, visited)
+        return end_node in visited
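The behaviour of the new helper is easiest to see in isolation. A minimal usage sketch, assuming the patched tree is on the import path; the process names are invented:

```python
from supervisor.graphutils import Graph

g = Graph(3)
g.addEdge('web', 'api')   # web depends on api
g.addEdge('api', 'db')    # api depends on db

print(g.cyclic())                # False: the chain is a DAG
print(g.connected('web', 'db'))  # True: db is reachable from web
print(g.connected('db', 'web'))  # False: edges are directed

# A back-edge creates a cycle; supervisord refuses to start in that case.
g.addEdge('db', 'web')
print(g.cyclic())                # True
```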
diff --git a/supervisor/options.py b/supervisor/options.py
index 20c1b07aa..0dacbfe96 100644
--- a/supervisor/options.py
+++ b/supervisor/options.py
@@ -942,6 +942,8 @@ def get(section, opt, *args, **kwargs):
         serverurl = get(section, 'serverurl', None)
         if serverurl and serverurl.strip().upper() == 'AUTO':
             serverurl = None
+        depends_on = get(section, 'depends_on', None)
+        spawn_timeout = int(get(section, 'spawn_timeout', 60))
 
         # find uid from "user" option
         user = get(section, 'user', None)
@@ -1067,7 +1069,10 @@ def get(section, opt, *args, **kwargs):
                 exitcodes=exitcodes,
                 redirect_stderr=redirect_stderr,
                 environment=environment,
-                serverurl=serverurl)
+                serverurl=serverurl,
+                depends_on=depends_on,
+                spawn_timeout=spawn_timeout,
+                )
 
             programs.append(pconfig)
 
@@ -1888,7 +1893,8 @@ class ProcessConfig(Config):
         'stderr_events_enabled', 'stderr_syslog',
         'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
         'exitcodes', 'redirect_stderr' ]
-    optional_param_names = [ 'environment', 'serverurl' ]
+    optional_param_names = [ 'environment', 'serverurl',
+                             'depends_on', 'spawn_timeout' ]
 
     def __init__(self, options, **params):
         self.options = options
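For orientation, a sketch of how the two new options would be written in supervisord.conf. The syntax is inferred from the parsing above and from the group:process splitting in supervisord.run(); the program names and values are invented:

```ini
[program:db]
command=/usr/bin/db-server

[program:api]
command=/usr/bin/api-server
; whitespace-separated list; entries are either "process" or "group:process"
depends_on=db

[program:web]
command=/usr/bin/web-server
depends_on=api
; seconds a spawned process may take to reach RUNNING (default 60)
spawn_timeout=30
```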
+ """ processes = self.processes.values() dispatch_capable = False for process in processes: - process.transition() + process.transition(supervisor) # this is redundant, we do it in _dispatchEvent too, but we # want to reduce function call overhead if process.state == ProcessStates.RUNNING: diff --git a/supervisor/rpcinterface.py b/supervisor/rpcinterface.py index 7b32ae8c9..07f2f4917 100644 --- a/supervisor/rpcinterface.py +++ b/supervisor/rpcinterface.py @@ -304,7 +304,11 @@ def startProcess(self, name, wait=True): raise RPCError(Faults.FAILED, "%s is in an unknown process state" % name) - process.spawn() + process.spawn(self.supervisord) + # if process has dependees, return succesfull start - + # errors will be handled in main loop and inside process.spawn() + if process.config.depends_on is not None: + return True # We call reap() in order to more quickly obtain the side effects of # process.finish(), which reap() eventually ends up calling. This @@ -596,6 +600,8 @@ def getAllConfigInfo(self): 'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes, 'stderr_syslog': pconfig.stderr_syslog, 'serverurl': pconfig.serverurl, + 'depends_on': pconfig.depends_on, + 'spawn_timeout': pconfig.spawn_timeout, } # no support for these types in xml-rpc d.update((k, 'auto') for k, v in d.items() if v is Automatic) diff --git a/supervisor/supervisord.py b/supervisor/supervisord.py index 138732dd4..c4e96823c 100755 --- a/supervisor/supervisord.py +++ b/supervisor/supervisord.py @@ -44,6 +44,9 @@ from supervisor import events from supervisor.states import SupervisorStates from supervisor.states import getProcessStateDescription +from supervisor.graphutils import Graph + +from supervisor.states import ProcessStates class Supervisor: stopping = False # set after we detect that we are handling a stop request @@ -55,6 +58,9 @@ def __init__(self, options): self.options = options self.process_groups = {} self.ticks = {} + self.process_spawn_dict = dict() + self.process_started_dict = dict() + self.abort_queing = None def main(self): if not self.options.first: @@ -84,6 +90,29 @@ def run(self): try: for config in self.options.process_group_configs: self.add_process_group(config) + # add processes to directed graph, to check for dependency cycles + self.g = Graph(len(self.options.process_group_configs)) + # replace depends_on string with actual process object + for config in (self.options.process_group_configs): + # check dependencies for all programs in group: + for conf in enumerate(config.process_configs): + if config.process_configs[conf[0]].depends_on is not None: + process_dict=dict({}) + # split to get all processes in case there are multiple dependencies + dependent_processes = (config.process_configs[conf[0]].depends_on).split() + for process in dependent_processes: + # this can be of form group:process or simply process + try: + dependent_group, dependent_process=process.split(":") + except: + dependent_group=dependent_process=process + self.g.addEdge(config.process_configs[conf[0]].name, dependent_process) + process_dict[dependent_process] = self.process_groups[dependent_group].processes[dependent_process] + config.process_configs[conf[0]].depends_on = process_dict + # check for cyclical process dependencies + if self.g.cyclic() == 1: + raise AttributeError('Process config contains dependeny cycle(s)! 
diff --git a/supervisor/rpcinterface.py b/supervisor/rpcinterface.py
index 7b32ae8c9..07f2f4917 100644
--- a/supervisor/rpcinterface.py
+++ b/supervisor/rpcinterface.py
@@ -304,7 +304,11 @@ def startProcess(self, name, wait=True):
             raise RPCError(Faults.FAILED,
                            "%s is in an unknown process state" % name)
 
-        process.spawn()
+        process.spawn(self.supervisord)
+        # if the process has dependees, report a successful start right away;
+        # errors are handled in the main loop and inside process.spawn()
+        if process.config.depends_on is not None:
+            return True
 
         # We call reap() in order to more quickly obtain the side effects of
         # process.finish(), which reap() eventually ends up calling.  This
@@ -596,6 +600,8 @@ def getAllConfigInfo(self):
                 'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
                 'stderr_syslog': pconfig.stderr_syslog,
                 'serverurl': pconfig.serverurl,
+                'depends_on': pconfig.depends_on,
+                'spawn_timeout': pconfig.spawn_timeout,
             }
         # no support for these types in xml-rpc
         d.update((k, 'auto') for k, v in d.items() if v is Automatic)
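Because of the early return above, an XML-RPC client now gets a successful result for a dependent process before its chain is fully up. A quick way to observe this with the stock client (the server URL and program name are assumptions about the local setup):

```python
from xmlrpc.client import ServerProxy

# assumes an inet_http_server on this port in supervisord.conf
server = ServerProxy('http://localhost:9001/RPC2')

# For a program with depends_on set this returns True as soon as the
# chain is queued, not when it is RUNNING; poll getProcessInfo (or watch
# supervisord's log) to see the dependees come up one by one.
print(server.supervisor.startProcess('web'))
print(server.supervisor.getProcessInfo('web')['statename'])
```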
" + "Please fix all processe(s) in FATAL state or restart these manually, " + "otherwise process {} can not be started" + .format(process_name, + process_name)) + process_object.config.options.logger.warn(msg) + self._remove_processes_from_queue(not_startable_processes) + + def _handle_spawn_timeout(self): + """ + Log info message each 5 seconds if some process is waiting on a dependee + Timeout if a process needs longer than spawn_timeout (default=60 seconds) + to reach RUNNING + """ + # check if any of the processes which was started did not make it and + # remove RUNNING processes from the process_started_dict. + if self.abort_queing is not None: + self.abort_queing.change_state(ProcessStates.FATAL) + self.abort_queing = None + if self.process_started_dict: + for process_name, process_object in list(self.process_started_dict.items()): + if process_object.state is ProcessStates.RUNNING: + del self.process_started_dict[process_name] + # handle timeout error. + elif (time.time() - process_object.laststart) >= process_object.config.spawn_timeout: + self._timeout_process(process_name, process_object) + del self.process_started_dict[process_name] + break + # notify user about waiting + elif (time.time() - process_object.laststart) >= process_object.notify_timer: + self._notfiy_user_about_waiting(process_name, process_object) + + def _timeout_process(self, process_name, process_object): + msg = ("timeout: dependee process {} in {} did not reach RUNNING within" + " {} seconds." + .format(process_name, + getProcessStateDescription(process_object.state), + process_object.config.spawn_timeout)) + process_object.config.options.logger.warn(msg) + process_object.stop() + # keep track of the process that timed out - will be set to FATAL in + # the next iteration + self.abort_queing = process_object + keys_to_remove = [] + for process_name, process_object in self.process_spawn_dict.items(): + if self.g.connected(process_name, self.abort_queing.config.name): + keys_to_remove.append(process_name) + msg = ("{} will not be spawned, because {} did not " + "successfully start".format(process_name, self.abort_queing.config.name)) + process_object.config.options.logger.warn(msg) + for key in keys_to_remove: + del self.process_spawn_dict[key] + keys_to_remove = [] + for process_name, process_object in self.process_started_dict.items(): + if self.g.connected(process_name, self.abort_queing.config.name): + keys_to_remove.append(process_name) + msg = ("stopping {}, because {} did not successfully " + "start".format(process_name, self.abort_queing.config.name)) + process_object.config.options.logger.warn(msg) + process_object.stop() + for key in keys_to_remove: + del self.process_started_dict[key] + + def _notfiy_user_about_waiting(self, process_name, process_object): + process_object.notify_timer += 5 + msg = ("waiting for dependee process {} in {} state to be RUNNING" + .format(process_name, + getProcessStateDescription(process_object.state))) + process_object.config.options.logger.info(msg) + def timeslice(period, when): return int(when - (when % period)) diff --git a/supervisor/tests/base.py b/supervisor/tests/base.py index 7553a2cce..0e309a8de 100644 --- a/supervisor/tests/base.py +++ b/supervisor/tests/base.py @@ -450,7 +450,7 @@ def signal(self, signal): self.sent_signal = signal - def spawn(self): + def spawn(self, supervisor=None): self.spawned = True from supervisor.process import ProcessStates self.state = ProcessStates.RUNNING @@ -494,7 +494,7 @@ def write(self, chars): raise self.write_exception 
diff --git a/supervisor/tests/base.py b/supervisor/tests/base.py
index 7553a2cce..0e309a8de 100644
--- a/supervisor/tests/base.py
+++ b/supervisor/tests/base.py
@@ -450,7 +450,7 @@ def signal(self, signal):
         self.sent_signal = signal
 
-    def spawn(self):
+    def spawn(self, supervisor=None):
        self.spawned = True
        from supervisor.process import ProcessStates
        self.state = ProcessStates.RUNNING
@@ -494,7 +494,7 @@ def write(self, chars):
             raise self.write_exception
         self.stdin_buffer += chars
 
-    def transition(self):
+    def transition(self, supervisor=None):
         self.transitioned = True
 
     def __eq__(self, other):
@@ -517,7 +517,8 @@ def __init__(self, options, name, command, directory=None, umask=None,
                  stderr_syslog=False, redirect_stderr=False,
                  stopsignal=None, stopwaitsecs=10, stopasgroup=False, killasgroup=False,
-                 exitcodes=(0,), environment=None, serverurl=None):
+                 exitcodes=(0,), environment=None, serverurl=None, depends_on=None,
+                 spawn_timeout=60):
         self.options = options
         self.name = name
         self.command = command
@@ -553,6 +554,8 @@ def __init__(self, options, name, command, directory=None, umask=None,
         self.umask = umask
         self.autochildlogs_created = False
         self.serverurl = serverurl
+        self.depends_on = depends_on
+        self.spawn_timeout = spawn_timeout
 
     def get_path(self):
         return ["/bin", "/usr/bin", "/usr/local/bin"]
@@ -1043,7 +1046,7 @@ def __init__(self, config):
         self.unstopped_processes = []
         self.before_remove_called = False
 
-    def transition(self):
+    def transition(self, supervisor=None):
         self.transitioned = True
 
     def before_remove(self):
diff --git a/supervisor/tests/test_rpcinterfaces.py b/supervisor/tests/test_rpcinterfaces.py
index 0827adf05..56651fcd2 100644
--- a/supervisor/tests/test_rpcinterfaces.py
+++ b/supervisor/tests/test_rpcinterfaces.py
@@ -432,10 +432,10 @@ def test_startProcess_spawnerr_in_onwait(self):
         supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
         supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
         process = supervisord.process_groups['foo'].processes['foo']
-        def spawn():
+        def spawn(supervisor=None):
             process.spawned = True
             process.state = ProcessStates.STARTING
-        def transition():
+        def transition(supervisor=None):
             process.spawnerr = 'abc'
         process.spawn = spawn
         process.transition = transition
@@ -450,7 +450,7 @@ def test_startProcess_success_in_onwait(self):
         supervisord = PopulatedDummySupervisor(options, 'foo', pconfig)
         supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
         process = supervisord.process_groups['foo'].processes['foo']
-        def spawn():
+        def spawn(supervisor=None):
             process.spawned = True
             process.state = ProcessStates.STARTING
         process.spawn = spawn
@@ -481,7 +481,7 @@ def test_startProcess_abnormal_term_process_not_running(self):
         supervisord.set_procattr('foo', 'state', ProcessStates.STOPPED)
         interface = self._makeOne(supervisord)
         process = supervisord.process_groups['foo'].processes['foo']
-        def spawn():
+        def spawn(supervisor=None):
             process.spawned = True
             process.state = ProcessStates.STARTING
         process.spawn = spawn