Skip to content

Commit

Permalink
fix: log action get the correct request body (apache#45546)
Browse files Browse the repository at this point in the history
  • Loading branch information
luoyuliuyin authored Jan 10, 2025
1 parent db739d9 commit bae4bb1
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 1 deletion.
2 changes: 1 addition & 1 deletion airflow/www/decorators.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def wrapper(*args, **kwargs):
user_display = get_auth_manager().get_user_display_name()

isAPIRequest = request.blueprint == "/api/v1"
hasJsonBody = request.headers.get("content-type") == "application/json" and request.json
hasJsonBody = "application/json" in request.headers.get("content-type") and request.json

fields_skip_logging = {
"csrf_token",
Expand Down
56 changes: 56 additions & 0 deletions tests/api_connexion/endpoints/test_dag_run_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
# under the License.
from __future__ import annotations

import json
import urllib
from datetime import timedelta
from unittest import mock
Expand All @@ -24,6 +25,7 @@
import time_machine

from airflow.api_connexion.exceptions import EXCEPTIONS_LINK_MAP
from airflow.models import Log
from airflow.models.asset import AssetEvent, AssetModel
from airflow.models.dag import DAG, DagModel
from airflow.models.dagrun import DagRun
Expand Down Expand Up @@ -1522,6 +1524,60 @@ def test_should_respond_200(self, state, run_type, dag_maker, session):
assert response.status_code == 200
assert response.json == expected_response_json

@pytest.mark.parametrize("state", ["failed", "success", "queued"])
@pytest.mark.parametrize("run_type", [state.value for state in DagRunType])
def test_action_logging(self, state, run_type, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = "TEST_DAG_RUN_ID"
with dag_maker(dag_id) as dag:
task = EmptyOperator(task_id="task_id", dag=dag)
self.app.dag_bag.bag_dag(dag, root_dag=dag)
dr = dag_maker.create_dagrun(run_id=dag_run_id, run_type=run_type)
ti = dr.get_task_instance(task_id="task_id")
ti.task = task
ti.state = State.RUNNING
session.merge(ti)
session.commit()

request_json = {"state": state}

self.client.patch(
f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
)

log = (
session.query(Log)
.filter(
Log.dag_id == dag_id,
Log.run_id == dag_run_id,
Log.event == "api.update_dag_run_state",
)
.order_by(Log.id.desc())
.first()
)
assert log.extra == json.dumps(request_json)

self.client.patch(
f"api/v1/dags/{dag_id}/dagRuns/{dag_run_id}",
json=request_json,
environ_overrides={"REMOTE_USER": "test"},
headers={"content-type": "application/json; charset=utf-8"},
)

log = (
session.query(Log)
.filter(
Log.dag_id == dag_id,
Log.run_id == dag_run_id,
Log.event == "api.update_dag_run_state",
)
.order_by(Log.id.desc())
.first()
)
assert log.extra == json.dumps(request_json)

def test_schema_validation_error_raises(self, dag_maker, session):
dag_id = "TEST_DAG_ID"
dag_run_id = "TEST_DAG_RUN_ID"
Expand Down

0 comments on commit bae4bb1

Please sign in to comment.