Skip to content

feature: add optional parameter depends_on attribute to start dependent processes consecutively #1449

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
56 changes: 56 additions & 0 deletions supervisor/graphutils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
from collections import defaultdict

class Graph():
    """Directed graph used to analyse process dependencies.

    Edges are stored in ``self.graph``, an adjacency mapping from a vertex
    to the list of vertices it points at.
    """

    def __init__(self, vertices):
        # Adjacency list; vertices with no outgoing edges need no entry.
        self.graph = defaultdict(list)
        # Vertex count, kept for API compatibility; the traversals below
        # operate directly on the adjacency list and do not read it.
        self.V = vertices

    def addEdge(self, u, v):
        """Add a directed edge from vertex ``u`` to vertex ``v``."""
        self.graph[u].append(v)

    def cyclic(self):
        """Return True if the directed graph contains a cycle.

        Uses a depth-first search that tracks the current recursion path:
        reaching a vertex that is already on the path closes a cycle.

        >>> g = Graph(3)
        >>> g.addEdge(1, 2); g.addEdge(2, 3); g.addEdge(3, 1)
        >>> g.cyclic()
        True
        """
        path = set()     # vertices on the current DFS stack
        visited = set()  # vertices whose subtree is fully explored

        def visit(vertex):
            if vertex in visited:
                return False
            visited.add(vertex)
            path.add(vertex)
            for neighbour in self.graph.get(vertex, ()):
                # A neighbour already on the current path closes a cycle.
                if neighbour in path or visit(neighbour):
                    return True
            path.remove(vertex)
            return False

        return any(visit(v) for v in self.graph)

    def connected(self, start_node, end_node):
        """Return True if ``end_node`` is reachable from ``start_node``.

        Reachability requires traversing at least one edge, so
        ``connected(x, x)`` is True only when ``x`` lies on a cycle.
        """
        visited = set()
        return self._explore_graph_from(start_node, end_node, visited)

    def _explore_graph_from(self, start_node, end_node, visited):
        """Depth-first search from ``start_node``, recording every vertex
        reached in ``visited``; return True once ``end_node`` is found.

        Each vertex is expanded at most once (guarded by ``visited``).
        This fixes the unbounded recursion the unguarded traversal hit on
        cyclic graphs (RecursionError) and the exponential re-exploration
        of shared sub-paths on dense acyclic graphs, and exits early as
        soon as the target is reached.
        """
        for neighbour in self.graph.get(start_node, ()):
            if neighbour not in visited:
                visited.add(neighbour)
                if neighbour == end_node:
                    return True  # early exit: target reached
                if self._explore_graph_from(neighbour, end_node, visited):
                    return True
        return end_node in visited
10 changes: 8 additions & 2 deletions supervisor/options.py
Original file line number Diff line number Diff line change
Expand Up @@ -942,6 +942,8 @@ def get(section, opt, *args, **kwargs):
serverurl = get(section, 'serverurl', None)
if serverurl and serverurl.strip().upper() == 'AUTO':
serverurl = None
depends_on = get(section, 'depends_on', None)
spawn_timeout = int(get(section, 'spawn_timeout', 60))

# find uid from "user" option
user = get(section, 'user', None)
Expand Down Expand Up @@ -1067,7 +1069,10 @@ def get(section, opt, *args, **kwargs):
exitcodes=exitcodes,
redirect_stderr=redirect_stderr,
environment=environment,
serverurl=serverurl)
serverurl=serverurl,
depends_on=depends_on,
spawn_timeout=spawn_timeout,
)

programs.append(pconfig)

Expand Down Expand Up @@ -1888,7 +1893,8 @@ class ProcessConfig(Config):
'stderr_events_enabled', 'stderr_syslog',
'stopsignal', 'stopwaitsecs', 'stopasgroup', 'killasgroup',
'exitcodes', 'redirect_stderr' ]
optional_param_names = [ 'environment', 'serverurl' ]
optional_param_names = [ 'environment', 'serverurl',
'depends_on', 'spawn_timeout' ]

def __init__(self, options, **params):
self.options = options
Expand Down
68 changes: 56 additions & 12 deletions supervisor/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,33 @@ def record_spawnerr(self, msg):
self.spawnerr = msg
self.config.options.logger.info("spawnerr: %s" % msg)

def spawn(self):
def queue_all_dependee_processes(self, supervisor):
    """Recursively queue this process and everything it depends on.

    Processes are placed in supervisor.process_spawn_dict, keyed by
    config name, so they can be spawned later; a process already present
    in process_spawn_dict or process_started_dict is not queued again,
    which also keeps the recursion from looping on circular dependencies.

    Parameters:
    supervisor : supervisord instance holding process_spawn_dict and
        process_started_dict
    """
    # Queue this process unless it is already queued or already started.
    if (self.config.name not in supervisor.process_spawn_dict.keys() and
        self.config.name not in supervisor.process_started_dict.keys()):
        supervisor.process_spawn_dict[self.config.name] = self
    if self.config.depends_on is not None:
        # NOTE(review): depends_on appears to map names to process
        # objects (values are used as processes) -- confirm against the
        # options parsing that builds it.
        for dependee in self.config.depends_on.values():
            # Dependencies already RUNNING or STARTING need no queuing.
            if dependee.state is not ProcessStates.RUNNING and dependee.state is not ProcessStates.STARTING:
                if (dependee.config.name not in supervisor.process_spawn_dict.keys() and
                    dependee.config.name not in supervisor.process_started_dict.keys()):
                    supervisor.process_spawn_dict[dependee.config.name] = dependee
                # Recurse so transitive dependencies are queued as well.
                dependee.queue_all_dependee_processes(supervisor)

