Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 44 additions & 6 deletions airflow-core/src/airflow/api_fastapi/common/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from typing import Generic, TypeVar

from fastapi import HTTPException, Request, status
from sqlalchemy.exc import IntegrityError
from sqlalchemy.exc import IntegrityError, SQLAlchemyError, OperationalError, ProgrammingError

T = TypeVar("T", bound=Exception)

Expand Down Expand Up @@ -61,13 +61,10 @@ def __init__(self):
def exception_handler(self, request: Request, exc: IntegrityError):
"""Handle IntegrityError exception."""
if self._is_dialect_matched(exc):
error_message = self._get_user_friendly_message(exc)
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail={
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should change this with a string. Some tests and logic depend on this is expected to be a dictionary, not a string, in the API and API tests. Is there a reason for this change?

"reason": "Unique constraint violation",
"statement": str(exc.statement),
"orig_error": str(exc.orig),
},
detail=f"{error_message}\nType: unique_constraint_violation"
)

def _is_dialect_matched(self, exc: IntegrityError) -> bool:
Expand All @@ -79,6 +76,47 @@ def _is_dialect_matched(self, exc: IntegrityError) -> bool:
return True
return False

def _get_user_friendly_message(self, exc: IntegrityError) -> str:
"""Convert database error to user-friendly message."""
exc_orig_str = str(exc.orig)

# Handle DAG run unique constraint
if "dag_run.dag_id" in exc_orig_str and "dag_run.run_id" in exc_orig_str:
return "A DAG run with this ID already exists. Please use a different run ID."

# Handle task instance unique constrain
if "task_instance.dag_id" in exc_orig_str and "task_instance.task_id" in exc_orig_str:
return "A task instance with this ID already exists. Please use a different task ID."

# Handle DAG run logical date unique constraint
if "dag_run.dag_id" in exc_orig_str and "dag_run.logical_date" in exc_orig_str:
return "A DAG run with this logical date already exists. Please use a different logical date."

# Generic unique constraint message
return "This operation would create a duplicate entry. Please ensure all unique fields have unique values."
Comment on lines +79 to +96
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seem to handle only duplicated entry errors (unique constraint), while there are many other IntegrityError possible.


class _DatabaseErrorHandler(BaseErrorHandler[SQLAlchemyError]):
"""Handler for general database errors."""

def __init__(self):
super().__init__(SQLAlchemyError)

def exception_handler(self, request: Request, exc: SQLAlchemyError):
"""Handle SQLAlchemyError exception."""
error_message = self._get_user_friendly_message(exc)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=f"{error_message}\nType: database_error"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, I think enhancing the text message according to cases and making it user-friendly is a good idea, but the other two things are also useful for debugging.

    "statement": str(exc.statement),
    "orig_error": str(exc.orig),

)

def _get_user_friendly_message(self, exc: SQLAlchemyError) -> str:
"""Convert database error to user-friendly message."""
if isinstance(exc, OperationalError):
return "A database operation failed. Please try again later."
elif isinstance(exc, ProgrammingError):
return "An error occurred while processing your request. Please check your input and try again."
else:
return "An unexpected database error occurred. Please try again later."
Comment on lines +112 to +119
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This obfuscate the error cause, there's no way to tell what's happening from a user perspective receiving such messages.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apologies, I’ve recently been assigned to another project and will be unable to work on this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll pick it up


DatabaseErrorHandlers = [
_UniqueConstraintErrorHandler(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,14 @@ def ti_run(
context.next_kwargs = ti.next_kwargs

return context
except SQLAlchemyError:
except SQLAlchemyError as e:
log.exception("Error marking Task Instance state as running")
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Database error occurred"
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail= {
"message": "Failed to update task instance state. Please try again later.",
"type": "database_error"
}
)


Expand Down Expand Up @@ -425,7 +429,11 @@ def ti_update_state(
except SQLAlchemyError as e:
log.error("Error updating Task Instance state: %s", e)
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail="Database error occurred"
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail={
"message": "Failed to update task instance state. Please try again later.",
"type": "database_error"
}
)


Expand Down
Loading