diff --git a/development.txt b/development.txt index 6d4f866..40bb1a4 100644 --- a/development.txt +++ b/development.txt @@ -1,6 +1,6 @@ coverage==4.4.1 mock==1.0.1 -moto==1.3.4 +moto==1.3.7 nose==1.3.0 pre-commit==0.7.6 sure==1.2.2 diff --git a/pyqs/worker.py b/pyqs/worker.py index 8d90d34..f27246e 100644 --- a/pyqs/worker.py +++ b/pyqs/worker.py @@ -43,6 +43,7 @@ def shutdown(self): "Received shutdown signal, shutting down PID {}!".format( os.getpid())) self.should_exit.set() + self.join() def parent_is_alive(self): if os.getppid() == 1: diff --git a/tests/test_manager_worker.py b/tests/test_manager_worker.py index da1133a..681ba1f 100644 --- a/tests/test_manager_worker.py +++ b/tests/test_manager_worker.py @@ -7,6 +7,7 @@ import boto3 from mock import patch, Mock, MagicMock from moto import mock_sqs, mock_sqs_deprecated +from nose.tools import timed from pyqs.main import main, _main from pyqs.worker import ManagerWorker @@ -153,39 +154,6 @@ def test_master_spawns_worker_processes(): manager.stop() -@mock_sqs -@mock_sqs_deprecated -def test_master_replaces_reader_processes(): - """ - Test managing process replaces reader children - """ - - # Setup SQS Queue - conn = boto3.client('sqs', region_name='us-east-1') - conn.create_queue(QueueName="tester") - - # Setup Manager - manager = ManagerWorker( - queue_prefixes=["tester"], worker_concurrency=1, interval=1, - batchsize=10, - ) - manager.start() - - # Get Reader PID - pid = manager.reader_children[0].pid - - # Kill Reader and wait to replace - manager.reader_children[0].shutdown() - time.sleep(0.1) - manager.replace_workers() - - # Check Replacement - manager.reader_children[0].pid.shouldnt.equal(pid) - - # Cleanup - manager.stop() - - @mock_sqs @mock_sqs_deprecated def test_master_counts_processes(): @@ -223,34 +191,63 @@ def test_master_counts_processes(): @mock_sqs @mock_sqs_deprecated -def test_master_replaces_worker_processes(): - """ - Test managing process replaces worker processes - """ - # Setup SQS Queue - conn = boto3.client('sqs', region_name='us-east-1') - conn.create_queue(QueueName="tester") - - # Setup Manager - manager = ManagerWorker( - queue_prefixes=["tester"], worker_concurrency=1, interval=1, - batchsize=10, - ) - manager.start() - - # Get Worker PID - pid = manager.worker_children[0].pid - - # Kill Worker and wait to replace - manager.worker_children[0].shutdown() - time.sleep(0.1) - manager.replace_workers() - - # Check Replacement - manager.worker_children[0].pid.shouldnt.equal(pid) - - # Cleanup - manager.stop() +class TestMasterWorker: + + def setup(self): + # For debugging test + import sys + logger = logging.getLogger("pyqs") + logger.setLevel(logging.DEBUG) + stdout_handler = logging.StreamHandler(sys.stdout) + logger.addHandler(stdout_handler) + + # Setup SQS Queue + self.conn = boto3.client('sqs', region_name='us-east-1') + self.conn.create_queue(QueueName="tester") + + # Setup Manager + self.manager = ManagerWorker( + queue_prefixes=["tester"], worker_concurrency=1, interval=1, + batchsize=10, + ) + self.manager.start() + + def teardown(self): + self.manager.stop() + + @timed(60) + def test_master_replaces_reader_processes(self): + """ + Test managing process replaces reader children + """ + # Get Reader PID + pid = self.manager.reader_children[0].pid + + # Kill Reader and wait to replace + self.manager.reader_children[0].shutdown() + time.sleep(0.1) + self.manager.reader_children[0].is_alive().should.equal(False) + self.manager.replace_workers() + + # Check Replacement + self.manager.reader_children[0].pid.shouldnt.equal(pid) + + @timed(60) + def test_master_replaces_worker_processes(self): + """ + Test managing process replaces worker processes + """ + # Get Worker PID + pid = self.manager.worker_children[0].pid + + # Kill Worker and wait to replace + self.manager.worker_children[0].shutdown() + time.sleep(0.1) + self.manager.worker_children[0].is_alive().should.equal(False) + self.manager.replace_workers() + + # Check Replacement + self.manager.worker_children[0].pid.shouldnt.equal(pid) @mock_sqs