Skip to content

Commit

Permalink
Merge pull request #117 from VIDA-NYU/tokens
Browse files Browse the repository at this point in the history
Tokens
  • Loading branch information
heikomuller authored Mar 29, 2021
2 parents 540a0e9 + ce1e71d commit b09c93a
Show file tree
Hide file tree
Showing 37 changed files with 1,215 additions and 594 deletions.
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,15 @@

* Standardize parameter names for sample methods (\#115)
* Bug fix for `openclean-notebook`


### 0.3.0 - 2021-03-29

* Add `openclean.function.token.base.Token` as separate class.
* Rename `openclean.function.token.base.StringTokenizer` to `Tokenizer`.
* Adjust token transformer and tokenizer for new Token class.
* Change structure of datatype count in column profiler.
* Option to get set of conflicting values from `DataFrameGrouping` groups.
* Multi-threading for `ValueFunction.apply()`.
* Separate DBSCAN outlier class.
* Move us-street name functions to `openclean-geo`.
71 changes: 69 additions & 2 deletions openclean/cluster/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
from collections import Counter
from typing import Dict, Iterable, List, Optional, Union

from openclean.data.types import Value
from openclean.data.types import Schema, Value
from openclean.operator.stream.consumer import StreamConsumer
from openclean.operator.stream.processor import StreamProcessor


class Cluster(Counter):
Expand Down Expand Up @@ -78,7 +80,7 @@ def to_mapping(self, target: Optional[Value] = None) -> Dict:
return {key: target for key in self.keys() if key != target}


class Clusterer(metaclass=ABCMeta):
class Clusterer(StreamProcessor, metaclass=ABCMeta):
"""The value clusterer mixin class defines a single method `clusters()` to
cluster a given list of values.
"""
Expand All @@ -101,6 +103,71 @@ def clusters(self, values: Union[Iterable[Value], Counter]) -> List[Cluster]:
"""
raise NotImplementedError() # pragma: no cover

def open(self, schema: Schema) -> StreamConsumer:
    """Factory pattern for stream consumer.

    Create a stream clusterer that gathers the distinct values from the
    data stream and then delegates to this clusterer's ``clusters()``
    method when the stream is closed.

    Parameters
    ----------
    schema: list of string
        List of column names in the data stream schema.

    Returns
    -------
    openclean.cluster.base.StreamClusterer
    """
    # The schema is not needed for clustering; the consumer only counts
    # the (concatenated) values of each row.
    return StreamClusterer(self)


class StreamClusterer(StreamConsumer):
    """Stream consumer that clusters the values in a data stream.

    While consuming rows, the consumer builds a counter over the distinct
    (concatenated) row values. When the stream is closed, the associated
    cluster algorithm is applied to that value set.
    """
    def __init__(self, clusterer: Clusterer):
        """Initialize the cluster algorithm and the internal value counter.

        Parameters
        ----------
        clusterer: openclean.cluster.base.Clusterer
            Cluster algorithm that is applied on the set of distinct values
            that is generated from the data stream.
        """
        self.clusterer = clusterer
        # Frequency counts for the distinct values seen in the stream.
        self.counter = Counter()

    def close(self) -> List[Cluster]:
        """Apply the associated clusterer to the collected distinct values.

        Returns
        -------
        list of openclean.cluster.base.Cluster
        """
        return self.clusterer.clusters(self.counter)

    def consume(self, rowid: int, row: List):
        """Add the values in a given row to the internal counter.

        A single-value row contributes its value directly as the counter
        key. Rows with multiple values are collapsed into one string by
        joining the stringified values with a blank space.

        Parameters
        ----------
        rowid: int
            Unique row identifier
        row: list
            List of values in the row.
        """
        value = row[0] if len(row) == 1 else ' '.join(str(v) for v in row)
        self.counter[value] += 1


# -- Helper Classes -----------------------------------------------------------

Expand Down
15 changes: 8 additions & 7 deletions openclean/cluster/knn.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from openclean.cluster.index import ClusterIndex
from openclean.cluster.key import key_collision
from openclean.function.value.base import ValueFunction
from openclean.function.token.base import StringTokenizer
from openclean.function.token.base import Tokenizer
from openclean.function.token.ngram import NGrams
from openclean.function.similarity.base import SimilarityConstraint

Expand All @@ -43,7 +43,7 @@ class kNNClusterer(Clusterer):
"""
def __init__(
self, sim: SimilarityConstraint,
tokenizer: Optional[StringTokenizer] = None, minsize: Optional[int] = 2,
tokenizer: Optional[Tokenizer] = None, minsize: Optional[int] = 2,
remove_duplicates: Optional[bool] = True
):
"""Initialize the string tokenizer, the similarity constraint, and the
Expand All @@ -54,7 +54,7 @@ def __init__(
sim: openclean.function.similarity.base.SimilarityConstraint
String similarity constraint for grouping strings in the generated
blocks.
tokenizer: openclean.function.token.base.StringTokenizer, default=None
tokenizer: openclean.function.token.base.Tokenizer, default=None
Generator for tokens that are used to group string values in the
first step of the algorithm. By default, n-grams of length 6 are
used as blocking tokens.
Expand Down Expand Up @@ -173,7 +173,7 @@ def _get_clusters(self, clusters: Iterable[Cluster]) -> List[Cluster]:

def knn_clusters(
values: Union[Iterable[Value], Counter], sim: SimilarityConstraint,
tokenizer: Optional[StringTokenizer] = None, minsize: Optional[int] = 2,
tokenizer: Optional[Tokenizer] = None, minsize: Optional[int] = 2,
remove_duplicates: Optional[bool] = True
) -> List[Cluster]:
"""Run kNN clustering for a given list of values.
Expand All @@ -186,7 +186,7 @@ def knn_clusters(
sim: openclean.function.similarity.base.SimilarityConstraint
String similarity constraint for grouping strings in the generated
blocks.
tokenizer: openclean.function.token.base.StringTokenizer, default=None
tokenizer: openclean.function.token.base.Tokenizer, default=None
Generator for tokens that are used to group string values in the
first step of the algorithm. By default, n-grams of length 6 are
used as blocking tokens.
Expand All @@ -211,7 +211,7 @@ def knn_clusters(
def knn_collision_clusters(
values: Union[Iterable[Value], Counter], sim: SimilarityConstraint,
keys: Optional[Union[Callable, ValueFunction]] = None,
tokenizer: Optional[StringTokenizer] = None, minsize: Optional[int] = 2,
tokenizer: Optional[Tokenizer] = None, minsize: Optional[int] = 2,
remove_duplicates: Optional[bool] = True, threads: Optional[int] = None
) -> List[Cluster]:
"""Run kNN clustering on a set of values that have been grouped using
Expand All @@ -232,7 +232,7 @@ def knn_collision_clusters(
keys: callable or ValueFunction, default=None
Function that is used to generate keys for values. By default the
token fingerprint generator is used.
tokenizer: openclean.function.token.base.StringTokenizer, default=None
tokenizer: openclean.function.token.base.Tokenizer, default=None
Generator for tokens that are used to group string values in the
first step of the algorithm. By default, n-grams of length 6 are
used as blocking tokens.
Expand All @@ -259,6 +259,7 @@ def knn_collision_clusters(
group_clusters = knn_clusters(
values=groups_map.keys(),
sim=sim,
tokenizer=tokenizer,
minsize=1,
remove_duplicates=remove_duplicates
)
Expand Down
52 changes: 52 additions & 0 deletions openclean/data/groupby.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@

import pandas as pd

from openclean.data.schema import as_list, select_clause
from openclean.data.types import Columns


class DataFrameGrouping(object):
"""A data frame grouping is a mapping of key values to subsets of rows for
Expand Down Expand Up @@ -97,6 +100,23 @@ def columns(self) -> List[str]:
"""
return list(self.df.columns)

def conflicts(self, key: str, columns: Columns) -> Counter:
    """Synonym for ``values()``: get the set of values (with frequency
    counts) from the given columns of the rows in a group.

    Parameters
    ----------
    key: scalar or tuple
        Key value generated by the GroupBy operator for the rows in the
        data frame.
    columns: int, string, or list(int or string)
        Single column or list of column index positions or column names.

    Returns
    -------
    collections.Counter
    """
    # Pure delegation; kept as a separate method for a more descriptive
    # name when querying conflicting values in a group.
    return self.values(key, columns)

def get(self, key: str) -> pd.DataFrame:
"""Get the data frame that is associated with the given key. Returns
None if the given key does not exist in the grouping.
Expand Down Expand Up @@ -169,6 +189,38 @@ def rows(self, key: str) -> List[int]:
return None
return self._groups[key]

def values(self, key: str, columns: Columns) -> Counter:
    """Get values (and their frequency counts) for columns of rows in the
    group that is identified by the given key.

    Returns None if no group is associated with the given key.

    Parameters
    ----------
    key: scalar or tuple
        Key value generated by the GroupBy operator for the rows in the
        data frame.
    columns: int, string, or list(int or string)
        Single column or list of column index positions or column names.

    Returns
    -------
    collections.Counter
    """
    _, colidx = select_clause(self.df.columns, columns=as_list(columns))
    # The result is None if no group is associated with the given key.
    if key not in self._groups:
        return None
    result = Counter()
    if len(colidx) == 1:
        cidx = colidx[0]
        for rowidx in self._groups[key]:
            # Use purely positional indexing (`.iloc[row, col]`). The
            # previous `iloc[rowidx][cidx]` indexed a label-based Series
            # with an integer position, which is ambiguous when column
            # labels are themselves integers and relies on deprecated
            # positional fallback behavior.
            result[self.df.iloc[rowidx, cidx]] += 1
    else:
        for rowidx in self._groups[key]:
            row = self.df.iloc[rowidx]
            # Build the tuple via positional access and do not shadow the
            # `key` parameter (the original rebound `key` here).
            value = tuple(row.iloc[cidx] for cidx in colidx)
            result[value] += 1
    return result


class DataFrameViolation(DataFrameGrouping):
"""Subclass of DataFrame Grouping which maintains extra meta value information
Expand Down
Loading

0 comments on commit b09c93a

Please sign in to comment.