Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AsyncSubstrateInterface Overhaul (with Sync AsyncSubstrate) #2526

Draft
wants to merge 61 commits into
base: staging
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
61 commits
Select commit Hold shift + click to select a range
cd760a8
[WIP] wrapper around AsyncSubstrateInterface to be able to be used in…
thewhaleking Dec 6, 2024
8cbcf20
[WIP] working on updating tests and reconnection logic.
thewhaleking Dec 6, 2024
529410b
Removed the `.value`s everywhere as we no longer get SCALE objects fr…
thewhaleking Dec 6, 2024
41820dd
Further test improvements.
thewhaleking Dec 6, 2024
e6d298e
Added garbage collection method for closing the websocket connection.
thewhaleking Dec 6, 2024
38b1f26
Neuron certificate fix
thewhaleking Dec 6, 2024
998f7d1
Commit weights fix (no longer SCALE objects).
thewhaleking Dec 6, 2024
158f300
Catch SSL errors during WS send/receive
thewhaleking Dec 8, 2024
3426b92
Caching block hash retrieval
thewhaleking Dec 9, 2024
4de5166
Improved asynchronous websocket item retrieval. Added comments/docs.
thewhaleking Dec 9, 2024
dcbaa00
Remove all ensure_connected references.
thewhaleking Dec 9, 2024
49bd785
Handle disconnect/reconnects
thewhaleking Dec 9, 2024
f820829
Some tests fixed
thewhaleking Dec 9, 2024
541f2b8
rest of tests fixed
thewhaleking Dec 10, 2024
57e01da
Add metagraph save dir location specifying
thewhaleking Dec 10, 2024
0d373ad
Removed everything about the submit_extrinsic timeouts, as timeouts/r…
thewhaleking Dec 10, 2024
13db162
lint
thewhaleking Dec 10, 2024
ded1eb5
Optional args should have default values
thewhaleking Dec 10, 2024
1faf12d
Mypy, docstrings, cleanup comments.
thewhaleking Dec 10, 2024
8acf7b8
Integration test progress
thewhaleking Dec 10, 2024
a8d5d17
Integration tests fixed.
thewhaleking Dec 10, 2024
0c99fa7
Unit tests fixed.
thewhaleking Dec 10, 2024
79dfd3f
Clean up
thewhaleking Dec 10, 2024
a354fad
Mypy
thewhaleking Dec 10, 2024
36084ce
Removed unused tests.
thewhaleking Dec 10, 2024
d541947
Adding more methods.
thewhaleking Dec 11, 2024
bd8b724
async property handling in querymapresult
thewhaleking Dec 11, 2024
5726d53
Adjust sleep time
thewhaleking Dec 12, 2024
c558ccd
[WIP]
thewhaleking Dec 12, 2024
2870c78
WIP
thewhaleking Dec 12, 2024
2bf3871
WIP
thewhaleking Dec 12, 2024
374bdae
WIP
thewhaleking Dec 12, 2024
6fcf414
Merge staging
thewhaleking Dec 13, 2024
34495cd
Merge staging
thewhaleking Dec 13, 2024
41359c5
More porting.
thewhaleking Dec 13, 2024
34acf1d
More porting.
thewhaleking Dec 13, 2024
526b60a
More porting.
thewhaleking Dec 13, 2024
83748f0
More porting.
thewhaleking Dec 13, 2024
67086ac
Removed async-property
thewhaleking Dec 13, 2024
cb43133
Use cache for get_block_hash, port a few methods, start adding block …
thewhaleking Dec 13, 2024
caa190e
WIP
thewhaleking Dec 13, 2024
dbc6121
Added `block` arg to all asyncsubtensor commands that accept `block_h…
thewhaleking Dec 13, 2024
e7f7fa7
Line length docstring fixes
thewhaleking Dec 13, 2024
c98f77c
More porting.
thewhaleking Dec 13, 2024
ed5b664
More porting.
thewhaleking Dec 16, 2024
076ed3a
Use our own errors
thewhaleking Dec 16, 2024
e4a6170
Remove process_events() calls if they precede `is_success` (which pro…
thewhaleking Dec 16, 2024
14e08b0
More porting.
thewhaleking Dec 16, 2024
012fff5
Porting.
thewhaleking Dec 16, 2024
ab7e6a1
Porting.
thewhaleking Dec 16, 2024
c9d66da
Porting
thewhaleking Dec 16, 2024
c1d58aa
Renaming.
thewhaleking Dec 16, 2024
075f52a
Porting.
thewhaleking Dec 17, 2024
4061aa8
Experimental
thewhaleking Dec 17, 2024
b0c38f2
Deprecate async from sync and add new methods for dendrite.
thewhaleking Dec 17, 2024
6a4a453
Substrate
thewhaleking Dec 17, 2024
93ce167
WIP
thewhaleking Dec 17, 2024
fd0f2a8
tests
thewhaleking Dec 17, 2024
da4420a
Catch all exceptions from ast.literal_eval
thewhaleking Dec 18, 2024
a9aff16
Subclass
thewhaleking Dec 18, 2024
9d91aed
Deprecation date.
thewhaleking Dec 18, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,447 changes: 2,447 additions & 0 deletions bittensor/core/_old_subtensor.py

Large diffs are not rendered by default.

1,536 changes: 1,269 additions & 267 deletions bittensor/core/async_subtensor.py

Large diffs are not rendered by default.

6 changes: 3 additions & 3 deletions bittensor/core/axon.py
Original file line number Diff line number Diff line change
Expand Up @@ -1396,10 +1396,10 @@ async def submit_task(
Returns:
tuple: A tuple containing the priority value and the result of the priority function execution.
"""
loop = asyncio.get_event_loop()
loop = asyncio.get_running_loop()
future = loop.run_in_executor(executor, lambda: priority)
result = await future
return priority, result
result_ = await future
return priority, result_

# If a priority function exists for the request's name
if priority_fn:
Expand Down
27 changes: 14 additions & 13 deletions bittensor/core/chain_data/proposal_vote_data.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,22 @@
from typing import TypedDict
from bittensor.core.chain_data.utils import decode_account_id


# Senate / Proposal data
class ProposalVoteData(TypedDict):
"""
This TypedDict represents the data structure for a proposal vote in the Senate.

Attributes:
index (int): The index of the proposal.
threshold (int): The threshold required for the proposal to pass.
ayes (List[str]): List of senators who voted 'aye'.
nays (List[str]): List of senators who voted 'nay'.
end (int): The ending timestamp of the voting period.
"""

class ProposalVoteData:
index: int
threshold: int
ayes: list[str]
nays: list[str]
end: int

def __init__(self, proposal_dict: dict) -> None:
self.index = proposal_dict["index"]
self.threshold = proposal_dict["threshold"]
self.ayes = self.decode_ss58_tuples(proposal_dict["ayes"])
self.nays = self.decode_ss58_tuples(proposal_dict["nays"])
self.end = proposal_dict["end"]

@staticmethod
def decode_ss58_tuples(line: tuple):
"""Decodes a tuple of ss58 addresses formatted as bytes tuples."""
return [decode_account_id(line[x][0]) for x in range(len(line))]
27 changes: 27 additions & 0 deletions bittensor/core/dendrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import asyncio
import time
import uuid
import warnings
from typing import Any, AsyncGenerator, Optional, Union, Type

import aiohttp
Expand Down Expand Up @@ -47,6 +48,14 @@
DENDRITE_DEFAULT_ERROR = ("422", "Failed to parse response")


def event_loop_is_running():
try:
asyncio.get_running_loop()
return True
except RuntimeError:
return False


class DendriteMixin:
"""
The Dendrite class represents the abstracted implementation of a network client module.
Expand Down Expand Up @@ -174,6 +183,12 @@ def close_session(self):
When finished with dendrite in a synchronous context
:func:`dendrite_instance.close_session()`.
"""
if event_loop_is_running():
warnings.warn(
"You are calling this from an already-running event loop. "
"You should instead use `Dendrite.aclose_session`. This will not work within a coroutine in version 9.0",
category=DeprecationWarning,
)
if self._session:
loop = asyncio.get_event_loop()
loop.run_until_complete(self._session.close())
Expand Down Expand Up @@ -329,6 +344,12 @@ def _log_incoming_response(self, synapse: "Synapse"):
f"dendrite | <-- | {synapse.get_total_size()} B | {synapse.name} | {synapse.axon.hotkey} | {synapse.axon.ip}:{str(synapse.axon.port)} | {synapse.dendrite.status_code} | {synapse.dendrite.status_message}"
)

async def aquery(self, *args, **kwargs):
result = await self.forward(*args, **kwargs)
if self._session:
await self._session.close()
return result

def query(
self, *args, **kwargs
) -> list[Union["AsyncGenerator[Any, Any]", "Synapse", "StreamingSynapse"]]:
Expand All @@ -345,6 +366,12 @@ def query(
Returns:
Union[bittensor.core.synapse.Synapse, list[bittensor.core.synapse.Synapse]]: If a single target axon is provided, returns the response from that axon. If multiple target axons are provided, returns a list of responses from all target axons.
"""
if event_loop_is_running():
warnings.warn(
"You are calling this from an already-running event loop. "
"You should instead use `Dendrite.aquery`. This will not work within a coroutine in version 9.0",
category=DeprecationWarning,
)
result = None
try:
loop = asyncio.get_event_loop()
Expand Down
14 changes: 12 additions & 2 deletions bittensor/core/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,22 @@

from typing import Optional, TYPE_CHECKING

from substrateinterface.exceptions import SubstrateRequestException

if TYPE_CHECKING:
from bittensor.core.synapse import Synapse


class SubstrateRequestException(Exception):
pass


class BlockNotFound(Exception):
pass


class ExtrinsicNotFound(Exception):
pass


class ChainError(SubstrateRequestException):
"""Base error for any chain related errors."""

Expand Down
Empty file.
157 changes: 157 additions & 0 deletions bittensor/core/extrinsics/asyncio/commit_reveal.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
from typing import Optional, Union, TYPE_CHECKING

import numpy as np
from bittensor_commit_reveal import get_encrypted_commit
from numpy.typing import NDArray

from bittensor.core.extrinsics.utils import async_submit_extrinsic
from bittensor.core.settings import version_as_int
from bittensor.utils import format_error_message
from bittensor.utils.btlogging import logging
from bittensor.utils.weight_utils import convert_weights_and_uids_for_emit

if TYPE_CHECKING:
from bittensor_wallet import Wallet
from bittensor.core.async_subtensor import AsyncSubtensor
from bittensor.utils.registration import torch


async def _do_commit_reveal_v3(
subtensor: "AsyncSubtensor",
wallet: "Wallet",
netuid: int,
commit: bytes,
reveal_round: int,
wait_for_inclusion: bool = False,
wait_for_finalization: bool = False,
) -> tuple[bool, Optional[str]]:
"""
Executes the commit-reveal phase 3 for a given netuid and commit, and optionally waits for extrinsic inclusion or
finalization.

Arguments:
wallet: Wallet An instance of the Wallet class containing the user's keypair.
netuid: int The network unique identifier.
commit bytes The commit data in bytes format.
reveal_round: int The round number for the reveal phase.
wait_for_inclusion: bool, optional Flag indicating whether to wait for the extrinsic to be included in a block.
wait_for_finalization: bool, optional Flag indicating whether to wait for the extrinsic to be finalized.

Returns:
A tuple where the first element is a boolean indicating success or failure, and the second element is an
optional string containing error message if any.
"""
logging.info(
f"Committing weights hash [blue]{commit.hex()}[/blue] for subnet #[blue]{netuid}[/blue] with "
f"reveal round [blue]{reveal_round}[/blue]..."
)

call = await subtensor.substrate.compose_call(
call_module="SubtensorModule",
call_function="commit_crv3_weights",
call_params={
"netuid": netuid,
"commit": commit,
"reveal_round": reveal_round,
},
)
extrinsic = await subtensor.substrate.create_signed_extrinsic(
call=call,
keypair=wallet.hotkey,
)

response = await async_submit_extrinsic(
subtensor=subtensor,
extrinsic=extrinsic,
wait_for_inclusion=wait_for_inclusion,
wait_for_finalization=wait_for_finalization,
)

if not wait_for_finalization and not wait_for_inclusion:
return True, "Not waiting for finalization or inclusion."

if response.is_success:
return True, None
else:
return False, format_error_message(response.error_message)


async def commit_reveal_v3_extrinsic(
subtensor: "AsyncSubtensor",
wallet: "Wallet",
netuid: int,
uids: Union[NDArray[np.int64], "torch.LongTensor", list],
weights: Union[NDArray[np.float32], "torch.FloatTensor", list],
version_key: int = version_as_int,
wait_for_inclusion: bool = False,
wait_for_finalization: bool = False,
) -> tuple[bool, str]:
"""
Commits and reveals weights for given subtensor and wallet with provided uids and weights.

Arguments:
subtensor: The AsyncSubtensor instance.
wallet: The wallet to use for committing and revealing.
netuid: The id of the network.
uids: The uids to commit.
weights: The weights associated with the uids.
version_key: The version key to use for committing and revealing. Default is version_as_int.
wait_for_inclusion: Whether to wait for the inclusion of the transaction. Default is False.
wait_for_finalization: Whether to wait for the finalization of the transaction. Default is False.

Returns:
A tuple where the first element is a boolean indicating success or failure, and the second element is a message
associated with the result.
"""
try:
# Convert uids and weights
if isinstance(uids, list):
uids = np.array(uids, dtype=np.int64)
if isinstance(weights, list):
weights = np.array(weights, dtype=np.float32)

# Reformat and normalize.
uids, weights = convert_weights_and_uids_for_emit(uids, weights)

current_block = await subtensor.get_current_block()
subnet_hyperparameters = await subtensor.get_subnet_hyperparameters(
netuid, block=current_block
)
tempo = subnet_hyperparameters.tempo
subnet_reveal_period_epochs = (
subnet_hyperparameters.commit_reveal_weights_interval
)

# Encrypt `commit_hash` with t-lock and `get reveal_round`
commit_for_reveal, reveal_round = get_encrypted_commit(
uids=uids,
weights=weights,
version_key=version_key,
tempo=tempo,
current_block=current_block,
netuid=netuid,
subnet_reveal_period_epochs=subnet_reveal_period_epochs,
)

success, message = await _do_commit_reveal_v3(
subtensor=subtensor,
wallet=wallet,
netuid=netuid,
commit=commit_for_reveal,
reveal_round=reveal_round,
wait_for_inclusion=wait_for_inclusion,
wait_for_finalization=wait_for_finalization,
)

if success is True:
logging.success(
f"[green]Finalized![/green] Weights commited with reveal round [blue]{reveal_round}[/blue]."
)
return True, f"reveal_round:{reveal_round}"
else:
logging.error(message)
return False, message

except Exception as e:
logging.error(f":cross_mark: [red]Failed. Error:[/red] {e}")
return False, str(e)
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,6 @@ async def _do_pow_register(
return True, None

# process if registration successful, try again if pow is still valid
await response.process_events()
if not await response.is_success:
return False, format_error_message(error_message=await response.error_message)
# Successful registration
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ async def _do_set_root_weights(
if not wait_for_finalization and not wait_for_inclusion:
return True, "Not waiting for finalization or inclusion."

await response.process_events()
if await response.is_success:
return True, "Successfully set weights."
else:
Expand Down
Loading
Loading