Skip to content

Commit

Permalink
feat(pipeline): Add Firestore DatabaseClient (#12)
Browse files Browse the repository at this point in the history
* feat(pipeline): add Rating Model class

* build(pipeline): add jsonschema to requirements.txt to validate json schema in tests

* feat(pipeline): add Congestion model

* test(pipeline): check pydantic model export conforms to schema

* feat(pipeline): add DatabaseClient to interact with firebase database

* doc(readme): document pipeline need for GOOGLE_APPLICATION_CREDENTIALS

* ci(pipeline): add gcp auth needed for firestore integration tests
  • Loading branch information
mrzzy authored Oct 2, 2024
1 parent 4d3a9bf commit 2c1b9cd
Show file tree
Hide file tree
Showing 8 changed files with 263 additions and 7 deletions.
7 changes: 5 additions & 2 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ jobs:
pipeline:
strategy:
fail-fast: false
matrix:
target: [lint, test]
matrix:
target: [lint, test]
name: "${{ matrix.target }} Pipeline"
defaults:
run:
Expand All @@ -26,6 +26,9 @@ jobs:
cache: "pip"
- name: Install Pip dependencies
run: pip install -r requirements.txt
- uses: "google-github-actions/auth@v2"
with:
credentials_json: "${{ secrets.GCP_SERVICE_ACCOUNT_KEY }}"
- name: "${{ matrix.target }} Pipeline"
run: "make ${{ matrix.target }}"

Expand Down
24 changes: 21 additions & 3 deletions pipeline/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,28 +4,46 @@ Flowmotion's ML pipeline

## Setup

- Install `pip` modules

```sh
pip install -r requirements.txt
```

- Setup [Google Application Default credentials](https://cloud.google.com/docs/authentication#service-accounts),
which is needed to authenticate with Firestore
(eg. by setting `GOOGLE_APPLICATION_CREDENTIALS` env var.) <a id="pipeline-credentials"></a>

## Usage

Running the ML Pipeline:

2. Run Pipeline

```sh
python pipeline.py
```

### Contributing
## Contributing

Before pushing, ensure that you:

1. Format & Lint code

```sh
black . && isort . && ruff .
```

3. Run tests
2. Run unit tests.

```sh
pytest -m "not integration"
```

3. Run integration tests. Requires [GCP credentials](#pipeline-credentials).

```sh
pytest
pytest -m integration
```

> A `makefile` is provided to make this easier. Just `make`.
2 changes: 1 addition & 1 deletion pipeline/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ def parse_cameras(meta: dict) -> list[Camera]:
retrieved_on = meta["timestamp"]
return [
Camera(
camera_id=c["camera_id"],
id=c["camera_id"],
retrieved_on=retrieved_on,
captured_on=c["timestamp"],
image_url=c["image"],
Expand Down
111 changes: 111 additions & 0 deletions pipeline/db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#
# Flowmotion
# Pipeline
# Firestore DB Client
#

from typing import Any, Iterable, Optional, cast

import firebase_admin
from firebase_admin import firestore
from google.cloud.firestore import DocumentReference
from pydantic import BaseModel

from model import to_json_dict


class DatabaseClient:
    """Firestore Database (DB) client."""

    def __init__(self) -> None:
        """Creates a new Firestore DB client.
        Uses Google Application Default credentials to authenticate DB requests.
        See https://firebase.google.com/docs/admin/setup#initialize-sdk.
        """
        app = firebase_admin.initialize_app()
        self._db = firestore.client(app)

    def insert(self, table: str, data: BaseModel) -> str:
        """
        Inserts the given data into the specified table.
        Args:
            table: Name of Firestore collection to insert to.
            data: Pydantic model to insert as a document.
        Returns:
            Key of the inserted firebase document.
        """
        # add() returns an (update_time, DocumentReference) tuple
        _, doc = self._db.collection(table).add(to_json_dict(data))
        return _to_key(doc)

    def update(self, table: str, key: str, data: BaseModel):
        """
        Updates the given data into the specified table.
        Args:
            table: Name of Firestore collection to update to.
            key: Key specifying the Firestore document to update.
            data: Pydantic model to update Firestore document's contents
        """
        # set() replaces the full document contents (not a partial merge)
        self._db.collection(table).document(_doc_id(key)).set(to_json_dict(data))

    def delete(self, table: str, key: str):
        """
        Deletes the row (document) with key from the specified table.
        Args:
            table: Name of Firestore collection to delete from.
            key: Key specifying the Firestore document to delete.
        """
        self._db.collection(table).document(_doc_id(key)).delete()

    def get(self, table: str, key: str) -> Optional[dict[str, Any]]:
        """
        Retrieves the contents of the row (document) with key from the specified table.
        Args:
            table: Name of Firestore collection to retrieve from.
            key: Key specifying the Firestore document to retrieve.
        Returns:
            Contents of matching document as dict or None if no such document exists.
        """
        return self._db.collection(table).document(_doc_id(key)).get().to_dict()

    def query(self, table: str, **params) -> Iterable[str]:
        """
        Query keys of all rows (Firestore documents) on the specified table that match params.
        Args:
            table: Name of Firestore collection to query from.
            params: Query parameters are given as document fields in the format
                <field>=(<operator>, <value>) where:
                - <field> is a field path "<name>[__<subfield>...]" which refers the documents
                    <name> field (or <name>.<subfield> if optional sub field name is specified).
                - <operator> is one of Firestore's supported operators.
                    See https://firebase.google.com/docs/firestore/query-data/queries
                - <value> is used by the operator to find matching rows.
        Example:
            Get User rows with `name="john"`:
                db = DatabaseClient()
                users = db.query("Users", name=("==", "john"))
        Returns:
            Iterator of keys of matching rows in on the table.
        """
        # build query by applying query params
        query = self._db.collection(table)
        for field, (op, value) in params.items():
            # "__" in a param name stands in for "." subfield separators
            query = query.where(field.replace("__", "."), op, value)

        # BUGFIX: stream() evaluates the filtered query; the previous
        # list_documents() call ignored the filters & returned every document.
        for snapshot in query.stream():
            yield _to_key(snapshot.reference)


def _to_key(ref: DocumentReference) -> str:
return cast(DocumentReference, ref).path


def _doc_id(key: str) -> str:
return key.split("/")[-1]
24 changes: 23 additions & 1 deletion pipeline/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
# Models
#

import json
from datetime import datetime

from pydantic import BaseModel
Expand All @@ -16,11 +17,32 @@ class Location(BaseModel):
latitude: float


class Rating(BaseModel):
    """Traffic Congestion rating performed by a model"""

    rated_on: datetime  # when the rating was produced
    model_id: str  # identifier of the model that produced the rating
    value: float  # congestion rating value — presumably in [0, 1]; TODO confirm against schema


class Camera(BaseModel):
    """Traffic Camera capturing traffic images."""

    id: str  # camera identifier (populated from the API's "camera_id" field)
    image_url: str  # URL of the captured traffic image
    captured_on: datetime  # when the camera captured the image
    retrieved_on: datetime  # when the camera metadata was retrieved from the API
    location: Location  # geolocation of the camera


class Congestion(BaseModel):
    """Traffic Congestion data."""

    camera: Camera  # camera that captured the rated traffic image
    rating: Rating  # congestion rating assigned to the camera's image
    updated_on: datetime  # when this congestion record was last updated


def to_json_dict(model: BaseModel) -> dict:
    """Convert the given pydantic model into its JSON dict representation.

    Args:
        model: Pydantic model to convert.
    Returns:
        Dict of the model's fields with values coerced to JSON-compatible
        types (e.g. datetime rendered as an ISO 8601 string).
    """
    # model_dump(mode="json") applies the same JSON-mode serialization as
    # model_dump_json(), without the serialize-then-reparse round trip of
    # json.loads(model.model_dump_json()).
    return model.model_dump(mode="json")
5 changes: 5 additions & 0 deletions pipeline/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
annotated-types==0.7.0
anyio==4.5.0
attrs==24.2.0
black==24.8.0
CacheControl==0.14.0
cachetools==5.5.0
Expand Down Expand Up @@ -29,6 +30,8 @@ httpx==0.27.2
idna==3.10
iniconfig==2.0.0
isort==5.13.2
jsonschema==4.23.0
jsonschema-specifications==2023.12.1
msgpack==1.1.0
mypy-extensions==1.0.0
packaging==24.1
Expand All @@ -46,7 +49,9 @@ PyJWT==2.9.0
pyparsing==3.1.4
pytest==8.3.3
pytest-asyncio==0.24.0
referencing==0.35.1
requests==2.32.3
rpds-py==0.20.0
rsa==4.9
ruff==0.6.6
sniffio==1.3.1
Expand Down
60 changes: 60 additions & 0 deletions pipeline/test_db.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
#
# Flowmotion
# Firestore DB Client
# Integration Test
#


# Usage:
# - Google Application Default credentials should be provided to authenticate
# with firestore eg. by setting GOOGLE_APPLICATION_CREDENTIALS env var.


import os
from uuid import uuid4

import pytest
from pydantic import BaseModel

from db import DatabaseClient
from model import to_json_dict


class Model(BaseModel):
    """Minimal pydantic model used as a test document payload."""

    field: str  # single string field exercised by insert/get/query tests


@pytest.fixture(scope="session")
def db() -> DatabaseClient:
    """Session-scoped DatabaseClient connected to the test GCP project."""
    # pin the GCP project so the Firestore SDK does not fall back to whatever
    # project the ambient credentials default to
    os.environ["GOOGLE_CLOUD_PROJECT"] = "flowmotion-4e268"
    return DatabaseClient()


@pytest.fixture(scope="session")
def collection(db: DatabaseClient):
    """Yields a uniquely-named Firestore collection, cleaned up on teardown."""
    # unique collection name for testing
    name = f"test_db_{uuid4()}"
    yield name

    # empty collection of any existing documents to cleanup test collection
    # NOTE(review): reaches into the private `_db` attribute because
    # DatabaseClient exposes no bulk-delete API
    collection = db._db.collection(name)
    for document in collection.list_documents():
        document.delete()


@pytest.fixture
def model() -> Model:
    """Fresh test Model instance for each test."""
    return Model(field="test")


@pytest.mark.integration
def test_db_insert_get_delete_query(db: DatabaseClient, collection: str, model: Model):
    """End-to-end insert -> get -> query -> delete round trip against Firestore."""
    # insert model & verify a non-empty document key is returned
    doc_key = db.insert(table=collection, data=model)
    assert len(doc_key) > 0

    # fetch the inserted document by key & compare its contents
    fetched = db.get(table=collection, key=doc_key)
    assert fetched == to_json_dict(model)

    # query by field value & verify the first match is the inserted document
    matched_keys = list(db.query(table=collection, field=("==", "test")))
    assert matched_keys[0] == doc_key

    db.delete(collection, doc_key)
37 changes: 37 additions & 0 deletions pipeline/test_model.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
#
# Flowmotion
# Models
# Unit Tests
#


import json
from datetime import datetime
from pathlib import Path

from jsonschema import validate

from model import Camera, Congestion, Location, Rating, to_json_dict

# path to the JSON Schema that exported Congestion documents must conform to
CONGESTION_SCHEMA = Path(__file__).parent.parent / "schema" / "congestion.schema.json"


def test_congestion_json():
    """Exported Congestion JSON conforms to the congestion JSON Schema."""
    with open(CONGESTION_SCHEMA, "r") as f:
        schema = json.load(f)

    camera = Camera(
        id="1001",
        image_url="https://images.data.gov.sg/api/traffic/1001.jpg",
        captured_on=datetime(2024, 9, 27, 8, 30, 0),
        retrieved_on=datetime(2024, 9, 27, 8, 31, 0),
        location=Location(longitude=103.851959, latitude=1.290270),
    )
    rating = Rating(
        rated_on=datetime(2024, 9, 27, 8, 32, 0), model_id="v1.0", value=0.75
    )
    congestion = Congestion(
        camera=camera, rating=rating, updated_on=datetime(2024, 9, 27, 8, 33, 0)
    )

    validate(to_json_dict(congestion), schema)

0 comments on commit 2c1b9cd

Please sign in to comment.