From 1b6e36918a8528e632f08f917b6b605a760b4ffb Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Mon, 25 Nov 2024 15:18:35 -0600 Subject: [PATCH 1/4] Clean up transaction management for file_complete handler --- .../internal/transformer_file_complete.py | 140 ++++++++++----- .../internal/test_transform_file_complete.py | 168 +++++++++--------- 2 files changed, 174 insertions(+), 134 deletions(-) diff --git a/servicex_app/servicex_app/resources/internal/transformer_file_complete.py b/servicex_app/servicex_app/resources/internal/transformer_file_complete.py index 64cb2896..12b67a8b 100644 --- a/servicex_app/servicex_app/resources/internal/transformer_file_complete.py +++ b/servicex_app/servicex_app/resources/internal/transformer_file_complete.py @@ -26,23 +26,59 @@ # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. import logging +import time from datetime import datetime, timezone +from functools import wraps from logging import Logger from flask import request, current_app +from sqlalchemy.orm import Session from tenacity import retry, stop_after_attempt, wait_exponential_jitter, \ before_sleep_log, after_log from servicex_app import TransformerManager from servicex_app.models import TransformRequest, TransformationResult, db, TransformStatus from servicex_app.resources.servicex_resource import ServiceXResource -import time + + +def file_complete_ops_retry(func): + """ + A decorator that applies a standard retry mechanism for file complete operations. + + This decorator wraps the target function with a retry mechanism using the tenacity library. + It attempts to execute the function up to 3 times, with exponential backoff and jitter + between retries. The decorator also logs retry attempts and results. + + Note: + This decorator is designed to work both within and outside of a Flask application context. + It dynamically evaluates the logger at runtime to support usage in various environments, + including unit tests. + """ + + @wraps(func) + def wrapper(*args, **kwargs): + + try: + from flask import current_app + logger = current_app.logger + except RuntimeError: + logger = logging.getLogger(__name__) + + return retry( + stop=stop_after_attempt(3), + wait=wait_exponential_jitter(initial=0.1, max=30), + before_sleep=before_sleep_log(logger, logging.INFO), + after=after_log(logger, logging.INFO) + )(func)(*args, **kwargs) + + return wrapper class TransformerFileComplete(ServiceXResource): @classmethod - def make_api(cls, transformer_manager): + def make_api(cls, transformer_manager, logger=None): cls.transformer_manager = transformer_manager + cls.logger = logger or logging.getLogger(__name__) return cls def put(self, request_id): @@ -50,16 +86,24 @@ def put(self, request_id): info = request.get_json() logger = current_app.logger logger.info("FileComplete", extra={'requestId': request_id, 'metric': info}) - transform_req = self.record_file_complete(current_app.logger, request_id, info) + + session = db.session + + # Lookup the transformation request and increment either the successful or failed file count + transform_req = self.record_file_complete(session, current_app.logger, request_id, info) if transform_req is None: return "Request not found", 404 - self.save_transform_result(transform_req, info) + # Add the transformation result to the database + files_remaining = self.save_transform_result(transform_req, info, session) + + # Now we can see if we are done with the transformation and + # can shut down the transformers. Files remaining is None if + # we are still waiting for final results from the DID finder - files_remaining = transform_req.files_remaining if files_remaining is not None and files_remaining == 0: - self.transform_complete(current_app.logger, transform_req, self.transformer_manager) + self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager) current_app.logger.info("FileComplete. Request state.", extra={ 'requestId': request_id, @@ -71,57 +115,55 @@ def put(self, request_id): return "Ok" @staticmethod - @retry(stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=0.1, max=30), - before_sleep=before_sleep_log(current_app.logger, logging.INFO), - after=after_log(current_app.logger, logging.INFO), - ) - def record_file_complete(logger: Logger, request_id: str, info: dict[str, str]) -> TransformRequest | None: - transform_req = TransformRequest.lookup(request_id) - if transform_req is None: - msg = f"Request not found with id: '{request_id}'" - logger.error(msg, extra={'requestId': request_id}) - return None + @file_complete_ops_retry + def record_file_complete(session: Session, logger: Logger, request_id: str, + info: dict[str, str]) -> TransformRequest | None: - if info['status'] == 'success': - TransformRequest.file_transformed_successfully(request_id) - else: - TransformRequest.file_transformed_unsuccessfully(request_id) + with session.begin(): + # Lock the row for update + transform_req = session.query(TransformRequest).filter_by( + request_id=request_id).with_for_update().one_or_none() + + if transform_req is None: + msg = f"Request not found with id: '{request_id}'" + logger.error(msg, extra={'requestId': request_id}) + return None + + if info['status'] == 'success': + transform_req.files_completed += 1 + else: + transform_req.files_failed += 1 + + session.flush() # Flush the changes to the database return transform_req @staticmethod - @retry(stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=0.1, max=30), - before_sleep=before_sleep_log(current_app.logger, logging.INFO), - after=after_log(current_app.logger, logging.INFO), - ) - def save_transform_result(transform_req: TransformRequest, info: dict[str, str]): - rec = TransformationResult( - file_id=info['file-id'], - request_id=transform_req.request_id, - file_path=info['file-path'], - transform_status=info['status'], - transform_time=info['total-time'], - total_bytes=info['total-bytes'], - total_events=info['total-events'], - avg_rate=info['avg-rate'] - ) - rec.save_to_db() - db.session.commit() + @file_complete_ops_retry + def save_transform_result(transform_req: TransformRequest, info: dict[str, str], session: Session): + with session.begin(): + rec = TransformationResult( + file_id=info['file-id'], + request_id=transform_req.request_id, + file_path=info['file-path'], + transform_status=info['status'], + transform_time=info['total-time'], + total_bytes=info['total-bytes'], + total_events=info['total-events'], + avg_rate=info['avg-rate'] + ) + session.add(rec) + return transform_req.files_remaining @staticmethod - @retry(stop=stop_after_attempt(3), - wait=wait_exponential_jitter(initial=0.1, max=30), - before_sleep=before_sleep_log(current_app.logger, logging.INFO), - after=after_log(current_app.logger, logging.INFO), - ) - def transform_complete(logger: Logger, transform_req: TransformRequest, + @file_complete_ops_retry + def transform_complete(session: Session, logger: Logger, transform_req: TransformRequest, transformer_manager: TransformerManager): - transform_req.status = TransformStatus.complete - transform_req.finish_time = datetime.now(tz=timezone.utc) - transform_req.save_to_db() - db.session.commit() + with session.begin(): + transform_req.status = TransformStatus.complete + transform_req.finish_time = datetime.now(tz=timezone.utc) + session.add(transform_req) + logger.info("Request completed. Shutting down transformers", extra={'requestId': transform_req.request_id}) namespace = current_app.config['TRANSFORMER_NAMESPACE'] diff --git a/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py b/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py index 4ea48d1b..89fc0053 100644 --- a/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py +++ b/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py @@ -25,17 +25,17 @@ # CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, # OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -from unittest.mock import patch, PropertyMock import psycopg2 import pytest -from servicex_app.models import DatasetFile, TransformationResult, TransformRequest +from servicex_app.models import TransformationResult, TransformRequest, TransformStatus from servicex_app.transformer_manager import TransformerManager from servicex_app_test.resource_test_base import ResourceTestBase class TestTransformFileComplete(ResourceTestBase): + module = "servicex_app.resources.internal.transformer_file_complete" @pytest.fixture def mock_transformer_manager(self, mocker): @@ -44,33 +44,24 @@ def mock_transformer_manager(self, mocker): return manager @pytest.fixture - def fake_transform_request(self): - return self._generate_transform_request() - - @pytest.fixture - def mock_transform_request_lookup(self, mocker, fake_transform_request): - with patch('servicex_app.models.TransformRequest.lookup') as mock: - mock.return_value = fake_transform_request - yield mock + def db_session(self, mocker): + db = mocker.patch(f"{self.module}.db") + db.session = mocker.Mock() + db.session.begin = mocker.MagicMock() + return db.session @pytest.fixture - def mock_files_remaining(self, mocker): - with patch('servicex_app.models.TransformRequest.files_remaining', - new_callable=PropertyMock) as mock_remaining: - mock_remaining.return_value = 1 - yield mock_remaining - - @pytest.fixture - def mock_file_transformed_successfully(self, mocker): - return mocker.patch.object( - TransformRequest, - "file_transformed_successfully") + def fake_transform_request(self): + fake_request = self._generate_transform_request() + fake_request.request_id = '1234' + fake_request.status = TransformStatus.running + fake_request.files = 10 + return fake_request @pytest.fixture - def mock_file_transformed_unsuccessfully(self, mocker): - return mocker.patch.object( - TransformRequest, - "file_transformed_unsuccessfully") + def mock_transform_request_lookup(self, mocker, db_session, fake_transform_request): + db_session.query.return_value.filter_by.return_value.with_for_update.return_value.one_or_none.return_value = fake_transform_request + return db_session.query.return_value.filter_by.return_value.with_for_update.return_value.one_or_none @pytest.fixture def test_client(self, mock_transformer_manager): @@ -91,85 +82,93 @@ def file_complete_response(self): def test_put_transform_file_complete_files_remaining(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): + fake_transform_request.files_completed = 0 + fake_transform_request.files_failed = 2 response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 assert fake_transform_request.finish_time is None - mock_transform_request_lookup.assert_called_with('1234') - mock_file_transformed_successfully.assert_called_with("1234") - mock_files_remaining.assert_called() + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 1 + assert fake_transform_request.files_failed == 2 mock_transformer_manager.shutdown_transformer_job.assert_not_called() + db_session.add.assert_called_once() + assert db_session.add.call_args[0][0].file_id == 42 def test_put_transform_file_complete_unknown_files_remaining(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): - mock_files_remaining.return_value = None + fake_transform_request.files = None + fake_transform_request.files_completed = 0 + fake_transform_request.files_failed = 2 response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 assert fake_transform_request.finish_time is None - mock_transform_request_lookup.assert_called_with('1234') - mock_file_transformed_successfully.assert_called_with("1234") - mock_files_remaining.assert_called() - + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 1 + assert fake_transform_request.files_failed == 2 mock_transformer_manager.shutdown_transformer_job.assert_not_called() + db_session.add.assert_called_once() + assert db_session.add.call_args[0][0].file_id == 42 - def test_put_transform_file_complete_no_files_remaining(self, mocker, + def test_put_transform_file_complete_no_files_remaining(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): + fake_transform_request.files_completed = 7 + fake_transform_request.files_failed = 2 - mock_files_remaining.return_value = 0 - - mocker.patch.object(DatasetFile, "get_by_id") - mocker.patch.object(TransformationResult, "save_to_db") - mocker.patch.object(TransformRequest, "save_to_db") - - response = test_client.put('/servicex/internal/transformation/BR549/file-complete', + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 - assert fake_transform_request.finish_time is not None - mock_transform_request_lookup.assert_called_with('BR549') - mock_file_transformed_successfully.assert_called_with("BR549") - mock_files_remaining.assert_called() - mock_transformer_manager.shutdown_transformer_job.assert_called_with('BR549', + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 8 + assert fake_transform_request.files_failed == 2 + + assert db_session.add.call_count == 2 + assert isinstance(db_session.add.mock_calls[0][1][0], TransformationResult) + assert db_session.add.mock_calls[0][1][0].file_id == 42 + + updated_transform = db_session.add.mock_calls[1][1][0] + assert isinstance(updated_transform, TransformRequest) + assert updated_transform.status == TransformStatus.complete + assert updated_transform.finish_time is not None + mock_transformer_manager.shutdown_transformer_job.assert_called_with('1234', 'my-ws') def test_put_transform_file_complete_unknown_request_id(self, mock_transformer_manager, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): mock_transform_request_lookup.return_value = None - response = test_client.put('/servicex/internal/transformation/BR549/file-complete', + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 404 - mock_transform_request_lookup.assert_called_with('BR549') + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') mock_transformer_manager.shutdown_transformer_job.assert_not_called() def test_database_error_request_read(self, mocker, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, @@ -179,30 +178,29 @@ def test_database_error_request_read(self, mocker, fake_transform_request ] - response = test_client.put('/servicex/internal/transformation/BR549/file-complete', + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 assert fake_transform_request.finish_time is None # Verify that we retried after the database error assert mock_transform_request_lookup.call_count == 2 - mock_transform_request_lookup.assert_called_with('BR549') + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') - mock_files_remaining.assert_called() mock_transformer_manager.shutdown_transformer_job.assert_not_called() def test_database_error_request_update(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): - mock_file_transformed_successfully.side_effect = [ - psycopg2.OperationalError('server closed the connection unexpectedly'), - None] + db_session.flush.side_effect = [ + psycopg2.OperationalError('server closed the connection unexpectedly'), + fake_transform_request + ] response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) @@ -210,48 +208,48 @@ def test_database_error_request_update(self, assert fake_transform_request.finish_time is None # Verify that we retried after the database error - assert mock_transform_request_lookup.call_count == 2 - mock_transform_request_lookup.assert_called_with('1234') + assert db_session.flush.call_count == 2 + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') - mock_files_remaining.assert_called() mock_transformer_manager.shutdown_transformer_job.assert_not_called() - def test_database_error_transform_result_save(self, mocker, + def test_database_error_transform_result_save(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): - mock_save_db = mocker.patch.object(TransformationResult, "save_to_db", - side_effect=[ - psycopg2.OperationalError('server closed the connection unexpectedly'), - None - ]) + db_session.add.side_effect = [ + psycopg2.OperationalError('server closed the connection unexpectedly'), + fake_transform_request + ] + response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 - assert mock_save_db.call_count == 2 - def test_database_error_transform_complete(self, mocker, + def test_database_error_transform_complete(self, mock_transformer_manager, - mock_files_remaining, - mock_file_transformed_successfully, + db_session, mock_transform_request_lookup, fake_transform_request, file_complete_response, test_client): # Trigger the fileset complete by setting the files_remaining to 0 - mock_files_remaining.return_value = 0 + fake_transform_request.files_completed = 9 + + db_session.add.side_effect = [ + fake_transform_request, + psycopg2.OperationalError('server closed the connection unexpectedly'), + fake_transform_request + ] - fake_transform_request.save_to_db = mocker.Mock(side_effect=[ - psycopg2.OperationalError('server closed the connection unexpectedly'), - None - ]) response = test_client.put('/servicex/internal/transformation/1234/file-complete', json=file_complete_response) assert response.status_code == 200 - assert fake_transform_request.save_to_db.call_count == 2 + # add called once for the transform result and then once for the updated transform request + # once with a failure and once successfully + assert db_session.add.call_count == 3 From 4a4ca9abdaf4abf2ace194c22ed24b85874ded51 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Tue, 10 Dec 2024 16:17:59 -0600 Subject: [PATCH 2/4] Remove unused methods on TransformRequest --- servicex_app/servicex_app/models.py | 18 ------------------ 1 file changed, 18 deletions(-) diff --git a/servicex_app/servicex_app/models.py b/servicex_app/servicex_app/models.py index 7b5e1cb6..d51d5a3a 100644 --- a/servicex_app/servicex_app/models.py +++ b/servicex_app/servicex_app/models.py @@ -279,24 +279,6 @@ def lookup_pending_on_dataset(cls, dataset_id: int) -> list[TransformRequest]: except NoResultFound: return [] - @classmethod - def file_transformed_successfully(cls, key: Union[str, int]) -> None: - req = cls.query.filter_by(request_id=key).one() - req.files_completed = cls.files_completed + 1 - db.session.commit() - - @classmethod - def file_transformed_unsuccessfully(cls, key: Union[str, int]) -> None: - req = cls.query.filter_by(request_id=key).one() - req.files_failed = cls.files_failed + 1 - db.session.commit() - - @classmethod - def add_a_file(cls, key) -> None: - req = cls.query.filter_by(request_id=key).one() - req.files = TransformRequest.files + 1 - db.session.commit() - @property def age(self) -> timedelta: return datetime.utcnow() - self.submit_time From 0e4d51def08baee2559b293d82ead8475530d111 Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Tue, 14 Jan 2025 14:42:56 -0600 Subject: [PATCH 3/4] Allow for possible duplicate file_complete reports for any given file. The celery transaction model settings to guarantee we never lose a file could result in duplicate file reports. Handle this by adding a unique key to the transformation_result table. Attempt to insert the record and just report a warning if that fails, but don't increment the file counter. --- servicex_app/migrations/versions/1.5.7.py | 24 ++++ servicex_app/servicex_app/models.py | 4 + .../internal/transformer_file_complete.py | 103 +++++++++++------- .../internal/test_transform_file_complete.py | 41 ++++++- .../src/transformer_sidecar/transformer.py | 58 +++++----- transformer_sidecar/tests/test_transformer.py | 2 +- 6 files changed, 162 insertions(+), 70 deletions(-) create mode 100644 servicex_app/migrations/versions/1.5.7.py diff --git a/servicex_app/migrations/versions/1.5.7.py b/servicex_app/migrations/versions/1.5.7.py new file mode 100644 index 00000000..21b42bc3 --- /dev/null +++ b/servicex_app/migrations/versions/1.5.7.py @@ -0,0 +1,24 @@ +"""Add unique constraint to TransformResult on request_id and file_id + +Revision ID: 1.5.7 +Revises: v1_5_6 +Create Date: 2025-01-13 + +""" +from alembic import op +import sqlalchemy as sa + +# revision identifiers, used by Alembic. +revision = 'v1_5_7' +down_revision = 'v1_5_6' +branch_labels = None +depends_on = None + +def upgrade(): + # Add unique constraint + op.create_unique_constraint('uix_file_request', 'transform_result', ['file_id', 'request_id']) + + +def downgrade(): + # Remove unique constraint + op.drop_constraint('uix_file_request', 'transform_result', type_='unique') diff --git a/servicex_app/servicex_app/models.py b/servicex_app/servicex_app/models.py index d51d5a3a..577cc225 100644 --- a/servicex_app/servicex_app/models.py +++ b/servicex_app/servicex_app/models.py @@ -357,6 +357,10 @@ class TransformationResult(db.Model): total_bytes = db.Column(db.BigInteger, nullable=True) avg_rate = db.Column(db.Float, nullable=True) + __table_args__ = ( + db.UniqueConstraint('file_id', 'request_id', name='uix_file_request'), + ) + @classmethod def to_json_list(cls, a_list): return [TransformationResult.to_json(msg) for msg in a_list] diff --git a/servicex_app/servicex_app/resources/internal/transformer_file_complete.py b/servicex_app/servicex_app/resources/internal/transformer_file_complete.py index 12b67a8b..285574e6 100644 --- a/servicex_app/servicex_app/resources/internal/transformer_file_complete.py +++ b/servicex_app/servicex_app/resources/internal/transformer_file_complete.py @@ -31,13 +31,15 @@ from functools import wraps from logging import Logger -from flask import request, current_app +from flask import current_app, request +from sqlalchemy.exc import IntegrityError from sqlalchemy.orm import Session -from tenacity import retry, stop_after_attempt, wait_exponential_jitter, \ - before_sleep_log, after_log +from tenacity import after_log, before_sleep_log, retry, retry_if_not_exception_type, \ + stop_after_attempt, wait_exponential_jitter from servicex_app import TransformerManager -from servicex_app.models import TransformRequest, TransformationResult, db, TransformStatus +from servicex_app.models import TransformRequest, TransformStatus, TransformationResult, \ + db from servicex_app.resources.servicex_resource import ServiceXResource @@ -65,7 +67,8 @@ def wrapper(*args, **kwargs): logger = logging.getLogger(__name__) return retry( - stop=stop_after_attempt(3), + retry=retry_if_not_exception_type(IntegrityError), + stop=stop_after_attempt(5), wait=wait_exponential_jitter(initial=0.1, max=30), before_sleep=before_sleep_log(logger, logging.INFO), after=after_log(logger, logging.INFO) @@ -85,39 +88,59 @@ def put(self, request_id): start_time = time.time() info = request.get_json() logger = current_app.logger - logger.info("FileComplete", extra={'requestId': request_id, 'metric': info}) + log_extra = { + 'requestId': request_id, + 'file-id': info['file-id'] + } - session = db.session - - # Lookup the transformation request and increment either the successful or failed file count - transform_req = self.record_file_complete(session, current_app.logger, request_id, info) - - if transform_req is None: - return "Request not found", 404 - - # Add the transformation result to the database - files_remaining = self.save_transform_result(transform_req, info, session) - - # Now we can see if we are done with the transformation and - # can shut down the transformers. Files remaining is None if - # we are still waiting for final results from the DID finder - - if files_remaining is not None and files_remaining == 0: - self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager) + logger.info("FileComplete", extra={**log_extra, 'metric': info}) + try: + session = db.session + + try: + # Add the transformation result to the database and verify that + # we've not processed this file already + self.save_transform_result(request_id, info, session) + except IntegrityError: + logger.warning("Ignoring duplicate result report", + extra=log_extra) + return "Ignoring duplicate result report", 200 + + # Lookup the transformation request and increment either the successful or failed file count + transform_req = self.record_file_complete(session, + current_app.logger, + request_id, + info, log_extra) - current_app.logger.info("FileComplete. Request state.", extra={ - 'requestId': request_id, - 'files_remaining': transform_req.files_remaining, - 'files_completed': transform_req.files_completed, - 'files_failed': transform_req.files_failed, - 'report_processed_time': (time.time() - start_time) - }) - return "Ok" + if transform_req is None: + logger.error("Request not found", extra=log_extra) + return "Request not found", 404 + + # Now we can see if we are done with the transformation and + # can shut down the transformers. Files remaining is None if + # we are still waiting for final results from the DID finder + with session.begin(): + files_remaining = transform_req.files_remaining + if files_remaining is not None and files_remaining == 0: + self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager) + + current_app.logger.info("FileComplete. Request state.", extra={ + **log_extra, + 'files_remaining': transform_req.files_remaining, + 'files_completed': transform_req.files_completed, + 'files_failed': transform_req.files_failed, + 'report_processed_time': (time.time() - start_time) + }) + return "Ok", 200 + except Exception as e: + logger.exception("Error processing file complete", + extra=log_extra, exc_info=e) + return "Error processing file complete", 500 @staticmethod @file_complete_ops_retry def record_file_complete(session: Session, logger: Logger, request_id: str, - info: dict[str, str]) -> TransformRequest | None: + info: dict[str, str], log_extra: dict[str, str]) -> TransformRequest | None: with session.begin(): # Lock the row for update @@ -126,7 +149,7 @@ def record_file_complete(session: Session, logger: Logger, request_id: str, if transform_req is None: msg = f"Request not found with id: '{request_id}'" - logger.error(msg, extra={'requestId': request_id}) + logger.error(msg, extra=log_extra) return None if info['status'] == 'success': @@ -134,17 +157,15 @@ def record_file_complete(session: Session, logger: Logger, request_id: str, else: transform_req.files_failed += 1 - session.flush() # Flush the changes to the database - return transform_req @staticmethod @file_complete_ops_retry - def save_transform_result(transform_req: TransformRequest, info: dict[str, str], session: Session): + def save_transform_result(request_id: str, info: dict[str, str], session: Session): with session.begin(): rec = TransformationResult( file_id=info['file-id'], - request_id=transform_req.request_id, + request_id=request_id, file_path=info['file-path'], transform_status=info['status'], transform_time=info['total-time'], @@ -153,16 +174,14 @@ def save_transform_result(transform_req: TransformRequest, info: dict[str, str], avg_rate=info['avg-rate'] ) session.add(rec) - return transform_req.files_remaining @staticmethod @file_complete_ops_retry def transform_complete(session: Session, logger: Logger, transform_req: TransformRequest, transformer_manager: TransformerManager): - with session.begin(): - transform_req.status = TransformStatus.complete - transform_req.finish_time = datetime.now(tz=timezone.utc) - session.add(transform_req) + transform_req.status = TransformStatus.complete + transform_req.finish_time = datetime.now(tz=timezone.utc) + session.add(transform_req) logger.info("Request completed. Shutting down transformers", extra={'requestId': transform_req.request_id}) diff --git a/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py b/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py index 89fc0053..6d330653 100644 --- a/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py +++ b/servicex_app/servicex_app_test/resources/internal/test_transform_file_complete.py @@ -28,6 +28,7 @@ import psycopg2 import pytest +from sqlalchemy.exc import IntegrityError from servicex_app.models import TransformationResult, TransformRequest, TransformStatus from servicex_app.transformer_manager import TransformerManager @@ -151,6 +152,45 @@ def test_put_transform_file_complete_no_files_remaining(self, mock_transformer_manager.shutdown_transformer_job.assert_called_with('1234', 'my-ws') + def test_put_transform_file_complete_duplicate_report(self, + mocker, + mock_transformer_manager, + db_session, + mock_transform_request_lookup, + fake_transform_request, + file_complete_response, + test_client): + fake_transform_request.files_completed = 6 + fake_transform_request.files_failed = 2 + db_session.add.side_effect = [ + None, + IntegrityError('duplicate key value violates unique constraint', + params=['request_id'], orig=Exception()) + ] + + response1 = test_client.put( + '/servicex/internal/transformation/1234/file-complete', + json=file_complete_response) + + response2 = test_client.put( + '/servicex/internal/transformation/1234/file-complete', + json=file_complete_response) + + assert response1.status_code == 200 + assert response2.status_code == 200 + + db_session.query.return_value.filter_by.assert_called_with(request_id='1234') + assert fake_transform_request.files_completed == 7 + assert fake_transform_request.files_failed == 2 + + assert db_session.add.call_count == 2 + assert isinstance(db_session.add.mock_calls[0][1][0], TransformationResult) + assert db_session.add.mock_calls[0][1][0].file_id == 42 + + assert db_session.add.call_count == 2 + assert db_session.add.mock_calls[0][1][0].file_id == 42 + assert db_session.add.mock_calls[1][1][0].file_id == 42 + def test_put_transform_file_complete_unknown_request_id(self, mock_transformer_manager, db_session, @@ -208,7 +248,6 @@ def test_database_error_request_update(self, assert fake_transform_request.finish_time is None # Verify that we retried after the database error - assert db_session.flush.call_count == 2 db_session.query.return_value.filter_by.assert_called_with(request_id='1234') mock_transformer_manager.shutdown_transformer_job.assert_not_called() diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index 52d5f27e..c1dd71cb 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -30,6 +30,7 @@ import os import shutil import sys +import time import timeit from argparse import Namespace from hashlib import sha1, sha256 @@ -39,20 +40,20 @@ import kombu import psutil as psutil -import time from celery import Celery, shared_task from celery.signals import after_setup_logger +from transformer_sidecar.object_store_manager import ObjectStoreError, ObjectStoreManager from transformer_sidecar.science_container_command import ScienceContainerCommand, \ ScienceContainerException +from transformer_sidecar.servicex_adapter import FileCompleteRecord, ServiceXAdapter +from transformer_sidecar.transformer_argument_parser import TransformerArgumentParser from transformer_sidecar.transformer_logging import initialize_logging from transformer_sidecar.transformer_stats import TransformerStats from transformer_sidecar.transformer_stats.aod_stats import AODStats # NOQA: 401 +from transformer_sidecar.transformer_stats.raw_uproot_stats import \ + RawUprootStats # NOQA: 401 from transformer_sidecar.transformer_stats.uproot_stats import UprootStats # NOQA: 401 -from transformer_sidecar.transformer_stats.raw_uproot_stats import RawUprootStats # NOQA: 401 -from transformer_sidecar.object_store_manager import ObjectStoreManager, ObjectStoreError -from transformer_sidecar.servicex_adapter import ServiceXAdapter, FileCompleteRecord -from transformer_sidecar.transformer_argument_parser import TransformerArgumentParser # Module globals shared_dir: Optional[str] = None @@ -80,7 +81,9 @@ logger = initialize_logging() -@shared_task(acks_late=True) +@shared_task( + acks_late=True, +) def transform_file( request_id, file_id, @@ -107,6 +110,12 @@ def transform_file( global shared_dir + log_extra = { + "requestId": request_id, + "file-id": file_id, + "place": PLACE + } + transform_request = { "file-id": file_id, "request-id": request_id, @@ -128,13 +137,11 @@ def transform_file( logger.info( "got transform request.", extra={ - "requestId": request_id, + **log_extra, "paths": _file_paths, - "file-id": file_id, "result-destination": result_destination, "result-format": result_format, "service-endpoint": service_endpoint, - "place": PLACE, }, ) servicex = ServiceXAdapter(service_endpoint) @@ -158,10 +165,8 @@ def transform_file( logger.info( "trying to transform file", extra={ - "requestId": request_id, - "file-id": file_id, + **log_extra, "file-path": _file_path, - "place": PLACE, }, ) @@ -193,6 +198,10 @@ def transform_file( science_container_response = science_container.await_response() transform_request["status"] = science_container_response + logger.info( + "Science container completed with status %s" % science_container_response, + extra=log_extra + ) # Grab the logs transformer_stats = fill_stats_parser( @@ -220,11 +229,9 @@ def transform_file( transform_success = True ts = { - "requestId": request_id, - "file-id": file_id, + **log_extra, "file-size": transformer_stats.file_size, "total-events": transformer_stats.total_events, - "place": PLACE, } logger.info("Transformer stats.", extra=ts) science_container.confirm() @@ -236,10 +243,8 @@ def transform_file( # a hard failure with this file. if not transform_success: hf = { - "requestId": request_id, + **log_extra, "file-path": _file_paths[0], - "file-id": file_id, - "place": PLACE, "log_body": transformer_stats.log_body, } logger.error(f"Hard Failure: {transformer_stats.error_info}", extra=hf) @@ -266,20 +271,21 @@ def transform_file( logger.info( "File processed.", extra={ - "requestId": request_id, - "file-id": file_id, + **log_extra, "user": elapsed_times.user, "sys": elapsed_times.system, "iowait": elapsed_times.iowait, - "place": PLACE, }, ) - except ScienceContainerException: - logger.exception("Science container not responding. Shutting down this transformer.") - sys.exit(0) + except ScienceContainerException as e: + logger.exception("Science container not responding. Shutting down this transformer.", + extra=log_extra, exc_info=e) + sys.exit(-1) + except Exception as error: - logger.exception(f"Received exception doing transform: {error}") + logger.exception("Received exception doing transform", + extra=log_extra, exc_info=error) rec = FileCompleteRecord( request_id=request_id, file_path=_file_paths[0], @@ -456,7 +462,7 @@ def init(args: Union[Namespace, SimpleNamespace], app: Celery) -> None: "--without-mingle", "--without-gossip", "--without-heartbeat", - "--loglevel=warning", + "--loglevel=info", "-Q", f"transformer-{args.request_id}", "-n", f"transformer-{args.request_id}@%h", diff --git a/transformer_sidecar/tests/test_transformer.py b/transformer_sidecar/tests/test_transformer.py index db69206c..716bc463 100644 --- a/transformer_sidecar/tests/test_transformer.py +++ b/transformer_sidecar/tests/test_transformer.py @@ -131,7 +131,7 @@ def test_transformer_init(args, mock_celery, transformer_capabilities, '--without-mingle', '--without-gossip', '--without-heartbeat', - "--loglevel=warning", + "--loglevel=info", '-Q', 'transformer-1234', "-n", "transformer-1234@%h", ] From 9ad47324e40fe6e685b335181c2fd38c1877e8cb Mon Sep 17 00:00:00 2001 From: Ben Galewsky Date: Tue, 14 Jan 2025 15:43:23 -0600 Subject: [PATCH 4/4] Add pod name to log extras --- transformer_sidecar/src/transformer_sidecar/transformer.py | 1 + transformer_sidecar/tests/test_transformer.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/transformer_sidecar/src/transformer_sidecar/transformer.py b/transformer_sidecar/src/transformer_sidecar/transformer.py index c1dd71cb..ca51b2dc 100644 --- a/transformer_sidecar/src/transformer_sidecar/transformer.py +++ b/transformer_sidecar/src/transformer_sidecar/transformer.py @@ -74,6 +74,7 @@ PLACE = { "host_name": os.getenv("HOST_NAME", "unknown"), "site": os.getenv("site", "unknown"), + "pod": os.getenv("POD_NAME", "unknown"), } diff --git a/transformer_sidecar/tests/test_transformer.py b/transformer_sidecar/tests/test_transformer.py index 716bc463..57a950b9 100644 --- a/transformer_sidecar/tests/test_transformer.py +++ b/transformer_sidecar/tests/test_transformer.py @@ -128,7 +128,7 @@ def test_transformer_init(args, mock_celery, transformer_capabilities, argv=[ "worker", "--concurrency=1", - '--without-mingle', + "--without-mingle", '--without-gossip', '--without-heartbeat', "--loglevel=info",