FastAPI plugin for CloudEvents Integration
Allows you to easily consume and produce CloudEvents over a REST API.
Automatically parses CloudEvents in both the binary and structured formats and
provides an interface very similar to the regular FastAPI interface. No more
hassling with to_structured
and from_http
function calls!
@app.post("/")
async def on_event(event: CloudEvent) -> CloudEvent:
pass
See more examples below
pip install fastapi-cloudevents
import uvicorn
from fastapi import FastAPI
from fastapi_cloudevents import CloudEvent, install_fastapi_cloudevents
app = FastAPI()
app = install_fastapi_cloudevents(app)
@app.post("/")
async def on_event(event: CloudEvent) -> CloudEvent:
return CloudEvent(
type="my.response-type.v1",
data=event.data,
datacontenttype=event.datacontenttype,
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
The route accepts both binary CloudEvents
curl http://localhost:8000 -i -X POST -d "Hello World!" \
-H "Content-Type: text/plain" \
-H "ce-specversion: 1.0" \
-H "ce-type: my.request-type.v1" \
-H "ce-id: 123" \
-H "ce-source: my-source"
And structured CloudEvents
curl http://localhost:8000 -i -X POST -H "Content-Type: application/json" \
-d '{"data":"Hello World", "source":"my-source", "id":"123", "type":"my.request-type.v1","specversion":"1.0"}'
Both of the requests will yield a response in the same format:
HTTP/1.1 200 OK
date: Fri, 05 Aug 2022 23:50:52 GMT
server: uvicorn
content-length: 13
content-type: application/json
ce-specversion: 1.0
ce-id: 25cd28f0-0605-4a76-b1d8-cffbe3375413
ce-source: http://localhost:8000/
ce-type: my.response-type.v1
ce-time: 2022-08-05T23:50:52.809697+00:00
"Hello World"
from typing import Literal, Union
import uvicorn
from fastapi import FastAPI, Body
from pydantic import Field
from typing_extensions import Annotated
from fastapi_cloudevents import (
CloudEvent,
CloudEventSettings,
ContentMode,
install_fastapi_cloudevents,
)
app = FastAPI()
app = install_fastapi_cloudevents(
app, settings=CloudEventSettings(default_response_mode=ContentMode.structured)
)
class MyEvent(CloudEvent):
type: Literal["my.type.v1"]
class YourEvent(CloudEvent):
type: Literal["your.type.v1"]
OurEvent = Annotated[Union[MyEvent, YourEvent], Body(discriminator="type")]
_source = "dummy:source"
@app.post("/")
async def on_event(event: OurEvent) -> CloudEvent:
if isinstance(event, MyEvent):
return CloudEvent(
type="my.response-type.v1",
data=f"got {event.data} from my event!",
datacontenttype="text/plain",
)
else:
return CloudEvent(
type="your.response-type.v1",
data=f"got {event.data} from your event!",
datacontenttype="text/plain",
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8002)
To send the response in the HTTP CloudEvent structured format, you MAY use the
StructuredCloudEventResponse
class
import uvicorn
from fastapi import FastAPI
from fastapi_cloudevents import (CloudEvent, StructuredCloudEventResponse,
install_fastapi_cloudevents)
app = FastAPI()
app = install_fastapi_cloudevents(app)
@app.post("/", response_class=StructuredCloudEventResponse)
async def on_event(event: CloudEvent) -> CloudEvent:
return CloudEvent(
type="com.my-corp.response.v1",
data=event.data,
datacontenttype=event.datacontenttype,
)
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8001)
curl http://localhost:8001 -i -X POST -d "Hello World!" \
-H "Content-Type: text/plain" \
-H "ce-specversion: 1.0" \
-H "ce-type: my.request-type.v1" \
-H "ce-id: 123" \
-H "ce-source: my-source"
HTTP/1.1 200 OK
date: Fri, 05 Aug 2022 23:51:26 GMT
server: uvicorn
content-length: 247
content-type: application/json
{"data":"Hello World!","source":"http://localhost:8001/","id":"3412321f-85b3-4f7f-a551-f4c23a05de3a","type":"com.my-corp.response.v1","specversion":"1.0","time":"2022-08-05T23:51:26.878723+00:00","datacontenttype":"text/plain"}