Skip to content

Commit

Permalink
AIP 84: Migrate POST ASSET EVENT legacy API to fast API. (apache#43984)
Browse files Browse the repository at this point in the history
* AIP-84: Migrating GET Assets to fastAPI

* matching response to legacy

* Adding unit tests - part 1

* Update airflow/api_fastapi/common/parameters.py

Co-authored-by: Jed Cunningham <[email protected]>

* fixing the dag_ids filter

* fixing the dag_ids filter

* Adding unit tests - part 2

* fixing unit tests & updating parameter type

* review comments pierre

* fixing last commit

* fixing unit tests

* migrating get assets events endpoint to fastapi

* fixing test response

* Adding tests for filtering

* address review comments

* fixing test parametrize

* address review comments

* address review comments

* AIP84-create-asset-events migration implementation using fastapi

* removing typo print command

* remove typo print statement

* address review comments

* address review comments

* Update airflow/api_fastapi/core_api/datamodels/assets.py

Co-authored-by: Pierre Jeambrun <[email protected]>

* address review comments

---------

Co-authored-by: Amogh <[email protected]>
Co-authored-by: Jed Cunningham <[email protected]>
Co-authored-by: Pierre Jeambrun <[email protected]>
  • Loading branch information
4 people authored Nov 15, 2024
1 parent 0f14f66 commit fd0a0d2
Show file tree
Hide file tree
Showing 10 changed files with 275 additions and 1 deletion.
1 change: 1 addition & 0 deletions airflow/api_connexion/endpoints/asset_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ def delete_asset_queued_events(
)


@mark_fastapi_migration_done
@security.requires_access_asset("POST")
@provide_session
@action_logging
Expand Down
19 changes: 18 additions & 1 deletion airflow/api_fastapi/core_api/datamodels/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

from datetime import datetime

from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, field_validator


class DagScheduleAssetReference(BaseModel):
Expand Down Expand Up @@ -99,3 +99,20 @@ class AssetEventCollectionResponse(BaseModel):

asset_events: list[AssetEventResponse]
total_entries: int


class CreateAssetEventsBody(BaseModel):
"""Create asset events request."""

uri: str
extra: dict = Field(default_factory=dict)

@field_validator("extra", mode="after")
def set_from_rest_api(cls, v: dict) -> dict:
v["from_rest_api"] = True
return v

class Config:
"""Pydantic config."""

extra = "forbid"
57 changes: 57 additions & 0 deletions airflow/api_fastapi/core_api/openapi/v1-generated.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,49 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
post:
tags:
- Asset
summary: Create Asset Event
description: Create asset events.
operationId: create_asset_event
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/CreateAssetEventsBody'
responses:
'200':
description: Successful Response
content:
application/json:
schema:
$ref: '#/components/schemas/AssetEventResponse'
'401':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Unauthorized
'403':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Forbidden
'404':
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPExceptionResponse'
description: Not Found
'422':
description: Validation Error
content:
application/json:
schema:
$ref: '#/components/schemas/HTTPValidationError'
/public/assets/{uri}:
get:
tags:
Expand Down Expand Up @@ -4182,6 +4225,20 @@ components:
- message
title: ConnectionTestResponse
description: Connection Test serializer for responses.
CreateAssetEventsBody:
properties:
uri:
type: string
title: Uri
extra:
type: object
title: Extra
additionalProperties: false
type: object
required:
- uri
title: CreateAssetEventsBody
description: Create asset events request.
DAGCollectionResponse:
properties:
dags:
Expand Down
30 changes: 30 additions & 0 deletions airflow/api_fastapi/core_api/routes/public/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,13 @@
AssetEventCollectionResponse,
AssetEventResponse,
AssetResponse,
CreateAssetEventsBody,
)
from airflow.api_fastapi.core_api.openapi.exceptions import create_openapi_http_exception_doc
from airflow.assets import Asset
from airflow.assets.manager import asset_manager
from airflow.models.asset import AssetEvent, AssetModel
from airflow.utils import timezone

assets_router = AirflowRouter(tags=["Asset"], prefix="/assets")

Expand Down Expand Up @@ -134,6 +138,32 @@ def get_asset_events(
)


@assets_router.post(
"/events",
responses=create_openapi_http_exception_doc([404]),
)
def create_asset_event(
body: CreateAssetEventsBody,
session: Annotated[Session, Depends(get_session)],
) -> AssetEventResponse:
"""Create asset events."""
asset = session.scalar(select(AssetModel).where(AssetModel.uri == body.uri).limit(1))
if not asset:
raise HTTPException(404, f"Asset with uri: `{body.uri}` was not found")
timestamp = timezone.utcnow()

assets_event = asset_manager.register_asset_change(
asset=Asset(uri=body.uri),
timestamp=timestamp,
extra=body.extra,
session=session,
)

if not assets_event:
raise HTTPException(404, f"Asset with uri: `{body.uri}` was not found")
return AssetEventResponse.model_validate(assets_event, from_attributes=True)


