From 44952da245dcbbf38c40273ad4daba9059602843 Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Wed, 21 May 2025 15:41:36 +0300 Subject: [PATCH 1/2] feat(node): add round-robin host shuffling capability - add `round_robin_hosts` config option to `ConfigDict` typed dict - implement random shuffling of healthy nodes in `NodeManager.get_node()` - add test to verify shuffling behavior with round-robin enabled --- src/typesense/configuration.py | 5 +++ src/typesense/node_manager.py | 73 +++++++++++++++++++++++++++++++--- tests/api_call_test.py | 22 ++++++++++ 3 files changed, 94 insertions(+), 6 deletions(-) diff --git a/src/typesense/configuration.py b/src/typesense/configuration.py index d59ac5e..7f5d044 100644 --- a/src/typesense/configuration.py +++ b/src/typesense/configuration.py @@ -80,6 +80,8 @@ class ConfigDict(typing.TypedDict): dictionaries or URLs that represent the read replica nodes. connection_timeout_seconds (float): The connection timeout in seconds. + + round_robin_hosts (bool): Whether or not to shuffle hosts between requests """ nodes: typing.List[typing.Union[str, NodeConfigDict]] @@ -96,6 +98,7 @@ class ConfigDict(typing.TypedDict): typing.List[typing.Union[str, NodeConfigDict]] ] # deprecated connection_timeout_seconds: typing.NotRequired[float] + round_robin_hosts: typing.NotRequired[bool] class Node: @@ -184,6 +187,7 @@ class Configuration: retry_interval_seconds (float): The interval in seconds between retries. healthcheck_interval_seconds (int): The interval in seconds between health checks. verify (bool): Whether to verify the SSL certificate. + round_robin_hosts (bool): Whether or not to shuffle hosts between requests """ def __init__( @@ -219,6 +223,7 @@ def __init__( 60, ) self.verify = config_dict.get("verify", True) + self.round_robin_hosts = config_dict.get("round_robin_hosts", False) self.additional_headers = config_dict.get("additional_headers", {}) def _handle_nearest_node( diff --git a/src/typesense/node_manager.py b/src/typesense/node_manager.py index e671c8d..f071418 100644 --- a/src/typesense/node_manager.py +++ b/src/typesense/node_manager.py @@ -31,6 +31,12 @@ import copy import time +import random +import sys +if sys.version_info >= (3, 11): + import typing +else: + import typing_extensions as typing from typesense.configuration import Configuration, Node from typesense.logger import logger @@ -71,12 +77,68 @@ def get_node(self) -> Node: Returns: Node: The selected node for the next operation. """ - if self.config.nearest_node: - if self.config.nearest_node.healthy or self._is_due_for_health_check( - self.config.nearest_node, - ): - return self.config.nearest_node + if self._should_use_nearest_node(): + return self.config.nearest_node + + healthy_nodes = self._get_healthy_nodes() + + if not healthy_nodes: + logger.debug("No healthy nodes were found. Returning the next node.") + return self.nodes[self.node_index] + + if self.config.round_robin_hosts: + return self._get_shuffled_node(healthy_nodes) + + return self._get_next_round_robin_node() + + def _should_use_nearest_node(self) -> bool: + """ + Check if we should use the nearest node. + + Returns: + bool: True if nearest node should be used, False otherwise. + """ + return bool( + self.config.nearest_node + and ( + self.config.nearest_node.healthy + or self._is_due_for_health_check(self.config.nearest_node) + ) + ) + + def _get_healthy_nodes(self) -> typing.List[Node]: + """ + Get a list of all healthy nodes. + + Returns: + List[Node]: List of healthy nodes. + """ + return [ + node for node in self.nodes + if node.healthy or self._is_due_for_health_check(node) + ] + + def _get_shuffled_node(self, healthy_nodes: typing.List[Node]) -> Node: + """ + Get a randomly shuffled node from the list of healthy nodes. + Args: + healthy_nodes (List[Node]): List of healthy nodes to choose from. + + Returns: + Node: A randomly selected healthy node. + """ + random.shuffle(healthy_nodes) + self.node_index = (self.node_index + 1) % len(self.nodes) + return healthy_nodes[0] + + def _get_next_round_robin_node(self) -> Node: + """ + Get the next node using standard round-robin selection. + + Returns: + Node: The next node in the round-robin sequence. + """ node_index = 0 while node_index < len(self.nodes): node_index += 1 @@ -85,7 +147,6 @@ def get_node(self) -> Node: if node.healthy or self._is_due_for_health_check(node): return node - logger.debug("No healthy nodes were found. Returning the next node.") return self.nodes[self.node_index] def set_node_health(self, node: Node, is_healthy: bool) -> None: diff --git a/tests/api_call_test.py b/tests/api_call_test.py index 1d5fa11..15c8f3b 100644 --- a/tests/api_call_test.py +++ b/tests/api_call_test.py @@ -80,6 +80,28 @@ def test_get_node_round_robin_selection( assert_match_object(node3, fake_api_call.config.nodes[2]) +def test_get_node_round_robin_shuffle( + fake_api_call: ApiCall, + mocker: MockerFixture, +) -> None: + """Test that it shuffles healthy nodes when round_robin_hosts is true.""" + fake_api_call.config.nearest_node = None + fake_api_call.config.round_robin_hosts = True + mocker.patch("time.time", return_value=100) + + shuffle_mock = mocker.patch("random.shuffle") + + for _ in range(3): + fake_api_call.node_manager.get_node() + + assert shuffle_mock.call_count == 3 + + for call in shuffle_mock.call_args_list: + args = call[0][0] + assert isinstance(args, list) + assert all(node.healthy for node in args) + + def test_get_exception() -> None: """Test that it correctly returns the exception class for a given status code.""" assert RequestHandler._get_exception(0) == exceptions.HTTPStatus0Error From 671a891ab368a1104748c53f105e447d278177dd Mon Sep 17 00:00:00 2001 From: Fanis Tharropoulos Date: Wed, 21 May 2025 15:46:47 +0300 Subject: [PATCH 2/2] chore: lint --- src/typesense/node_manager.py | 90 ++++++++++++++++++----------------- 1 file changed, 46 insertions(+), 44 deletions(-) diff --git a/src/typesense/node_manager.py b/src/typesense/node_manager.py index f071418..f660aa4 100644 --- a/src/typesense/node_manager.py +++ b/src/typesense/node_manager.py @@ -30,9 +30,10 @@ """ import copy -import time import random import sys +import time + if sys.version_info >= (3, 11): import typing else: @@ -81,16 +82,55 @@ def get_node(self) -> Node: return self.config.nearest_node healthy_nodes = self._get_healthy_nodes() - + if not healthy_nodes: logger.debug("No healthy nodes were found. Returning the next node.") return self.nodes[self.node_index] if self.config.round_robin_hosts: return self._get_shuffled_node(healthy_nodes) - + return self._get_next_round_robin_node() + def set_node_health(self, node: Node, is_healthy: bool) -> None: + """ + Set the health status of a node and update its last access timestamp. + + Args: + node (Node): The node to update. + is_healthy (bool): The health status to set for the node. + """ + node.healthy = is_healthy + node.last_access_ts = int(time.time()) + + def _is_due_for_health_check(self, node: Node) -> bool: + """ + Check if a node is due for a health check based on the configured interval. + + Args: + node (Node): The node to check. + + Returns: + bool: True if the node is due for a health check, False otherwise. + """ + current_epoch_ts = int(time.time()) + return bool( + (current_epoch_ts - node.last_access_ts) + > self.config.healthcheck_interval_seconds, + ) + + def _initialize_nodes(self) -> None: + """ + Initialize all nodes as healthy. + + This method sets the initial health status of all nodes, including the nearest node + if configured, to healthy. + """ + if self.config.nearest_node: + self.set_node_health(self.config.nearest_node, is_healthy=True) + for node in self.nodes: + self.set_node_health(node, is_healthy=True) + def _should_use_nearest_node(self) -> bool: """ Check if we should use the nearest node. @@ -103,7 +143,7 @@ def _should_use_nearest_node(self) -> bool: and ( self.config.nearest_node.healthy or self._is_due_for_health_check(self.config.nearest_node) - ) + ), ) def _get_healthy_nodes(self) -> typing.List[Node]: @@ -114,7 +154,8 @@ def _get_healthy_nodes(self) -> typing.List[Node]: List[Node]: List of healthy nodes. """ return [ - node for node in self.nodes + node + for node in self.nodes if node.healthy or self._is_due_for_health_check(node) ] @@ -148,42 +189,3 @@ def _get_next_round_robin_node(self) -> Node: return node return self.nodes[self.node_index] - - def set_node_health(self, node: Node, is_healthy: bool) -> None: - """ - Set the health status of a node and update its last access timestamp. - - Args: - node (Node): The node to update. - is_healthy (bool): The health status to set for the node. - """ - node.healthy = is_healthy - node.last_access_ts = int(time.time()) - - def _is_due_for_health_check(self, node: Node) -> bool: - """ - Check if a node is due for a health check based on the configured interval. - - Args: - node (Node): The node to check. - - Returns: - bool: True if the node is due for a health check, False otherwise. - """ - current_epoch_ts = int(time.time()) - return bool( - (current_epoch_ts - node.last_access_ts) - > self.config.healthcheck_interval_seconds, - ) - - def _initialize_nodes(self) -> None: - """ - Initialize all nodes as healthy. - - This method sets the initial health status of all nodes, including the nearest node - if configured, to healthy. - """ - if self.config.nearest_node: - self.set_node_health(self.config.nearest_node, is_healthy=True) - for node in self.nodes: - self.set_node_health(node, is_healthy=True)