From c9ec6bc395caf490c0762ff4cf341a68d90dc90a Mon Sep 17 00:00:00 2001 From: rusticluftig Date: Fri, 19 Jan 2024 19:51:53 -0800 Subject: [PATCH 1/5] Simplify the mining API --- docs/miner.md | 19 ++- neurons/miner.py | 48 ++++-- pretrain/mining.py | 274 +++++++++++++++++++--------------- scripts/upload_model.py | 7 +- tests/pretrain/test_mining.py | 58 +++++-- utilities/utils.py | 7 +- 6 files changed, 249 insertions(+), 164 deletions(-) diff --git a/docs/miner.md b/docs/miner.md index 436535a..26277ef 100644 --- a/docs/miner.md +++ b/docs/miner.md @@ -133,21 +133,20 @@ import pretrain as pt import bittensor as bt from transformers import PreTrainedModel -config = bt.config(...) -wallet = bt.wallet() -metagraph = bt.metagraph(netuid=9) - -actions = pt.mining.actions.Actions.create(config, wallet) - # Load a model from another miner. -model: PreTrainedModel = actions.load_remote_model(uid=123, metagraph=metagraph, download_dir="mydir") +model: PreTrainedModel = await pt.mining.load_remote_model(uid=123, download_dir="mydir") # Save the model to local file. -actions.save(model, "model-foo/") +pt.mining.save(model, "model-foo/") # Load the model from disk. -actions.load_local_model("model-foo/") +pt.mining.load_local_model("model-foo/") # Publish the model for validator evaluation. -actions.push(model) +wallet = bt.wallet() +await pt.mining.push(model, repo="jdoe/my-repo", wallet=wallet) + +# Get the URL to the best model +best_uid = pt.graph.best_uid() +print(await pt.mining.get_repo(best_uid)) ``` \ No newline at end of file diff --git a/neurons/miner.py b/neurons/miner.py index ed1ef97..703237d 100644 --- a/neurons/miner.py +++ b/neurons/miner.py @@ -26,10 +26,11 @@ import constants from model.storage.chain.chain_model_metadata_store import ChainModelMetadataStore from model.storage.hugging_face.hugging_face_model_store import HuggingFaceModelStore +from model.storage.model_metadata_store import ModelMetadataStore +from model.storage.remote_model_store import RemoteModelStore import pretrain as pt import bittensor as bt from transformers import PreTrainedModel -from pretrain.mining import Actions from utilities import utils import datetime as dt @@ -154,7 +155,10 @@ def get_config(): async def load_starting_model( - actions: Actions, config: bt.config, metagraph: bt.metagraph + config: bt.config, + metagraph: bt.metagraph, + metadata_store: ModelMetadataStore, + remote_model_store: RemoteModelStore, ) -> PreTrainedModel: """Loads the model to train based on the provided config.""" @@ -162,7 +166,9 @@ async def load_starting_model( if config.load_best: # Get the best UID be incentive and load it. best_uid = pt.graph.best_uid(metagraph) - model = await actions.load_remote_model(best_uid, metagraph, config.model_dir) + model = await pt.mining.load_remote_model( + best_uid, config.model_dir, metagraph, metadata_store, remote_model_store + ) bt.logging.success( f"Training with model from best uid: {best_uid}. Model={str(model)}" ) @@ -171,8 +177,12 @@ async def load_starting_model( # Initialize the model based on a passed uid. if config.load_uid is not None: # Sync the state from the passed uid. - model = await actions.load_remote_model( - config.load_uid, metagraph, config.model_dir + model = await pt.mining.load_remote_model( + config.load_uid, + config.model_dir, + metagraph, + metadata_store, + remote_model_store, ) bt.logging.success( f"Training with model from uid: {config.load_uid}. Model={str(model)}" @@ -181,13 +191,13 @@ async def load_starting_model( # Check if we should load a model from a local directory. if config.load_model_dir: - model = actions.load_local_model(config.load_model_dir) + model = pt.mining.load_local_model(config.load_model_dir) bt.logging.success(f"Training with model from disk. Model={str(model)}") return model # Check if we should load a model from a local file. if config.load_model: - model = actions.load_gpt2_model(config.load_model) + model = pt.mining.load_gpt2_model(config.load_model) bt.logging.success(f"Training with model from disk. Model={str(model)}") return model @@ -211,9 +221,7 @@ async def main(config: bt.config): if not config.offline: my_uid = utils.assert_registered(wallet, metagraph) HuggingFaceModelStore.assert_access_token_exists() - - # Configure the stores and miner actions. - miner_actions = pt.mining.Actions.create(config, wallet, subtensor) + utils.validate_hf_repo_id(config.hf_repo_id) # Create a unique run id for this run. run_id = dt.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") @@ -230,12 +238,16 @@ async def main(config: bt.config): use_wandb = True # Init model. - model: PreTrainedModel = await load_starting_model(miner_actions, config, metagraph) + metadata_store = ChainModelMetadataStore(subtensor, wallet, config.netuid) + remote_store = HuggingFaceModelStore() + model: PreTrainedModel = await load_starting_model( + config, metagraph, metadata_store, remote_store + ) model = model.train() model = model.to(config.device) bt.logging.success(f"Saving model to path: {model_dir}.") - miner_actions.save(model, model_dir) + pt.mining.save(model, model_dir) # Build optimizer optimizer = torch.optim.AdamW(model.parameters(), lr=config.lr, weight_decay=0.01) @@ -345,7 +357,7 @@ async def main(config: bt.config): # Save the model to your mining dir. bt.logging.success(f"Saving model to path: {model_dir}.") - miner_actions.save(model, model_dir) + pt.mining.save(model, model_dir) bt.logging.success("Finished training") # Push the model to your run. @@ -356,8 +368,14 @@ async def main(config: bt.config): ) # First, reload the best model from the training run. - model_to_upload = miner_actions.load_local_model(model_dir) - await miner_actions.push(model_to_upload) + model_to_upload = pt.mining.load_local_model(model_dir) + await pt.mining.push( + model_to_upload, + config.hf_repo_id, + wallet, + metadata_store=metadata_store, + remote_model_store=remote_store, + ) else: bt.logging.success( f"This training run achieved a best_avg_loss={best_avg_loss}, which did not meet the upload threshold. Not uploading to hugging face." diff --git a/pretrain/mining.py b/pretrain/mining.py index 5247fc5..62ce41c 100644 --- a/pretrain/mining.py +++ b/pretrain/mining.py @@ -19,6 +19,7 @@ import os import time from typing import Optional +import constants from model.data import Model, ModelId from model.storage.chain.chain_model_metadata_store import ChainModelMetadataStore from model.storage.hugging_face.hugging_face_model_store import HuggingFaceModelStore @@ -39,129 +40,160 @@ def model_path(base_dir: str, run_id: str) -> str: return os.path.join(base_dir, "training", run_id) -class Actions: - """A suite of actions for Miners to save/load and upload/download models.""" - - def __init__( - self, - wallet: bt.wallet, - hf_repo_namespace: str, - hf_repo_name: str, - model_metadata_store: ModelMetadataStore, - remote_model_store: RemoteModelStore, - ): - self.wallet = wallet - self.hf_repo_namespace = hf_repo_namespace - self.hf_repo_name = hf_repo_name - self.model_metadata_store = model_metadata_store - self.remote_model_store = remote_model_store - - @classmethod - def create( - cls, - config: bt.config, - wallet: bt.wallet, - subtensor: Optional[bt.subtensor] = None, - ): - subtensor = subtensor or bt.subtensor(config) +async def push( + model: PreTrainedModel, + repo: str, + wallet: bt.wallet, + retry_delay_secs: int = 60, + metadata_store: Optional[ModelMetadataStore] = None, + remote_model_store: Optional[RemoteModelStore] = None, +): + """Pushes the model to Hugging Face and publishes it on the chain for evaluation by validators. + + Args: + model (PreTrainedModel): The model to push. + repo (str): The repo to push to. Must be in format "namespace/name". + wallet (bt.wallet): The wallet of the Miner uploading the model. + retry_delay_secs (int): The number of seconds to wait before retrying to push the model to the chain. + metadata_store (Optional[ModelMetadataStore]): The metadata store. If None, defaults to writing to the + chain. + remote_model_store (Optional[RemoteModelStore]): The remote model store. If None, defaults to writing to HuggingFace + """ + bt.logging.info("Pushing model") + + if metadata_store is None: + metadata_store = ChainModelMetadataStore(bt.subtensor(), wallet) + + if remote_model_store is None: remote_model_store = HuggingFaceModelStore() - chain_model_store = ChainModelMetadataStore( - subtensor, wallet, subnet_uid=config.netuid - ) - repo_namespace, repo_name = utils.validate_hf_repo_id(config.hf_repo_id) - - return Actions( - wallet, repo_namespace, repo_name, chain_model_store, remote_model_store - ) - - def save(self, model: PreTrainedModel, model_dir: str): - """Saves a model to the provided directory""" - if not os.path.exists(model_dir): - os.makedirs(model_dir, exist_ok=True) - - # Save the model state to the specified path. - model.save_pretrained( - save_directory=model_dir, - safe_serialization=True, - ) - - def load_gpt2_model(self, model_file: str) -> PreTrainedModel: - """For loading GPT2 models from the previous version of this subnet.""" - model = pt.model.get_model() - load_model(model, model_file) - return model - - def load_local_model(self, model_dir: str) -> PreTrainedModel: - """Loads a model from a directory.""" - return AutoModelForCausalLM.from_pretrained( - pretrained_model_name_or_path=model_dir, - local_files_only=True, - use_safetensors=True, - ) - - async def load_remote_model( - self, uid: int, metagraph: bt.metagraph, download_dir: str - ) -> PreTrainedModel: - """Loads the model currently being advertised by the Miner with the given UID. - - Args: - uid (int): The UID of the Miner who's model should be downloaded. - metagraph (bt.metagraph): The metagraph of the current subtensor. - download_dir (str): The directory to download the model to. - """ - hotkey = metagraph.hotkeys[uid] - model_metadata = await self.model_metadata_store.retrieve_model_metadata(hotkey) - if not model_metadata: - raise ValueError(f"No model metadata found for miner {uid}") - - model: Model = await self.remote_model_store.download_model( - model_metadata.id, download_dir - ) - return model.pt_model - - async def push(self, model: PreTrainedModel, retry_delay_secs: int = 60): - """Pushes the model to Hugging Face and publishes it on the chain for evaluation by validators.""" - bt.logging.info("Pushing model") - - # First upload the model to HuggingFace. - model_id = ModelId(namespace=self.hf_repo_namespace, name=self.hf_repo_name) - model_id = await self.remote_model_store.upload_model( - Model(id=model_id, pt_model=model) - ) - - bt.logging.success( - f"Uploaded model to hugging face. Now committing to the chain with model_id: {model_id}" - ) - - # We can only commit to the chain every 20 minutes, so run this in a loop, until - # successful. - while True: - try: - await self.model_metadata_store.store_model_metadata( - self.wallet.hotkey.ss58_address, model_id - ) - bt.logging.info( - "Wrote model metadata to the chain. Checking we can read it back..." + # First upload the model to HuggingFace. + namespace, name = utils.validate_hf_repo_id(repo) + model_id = ModelId(namespace=namespace, name=name) + model_id = await remote_model_store.upload_model(Model(id=model_id, pt_model=model)) + + bt.logging.success( + f"Uploaded model to hugging face. Now committing to the chain with model_id: {model_id}" + ) + + # We can only commit to the chain every 20 minutes, so run this in a loop, until + # successful. + while True: + try: + await metadata_store.store_model_metadata( + wallet.hotkey.ss58_address, model_id + ) + + bt.logging.info( + "Wrote model metadata to the chain. Checking we can read it back..." + ) + + model_metadata = await metadata_store.retrieve_model_metadata( + wallet.hotkey.ss58_address + ) + + if not model_metadata or model_metadata.id != model_id: + bt.logging.error( + f"Failed to read back model metadata from the chain. Expected: {model_id}, got: {model_metadata}" ) - - model_metadata = ( - await self.model_metadata_store.retrieve_model_metadata( - self.wallet.hotkey.ss58_address - ) + raise ValueError( + f"Failed to read back model metadata from the chain. Expected: {model_id}, got: {model_metadata}" ) - if not model_metadata or model_metadata.id != model_id: - bt.logging.error( - f"Failed to read back model metadata from the chain. Expected: {model_id}, got: {model_metadata}" - ) - raise ValueError( - f"Failed to read back model metadata from the chain. Expected: {model_id}, got: {model_metadata}" - ) - - bt.logging.success("Committed model to the chain.") - break - except Exception as e: - bt.logging.error(f"Failed to advertise model on the chain: {e}") - bt.logging.error(f"Retrying in {retry_delay_secs} seconds...") - time.sleep(retry_delay_secs) + bt.logging.success("Committed model to the chain.") + break + except Exception as e: + bt.logging.error(f"Failed to advertise model on the chain: {e}") + bt.logging.error(f"Retrying in {retry_delay_secs} seconds...") + time.sleep(retry_delay_secs) + + +def save(model: PreTrainedModel, model_dir: str): + """Saves a model to the provided directory""" + if not os.path.exists(model_dir): + os.makedirs(model_dir, exist_ok=True) + + # Save the model state to the specified path. + model.save_pretrained( + save_directory=model_dir, + safe_serialization=True, + ) + + +async def get_repo( + uid: int, + metagraph: Optional[bt.metagraph] = None, + metadata_store: Optional[ModelMetadataStore] = None, +) -> str: + """Returns a URL to the HuggingFace repo of the Miner with the given UID.""" + if metadata_store is None: + metadata_store = ChainModelMetadataStore(bt.subtensor()) + if metagraph is None: + metagraph = bt.metagraph(netuid=constants.SUBNET_UID) + hotkey = metagraph.hotkeys[uid] + model_metadata = await metadata_store.retrieve_model_metadata(hotkey) + + if not model_metadata: + raise ValueError(f"No model metadata found for miner {uid}") + + return utils.get_hf_url(model_metadata) + + +def load_gpt2_model(model_file: str) -> PreTrainedModel: + """For loading GPT2 models from the previous version of this subnet.""" + model = pt.model.get_model() + load_model(model, model_file) + return model + + +def load_local_model(model_dir: str) -> PreTrainedModel: + """Loads a model from a directory.""" + return AutoModelForCausalLM.from_pretrained( + pretrained_model_name_or_path=model_dir, + local_files_only=True, + use_safetensors=True, + ) + + +async def load_best_model(download_dir: str): + """Loads the model from the best performing miner to download_dir""" + best_uid = pt.graph.best_uid() + return await load_remote_model(best_uid, download_dir) + + +async def load_remote_model( + uid: int, + download_dir: str, + metagraph: Optional[bt.metagraph] = None, + metadata_store: Optional[ModelMetadataStore] = None, + remote_model_store: Optional[RemoteModelStore] = None, +) -> PreTrainedModel: + """Loads the model currently being advertised by the Miner with the given UID. + + Args: + uid (int): The UID of the Miner who's model should be downloaded. + download_dir (str): The directory to download the model to. + metagraph (Optional[bt.metagraph]): The metagraph of the subnet. + metadata_store (Optional[ModelMetadataStore]): The metadata store. If None, defaults to reading from the + remote_model_store (Optional[RemoteModelStore]): The remote model store. If None, defaults to reading from HuggingFace + """ + + if metagraph is None: + metagraph = bt.metagraph(netuid=constants.SUBNET_UID) + + if metadata_store is None: + metadata_store = ChainModelMetadataStore(subtensor=bt.subtensor()) + + if remote_model_store is None: + remote_model_store = HuggingFaceModelStore() + + hotkey = metagraph.hotkeys[uid] + model_metadata = await metadata_store.retrieve_model_metadata(hotkey) + if not model_metadata: + raise ValueError(f"No model metadata found for miner {uid}") + + bt.logging.success(f"Fetched model metadata: {model_metadata}") + model: Model = await remote_model_store.download_model( + model_metadata.id, download_dir + ) + return model.pt_model diff --git a/scripts/upload_model.py b/scripts/upload_model.py index b91f4b3..49ad8f2 100644 --- a/scripts/upload_model.py +++ b/scripts/upload_model.py @@ -71,12 +71,9 @@ async def main(config: bt.config): utils.assert_registered(wallet, metagraph) HuggingFaceModelStore.assert_access_token_exists() - # Create the actions object. - actions = pt.mining.Actions.create(config, wallet, subtensor) - # Load the model from disk and push it to the chain and Hugging Face. - model = actions.load_local_model(config.load_model_dir) - await actions.push(model) + model = pt.mining.load_local_model(config.load_model_dir) + await pt.mining.push(model, config.hf_repo_id, wallet) if __name__ == "__main__": diff --git a/tests/pretrain/test_mining.py b/tests/pretrain/test_mining.py index 1f0be17..25295d0 100644 --- a/tests/pretrain/test_mining.py +++ b/tests/pretrain/test_mining.py @@ -1,10 +1,12 @@ import asyncio import os import shutil +from unittest import mock import bittensor as bt import unittest -from model.data import Model -from pretrain.mining import Actions + +from model.data import Model, ModelId +import pretrain as pt from pretrain.model import get_model from tests.model.storage.fake_model_metadata_store import FakeModelMetadataStore from tests.model.storage.fake_remote_model_store import FakeRemoteModelStore @@ -19,13 +21,6 @@ def setUp(self): self.wallet.create_if_non_existent( coldkey_use_password=False, hotkey_use_password=False ) - self.actions = Actions( - wallet=self.wallet, - hf_repo_namespace="test-namespace", - hf_repo_name="test-repo-name", - model_metadata_store=self.metadata_store, - remote_model_store=self.remote_store, - ) self.tiny_model = get_model() self.model_dir = "test-models/test-mining" @@ -38,13 +33,22 @@ def tearDown(self): def test_model_to_disk_roundtrip(self): """Tests that saving a model to disk and loading it gets the same model.""" - self.actions.save(model=self.tiny_model, model_dir=self.model_dir) - model = self.actions.load_local_model(model_dir=self.model_dir) + pt.mining.save(model=self.tiny_model, model_dir=self.model_dir) + model = pt.mining.load_local_model(model_dir=self.model_dir) assert_model_equality(self, self.tiny_model, model) def _test_push(self, min_expected_block: int = 1): - asyncio.run(self.actions.push(model=self.tiny_model, retry_delay_secs=1)) + asyncio.run( + pt.mining.push( + model=self.tiny_model, + wallet=self.wallet, + repo="namespace/name", + retry_delay_secs=1, + metadata_store=self.metadata_store, + remote_model_store=self.remote_store, + ) + ) # Check that the model was uploaded to hugging face. model: Model = self.remote_store.get_only_model() @@ -83,6 +87,36 @@ def test_push_metadata_read_is_old(self): self._test_push(min_expected_block=2) + async def test_get_repo_no_metadata(self): + """Tests that get_repo raises a ValueError if the miner hasn't uploaded a model yet.""" + hotkey = "hotkey" + metagraph = mock.MagicMock(spec=bt.metagraph) + metagraph.hotkeys.return_value = [hotkey] + + # The miner hasn't uploaded a model yet, so expect a ValueError. + with self.assertRaises(ValueError): + await pt.mining.get_repo( + 0, metagraph=metagraph, metadata_store=self.metadata_store + ) + + async def test_get_repo(self): + """Tests that get_repo raises a ValueError if the miner hasn't uploaded a model yet.""" + hotkey = "hotkey" + metagraph = mock.MagicMock(spec=bt.metagraph) + metagraph.hotkeys.return_value = [hotkey] + + model_id = ModelId( + namespace="namespace", name="name", hash="hash", commit="commit" + ) + self.metadata_store.store_model_metadata(hotkey, model_id) + + self.assertEqual( + await pt.mining.get_repo( + 0, metagraph=metagraph, metadata_store=self.metadata_store + ), + "https://huggingface.co/namespace/name/tree/commit", + ) + if __name__ == "__main__": unittest.main() diff --git a/utilities/utils.py b/utilities/utils.py index b06e585..56ef95c 100644 --- a/utilities/utils.py +++ b/utilities/utils.py @@ -4,7 +4,7 @@ from typing import Any, Optional, Tuple import bittensor as bt -from model.data import ModelId +from model.data import ModelId, ModelMetadata def assert_registered(wallet: bt.wallet, metagraph: bt.metagraph) -> int: @@ -49,6 +49,11 @@ def validate_hf_repo_id(repo_id: str) -> Tuple[str, str]: return parts[0], parts[1] +def get_hf_url(model_metadata: ModelMetadata) -> str: + """Returns the URL to the Hugging Face repo for the provided model metadata.""" + return f"https://huggingface.co/{model_metadata.id.namespace}/{model_metadata.id.name}/tree/{model_metadata.id.commit}" + + def run_in_subprocess(func: functools.partial, ttl: int) -> Any: """Runs the provided function on a subprocess with 'ttl' seconds to complete. From ac31bb6909922043df52c1503071281ec074982e Mon Sep 17 00:00:00 2001 From: rusticluftig Date: Thu, 1 Feb 2024 19:15:00 -0800 Subject: [PATCH 2/5] Run each eval in a subprocess to avoid a bad model being able to corrupt the GPU --- neurons/validator.py | 28 ++++++++++++++++++++-------- utilities/utils.py | 26 ++++++++++++-------------- 2 files changed, 32 insertions(+), 22 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 3d1c054..c2660ed 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -18,6 +18,7 @@ from collections import defaultdict import datetime as dt +import functools import os import json import math @@ -326,7 +327,6 @@ def update_models(self): time.sleep(time_to_sleep) uid_last_checked[next_uid] = dt.datetime.now() - bt.logging.trace(f"Updating model for UID={next_uid}") # Get their hotkey from the metagraph. hotkey = self.metagraph.hotkeys[next_uid] @@ -334,9 +334,10 @@ def update_models(self): # Compare metadata and tracker, syncing new model from remote store to local if necessary. updated = asyncio.run(self.model_updater.sync_model(hotkey)) - bt.logging.trace( - f"Updated model for UID={next_uid}. Was new = {updated}" - ) + if updated: + bt.logging.trace( + f"Updated model for UID={next_uid}. Was new = {updated}" + ) # Ensure we eval the new model on the next loop. if updated: @@ -347,6 +348,7 @@ def update_models(self): ) except Exception as e: + pass bt.logging.error( f"Error in update loop: {e} \n {traceback.format_exc()}" ) @@ -486,15 +488,18 @@ async def run_step(self): pages=pages, ) ) + + bt.logging.debug(f"Computing losses on {uids} with pages {pages}") # Compute model losses on batches. - bt.logging.debug(f"Computing losses on {uids}") losses_per_uid = {muid: None for muid in uids} load_model_perf = PerfMonitor("Eval: Load model") compute_loss_perf = PerfMonitor("Eval: Compute loss") for uid_i in uids: + bt.logging.trace(f"Computing model losses for uid:{uid_i}.") + # Check that the model is in the tracker. hotkey = self.metagraph.hotkeys[uid_i] model_i_metadata = self.model_tracker.get_model_metadata_for_miner_hotkey( @@ -516,10 +521,17 @@ async def run_step(self): ) with compute_loss_perf.sample(): - losses = pt.validation.compute_losses( - model_i.pt_model, batches, device=self.config.device + # Run each computation in a subprocess so that the GPU is reset between each model. + losses = utils.run_in_subprocess( + functools.partial( + pt.validation.compute_losses, + model_i.pt_model, + batches, + self.config.device, + ), + ttl=60, + mode="spawn", ) - del model_i except Exception as e: bt.logging.error( diff --git a/utilities/utils.py b/utilities/utils.py index 56ef95c..766e720 100644 --- a/utilities/utils.py +++ b/utilities/utils.py @@ -54,7 +54,16 @@ def get_hf_url(model_metadata: ModelMetadata) -> str: return f"https://huggingface.co/{model_metadata.id.namespace}/{model_metadata.id.name}/tree/{model_metadata.id.commit}" -def run_in_subprocess(func: functools.partial, ttl: int) -> Any: +def _wrapped_func(func: functools.partial, queue: multiprocessing.Queue): + try: + result = func() + queue.put(result) + except (Exception, BaseException) as e: + # Catch exceptions here to add them to the queue. + queue.put(e) + + +def run_in_subprocess(func: functools.partial, ttl: int, mode="fork") -> Any: """Runs the provided function on a subprocess with 'ttl' seconds to complete. Args: @@ -64,20 +73,9 @@ def run_in_subprocess(func: functools.partial, ttl: int) -> Any: Returns: Any: The value returned by 'func' """ - - def wrapped_func(func: functools.partial, queue: multiprocessing.Queue): - try: - result = func() - queue.put(result) - except (Exception, BaseException) as e: - # Catch exceptions here to add them to the queue. - queue.put(e) - - # Use "fork" (the default on all POSIX except macOS), because pickling doesn't seem - # to work on "spawn". - ctx = multiprocessing.get_context("fork") + ctx = multiprocessing.get_context(mode) queue = ctx.Queue() - process = ctx.Process(target=wrapped_func, args=[func, queue]) + process = ctx.Process(target=_wrapped_func, args=[func, queue]) process.start() From 198e103e69e00cb2f280721a70cd196a78ff8899 Mon Sep 17 00:00:00 2001 From: rusticluftig Date: Thu, 1 Feb 2024 20:50:52 -0800 Subject: [PATCH 3/5] Remove model with inf loss --- neurons/validator.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index c2660ed..05e746f 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -488,7 +488,7 @@ async def run_step(self): pages=pages, ) ) - + bt.logging.debug(f"Computing losses on {uids} with pages {pages}") # Compute model losses on batches. @@ -570,8 +570,16 @@ async def run_step(self): self.weights = self.weights.nan_to_num(0.0) # Filter based on win rate removing all by the sample_min best models for evaluation. + # First remove any models with an inf loss. + filtered_win_rate = { + uid: win_rate + for uid, win_rate in win_rate.items() + if not math.isinf(losses_per_uid.get(uid, default=math.inf)) + } self.uids_to_eval = set( - sorted(win_rate, key=win_rate.get, reverse=True)[: self.config.sample_min] + sorted(filtered_win_rate, key=filtered_win_rate.get, reverse=True)[ + : self.config.sample_min + ] ) # Save state From 1f96e89e949ac35cee6be3cdc1344f18e97f6afb Mon Sep 17 00:00:00 2001 From: rusticluftig Date: Thu, 1 Feb 2024 20:59:18 -0800 Subject: [PATCH 4/5] Fix dict .get() --- neurons/validator.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/neurons/validator.py b/neurons/validator.py index 05e746f..615ae08 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -570,11 +570,11 @@ async def run_step(self): self.weights = self.weights.nan_to_num(0.0) # Filter based on win rate removing all by the sample_min best models for evaluation. - # First remove any models with an inf loss. + # First remove any models that have an infinite loss. filtered_win_rate = { - uid: win_rate - for uid, win_rate in win_rate.items() - if not math.isinf(losses_per_uid.get(uid, default=math.inf)) + uid: wr + for uid, wr in win_rate.items() + if not all(math.isinf(x) for x in losses_per_uid.get(uid, [math.inf])) } self.uids_to_eval = set( sorted(filtered_win_rate, key=filtered_win_rate.get, reverse=True)[ From 65b29aa157502306fe319825ecee7b77dec3b37e Mon Sep 17 00:00:00 2001 From: rusticluftig Date: Thu, 1 Feb 2024 22:54:47 -0800 Subject: [PATCH 5/5] Clean-up accidental test code --- neurons/validator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/neurons/validator.py b/neurons/validator.py index 615ae08..d29584f 100644 --- a/neurons/validator.py +++ b/neurons/validator.py @@ -348,7 +348,6 @@ def update_models(self): ) except Exception as e: - pass bt.logging.error( f"Error in update loop: {e} \n {traceback.format_exc()}" )