Skip to content

Commit

Permalink
WIP: Implement object storage reference implementation.
Browse files Browse the repository at this point in the history
  • Loading branch information
csadorf committed Jun 17, 2022
1 parent 2de7297 commit 769b301
Show file tree
Hide file tree
Showing 7 changed files with 519 additions and 13 deletions.
4 changes: 4 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,13 @@ repos:
- id: mypy
language_version: '3.10'
additional_dependencies:
- aiosqlite==0.17.0
- click==8.1.3
- databases==0.6.0
- fastapi==0.75.2
- pydantic==1.9.0
- pytest==7.1.2
- sqlalchemy2-stubs==0.0.2a24
- types-requests==2.27.27
- types-ujson==4.2.1
args: [--config-file=pyproject.toml]
Expand All @@ -73,6 +76,7 @@ repos:
pass_filenames: false
additional_dependencies:
- click==8.1.3
- databases==0.6.0
- email-validator==1.2.1
- fastapi==0.75.2
- python-multipart==0.0.5
Expand Down
141 changes: 129 additions & 12 deletions marketplace_standard_app_api/main.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
import os
import sqlite3
import uuid
from pathlib import Path
from typing import Any, Callable, Dict, Optional, Union

import databases
import requests
import sqlalchemy
from fastapi import Depends, FastAPI, HTTPException, Request, UploadFile
from fastapi.responses import HTMLResponse, Response

Expand All @@ -22,9 +28,49 @@
TransformationUpdateModel,
TransformationUpdateResponse,
)
from .reference import object_storage
from .reference.common import metadata
from .security import AuthTokenBearer
from .version import __version__

# Standard approach to enabling foreign key support for sqlite3, however since
# we use the async databases library, we need to use a custom Connection object
# as implemented in the get_database() function below.
# See also: https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#foreign-key-support
# @event.listens_for(Engine, "connect")
# def set_sqlite_pragma(dbapi_connection, connection_record):
# print("SET SQLITE PRAGMA")
# cursor = dbapi_connection.cursor()
# cursor.execute("PRAGMA foreign_keys=ON")
# cursor.close()


DATABASE_URL = os.environ.get("DATABASE_URL", "sqlite:///./app.db")
DATA_DIR = Path.cwd() / "data"

database = None
engine = None


def get_database() -> databases.Database:
"Get the database connection."
global database, engine
if database is None:

# Work-around for sqlite3 due to a limitation in encode/databases
# and aioqslite: https://github.com/encode/databases/issues/169
class Connection(sqlite3.Connection):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.execute("PRAGMA foreign_keys=ON")

database = databases.Database(DATABASE_URL, factory=Connection)

engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
return database


async def catch_authentication_request_errors_middleware(
request: Request, call_next: Callable
Expand All @@ -38,6 +84,9 @@ async def catch_authentication_request_errors_middleware(
raise


auth_token_bearer = AuthTokenBearer()


class MarketPlaceAPI(FastAPI):
def openapi(self) -> Dict[str, Any]:
openapi_schema = super().openapi()
Expand All @@ -56,11 +105,23 @@ def openapi(self) -> Dict[str, Any]:
"email": "[email protected]",
},
license_info={"name": "MIT", "url": "https://opensource.org/licenses/MIT"},
dependencies=[Depends(AuthTokenBearer())],
dependencies=[Depends(auth_token_bearer)],
)
api.middleware("http")(catch_authentication_request_errors_middleware)


@api.on_event("startup")
async def startup():
database = get_database()
metadata.create_all(engine)
await database.connect()


@api.on_event("shutdown")
async def shutdown():
await get_database().disconnect()


