diff --git a/jobs/involuntary-dissolutions/config.py b/jobs/involuntary-dissolutions/config.py index c1c0406403..8697b84dec 100644 --- a/jobs/involuntary-dissolutions/config.py +++ b/jobs/involuntary-dissolutions/config.py @@ -18,6 +18,7 @@ from dotenv import find_dotenv, load_dotenv + load_dotenv(find_dotenv()) CONFIGURATION = { diff --git a/jobs/involuntary-dissolutions/involuntary_dissolutions.py b/jobs/involuntary-dissolutions/involuntary_dissolutions.py index ca033ef9f0..30d5a820d7 100644 --- a/jobs/involuntary-dissolutions/involuntary_dissolutions.py +++ b/jobs/involuntary-dissolutions/involuntary_dissolutions.py @@ -21,11 +21,16 @@ import sentry_sdk # noqa: I001, E501; pylint: disable=ungrouped-imports; conflicts with Flake8 from croniter import croniter from flask import Flask -from legal_api.models import Batch, BatchProcessing, Configuration, db # noqa: I001 +from legal_api.core.filing import Filing as CoreFiling +from legal_api.models import Batch, BatchProcessing, Business, Configuration, Filing, db # noqa: I001 +from legal_api.services.filings.validations.dissolution import DissolutionTypes from legal_api.services.flags import Flags from legal_api.services.involuntary_dissolution import InvoluntaryDissolutionService +from legal_api.services.queue import QueueService +from sentry_sdk import capture_message from sentry_sdk.integrations.logging import LoggingIntegration from sqlalchemy import Date, cast, func +from sqlalchemy.orm import aliased import config # pylint: disable=import-error from utils.logging import setup_logging # pylint: disable=import-error @@ -73,7 +78,77 @@ def shell_context(): app.shell_context_processor(shell_context) -def initiate_dissolution_process(app: Flask): # pylint: disable=redefined-outer-name,too-many-locals +def create_invountary_dissolution_filing(business_id: int): + """Create a filing entry to represent an involuntary dissolution filing.""" + business = Business.find_by_internal_id(business_id) + + filing = Filing() + 
filing.business_id = business.id + filing._filing_type = CoreFiling.FilingTypes.DISSOLUTION # pylint: disable=protected-access + filing._filing_sub_type = DissolutionTypes.INVOLUNTARY # pylint: disable=protected-access + filing.filing_json = { + 'filing': { + 'header': { + 'date': datetime.utcnow().date().isoformat(), + 'name': 'dissolution', + 'certifiedBy': '' + }, + 'business': { + 'legalName': business.legal_name, + 'legalType': business.legal_type, + 'identifier': business.identifier, + 'foundingDate': business.founding_date.isoformat() + }, + 'dissolution': { + 'dissolutionDate': datetime.utcnow().date().isoformat(), + 'dissolutionType': 'involuntary' + } + } + } + + filing.save() + + return filing + + +async def put_filing_on_queue(filing_id: int, app: Flask, qsm: QueueService): + """Send queue message to filer to dissolve business.""" + try: + subject = app.config['NATS_FILER_SUBJECT'] + payload = {'filing': {'id': filing_id}} + await qsm.publish_json_to_subject(payload, subject) + except Exception as err: # pylint: disable=broad-except # noqa: F841; + # mark any failure for human review + capture_message( + f'Queue Error: Failed to place filing {filing_id} on Queue with error:{err}', + level='error' + ) + + +def mark_eligible_batches_completed(): + """Mark batches completed if all of their associated batch_processings are completed.""" + AliasBatchProcessing = aliased(BatchProcessing) # pylint: disable=invalid-name # noqa N806 + batches = ( + db.session.query(Batch) + .join(BatchProcessing, Batch.id == BatchProcessing.batch_id) + .filter(Batch.batch_type == 'INVOLUNTARY_DISSOLUTION') + .filter(Batch.status != 'COMPLETED') + .filter( + ~db.session.query(AliasBatchProcessing) + .filter(Batch.id == AliasBatchProcessing.batch_id) + .filter(AliasBatchProcessing.status.notin_(['COMPLETED', 'WITHDRAWN'])) + .exists() + ) + .all() + ) + + for batch in batches: + batch.status = Batch.BatchStatus.COMPLETED + batch.end_date = datetime.utcnow() + batch.save() + + +def 
stage_1_process(app: Flask): # pylint: disable=redefined-outer-name,too-many-locals """Initiate dissolution process for new businesses that meet dissolution criteria.""" try: # check if batch has already run today @@ -133,7 +208,7 @@ def initiate_dissolution_process(app: Flask): # pylint: disable=redefined-outer def stage_2_process(app: Flask): - """Run dissolution stage 2 process for businesses meet moving criteria.""" + """Update batch processing data for previously created in_progress batches.""" batch_processings = ( db.session.query(BatchProcessing) .filter(BatchProcessing.batch_id == Batch.id) @@ -169,6 +244,46 @@ def stage_2_process(app: Flask): batch_processing.save() +async def stage_3_process(app: Flask, qsm: QueueService): + """Process actual dissolution of businesses.""" + batch_processings = ( + db.session.query(BatchProcessing) + .filter(BatchProcessing.batch_id == Batch.id) + .filter(Batch.batch_type == Batch.BatchType.INVOLUNTARY_DISSOLUTION) + .filter(Batch.status == Batch.BatchStatus.PROCESSING) + .filter( + BatchProcessing.status == BatchProcessing.BatchProcessingStatus.PROCESSING + ) + .filter( + BatchProcessing.step == BatchProcessing.BatchProcessingStep.WARNING_LEVEL_2 + ) + .filter( + BatchProcessing.trigger_date <= func.timezone('UTC', func.now()) + ) + .all() + ) + + # TODO: add check if warnings have been sent out & set batch_processing.status to error if not + + for batch_processing in batch_processings: + eligible, _ = InvoluntaryDissolutionService.check_business_eligibility( + batch_processing.business_identifier, exclude_in_dissolution=False + ) + if eligible: + filing = create_invountary_dissolution_filing(batch_processing.business_id) + await put_filing_on_queue(filing.id, app, qsm) + + batch_processing.step = BatchProcessing.BatchProcessingStep.DISSOLUTION + batch_processing.status = BatchProcessing.BatchProcessingStatus.COMPLETED + else: + batch_processing.status = BatchProcessing.BatchProcessingStatus.WITHDRAWN + 
batch_processing.notes = 'Moved back into good standing' + batch_processing.last_modified = datetime.utcnow() + batch_processing.save() + + mark_eligible_batches_completed() + + def can_run_today(cron_value: str): """Check if cron string is valid for today.""" tz = pytz.timezone('US/Pacific') @@ -178,7 +293,7 @@ def can_run_today(cron_value: str): def check_run_schedule(): - """Check if any of the dissolution stage is valid for this run.""" + """Check if any of the dissolution stages are valid for this run.""" stage_1_schedule_config = Configuration.find_by_name( config_name=Configuration.Names.DISSOLUTIONS_STAGE_1_SCHEDULE.value ) @@ -196,7 +311,7 @@ def check_run_schedule(): return cron_valid_1, cron_valid_2, cron_valid_3 -async def run(loop, application: Flask = None): # pylint: disable=redefined-outer-name +async def run(application: Flask, qsm: QueueService): # pylint: disable=redefined-outer-name """Run the stage 1-3 methods for dissolving businesses.""" if application is None: application = create_app() @@ -217,18 +332,19 @@ async def run(loop, application: Flask = None): # pylint: disable=redefined-out return if stage_1_valid: - initiate_dissolution_process(application) + stage_1_process(application) if stage_2_valid: stage_2_process(application) if stage_3_valid: - pass + await stage_3_process(application, qsm) if __name__ == '__main__': application = create_app() try: event_loop = asyncio.get_event_loop() - event_loop.run_until_complete(run(event_loop, application)) + queue_service = QueueService(app=application, loop=event_loop) + event_loop.run_until_complete(run(application, queue_service)) except Exception as err: # pylint: disable=broad-except; Catching all errors from the frameworks application.logger.error(err) # pylint: disable=no-member raise err diff --git a/jobs/involuntary-dissolutions/requirements/dev.txt b/jobs/involuntary-dissolutions/requirements/dev.txt index e8f1c165a6..2dcac39512 100644 --- 
a/jobs/involuntary-dissolutions/requirements/dev.txt +++ b/jobs/involuntary-dissolutions/requirements/dev.txt @@ -3,6 +3,7 @@ # Testing pytest pytest-mock +pytest-asyncio requests pyhamcrest diff --git a/jobs/involuntary-dissolutions/setup.py b/jobs/involuntary-dissolutions/setup.py index 497c563724..6af9611db9 100644 --- a/jobs/involuntary-dissolutions/setup.py +++ b/jobs/involuntary-dissolutions/setup.py @@ -15,6 +15,7 @@ from setuptools import find_packages, setup + setup( name='involuntary-dissolutions', packages=find_packages() diff --git a/jobs/involuntary-dissolutions/tests/unit/__init__.py b/jobs/involuntary-dissolutions/tests/unit/__init__.py index c020bf81d9..477bf79ae3 100644 --- a/jobs/involuntary-dissolutions/tests/unit/__init__.py +++ b/jobs/involuntary-dissolutions/tests/unit/__init__.py @@ -14,8 +14,8 @@ """The Test-Suite used to ensure that the Involuntary Dissolutions Job is working correctly.""" import datetime -from datedelta import datedelta +from datedelta import datedelta from legal_api.models import Batch, BatchProcessing, Business diff --git a/jobs/involuntary-dissolutions/tests/unit/test_involuntary_dissolutions.py b/jobs/involuntary-dissolutions/tests/unit/test_involuntary_dissolutions.py index ca09fb34b8..cea657f8bc 100644 --- a/jobs/involuntary-dissolutions/tests/unit/test_involuntary_dissolutions.py +++ b/jobs/involuntary-dissolutions/tests/unit/test_involuntary_dissolutions.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - """Tests for the Involuntary Dissolutions Job. Test suite to ensure that the Involuntary Dissolutions Job is working as expected. 
""" @@ -21,9 +20,19 @@ import pytest import pytz from datedelta import datedelta -from legal_api.models import Batch, BatchProcessing, Configuration - -from involuntary_dissolutions import check_run_schedule, initiate_dissolution_process, stage_2_process +from legal_api.core.filing import Filing as CoreFiling +from legal_api.models import Batch, BatchProcessing, Configuration, Filing +from legal_api.services.filings.validations.dissolution import DissolutionTypes + +from involuntary_dissolutions import ( + check_run_schedule, + create_invountary_dissolution_filing, + mark_eligible_batches_completed, + put_filing_on_queue, + stage_1_process, + stage_2_process, + stage_3_process, +) from . import factory_batch, factory_batch_processing, factory_business @@ -52,22 +61,74 @@ def test_check_run_schedule(): assert cron_valid_3 is False -def test_initiate_dissolution_process_job_already_ran(app, session): +def test_create_invountary_dissolution_filing(app, session): + """Assert that the involuntary dissolution filing is created successfully.""" + business = factory_business(identifier='BC1234567') + filing_id = create_invountary_dissolution_filing(business.id).id + + filing = Filing.find_by_id(filing_id) + assert filing + assert filing.business_id == business.id + assert filing.filing_type == CoreFiling.FilingTypes.DISSOLUTION + assert filing.filing_sub_type == DissolutionTypes.INVOLUNTARY + assert filing.filing_json + + +@pytest.mark.parametrize( + 'test_name, batch_processing_statuses, expected', [ + ( + 'MARKED_COMPLETED', + [ + BatchProcessing.BatchProcessingStatus.COMPLETED, + BatchProcessing.BatchProcessingStatus.WITHDRAWN + ], + Batch.BatchStatus.COMPLETED + ), + ( + 'NOT_MARKED_COMPLETED', + [ + BatchProcessing.BatchProcessingStatus.PROCESSING, + BatchProcessing.BatchProcessingStatus.COMPLETED, + BatchProcessing.BatchProcessingStatus.WITHDRAWN + ], + Batch.BatchStatus.PROCESSING + ), + ] +) +def test_mark_eligible_batches_completed(app, session, test_name, 
batch_processing_statuses, expected): + """Assert that the eligible batches are marked completed successfully.""" + business = factory_business(identifier='BC1234567') + batch = factory_batch(status=Batch.BatchStatus.PROCESSING) + + for batch_processing_status in batch_processing_statuses: + factory_batch_processing( + batch_id=batch.id, + business_id=business.id, + identifier=business.identifier, + status=batch_processing_status + ) + + mark_eligible_batches_completed() + + assert batch.status == expected + + +def test_stage_1_process_job_already_ran(app, session): """Assert that the job is skipped correctly if it already ran today.""" factory_business(identifier='BC1234567') # first run - initiate_dissolution_process(app) + stage_1_process(app) batches = Batch.find_by(batch_type=Batch.BatchType.INVOLUNTARY_DISSOLUTION) assert len(batches) == 1 # second run - initiate_dissolution_process(app) + stage_1_process(app) batches = Batch.find_by(batch_type=Batch.BatchType.INVOLUNTARY_DISSOLUTION) assert not len(batches) > 1 -def test_initiate_dissolution_process_zero_allowed(app, session): +def test_stage_1_process_zero_allowed(app, session): """Assert that the job is skipped correctly if no dissolutions are allowed.""" factory_business(identifier='BC1234567') @@ -75,18 +136,18 @@ def test_initiate_dissolution_process_zero_allowed(app, session): config.val = '0' config.save() - initiate_dissolution_process(app) + stage_1_process(app) batches = Batch.find_by(batch_type=Batch.BatchType.INVOLUNTARY_DISSOLUTION) assert len(batches) == 0 -def test_initiate_dissolution_process(app, session): +def test_stage_1_process(app, session): """Assert that batch and batch_processing entries are created correctly.""" business_identifiers = ['BC0000001', 'BC0000002', 'BC0000003'] for business_identifier in business_identifiers: factory_business(identifier=business_identifier) - initiate_dissolution_process(app) + stage_1_process(app) batches = 
Batch.find_by(batch_type=Batch.BatchType.INVOLUNTARY_DISSOLUTION) assert len(batches) == 1 @@ -217,3 +278,46 @@ def test_stage_2_process_update_business(app, session, test_name, status, step): assert batch_processing.trigger_date.date() == datetime.utcnow().date() + datedelta(days=30) else: assert batch_processing.trigger_date == TRIGGER_DATE + +@pytest.mark.parametrize( + 'test_name, status, step', [ + ( + 'DISSOLVE_BUSINESS', + BatchProcessing.BatchProcessingStatus.COMPLETED, + BatchProcessing.BatchProcessingStep.DISSOLUTION + ), + ( + 'MOVE_BACK_TO_GOOD_STANDING', + BatchProcessing.BatchProcessingStatus.WITHDRAWN, + BatchProcessing.BatchProcessingStep.WARNING_LEVEL_2 + ), + ] +) +@pytest.mark.asyncio +async def test_stage_3_process(app, session, test_name, status, step): + """Assert that businesses are processed correctly.""" + business = factory_business(identifier='BC1234567') + batch = factory_batch(status=Batch.BatchStatus.PROCESSING) + batch_processing = factory_batch_processing( + batch_id=batch.id, + business_id=business.id, + identifier=business.identifier, + step=BatchProcessing.BatchProcessingStep.WARNING_LEVEL_2, + created_date=CREATED_DATE, + trigger_date=TRIGGER_DATE + ) + + if test_name == 'MOVE_BACK_TO_GOOD_STANDING': + business.last_ar_date = datetime.utcnow() + business.save() + + with patch('involuntary_dissolutions.put_filing_on_queue') as mock_put_filing_on_queue: + qsm = MagicMock() + await stage_3_process(app, qsm) + if test_name == 'DISSOLVE_BUSINESS': + mock_put_filing_on_queue.assert_called() + + assert batch_processing.status == status + assert batch_processing.step == step + + assert batch.status == Batch.BatchStatus.COMPLETED