From f5f38ff23e89e924f2828c7675f427e9c985a291 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 22 May 2025 16:41:36 +0200 Subject: [PATCH 1/4] Add proper mock support for SubstrateInterface and AsyncSubstrateInterface --- async_substrate_interface/async_substrate.py | 22 +++++++++------ async_substrate_interface/sync_substrate.py | 11 ++++++-- tests/unit_tests/test_mock.py | 29 ++++++++++++++++++++ 3 files changed, 51 insertions(+), 11 deletions(-) create mode 100644 tests/unit_tests/test_mock.py diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 94bda13..312fb30 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -9,6 +9,7 @@ import logging import ssl import time +from unittest.mock import AsyncMock from hashlib import blake2b from typing import ( Optional, @@ -697,13 +698,16 @@ def __init__( self.chain_endpoint = url self.url = url self._chain = chain_name - self.ws = Websocket( - url, - options={ - "max_size": self.ws_max_size, - "write_limit": 2**16, - }, - ) + if not _mock: + self.ws = Websocket( + url, + options={ + "max_size": self.ws_max_size, + "write_limit": 2**16, + }, + ) + else: + self.ws = AsyncMock(spec=Websocket) self._lock = asyncio.Lock() self.config = { "use_remote_preset": use_remote_preset, @@ -726,9 +730,11 @@ def __init__( self._initializing = False self.registry_type_map = {} self.type_id_to_name = {} + self._mock = _mock async def __aenter__(self): - await self.initialize() + if not self._mock: + await self.initialize() return self async def initialize(self): diff --git a/async_substrate_interface/sync_substrate.py b/async_substrate_interface/sync_substrate.py index 4c91fd2..dc8d178 100644 --- a/async_substrate_interface/sync_substrate.py +++ b/async_substrate_interface/sync_substrate.py @@ -3,6 +3,7 @@ import socket from hashlib import blake2b from typing import Optional, Union, Callable, Any +from unittest.mock import MagicMock from bt_decode import MetadataV15, PortableRegistry, decode as decode_by_type_string from scalecodec import ( @@ -13,7 +14,7 @@ MultiAccountId, ) from scalecodec.base import RuntimeConfigurationObject, ScaleBytes, ScaleType -from websockets.sync.client import connect +from websockets.sync.client import connect, ClientConnection from websockets.exceptions import ConnectionClosed from async_substrate_interface.const import SS58_FORMAT @@ -522,14 +523,18 @@ def __init__( ) self.metadata_version_hex = "0x0f000000" # v15 self.reload_type_registry() - self.ws = self.connect(init=True) self.registry_type_map = {} self.type_id_to_name = {} + self._mock = _mock if not _mock: + self.ws = self.connect(init=True) self.initialize() + else: + self.ws = MagicMock(spec=ClientConnection) def __enter__(self): - self.initialize() + if not self._mock: + self.initialize() return self def __del__(self): diff --git a/tests/unit_tests/test_mock.py b/tests/unit_tests/test_mock.py new file mode 100644 index 0000000..81a1f50 --- /dev/null +++ b/tests/unit_tests/test_mock.py @@ -0,0 +1,29 @@ +from websockets.exceptions import InvalidURI +import pytest + +from async_substrate_interface import AsyncSubstrateInterface, SubstrateInterface + + +@pytest.mark.asyncio +async def test_async_mock(): + ssi = AsyncSubstrateInterface("notreal") + assert isinstance(ssi, AsyncSubstrateInterface) + with pytest.raises(InvalidURI): + await ssi.initialize() + async with AsyncSubstrateInterface("notreal", _mock=True) as ssi: + assert isinstance(ssi, AsyncSubstrateInterface) + ssi = AsyncSubstrateInterface("notreal", _mock=True) + async with ssi: + pass + + +def test_sync_mock(): + with pytest.raises(InvalidURI): + SubstrateInterface("notreal") + ssi = SubstrateInterface("notreal", _mock=True) + assert isinstance(ssi, SubstrateInterface) + with pytest.raises(InvalidURI): + with SubstrateInterface("notreal") as ssi: + pass + with SubstrateInterface("notreal", _mock=True) as ssi: + assert isinstance(ssi, SubstrateInterface) From f9c6228d87ed95d35f61e364611e0714d30a13a3 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 22 May 2025 17:12:00 +0200 Subject: [PATCH 2/4] =?UTF-8?q?We=20occasionally=20get=20incorrect=20timeo?= =?UTF-8?q?uts=20with=20AsyncSubstrateInterface,=20and=20this=20is=20large?= =?UTF-8?q?ly=20due=20to=20the=20calculation=20of=20time=20=E2=80=94=20`lo?= =?UTF-8?q?op.time()`=20is=20not=20necessarily=20`time.time()`?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- async_substrate_interface/async_substrate.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index 94bda13..a4c6847 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -531,13 +531,21 @@ def __init__( self._open_subscriptions = 0 self._options = options if options else {} self.last_received = time.time() + self.last_sent = time.time() async def __aenter__(self): async with self._lock: self._in_use += 1 await self.connect() + now = asyncio.get_running_loop().time() + self.last_received = now + self.last_sent = now return self + @staticmethod + async def loop_time() -> float: + return asyncio.get_running_loop().time() + async def connect(self, force=False): if self._exit_task: self._exit_task.cancel() @@ -594,7 +602,7 @@ async def _recv(self) -> None: try: # TODO consider wrapping this in asyncio.wait_for and use that for the timeout logic response = json.loads(await self.ws.recv(decode=False)) - self.last_received = time.time() + self.last_received = await self.loop_time() async with self._lock: # note that these 'subscriptions' are all waiting sent messages which have not received # responses, and thus are not the same as RPC 'subscriptions', which are unique @@ -630,12 +638,12 @@ async def send(self, payload: dict) -> int: Returns: id: the internal ID of the request (incremented int) """ - # async with self._lock: original_id = get_next_id() # self._open_subscriptions += 1 await self.max_subscriptions.acquire() try: await self.ws.send(json.dumps({**payload, **{"id": original_id}})) + self.last_sent = await self.loop_time() return original_id except (ConnectionClosed, ssl.SSLError, EOFError): async with self._lock: @@ -2120,7 +2128,11 @@ async def _make_rpc_request( if request_manager.is_complete: break - if time.time() - self.ws.last_received >= self.retry_timeout: + if ( + (current_time := await self.ws.loop_time()) - self.ws.last_received + >= self.retry_timeout + and current_time - self.ws.last_sent >= self.retry_timeout + ): if attempt >= self.max_retries: logger.warning( f"Timed out waiting for RPC requests {attempt} times. Exiting." From 11461fffa93c72d5169377caa46ce156c37245b0 Mon Sep 17 00:00:00 2001 From: Benjamin Himes Date: Thu, 22 May 2025 18:44:50 +0200 Subject: [PATCH 3/4] Ensure everything uses loop time rather than time.time --- async_substrate_interface/async_substrate.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/async_substrate_interface/async_substrate.py b/async_substrate_interface/async_substrate.py index a4c6847..f991ea0 100644 --- a/async_substrate_interface/async_substrate.py +++ b/async_substrate_interface/async_substrate.py @@ -9,6 +9,7 @@ import logging import ssl import time +import warnings from hashlib import blake2b from typing import ( Optional, @@ -530,16 +531,21 @@ def __init__( self._exit_task = None self._open_subscriptions = 0 self._options = options if options else {} - self.last_received = time.time() - self.last_sent = time.time() + try: + now = asyncio.get_running_loop().time() + except RuntimeError: + warnings.warn( + "You are instantiating the AsyncSubstrateInterface Websocket outside of an event loop. " + "Verify this is intended." + ) + now = asyncio.new_event_loop().time() + self.last_received = now + self.last_sent = now async def __aenter__(self): async with self._lock: self._in_use += 1 await self.connect() - now = asyncio.get_running_loop().time() - self.last_received = now - self.last_sent = now return self @staticmethod @@ -547,6 +553,9 @@ async def loop_time() -> float: return asyncio.get_running_loop().time() async def connect(self, force=False): + now = await self.loop_time() + self.last_received = now + self.last_sent = now if self._exit_task: self._exit_task.cancel() if not self._initialized or force: From 8b2a1041a32197e8ee54fb5589cfb3b746f92ecd Mon Sep 17 00:00:00 2001 From: ibraheem-opentensor Date: Thu, 22 May 2025 12:26:38 -0700 Subject: [PATCH 4/4] 1.2.2: updates changelog --- CHANGELOG.md | 8 ++++++++ pyproject.toml | 2 +- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index d96a918..80e0e80 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,13 @@ # Changelog +## 1.2.1 /2025-05-22 + +## What's Changed +* Add proper mock support by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/123 +* Handle Incorrect Timeouts by @thewhaleking in https://github.com/opentensor/async-substrate-interface/pull/124 + +**Full Changelog**: https://github.com/opentensor/async-substrate-interface/compare/v1.2.1...v1.2.2 + ## 1.2.1 /2025-05-12 ## What's Changed diff --git a/pyproject.toml b/pyproject.toml index 769da94..2f65c1c 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "async-substrate-interface" -version = "1.2.1" +version = "1.2.2" description = "Asyncio library for interacting with substrate. Mostly API-compatible with py-substrate-interface" readme = "README.md" license = { file = "LICENSE" }