diff --git a/servicex_app/migrations/versions/1.5.6_.py b/servicex_app/migrations/versions/1.5.6_.py new file mode 100644 index 00000000..91f53fae --- /dev/null +++ b/servicex_app/migrations/versions/1.5.6_.py @@ -0,0 +1,41 @@ +"""Clean up indexes and constraints in the database + +Revision ID: v1_5_6 +Revises: v1_5_5 +Create Date: 2024-11-23 17:30:18.079736 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'v1_5_6' +down_revision = 'v1_5_5' +branch_labels = None +depends_on = None + +def upgrade(): + """ + Clean up indexes and constraints in the database + """ + op.drop_index('ix_dataset_id', table_name='files') + op.drop_constraint('files_dataset_id_fkey', 'files', type_='foreignkey') + op.create_foreign_key('files_dataset_id_fkey', 'files', 'datasets', ['dataset_id'], ['id']) # named explicitly so downgrade() can drop it + op.alter_column('requests', 'files', + existing_type=sa.INTEGER(), + nullable=False) + op.drop_index('ix_transform_result_request_id', table_name='transform_result') + op.drop_index('ix_transform_result_transform_status', table_name='transform_result') + op.create_index(op.f('ix_users_sub'), 'users', ['sub'], unique=True) + + +def downgrade(): + op.drop_index(op.f('ix_users_sub'), table_name='users') + op.create_index('ix_transform_result_transform_status', 'transform_result', ['transform_status'], unique=False) + op.create_index('ix_transform_result_request_id', 'transform_result', ['request_id'], unique=False) + op.alter_column('requests', 'files', + existing_type=sa.INTEGER(), + nullable=True) + op.drop_constraint('files_dataset_id_fkey', 'files', type_='foreignkey') # DROP CONSTRAINT needs a concrete name; None cannot be emitted + op.create_foreign_key('files_dataset_id_fkey', 'files', 'datasets', ['dataset_id'], ['id'], ondelete='CASCADE') + op.create_index('ix_dataset_id', 'files', ['dataset_id'], unique=False) diff --git a/servicex_app/migrations/versions/v1_6_0.py b/servicex_app/migrations/versions/v1_6_0.py deleted file mode 100644 index f32aaa65..00000000 --- a/servicex_app/migrations/versions/v1_6_0.py +++ /dev/null @@ -1,26 +0,0 @@ -""" -Mark dataset
rows as stale - -Revision ID: v1_6_0 -Revises: v1_5_5 -Create Date: 2024-11-14 18:19:00.000000 -""" -from alembic import op -import sqlalchemy as sa -from sqlalchemy.dialects import postgresql - -# revision identifiers, used by Alembic. -revision = 'v1_6_0' -down_revision = 'v1_5_5' -branch_labels = None -depends_on = None - - -def upgrade(): - op.add_column('requests', sa.Column('archived', - sa.Boolean(), - nullable=False, - server_default='false')) - -def downgrade(): - op.drop_column('requests', 'archived') diff --git a/servicex_app/servicex_app/decorators.py b/servicex_app/servicex_app/decorators.py index 43e7149d..b8fe361b 100644 --- a/servicex_app/servicex_app/decorators.py +++ b/servicex_app/servicex_app/decorators.py @@ -7,13 +7,14 @@ verify_jwt_in_request) from flask_jwt_extended.exceptions import NoAuthorizationError -from servicex_app.models import UserModel +from servicex_app.models import UserModel, db @jwt_required() def get_jwt_user(): jwt_val = get_jwt_identity() user = UserModel.find_by_sub(jwt_val) + return user @@ -53,17 +54,22 @@ def inner(*args, **kwargs) -> Response: except NoAuthorizationError as exc: assert "NoAuthorizationError" return make_response({'message': str(exc)}, 401) - user = get_jwt_user() - if not user: - msg = 'Not Authorized: No user found matching this API token. ' \ - 'Your account may have been deleted. ' \ - 'Please visit the ServiceX website to obtain a new API token.' - return make_response({'message': msg}, 401) - elif user.pending: - msg = 'Not Authorized: Your account is still pending. ' \ - 'An administrator should approve it shortly. If not, ' \ - 'please contact the ServiceX admins via email or Slack.' - return make_response({'message': msg}, 401) + + # Explicitly start a transaction here to avoid unexpected in_transaction() behavior + # in the method wrapped by this decorator. + with db.session.begin(): + user = get_jwt_user() + + if not user: + msg = 'Not Authorized: No user found matching this API token.
' \ + 'Your account may have been deleted. ' \ + 'Please visit the ServiceX website to obtain a new API token.' + return make_response({'message': msg}, 401) + elif user.pending: + msg = 'Not Authorized: Your account is still pending. ' \ + 'An administrator should approve it shortly. If not, ' \ + 'please contact the ServiceX admins via email or Slack.' + return make_response({'message': msg}, 401) return fn(*args, **kwargs) diff --git a/servicex_app/servicex_app/resources/internal/transformer_file_complete.py b/servicex_app/servicex_app/resources/internal/transformer_file_complete.py index 64cb2896..12b67a8b 100644 --- a/servicex_app/servicex_app/resources/internal/transformer_file_complete.py +++ b/servicex_app/servicex_app/resources/internal/transformer_file_complete.py @@ -26,23 +26,59 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging +import time from datetime import datetime, timezone +from functools import wraps from logging import Logger from flask import request, current_app +from sqlalchemy.orm import Session from tenacity import retry, stop_after_attempt, wait_exponential_jitter, \ before_sleep_log, after_log from servicex_app import TransformerManager from servicex_app.models import TransformRequest, TransformationResult, db, TransformStatus from servicex_app.resources.servicex_resource import ServiceXResource -import time + + +def file_complete_ops_retry(func): + """ + A decorator that applies a standard retry mechanism for file complete operations. + + This decorator wraps the target function with a retry mechanism using the tenacity library. + It attempts to execute the function up to 3 times, with exponential backoff and jitter + between retries. The decorator also logs retry attempts and results. + + Note: + This decorator is designed to work both within and outside of a Flask application context. 
+ It dynamically evaluates the logger at runtime to support usage in various environments, + including unit tests. + """ + + @wraps(func) + def wrapper(*args, **kwargs): + + try: + from flask import current_app + logger = current_app.logger + except RuntimeError: + logger = logging.getLogger(__name__) + + return retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=0.1, max=30), + before_sleep=before_sleep_log(logger, logging.INFO), + after=after_log(logger, logging.INFO) + )(func)(*args, **kwargs) + + return wrapper class TransformerFileComplete(ServiceXResource): @classmethod - def make_api(cls, transformer_manager): + def make_api(cls, transformer_manager, logger=None): cls.transformer_manager = transformer_manager + cls.logger = logger or logging.getLogger(__name__) return cls def put(self, request_id): @@ -50,16 +86,24 @@ def put(self, request_id): info = request.get_json() logger = current_app.logger logger.info("FileComplete", extra={'requestId': request_id, 'metric': info}) - transform_req = self.record_file_complete(current_app.logger, request_id, info) + + session = db.session + + # Lookup the transformation request and increment either the successful or failed file count + transform_req = self.record_file_complete(session, current_app.logger, request_id, info) if transform_req is None: return "Request not found", 404 - self.save_transform_result(transform_req, info) + # Add the transformation result to the database + files_remaining = self.save_transform_result(transform_req, info, session) + + # Now we can see if we are done with the transformation and + # can shut down the transformers. 
Files remaining is None if + # we are still waiting for final results from the DID finder - files_remaining = transform_req.files_remaining if files_remaining is not None and files_remaining == 0: - self.transform_complete(current_app.logger, transform_req, self.transformer_manager) + self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager) current_app.logger.info("FileComplete. Request state.", extra={ 'requestId': request_id, @@ -71,57 +115,55 @@ def put(self, request_id): return "Ok" @staticmethod - @retry(stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=0.1, max=30), - before_sleep=before_sleep_log(current_app.logger, logging.INFO), - after=after_log(current_app.logger, logging.INFO), - ) - def record_file_complete(logger: Logger, request_id: str, info: dict[str, str]) -> TransformRequest | None: - transform_req = TransformRequest.lookup(request_id) - if transform_req is None: - msg = f"Request not found with id: '{request_id}'" - logger.error(msg, extra={'requestId': request_id}) - return None + @file_complete_ops_retry + def record_file_complete(session: Session, logger: Logger, request_id: str, + info: dict[str, str]) -> TransformRequest | None: - if info['status'] == 'success': - TransformRequest.file_transformed_successfully(request_id) - else: - TransformRequest.file_transformed_unsuccessfully(request_id) + with session.begin(): + # Lock the row for update + transform_req = session.query(TransformRequest).filter_by( + request_id=request_id).with_for_update().one_or_none() + + if transform_req is None: + msg = f"Request not found with id: '{request_id}'" + logger.error(msg, extra={'requestId': request_id}) + return None + + if info['status'] == 'success': + transform_req.files_completed += 1 + else: + transform_req.files_failed += 1 + + session.flush() # Flush the changes to the database return transform_req @staticmethod - @retry(stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=0.1, 
max=30), - before_sleep=before_sleep_log(current_app.logger, logging.INFO), - after=after_log(current_app.logger, logging.INFO), - ) - def save_transform_result(transform_req: TransformRequest, info: dict[str, str]): - rec = TransformationResult( - file_id=info['file-id'], - request_id=transform_req.request_id, - file_path=info['file-path'], - transform_status=info['status'], - transform_time=info['total-time'], - total_bytes=info['total-bytes'], - total_events=info['total-events'], - avg_rate=info['avg-rate'] - ) - rec.save_to_db() - db.session.commit() + @file_complete_ops_retry + def save_transform_result(transform_req: TransformRequest, info: dict[str, str], session: Session): + with session.begin(): + rec = TransformationResult( + file_id=info['file-id'], + request_id=transform_req.request_id, + file_path=info['file-path'], + transform_status=info['status'], + transform_time=info['total-time'], + total_bytes=info['total-bytes'], + total_events=info['total-events'], + avg_rate=info['avg-rate'] + ) + session.add(rec) + return transform_req.files_remaining @staticmethod - @retry(stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=0.1, max=30), - before_sleep=before_sleep_log(current_app.logger, logging.INFO), - after=after_log(current_app.logger, logging.INFO), - ) - def transform_complete(logger: Logger, transform_req: TransformRequest, + @file_complete_ops_retry + def transform_complete(session: Session, logger: Logger, transform_req: TransformRequest, transformer_manager: TransformerManager): - transform_req.status = TransformStatus.complete - transform_req.finish_time = datetime.now(tz=timezone.utc) - transform_req.save_to_db() - db.session.commit() + with session.begin(): + transform_req.status = TransformStatus.complete + transform_req.finish_time = datetime.now(tz=timezone.utc) + session.add(transform_req) + logger.info("Request completed. 
Shutting down transformers", extra={'requestId': transform_req.request_id}) namespace = current_app.config['TRANSFORMER_NAMESPACE'] diff --git a/servicex_app/servicex_app/web/dashboard.py b/servicex_app/servicex_app/web/dashboard.py index 45f6b469..9f4821e8 100644 --- a/servicex_app/servicex_app/web/dashboard.py +++ b/servicex_app/servicex_app/web/dashboard.py @@ -32,7 +32,7 @@ def dashboard(template_name: str, user_specific=False): args = parser.parse_args() sort, order = args["sort"], args["order"] - query = TransformRequest.query.filter_by(archived=False) + query = TransformRequest.query.filter_by() if user_specific: query = query.filter_by(submitted_by=session["user_id"]) diff --git a/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py b/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py index 4ea48d1b..89fc0053 100644 --- a/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py +++ b/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py @@ -25,17 +25,17 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-from unittest.mock import patch, PropertyMock import psycopg2 import pytest -from servicex_app.models import DatasetFile, TransformationResult, TransformRequest +from servicex_app.models import TransformationResult, TransformRequest, TransformStatus from servicex_app.transformer_manager import TransformerManager from servicex_app_test.resource_test_base import ResourceTestBase class TestTransformFileComplete(ResourceTestBase): + module = "servicex_app.resources.internal.transformer_file_complete" @pytest.fixture def mock_transformer_manager(self, mocker): @@ -44,33 +44,24 @@ def mock_transformer_manager(self, mocker): return manager @pytest.fixture - def fake_transform_request(self): - return self._generate_transform_request() - - @pytest.fixture - def mock_transform_request_lookup(self, mocker, fake_transform_request): - with patch('servicex_app.models.TransformRequest.lookup') as mock: - mock.return_value = fake_transform_request - yield mock + def db_session(self, mocker): + db = mocker.patch(f"{self.module}.db") + db.session = mocker.Mock() + db.session.begin = mocker.MagicMock() + return db.session @pytest.fixture - def mock_files_remaining(self, mocker): - with patch('servicex_app.models.TransformRequest.files_remaining', - new_callable=PropertyMock) as mock_remaining: - mock_remaining.return_value = 1 - yield mock_remaining - - @pytest.fixture - def mock_file_transformed_successfully(self, mocker): - return mocker.patch.object( - TransformRequest, - "file_transformed_successfully") + def fake_transform_request(self): + fake_request = self._generate_transform_request() + fake_request.request_id = '1234' + fake_request.status = TransformStatus.running + fake_request.files = 10 + return fake_request @pytest.fixture - def mock_file_transformed_unsuccessfully(self, mocker): - return mocker.patch.object( - TransformRequest, - "file_transformed_unsuccessfully") + def mock_transform_request_lookup(self, mocker, db_session, fake_transform_request): + 
db_session.query.return_value.filter_by.return_value.with_for_update.return_value.one_or_none.return_value = fake_transform_request + return db_session.query.return_value.filter_by.return_value.with_for_update.return_value.one_or_none @pytest.fixture def test_client(self, mock_transformer_manager): @@ -91,85 +82,93 @@ def file_complete_response(self): def test_put_transform_file_complete_files_remaining(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): + fake_transform_request.files_completed = 0 + fake_transform_request.files_failed = 2 response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 assert fake_transform_request.finish_time is None - mock_transform_request_lookup.assert_called_with('1234') - mock_file_transformed_successfully.assert_called_with("1234") - mock_files_remaining.assert_called() + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 1 + assert fake_transform_request.files_failed == 2 mock_transformer_manager.shutdown_transformer_job.assert_not_called() + db_session.add.assert_called_once() + assert db_session.add.call_args[0][0].file_id == 42 def test_put_transform_file_complete_unknown_files_remaining(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): - mock_files_remaining.return_value = None + fake_transform_request.files = None + fake_transform_request.files_completed = 0 + fake_transform_request.files_failed = 2 response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 assert 
fake_transform_request.finish_time is None - mock_transform_request_lookup.assert_called_with('1234') - mock_file_transformed_successfully.assert_called_with("1234") - mock_files_remaining.assert_called() - + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 1 + assert fake_transform_request.files_failed == 2 mock_transformer_manager.shutdown_transformer_job.assert_not_called() + db_session.add.assert_called_once() + assert db_session.add.call_args[0][0].file_id == 42 - def test_put_transform_file_complete_no_files_remaining(self, mocker, + def test_put_transform_file_complete_no_files_remaining(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): + fake_transform_request.files_completed = 7 + fake_transform_request.files_failed = 2 - mock_files_remaining.return_value = 0 - - mocker.patch.object(DatasetFile, "get_by_id") - mocker.patch.object(TransformationResult, "save_to_db") - mocker.patch.object(TransformRequest, "save_to_db") - - response = test_client.put('/servicex/internal/transformation/BR549/file-complete', + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 - assert fake_transform_request.finish_time is not None - mock_transform_request_lookup.assert_called_with('BR549') - mock_file_transformed_successfully.assert_called_with("BR549") - mock_files_remaining.assert_called() - mock_transformer_manager.shutdown_transformer_job.assert_called_with('BR549', + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 8 + assert fake_transform_request.files_failed == 2 + + assert db_session.add.call_count == 2 + assert isinstance(db_session.add.mock_calls[0][1][0], 
TransformationResult) + assert db_session.add.mock_calls[0][1][0].file_id == 42 + + updated_transform = db_session.add.mock_calls[1][1][0] + assert isinstance(updated_transform, TransformRequest) + assert updated_transform.status == TransformStatus.complete + assert updated_transform.finish_time is not None + mock_transformer_manager.shutdown_transformer_job.assert_called_with('1234', 'my-ws') def test_put_transform_file_complete_unknown_request_id(self, mock_transformer_manager, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): mock_transform_request_lookup.return_value = None - response = test_client.put('/servicex/internal/transformation/BR549/file-complete', + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 404 - mock_transform_request_lookup.assert_called_with('BR549') + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') mock_transformer_manager.shutdown_transformer_job.assert_not_called() def test_database_error_request_read(self, mocker, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, @@ -179,30 +178,29 @@ def test_database_error_request_read(self, mocker, fake_transform_request ] - response = test_client.put('/servicex/internal/transformation/BR549/file-complete', + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 assert fake_transform_request.finish_time is None # Verify that we retried after the database error assert mock_transform_request_lookup.call_count == 2 - mock_transform_request_lookup.assert_called_with('BR549') + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') - mock_files_remaining.assert_called() 
mock_transformer_manager.shutdown_transformer_job.assert_not_called() def test_database_error_request_update(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): - mock_file_transformed_successfully.side_effect = [ - psycopg2.OperationalError('server closed the connection unexpectedly'), - None] + db_session.flush.side_effect = [ + psycopg2.OperationalError('server closed the connection unexpectedly'), + fake_transform_request + ] response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) @@ -210,48 +208,48 @@ def test_database_error_request_update(self, assert fake_transform_request.finish_time is None # Verify that we retried after the database error - assert mock_transform_request_lookup.call_count == 2 - mock_transform_request_lookup.assert_called_with('1234') + assert db_session.flush.call_count == 2 + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') - mock_files_remaining.assert_called() mock_transformer_manager.shutdown_transformer_job.assert_not_called() - def test_database_error_transform_result_save(self, mocker, + def test_database_error_transform_result_save(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): - mock_save_db = mocker.patch.object(TransformationResult, "save_to_db", - side_effect=[ - psycopg2.OperationalError('server closed the connection unexpectedly'), - None - ]) + db_session.add.side_effect = [ + psycopg2.OperationalError('server closed the connection unexpectedly'), + fake_transform_request + ] + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 - assert 
mock_save_db.call_count == 2 - def test_database_error_transform_complete(self, mocker, + def test_database_error_transform_complete(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): # Trigger the fileset complete by setting the files_remaining to 0 - mock_files_remaining.return_value = 0 + fake_transform_request.files_completed = 9 + + db_session.add.side_effect = [ + fake_transform_request, + psycopg2.OperationalError('server closed the connection unexpectedly'), + fake_transform_request + ] - fake_transform_request.save_to_db = mocker.Mock(side_effect=[ - psycopg2.OperationalError('server closed the connection unexpectedly'), - None - ]) response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 - assert fake_transform_request.save_to_db.call_count == 2 + # add called once for the transform result and then once for the updated transform request + # once with a failure and once successfully + assert db_session.add.call_count == 3