def spawn(self, supervisor=None):
"""Start the subprocess. It must not be running already.

Return the process id. If the fork() call fails, return None.
Parameters:
supervisor : supervisord instance. This parameter is required
to keep track of all dependent processes in
supervisor.process_spawn_dict
"""
if self.config.depends_on is not None and not supervisor.abort_queing :
if any([dependee.state is not ProcessStates.RUNNING for dependee in
self.config.depends_on.values()]):
self.queue_all_dependee_processes(supervisor)
return

options = self.config.options
processname = as_string(self.config.name)

Expand Down Expand Up @@ -653,7 +675,13 @@ def __repr__(self):
def get_state(self):
return self.state

def transition(self):
def transition(self, supervisor=None):
"""
Parameters:
supervisor : supervisord instance. This parameter is required
to keep track of all dependent processes in
supervisor.process_spawn_dict
"""
now = time.time()
state = self.state

Expand All @@ -665,22 +693,22 @@ def transition(self):
# dont start any processes if supervisor is shutting down
if state == ProcessStates.EXITED:
if self.config.autorestart:
# STOPPED -> STARTING
if self.config.autorestart is RestartUnconditionally:
# EXITED -> STARTING
self.spawn()
self.spawn(supervisor)
else: # autorestart is RestartWhenExitUnexpected
if self.exitstatus not in self.config.exitcodes:
# EXITED -> STARTING
self.spawn()
self.spawn(supervisor)
elif state == ProcessStates.STOPPED and not self.laststart:
if self.config.autostart:
# STOPPED -> STARTING
self.spawn()
self.spawn(supervisor)
elif state == ProcessStates.BACKOFF:
if self.backoff <= self.config.startretries:
if now > self.delay:
# BACKOFF -> STARTING
self.spawn()
self.spawn(supervisor)

processname = as_string(self.config.name)
if state == ProcessStates.STARTING:
Expand Down Expand Up @@ -735,9 +763,13 @@ def before_spawn(self):
'%s:%s' % (self.group, dir(self.group)))
self.fcgi_sock = self.group.socket_manager.get_socket()

def spawn(self):
def spawn(self, supervisor=None):
"""
Overrides Subprocess.spawn() so we can hook in before it happens
Parameters:
supervisor : This parameter has no effect and is only for
not breaking the tests. This would be needed for the depends_on
feature which is not available for FastCGI subprocesses.
"""
self.before_spawn()
pid = Subprocess.spawn(self)
Expand Down Expand Up @@ -842,9 +874,15 @@ def before_remove(self):
pass

class ProcessGroup(ProcessGroupBase):
def transition(self):
def transition(self, supervisor=None):
    """Drive a state transition for every process in this group.

    Parameters:
    supervisor : supervisord instance, forwarded to each process so
        dependent processes can be tracked in
        supervisor.process_spawn_dict
    """
    for process in self.processes.values():
        process.transition(supervisor)

class FastCGIProcessGroup(ProcessGroup):

Expand Down Expand Up @@ -879,11 +917,17 @@ def handle_rejected(self, event):
# rebuffer the event
self._acceptEvent(event.event, head=True)

def transition(self):
def transition(self, supervisor=None):
"""
Parameters:
supervisor : This parameter has no effect and is only for
not breaking the tests. This would be needed for the depends_on
feature which is not available for EventListenerPool.
"""
processes = self.processes.values()
dispatch_capable = False
for process in processes:
process.transition()
process.transition(supervisor)
# this is redundant, we do it in _dispatchEvent too, but we
# want to reduce function call overhead
if process.state == ProcessStates.RUNNING:
Expand Down
8 changes: 7 additions & 1 deletion supervisor/rpcinterface.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,11 @@ def startProcess(self, name, wait=True):
raise RPCError(Faults.FAILED,
"%s is in an unknown process state" % name)

process.spawn()
process.spawn(self.supervisord)
# if process has dependees, return successful start -
# errors will be handled in main loop and inside process.spawn()
if process.config.depends_on is not None:
return True

# We call reap() in order to more quickly obtain the side effects of
# process.finish(), which reap() eventually ends up calling. This
Expand Down Expand Up @@ -596,6 +600,8 @@ def getAllConfigInfo(self):
'stderr_logfile_maxbytes': pconfig.stderr_logfile_maxbytes,
'stderr_syslog': pconfig.stderr_syslog,
'serverurl': pconfig.serverurl,
'depends_on': pconfig.depends_on,
'spawn_timeout': pconfig.spawn_timeout,
}
# no support for these types in xml-rpc
d.update((k, 'auto') for k, v in d.items() if v is Automatic)
Expand Down
Loading