Skip to content

Commit

Permalink
Parallelize the TableVectorizer column-wise (#592)
Browse files Browse the repository at this point in the history
* fall back to pandas if no datetime format is found

* change changelog

* first working version

* add tests

* update changelog

* typo

* copy transformer when split between columns to avoid conflict

* update changelog

* fix bug with repeated transformer

* split and merge

* more tests

* fix test

* also split self.transformers_ to parallelize transform

* add _split to minhashencoder

* cleaning

* clean test

* fix wrong merge

* fix wrong merge

* Apply suggestions from code review

type hints

Co-authored-by: Lilian <[email protected]>

* Update skrub/_utils.py

Co-authored-by: Lilian <[email protected]>

* apply Lilian's suggestions

* add future annotation to avoid circular import in type hints

* add docstring

* Update skrub/_table_vectorizer.py

Co-authored-by: Lilian <[email protected]>

* type hint

* revert change

* first batch of Vincent's suggestions

* use tags

* compare fitted transformers better

* talk about tags in changelog

* run precommit checks

* get rid of _transformers_original

* clean tests

* Apply suggestions from code review

Co-authored-by: Vincent M <[email protected]>

* _parallel_on_columns

* explain transformers vs transformers_

* split merge into two functions

* add tests to check that splitting doesn't prevent resetting transformers

* Apply suggestions from code review

Co-authored-by: Vincent M <[email protected]>

* combine merge_unfitted and merge_fitted and move _split outside of class

* don't return empty new_transformer_to_input_indices

* remove future warning

* add docstrings

* fix merge

* fix test

* Update skrub/_table_vectorizer.py

Co-authored-by: Vincent M <[email protected]>

* fix type hints

---------

Co-authored-by: Lilian <[email protected]>
Co-authored-by: Vincent M <[email protected]>
  • Loading branch information
3 people authored Sep 20, 2023
1 parent df16844 commit f981247
Show file tree
Hide file tree
Showing 9 changed files with 839 additions and 10 deletions.
9 changes: 9 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ Major changes
* Parallelized the :class:`GapEncoder` column-wise. Parameters `n_jobs` and `verbose`
added to the signature. :pr:`582` by :user:`Lilian Boulard <LilianBoulard>`


Minor changes
-------------

* :class:`TableVectorizer` is now able to apply parallelism at the column level rather than the transformer level. This is the default for univariate transformers, like :class:`MinHashEncoder`, and :class:`GapEncoder`.
:pr:`592` by :user:`Leo Grinsztajn <LeoGrin>`

* Parallelized the :func:`deduplicate` function. Parameter `n_jobs`
added to the signature. :pr:`618` by :user:`Jovan Stojanovic <jovan-stojanovic>`
and :user:`Lilian Boulard <LilianBoulard>`
Expand Down Expand Up @@ -174,6 +181,8 @@ Minor changes
:pr:`543` by :user:`Leo Grinsztajn <LeoGrin>`
:pr:`587` by :user:`Leo Grinsztajn <LeoGrin>`



Dirty-cat Release 0.4.0
=========================

Expand Down
44 changes: 43 additions & 1 deletion skrub/_gap_encoder.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""
Implements the GapEncoder: a probabilistic encoder for categorical variables.
"""
from __future__ import annotations

from collections.abc import Generator
from copy import deepcopy
Expand All @@ -13,7 +14,7 @@
from numpy.random import RandomState
from numpy.typing import ArrayLike, NDArray
from scipy import sparse
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.cluster import KMeans, kmeans_plusplus
from sklearn.decomposition._nmf import _beta_divergence
from sklearn.feature_extraction.text import CountVectorizer, HashingVectorizer
Expand Down Expand Up @@ -739,6 +740,43 @@ class GapEncoder(TransformerMixin, BaseEstimator):
fitted_models_: list[GapEncoderColumn]
column_names_: list[str]

@classmethod
def _merge(cls, transformers_list: list[GapEncoder]):
"""
Merge GapEncoders fitted on different columns
into a single GapEncoder. This is useful for parallelization
over columns in the TableVectorizer.
"""
full_transformer = clone(transformers_list[0])
# assert rho_ is the same for all transformers
rho_ = transformers_list[0].rho_
full_transformer.rho_ = rho_
full_transformer.fitted_models_ = []
for transformers in transformers_list:
full_transformer.fitted_models_.extend(transformers.fitted_models_)
if hasattr(transformers_list[0], "column_names_"):
full_transformer.column_names_ = []
for transformers in transformers_list:
full_transformer.column_names_.extend(transformers.column_names_)
return full_transformer

def _split(self):
"""
Split a GapEncoder fitted on multiple columns
into a list of GapEncoders fitted on one column each.
This is useful for parallelizing transform over columns
in the TableVectorizer.
"""
check_is_fitted(self)
transformers_list = []
for i, model in enumerate(self.fitted_models_):
transformer = clone(self)
transformer.rho_ = model.rho_
transformer.fitted_models_ = [model]
transformer.column_names_ = [self.column_names_[i]]
transformers_list.append(transformer)
return transformers_list

def __init__(
self,
*,
Expand Down Expand Up @@ -1025,6 +1063,10 @@ def _more_tags(self):
),
"check_estimators_dtypes": "We only support string dtypes.",
},
"univariate": True, # whether the estimator is univariate and can be
# applied column by column. This is useful for the TableVectorizer,
# to decide whether to apply the transformer on each column separately
# and thus improve the parallelization when the transformer is slow enough.
}


Expand Down
50 changes: 48 additions & 2 deletions skrub/_minhash_encoder.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,20 +2,21 @@
Implements the MinHashEncoder, which encodes string categorical features by
applying the MinHash method to n-gram decompositions of strings.
"""
from __future__ import annotations

from collections.abc import Callable, Collection
from typing import Literal

import numpy as np
from joblib import Parallel, delayed, effective_n_jobs
from numpy.typing import ArrayLike, NDArray
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.base import BaseEstimator, TransformerMixin, clone
from sklearn.utils import gen_even_slices, murmurhash3_32
from sklearn.utils.validation import _check_feature_names_in, check_is_fitted

from ._fast_hash import ngram_min_hash
from ._string_distances import get_unique_ngrams
from ._utils import LRUDict, check_input
from ._utils import LRUDict, check_input, combine_lru_dicts

NoneType = type(None)

Expand Down Expand Up @@ -120,6 +121,47 @@ class MinHashEncoder(TransformerMixin, BaseEstimator):

_capacity: int = 2**10

@classmethod
def _merge(cls, transformers_list: list[MinHashEncoder]):
"""
Merge MinHashEncoders fitted on different columns
into a single MinHashEncoder. This is useful for parallelization
over columns in the TableVectorizer.
"""
full_transformer = clone(transformers_list[0])
capacity = transformers_list[0]._capacity
full_transformer.hash_dict_ = combine_lru_dicts(
capacity, *[transformer.hash_dict_ for transformer in transformers_list]
)
full_transformer.n_features_in_ = sum(
transformer.n_features_in_ for transformer in transformers_list
)
full_transformer.feature_names_in_ = np.concatenate(
[transformer.feature_names_in_ for transformer in transformers_list]
)
return full_transformer

def _split(self):
"""
Split a MinHashEncoder fitted on multiple columns
into a list of MinHashEncoders (one for each column).
This is useful for parallelizing transform over columns
in the TableVectorizer.
"""
check_is_fitted(self)
transformer_list = []
for i in range(self.n_features_in_):
trans = clone(self)
attributes = ["hash_dict_", "_capacity"]
for a in attributes:
if hasattr(self, a):
setattr(trans, a, getattr(self, a))
# TODO; do we want to deepcopy hash_dict_
trans.n_features_in_ = 1
trans.feature_names_in_ = np.array([self.feature_names_in_[i]])
transformer_list.append(trans)
return transformer_list

def __init__(
self,
*,
Expand Down Expand Up @@ -395,4 +437,8 @@ def _more_tags(self):
),
"check_estimators_dtypes": "We only support string dtypes.",
},
"univariate": True, # whether the estimator is univariate and can be
# applied column by column. This is useful for the TableVectorizer,
# to decide whether to apply the transformer on each column separately
# and thus improve the parallelization when the transformer is slow enough.
}
Loading

0 comments on commit f981247

Please sign in to comment.