@assets_router.get(
"/{uri:path}",
responses=create_openapi_http_exception_doc([401, 403, 404]),
Expand Down
3 changes: 3 additions & 0 deletions airflow/ui/openapi-gen/queries/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1085,6 +1085,9 @@ export const UseVersionServiceGetVersionKeyFn = (queryKey?: Array<unknown>) => [
useVersionServiceGetVersionKey,
...(queryKey ?? []),
];
export type AssetServiceCreateAssetEventMutationResult = Awaited<
ReturnType<typeof AssetService.createAssetEvent>
>;
export type BackfillServiceCreateBackfillMutationResult = Awaited<
ReturnType<typeof BackfillService.createBackfill>
>;
Expand Down
40 changes: 40 additions & 0 deletions airflow/ui/openapi-gen/queries/queries.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
import {
BackfillPostBody,
ConnectionBody,
CreateAssetEventsBody,
DAGPatchBody,
DAGRunClearBody,
DAGRunPatchBody,
Expand Down Expand Up @@ -1757,6 +1758,45 @@ export const useVersionServiceGetVersion = <
queryFn: () => VersionService.getVersion() as TData,
...options,
});
/**
* Create Asset Event
* Create asset events.
* @param data The data for the request.
* @param data.requestBody
* @returns AssetEventResponse Successful Response
* @throws ApiError
*/
export const useAssetServiceCreateAssetEvent = <
TData = Common.AssetServiceCreateAssetEventMutationResult,
TError = unknown,
TContext = unknown,
>(
options?: Omit<
UseMutationOptions<
TData,
TError,
{
requestBody: CreateAssetEventsBody;
},
TContext
>,
"mutationFn"
>,
) =>
useMutation<
TData,
TError,
{
requestBody: CreateAssetEventsBody;
},
TContext
>({
mutationFn: ({ requestBody }) =>
AssetService.createAssetEvent({
requestBody,
}) as unknown as Promise<TData>,
...options,
});
/**
* Create Backfill
* @param data The data for the request.
Expand Down
18 changes: 18 additions & 0 deletions airflow/ui/openapi-gen/requests/schemas.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -694,6 +694,24 @@ export const $ConnectionTestResponse = {
description: "Connection Test serializer for responses.",
} as const;

export const $CreateAssetEventsBody = {
properties: {
uri: {
type: "string",
title: "Uri",
},
extra: {
type: "object",
title: "Extra",
},
},
additionalProperties: false,
type: "object",
required: ["uri"],
title: "CreateAssetEventsBody",
description: "Create asset events request.",
} as const;

export const $DAGCollectionResponse = {
properties: {
dags: {
Expand Down
27 changes: 27 additions & 0 deletions airflow/ui/openapi-gen/requests/services.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ import type {
GetAssetsResponse,
GetAssetEventsData,
GetAssetEventsResponse,
CreateAssetEventData,
CreateAssetEventResponse,
GetAssetData,
GetAssetResponse,
HistoricalMetricsData,
Expand Down Expand Up @@ -220,6 +222,31 @@ export class AssetService {
});
}

/**
* Create Asset Event
* Create asset events.
* @param data The data for the request.
* @param data.requestBody
* @returns AssetEventResponse Successful Response
* @throws ApiError
*/
public static createAssetEvent(
data: CreateAssetEventData,
): CancelablePromise<CreateAssetEventResponse> {
return __request(OpenAPI, {
method: "POST",
url: "/public/assets/events",
body: data.requestBody,
mediaType: "application/json",
errors: {
401: "Unauthorized",
403: "Forbidden",
404: "Not Found",
422: "Validation Error",
},
});
}

/**
* Get Asset
* Get an asset.
Expand Down
41 changes: 41 additions & 0 deletions airflow/ui/openapi-gen/requests/types.gen.ts
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,16 @@ export type ConnectionTestResponse = {
message: string;
};

/**
* Create asset events request.
*/
export type CreateAssetEventsBody = {
uri: string;
extra?: {
[key: string]: unknown;
};
};

/**
* DAG Collection serializer for responses.
*/
Expand Down Expand Up @@ -999,6 +1009,12 @@ export type GetAssetEventsData = {

export type GetAssetEventsResponse = AssetEventCollectionResponse;

export type CreateAssetEventData = {
requestBody: CreateAssetEventsBody;
};

export type CreateAssetEventResponse = AssetEventResponse;

export type GetAssetData = {
uri: string;
};
Expand Down Expand Up @@ -1542,6 +1558,31 @@ export type $OpenApiTs = {
422: HTTPValidationError;
};
};
post: {
req: CreateAssetEventData;
res: {
/**
* Successful Response
*/
200: AssetEventResponse;
/**
* Unauthorized
*/
401: HTTPExceptionResponse;
/**
* Forbidden
*/
403: HTTPExceptionResponse;
/**
* Not Found
*/
404: HTTPExceptionResponse;
/**
* Validation Error
*/
422: HTTPValidationError;
};
};
};
"/public/assets/{uri}": {
get: {
Expand Down
Loading

0 comments on commit fd0a0d2

Please sign in to comment.