diff --git a/robotoff/batch/__init__.py b/robotoff/batch/__init__.py index 1d2e20c521..87e45fe69a 100644 --- a/robotoff/batch/__init__.py +++ b/robotoff/batch/__init__.py @@ -1,9 +1,20 @@ import tempfile +from typing import List + +import pandas as pd + +from robotoff.utils import get_logger +from robotoff.types import ( + BatchJobType, + Prediction, + ServerType +) +from robotoff.models import db +from robotoff.insights.importer import import_insights from .launch import ( GoogleBatchJob, GoogleBatchJobConfig, - BatchJobType, ) from .extraction import ( BatchExtraction, @@ -11,18 +22,21 @@ from .buckets import ( GoogleStorageBucketForBatchJob, ) -from .importer import import_batch_predictions -from robotoff.utils import get_logger +from .types import ( + BATCH_JOB_TYPE_TO_FEATURES, + BATCH_JOB_TYPE_TO_PREDICTION_TYPE, +) LOGGER = get_logger(__name__) +PREDICTOR_VERSION = "1" #TODO: shard HF model version? instead of manual change? -def launch_batch_job(job_type: BatchJobType) -> None: - """_summary_ +PREDICTOR = "llm" - :param job_type: _description_ - :type job_type: BatchJobType + +def launch_batch_job(job_type: BatchJobType) -> None: + """Launch a batch job. """ with tempfile.TemporaryDirectory() as tmp_dir: BatchExtraction.extract_from_dataset( @@ -42,3 +56,50 @@ def launch_batch_job(job_type: BatchJobType) -> None: batch_job_config = GoogleBatchJobConfig.init(job_type=job_type) batch_job = GoogleBatchJob.launch_job(batch_job_config=batch_job_config) LOGGER.info(f"Batch job succesfully launched. Batch job name: {batch_job.name}.") + + +def import_batch_predictions(job_type: BatchJobType) -> None: + """Import predictions from remote storage. + """ + bucket_handler = GoogleStorageBucketForBatchJob.from_job_type(job_type) + LOGGER.debug(f"Batch data downloaded from bucket {bucket_handler.bucket}/{bucket_handler.suffix_postprocess}") + df = bucket_handler.download_file() + predictions = _generate_predictions_from_batch(df, job_type) + with db: + import_results = import_insights( + predictions=predictions, + server_type=ServerType.off + ) + LOGGER.info(f"Batch import results: {repr(import_results)}.") + + +def _generate_predictions_from_batch( + df: pd.DataFrame, + job_type: BatchJobType +) -> List[Prediction]: + """From a file imported from google storage, generate predictions depending on the job type. + + :param f: Readable object. Should be a parquet file. + :type f: io.BufferedReader + :param job_type: Batch job type. + :type job_type: BatchJobType + :rtype: Iterable[Prediction] + :yield: Predictions. + :rtype: Iterator[Prediction] + """ + predictions = [] + features_dict = BATCH_JOB_TYPE_TO_FEATURES[job_type] + prediction_type = BATCH_JOB_TYPE_TO_PREDICTION_TYPE[job_type] + for _, row in df.iterrows(): + predictions.append( + Prediction( + type=prediction_type, + value=row[features_dict["value"]], + value_tag=row[features_dict["value_tag"]], + barcode=row[features_dict["barcode"]], + predictor_version=PREDICTOR_VERSION, + predictor=PREDICTOR, + ) + ) + return predictions + diff --git a/robotoff/batch/buckets.py b/robotoff/batch/buckets.py index 655c8f0af9..278b62bf18 100644 --- a/robotoff/batch/buckets.py +++ b/robotoff/batch/buckets.py @@ -1,15 +1,8 @@ import pandas as pd + from robotoff.utils.buckets import GoogleStorageBucket from robotoff.batch import BatchJobType - - -BATCH_JOB_TYPE_TO_BUCKET = { - BatchJobType.ingredients_spellcheck: { - "bucket": "robotoff-spellcheck", - "suffix_preprocess": "data/preprocessed_data.parquet", - "suffix_postprocess": "data/postprocessed_data.parquet", - }, -} +from robotoff.batch.types import BATCH_JOB_TYPE_TO_BUCKET class GoogleStorageBucketForBatchJob(GoogleStorageBucket): diff --git a/robotoff/batch/extraction.py b/robotoff/batch/extraction.py index c0054c3b8e..5ca4d2f0e1 100644 --- a/robotoff/batch/extraction.py +++ b/robotoff/batch/extraction.py @@ -5,15 +5,12 @@ from robotoff import settings from robotoff.batch import BatchJobType +from robotoff.batch.types import BATCH_JOB_TYPE_TO_QUERY_FILE_PATH from robotoff.utils import get_logger LOGGER = get_logger(__name__) -BATCH_JOB_TYPE_TO_QUERY_FILE_PATH = { - BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "sql/spellcheck.sql", -} - class BatchExtraction: """Handle batch extraction from the dataset. diff --git a/robotoff/batch/importer.py b/robotoff/batch/importer.py deleted file mode 100644 index 5ebbc9dff0..0000000000 --- a/robotoff/batch/importer.py +++ /dev/null @@ -1,73 +0,0 @@ -from typing import List - -import pandas as pd - -from robotoff.insights.importer import import_insights -from robotoff.batch import BatchJobType, GoogleStorageBucketForBatchJob -from robotoff.types import Prediction, PredictionType -from robotoff.models import db -from robotoff.utils import get_logger -from robotoff.types import ServerType - - -LOGGER = get_logger(__name__) - -BATCH_JOB_TYPE_TO_FEATURES = { - BatchJobType.ingredients_spellcheck: { - "barcode": "code", - "value": "correction", - "value_tag": "lang", - }, -} - -BATCH_JOB_TYPE_TO_PREDICTION_TYPE = { - BatchJobType.ingredients_spellcheck: PredictionType.ingredient_spellcheck, -} - -PREDICTOR_VERSION = "2" - - -def import_batch_predictions(job_type: BatchJobType) -> None: - """Import predictions from remote storage. - """ - bucket_handler = GoogleStorageBucketForBatchJob.from_job_type(job_type) - LOGGER.debug(f"Batch data downloaded from bucket {bucket_handler.bucket}/{bucket_handler.suffix_postprocess}") - df = bucket_handler.download_file() - predictions = _generate_predictions_from_batch(df, job_type) - with db: - import_results = import_insights( - predictions=predictions, - server_type=ServerType.off - ) - LOGGER.info(f"Batch import results: {repr(import_results)}.") - - -def _generate_predictions_from_batch( - df: pd.DataFrame, - job_type: BatchJobType -) -> List[Prediction]: - """From a file imported from google storage, generate predictions depending on the job type. - - :param f: Readable object. Should be a parquet file. - :type f: io.BufferedReader - :param job_type: Batch job type. - :type job_type: BatchJobType - :rtype: Iterable[Prediction] - :yield: Predictions. - :rtype: Iterator[Prediction] - """ - predictions = [] - features_dict = BATCH_JOB_TYPE_TO_FEATURES[job_type] - prediction_type = BATCH_JOB_TYPE_TO_PREDICTION_TYPE[job_type] - for _, row in df.iterrows(): - predictions.append( - Prediction( - type=prediction_type, - value=row[features_dict["value"]], - value_tag=row[features_dict["value_tag"]], - barcode=row[features_dict["barcode"]], - predictor_version=PREDICTOR_VERSION, - predictor="llm", - ) - ) - return predictions diff --git a/robotoff/batch/launch.py b/robotoff/batch/launch.py index cdd17b8b6f..a5ca69132c 100644 --- a/robotoff/batch/launch.py +++ b/robotoff/batch/launch.py @@ -1,6 +1,5 @@ import abc from typing import List, Optional -import enum import yaml import datetime import re @@ -9,19 +8,8 @@ from pydantic import BaseModel, Field, ConfigDict from robotoff import settings - - -@enum.unique -class BatchJobType(enum.Enum): - """Each job type correspond to a task that will be executed in the batch job.""" - - ingredients_spellcheck = "ingredients-spellcheck" - - -# Paths batch job config files -BATCH_JOB_TYPE_TO_CONFIG_PATH = { - BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "job_configs/spellcheck.yaml", -} +from robotoff.types import BatchJobType +from robotoff.batch.types import BATCH_JOB_TYPE_TO_CONFIG_PATH class GoogleBatchJobConfig(BaseModel): diff --git a/robotoff/batch/types.py b/robotoff/batch/types.py new file mode 100644 index 0000000000..c0c452cefd --- /dev/null +++ b/robotoff/batch/types.py @@ -0,0 +1,35 @@ +from robotoff.types import BatchJobType, PredictionType +from robotoff import settings + + +# Bucket structure to enable the batch job to load and upload data +BATCH_JOB_TYPE_TO_BUCKET = { + BatchJobType.ingredients_spellcheck: { + "bucket": "robotoff-spellcheck", + "suffix_preprocess": "data/preprocessed_data.parquet", + "suffix_postprocess": "data/postprocessed_data.parquet", + }, +} + +# Paths batch job config files +BATCH_JOB_TYPE_TO_CONFIG_PATH = { + BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "job_configs/spellcheck.yaml", +} + +BATCH_JOB_TYPE_TO_QUERY_FILE_PATH = { + BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "sql/spellcheck.sql", +} + +# Mapping between batch job type and prediction type +BATCH_JOB_TYPE_TO_PREDICTION_TYPE = { + BatchJobType.ingredients_spellcheck: PredictionType.ingredient_spellcheck, +} + +# Column names in the processed batch of data +BATCH_JOB_TYPE_TO_FEATURES = { + BatchJobType.ingredients_spellcheck: { + "barcode": "code", + "value": "correction", + "value_tag": "lang", + }, +} diff --git a/robotoff/types.py b/robotoff/types.py index 8105d2030a..52704e0ec5 100644 --- a/robotoff/types.py +++ b/robotoff/types.py @@ -359,39 +359,8 @@ class PackagingElementProperty(enum.Enum): InsightAnnotation = Literal[-1, 0, 1, 2] - - - - @enum.unique -class Lang(str, enum.Enum): - english = "en" - french = "fr" - german = "de" - spanish = "es" - italian = "it" - portuguese = "pt" - dutch = "nl" - polish = "pl" - russian = "ru" - japanese = "ja" - chinese = "zh" - arabic = "ar" - turkish = "tr" - vietnamese = "vi" - thai = "th" - korean = "ko" - ukrainian = "uk" - indonesian = "id" - hungarian = "hu" - greek = "el" - romanian = "ro" - danish = "da" - swedish = "sv" - norwegian = "no" - finnish = "fi" - bulgarian = "bg" - czech = "cs" - slovak = "sk" - croatian = "hr" - \ No newline at end of file +class BatchJobType(enum.Enum): + """Each job type correspond to a task that will be executed in the batch job. + """ + ingredients_spellcheck = "ingredients-spellcheck" \ No newline at end of file