From 42fc50d89f26f2887ac6c288110a18241c6f4b1c Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 13 Dec 2023 15:49:36 -0800 Subject: [PATCH 01/10] Add logic for getting wandb path from run dir --- scripts/storage_cleaner.py | 31 ++++++++++++++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 54676571e..b1d018b1b 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -17,6 +17,7 @@ import botocore.exceptions as boto_exceptions import google.cloud.storage as gcs import torch +import wandb from cached_path import add_scheme_client, cached_path, set_cache_dir from cached_path.schemes import S3Client from google.api_core.exceptions import NotFound @@ -937,7 +938,35 @@ def unshard_run_checkpoints(run_path: str, checkpoints_dest_dir: str, config: Un def _get_wandb_path(run_dir: str) -> str: - raise NotImplementedError() + run_dir_storage = _get_storage_adapter_for_path(run_dir) + + config_path = os.path.join(run_dir, CONFIG_YAML) + if not run_dir_storage.is_file(config_path): + raise FileNotFoundError("No config file found in run dir, cannot get wandb path") + + local_config_path = cached_path(config_path) + config = TrainConfig.load(local_config_path, validate_paths=False) + + if config.wandb is None or config.wandb.entity is None or config.wandb.project is None: + raise ValueError(f"Run at {run_dir} has missing wandb config, cannot get wandb run path") + + run_filters = { + "display_name": config.wandb.name, + } + if config.wandb.group is not None: + run_filters["group"] = config.wandb.group + + api = wandb.Api() + wandb_matching_runs = api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) + + if len(wandb_matching_runs) == 0: + raise RuntimeError(f"Failed to find any wandb runs for {run_dir}. Run might no longer exist") + + if len(wandb_matching_runs) > 1: + raise RuntimeError(f"Found {len(wandb_matching_runs)} runs matching run dir {run_dir}, cannot determine correct run") + + wandb_run = wandb_matching_runs[0] + return "/".join(wandb_run.path) def _append_wandb_path( From 70645bde79d9d4d6cb35aa8dd4fec82e43c617a2 Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 13 Dec 2023 15:50:34 -0800 Subject: [PATCH 02/10] Add trailing / for dirs to avoid future problems --- scripts/storage_cleaner.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index b1d018b1b..9c146efcb 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -980,9 +980,11 @@ def _append_wandb_path( if _is_archive(run_dir_or_archive, run_dir_or_archive_storage) and append_archive_extension: archive_extension = "".join(Path(run_dir_or_archive).suffixes) - wandb_path = wandb_path + archive_extension + relative_wandb_path = wandb_path + archive_extension + else: + relative_wandb_path = wandb_path + "/" - return os.path.join(base_dir, wandb_path) + return os.path.join(base_dir, relative_wandb_path) def _copy(src_path: str, dest_path: str, temp_dir: str): @@ -1064,7 +1066,7 @@ def _move_run(src_storage: StorageAdapter, run_dir_or_archive: str, dest_dir: st src_move_path, dest_move_path = _get_src_and_dest_for_copy(src_storage, run_dir_or_archive, dest_dir, config) - if src_move_path == dest_move_path: + if src_move_path.rstrip("/") == dest_move_path.rstrip("/"): # This could be a valid scenario if the user is, for example, trying to # append wandb path to runs and this run has the right wandb path already. log.info("Source and destination move paths are both %s, skipping", src_move_path) @@ -1087,6 +1089,7 @@ def _move_run(src_storage: StorageAdapter, run_dir_or_archive: str, dest_dir: st def move_run(run_path: str, dest_dir: str, config: MoveRunConfig): storage = _get_storage_adapter_for_path(run_path) run_dir_or_archive = _format_dir_or_archive_path(storage, run_path) + dest_dir = f"{dest_dir}/" if not dest_dir.endswith("/") else dest_dir _move_run(storage, run_dir_or_archive, dest_dir, config) From b67c49fd5b0baffe6c47427644ff76a68f105b18 Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 13 Dec 2023 16:14:32 -0800 Subject: [PATCH 03/10] Add some extra logging for wandb path logic --- scripts/storage_cleaner.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 9c146efcb..a855195a4 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -956,6 +956,9 @@ def _get_wandb_path(run_dir: str) -> str: if config.wandb.group is not None: run_filters["group"] = config.wandb.group + log.info("Wandb entity/project: %s/%s", config.wandb.entity, config.wandb.project) + log.info("Wandb filters: %s", run_filters) + api = wandb.Api() wandb_matching_runs = api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) From 2d63b04a24666b5083226346eb77f48b5bee19d9 Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 13 Dec 2023 16:16:11 -0800 Subject: [PATCH 04/10] Run ruff --- scripts/storage_cleaner.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index a855195a4..d3306a515 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -966,7 +966,9 @@ def _get_wandb_path(run_dir: str) -> str: raise RuntimeError(f"Failed to find any wandb runs for {run_dir}. Run might no longer exist") if len(wandb_matching_runs) > 1: - raise RuntimeError(f"Found {len(wandb_matching_runs)} runs matching run dir {run_dir}, cannot determine correct run") + raise RuntimeError( + f"Found {len(wandb_matching_runs)} runs matching run dir {run_dir}, cannot determine correct run" + ) wandb_run = wandb_matching_runs[0] return "/".join(wandb_run.path) From 47f1af68edb6d833616b154daba6e6ab0e98aa5c Mon Sep 17 00:00:00 2001 From: Shane A Date: Wed, 13 Dec 2023 16:46:54 -0800 Subject: [PATCH 05/10] Change wandb path logging from log to debug Co-authored-by: Pete --- scripts/storage_cleaner.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index d3306a515..1f95f4c16 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -956,8 +956,8 @@ def _get_wandb_path(run_dir: str) -> str: if config.wandb.group is not None: run_filters["group"] = config.wandb.group - log.info("Wandb entity/project: %s/%s", config.wandb.entity, config.wandb.project) - log.info("Wandb filters: %s", run_filters) + log.debug("Wandb entity/project: %s/%s", config.wandb.entity, config.wandb.project) + log.debug("Wandb filters: %s", run_filters) api = wandb.Api() wandb_matching_runs = api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) From bacb7b1e386800a4d10b316b0dcade009528c183 Mon Sep 17 00:00:00 2001 From: Shane A Date: Thu, 14 Dec 2023 14:58:57 -0800 Subject: [PATCH 06/10] Add ability to get wandb run info from wandb directory --- scripts/storage_cleaner.py | 69 +++++++++++++++++++++++++++++++------- 1 file changed, 57 insertions(+), 12 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index db06ee415..0b9a6b8dd 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -943,6 +943,49 @@ def unshard_run_checkpoints(run_path: str, checkpoints_dest_dir: str, config: Un _unshard_checkpoints(storage, run_dir_or_archive, checkpoints_dest_dir, config) +def _get_wandb_runs_from_wandb_dir(storage: StorageAdapter, wandb_dir: str, run_config: TrainConfig) -> List: + # For some reason, we often have a redundant nested wandb directory. Step into it here. + nested_wandb_dir = os.path.join(wandb_dir, "wandb/") + if storage.is_dir(nested_wandb_dir): + wandb_dir = nested_wandb_dir + + # Wandb run directory names are stored in format -- + # https://docs.wandb.ai/guides/track/save-restore#examples-of-wandbsave + dir_names = storage.list_dirs(wandb_dir) + wandb_run_dir_names = [dir_name for dir_name in dir_names if dir_name.startswith("run")] + if len(wandb_run_dir_names) == 0: + log.warning("No wandb run directories found in wandb dir %s", wandb_dir) + return [] + + wandb_ids = [dir_name.split("-")[2] for dir_name in wandb_run_dir_names if dir_name.count("-") >= 2] + + log.debug("Wandb ids: %s", wandb_ids) + + assert run_config.wandb is not None + api: wandb.Api = wandb.Api() + return [api.run(path=f"{run_config.wandb.entity}/{run_config.wandb.project}/{id}") for id in wandb_ids] + + +def _get_wandb_path_from_run(wandb_run) -> str: + return "/".join(wandb_run.path) + + +def _get_wandb_runs_from_train_config(config: TrainConfig) -> List: + assert config.wandb is not None + + run_filters = { + "display_name": config.wandb.name, + } + if config.wandb.group is not None: + run_filters["group"] = config.wandb.group + + log.debug("Wandb entity/project: %s/%s", config.wandb.entity, config.wandb.project) + log.debug("Wandb filters: %s", run_filters) + + api = wandb.Api() + return api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) + + def _get_wandb_path(run_dir: str) -> str: run_dir_storage = _get_storage_adapter_for_path(run_dir) @@ -956,28 +999,30 @@ def _get_wandb_path(run_dir: str) -> str: if config.wandb is None or config.wandb.entity is None or config.wandb.project is None: raise ValueError(f"Run at {run_dir} has missing wandb config, cannot get wandb run path") - run_filters = { - "display_name": config.wandb.name, - } - if config.wandb.group is not None: - run_filters["group"] = config.wandb.group + wandb_runs = [] - log.debug("Wandb entity/project: %s/%s", config.wandb.entity, config.wandb.project) - log.debug("Wandb filters: %s", run_filters) + wandb_dir = os.path.join(run_dir, "wandb/") + if run_dir_storage.is_dir(wandb_dir): + wandb_runs += _get_wandb_runs_from_wandb_dir(run_dir_storage, wandb_dir, config) - api = wandb.Api() - wandb_matching_runs = api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) + wandb_runs += _get_wandb_runs_from_train_config(config) + + # Remove duplicate wandb runs based on run path, and wandb runs that do not match our run. + wandb_runs = list( + {_get_wandb_path_from_run(wandb_run): wandb_run for wandb_run in wandb_runs}.values() + ) + wandb_matching_runs = wandb_runs if len(wandb_matching_runs) == 0: raise RuntimeError(f"Failed to find any wandb runs for {run_dir}. Run might no longer exist") if len(wandb_matching_runs) > 1: + wandb_run_urls = [wandb_run.url for wandb_run in wandb_matching_runs] raise RuntimeError( - f"Found {len(wandb_matching_runs)} runs matching run dir {run_dir}, cannot determine correct run" + f"Found {len(wandb_matching_runs)} runs matching run dir {run_dir}, cannot determine correct run: {wandb_run_urls}" ) - wandb_run = wandb_matching_runs[0] - return "/".join(wandb_run.path) + return _get_wandb_path_from_run(wandb_matching_runs[0]) def _append_wandb_path( From 223eb6a5f76a38d63312c89ecb35b2ad5235fc50 Mon Sep 17 00:00:00 2001 From: Shane A Date: Thu, 14 Dec 2023 15:00:47 -0800 Subject: [PATCH 07/10] Verify that the wandb run config matches the training config --- scripts/storage_cleaner.py | 86 +++++++++++++++++++++++++++++++++++++- 1 file changed, 85 insertions(+), 1 deletion(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 0b9a6b8dd..52b282123 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -21,6 +21,7 @@ from cached_path import add_scheme_client, cached_path, set_cache_dir from cached_path.schemes import S3Client from google.api_core.exceptions import NotFound +from omegaconf import OmegaConf as om from rich.progress import Progress, TaskID, track from olmo import util @@ -986,6 +987,89 @@ def _get_wandb_runs_from_train_config(config: TrainConfig) -> List: return api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) +def _are_equal_config_settings(wandb_setting, training_setting, wandb_path: str) -> bool: + if isinstance(wandb_setting, dict) and isinstance(training_setting, dict): + wandb_keys = set(wandb_setting.keys()) + training_keys = set(training_setting.keys()) + if wandb_keys != training_keys: + log.debug( + "Setting of wandb %s and training setting do not have matching keys. Wandb extra keys: %s Training extra keys: %s", + wandb_path, + wandb_keys - training_keys, + training_keys - wandb_keys, + ) + return False + + mismatched_keys = [ + key + for key in training_setting.keys() + if not _are_equal_config_settings(wandb_setting[key], training_setting[key], wandb_path) + ] + if len(mismatched_keys) > 0: + log.debug( + "Setting of wandb %s and training setting do not match for the following keys: %s", + wandb_path, + mismatched_keys, + ) + log.debug( + "Mismatches in format 'Key: (Wandb, Training)': %s", + {key: (wandb_setting[key], training_setting[key]) for key in mismatched_keys}, + ) + return False + + return True + + if isinstance(wandb_setting, list) and isinstance(training_setting, list): + if len(wandb_setting) != len(training_setting): + log.debug( + "Setting of wandb %s and training setting has lists of different length. Wandb lists: %s Training list: %s", + wandb_path, + wandb_setting, + training_setting, + ) + return False + + return all( + _are_equal_config_settings(wandb_list_entry, training_list_entry, wandb_path) + for wandb_list_entry, training_list_entry in zip(wandb_setting, training_setting) + ) + + if isinstance(wandb_setting, str) and isinstance(training_setting, str): + # Wandb keeps enum values in the form ., whereas the config file has them as . + if wandb_setting.count(".") == 1 and wandb_setting.split(".")[1].lower() == training_setting.lower(): + return True + + if isinstance(training_setting, tuple): + # Wandb seems to turn tuples to lists. This can cause false equality check failures. + if _are_equal_config_settings(wandb_setting, list(training_setting), wandb_path): + return True + + return wandb_setting == training_setting + + +def _are_equal_configs(wandb_config: Dict, train_config: Dict, wandb_path: str) -> bool: + return _are_equal_config_settings(wandb_config, train_config, wandb_path) + + +def _get_matching_wandb_runs(wandb_runs, training_run_dir: str) -> List: + config_path = os.path.join(training_run_dir, CONFIG_YAML) + local_config_path = cached_path(config_path) + # Do not use TrainConfig.load since any changes in TrainConfig defaults could cause + # false mismatches between the local and wandb config + train_config = om.to_object(om.load(local_config_path)) + if not isinstance(train_config, Dict): + raise ValueError("Training config should be a dictionary") + + if "wandb" in train_config: + del train_config["wandb"] + + return [ + wandb_run + for wandb_run in wandb_runs + if _are_equal_configs(wandb_run.config, train_config, _get_wandb_path_from_run(wandb_run)) + ] + + def _get_wandb_path(run_dir: str) -> str: run_dir_storage = _get_storage_adapter_for_path(run_dir) @@ -1011,7 +1095,7 @@ def _get_wandb_path(run_dir: str) -> str: wandb_runs = list( {_get_wandb_path_from_run(wandb_run): wandb_run for wandb_run in wandb_runs}.values() ) - wandb_matching_runs = wandb_runs + wandb_matching_runs = _get_matching_wandb_runs(wandb_runs, run_dir) if len(wandb_matching_runs) == 0: raise RuntimeError(f"Failed to find any wandb runs for {run_dir}. Run might no longer exist") From 4cc662a9f8f493f5dfdf58c37879fbb4472e5088 Mon Sep 17 00:00:00 2001 From: Shane A Date: Thu, 14 Dec 2023 15:01:43 -0800 Subject: [PATCH 08/10] Run ruff --- scripts/storage_cleaner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 52b282123..1ff8b9d73 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -1092,9 +1092,7 @@ def _get_wandb_path(run_dir: str) -> str: wandb_runs += _get_wandb_runs_from_train_config(config) # Remove duplicate wandb runs based on run path, and wandb runs that do not match our run. - wandb_runs = list( - {_get_wandb_path_from_run(wandb_run): wandb_run for wandb_run in wandb_runs}.values() - ) + wandb_runs = list({_get_wandb_path_from_run(wandb_run): wandb_run for wandb_run in wandb_runs}.values()) wandb_matching_runs = _get_matching_wandb_runs(wandb_runs, run_dir) if len(wandb_matching_runs) == 0: From b4ac61e39591c06111aec97acd774e0afdea1031 Mon Sep 17 00:00:00 2001 From: Shane A Date: Fri, 15 Dec 2023 10:56:56 -0800 Subject: [PATCH 09/10] Use TrainConfig for comparing both wandb and run config --- scripts/storage_cleaner.py | 77 +++++--------------------------------- 1 file changed, 10 insertions(+), 67 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 1ff8b9d73..89fa6b583 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -987,86 +987,29 @@ def _get_wandb_runs_from_train_config(config: TrainConfig) -> List: return api.runs(path=f"{config.wandb.entity}/{config.wandb.project}", filters=run_filters) -def _are_equal_config_settings(wandb_setting, training_setting, wandb_path: str) -> bool: - if isinstance(wandb_setting, dict) and isinstance(training_setting, dict): - wandb_keys = set(wandb_setting.keys()) - training_keys = set(training_setting.keys()) - if wandb_keys != training_keys: - log.debug( - "Setting of wandb %s and training setting do not have matching keys. Wandb extra keys: %s Training extra keys: %s", - wandb_path, - wandb_keys - training_keys, - training_keys - wandb_keys, - ) - return False - - mismatched_keys = [ - key - for key in training_setting.keys() - if not _are_equal_config_settings(wandb_setting[key], training_setting[key], wandb_path) - ] - if len(mismatched_keys) > 0: - log.debug( - "Setting of wandb %s and training setting do not match for the following keys: %s", - wandb_path, - mismatched_keys, - ) - log.debug( - "Mismatches in format 'Key: (Wandb, Training)': %s", - {key: (wandb_setting[key], training_setting[key]) for key in mismatched_keys}, - ) - return False - - return True - - if isinstance(wandb_setting, list) and isinstance(training_setting, list): - if len(wandb_setting) != len(training_setting): - log.debug( - "Setting of wandb %s and training setting has lists of different length. Wandb lists: %s Training list: %s", - wandb_path, - wandb_setting, - training_setting, - ) - return False +def _are_equal_configs(wandb_config: TrainConfig, train_config: TrainConfig) -> bool: + return wandb_config.asdict(exclude=["wandb"]) == train_config.asdict(exclude=["wandb"]) - return all( - _are_equal_config_settings(wandb_list_entry, training_list_entry, wandb_path) - for wandb_list_entry, training_list_entry in zip(wandb_setting, training_setting) - ) - - if isinstance(wandb_setting, str) and isinstance(training_setting, str): - # Wandb keeps enum values in the form ., whereas the config file has them as . - if wandb_setting.count(".") == 1 and wandb_setting.split(".")[1].lower() == training_setting.lower(): - return True - if isinstance(training_setting, tuple): - # Wandb seems to turn tuples to lists. This can cause false equality check failures. - if _are_equal_config_settings(wandb_setting, list(training_setting), wandb_path): - return True - - return wandb_setting == training_setting +def _get_wandb_config(wandb_run) -> TrainConfig: + local_storage = LocalFileSystemAdapter() + temp_file = local_storage.create_temp_file(suffix=".yaml") + om.save(config=wandb_run.config, f=temp_file) + wandb_config = TrainConfig.load(temp_file) -def _are_equal_configs(wandb_config: Dict, train_config: Dict, wandb_path: str) -> bool: - return _are_equal_config_settings(wandb_config, train_config, wandb_path) + return wandb_config def _get_matching_wandb_runs(wandb_runs, training_run_dir: str) -> List: config_path = os.path.join(training_run_dir, CONFIG_YAML) local_config_path = cached_path(config_path) - # Do not use TrainConfig.load since any changes in TrainConfig defaults could cause - # false mismatches between the local and wandb config - train_config = om.to_object(om.load(local_config_path)) - if not isinstance(train_config, Dict): - raise ValueError("Training config should be a dictionary") - - if "wandb" in train_config: - del train_config["wandb"] + train_config = TrainConfig.load(local_config_path) return [ wandb_run for wandb_run in wandb_runs - if _are_equal_configs(wandb_run.config, train_config, _get_wandb_path_from_run(wandb_run)) + if _are_equal_configs(_get_wandb_config(wandb_run), train_config) ] From b89d7022e7e4318e62ee641e4a527a3286572381 Mon Sep 17 00:00:00 2001 From: Shane A Date: Fri, 15 Dec 2023 10:57:21 -0800 Subject: [PATCH 10/10] Run ruff --- scripts/storage_cleaner.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/scripts/storage_cleaner.py b/scripts/storage_cleaner.py index 89fa6b583..62e636f55 100644 --- a/scripts/storage_cleaner.py +++ b/scripts/storage_cleaner.py @@ -1007,9 +1007,7 @@ def _get_matching_wandb_runs(wandb_runs, training_run_dir: str) -> List: train_config = TrainConfig.load(local_config_path) return [ - wandb_run - for wandb_run in wandb_runs - if _are_equal_configs(_get_wandb_config(wandb_run), train_config) + wandb_run for wandb_run in wandb_runs if _are_equal_configs(_get_wandb_config(wandb_run), train_config) ]