Filters

Filters are models or algorithms that calculate metrics for a dataset. A filter processes the data and adds new columns with the calculated metrics.

There are two kinds of filters, described below: datafilters and columnfilters.

Datafilter

Datafilters are filters that calculate new metadata (scores, captions, probabilities, etc.) based on file modalities such as images and videos. To run a datafilter, use the processor.apply_data_filter() method.

Example of using a datafilter that adds metadata about images (width, height, whether the image is valid):

from DPF.filters.images.info_filter import ImageInfoFilter
datafilter = ImageInfoFilter(workers=8)
processor.apply_data_filter(datafilter)
processor.df # new columns ['width', 'height', 'is_correct'] are added

Columnfilter

Columnfilters also calculate new metadata, but based on existing metadata (texts, etc.). To run a columnfilter, use the processor.apply_column_filter() method.

Example of using a columnfilter that classifies the text language:

from DPF.filters.texts.lang_filter import LangFilter

columnfilter = LangFilter(workers=16)
processor.apply_column_filter(columnfilter)
processor.df # new columns ["lang", "lang_score"] are added

Running a filter on several GPUs

To run a datafilter on multiple GPUs, use the MultiGPUDataFilter class:

from DPF.filters.images.llava_captioning_filter import LLaVaCaptioningFilter
from DPF.filters.multigpu_filter import MultiGPUDataFilter

multigpufilter = MultiGPUDataFilter(
    ['cuda:0', 'cuda:1', 'cuda:2', 'cuda:3'],
    LLaVaCaptioningFilter,
    dict(
        pbar=True, workers=8,
        prompt='short', batch_size=16
    )
)
processor.apply_multi_gpu_data_filter(multigpufilter)

See help(MultiGPUDataFilter) for more information.

Examples

You can find usage examples in the repository.

Creating a new filter

To add your own filter, create a new filter class. If your filter uses only data from existing columns (e.g. the text modality), inherit your class from the ColumnFilter class. If your filter uses data from files, inherit your class from the DataFilter class.

Creating DataFilter

To create a new datafilter, add a new file in the folder for the modality used by your filter. For example, if your filter uses the images modality, create a file in the DPF/filters/images/ folder; if your filter uses the texts and images modalities, create a file in DPF/filters/text2image/, and so on.

Inherit your filter from the corresponding DataFilter subclass in the modality folder.
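For example, for the images modality the base class is ImageFilter, the same one used in the PHashFilter example below. A minimal sketch (MyImageFilter is a hypothetical name):

from DPF.filters.images.img_filter import ImageFilter

class MyImageFilter(ImageFilter):
    ...  # implement the properties and methods described below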

Then implement the result_columns and dataloader_kwargs properties and the preprocess_data and process_batch methods:

  • result_columns - list of columns that the filter adds to the DataFrame
  • dataloader_kwargs - parameters for the PyTorch dataloader
  • preprocess_data - method where data preprocessing is implemented. This method is passed to the dataloader, and preprocessing runs in multiple worker processes. Do not use CUDA operations in this method.
  • process_batch - method where a batch of preprocessed data is processed with the model

For more information run:

from DPF.filters import DataFilter
help(DataFilter)

Example of a custom DataFilter:

from typing import Any

from DPF.filters.images.img_filter import ImageFilter
from DPF.types import ModalityToDataMapping
# get_phash and read_image_rgb_from_bytes are DPF helpers;
# the import paths below are assumed:
from DPF.utils import read_image_rgb_from_bytes
from DPF.filters.images.hash_filters import get_phash

class PHashFilter(ImageFilter):
    def __init__(
        self,
        sim_hash_size: int = 8,
        workers: int = 16,
        pbar: bool = True,
        _pbar_position: int = 0
    ):
        super().__init__(pbar, _pbar_position)
        self.num_workers = workers
        self.sim_hash_size = sim_hash_size

    @property
    def result_columns(self) -> list[str]:
        return [f"image_phash_{self.sim_hash_size}"]

    @property
    def dataloader_kwargs(self) -> dict[str, Any]:
        return {"num_workers": self.num_workers, "batch_size": 1, "drop_last": False}

    # Runs in dataloader worker processes: read the image and compute its hash
    def preprocess_data(
        self,
        modality2data: ModalityToDataMapping,
        metadata: dict[str, Any]
    ) -> Any:
        key = metadata[self.key_column]
        img_simhash = get_phash(
            read_image_rgb_from_bytes(modality2data['image']), 
            hash_size=self.sim_hash_size
        )
        return key, img_simhash

    # Runs in the main process: collect per-sample results into the batch dict
    def process_batch(self, batch: list[Any]) -> dict[str, list[Any]]:
        df_batch_labels = self._get_dict_from_schema()

        keys, img_simhashes = list(zip(*batch))
        df_batch_labels[self.key_column].extend(keys)
        df_batch_labels[f"image_phash_{self.sim_hash_size}"].extend(img_simhashes)

        return df_batch_labels

This filter reads images and calculates their PHash in the dataloader. The dataloader returns the PHash strings, which are then added to the resulting DataFrame.
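Such a filter is applied like any other datafilter. A hypothetical usage, following the pattern of the examples above:

datafilter = PHashFilter(sim_hash_size=8, workers=8)
processor.apply_data_filter(datafilter)
processor.df # new column 'image_phash_8' is added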

Creating ColumnFilter

To create a new columnfilter, add a new file in the folder for the modality used by your filter, and inherit your class from the ColumnFilter class.

Then implement the result_columns and columns_to_process properties and the process_sample method:

  • result_columns - list of columns that the filter adds to the DataFrame
  • columns_to_process - columns in the original DataFrame used for processing. Values from these columns are passed to process_sample
  • process_sample - method that processes one sample of data

For more information run:

from DPF.filters import ColumnFilter
help(ColumnFilter)

Example of a custom ColumnFilter:

from typing import Any
from py3langid.langid import MODEL_FILE, LanguageIdentifier
from DPF.filters import ColumnFilter

class LangFilter(ColumnFilter):
    """
    LangFilter class
    """

    def __init__(
        self,
        text_column_name: str = "text",
        workers: int = 16,
        pbar: bool = True
    ):
        super().__init__(workers, pbar)
        self.lang_identifier = LanguageIdentifier.from_pickled_model(
            MODEL_FILE, norm_probs=True
        )
        self.text_column_name = text_column_name

    @property
    def columns_to_process(self) -> list[str]:
        return [self.text_column_name]

    @property
    def result_columns(self) -> list[str]:
        return ["lang", "lang_score"]

    def process_sample(self, sample: dict[str, Any]) -> list[Any]:
        lg, score = self.lang_identifier.classify(sample[self.text_column_name])
        return [lg, round(score, 2)]

This filter creates two new columns: lang and lang_score. It uses the column specified by text_column_name ("text" by default) to identify the language of the text.
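To sanity-check such a filter on a single sample before applying it to a whole dataset, you can call process_sample directly (a hypothetical snippet; the sample is a dict keyed by the columns from columns_to_process):

columnfilter = LangFilter()
columnfilter.process_sample({"text": "Hello, world!"}) # e.g. ["en", 0.99]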