diff --git a/river/sketch/hierarchical_heavy_hitters.py b/river/sketch/hierarchical_heavy_hitters.py new file mode 100644 index 0000000000..efcba1ac4e --- /dev/null +++ b/river/sketch/hierarchical_heavy_hitters.py @@ -0,0 +1,312 @@ +from __future__ import annotations + +import math +import typing + +from river import base + + +class HierarchicalHeavyHitters(base.Base): + + """Full Ancestry Algorithm implementation for the Hierarchical Heavy Hitters problem.[^1] + + The Hierarchical Heavy Hitters problem involves identifying the most frequent + items in a data stream while organizing them hierarchically.The Full Ancestry + Algorithm leverages the hierarchical structure of the data + to provide accurate frequency estimates by taking into account the frequencies + of ancestor nodes. + The algorithm operates in three principal phases: + - **Insertion:** For every new data element received, the algorithm recursively + tries to find the element in the trie. If it is present, it increments the counter + of the element by its weight. Otherwise, its parent is recursively called until finding + the closest one, or the root is reached. + - **Compression:** After every `w` updates, the compression phase is triggered to reduce + space usage by merging nodes with counts below the current bucket threshold. It merges + nodes where the sum of their exact count and estimated error is less than or equal to the + current bucket number minus one. + - **Output:** This function generates a list of heavy hitters with frequency estimates above + a threshold given by phi*N. It transverses the hierarchical tree and aggregates the frequencies + of nodes that meet the specified criteria, ensuring that the output reflects the most + significant elements in the data stream. + + Parameters + ---------- + k + The number of heavy hitters to track. + epsilon + The error parameter. Smaller values increase the accuracy but also + the memory usage. Should be in $[0, 1]$. + parent_func + Function to fetch the parent of order i from child x. The function should + return the root_value when i has reached the end of the tree and x when i + equals 0. If this parameter is not given it defaults to a function that returns + the prefix of length i of the input element. + root_value: + The value of the root node in the hierarchical tree. This parameter defines the + starting point of the hierarchy. If no root value is specified, the root will be + initialized when the first data element is processed and will have the value of None. + + Attributes + ---------- + bucket_size : int + The size of buckets used to compress counts. + N : int + The total number of updates processed. + root : HierarchicalHeavyHitters.Node + The root node of the hierarchical tree. + + Examples + -------- + + >>> from river import sketch + + >>> def custom_parent_func(x, i): + ... if i < len(x): + ... return None #root value + ... return x[:i] + + >>> hhh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func) + + >>> for line in [1,2,21,31,34,212,3,24]: + ... hhh.update(str(line)) + + >>> print(hhh) + ge: 0, delta_e: 0, max_e: 0 + : + ge: 0, delta_e: 0, max_e: 0 + 1: + ge: 1, delta_e: 0, max_e: 0 + 2: + ge: 1, delta_e: 0, max_e: 0 + 21: + ge: 1, delta_e: 0, max_e: 0 + 212: + ge: 1, delta_e: 0, max_e: 0 + 24: + ge: 1, delta_e: 0, max_e: 0 + 3: + ge: 1, delta_e: 0, max_e: 0 + 31: + ge: 1, delta_e: 0, max_e: 0 + 34: + ge: 1, delta_e: 0, max_e: 0 + + >>> print( hhh['212']) + 1 + + >>> phi = 0.01 + >>> heavy_hitters = hhh.output(phi) + >>> print(heavy_hitters) + [('1', 1), ('212', 1), ('21', 2), ('24', 1), ('2', 4), ('31', 1), ('34', 1), ('3', 3)] + + >>> def custom_parent_func2(x, i): + ... parts = x.split('.') + ... if i >= len(parts): + ... return None + ... return '.'.join(parts[:i+1]) + + >>> hhh = sketch.HierarchicalHeavyHitters(k=10, epsilon=0.001, parent_func=custom_parent_func2) + + >>> for line in ["123.456","123.123", "456.123", "123", "123"]: + ... hhh.update(line) + + >>> print(hhh) + ge: 0, delta_e: 0, max_e: 0 + 123: + ge: 2, delta_e: 0, max_e: 0 + 123.456: + ge: 1, delta_e: 0, max_e: 0 + 123.123: + ge: 1, delta_e: 0, max_e: 0 + 456: + ge: 0, delta_e: 0, max_e: 0 + 456.123: + ge: 1, delta_e: 0, max_e: 0 + + >>> heavy_hitters = hhh.output(phi) + [('123.456', 1), ('123.123', 1), ('123', 4), ('456.123', 1)] + + References + ---------- + - [^1]: Cormode, Graham, Flip Korn, S. Muthukrishnan, and Divesh Srivastava. + "Finding hierarchical heavy hitters in streaming data." Proceedings of the 16th + ACM SIGKDD International Conference on Knowledge Discovery and Data Mining. 2010. + """ + + class Node: + """Represents a node in the hierarchical tree structure used by HHH.""" + def __init__(self): + self.ge = 0 + self.delta_e = 0 + self.max_e = 0 + self.fe = 0 + self.m_fe = 0 + self.children: typing.dict[typing.Hashable, HierarchicalHeavyHitters.Node] = {} + + def __init__(self, k: int, epsilon: float, parent_func: typing.Callable[[typing.Hashable, int], typing.Hashable] | None = None, root_value: typing.Hashable | None = None): + self.k = k + self.epsilon = epsilon + self.bucket_size = math.floor(1 / epsilon) + self.n = 0 + self.root = None if root_value is None else HierarchicalHeavyHitters.Node() + self.parent_func = parent_func if parent_func is not None else lambda x, i: None if i > len(str(x)) else str(x)[:i] + self.root_value = root_value + + def update(self, x: typing.Hashable, w: int = 1): + """Update the count for a given hierarchical key with an optional weight.""" + + self.n += 1 + if self.root is None: + self.root = HierarchicalHeavyHitters.Node() + self.root.delta_e = self.current_bucket() - 1 + self.root.max_e = self.root.delta_e + + current = self.root + parent_me = 0 + + sub_key = x + i = 0 + + while str(sub_key)!=str(self.root_value): + + sub_key = self.parent_func(x, i) + i+=1 + if str(sub_key) == str(self.root_value): + if self.root is None: + self.root = HierarchicalHeavyHitters.Node() + self.root.delta_e = self.current_bucket() - 1 + self.root.max_e = self.root.delta_e + current = self.root + + elif sub_key in current.children: + current = current.children[sub_key] + if str(sub_key) == str(x): + current.ge += w + + else: + current.children[sub_key] = HierarchicalHeavyHitters.Node() + current = current.children[sub_key] + current.delta_e = parent_me + current.max_e = parent_me + + if str(sub_key) == str(x): + current.ge += w + + parent_me = current.max_e + + self.compress() + + def current_bucket(self): + """Calculate the current bucket number based on the total updates processed.""" + return math.ceil(self.n / self.bucket_size) + + def compress(self): + """Compress the hierarchical tree by merging nodes with counts below the current bucket threshold.""" + if self.n % self.bucket_size == 0: + self._compress_node(self.root) + + def _compress_node(self, node: HierarchicalHeavyHitters.Node): + """Recursively compress nodes in the hierarchical tree.""" + if node is not None and not node.children: + return + + for child_key, child_node in list(node.children.items()): + + if not child_node.children=={} : + self._compress_node(child_node) + else: + if child_node.ge + child_node.delta_e <= self.current_bucket() - 1: + node.ge += child_node.ge + node.max_e = max (node.max_e, child_node.ge + child_node.delta_e) + del node.children[child_key] + + def output(self, phi: float) -> list[tuple[typing.Hashable, int]]: + """Generate a list of heavy hitters with frequency estimates above the given threshold.""" + result: list[tuple[typing.Hashable, int]] = [] + if self.root: + self.root.fe = 0 + self.root.m_fe = 0 + + for _, child_node in list(self.root.children.items()): + child_node.fe = 0 + child_node.m_fe = 0 + + self._output_node(self.root, phi, result) + return result + + def _output_node(self, node: HierarchicalHeavyHitters.Node, phi: float, result: list): + """Recursively generate heavy hitters from the hierarchical tree.""" + + if node is None: + return + + if not node.children: + return + for child_key, child_node in list(node.children.items()): + + if not child_node.children=={} : + self._output_node(child_node, phi, result) + + if child_node.ge + node.ge + node.delta_e >= phi * self.n: + result.append((child_key,child_node.fe + child_node.ge + child_node.delta_e)) + + else: + node.m_fe += child_node.m_fe + child_node.ge + + node.fe += child_node.fe + child_node.ge + return None + + def __getitem__(self, key: typing.Hashable) -> int: + """Get the count of a specific hierarchical key.""" + current = self.root + + if isinstance(key, str): + for i in range(len(key)): + sub_key = key[:i + 1] + + if current is None or sub_key not in current.children: + return 0 + + current = current.children[sub_key] + + if sub_key == key and current is not None: + return current.ge + else: + return 0 + + return 0 + def totals(self) -> int: + """Return the total number of elements in the hierarchical tree.""" + if self.root is not None: + total = self._count_entries(self.root) - 1 + else: + total = 0 + return total + + def _count_entries(self, node: HierarchicalHeavyHitters.Node) -> int: + """Recursively count the total number of nodes in the hierarchical tree.""" + if node is None: + return 0 + + total = 1 # Include the current node + + for child_node in node.children.values(): + total += self._count_entries(child_node) + + return total + + def __str__(self): + """Return a string representation of the hierarchical tree.""" + if self.root is None: + return "None" + return self._print_node(self.root, 0) + + def _print_node(self, node: HierarchicalHeavyHitters.Node, level: int) -> str: + """Recursively generate a string representation of the hierarchical tree.""" + indent = ' ' * 4 + result = '' + result += f"{indent * level}ge: {node.ge}, delta_e: {node.delta_e}, max_e: {node.max_e}\n" + for child_key, child_node in node.children.items(): + result += f"{indent * level}{child_key }: \n" + result += self._print_node(child_node, level + 1) + return result diff --git a/river/sketch/hyper_log_log.py b/river/sketch/hyper_log_log.py new file mode 100644 index 0000000000..27c93c1e81 --- /dev/null +++ b/river/sketch/hyper_log_log.py @@ -0,0 +1,146 @@ +from __future__ import annotations + +import math +import typing + +from river import base + + +class HyperLogLog(base.Base): + + """HyperLogLog algorithm for cardinality estimation.[^1][^2] + + The LogLog algorithm is designed to estimate cardinality of a data set with the aid + of m bytes of auxiliary memory, known as registers. + + Firstly, each element in the data set is hashed into a binary string, ensuring data is + uniformly distributed and simulating random distribution. The algorithm hashes each element + into a binary string and then organizes these binary representations into registers. + + HyperLogLog, represents an improvement over the original LogLog algorithm by utilizing a + technique called harmonic mean to estimate the cardinality. + + Parameters + ---------- + b : int + The precision parameter which determines the number of registers used (m = 2^b). + Higher values of b provide more accurate estimates but use more memory. + + Attributes + ---------- + m : int + The number of registers (2^b). + alpha : float + A constant used in the cardinality estimation formula, which depends on m. + registers : list of int + A list of size m to store the maximum number of leading zeroes observed in the hash values. + + Methods + ------- + update(x) + Update the registers with the given element. + count() -> int + Estimate the number of distinct elements. + __len__() -> int + Return the estimated number of distinct elements. + get_alpha(m) -> float + Compute the bias correction constant alpha based on the number of registers. + left_most_one(w) -> int + Find the position of the left-most 1-bit in the binary representation of a number. + + + Examples + -------- + + >>> from river import sketch + + >>> hyperloglog = sketch.HyperLogLog(b=15) + + >>> for i in range(100): + ... hyperloglog.update(i) + + >>> print(hyperloglog.count()) + 100 + + >>> hyperloglog = HyperLogLog(b=15) + + >>> for i in range(100): + ... hyperloglog.update(i%10) + + >>> print(hyperloglog.count()) + 10 + + References + ---------- + + - [^1]: Marianne Durand and Philippe Flajolet. + Loglog counting of large cardinalities (extended abstract). + Algorithms Project, INRIA–Rocquencourt, 2003. + - [^2]: Philippe Flajolet, ́Eric Fusy, Olivier Gandouet, and Fr ́ed ́eric Meunier. + Hyperloglog: the analysis of a near-optimal cardinality estimation algorithm. + Algorithms Project, IN-RIA–Rocquencourt. + + """ + + def __init__(self, b: int): + self.b = b + self.m = 2 ** b + self.alpha = self.get_alpha(self.m) + self.registers = [0] * self.m + + @staticmethod + def get_alpha(m: int) -> float: + """ + Compute the bias correction constant alpha based on the number of registers. + This constant improves the accuracy of the cardinality estimation. + """ + if m == 16: + return 0.673 + if m == 32: + return 0.697 + if m == 64: + return 0.709 + return 0.7213 / (1 + 1.079 / m) + + @staticmethod + def left_most_one(w: int) -> int: + """ + Find the position of the left-most 1-bit in the binary representation of a number. + This helps determine the rank of the hash value. + """ + return len(bin(w)) - bin(w).rfind('1') - 1 + + def update(self, x: typing.Hashable): + """ + Update the registers with the given element. + The element is hashed, and the hash value is used to update the appropriate register. + """ + hash_val = hash(x) + j = hash_val & (self.m - 1) + w = hash_val >> self.b + self.registers[j] = max(self.registers[j], self.left_most_one(w)) + + + def count(self) -> int: + """ + Estimate the number of distinct elements. + This method uses the harmonic mean of the registers to provide an estimate. + """ + est = self.alpha * self.m ** 2 / sum(2 ** (-reg) for reg in self.registers) + + if est <= 5 / 2 * self.m: + v = self.registers.count(0) + if v != 0: + return round(self.m * math.log(self.m / v)) + elif est <= 1 / 30 * 2 ** 32: + return round(est) + else: + return round(-2 ** 32 * math.log(1 - est / 2 ** 32)) + return 0 + + def __len__(self) -> int: + """ + Return the estimated number of distinct elements. + This method simply calls the count method. + """ + return self.count() diff --git a/river/sketch/space_saving.py b/river/sketch/space_saving.py new file mode 100644 index 0000000000..b348c1b2ce --- /dev/null +++ b/river/sketch/space_saving.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +import typing + +from river import base + + +class SpaceSaving(base.Base): + """Space-Saving algorithm for finding heavy hitters.[^1] + + The Space-Saving algorithm is designed to find the heavy hitters in a data stream using a + hash map with a fixed amount of memory. It keeps track of the k most frequent items at any + given time, as well as their corresponding approximate frequency. + + Upon receiving a new item from the data stream, if it corresponds to a monitored element, + the algorithm increments its counter. Conversely, if the received element does not match + any monitored element, the algorithm finds the tuple with the smallest counter value and + replaces its element with the new element, incrementing its counter. + + Parameters + ---------- + k + The maximum number of heavy hitters to store. The higher the value of k, the higher the + accuracy of the algorithm. + + Attributes + ---------- + counts : dict + A dictionary to store the counts of items. The keys correspond to the elements and the + values to their respective count. + + Methods + ------- + update(x, w=1) + Update the counts with the given element and weight. + __getitem__(x) -> int + Get the count of the given element. + __len__() -> int + Return the number of elements stored. + total() -> int + Return the total count. + heavy_hitters() -> int + Return the heavy hitters stored. + + Examples + -------- + >>> from river import sketch + + >>> spacesaving = sketch.SpaceSaving(k=10) + + >>> for i in range(100): + ... spacesaving.update(i % 10) + + >>> print(len(spacesaving)) + 10 + >>> print(spacesaving.total()) + 100 + >>> print(spacesaving.heavy_hitters) + {0: 10, 1: 10, 2: 10, 3: 10, 4: 10, 5: 10, 6: 10, 7: 10, 8: 10, 9: 10} + >>> print(spacesaving[10]) + 10 + + + References + ---------- + - [^1]: Cormode, G., & Hadjieleftheriou, M. (2008). + Finding Frequent Items in Data Streams. AT&T Labs–Research, Florham Park, NJ. + """ + + def __init__(self, k: int): + self.k = k + self.counts: dict[typing.Hashable, int] = {} + + def update(self, x: typing.Hashable, w: int = 1): + """Update the counts with the given element.""" + + if x in self.counts: + self.counts[x] += w + elif len(self.counts) >= self.k: + min_count_key = min(self.counts, key=lambda k: self.counts[k]) + self.counts[x] = self.counts.pop(min_count_key, 0) + 1 + else: + self.counts[x] = w + + def __getitem__(self, x: typing.Hashable) -> int: + """Get the count of the given element.""" + return self.counts.get(x, 0) + + def __len__(self) -> int: + """Return the number of elements stored.""" + return len(self.counts) + + def total(self) -> int: + """Return the total count.""" + return sum(self.counts.values()) + + @property + def heavy_hitters(self) -> dict[typing.Hashable, int]: + """Return the heavy hitters stored.""" + return self.counts