From 16caed4ab19ab4669611340f81054df0034b5c81 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 20:33:31 +0100 Subject: [PATCH 01/13] Space Saving, HyperLogLog and Hierarchical Heavy Hitters algorithms --- river/sketch/hierarchical_heavy_hitters.py | 290 +++++++++++++++++++++ river/sketch/hyper_log_log.py | 142 ++++++++++ river/sketch/space_saving.py | 101 +++++++ 3 files changed, 533 insertions(+) create mode 100644 river/sketch/hierarchical_heavy_hitters.py create mode 100644 river/sketch/hyper_log_log.py create mode 100644 river/sketch/space_saving.py diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py new file mode 100644 index 0000000000..3e16e4bf32 --- /dev/null +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -0,0 +1,290 @@ +from __future__ import annotations +import typing + +import math + +from river import base + +class HierarchicalHeavyHitters(base.Base): + + """Full Ancestry Algorithm implementation for the Hierarchical Heavy Hitters problem.[^1] + + The Hierarchical Heavy Hitters problem involves identifying the most frequent items in a data stream while organizing them hierarchically. + The Full Ancestry Algorithm leverages the hierarchical structure of the data to provide accurate frequency estimates by taking into account the frequencies of ancestor nodes. + The algorithm operates in three principal phases: + - **Insertion:** For every new data element received, the algorithm recursively tries to find the element in the trie. If it is present, it increments the counter of the element by its weight. Otherwise, its parent is recursively called until finding the closest one, or the root is reached. + - **Compression:** After every `w` updates, the compression phase is triggered to reduce space usage by merging nodes with counts below the current bucket threshold. It merges nodes where the sum of their exact count and estimated error is less than or equal to the current bucket number minus one. + - **Output:** This function generates a list of heavy hitters with frequency estimates above a threshold given by phi*N. It transverses the hierarchical tree and aggregates the frequencies of nodes that meet the specified criteria, ensuring that the output reflects the most significant elements in the data stream. + + Parameters + ---------- + k + The number of heavy hitters to track. + epsilon + The error parameter. Smaller values increase the accuracy but also the memory usage. Should be in $[0, 1]$. + parent_func + Function to fetch the parent of order i from child x. The function should return the root_value when i has reached the end of the tree and x when i equals 0. If this parameter is not given it defaults to a function that returns the prefix of length i of the input element. + root_value: + The value of the root node in the hierarchical tree. This parameter defines the starting point of the hierarchy. If no root value is specified, the root will be initialized when the first data element is processed and will have the value of None. + + Attributes + ---------- + bucketSize : int + The size of buckets used to compress counts. + N : int + The total number of updates processed. + root : HierarchicalHeavyHitters.Node + The root node of the hierarchical tree. + + Examples + -------- + + >>> from river import sketch + + >>> def custom_parent_func(x, i): + ... if i < len(x): + ... return None #root value + ... 
return x[:i] + + >>> hierarchical_hh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func, root_value=None) + + >>> for line in [1,2,21,31,34,212,3,24]: + ... hierarchical_hh.update(str(line)) + + >>> print(hierarchical_hh) + ge: 0, delta_e: 0, max_e: 0 + : + ge: 0, delta_e: 0, max_e: 0 + 1: + ge: 1, delta_e: 0, max_e: 0 + 2: + ge: 1, delta_e: 0, max_e: 0 + 21: + ge: 1, delta_e: 0, max_e: 0 + 212: + ge: 1, delta_e: 0, max_e: 0 + 24: + ge: 1, delta_e: 0, max_e: 0 + 3: + ge: 1, delta_e: 0, max_e: 0 + 31: + ge: 1, delta_e: 0, max_e: 0 + 34: + ge: 1, delta_e: 0, max_e: 0 + + >>> print(hierarchical_hh['212']) + 1 + + >>> phi = 0.01 + >>> heavy_hitters = hierarchical_hh.output(phi) + >>> print(heavy_hitters) + [('1', 1), ('212', 1), ('21', 2), ('24', 1), ('2', 4), ('31', 1), ('34', 1), ('3', 3)] + + >>> def custom_parent_func2(x, i): + ... parts = x.split('.') + ... if i >= len(parts): + ... return None + ... return '.'.join(parts[:i+1]) + + >>> hierarchical_hh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func2, root_value=None) + + >>> for line in ["123.456","123.123", "456.123", "123", "123"]: + ... hierarchical_hh.update(line) + + >>> print(hierarchical_hh) + ge: 0, delta_e: 0, max_e: 0 + 123: + ge: 2, delta_e: 0, max_e: 0 + 123.456: + ge: 1, delta_e: 0, max_e: 0 + 123.123: + ge: 1, delta_e: 0, max_e: 0 + 456: + ge: 0, delta_e: 0, max_e: 0 + 456.123: + ge: 1, delta_e: 0, max_e: 0 + + >>> heavy_hitters = hierarchical_hh.output(phi) + >>> print(heavy_hitters) + [('123.456', 1), ('123.123', 1), ('123', 4), ('456.123', 1)] + + References + ---------- + - [^1]: Cormode, Graham, Flip Korn, S. Muthukrishnan, and Divesh Srivastava. "Finding hierarchical heavy hitters in streaming data." Proceedings of the 16th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining. 2010.
+ """ + + class Node: + """Represents a node in the hierarchical tree structure used by HHH.""" + + class Node: + def __init__(self): + self.ge = 0 + self.delta_e = 0 + self.max_e = 0 + self.fe = 0 + self.Fe = 0 + self.children: typing.Dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} + + def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing.Hashable, int], typing.Hashable] = None, root_value: typing.Hashable = None): + self.k = k + self.epsilon = epsilon + self.bucketSize = math.floor(1 / epsilon) + self.N = 0 + self.root = None if root_value is None else HierarchicalHeavyHitters.Node() + self.parent_func = parent_func if parent_func is not None else lambda x, i: None if i > len(str(x)) else str(x)[:i] + self.root_value = root_value + + def update(self, x: typing.Hashable, w: int = 1): + """Update the count for a given hierarchical key with an optional weight.""" + + self.N += 1 + if self.root is None: + self.root = HierarchicalHeavyHitters.Node() + self.root.delta_e = self.currentBucket() - 1 + self.root.max_e = self.root.delta_e + + current = self.root + parent_me = 0 + + sub_key = x + i = 0 + + while str(sub_key)!=str(self.root_value): + + sub_key = self.parent_func(x, i) + + i+=1 + + if str(sub_key) == str(self.root_value): + if self.root is None: + self.root = HierarchicalHeavyHitters.Node() + self.root.delta_e = self.currentBucket() - 1 + self.root.max_e = self.root.delta_e + current = self.root + + + elif sub_key in current.children: + current = current.children[sub_key] + if str(sub_key) == str(x): + current.ge += w + + else: + current.children[sub_key] = HierarchicalHeavyHitters.Node() + current = current.children[sub_key] + current.delta_e = parent_me + current.max_e = parent_me + + if str(sub_key) == str(x): + current.ge += w + + parent_me = current.max_e + + self.compress() + + def currentBucket(self): + """Calculate the current bucket number based on the total updates processed.""" + return math.ceil(self.N / self.bucketSize) + + def compress(self): + """Compress the hierarchical tree by merging nodes with counts below the current bucket threshold.""" + if (self.N % self.bucketSize == 0): + self._compress_node(self.root) + + def _compress_node(self, node: HierarchicalHeavyHitters.Node): + """Recursively compress nodes in the hierarchical tree.""" + if not node.children: + return + + for child_key, child_node in list(node.children.items()): + + if not child_node.children=={} : + self._compress_node(child_node) + + else: + + if child_node.ge + child_node.delta_e <= self.currentBucket() - 1: + node.ge += child_node.ge + node.max_e = max (node.max_e, child_node.ge + child_node.delta_e) + del node.children[child_key] + + + def output(self, phi: float) -> list[typing.Hashable]: + """Generate a list of heavy hitters with frequency estimates above the given threshold.""" + result = [] + self.root.fe = 0 + self.root.Fe = 0 + + for _, child_node in list(self.root.children.items()): + child_node.fe = 0 + child_node.Fe = 0 + + self._output_node(self.root, phi, result) + return result + + def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: list): + """Recursively generate heavy hitters from the hierarchical tree.""" + if not node.children: + return + + for child_key, child_node in list(node.children.items()): + + if not child_node.children=={} : + self._output_node(child_node, phi, result) + + if child_node.ge + node.ge + node.delta_e >= phi * self.N: + result.append((child_key,child_node.fe + child_node.ge + 
child_node.delta_e)) + + else: + node.Fe += child_node.Fe + child_node.ge + + node.fe += child_node.fe + child_node.ge + + def __getitem__(self, key: typing.Hashable) -> int: + """Get the count of a specific hierarchical key.""" + current = self.root + + for i in range(len(key)): + + sub_key = key[:i + 1] + + + if sub_key not in current.children: + + return 0 + + current = current.children[sub_key] + + if sub_key == key: + + return current.ge + + + def totals(self) -> int: + """Return the total number of elements in the hierarchical tree.""" + return self._count_entries(self.root) -1 + + def _count_entries(self, node: HierarchicalHeavyHitters.Node) -> int: + """Recursively count the total number of nodes in the hierarchical tree.""" + total = 1 + + for child_node in node.children.values(): + total += self._count_entries(child_node) + + return total + + def __str__(self): + """Return a string representation of the hierarchical tree.""" + if self.root == None: + return "None" + return self._print_node(self.root, 0) + + def _print_node(self, node: HierarchicalHeavyHitters.Node, level: int) -> str: + """Recursively generate a string representation of the hierarchical tree.""" + indent = ' ' * 4 + result = '' + result += f"{indent * level}ge: {node.ge}, delta_e: {node.delta_e}, max_e: {node.max_e}\n" + for child_key, child_node in node.children.items(): + result += f"{indent * level}{child_key }: \n" + result += self._print_node(child_node, level + 1) + return result + + diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py new file mode 100644 index 0000000000..6f0562dc18 --- /dev/null +++ b/river/sketch/hyper_log_log.py @@ -0,0 +1,142 @@ +from __future__ import annotations + +import math +import typing + +from river import base + +class HyperLogLog(base.Base): + + """HyperLogLog algorithm for cardinality estimation.[^1][^2] + + The LogLog algorithm is designed to estimate cardinality of a data set with the aid + of m bytes of auxiliary memory, known as registers. + + Firstly, each element in the data set is hashed into a binary string, ensuring data is + uniformly distributed and simulating random distribution. The algorithm hashes each element + into a binary string and then organizes these binary representations into registers. + + HyperLogLog, represents an improvement over the original LogLog algorithm by utilizing a + technique called harmonic mean to estimate the cardinality. + + Parameters + ---------- + b : int + The precision parameter which determines the number of registers used (m = 2^b). + Higher values of b provide more accurate estimates but use more memory. + + Attributes + ---------- + m : int + The number of registers (2^b). + alpha : float + A constant used in the cardinality estimation formula, which depends on m. + registers : list of int + A list of size m to store the maximum number of leading zeroes observed in the hash values. + + Methods + ------- + update(x) + Update the registers with the given element. + count() -> int + Estimate the number of distinct elements. + __len__() -> int + Return the estimated number of distinct elements. + get_alpha(m) -> float + Compute the bias correction constant alpha based on the number of registers. + left_most_one(w) -> int + Find the position of the left-most 1-bit in the binary representation of a number. + + + Examples + -------- + + >>> from river import sketch + + >>> hyperloglog = sketch.HyperLogLog(b=15) + + >>> for i in range(100): + ... 
hyperloglog.update(i) + + >>> print(hyperloglog.count()) + 100 + + >>> hyperloglog = HyperLogLog(b=15) + + >>> for i in range(100): + ... hyperloglog.update(i%10) + + >>> print(hyperloglog.count()) + 10 + + References + ---------- + + - [^1]: Marianne Durand and Philippe Flajolet. Loglog counting of large cardinalities (extended abstract). Algorithms Project, INRIA–Rocquencourt, 2003. + - [^2]: Philippe Flajolet, ́Eric Fusy, Olivier Gandouet, and Fr ́ed ́eric Meunier. Hyperloglog: the analysis of a near-optimal cardinality estimation algorithm. Algorithms Project, IN-RIA–Rocquencourt. + + """ + + def __init__(self, b: int): + self.b = b + self.m = 2 ** b + self.alpha = self.get_alpha(self.m) + self.registers = [0] * self.m + + @staticmethod + def get_alpha(m: int) -> float: + """ + Compute the bias correction constant alpha based on the number of registers. + This constant improves the accuracy of the cardinality estimation. + """ + if m == 16: + return 0.673 + if m == 32: + return 0.697 + if m == 64: + return 0.709 + return 0.7213 / (1 + 1.079 / m) + + @staticmethod + def left_most_one(w: int) -> int: + """ + Find the position of the left-most 1-bit in the binary representation of a number. + This helps determine the rank of the hash value. + """ + return len(bin(w)) - bin(w).rfind('1') - 1 + + def update(self, x: typing.Hashable): + """ + Update the registers with the given element. + The element is hashed, and the hash value is used to update the appropriate register. + """ + hash_val = hash(x) + j = hash_val & (self.m - 1) + + w = hash_val >> self.b + + self.registers[j] = max(self.registers[j], self.left_most_one(w)) + + def count(self) -> int: + """ + Estimate the number of distinct elements. + This method uses the harmonic mean of the registers to provide an estimate. + """ + + est = self.alpha * self.m ** 2 / sum(2 ** (-reg) for reg in self.registers) + + if est <= 5 / 2 * self.m: + v = self.registers.count(0) + if v != 0: + return round(self.m * math.log(self.m / v)) + elif est <= 1 / 30 * 2 ** 32: + return round(est) + else: + return round(-2 ** 32 * math.log(1 - est / 2 ** 32)) + + def __len__(self) -> int: + """ + Return the estimated number of distinct elements. + This method simply calls the count method. + """ + return self.count() diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py new file mode 100644 index 0000000000..d748e0f06b --- /dev/null +++ b/river/sketch/space_saving.py @@ -0,0 +1,101 @@ +from __future__ import annotations + +import typing + + +from river import base + +class SpaceSaving(base.Base): + """Space-Saving algorithm for finding heavy hitters.[^1] + + The Space-Saving algorithm is designed to find the heavy hitters in a data stream using a + hash map with a fixed amount of memory. It keeps track of the k most frequent items at any + given time, as well as their corresponding approximate frequency. + + Upon receiving a new item from the data stream, if it corresponds to a monitored element, + the algorithm increments its counter. Conversely, if the received element does not match + any monitored element, the algorithm finds the tuple with the smallest counter value and + replaces its element with the new element, incrementing its counter. + + Parameters + ---------- + k + The maximum number of heavy hitters to store. The higher the value of k, the higher the + accuracy of the algorithm. + + Attributes + ---------- + counts : dict + A dictionary to store the counts of items. 
The keys correspond to the elements and the + values to their respective count. + + Methods + ------- + update(x, w=1) + Update the counts with the given element and weight. + __getitem__(x) -> int + Get the count of the given element. + __len__() -> int + Return the number of elements stored. + total() -> int + Return the total count. + heavy_hitters() -> int + Return the heavy hitters stored. + + Examples + -------- + >>> from river import sketch + + >>> spacesaving = sketch.SpaceSaving(k=10) + + >>> for i in range(100): + ... spacesaving.update(i % 10) + + >>> print(len(spacesaving)) + 10 + >>> print(spacesaving.total()) + 100 + >>> print(spacesaving.heavy_hitters) + {0: 10, 1: 10, 2: 10, 3: 10, 4: 10, 5: 10, 6: 10, 7: 10, 8: 10, 9: 10} + >>> print(spacesaving[10]) + 10 + + + References + ---------- + - [^1]: Cormode, G., & Hadjieleftheriou, M. (2008). Finding Frequent Items in Data Streams. AT&T Labs–Research, Florham Park, NJ. + """ + + def __init__(self, k: int): + self.k = k + self.counts = {} + + def update(self, x: typing.Hashable, w: int = 1): + """Update the counts with the given element.""" + if x in self.counts: + self.counts[x] += w + + elif len(self.counts) >= self.k: + min_count_key = min(self.counts, key=self.counts.get) + self.counts[x] = self.counts.get(min_count_key) + 1 + del self.counts[min_count_key] + + else: + self.counts[x] = w + + def __getitem__(self, x) -> int: + """Get the count of the given element.""" + return self.counts.get(x, 0) + + def __len__(self): + """Return the number of elements stored.""" + return len(self.counts) + + def total(self) -> int: + """Return the total count.""" + return sum(self.counts.values()) + + @property + def heavy_hitters(self): + """Return the heavy hitters stored.""" + return self.counts From 03983487cc26c0fcf843de4cab53b9f08ed68d8a Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 21:04:46 +0100 Subject: [PATCH 02/13] Fixed code quality --- river/sketch/hierarchical_heavy_hitters.py | 4 +--- river/sketch/hyper_log_log.py | 2 ++ river/sketch/space_saving.py | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index 3e16e4bf32..132c6152b6 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -114,8 +114,6 @@ class HierarchicalHeavyHitters(base.Base): class Node: """Represents a node in the hierarchical tree structure used by HHH.""" - - class Node: def __init__(self): self.ge = 0 self.delta_e = 0 @@ -273,7 +271,7 @@ def _count_entries(self, node: HierarchicalHeavyHitters.Node) -> int: def __str__(self): """Return a string representation of the hierarchical tree.""" - if self.root == None: + if self.root is None: return "None" return self._print_node(self.root, 0) diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index 6f0562dc18..571a424f78 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -116,6 +116,8 @@ def update(self, x: typing.Hashable): w = hash_val >> self.b self.registers[j] = max(self.registers[j], self.left_most_one(w)) + + return def count(self) -> int: """ diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py index d748e0f06b..acc2d27b12 100644 --- a/river/sketch/space_saving.py +++ b/river/sketch/space_saving.py @@ -68,7 +68,7 @@ class SpaceSaving(base.Base): def __init__(self, k: int): self.k = k - self.counts = {} + 
self.counts : dict[str, int] = {} def update(self, x: typing.Hashable, w: int = 1): """Update the counts with the given element.""" From e6af5bb56bc6e56aea944189d03c3fafd2cfb6b2 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 21:26:12 +0100 Subject: [PATCH 03/13] Extra fixes on code quality --- river/sketch/hierarchical_heavy_hitters.py | 13 +++++++++++-- river/sketch/hyper_log_log.py | 3 --- river/sketch/space_saving.py | 11 +++++------ 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index 132c6152b6..da7e8788e2 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -207,7 +207,7 @@ def _compress_node(self, node: HierarchicalHeavyHitters.Node): def output(self, phi: float) -> list[typing.Hashable]: """Generate a list of heavy hitters with frequency estimates above the given threshold.""" - result = [] + result: list[tuple[typing.Hashable, int]] = [] self.root.fe = 0 self.root.Fe = 0 @@ -220,6 +220,10 @@ def output(self, phi: float) -> list[typing.Hashable]: def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: list): """Recursively generate heavy hitters from the hierarchical tree.""" + + if node is None: + return + if not node.children: return @@ -235,6 +239,8 @@ def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: node.Fe += child_node.Fe + child_node.ge node.fe += child_node.fe + child_node.ge + + return def __getitem__(self, key: typing.Hashable) -> int: """Get the count of a specific hierarchical key.""" @@ -262,7 +268,10 @@ def totals(self) -> int: def _count_entries(self, node: HierarchicalHeavyHitters.Node) -> int: """Recursively count the total number of nodes in the hierarchical tree.""" - total = 1 + if node is None: + return 0 + + total = 1 # Include the current node for child_node in node.children.values(): total += self._count_entries(child_node) diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index 571a424f78..0ec28365ae 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -112,11 +112,8 @@ def update(self, x: typing.Hashable): """ hash_val = hash(x) j = hash_val & (self.m - 1) - w = hash_val >> self.b - self.registers[j] = max(self.registers[j], self.left_most_one(w)) - return def count(self) -> int: diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py index acc2d27b12..88de70cef0 100644 --- a/river/sketch/space_saving.py +++ b/river/sketch/space_saving.py @@ -68,7 +68,7 @@ class SpaceSaving(base.Base): def __init__(self, k: int): self.k = k - self.counts : dict[str, int] = {} + self.counts: dict[typing.Hashable, int] = {} def update(self, x: typing.Hashable, w: int = 1): """Update the counts with the given element.""" @@ -77,17 +77,16 @@ def update(self, x: typing.Hashable, w: int = 1): elif len(self.counts) >= self.k: min_count_key = min(self.counts, key=self.counts.get) - self.counts[x] = self.counts.get(min_count_key) + 1 - del self.counts[min_count_key] + self.counts[x] = self.counts.pop(min_count_key, 0) + 1 else: self.counts[x] = w - def __getitem__(self, x) -> int: + def __getitem__(self, x: typing.Hashable) -> int: """Get the count of the given element.""" return self.counts.get(x, 0) - def __len__(self): + def __len__(self) -> int: """Return the number of elements stored.""" return len(self.counts) @@ -96,6 +95,6 @@ 
def total(self) -> int: return sum(self.counts.values()) @property - def heavy_hitters(self): + def heavy_hitters(self) -> dict[typing.Hashable, int]: """Return the heavy hitters stored.""" return self.counts From 0c69cce167927a9b44600b5f93fe2ecab5b20851 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 21:35:12 +0100 Subject: [PATCH 04/13] Fixes on HHH --- river/sketch/hierarchical_heavy_hitters.py | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index da7e8788e2..4c936f78c2 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -205,17 +205,18 @@ def _compress_node(self, node: HierarchicalHeavyHitters.Node): del node.children[child_key] - def output(self, phi: float) -> list[typing.Hashable]: + def output(self, phi: float) -> list[typing.Tuple[typing.Hashable, int]]: """Generate a list of heavy hitters with frequency estimates above the given threshold.""" result: list[tuple[typing.Hashable, int]] = [] - self.root.fe = 0 - self.root.Fe = 0 + if self.root: + self.root.fe = 0 + self.root.Fe = 0 - for _, child_node in list(self.root.children.items()): - child_node.fe = 0 - child_node.Fe = 0 + for _, child_node in list(self.root.children.items()): + child_node.fe = 0 + child_node.Fe = 0 - self._output_node(self.root, phi, result) + self._output_node(self.root, phi, result) return result def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: list): @@ -250,7 +251,6 @@ def __getitem__(self, key: typing.Hashable) -> int: sub_key = key[:i + 1] - if sub_key not in current.children: return 0 @@ -260,6 +260,8 @@ def __getitem__(self, key: typing.Hashable) -> int: if sub_key == key: return current.ge + + return 0 def totals(self) -> int: From 6c1c11541721ba8c651ad7b87ac688f96007ac6c Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 22:35:40 +0100 Subject: [PATCH 05/13] FIxed return statements and annotations --- river/sketch/hierarchical_heavy_hitters.py | 23 +++++++++++++--------- river/sketch/hyper_log_log.py | 2 +- river/sketch/space_saving.py | 5 ++--- 3 files changed, 17 insertions(+), 13 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index 4c936f78c2..fa32f7daff 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -189,7 +189,7 @@ def compress(self): def _compress_node(self, node: HierarchicalHeavyHitters.Node): """Recursively compress nodes in the hierarchical tree.""" - if not node.children: + if node is not None and not node.children: return for child_key, child_node in list(node.children.items()): @@ -247,26 +247,31 @@ def __getitem__(self, key: typing.Hashable) -> int: """Get the count of a specific hierarchical key.""" current = self.root - for i in range(len(key)): - + if isinstance(key, str): + for i in range(len(key)): sub_key = key[:i + 1] if sub_key not in current.children: - return 0 - + current = current.children[sub_key] if sub_key == key: - - return current.ge - + return current.ge + else: + + return 0 + return 0 def totals(self) -> int: """Return the total number of elements in the hierarchical tree.""" - return self._count_entries(self.root) -1 + if self.root is not None: + total = self._count_entries(self.root) - 1 + else: + 
total = 0 + return total def _count_entries(self, node: HierarchicalHeavyHitters.Node) -> int: """Recursively count the total number of nodes in the hierarchical tree.""" diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index 0ec28365ae..18281aad1b 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -114,7 +114,7 @@ def update(self, x: typing.Hashable): j = hash_val & (self.m - 1) w = hash_val >> self.b self.registers[j] = max(self.registers[j], self.left_most_one(w)) - return + return None def count(self) -> int: """ diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py index 88de70cef0..5afef82c1d 100644 --- a/river/sketch/space_saving.py +++ b/river/sketch/space_saving.py @@ -72,13 +72,12 @@ def __init__(self, k: int): def update(self, x: typing.Hashable, w: int = 1): """Update the counts with the given element.""" + if x in self.counts: self.counts[x] += w - elif len(self.counts) >= self.k: - min_count_key = min(self.counts, key=self.counts.get) + min_count_key = min(self.counts, key=lambda k: self.counts[k]) # Use lambda to specify key function self.counts[x] = self.counts.pop(min_count_key, 0) + 1 - else: self.counts[x] = w From d8f6c0eb84c0fc10d94899f9c427ac5a41cda4ef Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 22:47:43 +0100 Subject: [PATCH 06/13] Deal with None values --- river/sketch/hierarchical_heavy_hitters.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index fa32f7daff..cacd6a702f 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -241,17 +241,19 @@ def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: node.fe += child_node.fe + child_node.ge - return + return None def __getitem__(self, key: typing.Hashable) -> int: """Get the count of a specific hierarchical key.""" current = self.root + + if isinstance(key, str): for i in range(len(key)): sub_key = key[:i + 1] - if sub_key not in current.children: + if current is None or sub_key not in current.children: return 0 current = current.children[sub_key] From ff6621632f190dae968edda377671f0b4fe0901d Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 22:53:59 +0100 Subject: [PATCH 07/13] Remove return stmt --- river/sketch/hyper_log_log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index 18281aad1b..bd1d712e06 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -114,7 +114,7 @@ def update(self, x: typing.Hashable): j = hash_val & (self.m - 1) w = hash_val >> self.b self.registers[j] = max(self.registers[j], self.left_most_one(w)) - return None + def count(self) -> int: """ From 898f83a22178d0a001ffa95af8b2c261a720e698 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 23:02:10 +0100 Subject: [PATCH 08/13] Remove white line --- river/sketch/hyper_log_log.py | 1 - 1 file changed, 1 deletion(-) diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index bd1d712e06..8335aa45d7 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -121,7 +121,6 @@ def count(self) -> int: Estimate the number of distinct 
elements. This method uses the harmonic mean of the registers to provide an estimate. """ - est = self.alpha * self.m ** 2 / sum(2 ** (-reg) for reg in self.registers) if est <= 5 / 2 * self.m: From 30eef8145702c2e180aa2c9cc6c732bac0e806a5 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Fri, 21 Jun 2024 23:07:22 +0100 Subject: [PATCH 09/13] Added return --- river/sketch/hyper_log_log.py | 1 + 1 file changed, 1 insertion(+) diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index 8335aa45d7..ad755adf0c 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -131,6 +131,7 @@ def count(self) -> int: return round(est) else: return round(-2 ** 32 * math.log(1 - est / 2 ** 32)) + return 0 def __len__(self) -> int: """ From 006709aa885f002188c1169b5e87195192aab8ca Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Sat, 22 Jun 2024 12:49:21 +0100 Subject: [PATCH 10/13] Removed trailing whitespace --- river/sketch/hierarchical_heavy_hitters.py | 117 +++++++++++---------- river/sketch/hyper_log_log.py | 12 ++- river/sketch/space_saving.py | 15 +-- 3 files changed, 77 insertions(+), 67 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index cacd6a702f..770e2d1c4f 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -9,27 +9,45 @@ class HierarchicalHeavyHitters(base.Base): """Full Ancestry Algorithm implementation for the Hierarchical Heavy Hitters problem.[^1] - The Hierarchical Heavy Hitters problem involves identifying the most frequent items in a data stream while organizing them hierarchically. - The Full Ancestry Algorithm leverages the hierarchical structure of the data to provide accurate frequency estimates by taking into account the frequencies of ancestor nodes. + The Hierarchical Heavy Hitters problem involves identifying the most frequent + items in a data stream while organizing them hierarchically.The Full Ancestry + Algorithm leverages the hierarchical structure of the data + to provide accurate frequency estimates by taking into account the frequencies + of ancestor nodes. The algorithm operates in three principal phases: - - **Insertion:** For every new data element received, the algorithm recursively tries to find the element in the trie. If it is present, it increments the counter of the element by its weight. Otherwise, its parent is recursively called until finding the closest one, or the root is reached. - - **Compression:** After every `w` updates, the compression phase is triggered to reduce space usage by merging nodes with counts below the current bucket threshold. It merges nodes where the sum of their exact count and estimated error is less than or equal to the current bucket number minus one. - - **Output:** This function generates a list of heavy hitters with frequency estimates above a threshold given by phi*N. It transverses the hierarchical tree and aggregates the frequencies of nodes that meet the specified criteria, ensuring that the output reflects the most significant elements in the data stream. + - **Insertion:** For every new data element received, the algorithm recursively + tries to find the element in the trie. If it is present, it increments the counter + of the element by its weight. Otherwise, its parent is recursively called until finding + the closest one, or the root is reached. 
+ - **Compression:** After every `w` updates, the compression phase is triggered to reduce + space usage by merging nodes with counts below the current bucket threshold. It merges + nodes where the sum of their exact count and estimated error is less than or equal to the + current bucket number minus one. + - **Output:** This function generates a list of heavy hitters with frequency estimates above + a threshold given by phi*N. It transverses the hierarchical tree and aggregates the frequencies + of nodes that meet the specified criteria, ensuring that the output reflects the most + significant elements in the data stream. Parameters ---------- k The number of heavy hitters to track. epsilon - The error parameter. Smaller values increase the accuracy but also the memory usage. Should be in $[0, 1]$. + The error parameter. Smaller values increase the accuracy but also + the memory usage. Should be in $[0, 1]$. parent_func - Function to fetch the parent of order i from child x. The function should return the root_value when i has reached the end of the tree and x when i equals 0. If this parameter is not given it defaults to a function that returns the prefix of length i of the input element. + Function to fetch the parent of order i from child x. The function should + return the root_value when i has reached the end of the tree and x when i + equals 0. If this parameter is not given it defaults to a function that returns + the prefix of length i of the input element. root_value: - The value of the root node in the hierarchical tree. This parameter defines the starting point of the hierarchy. If no root value is specified, the root will be initialized when the first data element is processed and will have the value of None. + The value of the root node in the hierarchical tree. This parameter defines the + starting point of the hierarchy. If no root value is specified, the root will be + initialized when the first data element is processed and will have the value of None. Attributes ---------- - bucketSize : int + bucket_size : int The size of buckets used to compress counts. N : int The total number of updates processed. @@ -46,12 +64,12 @@ class HierarchicalHeavyHitters(base.Base): ... return None #root value ... return x[:i] - >>> hierarchical_hh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func, root_value=None) + >>> hhh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func) >>> for line in [1,2,21,31,34,212,3,24]: - ... hierarchical_hh.update(str(line)) + ... hhh.update(str(line)) - >>> print(hierarchical_hh) + >>> print(hhh) ge: 0, delta_e: 0, max_e: 0 : ge: 0, delta_e: 0, max_e: 0 @@ -72,11 +90,11 @@ class HierarchicalHeavyHitters(base.Base): 34: ge: 1, delta_e: 0, max_e: 0 - >>> print( hierarchical_hh['212']) + >>> print( hhh['212']) 1 >>> phi = 0.01 - >>> heavy_hitters = hierarchical_hh.output(phi) + >>> heavy_hitters = hhh.output(phi) >>> print(heavy_hitters) [('1', 1), ('212', 1), ('21', 2), ('24', 1), ('2', 4), ('31', 1), ('34', 1), ('3', 3)] @@ -86,12 +104,12 @@ class HierarchicalHeavyHitters(base.Base): ... return None ... return '.'.join(parts[:i+1]) - >>> hierarchical_hh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func2, root_value=None) + >>> hhh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func2) >>> for line in ["123.456","123.123", "456.123", "123", "123"]: - ... hierarchical_hh.update(line) + ... 
hhh.update(line) - >>> print(hierarchical_hh) + >>> print(hhh) ge: 0, delta_e: 0, max_e: 0 123: ge: 2, delta_e: 0, max_e: 0 @@ -104,12 +122,14 @@ class HierarchicalHeavyHitters(base.Base): 456.123: ge: 1, delta_e: 0, max_e: 0 - >>> heavy_hitters = hierarchical_hh.output(phi) + >>> heavy_hitters = hhh.output(phi) [('123.456', 1), ('123.123', 1), ('123', 4), ('456.123', 1)] References ---------- - - [^1]: Cormode, Graham, Flip Korn, S. Muthukrishnan, and Divesh Srivastava. "Finding hierarchical heavy hitters in streaming data." Proceedings of the 16th ACM SIGKDD International Conference on Knowledge Discovery and Data Mining. 2010. + - [^1]: Cormode, Graham, Flip Korn, S. Muthukrishnan, and Divesh Srivastava. + "Finding hierarchical heavy hitters in streaming data." Proceedings of the 16th + ACM SIGKDD International Conference on Knowledge Discovery and Data Mining. 2010. """ class Node: @@ -119,14 +139,14 @@ def __init__(self): self.delta_e = 0 self.max_e = 0 self.fe = 0 - self.Fe = 0 + self.m_fe = 0 self.children: typing.Dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing.Hashable, int], typing.Hashable] = None, root_value: typing.Hashable = None): self.k = k self.epsilon = epsilon - self.bucketSize = math.floor(1 / epsilon) - self.N = 0 + self.bucket_size = math.floor(1 / epsilon) + self.n = 0 self.root = None if root_value is None else HierarchicalHeavyHitters.Node() self.parent_func = parent_func if parent_func is not None else lambda x, i: None if i > len(str(x)) else str(x)[:i] self.root_value = root_value @@ -134,12 +154,12 @@ def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing. def update(self, x: typing.Hashable, w: int = 1): """Update the count for a given hierarchical key with an optional weight.""" - self.N += 1 + self.n += 1 if self.root is None: self.root = HierarchicalHeavyHitters.Node() - self.root.delta_e = self.currentBucket() - 1 + self.root.delta_e = self.current_bucket() - 1 self.root.max_e = self.root.delta_e - + current = self.root parent_me = 0 @@ -147,19 +167,16 @@ def update(self, x: typing.Hashable, w: int = 1): i = 0 while str(sub_key)!=str(self.root_value): - + sub_key = self.parent_func(x, i) - i+=1 - if str(sub_key) == str(self.root_value): if self.root is None: self.root = HierarchicalHeavyHitters.Node() - self.root.delta_e = self.currentBucket() - 1 + self.root.delta_e = self.current_bucket() - 1 self.root.max_e = self.root.delta_e current = self.root - elif sub_key in current.children: current = current.children[sub_key] if str(sub_key) == str(x): @@ -178,15 +195,15 @@ def update(self, x: typing.Hashable, w: int = 1): self.compress() - def currentBucket(self): + def current_bucket(self): """Calculate the current bucket number based on the total updates processed.""" - return math.ceil(self.N / self.bucketSize) + return math.ceil(self.n / self.bucket_size) def compress(self): """Compress the hierarchical tree by merging nodes with counts below the current bucket threshold.""" - if (self.N % self.bucketSize == 0): + if self.n % self.bucket_size == 0: self._compress_node(self.root) - + def _compress_node(self, node: HierarchicalHeavyHitters.Node): """Recursively compress nodes in the hierarchical tree.""" if node is not None and not node.children: @@ -196,25 +213,22 @@ def _compress_node(self, node: HierarchicalHeavyHitters.Node): if not child_node.children=={} : self._compress_node(child_node) - else: - - if child_node.ge + child_node.delta_e <= 
self.currentBucket() - 1: + if child_node.ge + child_node.delta_e <= self.current_bucket() - 1: node.ge += child_node.ge node.max_e = max (node.max_e, child_node.ge + child_node.delta_e) del node.children[child_key] - def output(self, phi: float) -> list[typing.Tuple[typing.Hashable, int]]: """Generate a list of heavy hitters with frequency estimates above the given threshold.""" result: list[tuple[typing.Hashable, int]] = [] if self.root: self.root.fe = 0 - self.root.Fe = 0 + self.root.m_fe = 0 for _, child_node in list(self.root.children.items()): child_node.fe = 0 - child_node.Fe = 0 + child_node.m_fe = 0 self._output_node(self.root, phi, result) return result @@ -227,27 +241,23 @@ def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: if not node.children: return - for child_key, child_node in list(node.children.items()): if not child_node.children=={} : self._output_node(child_node, phi, result) - if child_node.ge + node.ge + node.delta_e >= phi * self.N: + if child_node.ge + node.ge + node.delta_e >= phi * self.n: result.append((child_key,child_node.fe + child_node.ge + child_node.delta_e)) else: - node.Fe += child_node.Fe + child_node.ge + node.m_fe += child_node.m_fe + child_node.ge node.fe += child_node.fe + child_node.ge - return None - + def __getitem__(self, key: typing.Hashable) -> int: """Get the count of a specific hierarchical key.""" current = self.root - - if isinstance(key, str): for i in range(len(key)): @@ -261,12 +271,9 @@ def __getitem__(self, key: typing.Hashable) -> int: if sub_key == key: return current.ge else: - return 0 return 0 - - def totals(self) -> int: """Return the total number of elements in the hierarchical tree.""" if self.root is not None: @@ -274,19 +281,19 @@ def totals(self) -> int: else: total = 0 return total - + def _count_entries(self, node: HierarchicalHeavyHitters.Node) -> int: """Recursively count the total number of nodes in the hierarchical tree.""" if node is None: return 0 total = 1 # Include the current node - + for child_node in node.children.values(): total += self._count_entries(child_node) - + return total - + def __str__(self): """Return a string representation of the hierarchical tree.""" if self.root is None: @@ -302,5 +309,3 @@ def _print_node(self, node: HierarchicalHeavyHitters.Node, level: int) -> str: result += f"{indent * level}{child_key }: \n" result += self._print_node(child_node, level + 1) return result - - diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index ad755adf0c..fa129fa22c 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -72,11 +72,15 @@ class HyperLogLog(base.Base): References ---------- - - [^1]: Marianne Durand and Philippe Flajolet. Loglog counting of large cardinalities (extended abstract). Algorithms Project, INRIA–Rocquencourt, 2003. - - [^2]: Philippe Flajolet, ́Eric Fusy, Olivier Gandouet, and Fr ́ed ́eric Meunier. Hyperloglog: the analysis of a near-optimal cardinality estimation algorithm. Algorithms Project, IN-RIA–Rocquencourt. + - [^1]: Marianne Durand and Philippe Flajolet. + Loglog counting of large cardinalities (extended abstract). + Algorithms Project, INRIA–Rocquencourt, 2003. + - [^2]: Philippe Flajolet, ́Eric Fusy, Olivier Gandouet, and Fr ́ed ́eric Meunier. + Hyperloglog: the analysis of a near-optimal cardinality estimation algorithm. + Algorithms Project, IN-RIA–Rocquencourt. 
""" - + def __init__(self, b: int): self.b = b self.m = 2 ** b @@ -115,7 +119,7 @@ def update(self, x: typing.Hashable): w = hash_val >> self.b self.registers[j] = max(self.registers[j], self.left_most_one(w)) - + def count(self) -> int: """ Estimate the number of distinct elements. diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py index 5afef82c1d..1ccad7613c 100644 --- a/river/sketch/space_saving.py +++ b/river/sketch/space_saving.py @@ -63,20 +63,21 @@ class SpaceSaving(base.Base): References ---------- - - [^1]: Cormode, G., & Hadjieleftheriou, M. (2008). Finding Frequent Items in Data Streams. AT&T Labs–Research, Florham Park, NJ. + - [^1]: Cormode, G., & Hadjieleftheriou, M. (2008). + Finding Frequent Items in Data Streams. AT&T Labs–Research, Florham Park, NJ. """ def __init__(self, k: int): self.k = k - self.counts: dict[typing.Hashable, int] = {} + self.counts: dict[typing.Hashable, int] = {} def update(self, x: typing.Hashable, w: int = 1): """Update the counts with the given element.""" - + if x in self.counts: self.counts[x] += w elif len(self.counts) >= self.k: - min_count_key = min(self.counts, key=lambda k: self.counts[k]) # Use lambda to specify key function + min_count_key = min(self.counts, key=lambda k: self.counts[k]) self.counts[x] = self.counts.pop(min_count_key, 0) + 1 else: self.counts[x] = w @@ -84,15 +85,15 @@ def update(self, x: typing.Hashable, w: int = 1): def __getitem__(self, x: typing.Hashable) -> int: """Get the count of the given element.""" return self.counts.get(x, 0) - + def __len__(self) -> int: """Return the number of elements stored.""" return len(self.counts) - + def total(self) -> int: """Return the total count.""" return sum(self.counts.values()) - + @property def heavy_hitters(self) -> dict[typing.Hashable, int]: """Return the heavy hitters stored.""" From 55a827b5a7bfa3a56d6f1cb46c50ed0ef8b23bb0 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Sat, 22 Jun 2024 19:12:21 +0100 Subject: [PATCH 11/13] Run Ruff --- river/sketch/hierarchical_heavy_hitters.py | 7 ++++--- river/sketch/hyper_log_log.py | 11 ++++++----- river/sketch/space_saving.py | 14 +++++++------- 3 files changed, 17 insertions(+), 15 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index 770e2d1c4f..2c88166678 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -1,10 +1,11 @@ from __future__ import annotations -import typing import math +import typing from river import base + class HierarchicalHeavyHitters(base.Base): """Full Ancestry Algorithm implementation for the Hierarchical Heavy Hitters problem.[^1] @@ -140,7 +141,7 @@ def __init__(self): self.max_e = 0 self.fe = 0 self.m_fe = 0 - self.children: typing.Dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} + self.children: typing.dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing.Hashable, int], typing.Hashable] = None, root_value: typing.Hashable = None): self.k = k @@ -219,7 +220,7 @@ def _compress_node(self, node: HierarchicalHeavyHitters.Node): node.max_e = max (node.max_e, child_node.ge + child_node.delta_e) del node.children[child_key] - def output(self, phi: float) -> list[typing.Tuple[typing.Hashable, int]]: + def output(self, phi: float) -> list[typing.tuple[typing.Hashable, int]]: """Generate a list of heavy hitters with frequency 
estimates above the given threshold.""" result: list[tuple[typing.Hashable, int]] = [] if self.root: diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py index fa129fa22c..27c93c1e81 100644 --- a/river/sketch/hyper_log_log.py +++ b/river/sketch/hyper_log_log.py @@ -5,6 +5,7 @@ from river import base + class HyperLogLog(base.Base): """HyperLogLog algorithm for cardinality estimation.[^1][^2] @@ -13,10 +14,10 @@ class HyperLogLog(base.Base): of m bytes of auxiliary memory, known as registers. Firstly, each element in the data set is hashed into a binary string, ensuring data is - uniformly distributed and simulating random distribution. The algorithm hashes each element - into a binary string and then organizes these binary representations into registers. + uniformly distributed and simulating random distribution. The algorithm hashes each element + into a binary string and then organizes these binary representations into registers. - HyperLogLog, represents an improvement over the original LogLog algorithm by utilizing a + HyperLogLog, represents an improvement over the original LogLog algorithm by utilizing a technique called harmonic mean to estimate the cardinality. Parameters @@ -59,14 +60,14 @@ class HyperLogLog(base.Base): ... hyperloglog.update(i) >>> print(hyperloglog.count()) - 100 + 100 >>> hyperloglog = HyperLogLog(b=15) >>> for i in range(100): ... hyperloglog.update(i%10) - >>> print(hyperloglog.count()) + >>> print(hyperloglog.count()) 10 References diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py index 1ccad7613c..b348c1b2ce 100644 --- a/river/sketch/space_saving.py +++ b/river/sketch/space_saving.py @@ -2,14 +2,14 @@ import typing - from river import base + class SpaceSaving(base.Base): """Space-Saving algorithm for finding heavy hitters.[^1] The Space-Saving algorithm is designed to find the heavy hitters in a data stream using a - hash map with a fixed amount of memory. It keeps track of the k most frequent items at any + hash map with a fixed amount of memory. It keeps track of the k most frequent items at any given time, as well as their corresponding approximate frequency. Upon receiving a new item from the data stream, if it corresponds to a monitored element, @@ -20,13 +20,13 @@ class SpaceSaving(base.Base): Parameters ---------- k - The maximum number of heavy hitters to store. The higher the value of k, the higher the + The maximum number of heavy hitters to store. The higher the value of k, the higher the accuracy of the algorithm. Attributes ---------- counts : dict - A dictionary to store the counts of items. The keys correspond to the elements and the + A dictionary to store the counts of items. The keys correspond to the elements and the values to their respective count. Methods @@ -47,10 +47,10 @@ class SpaceSaving(base.Base): >>> from river import sketch >>> spacesaving = sketch.SpaceSaving(k=10) - + >>> for i in range(100): ... 
spacesaving.update(i % 10) - + >>> print(len(spacesaving)) 10 >>> print(spacesaving.total()) @@ -59,7 +59,7 @@ class SpaceSaving(base.Base): {0: 10, 1: 10, 2: 10, 3: 10, 4: 10, 5: 10, 6: 10, 7: 10, 8: 10, 9: 10} >>> print(spacesaving[10]) 10 - + References ---------- From 643ef2dfd576013e18f5344157b3c2cde5a6ecfc Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Tue, 25 Jun 2024 10:01:35 +0100 Subject: [PATCH 12/13] Ran MyPy --- river/sketch/hierarchical_heavy_hitters.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index 2c88166678..5e725ee6c2 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -143,7 +143,7 @@ def __init__(self): self.m_fe = 0 self.children: typing.dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} - def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing.Hashable, int], typing.Hashable] = None, root_value: typing.Hashable = None): + def __init__(self, k: int, epsilon: float, parent_func: typing.Optional[typing.Callable[[typing.Hashable, int], typing.Hashable]] = None, root_value: typing.Optional[typing.Hashable] = None): self.k = k self.epsilon = epsilon self.bucket_size = math.floor(1 / epsilon) @@ -220,7 +220,7 @@ def _compress_node(self, node: HierarchicalHeavyHitters.Node): node.max_e = max (node.max_e, child_node.ge + child_node.delta_e) del node.children[child_key] - def output(self, phi: float) -> list[typing.tuple[typing.Hashable, int]]: + def output(self, phi: float) -> list[tuple[typing.Hashable, int]]: """Generate a list of heavy hitters with frequency estimates above the given threshold.""" result: list[tuple[typing.Hashable, int]] = [] if self.root: @@ -269,7 +269,7 @@ def __getitem__(self, key: typing.Hashable) -> int: current = current.children[sub_key] - if sub_key == key: + if sub_key == key and current is not None: return current.ge else: return 0 From aa0c06f235c4e95a6bc4b850a971781689382ec4 Mon Sep 17 00:00:00 2001 From: laraabastoss <92671491+laraabastoss@users.noreply.github.com> Date: Tue, 25 Jun 2024 10:18:24 +0100 Subject: [PATCH 13/13] Error in types --- river/sketch/hierarchical_heavy_hitters.py | 38 +++++++++++----------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py index 5e725ee6c2..efcba1ac4e 100644 --- a/river/sketch/hierarchical_heavy_hitters.py +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -60,7 +60,7 @@ class HierarchicalHeavyHitters(base.Base): >>> from river import sketch - >>> def custom_parent_func(x, i): + >>> def custom_parent_func(x, i): ... if i < len(x): ... return None #root value ... 
return x[:i] @@ -72,23 +72,23 @@ class HierarchicalHeavyHitters(base.Base): >>> print(hhh) ge: 0, delta_e: 0, max_e: 0 - : + : ge: 0, delta_e: 0, max_e: 0 - 1: + 1: ge: 1, delta_e: 0, max_e: 0 - 2: + 2: ge: 1, delta_e: 0, max_e: 0 - 21: + 21: ge: 1, delta_e: 0, max_e: 0 - 212: + 212: ge: 1, delta_e: 0, max_e: 0 - 24: + 24: ge: 1, delta_e: 0, max_e: 0 - 3: + 3: ge: 1, delta_e: 0, max_e: 0 - 31: + 31: ge: 1, delta_e: 0, max_e: 0 - 34: + 34: ge: 1, delta_e: 0, max_e: 0 >>> print( hhh['212']) @@ -99,10 +99,10 @@ class HierarchicalHeavyHitters(base.Base): >>> print(heavy_hitters) [('1', 1), ('212', 1), ('21', 2), ('24', 1), ('2', 4), ('31', 1), ('34', 1), ('3', 3)] - >>> def custom_parent_func2(x, i): + >>> def custom_parent_func2(x, i): ... parts = x.split('.') ... if i >= len(parts): - ... return None + ... return None ... return '.'.join(parts[:i+1]) >>> hhh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func2) @@ -112,17 +112,17 @@ class HierarchicalHeavyHitters(base.Base): >>> print(hhh) ge: 0, delta_e: 0, max_e: 0 - 123: + 123: ge: 2, delta_e: 0, max_e: 0 - 123.456: + 123.456: ge: 1, delta_e: 0, max_e: 0 - 123.123: + 123.123: ge: 1, delta_e: 0, max_e: 0 - 456: + 456: ge: 0, delta_e: 0, max_e: 0 - 456.123: + 456.123: ge: 1, delta_e: 0, max_e: 0 - + >>> heavy_hitters = hhh.output(phi) [('123.456', 1), ('123.123', 1), ('123', 4), ('456.123', 1)] @@ -143,7 +143,7 @@ def __init__(self): self.m_fe = 0 self.children: typing.dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} - def __init__(self, k: int, epsilon: float, parent_func: typing.Optional[typing.Callable[[typing.Hashable, int], typing.Hashable]] = None, root_value: typing.Optional[typing.Hashable] = None): + def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing.Hashable, int], typing.Hashable] | None = None, root_value: typing.Hashable | None = None): self.k = k self.epsilon = epsilon self.bucket_size = math.floor(1 / epsilon)
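
To try the three sketches introduced in this series together, the following minimal usage sketch mirrors the individual doctests above. It is a sketch only: it assumes the new classes end up exported from `river.sketch` (the docstrings make the same assumption, but the `__init__.py` wiring is not shown in this series), and the values noted in the comments are taken from those doctests.

    from river import sketch

    # Space-Saving: keep at most k=10 keys with approximate counts.
    spacesaving = sketch.SpaceSaving(k=10)
    for i in range(100):
        spacesaving.update(i % 10)
    print(len(spacesaving), spacesaving.total())  # 10 keys tracked, 100 updates seen

    # HyperLogLog: estimate the number of distinct elements with m = 2**b registers.
    hyperloglog = sketch.HyperLogLog(b=15)
    for i in range(100):
        hyperloglog.update(i % 10)
    print(hyperloglog.count())  # 10, matching the docstring example

    # Hierarchical Heavy Hitters: with the default parent_func, successive string
    # prefixes act as ancestors ("212" sits below "21", which sits below "2").
    hhh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001)
    for x in ["1", "2", "21", "31", "34", "212", "3", "24"]:
        hhh.update(x)
    print(hhh.output(phi=0.01))  # list of (prefix, estimated frequency) pairs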