Skip to content

Commit

Permalink
fix: ensure the worker exits when a thread dies with an unhandled exc…
Browse files Browse the repository at this point in the history
…eption (#182)

* fix: ensure the worker exits when a thread dies with an unhandled exception

* fix: pin sqlite version

* fix: require missing libs

* chore: add test to cover zombie threads

* chore: improve tests to cover thread failures

* chore: bump version
  • Loading branch information
claudioscalzo authored Oct 1, 2024
1 parent 8e375d0 commit 4d8ae3f
Show file tree
Hide file tree
Showing 6 changed files with 77 additions and 22 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
ruby-version: ${{ matrix.ruby }}
- name: install gems
run: |
gem install sqlite3
gem install sqlite3 -v '~> 1.0'
gem install bundler
bundle install --gemfile ${{ matrix.gemfile }}
- name: Tests
Expand Down
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
### 10.0.0
- Ensure all worker threads exit if a thread dies because of an unhandled exception, to avoid "zombie" workers (workers left running without any consumer thread)

### 9.0.0
- Add support for Ruby 3.2 and Rails 7
- Remove support for Ruby 2.6, 2.7 and Rails 5 and 6.0
Expand Down
2 changes: 1 addition & 1 deletion lib/sqewer/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module Sqewer
VERSION = '9.0.0'
VERSION = '10.0.0'
end
9 changes: 8 additions & 1 deletion lib/sqewer/worker.rb
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,14 @@ def start
@logger.info { '[worker] Starting with %d consumer threads' % @num_threads }
@execution_queue = Queue.new

consumers = (1..@num_threads).map do
# Ensure that unhandled exceptions inside threads make the worker fail,
# to avoid silent failures with no consumer threads running.
Thread.abort_on_exception = true

consumers = (1..@num_threads).each_with_index.map do |_, index|
Thread.new do
Thread.current[:role] = :consumer
Thread.current[:id] = index
loop { take_and_execute }
end
end
Expand All @@ -99,6 +105,7 @@ def start
# grab new messages and place them on the local queue.
owning_worker = self # self won't be self anymore in the thread
provider = Thread.new do
Thread.current[:role] = :provider
loop do
begin
break if stopping?
Expand Down
1 change: 1 addition & 0 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
$LOAD_PATH.unshift File.expand_path("../../lib", __FILE__)

require 'base64'
require 'rspec'
require 'rspec/wait'
require 'dotenv'
Expand Down
82 changes: 63 additions & 19 deletions spec/sqewer/worker_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -59,25 +59,6 @@
worker.stop
end
end

context 'when the connection to SQS fails in receive_messages' do
it 'it stops itself gracefully' do
fake_conn = double('bad connection')
allow(fake_conn).to receive(:receive_messages).and_raise("boom")

log_device = StringIO.new('')
worker = described_class.new(logger: Logger.new(log_device), connection: fake_conn)

expect(worker).to receive(:stop).at_least(:once)
expect(worker).not_to receive(:kill)

worker.start
wait_for(worker.state.in_state?(:stopped))
wait_for{
log_device.string
}.to include("boom")
end
end

context 'when the job payload cannot be unserialized from JSON due to invalid syntax' do
it 'is able to cope with an exception when the job class is unknown (one of generic exceptions)' do
Expand Down Expand Up @@ -141,4 +122,67 @@ def run(executor)
end
end
end

context 'when a worker thread raises a non-StandardError exception' do
  # Declare the exception class via stub_const so it only exists while an
  # example of this context runs. A plain `class ... end` inside a `context`
  # block defines a top-level constant that leaks into every other spec.
  # It deliberately inherits from Exception (not StandardError) so that a bare
  # `rescue` does not catch it.
  before do
    stub_const('CustomFatalException', Class.new(Exception))
  end

  it 'kills all threads and stops the worker' do
    log_device = StringIO.new('')
    worker = described_class.new(logger: Logger.new(log_device), num_threads: 4)

    # Replace the consumer loop body: the consumer tagged with id 2 fails
    # shortly after starting, every other thread just sleeps.
    allow(worker).to receive(:take_and_execute) do
      if Thread.current[:id] == 2 && Thread.current[:role] == :consumer
        # just for one consumer thread, raise an exception soon after starting
        sleep 1
        raise CustomFatalException, "Custom Fatal Exception"
      else
        # for all the other consumer threads, sleep for a long time to simulate a working thread
        sleep 30
      end
    end

    # NOTE(review): with `Thread.abort_on_exception = true` the exception is
    # presumably re-raised in the main thread while it is still inside
    # `sleep 5`; in that case control jumps straight to the `rescue` below and
    # neither the expectation nor `worker.stop` runs. Confirm that this example
    # actually asserts what its description says.
    begin
      worker.start

      sleep 5
      consumer_threads = worker.threads.select { |t| t[:role] == :consumer }

      # all the consumer threads should have died by now, as
      # the `Thread.abort_on_exception` flag is set to `true`
      expect(consumer_threads).to all(satisfy { |t| !t.alive? })

      worker.stop
    rescue CustomFatalException
      # expected from a consumer thread, don't fail the test
    end
  end
end

context 'when a worker thread raises a StandardError exception' do
  it 'the processing continues' do
    log_device = StringIO.new('')
    worker = described_class.new(logger: Logger.new(log_device), num_threads: 4)

    # Replace message handling: the consumer tagged with id 2 raises a plain
    # StandardError, every other thread just sleeps.
    allow(worker).to receive(:handle_message) do
      if Thread.current[:id] == 2 && Thread.current[:role] == :consumer
        # just for one consumer thread, raise an exception soon after starting
        sleep 1
        raise StandardError
      else
        # for all the other consumer threads, sleep for a long time to simulate a working thread
        sleep 30
      end
    end

    worker.start

    begin
      sleep 5
      consumer_threads = worker.threads.select { |t| t[:role] == :consumer }

      # all the consumer threads should still be alive
      expect(consumer_threads).to all(satisfy { |t| t.alive? })
    ensure
      # Stop the worker even when the expectation above fails, so its threads
      # do not keep running while later examples execute.
      worker.stop
    end
  end
end
end

0 comments on commit 4d8ae3f

Please sign in to comment.