diff --git a/airflow/api_connexion/endpoints/asset_endpoint.py b/airflow/api_connexion/endpoints/asset_endpoint.py index 837225209590..7915bf8b034b 100644 --- a/airflow/api_connexion/endpoints/asset_endpoint.py +++ b/airflow/api_connexion/endpoints/asset_endpoint.py @@ -323,6 +323,7 @@ def delete_asset_queued_events( ) +@mark_fastapi_migration_done @security.requires_access_asset("POST") @provide_session @action_logging diff --git a/airflow/api_fastapi/core_api/datamodels/assets.py b/airflow/api_fastapi/core_api/datamodels/assets.py index 3e317a4c7e35..e5ac10715ed4 100644 --- a/airflow/api_fastapi/core_api/datamodels/assets.py +++ b/airflow/api_fastapi/core_api/datamodels/assets.py @@ -19,7 +19,7 @@ from datetime import datetime -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, field_validator class DagScheduleAssetReference(BaseModel): @@ -99,3 +99,20 @@ class AssetEventCollectionResponse(BaseModel): asset_events: list[AssetEventResponse] total_entries: int + + +class CreateAssetEventsBody(BaseModel): + """Create asset events request.""" + + uri: str + extra: dict = Field(default_factory=dict) + + @field_validator("extra", mode="after") + def set_from_rest_api(cls, v: dict) -> dict: + v["from_rest_api"] = True + return v + + class Config: + """Pydantic config.""" + + extra = "forbid" diff --git a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml index 94bb60b8efbd..679eedb04791 100644 --- a/airflow/api_fastapi/core_api/openapi/v1-generated.yaml +++ b/airflow/api_fastapi/core_api/openapi/v1-generated.yaml @@ -346,6 +346,49 @@ paths: application/json: schema: $ref: '#/components/schemas/HTTPValidationError' + post: + tags: + - Asset + summary: Create Asset Event + description: Create asset events. + operationId: create_asset_event + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/CreateAssetEventsBody' + responses: + '200': + description: Successful Response + content: + application/json: + schema: + $ref: '#/components/schemas/AssetEventResponse' + '401': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Unauthorized + '403': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Forbidden + '404': + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPExceptionResponse' + description: Not Found + '422': + description: Validation Error + content: + application/json: + schema: + $ref: '#/components/schemas/HTTPValidationError' /public/assets/{uri}: get: tags: @@ -4182,6 +4225,20 @@ components: - message title: ConnectionTestResponse description: Connection Test serializer for responses. + CreateAssetEventsBody: + properties: + uri: + type: string + title: Uri + extra: + type: object + title: Extra + additionalProperties: false + type: object + required: + - uri + title: CreateAssetEventsBody + description: Create asset events request. DAGCollectionResponse: properties: dags: diff --git a/airflow/api_fastapi/core_api/routes/public/assets.py b/airflow/api_fastapi/core_api/routes/public/assets.py index 67218c471615..326a387f0089 100644 --- a/airflow/api_fastapi/core_api/routes/public/assets.py +++ b/airflow/api_fastapi/core_api/routes/public/assets.py @@ -42,9 +42,13 @@ AssetEventCollectionResponse, AssetEventResponse, AssetResponse, + CreateAssetEventsBody, ) from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc +from airflow.assets import Asset +from airflow.assets.manager import asset_manager from airflow.models.asset import AssetEvent, AssetModel +from airflow.utils import timezone assets_router = AirflowRouter(tags=["Asset"], prefix="/assets") @@ -134,6 +138,32 @@ def get_asset_events( ) +@assets_router.post( + "/events", + responses=create_openapi_http_exception_doc([404]), +) +def create_asset_event( + body: CreateAssetEventsBody, + session: Annotated[Session, Depends(get_session)], +) -> AssetEventResponse: + """Create asset events.""" + asset = session.scalar(select(AssetModel).where(AssetModel.uri == body.uri).limit(1)) + if not asset: + raise HTTPException(404, f"Asset with uri: `{body.uri}` was not found") + timestamp = timezone.utcnow() + + assets_event = asset_manager.register_asset_change( + asset=Asset(uri=body.uri), + timestamp=timestamp, + extra=body.extra, + session=session, + ) + + if not assets_event: + raise HTTPException(404, f"Asset with uri: `{body.uri}` was not found") + return AssetEventResponse.model_validate(assets_event, from_attributes=True) + + @assets_router.get( "/{uri:path}", responses=create_openapi_http_exception_doc([401, 403, 404]), diff --git a/airflow/ui/openapi-gen/queries/common.ts b/airflow/ui/openapi-gen/queries/common.ts index 6175179d11cc..56d92e828772 100644 --- a/airflow/ui/openapi-gen/queries/common.ts +++ b/airflow/ui/openapi-gen/queries/common.ts @@ -1085,6 +1085,9 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array) => [ useVersionServiceGetVersionKey, ...(queryKey ?? []), ]; +export type AssetServiceCreateAssetEventMutationResult = Awaited< + ReturnType +>; export type BackfillServiceCreateBackfillMutationResult = Awaited< ReturnType >; diff --git a/airflow/ui/openapi-gen/queries/queries.ts b/airflow/ui/openapi-gen/queries/queries.ts index 3c6a426ee07f..b2556a2cd598 100644 --- a/airflow/ui/openapi-gen/queries/queries.ts +++ b/airflow/ui/openapi-gen/queries/queries.ts @@ -32,6 +32,7 @@ import { import { BackfillPostBody, ConnectionBody, + CreateAssetEventsBody, DAGPatchBody, DAGRunClearBody, DAGRunPatchBody, @@ -1757,6 +1758,45 @@ export const useVersionServiceGetVersion = < queryFn: () => VersionService.getVersion() as TData, ...options, }); +/** + * Create Asset Event + * Create asset events. + * @param data The data for the request. + * @param data.requestBody + * @returns AssetEventResponse Successful Response + * @throws ApiError + */ +export const useAssetServiceCreateAssetEvent = < + TData = Common.AssetServiceCreateAssetEventMutationResult, + TError = unknown, + TContext = unknown, +>( + options?: Omit< + UseMutationOptions< + TData, + TError, + { + requestBody: CreateAssetEventsBody; + }, + TContext + >, + "mutationFn" + >, +) => + useMutation< + TData, + TError, + { + requestBody: CreateAssetEventsBody; + }, + TContext + >({ + mutationFn: ({ requestBody }) => + AssetService.createAssetEvent({ + requestBody, + }) as unknown as Promise, + ...options, + }); /** * Create Backfill * @param data The data for the request. diff --git a/airflow/ui/openapi-gen/requests/schemas.gen.ts b/airflow/ui/openapi-gen/requests/schemas.gen.ts index 17b4cf787218..1a4ab7f498ca 100644 --- a/airflow/ui/openapi-gen/requests/schemas.gen.ts +++ b/airflow/ui/openapi-gen/requests/schemas.gen.ts @@ -694,6 +694,24 @@ export const $ConnectionTestResponse = { description: "Connection Test serializer for responses.", } as const; +export const $CreateAssetEventsBody = { + properties: { + uri: { + type: "string", + title: "Uri", + }, + extra: { + type: "object", + title: "Extra", + }, + }, + additionalProperties: false, + type: "object", + required: ["uri"], + title: "CreateAssetEventsBody", + description: "Create asset events request.", +} as const; + export const $DAGCollectionResponse = { properties: { dags: { diff --git a/airflow/ui/openapi-gen/requests/services.gen.ts b/airflow/ui/openapi-gen/requests/services.gen.ts index 1ebbac6aa6cc..7a5d1389eb53 100644 --- a/airflow/ui/openapi-gen/requests/services.gen.ts +++ b/airflow/ui/openapi-gen/requests/services.gen.ts @@ -9,6 +9,8 @@ import type { GetAssetsResponse, GetAssetEventsData, GetAssetEventsResponse, + CreateAssetEventData, + CreateAssetEventResponse, GetAssetData, GetAssetResponse, HistoricalMetricsData, @@ -220,6 +222,31 @@ export class AssetService { }); } + /** + * Create Asset Event + * Create asset events. + * @param data The data for the request. + * @param data.requestBody + * @returns AssetEventResponse Successful Response + * @throws ApiError + */ + public static createAssetEvent( + data: CreateAssetEventData, + ): CancelablePromise { + return __request(OpenAPI, { + method: "POST", + url: "/public/assets/events", + body: data.requestBody, + mediaType: "application/json", + errors: { + 401: "Unauthorized", + 403: "Forbidden", + 404: "Not Found", + 422: "Validation Error", + }, + }); + } + /** * Get Asset * Get an asset. diff --git a/airflow/ui/openapi-gen/requests/types.gen.ts b/airflow/ui/openapi-gen/requests/types.gen.ts index 05b6e84adfca..6d8af631417f 100644 --- a/airflow/ui/openapi-gen/requests/types.gen.ts +++ b/airflow/ui/openapi-gen/requests/types.gen.ts @@ -174,6 +174,16 @@ export type ConnectionTestResponse = { message: string; }; +/** + * Create asset events request. + */ +export type CreateAssetEventsBody = { + uri: string; + extra?: { + [key: string]: unknown; + }; +}; + /** * DAG Collection serializer for responses. */ @@ -999,6 +1009,12 @@ export type GetAssetEventsData = { export type GetAssetEventsResponse = AssetEventCollectionResponse; +export type CreateAssetEventData = { + requestBody: CreateAssetEventsBody; +}; + +export type CreateAssetEventResponse = AssetEventResponse; + export type GetAssetData = { uri: string; }; @@ -1542,6 +1558,31 @@ export type $OpenApiTs = { 422: HTTPValidationError; }; }; + post: { + req: CreateAssetEventData; + res: { + /** + * Successful Response + */ + 200: AssetEventResponse; + /** + * Unauthorized + */ + 401: HTTPExceptionResponse; + /** + * Forbidden + */ + 403: HTTPExceptionResponse; + /** + * Not Found + */ + 404: HTTPExceptionResponse; + /** + * Validation Error + */ + 422: HTTPValidationError; + }; + }; }; "/public/assets/{uri}": { get: { diff --git a/tests/api_fastapi/core_api/routes/public/test_assets.py b/tests/api_fastapi/core_api/routes/public/test_assets.py index dd1c971dacbf..42b7acd908ff 100644 --- a/tests/api_fastapi/core_api/routes/public/test_assets.py +++ b/tests/api_fastapi/core_api/routes/public/test_assets.py @@ -17,8 +17,11 @@ from __future__ import annotations import urllib +from typing import Generator +from unittest import mock import pytest +import time_machine from airflow.models import DagModel from airflow.models.asset import AssetEvent, AssetModel, DagScheduleAssetReference, TaskOutletAssetReference @@ -459,3 +462,40 @@ def test_should_respond_404(self, test_client): ) assert response.status_code == 404 assert response.json()["detail"] == "The Asset with uri: `s3://bucket/key` was not found" + + +class TestPostAssetEvents(TestAssets): + @pytest.fixture + def time_freezer(self) -> Generator: + freezer = time_machine.travel(self.default_time, tick=False) + freezer.start() + + yield + + freezer.stop() + + @pytest.mark.usefixtures("time_freezer") + def test_should_respond_200(self, test_client, session): + self.create_assets() + event_payload = {"uri": "s3://bucket/key/1", "extra": {"foo": "bar"}} + response = test_client.post("/public/assets/events", json=event_payload) + assert response.status_code == 200 + assert response.json() == { + "id": mock.ANY, + "asset_id": 1, + "uri": "s3://bucket/key/1", + "extra": {"foo": "bar", "from_rest_api": True}, + "source_task_id": None, + "source_dag_id": None, + "source_run_id": None, + "source_map_index": -1, + "created_dagruns": [], + "timestamp": self.default_time.replace("+00:00", "Z"), + } + + def test_invalid_attr_not_allowed(self, test_client, session): + self.create_assets() + event_invalid_payload = {"asset_uri": "s3://bucket/key/1", "extra": {"foo": "bar"}, "fake": {}} + response = test_client.post("/public/assets/events", json=event_invalid_payload) + + assert response.status_code == 422