feat: ✨ Restructure code
jeremyarancio committed Aug 27, 2024
1 parent 729d4e1 commit 34ce80e
Showing 7 changed files with 112 additions and 142 deletions.
75 changes: 68 additions & 7 deletions robotoff/batch/__init__.py
@@ -1,28 +1,42 @@
import tempfile
from typing import List

import pandas as pd

from robotoff.utils import get_logger
from robotoff.types import (
BatchJobType,
Prediction,
ServerType
)
from robotoff.models import db
from robotoff.insights.importer import import_insights

from .launch import (
GoogleBatchJob,
GoogleBatchJobConfig,
BatchJobType,
)
from .extraction import (
BatchExtraction,
)
from .buckets import (
GoogleStorageBucketForBatchJob,
)
from .importer import import_batch_predictions
from robotoff.utils import get_logger
from .types import (
BATCH_JOB_TYPE_TO_FEATURES,
BATCH_JOB_TYPE_TO_PREDICTION_TYPE,
)


LOGGER = get_logger(__name__)

PREDICTOR_VERSION = "1" #TODO: shard HF model version? instead of manual change?

def launch_batch_job(job_type: BatchJobType) -> None:
"""_summary_
PREDICTOR = "llm"

:param job_type: _description_
:type job_type: BatchJobType

def launch_batch_job(job_type: BatchJobType) -> None:
"""Launch a batch job.
"""
with tempfile.TemporaryDirectory() as tmp_dir:
BatchExtraction.extract_from_dataset(
@@ -42,3 +56,50 @@ def launch_batch_job(job_type: BatchJobType) -> None:
batch_job_config = GoogleBatchJobConfig.init(job_type=job_type)
batch_job = GoogleBatchJob.launch_job(batch_job_config=batch_job_config)
LOGGER.info(f"Batch job succesfully launched. Batch job name: {batch_job.name}.")


def import_batch_predictions(job_type: BatchJobType) -> None:
"""Import predictions from remote storage.
"""
bucket_handler = GoogleStorageBucketForBatchJob.from_job_type(job_type)
LOGGER.debug(f"Batch data downloaded from bucket {bucket_handler.bucket}/{bucket_handler.suffix_postprocess}")
df = bucket_handler.download_file()
predictions = _generate_predictions_from_batch(df, job_type)
with db:
import_results = import_insights(
predictions=predictions,
server_type=ServerType.off
)
LOGGER.info(f"Batch import results: {repr(import_results)}.")


def _generate_predictions_from_batch(
df: pd.DataFrame,
job_type: BatchJobType
) -> List[Prediction]:
"""From a file imported from google storage, generate predictions depending on the job type.
:param f: Readable object. Should be a parquet file.
:type f: io.BufferedReader
:param job_type: Batch job type.
:type job_type: BatchJobType
:rtype: Iterable[Prediction]
:yield: Predictions.
:rtype: Iterator[Prediction]
"""
predictions = []
features_dict = BATCH_JOB_TYPE_TO_FEATURES[job_type]
prediction_type = BATCH_JOB_TYPE_TO_PREDICTION_TYPE[job_type]
for _, row in df.iterrows():
predictions.append(
Prediction(
type=prediction_type,
value=row[features_dict["value"]],
value_tag=row[features_dict["value_tag"]],
barcode=row[features_dict["barcode"]],
predictor_version=PREDICTOR_VERSION,
predictor=PREDICTOR,
)
)
return predictions
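
Taken together, launch_batch_job and import_batch_predictions are the two entry points of the batch workflow introduced here. A rough usage sketch (the actual trigger, whether a CLI command, scheduler, or API endpoint, is not shown in this diff, so treat the call site as hypothetical):

from robotoff.batch import launch_batch_job, import_batch_predictions
from robotoff.types import BatchJobType

# 1. Extract the data, push it to the bucket and start the Google Batch job.
launch_batch_job(BatchJobType.ingredients_spellcheck)

# 2. Once the job has written the post-processed parquet file back to the
#    bucket, download it and convert each row into a Prediction to import.
import_batch_predictions(BatchJobType.ingredients_spellcheck)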

11 changes: 2 additions & 9 deletions robotoff/batch/buckets.py
@@ -1,15 +1,8 @@
import pandas as pd

from robotoff.utils.buckets import GoogleStorageBucket
from robotoff.batch import BatchJobType


BATCH_JOB_TYPE_TO_BUCKET = {
BatchJobType.ingredients_spellcheck: {
"bucket": "robotoff-spellcheck",
"suffix_preprocess": "data/preprocessed_data.parquet",
"suffix_postprocess": "data/postprocessed_data.parquet",
},
}
from robotoff.batch.types import BATCH_JOB_TYPE_TO_BUCKET


class GoogleStorageBucketForBatchJob(GoogleStorageBucket):
5 changes: 1 addition & 4 deletions robotoff/batch/extraction.py
@@ -5,15 +5,12 @@

from robotoff import settings
from robotoff.batch import BatchJobType
from robotoff.batch.types import BATCH_JOB_TYPE_TO_QUERY_FILE_PATH
from robotoff.utils import get_logger


LOGGER = get_logger(__name__)

BATCH_JOB_TYPE_TO_QUERY_FILE_PATH = {
BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "sql/spellcheck.sql",
}


class BatchExtraction:
"""Handle batch extraction from the dataset.
73 changes: 0 additions & 73 deletions robotoff/batch/importer.py

This file was deleted.

16 changes: 2 additions & 14 deletions robotoff/batch/launch.py
@@ -1,6 +1,5 @@
import abc
from typing import List, Optional
import enum
import yaml
import datetime
import re
@@ -9,19 +8,8 @@
from pydantic import BaseModel, Field, ConfigDict

from robotoff import settings


@enum.unique
class BatchJobType(enum.Enum):
"""Each job type correspond to a task that will be executed in the batch job."""

ingredients_spellcheck = "ingredients-spellcheck"


# Paths batch job config files
BATCH_JOB_TYPE_TO_CONFIG_PATH = {
BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "job_configs/spellcheck.yaml",
}
from robotoff.types import BatchJobType
from robotoff.batch.types import BATCH_JOB_TYPE_TO_CONFIG_PATH


class GoogleBatchJobConfig(BaseModel):
35 changes: 35 additions & 0 deletions robotoff/batch/types.py
@@ -0,0 +1,35 @@
from robotoff.types import BatchJobType, PredictionType
from robotoff import settings


# Bucket structure to enable the batch job to load and upload data
BATCH_JOB_TYPE_TO_BUCKET = {
BatchJobType.ingredients_spellcheck: {
"bucket": "robotoff-spellcheck",
"suffix_preprocess": "data/preprocessed_data.parquet",
"suffix_postprocess": "data/postprocessed_data.parquet",
},
}

# Paths to the batch job config files
BATCH_JOB_TYPE_TO_CONFIG_PATH = {
BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "job_configs/spellcheck.yaml",
}

BATCH_JOB_TYPE_TO_QUERY_FILE_PATH = {
BatchJobType.ingredients_spellcheck: settings.BATCH_JOB_CONFIG_DIR / "sql/spellcheck.sql",
}

# Mapping between batch job type and prediction type
BATCH_JOB_TYPE_TO_PREDICTION_TYPE = {
BatchJobType.ingredients_spellcheck: PredictionType.ingredient_spellcheck,
}

# Column names in the processed batch of data
BATCH_JOB_TYPE_TO_FEATURES = {
BatchJobType.ingredients_spellcheck: {
"barcode": "code",
"value": "correction",
"value_tag": "lang",
},
}
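
These mappings centralise all per-job-type configuration (bucket layout, config and query file paths, prediction type, and DataFrame column names), so adding a new batch job type only requires a new entry in each dictionary. A minimal sketch of how a caller might resolve them, using only the names introduced in this file:

from robotoff.batch.types import (
    BATCH_JOB_TYPE_TO_BUCKET,
    BATCH_JOB_TYPE_TO_FEATURES,
    BATCH_JOB_TYPE_TO_PREDICTION_TYPE,
)
from robotoff.types import BatchJobType

job_type = BatchJobType.ingredients_spellcheck
bucket_info = BATCH_JOB_TYPE_TO_BUCKET[job_type]        # bucket name and parquet paths
columns = BATCH_JOB_TYPE_TO_FEATURES[job_type]          # column names in the processed batch
prediction_type = BATCH_JOB_TYPE_TO_PREDICTION_TYPE[job_type]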
39 changes: 4 additions & 35 deletions robotoff/types.py
@@ -359,39 +359,8 @@ class PackagingElementProperty(enum.Enum):

InsightAnnotation = Literal[-1, 0, 1, 2]





@enum.unique
class Lang(str, enum.Enum):
english = "en"
french = "fr"
german = "de"
spanish = "es"
italian = "it"
portuguese = "pt"
dutch = "nl"
polish = "pl"
russian = "ru"
japanese = "ja"
chinese = "zh"
arabic = "ar"
turkish = "tr"
vietnamese = "vi"
thai = "th"
korean = "ko"
ukrainian = "uk"
indonesian = "id"
hungarian = "hu"
greek = "el"
romanian = "ro"
danish = "da"
swedish = "sv"
norwegian = "no"
finnish = "fi"
bulgarian = "bg"
czech = "cs"
slovak = "sk"
croatian = "hr"

class BatchJobType(enum.Enum):
"""Each job type correspond to a task that will be executed in the batch job.
"""
ingredients_spellcheck = "ingredients-spellcheck"
