Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Clean up transaction management for file_complete handler #930

Draft
wants to merge 4 commits into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions servicex_app/migrations/versions/1.5.6_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""empty message

Revision ID: v1_5_6
Revises: v1_5_5
Create Date: 2024-11-23 17:30:18.079736

"""
from alembic import op
import sqlalchemy as sa

# revision identifiers, used by Alembic.
revision = 'v1_5_6'
down_revision = 'v1_5_5'
branch_labels = None
depends_on = None

"""
Clean up indexes and constraints in the database
"""
def upgrade():
    """Apply the v1_5_6 schema clean-up.

    Steps, in order:
      * drop the ``ix_dataset_id`` index on ``files``;
      * replace the ``files.dataset_id -> datasets.id`` foreign key with one
        that has no ``ON DELETE`` rule;
      * make ``requests.files`` NOT NULL;
      * drop the ``request_id`` and ``transform_status`` indexes on
        ``transform_result``;
      * add a unique index on ``users.sub``.
    """
    op.drop_index('ix_dataset_id', table_name='files')
    op.drop_constraint('files_dataset_id_fkey', 'files', type_='foreignkey')
    # Give the new constraint an explicit name rather than None. With None the
    # backend chooses the name, and nothing in the migration can then refer to
    # the constraint when it has to be dropped again.
    op.create_foreign_key('files_dataset_id_fkey', 'files', 'datasets', ['dataset_id'], ['id'])
    # NOTE(review): this fails if any existing ``requests`` row has a NULL
    # ``files`` value -- confirm no such rows exist before deploying.
    op.alter_column('requests', 'files',
                    existing_type=sa.INTEGER(),
                    nullable=False)
    op.drop_index('ix_transform_result_request_id', table_name='transform_result')
    op.drop_index('ix_transform_result_transform_status', table_name='transform_result')
    op.create_index(op.f('ix_users_sub'), 'users', ['sub'], unique=True)


def downgrade():
    """Revert the v1_5_6 schema clean-up, undoing each step in reverse order.

    Restores the dropped indexes, makes ``requests.files`` nullable again and
    re-creates the ``files.dataset_id`` foreign key with ``ON DELETE CASCADE``.
    """
    op.drop_index(op.f('ix_users_sub'), table_name='users')
    op.create_index('ix_transform_result_transform_status', 'transform_result', ['transform_status'], unique=False)
    op.create_index('ix_transform_result_request_id', 'transform_result', ['request_id'], unique=False)
    op.alter_column('requests', 'files',
                    existing_type=sa.INTEGER(),
                    nullable=True)
    # A constraint can only be dropped by name; the autogenerated ``None``
    # placeholder cannot be rendered as DROP CONSTRAINT and aborts the
    # downgrade.
    # NOTE(review): assumes the foreign key on files.dataset_id is named
    # 'files_dataset_id_fkey' (PostgreSQL's default <table>_<column>_fkey
    # pattern) -- confirm against the deployed schema and any configured
    # naming_convention.
    op.drop_constraint('files_dataset_id_fkey', 'files', type_='foreignkey')
    op.create_foreign_key('files_dataset_id_fkey', 'files', 'datasets', ['dataset_id'], ['id'], ondelete='CASCADE')
    op.create_index('ix_dataset_id', 'files', ['dataset_id'], unique=False)
26 changes: 0 additions & 26 deletions servicex_app/migrations/versions/v1_6_0.py

This file was deleted.

30 changes: 18 additions & 12 deletions servicex_app/servicex_app/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@
verify_jwt_in_request)
from flask_jwt_extended.exceptions import NoAuthorizationError

from servicex_app.models import UserModel
from servicex_app.models import UserModel, db


@jwt_required()
def get_jwt_user():
    """Look up the user record for the identity carried by the current JWT.

    The identity returned by ``get_jwt_identity()`` is passed to
    ``UserModel.find_by_sub``; whatever that lookup yields is returned
    unchanged.
    """
    return UserModel.find_by_sub(get_jwt_identity())


Expand Down Expand Up @@ -53,17 +54,22 @@ def inner(*args, **kwargs) -> Response:
except NoAuthorizationError as exc:
assert "NoAuthorizationError"
return make_response({'message': str(exc)}, 401)
user = get_jwt_user()
if not user:
msg = 'Not Authorized: No user found matching this API token. ' \
'Your account may have been deleted. ' \
'Please visit the ServiceX website to obtain a new API token.'
return make_response({'message': msg}, 401)
elif user.pending:
msg = 'Not Authorized: Your account is still pending. ' \
'An administrator should approve it shortly. If not, ' \
'please contact the ServiceX admins via email or Slack.'
return make_response({'message': msg}, 401)

# Explicitly start a transaction here to avoid unexpected in_transaction() behavior
# in the method wrapped by this decorator.
with db.session.begin():
user = get_jwt_user()

if not user:
msg = 'Not Authorized: No user found matching this API token. ' \
'Your account may have been deleted. ' \
'Please visit the ServiceX website to obtain a new API token.'
return make_response({'message': msg}, 401)
elif user.pending:
msg = 'Not Authorized: Your account is still pending. ' \
'An administrator should approve it shortly. If not, ' \
'please contact the ServiceX admins via email or Slack.'
return make_response({'message': msg}, 401)

return fn(*args, **kwargs)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,84 @@
# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import logging
import time
from datetime import datetime, timezone
from functools import wraps
from logging import Logger

from flask import request, current_app
from sqlalchemy.orm import Session
from tenacity import retry, stop_after_attempt, wait_exponential_jitter, \
before_sleep_log, after_log

from servicex_app import TransformerManager
from servicex_app.models import TransformRequest, TransformationResult, db, TransformStatus
from servicex_app.resources.servicex_resource import ServiceXResource
import time


def file_complete_ops_retry(func):
    """
    Decorator applying the shared retry policy for file-complete operations.

    Each call of the decorated function is run through a tenacity ``retry``
    that stops after 3 attempts and waits with exponential backoff plus jitter
    (initial 0.1, max 30) between attempts. Retry activity is logged at INFO
    level, both before sleeping and after each attempt.

    Note:
        The logger is resolved on every call rather than at decoration time.
        The Flask application logger is used when it is available; if
        accessing it raises ``RuntimeError``, a module-level logger is used
        instead, so the decorator also works in contexts such as unit tests.
    """

    @wraps(func)
    def wrapper(*args, **kwargs):
        # Resolve the logger lazily so the decorator can be applied at import
        # time, before any application context exists.
        try:
            from flask import current_app
            log = current_app.logger
        except RuntimeError:
            log = logging.getLogger(__name__)

        retrying = retry(
            stop=stop_after_attempt(3),
            wait=wait_exponential_jitter(initial=0.1, max=30),
            before_sleep=before_sleep_log(log, logging.INFO),
            after=after_log(log, logging.INFO),
        )
        guarded = retrying(func)
        return guarded(*args, **kwargs)

    return wrapper


class TransformerFileComplete(ServiceXResource):
@classmethod
def make_api(cls, transformer_manager):
def make_api(cls, transformer_manager, logger=None):
cls.transformer_manager = transformer_manager
cls.logger = logger or logging.getLogger(__name__)
return cls

def put(self, request_id):
start_time = time.time()
info = request.get_json()
logger = current_app.logger
logger.info("FileComplete", extra={'requestId': request_id, 'metric': info})
transform_req = self.record_file_complete(current_app.logger, request_id, info)

session = db.session

# Lookup the transformation request and increment either the successful or failed file count
transform_req = self.record_file_complete(session, current_app.logger, request_id, info)

if transform_req is None:
return "Request not found", 404

self.save_transform_result(transform_req, info)
# Add the transformation result to the database
files_remaining = self.save_transform_result(transform_req, info, session)

# Now we can see if we are done with the transformation and
# can shut down the transformers. Files remaining is None if
# we are still waiting for final results from the DID finder

files_remaining = transform_req.files_remaining
if files_remaining is not None and files_remaining == 0:
self.transform_complete(current_app.logger, transform_req, self.transformer_manager)
self.transform_complete(session, current_app.logger, transform_req, self.transformer_manager)

current_app.logger.info("FileComplete. Request state.", extra={
'requestId': request_id,
Expand All @@ -71,57 +115,55 @@ def put(self, request_id):
return "Ok"

@staticmethod
@retry(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
before_sleep=before_sleep_log(current_app.logger, logging.INFO),
after=after_log(current_app.logger, logging.INFO),
)
def record_file_complete(logger: Logger, request_id: str, info: dict[str, str]) -> TransformRequest | None:
transform_req = TransformRequest.lookup(request_id)
if transform_req is None:
msg = f"Request not found with id: '{request_id}'"
logger.error(msg, extra={'requestId': request_id})
return None
@file_complete_ops_retry
def record_file_complete(session: Session, logger: Logger, request_id: str,
info: dict[str, str]) -> TransformRequest | None:

if info['status'] == 'success':
TransformRequest.file_transformed_successfully(request_id)
else:
TransformRequest.file_transformed_unsuccessfully(request_id)
with session.begin():
# Lock the row for update
transform_req = session.query(TransformRequest).filter_by(
request_id=request_id).with_for_update().one_or_none()

if transform_req is None:
msg = f"Request not found with id: '{request_id}'"
logger.error(msg, extra={'requestId': request_id})
return None

if info['status'] == 'success':
transform_req.files_completed += 1
else:
transform_req.files_failed += 1

session.flush() # Flush the changes to the database

return transform_req

@staticmethod
@retry(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
before_sleep=before_sleep_log(current_app.logger, logging.INFO),
after=after_log(current_app.logger, logging.INFO),
)
def save_transform_result(transform_req: TransformRequest, info: dict[str, str]):
rec = TransformationResult(
file_id=info['file-id'],
request_id=transform_req.request_id,
file_path=info['file-path'],
transform_status=info['status'],
transform_time=info['total-time'],
total_bytes=info['total-bytes'],
total_events=info['total-events'],
avg_rate=info['avg-rate']
)
rec.save_to_db()
db.session.commit()
@file_complete_ops_retry
def save_transform_result(transform_req: TransformRequest, info: dict[str, str], session: Session):
with session.begin():
rec = TransformationResult(
file_id=info['file-id'],
request_id=transform_req.request_id,
file_path=info['file-path'],
transform_status=info['status'],
transform_time=info['total-time'],
total_bytes=info['total-bytes'],
total_events=info['total-events'],
avg_rate=info['avg-rate']
)
session.add(rec)
return transform_req.files_remaining

@staticmethod
@retry(stop=stop_after_attempt(3),
wait=wait_exponential_jitter(initial=0.1, max=30),
before_sleep=before_sleep_log(current_app.logger, logging.INFO),
after=after_log(current_app.logger, logging.INFO),
)
def transform_complete(logger: Logger, transform_req: TransformRequest,
@file_complete_ops_retry
def transform_complete(session: Session, logger: Logger, transform_req: TransformRequest,
transformer_manager: TransformerManager):
transform_req.status = TransformStatus.complete
transform_req.finish_time = datetime.now(tz=timezone.utc)
transform_req.save_to_db()
db.session.commit()
with session.begin():
transform_req.status = TransformStatus.complete
transform_req.finish_time = datetime.now(tz=timezone.utc)
session.add(transform_req)

logger.info("Request completed. Shutting down transformers",
extra={'requestId': transform_req.request_id})
namespace = current_app.config['TRANSFORMER_NAMESPACE']
Expand Down
2 changes: 1 addition & 1 deletion servicex_app/servicex_app/web/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
def dashboard(template_name: str, user_specific=False):
args = parser.parse_args()
sort, order = args["sort"], args["order"]
query = TransformRequest.query.filter_by(archived=False)
query = TransformRequest.query.filter_by()

if user_specific:
query = query.filter_by(submitted_by=session["user_id"])
Expand Down
Loading