feat(pipeline): Add Firestore DatabaseClient #12

Merged 7 commits on Oct 2, 2024

7 changes: 5 additions & 2 deletions .github/workflows/ci.yaml
@@ -10,8 +10,8 @@ jobs:
pipeline:
strategy:
fail-fast: false
matrix:
target: [lint, test]
matrix:
target: [lint, test]
name: "${{ matrix.target }} Pipeline"
defaults:
run:
@@ -26,6 +26,9 @@ jobs:
cache: "pip"
- name: Install Pip dependencies
run: pip install -r requirements.txt
- uses: "google-github-actions/auth@v2"
with:
credentials_json: "${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}"
- name: "${{ matrix.target }} Pipeline"
run: "make ${{ matrix.target }}"

24 changes: 21 additions & 3 deletions pipeline/README.md
@@ -4,28 +4,46 @@ Flowmotion's ML pipeline

## Setup

- Install `pip` modules

```sh
pip install -r requirements.txt
```

- Set up [Google Application Default credentials](https://cloud.google.com/docs/authentication#service-accounts),
  which are needed to authenticate with Firestore
  (e.g. by setting the `GOOGLE_APPLICATION_CREDENTIALS` env var, as sketched below). <a id="pipeline-credentials"></a>
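
A quick way to confirm the credentials are discoverable is to resolve them with `google.auth`,
which the Firestore client libraries already pull in (a minimal sketch; the printed project ID
depends on your own credentials):

```python
# Sketch: verify Application Default Credentials resolve before running the pipeline.
import google.auth

credentials, project_id = google.auth.default()
print(f"Authenticated for project: {project_id}")  # project_id may be None for user credentials
```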

## Usage

Running the ML pipeline:

1. Run the pipeline

```sh
python pipeline.py
```

### Contributing
## Contributing

Before pushing, ensure that you:

1. Format & Lint code

```sh
black . && isort . && ruff check .
```

3. Run tests
2. Run unit tests.

```sh
pytest -m "not integration"
```

3. Run integration tests. Requires [GCP credentials](#pipeline-credentials).

```sh
pytest
pytest -m integration
```

> A `makefile` is provided to make this easier. Just `make`.
2 changes: 1 addition & 1 deletion pipeline/api.py
@@ -57,7 +57,7 @@ def parse_cameras(meta: dict) -> list[Camera]:
    retrieved_on = meta["timestamp"]
    return [
        Camera(
            camera_id=c["camera_id"],
            id=c["camera_id"],
            retrieved_on=retrieved_on,
            captured_on=c["timestamp"],
            image_url=c["image"],
111 changes: 111 additions & 0 deletions pipeline/db.py
@@ -0,0 +1,111 @@
#
# Flowmotion
# Pipeline
# Firestore DB Client
#

from typing import Any, Iterable, Optional, cast

import firebase_admin
from firebase_admin import firestore
from google.cloud.firestore import DocumentReference
from pydantic import BaseModel

from model import to_json_dict


class DatabaseClient:
    """Firestore Database (DB) client"""

    def __init__(self) -> None:
        """Creates a new Firestore DB client.
        Uses Google Application Default credentials to authenticate DB requests.
        See https://firebase.google.com/docs/admin/setup#initialize-sdk.
        """
        app = firebase_admin.initialize_app()
        self._db = firestore.client(app)

    def insert(self, table: str, data: BaseModel) -> str:
        """
        Inserts the given data into the specified table.
        Args:
            table: Name of the Firestore collection to insert into.
            data: Pydantic model to insert as a document.
        Returns:
            Key of the inserted Firestore document.
        """
        _, doc = self._db.collection(table).add(to_json_dict(data))
        return _to_key(doc)

    def update(self, table: str, key: str, data: BaseModel):
        """
        Updates the row (document) with the given key in the specified table.
        Args:
            table: Name of the Firestore collection to update.
            key: Key specifying the Firestore document to update.
            data: Pydantic model whose contents replace the Firestore document's contents.
        """
        self._db.collection(table).document(_doc_id(key)).set(to_json_dict(data))

    def delete(self, table: str, key: str):
        """
        Deletes the row (document) with the given key from the specified table.
        Args:
            table: Name of the Firestore collection to delete from.
            key: Key specifying the Firestore document to delete.
        """
        self._db.collection(table).document(_doc_id(key)).delete()

    def get(self, table: str, key: str) -> Optional[dict[str, Any]]:
        """
        Retrieves the contents of the row (document) with the given key from the specified table.
        Args:
            table: Name of the Firestore collection to retrieve from.
            key: Key specifying the Firestore document to retrieve.
        Returns:
            Contents of the matching document as a dict, or None if no such document exists.
        """
        return self._db.collection(table).document(_doc_id(key)).get().to_dict()

    def query(self, table: str, **params) -> Iterable[str]:
        """
        Queries keys of all rows (Firestore documents) in the specified table that match params.
        Args:
            table: Name of the Firestore collection to query.
            params: Query parameters, given as document fields in the format
                <field>=(<operator>, <value>) where:
                - <field> is a field path "<name>[__<subfield>...]" which refers to the document's
                    <name> field (or <name>.<subfield> if the optional subfield name is specified).
                - <operator> is one of Firestore's supported operators.
                    See https://firebase.google.com/docs/firestore/query-data/queries
                - <value> is used by the operator to find matching rows.
        Example:
            Get User rows with `name="john"`:
                db = DatabaseClient()
                users = db.query("Users", name=("==", "john"))
        Returns:
            Iterator of keys of matching rows in the table.
        """
        # build the query by applying the query params as filters
        query = self._db.collection(table)
        for field, (op, value) in params.items():
            query = query.where(field.replace("__", "."), op, value)

        # stream matching documents and yield their keys
        for snapshot in query.stream():
            yield _to_key(snapshot.reference)


def _to_key(ref: DocumentReference) -> str:
    return cast(DocumentReference, ref).path


def _doc_id(key: str) -> str:
    return key.split("/")[-1]
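
A minimal usage sketch of the client added above, mirroring the flow exercised in `test_db.py` later in this PR. The `users_demo` collection name and the `User` model are illustrative only, and Application Default Credentials must already be configured:

```python
# Sketch: insert, fetch, query and delete a document through DatabaseClient.
from pydantic import BaseModel

from db import DatabaseClient


class User(BaseModel):
    name: str


db = DatabaseClient()
key = db.insert(table="users_demo", data=User(name="john"))

print(db.get(table="users_demo", key=key))                # {'name': 'john'}
print(list(db.query("users_demo", name=("==", "john"))))  # [<key of the inserted document>]

db.delete(table="users_demo", key=key)
```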
24 changes: 23 additions & 1 deletion pipeline/model.py
@@ -4,6 +4,7 @@
# Models
#

import json
from datetime import datetime

from pydantic import BaseModel
@@ -16,11 +17,32 @@ class Location(BaseModel):
    latitude: float


class Rating(BaseModel):
    """Traffic Congestion rating performed by a model"""

    rated_on: datetime
    model_id: str
    value: float


class Camera(BaseModel):
    """Traffic Camera capturing traffic images."""

    camera_id: str
    id: str
    image_url: str
    captured_on: datetime
    retrieved_on: datetime
    location: Location


class Congestion(BaseModel):
    """Traffic Congestion data."""

    camera: Camera
    rating: Rating
    updated_on: datetime


def to_json_dict(model: BaseModel):
    """Convert the given pydantic model into its JSON dict representation."""
    return json.loads(model.model_dump_json())
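
For context, `to_json_dict` round-trips through `model_dump_json` so that values such as `datetime` come out as JSON-friendly strings. A small sketch of the expected output (values are illustrative):

```python
# Sketch: to_json_dict serializes datetime fields to ISO 8601 strings (pydantic v2 behaviour).
from datetime import datetime

from model import Rating, to_json_dict

rating = Rating(rated_on=datetime(2024, 9, 27, 8, 32, 0), model_id="v1.0", value=0.75)
print(to_json_dict(rating))
# {'rated_on': '2024-09-27T08:32:00', 'model_id': 'v1.0', 'value': 0.75}
```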
5 changes: 5 additions & 0 deletions pipeline/requirements.txt
@@ -1,5 +1,6 @@
annotated-types==0.7.0
anyio==4.5.0
attrs==24.2.0
black==24.8.0
CacheControl==0.14.0
cachetools==5.5.0
@@ -29,6 +30,8 @@ httpx==0.27.2
idna==3.10
iniconfig==2.0.0
isort==5.13.2
jsonschema==4.23.0
jsonschema-specifications==2023.12.1
msgpack==1.1.0
mypy-extensions==1.0.0
packaging==24.1
@@ -46,7 +49,9 @@ PyJWT==2.9.0
pyparsing==3.1.4
pytest==8.3.3
pytest-asyncio==0.24.0
referencing==0.35.1
requests==2.32.3
rpds-py==0.20.0
rsa==4.9
ruff==0.6.6
sniffio==1.3.1
60 changes: 60 additions & 0 deletions pipeline/test_db.py
@@ -0,0 +1,60 @@
#
# Flowmotion
# Firestore DB Client
# Integration Test
#


# Usage:
# - Google Application Default credentials should be provided to authenticate
#   with Firestore, e.g. by setting the GOOGLE_APPLICATION_CREDENTIALS env var.


import os
from uuid import uuid4

import pytest
from pydantic import BaseModel

from db import DatabaseClient
from model import to_json_dict


class Model(BaseModel):
    field: str


@pytest.fixture(scope="session")
def db() -> DatabaseClient:
    os.environ["GOOGLE_CLOUD_PROJECT"] = "flowmotion-4e268"
    return DatabaseClient()


@pytest.fixture(scope="session")
def collection(db: DatabaseClient):
    # unique collection name for testing
    name = f"test_db_{uuid4()}"
    yield name

    # empty collection of any existing documents to cleanup test collection
    collection = db._db.collection(name)
    for document in collection.list_documents():
        document.delete()


@pytest.fixture
def model() -> Model:
    return Model(field="test")


@pytest.mark.integration
def test_db_insert_get_delete_query(db: DatabaseClient, collection: str, model: Model):
    # test: insert model into collection
    key = db.insert(table=collection, data=model)
    assert len(key) > 0
    # test: get by key
    assert db.get(table=collection, key=key) == to_json_dict(model)
    # test: query by field value
    got_key = list(db.query(table=collection, field=("==", "test")))[0]
    assert got_key == key
    db.delete(collection, key)
37 changes: 37 additions & 0 deletions pipeline/test_model.py
@@ -0,0 +1,37 @@
#
# Flowmotion
# Models
# Unit Tests
#


import json
from datetime import datetime
from pathlib import Path

from jsonschema import validate

from model import Camera, Congestion, Location, Rating, to_json_dict

CONGESTION_SCHEMA = Path(__file__).parent.parent / "schema" / "congestion.schema.json"


def test_congestion_json():
    congestion = Congestion(
        camera=Camera(
            id="1001",
            image_url="https://images.data.gov.sg/api/traffic/1001.jpg",
            captured_on=datetime(2024, 9, 27, 8, 30, 0),
            retrieved_on=datetime(2024, 9, 27, 8, 31, 0),
            location=Location(longitude=103.851959, latitude=1.290270),
        ),
        rating=Rating(
            rated_on=datetime(2024, 9, 27, 8, 32, 0), model_id="v1.0", value=0.75
        ),
        updated_on=datetime(2024, 9, 27, 8, 33, 0),
    )

    with open(CONGESTION_SCHEMA, "r") as f:
        schema = json.load(f)

    validate(to_json_dict(congestion), schema)