@api.get(
"/",
operation_id="frontend",
Expand Down Expand Up @@ -131,7 +192,8 @@ async def list_collections(
limit: int = 100, offset: int = 0
) -> Union[CollectionListResponse, Response]:
"""List all collections."""
raise HTTPException(status_code=501, detail="Not implemented.")
collections = await object_storage.list_collections(get_database(), limit, offset)
return collections or Response(status_code=204)


@api.get(
Expand All @@ -152,7 +214,21 @@ async def list_datasets(
collection_name: CollectionName, limit: int = 100, offset: int = 0
) -> Union[DatasetListResponse, Response]:
"""List all datasets."""
raise HTTPException(status_code=501, detail="Not implemented.")
try:
datasets, headers = await object_storage.list_datasets(
get_database(), collection_name, limit, offset
)
if datasets:
return Response(
content="{{ {} }}".format(
",".join([dataset.json() for dataset in datasets])
),
headers=headers,
)
else:
return Response(status_code=204, headers=headers)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Collection not found.")


CREATE_COLLECTION_DESCRIPTION = """
Expand Down Expand Up @@ -210,7 +286,14 @@ async def create_collection(
request: Request, collection_name: CollectionName = None
) -> Response:
"""Create a new or replace an existing collection."""
raise HTTPException(status_code=501, detail="Not implemented.")
# TODO: Support updates.
if collection_name is None:
collection_name = CollectionName(str(uuid.uuid4()))

await object_storage.create_collection(
get_database(), collection_name, request.headers
)
return Response(status_code=201, content=collection_name)


@api.head(
Expand All @@ -231,7 +314,13 @@ async def create_collection(
)
async def get_collection_metadata(collection_name: CollectionName) -> Response:
"""Get the metadata for a collection."""
raise HTTPException(status_code=501, detail="Not implemented.")
try:
headers = await object_storage.get_collection_metadata_headers(
get_database(), collection_name
)
return Response(status_code=204, headers=headers)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Collection not found.")


@api.delete(
Expand All @@ -254,7 +343,11 @@ async def get_collection_metadata(collection_name: CollectionName) -> Response:
)
async def delete_collection(collection_name: CollectionName) -> Response:
"""Delete an empty collection."""
raise HTTPException(status_code=501, detail="Not implemented.")
try:
await object_storage.delete_collection(get_database(), collection_name)
return Response(status_code=204, content="Collection has been deleted.")
except object_storage.ConflictError as error:
raise HTTPException(status_code=409, detail=str(error))


CREATE_DATASET_DESCRIPTION = """
Expand Down Expand Up @@ -309,7 +402,18 @@ async def create_dataset(
dataset_name: Optional[DatasetName] = None,
) -> Union[DatasetCreateResponse, Response]:
"""Create a new or replace an existing dataset."""
raise HTTPException(status_code=501, detail="Not implemented.")
if dataset_name is None:
dataset_name = DatasetName(str(uuid.uuid4()))

await object_storage.create_dataset(
get_database(),
DATA_DIR,
collection_name,
dataset_name,
file,
dict(request.headers),
)
return Response(status_code=201, content=dataset_name)


@api.post(
Expand Down Expand Up @@ -390,8 +494,13 @@ async def get_dataset_metadata(
storage API:
https://docs.openstack.org/api-ref/object-store/index.html#show-object-metadata
"""
# return Response(content=None, headers={"X-Object-Meta-my-key": "some-value"})
raise HTTPException(status_code=501, detail="Not implemented.")
try:
headers = await object_storage.get_dataset_metadata_headers(
get_database(), collection_name, dataset_name
)
return Response(status_code=200, headers=headers)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Not found.")


@api.get(
Expand Down Expand Up @@ -433,8 +542,13 @@ async def get_dataset(
storage API:
https://docs.openstack.org/api-ref/object-store/index.html#get-object-content-and-metadata
"""
# return Response(content=data, headers={"X-Object-Meta-my-key": "some-value"})
raise HTTPException(status_code=501, detail="Not implemented.")
try:
content, headers = await object_storage.get_dataset(
get_database(), DATA_DIR, collection_name, dataset_name
)
return Response(content=content, headers=headers)
except FileNotFoundError:
raise HTTPException(status_code=404, detail="Not found.")


@api.delete(
Expand All @@ -459,7 +573,10 @@ async def delete_dataset(
storage API:
https://docs.openstack.org/api-ref/object-store/index.html#delete-object
"""
raise HTTPException(status_code=501, detail="Not implemented.")
await object_storage.delete_dataset(
get_database(), DATA_DIR, collection_name, dataset_name
)
return Response(status_code=204)


@api.post(
Expand Down
Empty file.
3 changes: 3 additions & 0 deletions marketplace_standard_app_api/reference/common.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
import sqlalchemy

metadata = sqlalchemy.MetaData()
Loading

0 comments on commit 769b301

Please sign in to comment.