Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/mtchbx 45 #24

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ dependencies = [

[project.optional-dependencies]
server = [
"boto3>=1.35.79",
"fastapi[standard]>=0.115.0,<0.116.0",
"pg-bulk-ingest>=0.0.54",
"tomli>=2.0.1",
Expand Down
69 changes: 58 additions & 11 deletions src/matchbox/server/api.py
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
from binascii import hexlify
from enum import StrEnum
from typing import Annotated

from dotenv import find_dotenv, load_dotenv
from fastapi import Depends, FastAPI, HTTPException
from fastapi import Depends, FastAPI, Form, HTTPException, UploadFile
from pydantic import BaseModel

from matchbox.common.graph import ResolutionGraph
from matchbox.server.base import BackendManager, MatchboxDBAdapter
from matchbox.server.utils.s3 import upload_to_s3

dotenv_path = find_dotenv(usecwd=True)
load_dotenv(dotenv_path)


app = FastAPI(
title="matchbox API",
version="0.2.0",
Expand Down Expand Up @@ -45,6 +46,21 @@ class CountResult(BaseModel):
entities: dict[BackendEntityType, int]


class SourceItem(BaseModel):
    """Response model for a single source returned by the API.

    Instances are built by the ``/sources`` handlers in this module from the
    objects returned by ``backend.datasets.list()``.
    """

    # Copied from the dataset's ``schema`` attribute.
    # NOTE(review): presumably the database schema holding the table -- confirm.
    schema: str
    # Copied from the dataset's ``table`` attribute.
    table: str
    # Copied from the dataset's ``id`` attribute.
    id: str
    # ASCII hex encoding of the dataset's ``resolution`` bytes, as produced by
    # the handlers in this module. Optional; defaults to None.
    resolution: str | None = None


class Sources(BaseModel):
    """Response model for the ``GET /sources`` listing endpoint.

    Wraps the individual ``SourceItem`` entries in a single object under the
    ``sources`` key.
    """

    # One entry per dataset returned by the backend.
    sources: list[SourceItem]
Comment on lines +49 to +61
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we start moving this stuff to common?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See below. I think my favoured approach is separate Response* classes for now?

Comment on lines +58 to +61
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need a separate pydantic object? Can't we just return list[Sources]?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sources requires a warehouse, and therefore a valid engine, and it doesn't expose the resolution hash (though it could). Two solutions:

  • SourceBase with the common fields, then Source and SourceResponse as subclasses with connection-enabled fields in one, and the resolution hash in the other
  • Keep response models separate to common -- they need structures that aren't relevant to the common objects



def get_backend() -> MatchboxDBAdapter:
    """Return the backend adapter provided by ``BackendManager``.

    Used as a FastAPI dependency by the route handlers in this module.
    """
    backend = BackendManager.get_backend()
    return backend

Expand Down Expand Up @@ -76,18 +92,49 @@ async def clear_backend():


@app.get("/sources")
async def list_sources():
raise HTTPException(status_code=501, detail="Not implemented")
async def list_sources(
    backend: Annotated[MatchboxDBAdapter, Depends(get_backend)],
) -> Sources:
    """List every source known to the backend.

    Args:
        backend: Backend adapter, injected by FastAPI via ``get_backend``.

    Returns:
        A ``Sources`` payload with one ``SourceItem`` per dataset. Each
        dataset's resolution bytes are rendered as an ASCII hex string.
    """
    items = [
        SourceItem(
            table=ds.table,
            id=ds.id,
            schema=ds.schema,
            resolution=hexlify(ds.resolution).decode("ascii"),
        )
        for ds in backend.datasets.list()
    ]
    return Sources(sources=items)


@app.get("/sources/{hash}")
async def get_source(hash: str):
raise HTTPException(status_code=501, detail="Not implemented")


@app.post("/sources/{hash}")
async def add_source(hash: str):
raise HTTPException(status_code=501, detail="Not implemented")
async def get_source(
hash: str, backend: Annotated[MatchboxDBAdapter, Depends(get_backend)]
) -> dict[str, SourceItem] | str:
datasets = backend.datasets.list()
for dataset in datasets:
resolution = hexlify(dataset.resolution).decode("ascii")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can we use base64?

if resolution == hash:
result_obj = SourceItem(
table=dataset.table,
id=dataset.id,
schema=dataset.schema,
resolution=resolution,
)
return {"source": result_obj}
return "Source not found"
Comment on lines +126 to +127
Copy link
Collaborator

@lmazz1-dbt lmazz1-dbt Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If the source is not found we need to return a 404 and use a response with a proper schema, probably a codified error message. And if it is found, let's just return the object; there is no need to wrap it in a dict.



@app.post("/sources/uploadFile")
Copy link
Collaborator

@lmazz1-dbt lmazz1-dbt Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

camelCase? Are we sure? We should also follow the endpoint convention we'd set

async def add_source_to_s3(
file: UploadFile, bucket_name: str = Form(...), object_name: str = Form(...)
Copy link
Collaborator

@lmazz1-dbt lmazz1-dbt Dec 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we shouldn't allow specifying a bucket name, nor an object name (directly)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From a quick look at the docs, they're using annotations for form parameters
https://fastapi.tiangolo.com/tutorial/request-forms/

):
is_file_uploaded = upload_to_s3(file.file, bucket_name, object_name)
if is_file_uploaded:
return "File was successfully uplaoded"
return "File could not be uplaoded"
Comment on lines +135 to +137
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP status codes please, and Pydantic schemas for the response



@app.get("/models")
Expand Down
Empty file.
28 changes: 28 additions & 0 deletions src/matchbox/server/utils/s3.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
from typing import BinaryIO

import boto3
from botocore.exceptions import NoCredentialsError, PartialCredentialsError


def upload_to_s3(
file: BinaryIO, bucket_name: str, object_name: str | None = None
) -> bool:
"""
Upload a file to an S3 bucket.

Args:
file (BinaryIO): File to upload.
bucket_name (str): Target S3 bucket.
object_name (str): S3 object name. If not specified, file_name is used.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's file_name?


Returns:
bool: True if the file was uploaded, else False.
"""
s3_client = boto3.client("s3")
try:
s3_client.upload_fileobj(file, bucket_name, object_name)
return True
except NoCredentialsError:
return False
except PartialCredentialsError:
return False
40 changes: 34 additions & 6 deletions test/server/test_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
ResolutionGraph,
)
from matchbox.server import app
from matchbox.server.postgresql.orm import Sources

client = TestClient(app)

Expand Down Expand Up @@ -51,13 +52,40 @@ def test_count_backend_item(self, get_backend):
# response = client.post("/testing/clear")
# assert response.status_code == 200

# def test_list_sources():
# response = client.get("/sources")
# assert response.status_code == 200
@patch("matchbox.server.base.BackendManager.get_backend")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tests will need to be updated to reflect requested changes in the API

def test_list_sources(self, get_backend):
    """GET /sources returns 200 and the serialised list of sources.

    Args:
        get_backend: Patched ``BackendManager.get_backend``, supplied by the
            ``@patch`` decorator on this test.
    """
    hash_hex = "5eb63bbbe01eeed093cb22bb8f5acdc3"
    byte_arr = bytearray.fromhex(hash_hex)
    # Same fixture values as test_get_source so the two tests stay comparable.
    obj_mock = Sources(
        table="mock_table", schema="mock_schema", id="mock_id", resolution=byte_arr
    )
    mock_backend = Mock()
    mock_backend.datasets.list = Mock(return_value=[obj_mock])
    get_backend.return_value = mock_backend

    response = client.get("/sources")
    assert response.status_code == 200
    # A 200 alone would also pass for an empty or malformed body, so pin the
    # payload: one item per dataset, resolution hex-encoded.
    assert response.json() == {
        "sources": [
            {
                "schema": "mock_schema",
                "table": "mock_table",
                "id": "mock_id",
                "resolution": hash_hex,
            }
        ]
    }

# def test_get_source():
# response = client.get("/sources/test_source")
# assert response.status_code == 200
@patch("matchbox.server.base.BackendManager.get_backend")
def test_get_source(self, get_backend):
    """GET /sources/{hash} returns the source whose hex resolution matches."""
    resolution_hex = "5eb63bbbe01eeed093cb22bb8f5acdc3"
    stored_source = Sources(
        table="mock_table",
        schema="mock_schema",
        id="mock_id",
        resolution=bytearray.fromhex(resolution_hex),
    )
    backend = Mock()
    backend.datasets.list = Mock(return_value=[stored_source])
    get_backend.return_value = backend

    # Payload the endpoint is expected to produce for the stored source.
    expected = {
        "source": {
            "schema": "mock_schema",
            "table": "mock_table",
            "id": "mock_id",
            "resolution": resolution_hex,
        }
    }

    response = client.get(f"/sources/{resolution_hex}")
    assert response.status_code == 200
    assert response.json() == expected

# def test_add_source():
# response = client.post("/sources")
Expand Down
Loading
Loading