From ac28fda15c0221a332bb710f260c48b2d02795e4 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Sat, 20 Mar 2021 07:28:26 -0400 Subject: [PATCH 01/23] Add string token class --- openclean/cluster/knn.py | 1 + openclean/function/token/base.py | 115 ++++++++++++++----- openclean/function/token/filter.py | 14 +-- openclean/function/token/ngram.py | 8 +- openclean/function/token/split.py | 54 +++++---- openclean/function/value/key/geo.py | 2 +- tests/function/token/test_split_tokenizer.py | 24 ++-- 7 files changed, 150 insertions(+), 68 deletions(-) diff --git a/openclean/cluster/knn.py b/openclean/cluster/knn.py index 89baa2c..8cbe722 100644 --- a/openclean/cluster/knn.py +++ b/openclean/cluster/knn.py @@ -259,6 +259,7 @@ def knn_collision_clusters( group_clusters = knn_clusters( values=groups_map.keys(), sim=sim, + tokenizer=tokenizer, minsize=1, remove_duplicates=remove_duplicates ) diff --git a/openclean/function/token/base.py b/openclean/function/token/base.py index 0ff9f9c..3642eda 100644 --- a/openclean/function/token/base.py +++ b/openclean/function/token/base.py @@ -15,15 +15,78 @@ from openclean.function.value.mapping import Standardize +# Definition of default raw token data types. +ALPHA = 'ALPHA' +ALPHANUM = 'ALPHANUM' +ANY = 'ANY' +DIGIT = DIGIT_REP = 'NUMERIC' +PUNCTUATION = 'PUNC' + + +class Token(str): + """Tokens are strings that have an optional (semantic) type label. + + The values for type labels are not constraint. It is good practice, to use + all upper case values for token types. The default token type is 'ANY'. + + This implementation is based on: + https://bytes.com/topic/python/answers/32098-my-experiences-subclassing-string + + The order of creation is that the __new__ method is called which returns + the object then __init__ is called. + """ + def __new__(cls, value: str, token_type: Optional[str] = None): + """Initialize the String object with the given value. + + the token type is ignored. + + Parameters + ---------- + value: string + Token value. + token_type: string, default=None + Unique token type identifier. + """ + return str.__new__(cls, value) + + def __init__(self, value: str, token_type: Optional[str] = None): + """Initialize the token type identifier. + + The token value has already been initialized by the __new__ method that + is called prior to the __init__ method. + + Parameters + ---------- + value: string + Token value. + token_type: string, default=None + Unique token type identifier. + """ + self.token_type = token_type + + def type(self) -> str: + """Get token type value. + + This is a wrapper around the ``token_type`` property. Returns the + default token type 'ANY' if no type was given when the object was + created. + + Returns + ------- + string + """ + return self.token_type if self.token_type is not None else ANY + + # -- Mixin classes ------------------------------------------------------------ class StringTokenizer(metaclass=ABCMeta): """Interface for string tokenizer. A string tokenizer should be able to handle any scalar value (e.g., by first transforming numeric values into - a string representation). The tokenizer returns a list of string values. + a string representation). The tokenizer returns a list of token objects. """ @abstractmethod - def tokens(self, value: Scalar) -> List[str]: + def tokens(self, value: Scalar) -> List[Token]: """Convert a given scalar values into a list of string tokens. If a given value cannot be converted into tokens None should be returned. 
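For orientation, the Token class added in the hunk above is an ordinary string subclass that additionally carries an optional type label, so it can be passed anywhere a plain str is expected while still exposing its semantic type. A minimal usage sketch against the code in this hunk (the assertions are illustrative and are not part of the patch):

    from openclean.function.token.base import ANY, DIGIT, Token

    t = Token('35', token_type=DIGIT)
    assert isinstance(t, str)         # str subclass: all string operations still work
    assert t == '35'
    assert t.type() == DIGIT          # explicit type label ('NUMERIC')
    assert Token('ST').type() == ANY  # default type when none is given
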
@@ -37,7 +100,7 @@ def tokens(self, value: Scalar) -> List[str]: Returns ------- - list of string + list of openclean.function.token.base.Token """ raise NotImplementedError() # pragma: no cover @@ -49,18 +112,18 @@ class TokenTransformer(metaclass=ABCMeta): a list of strings as input and returns a (modified) list of strings. """ @abstractmethod - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Transform a list of string tokens. Returns a modified copy of the input list of tokens. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ raise NotImplementedError() # pragma: no cover @@ -142,7 +205,7 @@ def eval(self, value: Value) -> str: """ return self.delim.join(self.tokens(value)) - def tokens(self, value: Scalar) -> List[str]: + def tokens(self, value: Scalar) -> List[Token]: """Tokenize the given value using the associated tokenizer. Then modify the tokens with the optional token transformer. @@ -153,7 +216,7 @@ def tokens(self, value: Scalar) -> List[str]: Returns ------- - list of string + list of openclean.function.token.base.Token """ tokens = self.tokenizer.tokens(value) if self.transformer is not None: @@ -165,17 +228,17 @@ def tokens(self, value: Scalar) -> List[str]: class ReverseTokens(TokenTransformer): """Reverse a given list of string tokens.""" - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Return a reversed copy of the token list. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ return tokens[::-1] @@ -196,17 +259,17 @@ def __init__(self, key: Optional[Callable] = None, reverse: Optional[bool] = Fal self.sortkey = key self.reverse = reverse - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Returns a sorted copy of the tken list. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ return sorted(tokens, key=self.sortkey, reverse=self.reverse) @@ -226,7 +289,7 @@ def __init__(self, length: int): """ self.length = length - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Return a list that contains the first N elements of the input list, where N is the length parameter defined during initialization. If the input list does not have more than N elements the input is returned as @@ -234,12 +297,12 @@ def transform(self, tokens: List[str]) -> List[str]: Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ return tokens[:self.length] if len(tokens) > self.length else tokens @@ -259,7 +322,7 @@ def __init__(self, transformers: List[TokenTransformer]): """ self.transformers = transformers - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Transform a list of string tokens. 
Applies the transformers in the pipeline sequentially on the output of the respective successor in the pipeline. @@ -267,11 +330,11 @@ def transform(self, tokens: List[str]) -> List[str]: Patameters ---------- tokens: list of string - List of string tokens. + List of string openclean.function.token.base.Token. Returns ------- - list of string + list of openclean.function.token.base.Token """ for transformer in self.transformers: tokens = transformer.transform(tokens) @@ -280,17 +343,17 @@ def transform(self, tokens: List[str]) -> List[str]: class UniqueTokens(TokenTransformer): """Remove duplicate tokens to return a list of unique tokens.""" - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Returns a list of unique tokens from the input list. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ return list(set(tokens)) @@ -308,18 +371,18 @@ def __init__(self, func: Union[Callable, ValueFunction]): # Ensure that the function is a value function. self.func = CallableWrapper(func) if not isinstance(func, ValueFunction) else func - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Returns the list of tokens that results from applying the associated value function of each of the tokens in the input list. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ # Prepare function if necessary. f = self.func if self.func.is_prepared() else self.func.prepare(tokens) diff --git a/openclean/function/token/filter.py b/openclean/function/token/filter.py index ff9d5df..e07a18c 100644 --- a/openclean/function/token/filter.py +++ b/openclean/function/token/filter.py @@ -9,7 +9,7 @@ from typing import List -from openclean.function.token.base import SortTokens, TokenTransformer, TokenTransformerPipeline +from openclean.function.token.base import SortTokens, Token, TokenTransformer, TokenTransformerPipeline from openclean.function.value.base import ValueFunction @@ -18,18 +18,18 @@ class FirstLastFilter(TokenTransformer): list. """ - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Return a list that contains the first and last element from the input list. If the input is empty the result is empty as well. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ return [tokens[0], tokens[-1]] if tokens else tokens @@ -58,18 +58,18 @@ def __init__(self, predicate: ValueFunction): """ self.predicate = predicate - def transform(self, tokens: List[str]) -> List[str]: + def transform(self, tokens: List[Token]) -> List[Token]: """Returns a list that contains only those tokens that satisfy the filter condition defined by the associated predicate. Patameters ---------- - tokens: list of string + tokens: list of openclean.function.token.base.Token List of string tokens. Returns ------- - list of string + list of openclean.function.token.base.Token """ # Prepare the predicate if necessary. 
if not self.predicate.is_prepared(): diff --git a/openclean/function/token/ngram.py b/openclean/function/token/ngram.py index 97b32f8..369e986 100644 --- a/openclean/function/token/ngram.py +++ b/openclean/function/token/ngram.py @@ -12,7 +12,7 @@ from typing import List, Optional from openclean.data.types import Scalar -from openclean.function.token.base import StringTokenizer +from openclean.function.token.base import StringTokenizer, Token class NGrams(StringTokenizer): @@ -45,7 +45,7 @@ def __init__(self, n: int, pleft: Optional[str] = None, pright: Optional[str] = self.pleft = pleft self.pright = pright - def tokens(self, value: Scalar) -> List[str]: + def tokens(self, value: Scalar) -> List[Token]: """Convert a given scalar values into a list of n-grams. If the value length is not greater than n and no padding was specified, the returned list will only contain the given value. @@ -57,7 +57,7 @@ def tokens(self, value: Scalar) -> List[str]: Returns ------- - list of string + list of openclean.function.token.base.Token """ # Add left and right padding if specified. if self.pleft: @@ -70,5 +70,5 @@ def tokens(self, value: Scalar) -> List[str]: # Split value into n-grams. result = list() for i in range(len(value) - (self.n - 1)): - result.append(value[i: i + self.n]) + result.append(Token(value[i: i + self.n])) return result diff --git a/openclean/function/token/split.py b/openclean/function/token/split.py index 66fbce0..d52052e 100644 --- a/openclean/function/token/split.py +++ b/openclean/function/token/split.py @@ -7,12 +7,17 @@ """String tokenizer that is a wrapper around the string split method.""" -from typing import Callable, List, Optional +from typing import Callable, List, Optional, Tuple import re from openclean.data.types import Scalar -from openclean.function.token.base import StringTokenizer +from openclean.function.token.base import StringTokenizer, Token + +import openclean.function.token.base as TT + +# Default token classifier. +DEFAULT_CLASSIFIER = [(str.isalpha, TT.ALPHA), (str.isdigit, TT.DIGIT)] class ChartypeSplit(StringTokenizer): @@ -23,25 +28,30 @@ class ChartypeSplit(StringTokenizer): The type of a character is determined by a classifier that is given as a list of Boolean predicates, i.e., callables that accept a single character and that return True if the charcter belongs to the type that the function - represents or False otherwise. + represents or False otherwise. With each classifier a token type label is + associated that is assigned to the generated token. If a token does not + match any of the given classifier the default token type is returned. """ - def __init__(self, chartypes: Optional[List[Callable]] = None): + def __init__(self, chartypes: Optional[List[Tuple[Callable, str]]] = None): """Initialize the character type classifier. Parameters ---------- - chartypes: list of callable, default=None + chartypes: list of tuple of callable and string, default=None List of functions that are used to determine the type of a character. The functions are applied in the given order. The first function that returns True for given character defines the character type. By - default, we only distinguish between letters and digits. + default, we only distinguish between letters and digits. With each + funciton a token type label is associated that will be assigned to + the generated tokens. 
""" - self.chartypes = chartypes if chartypes is not None else [str.isalpha, str.isdigit] + self.chartypes = chartypes if chartypes is not None else DEFAULT_CLASSIFIER + + def get_type(self, c: str) -> str: + """The type of a character is the label that is associated with the + first type predicate that returns True. - def get_type(self, c: str) -> int: - """The type of a character is the index position of the first type - predicate that returns True. If no predicate evaluates to True for a - given value the length of the predicate list is returned. + If no predicate evaluates to True for a given value None is returned. Parameters ---------- @@ -52,14 +62,12 @@ def get_type(self, c: str) -> int: ------- int """ - index = 0 - for f in self.chartypes: + for f, label in self.chartypes: if f(c): - return index - index += 1 - return index + return label + return None - def tokens(self, value: Scalar) -> List[str]: + def tokens(self, value: Scalar) -> List[Token]: """Convert a given scalar values into a list of string tokens. If a given value cannot be converted into tokens None should be returned. @@ -73,7 +81,7 @@ def tokens(self, value: Scalar) -> List[str]: Returns ------- - list of string + list of openclean.function.token.base.Token """ # Ensure that the value is a string. value = str(value) if not isinstance(value, str) else value @@ -89,12 +97,12 @@ def tokens(self, value: Scalar) -> List[str]: next_type = self.get_type(value[i]) if prev_type != next_type: # Add homogeneous token from start to previous postion. - tokens.append(value[start:i]) + tokens.append(Token(value[start:i], token_type=prev_type)) start = i prev_type = next_type # Ensure to add the homogenous suffix if necessary. if start < len(value): - tokens.append(value[start:]) + tokens.append(Token(value[start:], token_type=prev_type)) return tokens @@ -137,7 +145,7 @@ def __init__( self.preproc = preproc self.subtokens = subtokens - def tokens(self, value: Scalar) -> List[str]: + def tokens(self, value: Scalar) -> List[Token]: """Convert a given scalar values into a list of string tokens. If a given value cannot be converted into tokens None should be returned. @@ -151,7 +159,7 @@ def tokens(self, value: Scalar) -> List[str]: Returns ------- - list of string + list of openclean.function.token.base.Token """ # Convert value to string if necessary if not isinstance(value, str): @@ -160,7 +168,7 @@ def tokens(self, value: Scalar) -> List[str]: if self.preproc is not None: value = self.preproc(value) # Use split and the defined pattern to generate initial token list. - tokens = list(filter(None, re.split(self.pattern, value))) + tokens = [Token(t) for t in filter(None, re.split(self.pattern, value))] # Remove duplicates if the unique flag is True. 
if self.unique: tokens = list(set(tokens)) diff --git a/openclean/function/value/key/geo.py b/openclean/function/value/key/geo.py index 2fec53d..543fcbd 100644 --- a/openclean/function/value/key/geo.py +++ b/openclean/function/value/key/geo.py @@ -387,7 +387,7 @@ class USStreetNameKey(Tokens): def __init__(self): """Initialize the tokenizer and token transformer in the super class.""" super(USStreetNameKey, self).__init__( - tokenizer=ChartypeSplit(chartypes=[str.isalpha, str.isdigit]), + tokenizer=ChartypeSplit(), transformer=[ TokenFilter(AlphaNumeric()), UpperTokens(), diff --git a/tests/function/token/test_split_tokenizer.py b/tests/function/token/test_split_tokenizer.py index b782fd0..bf1514d 100644 --- a/tests/function/token/test_split_tokenizer.py +++ b/tests/function/token/test_split_tokenizer.py @@ -11,15 +11,25 @@ from openclean.function.token.split import ChartypeSplit, Split +import openclean.function.token.base as TT -def test_homogeneous_split(): + +@pytest.mark.parametrize( + 'value,result_tokens,result_types', + [ + ('W35ST', ['W', '35', 'ST'], ['A', 'D', 'A']), + ('W35ST/', ['W', '35', 'ST', '/'], ['A', 'D', 'A', TT.ANY]), + ('W35ST/8AVE', ['W', '35', 'ST', '/', '8', 'AVE'], ['A', 'D', 'A', TT.ANY, 'D', 'A']), + (1234, ['1234'], ['D']), + (12.34, ['12', '.', '34'], ['D', TT.ANY, 'D']) + ] +) +def test_homogeneous_split(value, result_tokens, result_types): """Test tokenizer that splits on character types.""" - f = ChartypeSplit(chartypes=[str.isalpha, str.isdigit]) - assert f.tokens('W35ST') == ['W', '35', 'ST'] - assert f.tokens('W35ST/') == ['W', '35', 'ST', '/'] - assert f.tokens('W35ST/8AVE') == ['W', '35', 'ST', '/', '8', 'AVE'] - assert f.tokens(1234) == ['1234'] - assert f.tokens(12.34) == ['12', '.', '34'] + f = ChartypeSplit(chartypes=[(str.isalpha, 'A'), (str.isdigit, 'D')]) + tokens = f.tokens(value) + assert tokens == result_tokens + assert [t.type() for t in tokens] == result_types def test_split_numeric_value(): From f79588e212c78b52b55f2f394662f4be15ebb3cf Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Mon, 22 Mar 2021 17:12:39 -0400 Subject: [PATCH 02/23] Adjust token transformer for new Token class --- openclean/function/token/base.py | 12 +- openclean/function/token/convert.py | 181 ++++++++++++++++++ openclean/function/token/filter.py | 44 ++++- tests/function/token/test_token_converter.py | 52 +++++ tests/function/token/test_token_filter.py | 22 ++- .../function/token/test_token_transformers.py | 50 +++-- 6 files changed, 339 insertions(+), 22 deletions(-) create mode 100644 openclean/function/token/convert.py create mode 100644 tests/function/token/test_token_converter.py diff --git a/openclean/function/token/base.py b/openclean/function/token/base.py index 3642eda..8043ad8 100644 --- a/openclean/function/token/base.py +++ b/openclean/function/token/base.py @@ -19,8 +19,9 @@ ALPHA = 'ALPHA' ALPHANUM = 'ALPHANUM' ANY = 'ANY' -DIGIT = DIGIT_REP = 'NUMERIC' +DIGIT = 'NUMERIC' PUNCTUATION = 'PUNC' +SPACE = 'SPACE' class Token(str): @@ -386,10 +387,15 @@ def transform(self, tokens: List[Token]) -> List[Token]: """ # Prepare function if necessary. 
f = self.func if self.func.is_prepared() else self.func.prepare(tokens) - return f.apply(tokens) + return [Token(f(t), token_type=t.type()) for t in tokens] -# -- Shortcuts for common update functions ------------------------------------ +class CapitalizeTokens(UpdateTokens): + """Capitalize all tokens in a given list.""" + def __init__(self): + """Initialize the update function.""" + super(CapitalizeTokens, self).__init__(func=CallableWrapper(func=str.capitalize)) + class LowerTokens(UpdateTokens): """Convert all tokens in a given list to lower case.""" diff --git a/openclean/function/token/convert.py b/openclean/function/token/convert.py new file mode 100644 index 0000000..9145303 --- /dev/null +++ b/openclean/function/token/convert.py @@ -0,0 +1,181 @@ +# This file is part of the Data Cleaning Library (openclean). +# +# Copyright (C) 2018-2021 New York University. +# +# openclean is released under the Revised BSD License. See file LICENSE for +# full license details. + +"""Converter for tokens that allows to change the value of a token and/or the +token type. +""" + +from abc import ABCMeta, abstractmethod +from typing import Dict, List, Set, Union + +from openclean.function.token.base import Token, TokenTransformer + + +class TokenConverter(TokenTransformer, metaclass=ABCMeta): + """Interface for token convertrs that change token values and/or token + types. The converter interface consist of two methods: the ``contains`` + method checks whether the converter accepts a given token for conversion, + and the ``convert`` method converts the token if it is accepted by he + converter. + """ + @abstractmethod + def contains(self, token: Token) -> bool: + """Test if the converter contains a conversion rule for the given + token. + + Parameters + ---------- + token: openclean.function.token.base.Token + Token that is tested for acceptance by this converter. + + Returns + ------- + bool + """ + raise NotImplementedError() # pragma: no cover + + @abstractmethod + def convert(self, token: Token) -> Token: + """Convert the given token according to the conversion ruls that are + implemented by the converter. + + Returns a modified token. + + Parameters + ---------- + token: openclean.function.token.base.Token + Token that is converted. + + Returns + ------- + openclean.function.token.base.Token + """ + raise NotImplementedError() # pragma: no cover + + def transform(self, tokens: List[Token]) -> List[Token]: + """Convert accpeted token in a given list of tokens. + + For each token in the given list, if the converter accepts the token it + is transformed. Otherwise, the original token is added to the resulting + token list. + + Patameters + ---------- + tokens: list of openclean.function.token.base.Token + List of string tokens. + + Returns + ------- + list of openclean.function.token.base.Token + """ + result = list() + for t in tokens: + # Test if this converter accepts the token. + if self.contains(t): + # Transform the token. + t = self.convert(t) + result.append(t) + return result + + +class TokenListConverter(TokenTransformer): + """Converter for a list of tokens. Implements the token transformer mixin + interface. Uses a list of converters to convert tokens i a given list. The + first converter that accepts a token in the list is used to transform the + token. + """ + def __init__(self, converters: List[TokenConverter]): + """Initialize the list of converters that are used to transform tokens. 
+ + Parameters + ---------- + converters: list of openclean.function.token.convert.TokenConverter + List of converter that are used to transform tokens in a given + list. + """ + self.converters = converters + + def transform(self, tokens: List[Token]) -> List[Token]: + """Transform a list of tokens. + + For each token in the given list, the initialized converters are used in + given order. The first converter that accepts the token is used to + convert it. If no converter accepts the token it is added to the result + without changes. + + Patameters + ---------- + tokens: list of openclean.function.token.base.Token + List of string tokens. + + Returns + ------- + list of openclean.function.token.base.Token + """ + result = list() + for t in tokens: + # Test each of the initialized converters in given order. The first + # converter that accepts the token is used to transform it. + for c in self.converters: + if c.contains(t): + # Transform the token and stop the iteration. + t = c.convert(t) + break + result.append(t) + return result + + +class TokenMapper(TokenConverter): + """Converter for tokens that uses a lookup table to map a given token to a + new token value and a new token type. This class is used for example to + standardize tokens for a semantic type. + """ + def __init__(self, label: str, lookup: Union[Dict, Set]): + """Initialize the token label and the lookup table. + + Parameters + ---------- + label: string + Token type label for converted tokens. + lookup: dict or set + Lookup table for tokens that are converted. If a set is given all + tokens in the set are converted to the sme value but with the new + token type label. + """ + self.label = label + self.lookup = {v: v for v in lookup} if isinstance(lookup, set) else lookup + + def contains(self, token: Token) -> bool: + """Test if the given token is contained in the lookup table. + + Parameters + ---------- + token: openclean.function.token.base.Token + Token that is tested for acceptance by this converter. + + Returns + ------- + bool + """ + return token in self.lookup + + def convert(self, token: Token) -> Token: + """Replace the given token with the respective value in the lookup table + and the converter token type. + + Returns a modified token. + + Parameters + ---------- + token: openclean.function.token.base.Token + Token that is converted. + + Returns + ------- + openclean.function.token.base.Token + """ + return Token(self.lookup[token], token_type=self.label) diff --git a/openclean/function/token/filter.py b/openclean/function/token/filter.py index e07a18c..9915faf 100644 --- a/openclean/function/token/filter.py +++ b/openclean/function/token/filter.py @@ -7,7 +7,7 @@ """Collection of functions to filter (remove) tokens from given token lists.""" -from typing import List +from typing import List, Optional, Set from openclean.function.token.base import SortTokens, Token, TokenTransformer, TokenTransformerPipeline from openclean.function.value.base import ValueFunction @@ -78,3 +78,45 @@ def transform(self, tokens: List[Token]) -> List[Token]: pred = self.predicate # Return only those tokens that satisfy the predicate. return [t for t in tokens if pred(t)] + + +class TokenTypeFilter(TokenTransformer): + """Filter tokens in a given list by their type.""" + def __init__(self, types: Set[str], negated: Optional[bool] = False): + """Initialize the list of tpken types to filter on and the negated + flag. 
+ + If the negated flag is True, the filter will retain all tokens of types + that do not occur in the given filter list. + + Parameters + ---------- + types: set of string + List of token types to filter on. + negated: bool, default=False + Determine whether to retain tokens of types that occur in the given + set (*False*) or those of types that do not occur in the type set + (*True*). + """ + self.types = types + self.negated = negated + + def transform(self, tokens: List[Token]) -> List[Token]: + """Returns a list that contains only those tokens that satisfy the + filter condition defined by the associated predicate. + + Patameters + ---------- + tokens: list of openclean.function.token.base.Token + List of string tokens. + + Returns + ------- + list of openclean.function.token.base.Token + """ + result = list() + for t in tokens: + is_in = not self.negated if t.type() in self.types else self.negated + if is_in: + result.append(t) + return result diff --git a/tests/function/token/test_token_converter.py b/tests/function/token/test_token_converter.py new file mode 100644 index 0000000..05f9fc1 --- /dev/null +++ b/tests/function/token/test_token_converter.py @@ -0,0 +1,52 @@ +# This file is part of the Data Cleaning Library (openclean). +# +# Copyright (C) 2018-2021 New York University. +# +# openclean is released under the Revised BSD License. See file LICENSE for +# full license details. + +"""Unit tests for token converter.""" + +import pytest + +from openclean.function.token.base import Token +from openclean.function.token.convert import TokenListConverter, TokenMapper + + +def test_token_list_converter(): + """Test the token list converter.""" + m1 = TokenMapper(label='T', lookup={'a': 'b'}) + m2 = TokenMapper(label='S', lookup={'b': 'c'}) + f = TokenListConverter(converters=[m1, m2]) + # -- Empty token list ----------------------------------------------------- + assert f.transform([]) == [] + # -- Non-empty token list ------------------------------------------------- + tokens = f.transform([Token('a'), Token('b'), Token('c')]) + assert ''.join([t for t in tokens]) == 'bcc' + + +@pytest.mark.parametrize( + 'label,lookup,token,result_value', + [ + ('T', {'a': 'b'}, Token('a', token_type='A'), 'b'), + ('T', {'a', 'b'}, Token('a', token_type='A'), 'a') + ] +) +def test_token_mapper(label, lookup, token, result_value): + """Test functionality of the token mapper.""" + mapper = TokenMapper(label=label, lookup=lookup) + assert mapper.contains(token) + t = mapper.convert(token) + assert t == result_value + assert t.type() == label + + +def test_token_mapper_transform(): + """Test token transformer functionality of the token mapper.""" + f = TokenMapper(label='T', lookup={'a': 'b'}) + # -- Empty token list ----------------------------------------------------- + assert f.transform([]) == [] + # -- Non-empty token list ------------------------------------------------- + tokens = f.transform([Token('a'), Token('b', token_type='S')]) + assert ''.join([t for t in tokens]) == 'bb' + assert ''.join([t.type() for t in tokens]) == 'TS' diff --git a/tests/function/token/test_token_filter.py b/tests/function/token/test_token_filter.py index e8f4768..62eee10 100644 --- a/tests/function/token/test_token_filter.py +++ b/tests/function/token/test_token_filter.py @@ -7,7 +7,10 @@ """Unit tests for basic token list filters.""" -from openclean.function.token.filter import FirstLastFilter, MinMaxFilter, TokenFilter +import pytest + +from openclean.function.token.base import Token +from 
openclean.function.token.filter import FirstLastFilter, MinMaxFilter, TokenFilter, TokenTypeFilter from openclean.function.value.base import UnpreparedFunction from openclean.function.value.domain import IsInDomain @@ -31,6 +34,23 @@ def test_prepared_token_filter(): assert f.transform(TOKENS) == ['A'] +@pytest.mark.parametrize( + 'types,negated,result', + [({'A'}, False, 'ac'), ({'A'}, True, 'bde'), ({'A', 'C'}, False, 'ace'), ({'A', 'C'}, True, 'bd')] +) +def test_token_type_filter(types, negated, result): + """Test filtering tokens based on their token type.""" + filter = TokenTypeFilter(types=types, negated=negated) + tokens = [ + Token('a', token_type='A'), + Token('b', token_type='B'), + Token('c', token_type='A'), + Token('d', token_type='B'), + Token('e', token_type='C'), + ] + assert ''.join(filter.transform(tokens)) == result + + def test_unprepared_token_filter(): """Test token filter with a value function that needs to be prepared.""" diff --git a/tests/function/token/test_token_transformers.py b/tests/function/token/test_token_transformers.py index ab227a4..9249f6e 100644 --- a/tests/function/token/test_token_transformers.py +++ b/tests/function/token/test_token_transformers.py @@ -11,56 +11,72 @@ from openclean.function.token.base import ( - LowerTokens, ReverseTokens, SortTokens, StandardizeTokens, TokenPrefix, - TokenTransformerPipeline, UniqueTokens, UpperTokens + CapitalizeTokens, LowerTokens, ReverseTokens, SortTokens, StandardizeTokens, + Token, TokenPrefix, TokenTransformerPipeline, UniqueTokens, UpperTokens ) def test_prefix_transformer(): """Test the token prefix transformer.""" - tokens = ['B', 'A', 'C'] - assert TokenPrefix(length=2).transform(tokens) == ['B', 'A'] - assert TokenPrefix(length=3).transform(tokens) == tokens - assert TokenPrefix(length=4).transform(tokens) == tokens + values = ['B', 'A', 'C'] + tokens = [Token(t, token_type=t) for t in values] + for i in [2, 3, 4]: + result = concat_tokens(TokenPrefix(length=i).transform(tokens)) + assert result == ['{}{}'.format(v, v) for v in values[:min(i, 3)]] def test_reverse_transformer(): """Test reversing a list of tokens.""" - assert ReverseTokens().transform(['A', 'B']) == ['B', 'A'] + values = ['a', 'b'] + tokens = [Token(t, token_type=t.upper()) for t in values] + result = concat_tokens(ReverseTokens().transform(tokens)) + assert result == ['bB', 'aA'] def test_sort_transformer(): """Test sorting a list of tokens.""" - tokens = ['B', 'A', 'C'] - assert SortTokens().transform(tokens) == ['A', 'B', 'C'] - assert SortTokens(reverse=True).transform(tokens) == ['C', 'B', 'A'] + tokens = [Token(t, token_type=t.upper()) for t in ['b', 'a', 'c']] + assert concat_tokens(SortTokens().transform(tokens)) == ['aA', 'bB', 'cC'] + assert concat_tokens(SortTokens(reverse=True).transform(tokens)) == ['cC', 'bB', 'aA'] f = SortTokens(key=lambda x: x[1]) - assert f.transform(['A2', 'W3', 'Z1']) == ['Z1', 'A2', 'W3'] + assert f.transform([Token(t) for t in ['A2', 'W3', 'Z1']]) == ['Z1', 'A2', 'W3'] f = SortTokens(key=lambda x: x[1], reverse=True) - assert f.transform(['A2', 'W3', 'Z1']) == ['W3', 'A2', 'Z1'] + assert f.transform([Token(t) for t in ['A2', 'W3', 'Z1']]) == ['W3', 'A2', 'Z1'] def test_standardize_tokens(): """Test token stadardization using a mapping dictionary.""" f = StandardizeTokens({'ST': 'STREET'}) - assert f.transform(['ROAD', 'ST', 'STREET']) == ['ROAD', 'STREET', 'STREET'] + assert f.transform([Token(t) for t in ['ROAD', 'ST', 'STREET']]) == ['ROAD', 'STREET', 'STREET'] @pytest.mark.parametrize( 
'func,result', - [(LowerTokens(), ['a', 'b', 'c']), (UpperTokens(), ['A', 'B', 'C'])] + [ + (CapitalizeTokens(), ['Ab', 'Bb', 'Cb']), + (LowerTokens(), ['ab', 'bb', 'cb']), + (UpperTokens(), ['AB', 'BB', 'CB']) + ] ) def test_token_case(func, result): """Test transformers that change token case.""" - assert func.transform(['A', 'b', 'C']) == result + tokens = func.transform([Token(t, token_type=t.upper()) for t in ['Ab', 'bb', 'CB']]) + tokens == result + assert [t.type() for t in tokens] == ['AB', 'BB', 'CB'] def test_transformer_pipeline(): """Test chaining token transformers.""" f = TokenTransformerPipeline(transformers=[SortTokens(), TokenPrefix(length=2)]) - assert f.transform(['C', 'A', 'B']) == ['A', 'B'] + assert f.transform([Token(t) for t in ['C', 'A', 'B']]) == ['A', 'B'] def test_unique_transformer(): """Test creating a list of unique tokens.""" - assert set(UniqueTokens().transform(['A', 'A', 'B'])) == {'A', 'B'} + assert set(UniqueTokens().transform([Token(t) for t in ['A', 'A', 'B']])) == {'A', 'B'} + + +# -- Helper functions --------------------------------------------------------- + +def concat_tokens(tokens): + return ['{}{}'.format(t, t.type()) for t in tokens] From 8a6a2174b7850eb92779dea58e55bf815eb0d196 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Tue, 23 Mar 2021 15:24:57 -0400 Subject: [PATCH 03/23] Add cluster operator for data streams --- openclean/cluster/base.py | 71 ++++++++++++++++++++++++++- openclean/operator/stream/consumer.py | 2 +- openclean/pipeline.py | 19 +++++++ tests/cluster/test_cluster_stream.py | 35 +++++++++++++ tests/pipeline/test_stream_cluster.py | 16 ++++++ 5 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 tests/cluster/test_cluster_stream.py create mode 100644 tests/pipeline/test_stream_cluster.py diff --git a/openclean/cluster/base.py b/openclean/cluster/base.py index 4063823..7dae0cd 100644 --- a/openclean/cluster/base.py +++ b/openclean/cluster/base.py @@ -17,7 +17,9 @@ from collections import Counter from typing import Dict, Iterable, List, Optional, Union -from openclean.data.types import Value +from openclean.data.types import Schema, Value +from openclean.operator.stream.consumer import StreamConsumer +from openclean.operator.stream.processor import StreamProcessor class Cluster(Counter): @@ -78,7 +80,7 @@ def to_mapping(self, target: Optional[Value] = None) -> Dict: return {key: target for key in self.keys() if key != target} -class Clusterer(metaclass=ABCMeta): +class Clusterer(StreamProcessor, metaclass=ABCMeta): """The value clusterer mixin class defines a single method `clusters()` to cluster a given list of values. """ @@ -101,6 +103,71 @@ def clusters(self, values: Union[Iterable[Value], Counter]) -> List[Cluster]: """ raise NotImplementedError() # pragma: no cover + def open(self, schema: Schema) -> StreamConsumer: + """Factory pattern for stream consumer. + + Returns an instance of the stream clusterer that will collect the + distinct values in the stream and then call the cluster method of + this clusterer. + + Parameters + ---------- + schema: list of string + List of column names in the data stream schema. + + Returns + ------- + openclean.cluster.base.StreamClusterer + """ + return StreamClusterer(clusterer=self) + + +class StreamClusterer(StreamConsumer): + """Cluster values in a stream. This implementation will create a set of + distinct values in the stream together with their frequency counts. It will + then apply a given cluster algorithm on the created value set. 
+ """ + def __init__(self, clusterer: Clusterer): + """Initialize the cluster algorithm and the internal value counter. + + Parameters + ---------- + clusterer: openclean.cluster.base.Clusterer + Cluster algorithm that is applied on the set of distinct values + that is generated from the data stream. + """ + self.clusterer = clusterer + self.counter = Counter() + + def close(self) -> List[Cluster]: + """Closing the consumer returns the result of applying the associated + clusterer on the collected set of distinct values. + + Returns + ------- + list of openclean.cluster.base.Cluster + """ + return self.clusterer.clusters(self.counter) + + def consume(self, rowid: int, row: List): + """Add the values in a given row to the internal counter. + + If the row only has one value this value will be used as the key for + the counter. For rows with multiple values the values in the row will + be concatenated (separated by a blank space) to a single string value. + + Parameters + ----------- + rowid: int + Unique row identifier + row: list + List of values in the row. + """ + if len(row) == 1: + self.counter[row[0]] += 1 + else: + self.counter[' '.join([str(v) for v in row])] += 1 + # -- Helper Classes ----------------------------------------------------------- diff --git a/openclean/operator/stream/consumer.py b/openclean/operator/stream/consumer.py index 85815e5..01d5834 100644 --- a/openclean/operator/stream/consumer.py +++ b/openclean/operator/stream/consumer.py @@ -91,7 +91,7 @@ def process(self, ds: DataReader) -> Any: class ProducingConsumer(StreamConsumer): """A producing consumer passes the processed row on to a downstream - consumer. This consumer therefore acts a s a consumer and a producer. + consumer. This consumer therefore acts as a consumer and a producer. """ def __init__(self, columns: Schema, consumer: Optional[StreamConsumer]): """Initialize the row schema and the optional downstream consumer. Note diff --git a/openclean/pipeline.py b/openclean/pipeline.py index 80a6885..10143a0 100644 --- a/openclean/pipeline.py +++ b/openclean/pipeline.py @@ -19,6 +19,7 @@ from openclean.data.stream.csv import CSVFile from openclean.data.stream.df import DataFrameStream from openclean.data.types import Columns, Scalar, Schema +from openclean.cluster.base import Cluster, Clusterer from openclean.function.eval.base import EvalFunction from openclean.function.matching.base import StringMatcher from openclean.operator.stream.collector import Distinct, DataFrame, RowCount, Write @@ -100,6 +101,24 @@ def append( pipeline=self.pipeline + [op] ) + def cluster(self, clusterer: Clusterer) -> List[Cluster]: + """Cluster values in a data stream. + + This operator will create a distinct set of values in the data stream + rows. The collected values are then passed on to the given cluster + algorithm. + + Parameters + ---------- + clusterer: openclean.cluster.base.Clusterer + Cluster algorithm for distinct values in the data stream. + + Returns + ------- + list of openclean.cluster.base.Cluster + """ + return self.stream(clusterer) + def count(self) -> int: """Count the number of rows in a data stream. diff --git a/tests/cluster/test_cluster_stream.py b/tests/cluster/test_cluster_stream.py new file mode 100644 index 0000000..e896ae8 --- /dev/null +++ b/tests/cluster/test_cluster_stream.py @@ -0,0 +1,35 @@ +# This file is part of the Data Cleaning Library (openclean). +# +# Copyright (C) 2018-2021 New York University. +# +# openclean is released under the Revised BSD License. 
See file LICENSE for +# full license details. + +"""Unit tests for the cluster stream class.""" + +from openclean.cluster.key import KeyCollision +from openclean.function.value.key.fingerprint import Fingerprint + + +def test_cluster_multi_value_stream(): + """Test stream cluster functionality using the key collision cluster + algorithm on a stream of multi-column rows. + """ + clusterer = KeyCollision(func=Fingerprint()).open(['col']) + for val in ['A B', 'B C', 'B c', 'C\tb']: + clusterer.consume(0, val.split()) + clusters = clusterer.close() + assert len(clusters) == 1 + assert len(clusters[0]) == 3 + + +def test_cluster_single_value_stream(): + """Test stream cluster functionality using the key collision cluster + algorithm on a stream of single-column rows. + """ + clusterer = KeyCollision(func=Fingerprint()).open(['col']) + for val in ['A B', 'B C', 'B c', 'C\tb']: + clusterer.consume(0, [val]) + clusters = clusterer.close() + assert len(clusters) == 1 + assert len(clusters[0]) == 3 diff --git a/tests/pipeline/test_stream_cluster.py b/tests/pipeline/test_stream_cluster.py new file mode 100644 index 0000000..622431f --- /dev/null +++ b/tests/pipeline/test_stream_cluster.py @@ -0,0 +1,16 @@ +# This file is part of the Data Cleaning Library (openclean). +# +# Copyright (C) 2018-2021 New York University. +# +# openclean is released under the Revised BSD License. See file LICENSE for +# full license details. + +"""Unit tests for the distinct operator in data processing pipelines.""" + +from openclean.cluster.key import KeyCollision +from openclean.function.value.key.fingerprint import Fingerprint + + +def test_cluster_rows(ds): + """Test distinct count over a stream of rows.""" + assert len(ds.cluster(KeyCollision(Fingerprint()))) == 5 From 6162b605824f61c4a46651424cca235c82f4e18f Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Wed, 24 Mar 2021 10:20:41 -0400 Subject: [PATCH 04/23] Move us-street name functions to openclean-geo --- openclean/function/value/key/geo.py | 399 -------------------------- tests/function/value/test_func_key.py | 10 - 2 files changed, 409 deletions(-) delete mode 100644 openclean/function/value/key/geo.py diff --git a/openclean/function/value/key/geo.py b/openclean/function/value/key/geo.py deleted file mode 100644 index 543fcbd..0000000 --- a/openclean/function/value/key/geo.py +++ /dev/null @@ -1,399 +0,0 @@ -# This file is part of the Data Cleaning Library (openclean). -# -# Copyright (C) 2018-2021 New York University. -# -# openclean is released under the Revised BSD License. See file LICENSE for -# full license details. - -"""Collection of specialized key generation functions for geo-spatial data.""" - -from openclean.function.token.base import Tokens, StandardizeTokens, UpperTokens -from openclean.function.token.filter import TokenFilter -from openclean.function.token.split import ChartypeSplit -from openclean.function.value.text import AlphaNumeric - - -"""Create mapping of common street type abbreviations to a standardized value. -This is primarily intended for US street names. 
Generated from: -https://github.com/VIDA-NYU/openclean-pattern/blob/master/resources/data/street_abvs.csv -""" - -STANDARDIZE_US_STREET_NAME = { - 'ALLEE': 'ALY', - 'ALLEY': 'ALY', - 'ALLY': 'ALY', - 'ANEX': 'ANX', - 'ANNEX': 'ANX', - 'ANNX': 'ANX', - 'ARCADE': 'ARC', - 'AV': 'AVE', - 'AVEN': 'AVE', - 'AVENU': 'AVE', - 'AVENUE': 'AVE', - 'AVN': 'AVE', - 'AVNUE': 'AVE', - 'BAYOO': 'BYU', - 'BAYOU': 'BYU', - 'BEACH': 'BCH', - 'BEND': 'BND', - 'BLUF': 'BLF', - 'BLUFF': 'BLF', - 'BLUFFS': 'BLFS', - 'BOT': 'BTM', - 'BOTTM': 'BTM', - 'BOTTOM': 'BTM', - 'BOUL': 'BLVD', - 'BOULEVARD': 'BLVD', - 'BOULV': 'BLVD', - 'BRNCH': 'BR', - 'BRANCH': 'BR', - 'BRDGE': 'BRG', - 'BRIDGE': 'BRG', - 'BROOK': 'BRK', - 'BROOKS': 'BRKS', - 'BURG': 'BG', - 'BURGS': 'BGS', - 'BYPA': 'BYP', - 'BYPAS': 'BYP', - 'BYPASS': 'BYP', - 'BYPS': 'BYP', - 'CAMP': 'CP', - 'CMP': 'CP', - 'CANYN': 'CYN', - 'CANYON': 'CYN', - 'CNYN': 'CYN', - 'CAPE': 'CPE', - 'CAUSEWAY': 'CSWY', - 'CAUSWA': 'CSWY', - 'CEN': 'CTR', - 'CENT': 'CTR', - 'CENTER': 'CTR', - 'CENTR': 'CTR', - 'CENTRE': 'CTR', - 'CNTER': 'CTR', - 'CNTR': 'CTR', - 'CENTERS': 'CTRS', - 'CIRC': 'CIR', - 'CIRCL': 'CIR', - 'CIRCLE': 'CIR', - 'CRCL': 'CIR', - 'CRCLE': 'CIR', - 'CIRCLES': 'CIRS', - 'CLIFF': 'CLF', - 'CLIFFS': 'CLFS', - 'CLUB': 'CLB', - 'COMMON': 'CMN', - 'COMMONS': 'CMNS', - 'CORNER': 'COR', - 'CORNERS': 'CORS', - 'COURSE': 'CRSE', - 'COURT': 'CT', - 'COURTS': 'CTS', - 'COVE': 'CV', - 'COVES': 'CVS', - 'CREEK': 'CRK', - 'CRESCENT': 'CRES', - 'CRSENT': 'CRES', - 'CRSNT': 'CRES', - 'CREST': 'CRST', - 'CROSSING': 'XING', - 'CRSSNG': 'XING', - 'CROSSROAD': 'XRD', - 'CROSSROADS': 'XRDS', - 'CURVE': 'CURV', - 'DALE': 'DL', - 'DAM': 'DM', - 'DIV': 'DV', - 'DIVIDE': 'DV', - 'DVD': 'DV', - 'DRIV': 'DR', - 'DRIVE': 'DR', - 'DRV': 'DR', - 'DRIVES': 'DRS', - 'ESTATE': 'EST', - 'ESTATES': 'ESTS', - 'EXP': 'EXPY', - 'EXPR': 'EXPY', - 'EXPRESS': 'EXPY', - 'EXPRESSWAY': 'EXPY', - 'EXPW': 'EXPY', - 'EXTENSION': 'EXT', - 'EXTN': 'EXT', - 'EXTNSN': 'EXT', - 'FALLS': 'FLS', - 'FERRY': 'FRY', - 'FRRY': 'FRY', - 'FIELD': 'FLD', - 'FIELDS': 'FLDS', - 'FLAT': 'FLT', - 'FLATS': 'FLTS', - 'FORD': 'FRD', - 'FORDS': 'FRDS', - 'FOREST': 'FRST', - 'FORESTS': 'FRST', - 'FORG': 'FRG', - 'FORGE': 'FRG', - 'FORGES': 'FRGS', - 'FORK': 'FRK', - 'FORKS': 'FRKS', - 'FORT': 'FT', - 'FRT': 'FT', - 'FREEWAY': 'FWY', - 'FREEWY': 'FWY', - 'FRWAY': 'FWY', - 'FRWY': 'FWY', - 'GARDEN': 'GDN', - 'GARDN': 'GDN', - 'GRDEN': 'GDN', - 'GRDN': 'GDN', - 'GARDENS': 'GDNS', - 'GRDNS': 'GDNS', - 'GATEWAY': 'GTWY', - 'GATEWY': 'GTWY', - 'GATWAY': 'GTWY', - 'GTWAY': 'GTWY', - 'GLEN': 'GLN', - 'GLENS': 'GLNS', - 'GREEN': 'GRN', - 'GREENS': 'GRNS', - 'GROV': 'GRV', - 'GROVE': 'GRV', - 'GROVES': 'GRVS', - 'HARB': 'HBR', - 'HARBOR': 'HBR', - 'HARBR': 'HBR', - 'HRBOR': 'HBR', - 'HARBORS': 'HBRS', - 'HAVEN': 'HVN', - 'HT': 'HTS', - 'HIGHWAY': 'HWY', - 'HIGHWY': 'HWY', - 'HIWAY': 'HWY', - 'HIWY': 'HWY', - 'HWAY': 'HWY', - 'HILL': 'HL', - 'HILLS': 'HLS', - 'HLLW': 'HOLW', - 'HOLLOW': 'HOLW', - 'HOLLOWS': 'HOLW', - 'HOLWS': 'HOLW', - 'ISLAND': 'IS', - 'ISLND': 'IS', - 'ISLANDS': 'ISS', - 'ISLNDS': 'ISS', - 'ISLES': 'ISLE', - 'JCTION': 'JCT', - 'JCTN': 'JCT', - 'JUNCTION': 'JCT', - 'JUNCTN': 'JCT', - 'JUNCTON': 'JCT', - 'JCTNS': 'JCTS', - 'JUNCTIONS': 'JCTS', - 'KEY': 'KY', - 'KEYS': 'KYS', - 'KNOL': 'KNL', - 'KNOLL': 'KNL', - 'KNOLLS': 'KNLS', - 'LAKE': 'LK', - 'LAKES': 'LKS', - 'LANDING': 'LNDG', - 'LNDNG': 'LNDG', - 'LANE': 'LN', - 'LIGHT': 'LGT', - 'LIGHTS': 'LGTS', - 'LOAF': 'LF', - 'LOCK': 'LCK', - 'LOCKS': 'LCKS', - 'LDGE': 
'LDG', - 'LODG': 'LDG', - 'LODGE': 'LDG', - 'LOOPS': 'LOOP', - 'MANOR': 'MNR', - 'MANORS': 'MNRS', - 'MEADOW': 'MDW', - 'MDW': 'MDWS', - 'MEADOWS': 'MDWS', - 'MEDOWS': 'MDWS', - 'MILL': 'ML', - 'MILLS': 'MLS', - 'MISSN': 'MSN', - 'MSSN': 'MSN', - 'MOTORWAY': 'MTWY', - 'MNT': 'MT', - 'MOUNT': 'MT', - 'MNTAIN': 'MTN', - 'MNTN': 'MTN', - 'MOUNTAIN': 'MTN', - 'MOUNTIN': 'MTN', - 'MTIN': 'MTN', - 'MNTNS': 'MTNS', - 'MOUNTAINS': 'MTNS', - 'NECK': 'NCK', - 'ORCHARD': 'ORCH', - 'ORCHRD': 'ORCH', - 'OVL': 'OVAL', - 'OVERPASS': 'OPAS', - 'PRK': 'PARK', - 'PARKS': 'PARK', - 'PARKWAY': 'PKWY', - 'PARKWY': 'PKWY', - 'PKWAY': 'PKWY', - 'PKY': 'PKWY', - 'PARKWAYS': 'PKWY', - 'PKWYS': 'PKWY', - 'PASSAGE': 'PSGE', - 'PATHS': 'PATH', - 'PIKES': 'PIKE', - 'PINE': 'PNE', - 'PINES': 'PNES', - 'PLAIN': 'PLN', - 'PLAINS': 'PLNS', - 'PLAZA': 'PLZ', - 'PLZA': 'PLZ', - 'POINT': 'PT', - 'POINTS': 'PTS', - 'PORT': 'PRT', - 'PORTS': 'PRTS', - 'PRAIRIE': 'PR', - 'PRR': 'PR', - 'RAD': 'RADL', - 'RADIAL': 'RADL', - 'RADIEL': 'RADL', - 'RANCH': 'RNCH', - 'RANCHES': 'RNCH', - 'RNCHS': 'RNCH', - 'RAPID': 'RPD', - 'RAPIDS': 'RPDS', - 'REST': 'RST', - 'RDGE': 'RDG', - 'RIDGE': 'RDG', - 'RIDGES': 'RDGS', - 'RIVER': 'RIV', - 'RVR': 'RIV', - 'RIVR': 'RIV', - 'ROAD': 'RD', - 'ROADS': 'RDS', - 'ROUTE': 'RTE', - 'SHOAL': 'SHL', - 'SHOALS': 'SHLS', - 'SHOAR': 'SHR', - 'SHORE': 'SHR', - 'SHOARS': 'SHRS', - 'SHORES': 'SHRS', - 'SKYWAY': 'SKWY', - 'SPNG': 'SPG', - 'SPRING': 'SPG', - 'SPRNG': 'SPG', - 'SPNGS': 'SPGS', - 'SPRINGS': 'SPGS', - 'SPRNGS': 'SPGS', - 'SPURS': 'SPUR', - 'SQR': 'SQ', - 'SQRE': 'SQ', - 'SQU': 'SQ', - 'SQUARE': 'SQ', - 'SQRS': 'SQS', - 'SQUARES': 'SQS', - 'STATION': 'STA', - 'STATN': 'STA', - 'STN': 'STA', - 'STRAV': 'STRA', - 'STRAVEN': 'STRA', - 'STRAVENUE': 'STRA', - 'STRAVN': 'STRA', - 'STRVN': 'STRA', - 'STRVNUE': 'STRA', - 'STREAM': 'STRM', - 'STREME': 'STRM', - 'STREET': 'ST', - 'STRT': 'ST', - 'STR': 'ST', - 'STREETS': 'STS', - 'SUMIT': 'SMT', - 'SUMITT': 'SMT', - 'SUMMIT': 'SMT', - 'TERR': 'TER', - 'TERRACE': 'TER', - 'THROUGHWAY': 'TRWY', - 'TRACE': 'TRCE', - 'TRACES': 'TRCE', - 'TRACK': 'TRAK', - 'TRACKS': 'TRAK', - 'TRK': 'TRAK', - 'TRKS': 'TRAK', - 'TRAFFICWAY': 'TRFY', - 'TRAIL': 'TRL', - 'TRAILS': 'TRL', - 'TRLS': 'TRL', - 'TRAILER': 'TRLR', - 'TRLRS': 'TRLR', - 'TUNEL': 'TUNL', - 'TUNLS': 'TUNL', - 'TUNNEL': 'TUNL', - 'TUNNELS': 'TUNL', - 'TUNNL': 'TUNL', - 'TRNPK': 'TPKE', - 'TURNPIKE': 'TPKE', - 'TURNPK': 'TPKE', - 'UNDERPASS': 'UPAS', - 'UNION': 'UN', - 'UNIONS': 'UNS', - 'VALLEY': 'VLY', - 'VALLY': 'VLY', - 'VLLY': 'VLY', - 'VALLEYS': 'VLYS', - 'VDCT': 'VIA', - 'VIADCT': 'VIA', - 'VIADUCT': 'VIA', - 'VIEW': 'VW', - 'VIEWS': 'VWS', - 'VILL': 'VLG', - 'VILLAG': 'VLG', - 'VILLAGE': 'VLG', - 'VILLG': 'VLG', - 'VILLIAGE': 'VLG', - 'VILLAGES': 'VLGS', - 'VILLE': 'VL', - 'VIST': 'VIS', - 'VISTA': 'VIS', - 'VST': 'VIS', - 'VSTA': 'VIS', - 'WALKS': 'WALK', - 'WY': 'WAY', - 'WELL': 'WL', - 'WELLS': 'WLS', - 'E': 'EAST', - 'W': 'WEST', - 'N': 'NORTH', - 'S': 'SOUTH' -} - - -class USStreetNameKey(Tokens): - """Key generator for US street names. Keys are generated based on tokenization - of input values. Tokens are generated using the character type splitter that - generates tokens of homogeneous character type distinguishing between letters, - digits and other. FOr example, a value of 'W35ST' is split into three tokens - 'W', '35', 'ST'. The key generator removes all tokens that contain non- - alphenumeric characters. 
The remaining tokens are then normalized using a - mapping that attempts to standardize different abbreviations for street - types (e.g., ST, STR, STREET -> ST). - - Note that duplicate tokens are not removed by this key generator. The reason - is that some abbreviations for street types (e.g., ST) can have multiple - semantics, e.g., 'ST. MARKS ST'. Removing duplicates would make the previous - example similar to 'MARKS STREET'. - """ - def __init__(self): - """Initialize the tokenizer and token transformer in the super class.""" - super(USStreetNameKey, self).__init__( - tokenizer=ChartypeSplit(), - transformer=[ - TokenFilter(AlphaNumeric()), - UpperTokens(), - StandardizeTokens(STANDARDIZE_US_STREET_NAME) - ], - delim=' ', - sort=True, - unique=False - ) diff --git a/tests/function/value/test_func_key.py b/tests/function/value/test_func_key.py index 30a564b..4e83656 100644 --- a/tests/function/value/test_func_key.py +++ b/tests/function/value/test_func_key.py @@ -10,7 +10,6 @@ import pytest from openclean.function.value.key.fingerprint import Fingerprint, NGramFingerprint -from openclean.function.value.key.geo import USStreetNameKey @pytest.mark.parametrize( @@ -37,12 +36,3 @@ def test_default_fingerprint_key(text, result): def test_ngram_fingerprint_key(text, pleft, pright, result): """Test 3-gram key generator.""" assert NGramFingerprint(n=3, pleft=pleft, pright=pright).eval(text) == result - - -@pytest.mark.parametrize( - 'text,result', - [('5TH AVEN.', '5 AVE TH'), ('ST. MARKS STR', 'MARKS ST ST')] -) -def test_us_street_name_key(text, result): - """Test the specialized US streen name key generator.""" - assert USStreetNameKey().eval(text) == result From 0642d6f3d6d14f2a4188669171a8991f86b91e40 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Wed, 24 Mar 2021 15:19:09 -0400 Subject: [PATCH 05/23] Add filter for repeated tokens --- openclean/function/token/filter.py | 28 +++++++++++++++++++++++ tests/function/token/test_token_filter.py | 21 ++++++++++++++++- 2 files changed, 48 insertions(+), 1 deletion(-) diff --git a/openclean/function/token/filter.py b/openclean/function/token/filter.py index 9915faf..1e024c9 100644 --- a/openclean/function/token/filter.py +++ b/openclean/function/token/filter.py @@ -7,6 +7,7 @@ """Collection of functions to filter (remove) tokens from given token lists.""" +from collections.abc import Container from typing import List, Optional, Set from openclean.function.token.base import SortTokens, Token, TokenTransformer, TokenTransformerPipeline @@ -46,6 +47,33 @@ def __init__(self): ) +class RepeatedTokenFilter(TokenTransformer): + """Remove consecutive identical tokens in a given sequence.""" + def transform(self, tokens: List[Token]) -> List[Token]: + """Returns a list where no two consecutive tokens are identical. + + Patameters + ---------- + tokens: list of openclean.function.token.base.Token + List of string tokens. + + Returns + ------- + list of openclean.function.token.base.Token + """ + if len(tokens) < 2: + return tokens + # Create initial list containing the first token. 
+ prev = tokens[0] + result = [prev] + for i in range(1, len(tokens)): + token = tokens[i] + if token != prev: + result.append(token) + prev = token + return result + + class TokenFilter(TokenTransformer): """Filter tokens based on a given predicate.""" def __init__(self, predicate: ValueFunction): diff --git a/tests/function/token/test_token_filter.py b/tests/function/token/test_token_filter.py index 62eee10..bb1404c 100644 --- a/tests/function/token/test_token_filter.py +++ b/tests/function/token/test_token_filter.py @@ -10,7 +10,10 @@ import pytest from openclean.function.token.base import Token -from openclean.function.token.filter import FirstLastFilter, MinMaxFilter, TokenFilter, TokenTypeFilter +from openclean.function.token.filter import ( + FirstLastFilter, MinMaxFilter, RepeatedTokenFilter, TokenFilter, + TokenTypeFilter +) from openclean.function.value.base import UnpreparedFunction from openclean.function.value.domain import IsInDomain @@ -34,6 +37,22 @@ def test_prepared_token_filter(): assert f.transform(TOKENS) == ['A'] +@pytest.mark.parametrize( + 'values,result', [ + ([], []), + (['a'], ['a']), + (['a', 'a'], ['a']), + (['a', 'b', 'a'], ['a', 'b', 'a']), + (['a', 'b', 'b', 'a'], ['a', 'b', 'a']), + (['a', 'a', 'b', 'a', 'a'], ['a', 'b', 'a']) + ] +) +def test_repeated_token_filter(values, result): + """Test the filter that removes repeated tokens.""" + tokens = [Token(v) for v in values] + assert RepeatedTokenFilter().transform(tokens) == result + + @pytest.mark.parametrize( 'types,negated,result', [({'A'}, False, 'ac'), ({'A'}, True, 'bde'), ({'A', 'C'}, False, 'ace'), ({'A', 'C'}, True, 'bd')] From 1bd4192ad7b16fd718e6d33b81d33b359f906a98 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Wed, 24 Mar 2021 16:01:30 -0400 Subject: [PATCH 06/23] Rename StringTokenizer to Tokenizer --- openclean/cluster/knn.py | 14 +++++++------- openclean/function/token/base.py | 8 ++++---- openclean/function/token/ngram.py | 4 ++-- openclean/function/token/split.py | 10 +++++----- openclean/function/value/key/fingerprint.py | 6 +++--- openclean/profiling/anomalies/pattern.py | 8 ++++---- 6 files changed, 25 insertions(+), 25 deletions(-) diff --git a/openclean/cluster/knn.py b/openclean/cluster/knn.py index 8cbe722..37910e3 100644 --- a/openclean/cluster/knn.py +++ b/openclean/cluster/knn.py @@ -27,7 +27,7 @@ from openclean.cluster.index import ClusterIndex from openclean.cluster.key import key_collision from openclean.function.value.base import ValueFunction -from openclean.function.token.base import StringTokenizer +from openclean.function.token.base import Tokenizer from openclean.function.token.ngram import NGrams from openclean.function.similarity.base import SimilarityConstraint @@ -43,7 +43,7 @@ class kNNClusterer(Clusterer): """ def __init__( self, sim: SimilarityConstraint, - tokenizer: Optional[StringTokenizer] = None, minsize: Optional[int] = 2, + tokenizer: Optional[Tokenizer] = None, minsize: Optional[int] = 2, remove_duplicates: Optional[bool] = True ): """Initialize the string tokenizer, the similarity constraint, and the @@ -54,7 +54,7 @@ def __init__( sim: openclean.function.similarity.base.SimilarityConstraint String similarity constraint for grouping strings in the generated blocks. - tokenizer: openclean.function.token.base.StringTokenizer, default=None + tokenizer: openclean.function.token.base.Tokenizer, default=None Generator for tokens that are used to group string values in the first step of the algorithm. 
By default, n-grams of length 6 are used as blocking tokens. @@ -173,7 +173,7 @@ def _get_clusters(self, clusters: Iterable[Cluster]) -> List[Cluster]: def knn_clusters( values: Union[Iterable[Value], Counter], sim: SimilarityConstraint, - tokenizer: Optional[StringTokenizer] = None, minsize: Optional[int] = 2, + tokenizer: Optional[Tokenizer] = None, minsize: Optional[int] = 2, remove_duplicates: Optional[bool] = True ) -> List[Cluster]: """Run kNN clustering for a given list of values. @@ -186,7 +186,7 @@ def knn_clusters( sim: openclean.function.similarity.base.SimilarityConstraint String similarity constraint for grouping strings in the generated blocks. - tokenizer: openclean.function.token.base.StringTokenizer, default=None + tokenizer: openclean.function.token.base.Tokenizer, default=None Generator for tokens that are used to group string values in the first step of the algorithm. By default, n-grams of length 6 are used as blocking tokens. @@ -211,7 +211,7 @@ def knn_clusters( def knn_collision_clusters( values: Union[Iterable[Value], Counter], sim: SimilarityConstraint, keys: Optional[Union[Callable, ValueFunction]] = None, - tokenizer: Optional[StringTokenizer] = None, minsize: Optional[int] = 2, + tokenizer: Optional[Tokenizer] = None, minsize: Optional[int] = 2, remove_duplicates: Optional[bool] = True, threads: Optional[int] = None ) -> List[Cluster]: """Run kNN clustering on a set of values that have been grouped using @@ -232,7 +232,7 @@ def knn_collision_clusters( keys: callable or ValueFunction, default=None Function that is used to generate keys for values. By default the token fingerprint generator is used. - tokenizer: openclean.function.token.base.StringTokenizer, default=None + tokenizer: openclean.function.token.base.Tokenizer, default=None Generator for tokens that are used to group string values in the first step of the algorithm. By default, n-grams of length 6 are used as blocking tokens. diff --git a/openclean/function/token/base.py b/openclean/function/token/base.py index 8043ad8..54d0a19 100644 --- a/openclean/function/token/base.py +++ b/openclean/function/token/base.py @@ -81,7 +81,7 @@ def type(self) -> str: # -- Mixin classes ------------------------------------------------------------ -class StringTokenizer(metaclass=ABCMeta): +class Tokenizer(metaclass=ABCMeta): """Interface for string tokenizer. A string tokenizer should be able to handle any scalar value (e.g., by first transforming numeric values into a string representation). The tokenizer returns a list of token objects. @@ -131,7 +131,7 @@ def transform(self, tokens: List[Token]) -> List[Token]: # -- Default tokenizer -------------------------------------------------------- -class Tokens(PreparedFunction, StringTokenizer): +class Tokens(PreparedFunction, Tokenizer): """The default tokenizer is a simple wrapper around a given tokenizer and an (optional) token transformer that is applied on the output of the given tokenizer. @@ -143,7 +143,7 @@ class Tokens(PreparedFunction, StringTokenizer): functionality to concatenate the generated token list to a token key string. 
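# Sketch of the renamed Tokenizer interface with kNN clustering. This assumes
# `values` is an iterable of strings and `sim` an already configured
# SimilarityConstraint; NGrams(n=6) mirrors the documented default blocker.
from openclean.cluster.knn import knn_clusters
from openclean.function.token.ngram import NGrams

clusters = knn_clusters(values=values, sim=sim, tokenizer=NGrams(n=6))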
""" def __init__( - self, tokenizer: StringTokenizer, + self, tokenizer: Tokenizer, transformer: Optional[Union[List[TokenTransformer], TokenTransformer]] = None, delim: Optional[str] = '', sort: Optional[bool] = False, reverse: Optional[bool] = False, unique: Optional[bool] = False @@ -153,7 +153,7 @@ def __init__( Parameters ---------- - tokenizer: openclean.function.token.base.StringTokenizer + tokenizer: openclean.function.token.base.Tokenizer Tokenizer that is used to generate initial token list for given values. transformer: list or single object of openclean.function.token.base.TokenTransformer, diff --git a/openclean/function/token/ngram.py b/openclean/function/token/ngram.py index 369e986..bd06219 100644 --- a/openclean/function/token/ngram.py +++ b/openclean/function/token/ngram.py @@ -12,10 +12,10 @@ from typing import List, Optional from openclean.data.types import Scalar -from openclean.function.token.base import StringTokenizer, Token +from openclean.function.token.base import Tokenizer, Token -class NGrams(StringTokenizer): +class NGrams(Tokenizer): """Split values into lists of n-grams. n-grams are substrings of length n. Provides the option to pad stings with special characters to the left and right before computing n-grams. That is, if a left (right) padding character diff --git a/openclean/function/token/split.py b/openclean/function/token/split.py index d52052e..55d3628 100644 --- a/openclean/function/token/split.py +++ b/openclean/function/token/split.py @@ -12,7 +12,7 @@ import re from openclean.data.types import Scalar -from openclean.function.token.base import StringTokenizer, Token +from openclean.function.token.base import Tokenizer, Token import openclean.function.token.base as TT @@ -20,7 +20,7 @@ DEFAULT_CLASSIFIER = [(str.isalpha, TT.ALPHA), (str.isdigit, TT.DIGIT)] -class ChartypeSplit(StringTokenizer): +class ChartypeSplit(Tokenizer): """Split values basesd of a list of character type functions. That is, a value that contains characters of different types, e.g., W35ST, will be split into tokens with homogeneous character type, e.g., ['W', '35', 'ST']. @@ -106,7 +106,7 @@ def tokens(self, value: Scalar) -> List[Token]: return tokens -class Split(StringTokenizer): +class Split(Tokenizer): """String tokenizer that is a wrapper around the regular expression split method. Defines a extra parameters to (a) pre-process a given value and (b) modify the generated token lists. @@ -117,7 +117,7 @@ class Split(StringTokenizer): def __init__( self, pattern: str, sort: Optional[bool] = False, reverse: Optional[bool] = False, unique: Optional[bool] = False, - preproc: Optional[Callable] = None, subtokens: Optional[StringTokenizer] = None + preproc: Optional[Callable] = None, subtokens: Optional[Tokenizer] = None ): """Initialize the tokenizer parameters. @@ -135,7 +135,7 @@ def __init__( preproc: callable, default=None Optional pre-processor that is evaluated on each value before tokenization. - subtokens: openclean.function.token.base.StringTokenizer, default=None + subtokens: openclean.function.token.base.Tokenizer, default=None Tokenizer that is used to split generated tokens into sub-tokens. 
""" self.pattern = pattern diff --git a/openclean/function/value/key/fingerprint.py b/openclean/function/value/key/fingerprint.py index c5c3eea..d41307a 100644 --- a/openclean/function/value/key/fingerprint.py +++ b/openclean/function/value/key/fingerprint.py @@ -16,7 +16,7 @@ from openclean.data.types import Value -from openclean.function.token.base import StringTokenizer +from openclean.function.token.base import Tokenizer from openclean.function.token.ngram import NGrams from openclean.function.token.split import Split from openclean.function.value.base import PreparedFunction @@ -41,7 +41,7 @@ class Fingerprint(PreparedFunction): 5) Concatenate remaining (sorted) tokens using a single space character as the delimiter. """ - def __init__(self, tokenizer: Optional[StringTokenizer] = None, normalizer: Optional[Callable] = None): + def __init__(self, tokenizer: Optional[Tokenizer] = None, normalizer: Optional[Callable] = None): """Initialize the tokenizer that is used by the fingerprint function and the optional normalizer. By default, a tokenizer is used that splits on whitespaces. the default normalizer is the openclean text normalizer. @@ -52,7 +52,7 @@ def __init__(self, tokenizer: Optional[StringTokenizer] = None, normalizer: Opti Patameters ---------- - tokenizer: openclean.function.token.base.StringTokenizer, default=None + tokenizer: openclean.function.token.base.Tokenizer, default=None Tokenizer that is used during fingerprint generation. normalizer: callable, default=None Callable that is used to normalize values before token generation. diff --git a/openclean/profiling/anomalies/pattern.py b/openclean/profiling/anomalies/pattern.py index 4031cfb..0709c4c 100644 --- a/openclean/profiling/anomalies/pattern.py +++ b/openclean/profiling/anomalies/pattern.py @@ -16,7 +16,7 @@ from openclean.data.types import Value from openclean.function.eval.base import InputColumn -from openclean.function.token.base import StringTokenizer +from openclean.function.token.base import Tokenizer from openclean.function.token.split import Split from openclean.function.value.normalize.text import TextNormalizer from openclean.function.value.regex import IsMatch, IsNotMatch @@ -112,7 +112,7 @@ def outlier(self, value: Value) -> bool: # -- Token signatures --------------------------------------------------------- -def DefaultTokenizer() -> StringTokenizer: +def DefaultTokenizer() -> Tokenizer: """Create an instance of the default tokenizer.""" return Split('\\s+', unique=True, preproc=TextNormalizer()) @@ -127,7 +127,7 @@ class TokenSignatureOutliers(ConditionalOutliers): tokens for that entry. """ def __init__( - self, signature: TokenSignature, tokenizer: Optional[StringTokenizer] = None, + self, signature: TokenSignature, tokenizer: Optional[Tokenizer] = None, exactly_one: Optional[bool] = False ): """Initialize the token signature and the string tokenizer. @@ -138,7 +138,7 @@ def __init__( signature: openclean.profiling.pattern.token_signature.TokenSignature Token signature. - tokenizer: openclean.function.token.base.StringTokenizer, default=None + tokenizer: openclean.function.token.base.Tokenizer, default=None Tokenizer that is used to generate tokens for input values. 
exactly_one: bool, default=False If the exactly one flag is set a value that matches multiple entries From d3dfee173fe6a67801fdbb2bfefa6ac983c1f44f Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Wed, 24 Mar 2021 16:01:52 -0400 Subject: [PATCH 07/23] Bump version to 0.3.0 --- openclean/version.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openclean/version.py b/openclean/version.py index 944d3a8..068ae1d 100644 --- a/openclean/version.py +++ b/openclean/version.py @@ -7,4 +7,4 @@ """Version information for the openclean package.""" -__version__ = '0.2.0' +__version__ = '0.3.0' From ef3647a50df6a6753f7df9467f991f3b70e2448f Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Thu, 25 Mar 2021 13:12:02 -0400 Subject: [PATCH 08/23] Add column profiler that keeps full list of distinct values --- openclean/profiling/column.py | 43 +++++++++++++++++++++++++- tests/profiling/test_profile_column.py | 13 ++++++-- 2 files changed, 53 insertions(+), 3 deletions(-) diff --git a/openclean/profiling/column.py b/openclean/profiling/column.py index 1be5ee7..9fd3bf7 100644 --- a/openclean/profiling/column.py +++ b/openclean/profiling/column.py @@ -107,6 +107,25 @@ def consume( datatypes[type_label] += count return value + def distinct(self, top_k: Optional[int] = None) -> Counter: + """Get Counter object containing the list most frequent values and their + counts that was generated by the profiler. + + Parameters + ---------- + top_k: int, default=None + Limit the number of elements in the returned Counter to the k most + common values (if given). If None, the full set of values is + returned. + + Returns + ------- + collections.Counter + """ + top_val_count = len(self['topValues']) + k = min(top_k, top_val_count) if top_k is not None else top_val_count + return Counter({key: val for key, val in self['topValues'][:k]}) + # -- Column profiler ---------------------------------------------------------- @@ -149,7 +168,7 @@ def __init__( self.converter = converter if converter else DefaultConverter() def process(self, values: Counter) -> ColumnProfile: - """Compute profile for given counter of values in teh column. + """Compute profile for given counter of values in the column. Parameters ---------- @@ -168,6 +187,28 @@ def process(self, values: Counter) -> ColumnProfile: ) +class DistinctValueProfiler(DefaultColumnProfiler): + """Column profiler that maintains the full list of distinct values in a + column. This class is a simple wrapper for the + :class:`openclean.profiling.column.DefaultColumnProfiler` that sets + ``top_k=None``. + """ + def __init__(self, converter: Optional[DatatypeConverter] = None): + """Initialize the optional data value converter. + + Parameters + ---------- + converter: openclean.profiling.datatype.convert.DatatypeConverter, + default=None + Datatype converter that is used to determing the type of the + values in the data stream. + """ + super(DistinctValueProfiler, self).__init__( + top_k=None, + converter=converter + ) + + class DefaultStreamProfiler(DataStreamProfiler): """Default profiler for columns in a data stream. 
This profiler does not maintain a set of distinct values due to the unkown size of the stream diff --git a/tests/profiling/test_profile_column.py b/tests/profiling/test_profile_column.py index 774145f..fc1af5e 100644 --- a/tests/profiling/test_profile_column.py +++ b/tests/profiling/test_profile_column.py @@ -8,10 +8,19 @@ """Unit tests for the column profiler.""" from openclean.profiling.column import ( - DefaultColumnProfiler, DefaultStreamProfiler + DefaultColumnProfiler, DefaultStreamProfiler, DistinctValueProfiler ) +def test_profile_distinct_values(schools): + """Test profiling a single data frame column using the distinct value + profiler that maintains a full list of distinct values. + """ + # -- Use default labels for result ---------------------------------------- + metadata = DistinctValueProfiler().run(schools, 'school_code') + assert len(metadata.distinct()) == 96 + + def test_profile_single_column(schools): """Test profiling a single data frame column using the default profiler.""" # -- Use default labels for result ---------------------------------------- @@ -51,4 +60,4 @@ def test_profile_single_column_stream(schools): } assert metadata['totalValueCount'] == 100 assert metadata['emptyValueCount'] == 0 - assert metadata['datatypes'] == {'int': 30, 'float': 6, 'str': 64} + assert metadata['datatypes'] == {'int': 30, 'float': 6, 'str': 64} From a6a0f98c3180315cea75ebcf74c666507323e49b Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Thu, 25 Mar 2021 13:13:02 -0400 Subject: [PATCH 09/23] Add option to find anomalies in a given Counter --- openclean/profiling/anomalies/base.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/openclean/profiling/anomalies/base.py b/openclean/profiling/anomalies/base.py index f0b24bf..d73c934 100644 --- a/openclean/profiling/anomalies/base.py +++ b/openclean/profiling/anomalies/base.py @@ -20,7 +20,7 @@ class AnomalyDetector(DistinctSetProfiler): frame or a metadata object) as input and return a list of values that were identified as outliers. """ - def find(self, values: Iterable[Value]) -> List[Union[Dict, Value]]: + def find(self, values: Union[Iterable[Value], Counter]) -> List[Union[Dict, Value]]: """Identify values in a given set of values that are classified as outliers or anomalities. Returns a list of identified values. 
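# Usage sketch for the new profiler, following the unit test above (assumes
# `df` is a pandas DataFrame with a 'school_code' column): the full set of
# distinct values is retained and can be narrowed afterwards via top_k.
from openclean.profiling.column import DistinctValueProfiler

metadata = DistinctValueProfiler().run(df, 'school_code')
all_values = metadata.distinct()       # Counter over all distinct values
top_ten = metadata.distinct(top_k=10)  # only the ten most frequent values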
@@ -33,4 +33,4 @@ def find(self, values: Iterable[Value]) -> List[Union[Dict, Value]]: ------- list """ - return self.process(Counter(values)) + return self.process(values) if isinstance(values, Counter) else self.process(Counter(values)) From 76b83fc48745657401b3a08e9e740f2f098eac6f Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Thu, 25 Mar 2021 13:13:32 -0400 Subject: [PATCH 10/23] Create separate DBSCAN outlier class --- openclean/profiling/anomalies/sklearn.py | 78 ++++++++++++++++++++++-- 1 file changed, 72 insertions(+), 6 deletions(-) diff --git a/openclean/profiling/anomalies/sklearn.py b/openclean/profiling/anomalies/sklearn.py index 7a716e1..9de7fa6 100644 --- a/openclean/profiling/anomalies/sklearn.py +++ b/openclean/profiling/anomalies/sklearn.py @@ -71,6 +71,74 @@ def process(self, values: Counter) -> List: return result +class DBSCANOutliers(SklearnOutliers): + """Perform outlier detection using DBSCAN clustering.""" + def __init__( + self, features=None, eps=0.5, min_samples=5, metric='minkowski', + metric_params=None, algorithm='auto', leaf_size=30, p=2, n_jobs=None + ): + """Initialize the feature generator and all parameters of the DBSCAN + implementation in scikit-learn (documentation copied below). + + Parameters + ---------- + features: openclean.profiling.embedding.base.ValueEmbedder, optional + Generator for feature vectors that computes a vector of numeric values + for a given scalar value (or tuple). + eps : float, default=0.5 + The maximum distance between two samples for one to be considered + as in the neighborhood of the other. This is not a maximum bound + on the distances of points within a cluster. This is the most + important DBSCAN parameter to choose appropriately for your data set + and distance function. + min_samples : int, default=5 + The number of samples (or total weight) in a neighborhood for a point + to be considered as a core point. This includes the point itself. + metric : string, or callable + The metric to use when calculating distance between instances in a + feature array. If metric is a string or callable, it must be one of + the options allowed by :func:`sklearn.metrics.pairwise_distances` for + its metric parameter. + If metric is "precomputed", X is assumed to be a distance matrix and + must be square during fit. + X may be a :term:`sparse graph `, + in which case only "nonzero" elements may be considered neighbors. + metric_params : dict, default=None + Additional keyword arguments for the metric function. + algorithm : {'auto', 'ball_tree', 'kd_tree', 'brute'}, default='auto' + The algorithm to be used by the NearestNeighbors module + to compute pointwise distances and find nearest neighbors. + See NearestNeighbors module documentation for details. + leaf_size : int, default=30 + Leaf size passed to BallTree or cKDTree. This can affect the speed + of the construction and query, as well as the memory required + to store the tree. The optimal value depends + on the nature of the problem. + p : float, default=2 + The power of the Minkowski metric to be used to calculate distance + between points. + n_jobs : int, default=None + The number of parallel jobs to run for neighbors search. ``None`` means + 1 unless in a :obj:`joblib.parallel_backend` context. ``-1`` means + using all processors. See :term:`Glossary ` for more details. + If precomputed distance are used, parallel execution is not available + and thus n_jobs will have no effect. 
+ """ + # Initialize the DBSCAN estimator + from sklearn.cluster import DBSCAN + algo = DBSCAN( + eps=eps, + min_samples=min_samples, + metric=metric, + metric_params=metric_params, + algorithm=algorithm, + leaf_size=leaf_size, + p=p, + n_jobs=n_jobs + ) + super(DBSCANOutliers, self).__init__(algorithm=algo, features=features) + + # -- Functions for specific scikit-learn outlier detectors -------------------- def dbscan( @@ -135,9 +203,10 @@ def dbscan( ------- list """ - # Initialize the DBSCAN estimator - from sklearn.cluster import DBSCAN - algo = DBSCAN( + # Run the scikit-learn outlier detection algoritm with DBSCAN as the + # estimator. + op = DBSCANOutliers( + features=features, eps=eps, min_samples=min_samples, metric=metric, @@ -147,9 +216,6 @@ def dbscan( p=p, n_jobs=n_jobs ) - # Run the scikit-learn outlier detection algoritm with DBSCAN as the - # estimator. - op = SklearnOutliers(algorithm=algo, features=features) return op.run(df=df, columns=columns) From 385bbc3734c344f70485737020a5b239e9bae35b Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Thu, 25 Mar 2021 13:13:48 -0400 Subject: [PATCH 11/23] Minor fix --- openclean/pipeline.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openclean/pipeline.py b/openclean/pipeline.py index 10143a0..13f4817 100644 --- a/openclean/pipeline.py +++ b/openclean/pipeline.py @@ -395,7 +395,7 @@ def profile( ---------- profilers: int, string, tuple, or list of tuples of column reference and openclean.profiling.base.DataProfiler, default=None - Specify he list of columns that are profiled and the profiling + Specify the list of columns that are profiled and the profiling function. If only a column reference is given (not a tuple) the default stream profiler is used for profiling the column. default_profiler: class, default=None From 1c9c50041e7a2c72d71dca4a725ab0dc3de39f5e Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Thu, 25 Mar 2021 16:22:30 -0400 Subject: [PATCH 12/23] Add method to get set of conflicting values from DataFrameGrouping groups --- openclean/data/groupby.py | 35 +++++++++++++++++++ .../collector/test_violation_repair.py | 14 ++++++++ 2 files changed, 49 insertions(+) diff --git a/openclean/data/groupby.py b/openclean/data/groupby.py index 138dc13..1be89cc 100644 --- a/openclean/data/groupby.py +++ b/openclean/data/groupby.py @@ -15,6 +15,9 @@ import pandas as pd +from openclean.data.schema import as_list, select_clause +from openclean.data.types import Columns + class DataFrameGrouping(object): """A data frame grouping is a mapping of key values to subsets of rows for @@ -97,6 +100,38 @@ def columns(self) -> List[str]: """ return list(self.df.columns) + def conflicts(self, key: str, columns: Columns) -> Counter: + """Get values (and their frequency counts) for columns of rows in the + group that is identified by the given key. + + Parameters + ---------- + key: scalar or tuple + Key value generated by the GroupBy operator for the rows in the + data frame. + columns: int, string, or list(int or string) + Single column or list of column index positions or column names. + + Returns + ------- + collections.Counter + """ + _, colidx = select_clause(self.df.columns, columns=as_list(columns)) + # The result is None if no group associated with the given key. 
+ if key not in self._groups: + return None + result = Counter() + if len(colidx) == 1: + cidx = colidx[0] + for rowidx in self._groups[key]: + result[self.df.iloc[rowidx][cidx]] += 1 + else: + for rowidx in self._groups[key]: + row = self.df.iloc[rowidx] + key = tuple([row[cidx] for cidx in colidx]) + result[key] += 1 + return result + def get(self, key: str) -> pd.DataFrame: """Get the data frame that is associated with the given key. Returns None if the given key does not exist in the grouping. diff --git a/tests/operator/collector/test_violation_repair.py b/tests/operator/collector/test_violation_repair.py index 81e050e..6b928c2 100644 --- a/tests/operator/collector/test_violation_repair.py +++ b/tests/operator/collector/test_violation_repair.py @@ -40,6 +40,20 @@ CONFIG = [({'B': Min()}, COL_C), ({'B': Min(), 2: Max()}, UPD_COL_C)] +def test_fd_violation_conflicts(): + """Test getting set of conflicting values for rows in a FD violation group.""" + groups = fd_violations(DATASET, lhs='A', rhs=['B', 'C']) + conflicts = groups.conflicts(key=2, columns=['B', 'C']) + assert len(conflicts) == 3 + assert (1, 2) in conflicts + assert (1, 4) in conflicts + assert (3, 1) in conflicts + conflicts = groups.conflicts(key=2, columns=['B']) + assert len(conflicts) == 2 + assert 1 in conflicts + assert 3 in conflicts + + @pytest.mark.parametrize('strategy,col_c', CONFIG) def test_fd_violation_repair_in_order(strategy, col_c): """Test repair for FD A -> BC. Use Min to resolve conflicts in attribute B From 99acaaf9493c79782a4bc812ca9d9382ad88608f Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Thu, 25 Mar 2021 16:24:31 -0400 Subject: [PATCH 13/23] Add method to get set of conflicting values from DataFrameGrouping groups --- openclean/data/groupby.py | 51 ++++++++++++++++++++++++++------------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/openclean/data/groupby.py b/openclean/data/groupby.py index 1be89cc..497f3ab 100644 --- a/openclean/data/groupby.py +++ b/openclean/data/groupby.py @@ -101,8 +101,7 @@ def columns(self) -> List[str]: return list(self.df.columns) def conflicts(self, key: str, columns: Columns) -> Counter: - """Get values (and their frequency counts) for columns of rows in the - group that is identified by the given key. + """Synonym to get set of values from columns in rows in a group. Parameters ---------- @@ -116,21 +115,7 @@ def conflicts(self, key: str, columns: Columns) -> Counter: ------- collections.Counter """ - _, colidx = select_clause(self.df.columns, columns=as_list(columns)) - # The result is None if no group associated with the given key. - if key not in self._groups: - return None - result = Counter() - if len(colidx) == 1: - cidx = colidx[0] - for rowidx in self._groups[key]: - result[self.df.iloc[rowidx][cidx]] += 1 - else: - for rowidx in self._groups[key]: - row = self.df.iloc[rowidx] - key = tuple([row[cidx] for cidx in colidx]) - result[key] += 1 - return result + return self.values(key=key, columns=columns) def get(self, key: str) -> pd.DataFrame: """Get the data frame that is associated with the given key. Returns @@ -204,6 +189,38 @@ def rows(self, key: str) -> List[int]: return None return self._groups[key] + def values(self, key: str, columns: Columns) -> Counter: + """Get values (and their frequency counts) for columns of rows in the + group that is identified by the given key. + + Parameters + ---------- + key: scalar or tuple + Key value generated by the GroupBy operator for the rows in the + data frame. 
+ columns: int, string, or list(int or string) + Single column or list of column index positions or column names. + + Returns + ------- + collections.Counter + """ + _, colidx = select_clause(self.df.columns, columns=as_list(columns)) + # The result is None if no group associated with the given key. + if key not in self._groups: + return None + result = Counter() + if len(colidx) == 1: + cidx = colidx[0] + for rowidx in self._groups[key]: + result[self.df.iloc[rowidx][cidx]] += 1 + else: + for rowidx in self._groups[key]: + row = self.df.iloc[rowidx] + key = tuple([row[cidx] for cidx in colidx]) + result[key] += 1 + return result + class DataFrameViolation(DataFrameGrouping): """Subclass of DataFrame Grouping which maintains extra meta value information From 358bae74598f10629771f711861fa47d5dac00da Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Fri, 26 Mar 2021 04:05:13 -0400 Subject: [PATCH 14/23] Change structure of datatype count in column profiler --- openclean/profiling/column.py | 21 +++++++-------------- openclean/profiling/dataset.py | 10 ++++------ tests/profiling/test_profile_column.py | 9 +++------ 3 files changed, 14 insertions(+), 26 deletions(-) diff --git a/openclean/profiling/column.py b/openclean/profiling/column.py index 9fd3bf7..7a9c659 100644 --- a/openclean/profiling/column.py +++ b/openclean/profiling/column.py @@ -13,7 +13,7 @@ the stream profiler cannot collect. """ -from collections import Counter +from collections import Counter, defaultdict from typing import Optional from openclean.data.types import Scalar @@ -41,7 +41,7 @@ def __init__( # Initialize the internal statistic. self['totalValueCount'] = 0 self['emptyValueCount'] = 0 - self['datatypes'] = Counter() + self['datatypes'] = defaultdict(Counter) self['minmaxValues'] = dict() # Consume the list of values if given. non_empty_values = Counter() @@ -92,19 +92,12 @@ def consume( datatypes = self['datatypes'] minmax = self['minmaxValues'] val, type_label = self.converter.convert(value) - if type_label in minmax: - minmax[type_label].consume(val) - if distinct: - datatypes[type_label]['total'] += count - datatypes[type_label]['distinct'] += 1 - else: - datatypes[type_label] += count - else: + if type_label not in minmax: minmax[type_label] = MinMaxCollector(first_value=val) - if distinct: - datatypes[type_label] = {'total': count, 'distinct': 1} - else: - datatypes[type_label] += count + minmax[type_label].consume(val) + datatypes['total'][type_label] += count + if distinct: + datatypes['distinct'][type_label] += 1 return value def distinct(self, top_k: Optional[int] = None) -> Counter: diff --git a/openclean/profiling/dataset.py b/openclean/profiling/dataset.py index a67c345..09a0159 100644 --- a/openclean/profiling/dataset.py +++ b/openclean/profiling/dataset.py @@ -117,7 +117,7 @@ def multitype_columns(self) -> DatasetProfile: """ profile = DatasetProfile() for col, stats in self.profiles(): - if len(stats['datatypes']) > 1: + if len(stats['datatypes']['total']) > 1: profile.add(name=col, stats=stats) return profile @@ -189,19 +189,17 @@ def types(self, distinct: Optional[bool] = False) -> pd.DataFrame: # Make a pass over the profiling results to get a list of all data # types that are present. for obj in self: - types.update(obj['stats']['datatypes'].keys()) + types.update(obj['stats']['datatypes']['total'].keys()) # Convert types to a sorted list of types. types = sorted(types) # Create a data frame with the type results. Datatype labels are used # as column names in the returned data frame. 
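# Sketch of the new group-inspection helpers, assuming `groups` is the
# DataFrameGrouping returned by fd_violations in the unit test above:
# conflicts() is a synonym for values() and counts the value combinations
# that occur within a single group.
bc_conflicts = groups.conflicts(key=2, columns=['B', 'C'])  # Counter over (B, C) pairs
b_values = groups.values(key=2, columns=['B'])              # Counter over single values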
data = list() for obj in self: - datatypes = obj['stats']['datatypes'] + dt = obj['stats']['datatypes'] row = list() for t in types: - count = datatypes.get(t, 0) - if isinstance(count, dict): - count = count['distinct'] if distinct else count['total'] + count = dt.get('distinct', dt.get('total', {})).get(t, 0) row.append(count) data.append(row) return pd.DataFrame(data=data, index=self.columns, columns=types) diff --git a/tests/profiling/test_profile_column.py b/tests/profiling/test_profile_column.py index fc1af5e..de019f3 100644 --- a/tests/profiling/test_profile_column.py +++ b/tests/profiling/test_profile_column.py @@ -34,11 +34,8 @@ def test_profile_single_column(schools): assert metadata['totalValueCount'] == 100 assert metadata['emptyValueCount'] == 0 assert metadata['distinctValueCount'] == 13 - assert metadata['datatypes'] == { - 'int': {'distinct': 8, 'total': 30}, - 'float': {'distinct': 1, 'total': 6}, - 'str': {'distinct': 4, 'total': 64} - } + assert metadata['datatypes']['total'] == {'int': 30, 'float': 6, 'str': 64} + assert metadata['datatypes']['distinct'] == {'int': 8, 'float': 1, 'str': 4} assert metadata['topValues'] == [ ("09-12", 38), ("MS Core", 21), @@ -60,4 +57,4 @@ def test_profile_single_column_stream(schools): } assert metadata['totalValueCount'] == 100 assert metadata['emptyValueCount'] == 0 - assert metadata['datatypes'] == {'int': 30, 'float': 6, 'str': 64} + assert metadata['datatypes']['total'] == {'int': 30, 'float': 6, 'str': 64} From 4254678b30e1bc1ed49aeb2358e3ee8b7e3183fb Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Fri, 26 Mar 2021 07:26:49 -0400 Subject: [PATCH 15/23] Fix issue with column idx values in result schemas --- openclean/operator/transform/select.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/openclean/operator/transform/select.py b/openclean/operator/transform/select.py index c4ec322..06d5a0a 100644 --- a/openclean/operator/transform/select.py +++ b/openclean/operator/transform/select.py @@ -102,12 +102,18 @@ def open(self, schema: Schema) -> StreamFunctionHandler: """ # Get the names and index positions for the selected columns. colnames, colidxs = select_clause(schema=schema, columns=self.columns) + # Adjust column indices. 
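# With the restructured profile, per-type counts are nested under 'total' and
# 'distinct' instead of per-type dictionaries (assumes `metadata` is the
# result of a DefaultColumnProfiler run, as in the updated tests above).
total_counts = metadata['datatypes']['total']        # e.g. {'int': 30, 'float': 6, 'str': 64}
distinct_counts = metadata['datatypes']['distinct']  # e.g. {'int': 8, 'float': 1, 'str': 4}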
+ columns = list() + for colidx in range(len(colnames)): + col = colnames[colidx] + colid = col.colid if isinstance(col, Column) else colidx + columns.append(Column(colid=colid, name=col, colidx=colidx)) def streamfunc(row: DataRow) -> DataRow: """Include only columns in the select clause.""" return [row[i] for i in colidxs] - return StreamFunctionHandler(columns=colnames, func=streamfunc) + return StreamFunctionHandler(columns=columns, func=streamfunc) def transform(self, df: pd.DataFrame) -> pd.DataFrame: """Return a data frame that contains all rows but only those columns From 20527893961ac685566567459e60e323cb17f874 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Fri, 26 Mar 2021 07:28:17 -0400 Subject: [PATCH 16/23] Remove additional select step for distinct on subset of columns --- openclean/operator/stream/collector.py | 16 +++++---- openclean/pipeline.py | 35 ++++++++++++------- .../operator/stream/test_consumer_distinct.py | 4 +-- tests/pipeline/test_stream_distinct.py | 7 ++++ 4 files changed, 41 insertions(+), 21 deletions(-) diff --git a/openclean/operator/stream/collector.py b/openclean/operator/stream/collector.py index 66e586f..3f7ccc6 100644 --- a/openclean/operator/stream/collector.py +++ b/openclean/operator/stream/collector.py @@ -18,7 +18,8 @@ import pandas as pd -from openclean.data.types import Schema +from openclean.data.schema import as_list, select_clause +from openclean.data.types import Columns, Schema from openclean.data.stream.csv import CSVFile, CSVWriter from openclean.operator.stream.consumer import StreamConsumer from openclean.operator.stream.processor import StreamProcessor @@ -141,11 +142,12 @@ class Distinct(StreamConsumer, StreamProcessor): """Consumer that popuates a counter with the frequency counts for distinct values (or value combinations) in the processed rows for the data stream. """ - def __init__(self): + def __init__(self, columns: Optional[Columns] = None): """Initialize the counter that maintains the frequency counts for each distinct row in the data stream. """ self.counter = Counter() + self.columns = columns def close(self) -> Counter: """Closing the consumer returns the populated Counter object. @@ -172,10 +174,10 @@ def consume(self, rowid: int, row: List): row: list List of values in the row. """ - if len(row) == 1: - self.counter[row[0]] += 1 + if len(self.columns) == 1: + self.counter[row[self.columns[0]]] += 1 else: - self.counter[tuple(row)] += 1 + self.counter[tuple([row[i] for i in self.columns])] += 1 def open(self, schema: Schema) -> StreamConsumer: """Factory pattern for stream consumer. 
Returns an instance of the @@ -190,7 +192,9 @@ def open(self, schema: Schema) -> StreamConsumer: ------- openclean.operator.stream.consumer.StreamConsumer """ - return Distinct() + columns = self.columns if self.columns else schema + _, colidx = select_clause(schema, columns=as_list(columns)) + return Distinct(columns=colidx) class RowCount(StreamConsumer, StreamProcessor): diff --git a/openclean/pipeline.py b/openclean/pipeline.py index 13f4817..67c91a8 100644 --- a/openclean/pipeline.py +++ b/openclean/pipeline.py @@ -18,7 +18,7 @@ from openclean.data.stream.base import DataReader from openclean.data.stream.csv import CSVFile from openclean.data.stream.df import DataFrameStream -from openclean.data.types import Columns, Scalar, Schema +from openclean.data.types import Columns, Scalar, Schema, Value from openclean.cluster.base import Cluster, Clusterer from openclean.function.eval.base import EvalFunction from openclean.function.matching.base import StringMatcher @@ -144,30 +144,39 @@ def delete(self, predicate: EvalFunction) -> DataPipeline: # to the pipeline to remove rows from the stream. return self.append(Filter(predicate=predicate, negated=True)) - def distinct( - self, columns: Optional[Columns] = None, names: Optional[Schema] = None - ) -> Counter: + def distinct(self, columns: Optional[Columns] = None) -> Counter: """Get counts for all distinct values over all columns in the - associated data stream. Allows the user to specify te list of columns + associated data stream. Allows the user to specify the list of columns for which they want to count values. Parameters ---------- columns: int, str, or list of int or string, default=None References to the column(s) for which unique values are counted. - names: int, str, or list of int or string, default=None - Optional renaming for selected columns. Returns ------- collections.Counter """ - op = Distinct() - # If optional list of columns is given append a select operation first - # to filter on those columns before running the data stream. - if columns is not None: - return self.select(columns=columns, names=names).stream(op) - return self.stream(op) + return self.stream(Distinct(columns=columns)) + + def distinct_values(self, columns: Optional[Columns] = None) -> List[Value]: + """Get list all distinct values over all columns in the associated data + stream. + + Provides the option to the user to specify the list of columns for + which they want to count values. + + Parameters + ---------- + columns: int, str, or list of int or string, default=None + References to the column(s) for which unique values are counted. 
+ + Returns + ------- + collections.Counter + """ + return list(self.distinct(columns=columns).keys()) def filter( self, predicate: EvalFunction, limit: Optional[int] = None diff --git a/tests/operator/stream/test_consumer_distinct.py b/tests/operator/stream/test_consumer_distinct.py index bfebb3e..449d827 100644 --- a/tests/operator/stream/test_consumer_distinct.py +++ b/tests/operator/stream/test_consumer_distinct.py @@ -12,7 +12,7 @@ def test_distinct_consumer_ternary(): """Test frequency counts for ternary rows.""" - consumer = Distinct().open([]) + consumer = Distinct().open(['A', 'B']) consumer.consume(1, ['A', 1]) consumer.consume(2, ['A', 2]) consumer.consume(3, ['B', 1]) @@ -26,7 +26,7 @@ def test_distinct_consumer_ternary(): def test_distinct_consumer_unary(): """Test frequency counts for distinct values for unary rows.""" - consumer = Distinct().open([]) + consumer = Distinct().open(['A']) consumer.consume(1, [3]) consumer.consume(2, [4]) consumer.consume(3, [3]) diff --git a/tests/pipeline/test_stream_distinct.py b/tests/pipeline/test_stream_distinct.py index 815a978..496ac0e 100644 --- a/tests/pipeline/test_stream_distinct.py +++ b/tests/pipeline/test_stream_distinct.py @@ -15,3 +15,10 @@ def test_count_distinct_rows(ds): assert len(ds.distinct()) == 10 count_a = ds.distinct('A') assert count_a == Counter({'A': 10}) + + +def test_count_distinct_values(ds): + """Test distinct count over a stream of rows.""" + values = ds.distinct_values(columns='A') + assert len(values) == 1 + assert 'A' in values From 329ee7388e2721d75d264604db31b2c45779fc01 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Fri, 26 Mar 2021 07:28:47 -0400 Subject: [PATCH 17/23] Add no cover for abstract method --- openclean/profiling/pattern/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openclean/profiling/pattern/base.py b/openclean/profiling/pattern/base.py index 0c13a36..2ed5f80 100644 --- a/openclean/profiling/pattern/base.py +++ b/openclean/profiling/pattern/base.py @@ -74,7 +74,7 @@ def pattern(self): ------- string """ - raise NotImplementedError() + raise NotImplementedError() # pragma: no cover @abstractmethod def to_dict(self): From dd52e3d3dc34f0b7d2805cf601bf1f4812593b5e Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Fri, 26 Mar 2021 07:54:29 -0400 Subject: [PATCH 18/23] Enable multi-threading and Counter inputs for ValueFunction.apply() --- openclean/function/value/base.py | 36 +++++++++++++++++++--- tests/function/value/test_base_function.py | 18 +++++++++++ 2 files changed, 49 insertions(+), 5 deletions(-) diff --git a/openclean/function/value/base.py b/openclean/function/value/base.py index 58eaa6c..aa60973 100644 --- a/openclean/function/value/base.py +++ b/openclean/function/value/base.py @@ -9,9 +9,13 @@ from __future__ import annotations from abc import ABCMeta, abstractmethod -from typing import Callable, Dict, List +from collections import Counter +from typing import Callable, Dict, List, Optional, Union from openclean.data.types import Value +from openclean.engine.parallel import process_list + +import openclean.config as config # -- Abstract base class for value functions ---------------------------------- @@ -34,25 +38,47 @@ def __call__(self, value: Value) -> Value: """ return self.eval(value) - def apply(self, values: List[Value]) -> List[Value]: - """Apply the function to each value in a given set. 
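# Sketch of the stream helpers after this change, assuming `ds` is a
# DataPipeline as in the unit tests above: distinct() now handles the column
# selection itself and distinct_values() returns only the keys.
counts = ds.distinct(columns='A')         # collections.Counter of value frequencies
values = ds.distinct_values(columns='A')  # plain list of the distinct values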
Returns a list of + def apply( + self, values: Union[List[Value], Counter], threads: Optional[int] = None + ) -> Union[List[Value], Counter]: + """Apply the function to each value in a given set. + + Depending on the type of the input, the result is either a list of values that are the result of the eval method for the respective input - values. + values or a new counter object where keys are the modified values. Calls the prepare method before executing the eval method on each individual value in the given list. + Allows for multi-threaded execution of the apply function (only if the + input is a list). For ``collection.Counter`` inputs, processing will + always use a single thread only. + Parameters ---------- values: list List of scalar values or tuples of scalar values. + threads: int, default=None + Number of parallel threads to use for key generation. If None the + value from the environment variable 'OPENCLEAN_THREADS' is used as + the default. Returns ------- list """ f = self.prepare(values) - return [f.eval(v) for v in values] + threads = threads if threads is not None else config.THREADS() + if isinstance(values, Counter): + result = Counter() + for val, count in values.items(): + result[f.eval(val)] += count + return result + else: + if threads > 1: + return process_list(func=f, values=values, processes=threads) + else: + return [f.eval(v) for v in values] @abstractmethod def eval(self, value: Value) -> Value: diff --git a/tests/function/value/test_base_function.py b/tests/function/value/test_base_function.py index 124e793..a71a587 100644 --- a/tests/function/value/test_base_function.py +++ b/tests/function/value/test_base_function.py @@ -7,11 +7,29 @@ """Unit test for constructors of base functions.""" +from collections import Counter + import pytest from openclean.function.value.base import CallableWrapper, ConstantValue, UnpreparedFunction +@pytest.mark.parametrize('threads', [1, 2]) +def test_apply_on_counter(threads): + """Test apply method for counter inputs.""" + values = Counter(['a', 'A', 'b', 'b']) + f = CallableWrapper(func=str.upper) + assert f.apply(values, threads=threads) == {'A': 2, 'B': 2} + + +@pytest.mark.parametrize('threads', [1, 2]) +def test_apply_on_list(threads): + """Test apply method for list inputs.""" + values = ['a', 'A', 'b', 'b'] + f = CallableWrapper(func=str.upper) + assert Counter(f.apply(values, threads=threads)) == {'A': 2, 'B': 2} + + def test_callable_wrapper_function(): """Test initializing the callable wrapper function.""" # Use default funciton name. From a58f85b7114fa0288b473981f24ec5a42e92ffdb Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Fri, 26 Mar 2021 08:20:59 -0400 Subject: [PATCH 19/23] Multi-threading for ValueFunction.apply() on Counter inputs --- openclean/function/value/base.py | 60 ++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 7 deletions(-) diff --git a/openclean/function/value/base.py b/openclean/function/value/base.py index aa60973..837da64 100644 --- a/openclean/function/value/base.py +++ b/openclean/function/value/base.py @@ -50,16 +50,12 @@ def apply( Calls the prepare method before executing the eval method on each individual value in the given list. - Allows for multi-threaded execution of the apply function (only if the - input is a list). For ``collection.Counter`` inputs, processing will - always use a single thread only. - Parameters ---------- values: list List of scalar values or tuples of scalar values. 
threads: int, default=None - Number of parallel threads to use for key generation. If None the + Number of parallel threads to use for processing. If None the value from the environment variable 'OPENCLEAN_THREADS' is used as the default. @@ -70,9 +66,16 @@ def apply( f = self.prepare(values) threads = threads if threads is not None else config.THREADS() if isinstance(values, Counter): + in_values = values.items() if isinstance(values, Counter) else values + f = CounterConverter(func=f) + if threads > 1: + proc_values = process_list(func=f, values=in_values, processes=threads) + else: + proc_values = [f.eval(v) for v in in_values] + result = Counter() - for val, count in values.items(): - result[f.eval(val)] += count + for val, count in proc_values: + result[val] += count return result else: if threads > 1: @@ -219,6 +222,49 @@ def eval(self, value: Value) -> Value: return self.func(value) +class CounterConverter(PreparedFunction): + """Wrapper for callable functions that are appied on items of a value + counter. + """ + def __init__(self, func: Callable): + """Initialize the wrapped callable function. Raises a ValueError if the + function is not a callable. + + Parameters + ---------- + func: callable + Function that is wrapped as a value finction. + + Raises + ------ + TypeError + """ + # Ensure that the given function is actually a callable. + if not callable(func): + raise TypeError('not a callable function') + self.func = func + + def eval(self, value: Value) -> Value: + """Evaluate the wrapped function on a given value. + + The value is expected to be a tuple (item from a ``collection.Counter`` + object) that contains a value and its count. The wrapped callable is + applied on the value and a tuple with the modified value and the + original count is returned. + + Parameters + ---------- + value: scalar or tuple + Value from the list that was used to prepare the function. + + Returns + ------- + scalar or tuple + """ + val, count = value + return (self.func.eval(val), count) + + class ConstantValue(PreparedFunction): """Value function that returns a given constant value for all inputs.""" def __init__(self, value: Value): From d160dae08896a4bd28e3541e22c35b2cdaa2e22f Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Mon, 29 Mar 2021 06:52:49 -0400 Subject: [PATCH 20/23] Add methods and properties for integration with openclean-pattern --- openclean/function/token/base.py | 88 ++++++++++++++++++++++++++++++-- 1 file changed, 84 insertions(+), 4 deletions(-) diff --git a/openclean/function/token/base.py b/openclean/function/token/base.py index 54d0a19..a0bb0b0 100644 --- a/openclean/function/token/base.py +++ b/openclean/function/token/base.py @@ -8,7 +8,7 @@ """Interfaces for string tokenizer and token set transformers.""" from abc import ABCMeta, abstractmethod -from typing import Callable, Dict, List, Optional, Union +from typing import Callable, Dict, List, Optional, Tuple, Union from openclean.data.types import Scalar, Value from openclean.function.value.base import CallableWrapper, PreparedFunction, ValueFunction @@ -36,7 +36,10 @@ class Token(str): The order of creation is that the __new__ method is called which returns the object then __init__ is called. """ - def __new__(cls, value: str, token_type: Optional[str] = None): + def __new__( + cls, value: str, token_type: Optional[str] = None, + rowidx: Optional[int] = None + ): """Initialize the String object with the given value. the token type is ignored. 
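# Sketch of the extended apply(), mirroring the new unit tests: a Counter
# input yields a Counter over the transformed keys, and `threads` enables
# parallel processing of list inputs.
from collections import Counter
from openclean.function.value.base import CallableWrapper

f = CallableWrapper(func=str.upper)
f.apply(Counter(['a', 'A', 'b', 'b']))    # Counter({'A': 2, 'B': 2})
f.apply(['a', 'A', 'b', 'b'], threads=2)  # transformed values; compared as a Counter in the test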
@@ -50,7 +53,10 @@ def __new__(cls, value: str, token_type: Optional[str] = None): """ return str.__new__(cls, value) - def __init__(self, value: str, token_type: Optional[str] = None): + def __init__( + self, value: str, token_type: Optional[str] = None, + rowidx: Optional[int] = None + ): """Initialize the token type identifier. The token value has already been initialized by the __new__ method that @@ -60,11 +66,55 @@ def __init__(self, value: str, token_type: Optional[str] = None): ---------- value: string Token value. + token_type: string, default=None + Unique token type identifier. + rowidx: int, default=None + Optional identifier for the row that contained the value that this + token was generated from. + """ + self.token_type = token_type + self.rowidx = rowidx + + @property + def regex_type(self) -> str: + """Synonym for getting the token type. + + Returns + ------- + str + """ + return self.type() + + @regex_type.setter + def regex_type(self, token_type) -> str: + """Set the token type. + + Parameters + ---------- token_type: string, default=None Unique token type identifier. """ self.token_type = token_type + @property + def size(self) -> int: + """Synonym to get the length of the token. + + Returns + ------- + int + """ + return len(self) + + def to_tuple(self) -> Tuple[str, str, int]: + """Returns a tuple of the string, type and value size. + + Returns + ------- + tuple of string, string, int + """ + return tuple([self, self.token_type, len(self)]) + def type(self) -> str: """Get token type value. @@ -78,6 +128,16 @@ def type(self) -> str: """ return self.token_type if self.token_type is not None else ANY + @property + def value(self) -> str: + """Get the value for this token. + + Returns + ------- + str + """ + return str(self) + # -- Mixin classes ------------------------------------------------------------ @@ -86,8 +146,26 @@ class Tokenizer(metaclass=ABCMeta): handle any scalar value (e.g., by first transforming numeric values into a string representation). The tokenizer returns a list of token objects. """ + def encode(self, values: List[Value]) -> List[List[Token]]: + """Encodes all values in a given column (i.e., list of values) into + their type representations and tokenizes each value. + + Parameters + ---------- + values: list of scalar + List of column values + + Returns + ------- + list of list of openclean.function.token.base.Token + """ + encoded = list() + for rowidx, value in enumerate(values): + encoded.append(self.tokens(rowidx=rowidx, value=value)) + return encoded + @abstractmethod - def tokens(self, value: Scalar) -> List[Token]: + def tokens(self, value: Scalar, rowidx: Optional[int] = None) -> List[Token]: """Convert a given scalar values into a list of string tokens. If a given value cannot be converted into tokens None should be returned. @@ -98,6 +176,8 @@ def tokens(self, value: Scalar) -> List[Token]: ---------- value: scalar Value that is converted into a list of tokens. + rowidx: int, default=None + Optional index of the dataset row that the value originates from. 
Returns ------- From e36826dc7a7fe0f8fb6a83daadd7ae57b3e97c1e Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Mon, 29 Mar 2021 06:58:38 -0400 Subject: [PATCH 21/23] Update CHANGELOG --- CHANGELOG.md | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 809d2e3..2154d5a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,3 +16,15 @@ * Standardize parameter names for sample methods (\#115) * Bug fix for `openclean-notebook` + + +### 0.3.0 - 2021-03-29 + +* Add `openclean.function.token.base.Token` as separate class. +* Rename `openclean.function.token.base.StringTokenizer` to `Tokenizer` +* Adjust token transformer and tokenizer for new Token class. +* Change structure of datatype count in column profiler. +* Option to get set of conflicting values from `DataFrameGrouping` groups. +* Multi-threading for `ValueFunction.apply()`. +* Separate DBSCAN outlier class. +* Move us-street name functions to `openclean-geo`. From 9ac6b7f617f487e80bd598aad303f7dd78aba704 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Mon, 29 Mar 2021 07:33:40 -0400 Subject: [PATCH 22/23] Adjust signature of tokens method for existing tokenizers --- openclean/function/token/base.py | 6 ++-- openclean/function/token/ngram.py | 6 ++-- openclean/function/token/split.py | 15 +++++---- openclean/profiling/constraints/ucc.py | 4 +-- tests/function/token/test_split_tokenizer.py | 31 +++++++++++++++++- tests/function/token/test_token_base.py | 33 ++++++++++++++++++++ 6 files changed, 82 insertions(+), 13 deletions(-) create mode 100644 tests/function/token/test_token_base.py diff --git a/openclean/function/token/base.py b/openclean/function/token/base.py index a0bb0b0..fd81afc 100644 --- a/openclean/function/token/base.py +++ b/openclean/function/token/base.py @@ -286,7 +286,7 @@ def eval(self, value: Value) -> str: """ return self.delim.join(self.tokens(value)) - def tokens(self, value: Scalar) -> List[Token]: + def tokens(self, value: Scalar, rowidx: Optional[int] = None) -> List[Token]: """Tokenize the given value using the associated tokenizer. Then modify the tokens with the optional token transformer. @@ -294,12 +294,14 @@ def tokens(self, value: Scalar) -> List[Token]: ---------- value: scalar Value that is converted into a list of tokens. + rowidx: int, default=None + Optional index of the dataset row that the value originates from. Returns ------- list of openclean.function.token.base.Token """ - tokens = self.tokenizer.tokens(value) + tokens = self.tokenizer.tokens(value=value, rowidx=rowidx) if self.transformer is not None: tokens = self.transformer.transform(tokens) return tokens diff --git a/openclean/function/token/ngram.py b/openclean/function/token/ngram.py index bd06219..c2e8032 100644 --- a/openclean/function/token/ngram.py +++ b/openclean/function/token/ngram.py @@ -45,7 +45,7 @@ def __init__(self, n: int, pleft: Optional[str] = None, pright: Optional[str] = self.pleft = pleft self.pright = pright - def tokens(self, value: Scalar) -> List[Token]: + def tokens(self, value: Scalar, rowidx: Optional[int] = None) -> List[Token]: """Convert a given scalar values into a list of n-grams. If the value length is not greater than n and no padding was specified, the returned list will only contain the given value. @@ -54,6 +54,8 @@ def tokens(self, value: Scalar) -> List[Token]: ---------- value: scalar Value that is converted into a list of n-grams. + rowidx: int, default=None + Optional index of the dataset row that the value originates from. 
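# Sketch of the extended Token metadata added for the openclean-pattern
# integration (mirrors the new unit tests further below).
from openclean.function.token.base import Token

t = Token(value='abc', token_type='b', rowidx=1)
t.size         # 3
t.regex_type   # 'b' (synonym for t.type())
t.to_tuple()   # ('abc', 'b', 3)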
Returns ------- @@ -70,5 +72,5 @@ def tokens(self, value: Scalar) -> List[Token]: # Split value into n-grams. result = list() for i in range(len(value) - (self.n - 1)): - result.append(Token(value[i: i + self.n])) + result.append(Token(value=value[i: i + self.n], rowidx=rowidx)) return result diff --git a/openclean/function/token/split.py b/openclean/function/token/split.py index 55d3628..23065a4 100644 --- a/openclean/function/token/split.py +++ b/openclean/function/token/split.py @@ -67,7 +67,7 @@ def get_type(self, c: str) -> str: return label return None - def tokens(self, value: Scalar) -> List[Token]: + def tokens(self, value: Scalar, rowidx: Optional[int] = None) -> List[Token]: """Convert a given scalar values into a list of string tokens. If a given value cannot be converted into tokens None should be returned. @@ -78,6 +78,8 @@ def tokens(self, value: Scalar) -> List[Token]: ---------- value: scalar Value that is converted into a list of tokens. + rowidx: int, default=None + Optional index of the dataset row that the value originates from. Returns ------- @@ -97,12 +99,11 @@ def tokens(self, value: Scalar) -> List[Token]: next_type = self.get_type(value[i]) if prev_type != next_type: # Add homogeneous token from start to previous postion. - tokens.append(Token(value[start:i], token_type=prev_type)) + tokens.append(Token(value[start:i], token_type=prev_type, rowidx=rowidx)) start = i prev_type = next_type # Ensure to add the homogenous suffix if necessary. - if start < len(value): - tokens.append(Token(value[start:], token_type=prev_type)) + tokens.append(Token(value[start:], token_type=prev_type, rowidx=rowidx)) return tokens @@ -145,7 +146,7 @@ def __init__( self.preproc = preproc self.subtokens = subtokens - def tokens(self, value: Scalar) -> List[Token]: + def tokens(self, value: Scalar, rowidx: Optional[int] = None) -> List[Token]: """Convert a given scalar values into a list of string tokens. If a given value cannot be converted into tokens None should be returned. @@ -156,6 +157,8 @@ def tokens(self, value: Scalar) -> List[Token]: ---------- value: scalar Value that is converted into a list of tokens. + rowidx: int, default=None + Optional index of the dataset row that the value originates from. Returns ------- @@ -168,7 +171,7 @@ def tokens(self, value: Scalar) -> List[Token]: if self.preproc is not None: value = self.preproc(value) # Use split and the defined pattern to generate initial token list. - tokens = [Token(t) for t in filter(None, re.split(self.pattern, value))] + tokens = [Token(value=t, rowidx=rowidx) for t in filter(None, re.split(self.pattern, value))] # Remove duplicates if the unique flag is True. if self.unique: tokens = list(set(tokens)) diff --git a/openclean/profiling/constraints/ucc.py b/openclean/profiling/constraints/ucc.py index d5b1abc..5f54b3a 100644 --- a/openclean/profiling/constraints/ucc.py +++ b/openclean/profiling/constraints/ucc.py @@ -17,7 +17,7 @@ from openclean.data.types import Columns -class UniqueColumnCombinationFinder(metaclass=ABCMeta): +class UniqueColumnCombinationFinder(metaclass=ABCMeta): # pragma: no cover """Interface for operators that discover combinations of unique columns in a given data frame. 
""" @@ -36,4 +36,4 @@ def run(self, df: pd.DataFrame) -> List[Columns]: ------- list """ - raise NotImplementedError() # pragma: no cover + raise NotImplementedError() diff --git a/tests/function/token/test_split_tokenizer.py b/tests/function/token/test_split_tokenizer.py index bf1514d..000be0e 100644 --- a/tests/function/token/test_split_tokenizer.py +++ b/tests/function/token/test_split_tokenizer.py @@ -21,7 +21,9 @@ ('W35ST/', ['W', '35', 'ST', '/'], ['A', 'D', 'A', TT.ANY]), ('W35ST/8AVE', ['W', '35', 'ST', '/', '8', 'AVE'], ['A', 'D', 'A', TT.ANY, 'D', 'A']), (1234, ['1234'], ['D']), - (12.34, ['12', '.', '34'], ['D', TT.ANY, 'D']) + (12.34, ['12', '.', '34'], ['D', TT.ANY, 'D']), + ('WW', ['WW'], ['A']), + ('', [], []) ] ) def test_homogeneous_split(value, result_tokens, result_types): @@ -53,3 +55,30 @@ def test_split_parameters(unique, sorted, reverse, result): """Test different transformation options for the returned token sets.""" s = Split(pattern='\\s+', sort=sorted, reverse=reverse, unique=unique) assert s.tokens('A C \t A B D') == result + + +@pytest.mark.parametrize( + 'value,result,unique', + [ + ('WEST 35ST CO ST', ['35', 'CO', 'ST', 'ST', 'WEST'], False), + ('WEST 35ST CO ST', ['35', 'CO', 'ST', 'WEST'], True) + ] +) +def test_split_subtokens(value, result, unique): + """Test nested tokenization using the subtokens option.""" + tokenizer = Split( + pattern='\\s+', + subtokens=ChartypeSplit(chartypes=[(str.isalpha, 'A'), (str.isdigit, 'D')]), + unique=unique, + sort=True + ) + assert tokenizer.tokens(value) == result + + +def test_tokenize_column(): + """Test option to tokenize a list of values.""" + tokenizer = Split(pattern='\\s+', sort=True) + tokens = tokenizer.encode(values=['W 35 ST', '5TH AVE']) + assert len(tokens) == 2 + assert tokens[0] == ['35', 'ST', 'W'] + assert tokens[1] == ['5TH', 'AVE'] diff --git a/tests/function/token/test_token_base.py b/tests/function/token/test_token_base.py new file mode 100644 index 0000000..18107c5 --- /dev/null +++ b/tests/function/token/test_token_base.py @@ -0,0 +1,33 @@ +# This file is part of the Data Cleaning Library (openclean). +# +# Copyright (C) 2018-2021 New York University. +# +# openclean is released under the Revised BSD License. See file LICENSE for +# full license details. 
+ +"""Unit tests for the base token class.""" + +from openclean.function.token.base import Token + + +def test_token_size(): + """Test the token size method.""" + assert Token('a').size == 1 + assert Token('abc').size == 3 + + +def test_token_to_tuple(): + """Test the token to tuple function.""" + t = Token(value='abc', token_type='b', rowidx=1) + assert t.to_tuple() == ('abc', 'b', 3) + + +def test_token_type(): + """Test token type values.""" + t = Token(value='a', token_type='b') + assert t == 'a' + assert t.type() == 'b' + assert t.regex_type == 'b' + t.regex_type = 'c' + assert t.type() == 'c' + assert t.regex_type == 'c' From ce1e71dc101f1509ccc990df659db8ac1014fa15 Mon Sep 17 00:00:00 2001 From: Heiko Mueller Date: Mon, 29 Mar 2021 07:36:00 -0400 Subject: [PATCH 23/23] Unit test for Tokens --- tests/function/token/test_token_base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/function/token/test_token_base.py b/tests/function/token/test_token_base.py index 18107c5..3b6a7a6 100644 --- a/tests/function/token/test_token_base.py +++ b/tests/function/token/test_token_base.py @@ -25,7 +25,7 @@ def test_token_to_tuple(): def test_token_type(): """Test token type values.""" t = Token(value='a', token_type='b') - assert t == 'a' + assert t.value == 'a' assert t.type() == 'b' assert t.regex_type == 'b' t.regex_type = 'c'