feat: add multiple triton backend #1314

Merged 7 commits on Feb 13, 2024

Changes from all commits
14 changes: 7 additions & 7 deletions Makefile
@@ -93,7 +93,7 @@ livecheck:
 
 log:
 	@echo "🥫 Reading logs (docker-compose) …"
-	${DOCKER_COMPOSE} logs -f api scheduler worker_high_1 worker_high_2 worker_low_1 worker_low_2
+	${DOCKER_COMPOSE} logs -f api scheduler worker_1 worker_2 worker_3 worker_4
 
 #------------#
 # Management #
@@ -152,10 +152,10 @@ init-elasticsearch:
 
 launch-burst-worker:
 ifdef queues
-	${DOCKER_COMPOSE} run --rm -d --no-deps worker_high_1 python -m robotoff run-worker ${queues} --burst
+	${DOCKER_COMPOSE} run --rm -d --no-deps worker_1 python -m robotoff run-worker ${queues} --burst
 # Only launch burst worker on low priority queue if queue is not specified
 else
-	${DOCKER_COMPOSE} run --rm -d --no-deps worker_high_1 python -m robotoff run-worker robotoff-low --burst
+	${DOCKER_COMPOSE} run --rm -d --no-deps worker_1 python -m robotoff run-worker robotoff-low --burst
 endif
 
 #------------#
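For reference, the burst-worker target above takes the queue list through the make variable "queues"; a typical invocation (queue name taken from the queues defined in this PR):

make launch-burst-worker queues=robotoff-high-1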
@@ -204,26 +204,26 @@ health:
 i18n-compile:
 	@echo "🥫 Compiling translations …"
 	# Note it's important to have --no-deps, to avoid launching a concurrent postgres instance
-	${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_high_1 -c "cd i18n && . compile.sh"
+	${DOCKER_COMPOSE} run --rm --entrypoint bash --no-deps worker_1 -c "cd i18n && . compile.sh"
 
 unit-tests:
 	@echo "🥫 Running tests …"
 	# run tests in worker to have more memory
 	# also, change project name to run in isolation
-	${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest --cov-report xml --cov=robotoff tests/unit
+	${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest --cov-report xml --cov=robotoff tests/unit
 
 integration-tests:
 	@echo "🥫 Running integration tests …"
 	# run tests in worker to have more memory
 	# also, change project name to run in isolation
-	${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration
+	${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest -vv --cov-report xml --cov=robotoff --cov-append tests/integration
 	( ${DOCKER_COMPOSE_TEST} down -v || true )
 
 # interactive testing
 # usage: make pytest args='test/unit/my-test.py --pdb'
 pytest: guard-args
 	@echo "🥫 Running test: ${args} …"
-	${DOCKER_COMPOSE_TEST} run --rm worker_high_1 poetry run pytest ${args}
+	${DOCKER_COMPOSE_TEST} run --rm worker_1 poetry run pytest ${args}
 
 #------------#
 # Production #
25 changes: 13 additions & 12 deletions docker-compose.yml
@@ -70,26 +70,27 @@ services:
       - postgres
       - redis
 
-  worker_high_1:
+  worker_1:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_high_1
-    command: python -m robotoff run-worker robotoff-high-1
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_1
+    # Each worker listens to a single high priority queue and to the low priority queue.
+    # As the low priority queue comes last, it will only be processed if there are no high
+    # priority tasks.
+    command: python -m robotoff run-worker robotoff-high-1 robotoff-low
 
-  worker_high_2:
+  worker_2:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_high_2
-    command: python -m robotoff run-worker robotoff-high-2
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_2
+    command: python -m robotoff run-worker robotoff-high-2 robotoff-low
 
-  worker_low_1:
+  worker_3:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_low_1
-    # Each worker (whether it's a high or low priority worker) listens to a
-    # single high priority queue
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_3
     command: python -m robotoff run-worker robotoff-high-3 robotoff-low
 
-  worker_low_2:
+  worker_4:
     <<: *robotoff-worker
-    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_low_2
+    container_name: ${COMPOSE_PROJECT_NAME:-robotoff}_worker_4
     command: python -m robotoff run-worker robotoff-high-4 robotoff-low
 
   scheduler:
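The ordering of queue names in the worker commands is what encodes the priority described in the comment: an rq worker drains its queues in the order they are listed, so robotoff-low jobs run only when the worker's high priority queue is empty. A minimal standalone sketch of that behavior, assuming plain rq and an illustrative Redis host (robotoff's run-worker command wraps this with its own setup):

from redis import Redis
from rq import Queue, Worker

redis_conn = Redis(host="redis")  # illustrative host name

# Queue order encodes priority: the worker always tries "robotoff-high-1"
# first and only dequeues from "robotoff-low" when the former is empty.
worker = Worker(
    [
        Queue("robotoff-high-1", connection=redis_conn),
        Queue("robotoff-low", connection=redis_conn),
    ],
    connection=redis_conn,
)
worker.work()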
8 changes: 4 additions & 4 deletions docker/dev.yml
@@ -36,13 +36,13 @@ x-robotoff-dev: &robotoff-dev
 services:
   api:
     <<: *robotoff-dev
-  worker_high_1:
+  worker_1:
     <<: *robotoff-dev
-  worker_high_2:
+  worker_2:
     <<: *robotoff-dev
-  worker_low_1:
+  worker_3:
     <<: *robotoff-dev
-  worker_low_2:
+  worker_4:
     <<: *robotoff-dev
 
   scheduler:
89 changes: 80 additions & 9 deletions robotoff/cli/main.py
@@ -4,7 +4,6 @@
 import typer
 
 from robotoff.types import (
-    NeuralCategoryClassifierModel,
     ObjectDetectionModel,
     PredictionType,
     ProductIdentifier,
@@ -132,11 +131,11 @@
         ServerType.off, help="Server type of the product"
     ),
     deepest_only: bool = False,
-    model_name: NeuralCategoryClassifierModel = typer.Option(
-        NeuralCategoryClassifierModel.keras_image_embeddings_3_0,
-        help="name of the model to use",
-    ),
     threshold: Optional[float] = typer.Option(0.5, help="detection threshold to use"),
+    triton_uri: Optional[str] = typer.Option(
+        None,
+        help="URI of the Triton server to use. If not provided, the default value from settings is used.",
+    ),
 ) -> None:
     """Predict product categories based on the neural category classifier.
 
@@ -164,7 +163,11 @@
     predictions, _ = CategoryClassifier(
         get_taxonomy(TaxonomyType.category.name, offline=True)
     ).predict(
-        product, product_id, deepest_only, threshold=threshold, model_name=model_name
+        product,
+        product_id,
+        deepest_only,
+        threshold=threshold,
+        triton_uri=triton_uri,
     )
 
     if predictions:
@@ -357,6 +360,70 @@
     )
 
 
+@app.command()
+def run_category_prediction(
+    triton_uri: Optional[str] = typer.Option(
+        None,
+        help="URI of the Triton Inference Server to use. If not provided, the default value from settings is used.",
+    ),
+    limit: Optional[int] = typer.Option(
+        None, help="Maximum number of jobs to launch (default: all)"
+    ),
+):
+    """Launch category prediction jobs on all products without categories in
+    DB."""
+    import tqdm
+    from openfoodfacts.dataset import ProductDataset
+
+    from robotoff.models import Prediction, db
+    from robotoff.settings import DATASET_DIR
+    from robotoff.utils import get_logger
+    from robotoff.workers.queues import enqueue_job, low_queue
+    from robotoff.workers.tasks.product_updated import add_category_insight_job
+
+    logger = get_logger()
+    # Download the latest dump of the dataset, cache it in DATASET_DIR
+    ds = ProductDataset(force_download=True, download_newer=True, cache_dir=DATASET_DIR)
+
+    # The category detector only works for food products
+    server_type = ServerType.off
+
+    logger.info("Fetching products without categories in DB...")
+    with db:
+        barcode_with_categories = set(
+            barcode
+            for (barcode,) in Prediction.select(Prediction.barcode)
+            .distinct()
+            .where(
+                Prediction.server_type == server_type.name,
+                Prediction.type == PredictionType.category.name,
+            )
+            .tuples()
+            .limit(limit)
+        )
+    logger.info(
+        "%d products with categories already in DB", len(barcode_with_categories)
+    )
+    seen: set[str] = set()
+    added = 0
+    for product in tqdm.tqdm(ds, desc="products"):
+        barcode = product.get("code")
+        if not barcode or barcode in seen or barcode in barcode_with_categories:
+            continue
+        seen.add(barcode)
+        # Enqueue a job to predict category for this product
+        enqueue_job(
+            add_category_insight_job,
+            low_queue,
+            job_kwargs={"result_ttl": 0},
+            product_id=ProductIdentifier(barcode, server_type),
+            triton_uri=triton_uri,
+        )
+        added += 1
+
+    logger.info("%d jobs added", added)


 @app.command()
 def run_object_detection_model(
     server_type: ServerType = typer.Option(
@@ -375,6 +442,10 @@
         "for the specified model.",
     ),
     limit: Optional[int] = typer.Option(None, help="Maximum number of jobs to launch"),
+    triton_uri: Optional[str] = typer.Option(
+        None,
+        help="URI of the Triton Inference Server to use. If not provided, the default value from settings is used.",
+    ),
 ):
     """Launch object detection model jobs on all missing images (images
     without an ImagePrediction item for this model) in DB."""
@@ -415,7 +486,7 @@
     else:
         with db:
             query = (
-                ImageModel.select(ImageModel.barcode, ImageModel.id)
+                ImageModel.select(ImageModel.barcode, ImageModel.image_id)
                 .join(
                     ImagePrediction,
                     JOIN.LEFT_OUTER,
@@ -425,8 +496,7 @@
                 ),
             )
             .where(
-                ImageModel.server_type
-                == server_type.name
+                (ImageModel.server_type == server_type.name)
                 & ImagePrediction.model_name.is_null()
                 & (ImageModel.deleted == False),  # noqa: E712
             )
@@ -451,6 +521,7 @@
             job_kwargs={"result_ttl": 0},
             product_id=ProductIdentifier(barcode, server_type),
             image_url=image_url,
+            triton_uri=triton_uri,
         )
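The query above relies on peewee's LEFT OUTER JOIN anti-join idiom to find images that have no ImagePrediction row yet for the selected model. A condensed sketch of the pattern; the exact join condition is an assumption, since those lines are collapsed in the diff:

from peewee import JOIN

# Keep images with no matching prediction row: LEFT OUTER JOIN the
# prediction table, then filter on the joined column being NULL.
query = (
    ImageModel.select(ImageModel.barcode, ImageModel.image_id)
    .join(
        ImagePrediction,
        JOIN.LEFT_OUTER,
        on=(
            (ImagePrediction.image_id == ImageModel.id)  # assumed join key
            & (ImagePrediction.model_name == model_name.value)
        ),
    )
    .where(
        (ImageModel.server_type == server_type.name)
        & ImagePrediction.model_name.is_null()
        & (ImageModel.deleted == False)  # noqa: E712
    )
)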
8 changes: 6 additions & 2 deletions robotoff/insights/extraction.py
@@ -52,6 +52,7 @@ def run_object_detection_model(
     image_model: ImageModel,
     threshold: float = 0.1,
     return_null_if_exist: bool = True,
+    triton_uri: str | None = None,
 ) -> Optional[ImagePrediction]:
     """Run an object detection model and save the results in the
     `image_prediction` table.
@@ -67,7 +68,10 @@
         `image` table)
     :param threshold: the minimum object score above which we keep the object
         data
-
+    :param return_null_if_exist: if True, return None if the image prediction
+        already exists in DB
+    :param triton_uri: URI of the Triton Inference Server, defaults to
+        None. If not provided, the default value from settings is used.
     :return: return None if the image does not exist in DB, or the created
         `ImagePrediction` otherwise
     """
@@ -82,7 +86,7 @@
 
     timestamp = datetime.datetime.utcnow()
     results = ObjectDetectionModelRegistry.get(model_name.value).detect_from_image(
-        image, output_image=False
+        image, output_image=False, triton_uri=triton_uri
     )
     data = results.to_json(threshold=threshold)
     max_confidence = max((item["score"] for item in data), default=None)
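One detail in the last line above: max() over an empty sequence raises ValueError, so the default=None keyword keeps max_confidence well defined when no detected object survives the threshold. For example:

data: list[dict] = []  # no detections above the threshold
max_confidence = max((item["score"] for item in data), default=None)
assert max_confidence is None  # instead of a ValueError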
10 changes: 6 additions & 4 deletions robotoff/prediction/category/__init__.py
@@ -1,5 +1,3 @@
-from typing import Optional
-
 from robotoff.taxonomy import TaxonomyType, get_taxonomy
 from robotoff.types import JSONType, NeuralCategoryClassifierModel, ProductIdentifier
 
@@ -10,9 +8,10 @@ def predict_category(
     product: dict,
     product_id: ProductIdentifier,
     deepest_only: bool,
-    threshold: Optional[float] = None,
-    neural_model_name: Optional[NeuralCategoryClassifierModel] = None,
+    threshold: float | None = None,
+    neural_model_name: NeuralCategoryClassifierModel | None = None,
     clear_cache: bool = False,
+    triton_uri: str | None = None,
 ) -> JSONType:
     """Predict categories for a product using neural model.
 
@@ -30,6 +29,8 @@
         prediction
     :param clear_cache: if True, clear ingredient processing cache of neural
         model before returning results
+    :param triton_uri: URI of the Triton Inference Server, defaults to
+        None. If not provided, the default value from settings is used.
     """
     taxonomy = get_taxonomy(TaxonomyType.category.name)
     predictions, debug = CategoryClassifier(taxonomy).predict(
@@ -39,6 +40,7 @@
         threshold,
         neural_model_name,
         clear_cache=clear_cache,
+        triton_uri=triton_uri,
     )
     return {
         "neural": {
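An illustrative call of the wrapper with the new parameter; the product dict, barcode, and URI are made-up values, and the imports assume ServerType lives in robotoff.types alongside ProductIdentifier:

from robotoff.prediction.category import predict_category
from robotoff.types import ProductIdentifier, ServerType

result = predict_category(
    product={"product_name": "Chocolate spread"},  # minimal illustrative product
    product_id=ProductIdentifier("3017620422003", ServerType.off),
    deepest_only=False,
    threshold=0.5,
    triton_uri="triton:8001",  # per-call override; None falls back to settings
)
neural_predictions = result["neural"]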
11 changes: 7 additions & 4 deletions robotoff/prediction/category/neural/category_classifier.py
@@ -1,4 +1,4 @@
-from typing import Any, Optional
+from typing import Any
 
 import numpy as np
 
@@ -66,9 +66,10 @@ def predict(
         product: dict,
         product_id: ProductIdentifier,
         deepest_only: bool = False,
-        threshold: Optional[float] = None,
-        model_name: Optional[NeuralCategoryClassifierModel] = None,
+        threshold: float | None = None,
+        model_name: NeuralCategoryClassifierModel | None = None,
         clear_cache: bool = False,
+        triton_uri: str | None = None,
     ) -> tuple[list[Prediction], JSONType]:
         """Return an unordered list of category predictions for the given
         product and additional debug information.
@@ -122,6 +123,8 @@
             default.
         :param clear_cache: if True, clear ingredient processing cache before
             returning results
+        :param triton_uri: URI of the Triton Inference Server, defaults to
+            None. If not provided, the default value from settings is used.
         """
         logger.debug("predicting category with model %s", model_name)
 
@@ -142,7 +145,7 @@
         )
 
         # Only generate image embeddings if it's required by the model
-        triton_stub = get_triton_inference_stub()
+        triton_stub = get_triton_inference_stub(triton_uri)
 
         # We check whether image embeddings were provided as input
         if "image_embeddings" in product:
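The triton_uri thread-through ends at get_triton_inference_stub, which now accepts an optional URI. A minimal sketch of what such a helper can look like, assuming a gRPC channel, the stock tritonclient stub classes, and a TRITON_URI settings default (the real robotoff helper may differ in details):

import grpc
from tritonclient.grpc import service_pb2_grpc

from robotoff import settings  # assumed to expose a default TRITON_URI


def get_triton_inference_stub(
    triton_uri: str | None = None,
) -> service_pb2_grpc.GRPCInferenceServiceStub:
    # Fall back to the configured default when no per-call URI is given.
    uri = triton_uri or settings.TRITON_URI
    channel = grpc.insecure_channel(uri)
    return service_pb2_grpc.GRPCInferenceServiceStub(channel)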
robotoff/prediction/category/neural/keras_category_classifier_3_0/__init__.py
@@ -1,3 +1,4 @@
+import time
 from typing import Literal, Optional
 
 import numpy as np
@@ -168,9 +169,15 @@
         )
 
     if non_null_image_by_ids:
+        start_time = time.monotonic()
         computed_embeddings_by_id = _generate_image_embeddings(
             non_null_image_by_ids, stub
         )
+        logger.debug(
+            "Computed %d embeddings in %.2f seconds",
+            len(computed_embeddings_by_id),
+            time.monotonic() - start_time,
+        )
         # Make sure all image IDs are in image table
         refresh_images_in_db(product_id, product.get("images", {}))
         # Save embeddings in embeddings.image_embeddings table for
@@ -267,7 +274,9 @@
 
     inputs = generate_inputs_dict(product, ocr_texts, image_embeddings)
     debug = generate_debug_dict(model_name, threshold, inputs)
+    start_time = time.monotonic()
     scores, labels = _predict(inputs, model_name, stub)
+    logger.debug("Predicted categories in %.2f seconds", time.monotonic() - start_time)
     indices = np.argsort(-scores)
 
     category_predictions: list[tuple[str, float, Optional[NeighborPredictionType]]] = []
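The instrumentation added above follows a simple pattern: capture time.monotonic() before the call and log the delta afterwards. If the pattern spreads further, it could be factored into a small context manager; a sketch:

import logging
import time
from contextlib import contextmanager

logger = logging.getLogger(__name__)


@contextmanager
def log_duration(message: str):
    # A monotonic clock is immune to system clock adjustments; the elapsed
    # time is logged even if the wrapped block raises.
    start_time = time.monotonic()
    try:
        yield
    finally:
        logger.debug("%s in %.2f seconds", message, time.monotonic() - start_time)


# Usage, mirroring the timing calls above:
# with log_duration("Predicted categories"):
#     scores, labels = _predict(inputs, model_name, stub)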