diff --git a/robotoff/app/api.py b/robotoff/app/api.py
index 4cf3c5eaa7..5daf219fb3 100644
--- a/robotoff/app/api.py
+++ b/robotoff/app/api.py
@@ -92,6 +92,7 @@
     GoogleBatchJobConfig,
     BatchExtraction,
     GoogleStorageBucketForBatchJob,
+    generate_predictions_from_batch,
 )
 
 logger = get_logger()
@@ -1762,7 +1764,7 @@
         resp.status = falcon.HTTP_200
 
 
-class BatchJobResource:
+class BatchJobLaunchResource:
     def on_post(self, req: falcon.Request, resp: falcon.Response):
         job_type_str: str = req.get_param("job_type", required=True)
 
@@ -1779,7 +1781,6 @@
             )
         if not BatchExtraction.extracted_file_path:
             raise ValueError("The extracted file was not found.")
-
         bucket_handler = GoogleStorageBucketForBatchJob.from_job_type(job_type)
         bucket_handler.upload_file(file_path=BatchExtraction.extracted_file_path)
 
@@ -1789,6 +1790,33 @@
         resp.media = {"batch_job_details": batch_job}
 
 
+class BatchJobImportResource:
+    """Import insights from the results of a finished batch job stored in the
+    Google Storage bucket associated with the job type."""
+
+    def on_post(self, req: falcon.Request, resp: falcon.Response):
+        # Deferred import to avoid a circular dependency between the app
+        # module and the insights importer at module load time.
+        from robotoff.insights.importer import import_insights
+
+        job_type_str: str = req.get_param("job_type", required=True)
+        try:
+            job_type = BatchJobType[job_type_str]
+        except KeyError:
+            raise falcon.HTTPBadRequest(
+                description=f"invalid job_type: {job_type_str}. "
+                f"Valid job_types are: {[elt.value for elt in BatchJobType]}"
+            )
+
+        bucket_handler = GoogleStorageBucketForBatchJob.from_job_type(job_type)
+        predictions = generate_predictions_from_batch(
+            bucket_handler.download_file, job_type
+        )
+        with db:
+            import_insights(predictions=predictions, server_type="off")
+        resp.status = falcon.HTTP_200
+
+
 def custom_handle_uncaught_exception(
     req: falcon.Request, resp: falcon.Response, ex: Exception, params
 ):
@@ -1856,4 +1884,5 @@
 api.add_route("/api/v1/predictions", PredictionCollection())
 api.add_route("/api/v1/annotation/collection", LogoAnnotationCollection())
 api.add_route("/robots.txt", RobotsTxtResource())
-api.add_route("/api/v1/batch/launch", BatchJobResource())
\ No newline at end of file
+api.add_route("/api/v1/batch/launch", BatchJobLaunchResource())
+api.add_route("/api/v1/batch/import", BatchJobImportResource())
diff --git a/robotoff/batch/__init__.py b/robotoff/batch/__init__.py
index 7bb0a17d87..d9470f8e2b 100644
--- a/robotoff/batch/__init__.py
+++ b/robotoff/batch/__init__.py
@@ -9,3 +9,4 @@
 from .buckets import (
     GoogleStorageBucketForBatchJob,
 )
+from .importer import generate_predictions_from_batch
diff --git a/robotoff/batch/importer.py b/robotoff/batch/importer.py
new file mode 100644
index 0000000000..d8df8d48ec
--- /dev/null
+++ b/robotoff/batch/importer.py
@@ -0,0 +1,63 @@
+import io
+from typing import Iterator
+
+import pandas as pd
+
+# NOTE(review): importing from the package while robotoff/batch/__init__.py
+# also imports this module is a circular-import hazard; it only works because
+# __init__.py binds BatchJobType before importing this module. Prefer
+# importing BatchJobType from its defining submodule if possible.
+from robotoff.batch import BatchJobType
+from robotoff.types import Prediction, PredictionType
+
+# Per job type: mapping from Prediction field name to the parquet column
+# holding its value.
+BATCH_JOB_TYPE_TO_FEATURES = {
+    BatchJobType.ingredients_spellcheck: {
+        "barcode": "code",
+        "value": "correction",
+        "value_tag": "lang",
+    },
+}
+
+BATCH_JOB_TYPE_TO_PREDICTION_TYPE = {
+    BatchJobType.ingredients_spellcheck: PredictionType.ingredient_spellcheck,
+}
+
+PREDICTOR_VERSION = "1"
+
+
+def generate_predictions_from_batch(
+    f: io.BufferedReader,
+    job_type: BatchJobType,
+) -> Iterator[Prediction]:
+    """Generate predictions from a batch-job result file downloaded from
+    Google Storage, depending on the job type.
+
+    :param f: Readable binary object containing a parquet file.
+    :param job_type: Batch job type, used to select the column mapping and
+        prediction type.
+    :raises ValueError: if the file cannot be parsed as parquet.
+    :yield: One Prediction per row of the parquet file.
+    """
+    features_dict = BATCH_JOB_TYPE_TO_FEATURES[job_type]
+    prediction_type = BATCH_JOB_TYPE_TO_PREDICTION_TYPE[job_type]
+
+    try:
+        df = pd.read_parquet(f)
+    except Exception as e:
+        raise ValueError(f"Failed to read parquet file: {e}") from e
+
+    for _, row in df.iterrows():
+        yield Prediction(
+            type=prediction_type,
+            value=row[features_dict["value"]],
+            # Read the tag value from the row; previously this yielded the
+            # literal column name wrapped in a list.
+            value_tag=row[features_dict["value_tag"]],
+            barcode=row[features_dict["barcode"]],
+            predictor_version=PREDICTOR_VERSION,
+            predictor="llm",
+        )
diff --git a/robotoff/insights/importer.py b/robotoff/insights/importer.py
index bc5cdcebde..275393b9eb 100644
--- a/robotoff/insights/importer.py
+++ b/robotoff/insights/importer.py
@@ -1475,6 +1475,38 @@
     return results
 
 
+class IngredientsSpellcheckImporter(InsightImporter):
+    """Importer for ingredient-spellcheck predictions produced by batch jobs."""
+
+    @staticmethod
+    def get_type() -> InsightType:
+        return InsightType.ingredient_spellcheck
+
+    @classmethod
+    def get_required_prediction_types(cls) -> set[PredictionType]:
+        return {PredictionType.ingredient_spellcheck}
+
+    @classmethod
+    def generate_candidates(
+        cls,
+        product: Optional[Product],
+        predictions: list[Prediction],
+        product_id: ProductIdentifier,
+    ) -> Iterator[ProductInsight]:
+        # No reason to have different candidates for now
+        candidate = predictions[0]
+        yield ProductInsight(**candidate.to_dict())
+
+    @classmethod
+    def is_conflicting_insight(
+        cls,
+        candidate: ProductInsight,
+        reference: ProductInsight,
+    ) -> bool:
+        # The comparison result was previously computed but never returned,
+        # so the method always returned None (falsy).
+        return candidate.value == reference.value
+
+
 class PackagingElementTaxonomyException(Exception):
     pass