Skip to content

Commit

Permalink
fix(ws): Slow empty address verification on address subscription
Browse files Browse the repository at this point in the history
  • Loading branch information
msbrogli committed Jan 23, 2025
1 parent cb26fa9 commit a3bf50c
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 6 deletions.
25 changes: 19 additions & 6 deletions hathor/websocket/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,20 +333,27 @@ def subscribe_address(self, connection: HathorAdminWebsocketProtocol, address: s
if self.max_subs_addrs_conn is not None and len(subs) >= self.max_subs_addrs_conn:
return False, f'Reached maximum number of subscribed addresses ({self.max_subs_addrs_conn}).'

elif self.max_subs_addrs_empty is not None and (
self.address_index and _count_empty(subs, self.address_index) >= self.max_subs_addrs_empty
):
empty_addresses: set[str] = connection.empty_addresses
if (
self.max_subs_addrs_empty is not None
and self.address_index
and len(empty_addresses) >= self.max_subs_addrs_empty
and self._update_and_count_empty(empty_addresses) >= self.max_subs_addrs_empty
):
return False, f'Reached maximum number of subscribed empty addresses ({self.max_subs_addrs_empty}).'

self.address_connections[address].add(connection)
connection.subscribed_to.add(address)
if self.address_index and self.address_index.is_address_empty(address):
connection.empty_addresses.add(address)
return True, ''

def _handle_unsubscribe_address(self, connection: HathorAdminWebsocketProtocol, message: dict[Any, Any]) -> None:
""" Handler for unsubscribing from an address, also removes address connection set if it ends up empty."""
addr = message['address']
if addr in self.address_connections and connection in self.address_connections[addr]:
connection.subscribed_to.remove(addr)
connection.empty_addresses.discard(addr)
self._remove_connection_from_address_dict(connection, addr)
# Reply back to the client
payload = json_dumpb({'type': 'unsubscribe_address', 'success': True})
Expand All @@ -372,7 +379,13 @@ def on_client_close(self, connection: HathorAdminWebsocketProtocol) -> None:
for address in connection.subscribed_to:
self._remove_connection_from_address_dict(connection, address)

def _update_and_count_empty(self, empty_addresses: set[str]) -> int:
"""Removes non-empty addresses from the given set and returns the updated count."""
assert self.address_index is not None
to_remove = set()
for addr in empty_addresses:
if not self.address_index.is_address_empty(addr):
to_remove.add(addr)

def _count_empty(addresses: set[str], address_index: AddressIndex) -> int:
""" Count how many of the addresses given are empty (have no outputs)."""
return sum(1 for addr in addresses if address_index.is_address_empty(addr))
empty_addresses.difference_update(to_remove)
return len(empty_addresses)
1 change: 1 addition & 0 deletions hathor/websocket/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ def __init__(self,
super().__init__()

self.subscribed_to: set[str] = set()
self.empty_addresses: set[str] = set()

# Enable/disable history streaming for this connection.
self.is_history_streaming_enabled = is_history_streaming_enabled
Expand Down

0 comments on commit a3bf50c

Please sign in to comment.