Skip to content

Commit

Permalink
Tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
bestie committed May 1, 2024
1 parent 9594e8a commit c8cff9e
Showing 1 changed file with 24 additions and 23 deletions.
47 changes: 24 additions & 23 deletions lib/racecar/forking_runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,49 +2,49 @@

module Racecar
class ForkingRunner
def initialize(runner:, config:, logger:, liveness_monitor: LivenessMonitor.new)
def initialize(runner:, config:, logger:, parent_monitor: ParentProcessMonitor.new)
@runner = runner
@config = config
@logger = logger
@pids = []
@liveness_monitor = liveness_monitor
@parent_monitor = parent_monitor
@running = false
end

attr_reader :config, :runner, :logger, :pids, :liveness_monitor
private :config, :runner, :logger, :pids, :liveness_monitor
attr_reader :config, :runner, :logger, :pids, :parent_monitor
private :config, :runner, :logger, :pids, :parent_monitor

def run
config.prefork.call
install_signal_handlers

@running = true

@pids = config.forks.times.map do |n|
pid = fork do
liveness_monitor.child_post_fork
parent_monitor.child_post_fork
config.postfork.call

liveness_monitor.on_exit do
logger.warn "Supervisor dead, exiting."
parent_monitor.on_parent_exit do
logger.warn("Supervisor dead, exiting.")
runner.stop
end

runner.run
end
logger.info "Racecar forked consumer process #{pid}"
logger.debug("Racecar forked consumer process #{pid}.")

pid
end

liveness_monitor.parent_post_fork

install_signal_handlers
parent_monitor.parent_post_fork

wait_for_child_processes
end

def stop
@running = false
$stdout.puts "Racecar::ForkingRunner runner stopping #{Process.pid}"
logger.debug("Racecar::ForkingRunner runner stopping #{Process.pid}.")
terminate_workers
end

Expand All @@ -59,14 +59,15 @@ def terminate_workers
begin
Process.kill("TERM", pid)
rescue Errno::ESRCH
logger.debug("Racecar::ForkingRunner Process not found #{Process.pid}.")
end
end
end

def check_workers
pids.each do |pid|
unless worker_running?(pid)
$stdout.puts("A forker worker has exited. Shuttin' it down ...")
logger.debug("A forked worker has exited unepxectedly. Shutting everything down.")
stop
return
end
Expand All @@ -91,8 +92,7 @@ def wait_for_child_processes

def install_signal_handlers
Signal.trap("CHLD") do |sid|
# Received when child process terminates
# $stderr.puts "👼👼👼👼👼👼👼 SIGCHLD"
logger.warn("Received SIGCHLD")
check_workers if running?
end
Signal.trap("TERM") do |sid|
Expand All @@ -103,7 +103,7 @@ def install_signal_handlers
end
end

class LivenessMonitor
class ParentProcessMonitor
def initialize(pipe_ends = IO.pipe)
@read_end, @write_end = pipe_ends
@monitor_thread = nil
Expand All @@ -112,20 +112,21 @@ def initialize(pipe_ends = IO.pipe)
attr_reader :read_end, :write_end, :monitor_thread
private :read_end, :write_end, :monitor_thread

def on_parent_exit(&block)
child_post_fork
monitor_thread = Thread.new do
IO.select([read_end])
block.call
end
end

def parent_post_fork
read_end.close
end

def child_post_fork
write_end.close
end

def on_exit(&block)
monitor_thread = Thread.new do
IO.select([read_end])
block.call
end
end
end
end
end

0 comments on commit c8cff9e

Please sign in to comment.