RuntimeError: dictionary keys changed during iteration #3181

Open
GuHongyang opened this issue Jul 8, 2024 · 2 comments · May be fixed by #3256
Labels: help wanted (Extra attention is needed), type / bug (Issue type: something isn't working)

Comments

@GuHongyang

🐛 Bug

When using ray.tune for hyperparameter optimization and invoking the AimLoggerCallback, the following error occurs:

Traceback (most recent call last):
  File "/root/data/mamba-reid/tst.py", line 92, in <module>
    results = tuner.fit()
              ^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/tuner.py", line 377, in fit
    return self._local_tuner.fit()
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/impl/tuner_internal.py", line 476, in fit
    analysis = self._fit_internal(trainable, param_space)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/impl/tuner_internal.py", line 592, in _fit_internal
    analysis = run(
               ^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/tune.py", line 994, in run
    runner.step()
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 685, in step
    if not self._actor_manager.next(timeout=0.1):
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py", line 223, in next
    self._actor_task_events.resolve_future(future)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/event_manager.py", line 118, in resolve_future
    on_result(result)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py", line 766, in on_result
    self._actor_task_resolved(
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/air/execution/_internal/actor_manager.py", line 299, in _actor_task_resolved
    tracked_actor_task._on_result(tracked_actor, result)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1229, in _on_result
    raise TuneError(traceback.format_exc())
ray.tune.error.TuneError: Traceback (most recent call last):
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1220, in _on_result
    on_result(trial, *args, **kwargs)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1947, in _on_trial_reset
    self._actor_started(tracked_actor, log="REUSED")
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/execution/tune_controller.py", line 1131, in _actor_started
    self._callbacks.on_trial_start(
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/callback.py", line 398, in on_trial_start
    callback.on_trial_start(**info)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/logger/logger.py", line 147, in on_trial_start
    self.log_trial_start(trial)
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/logger/aim.py", line 110, in log_trial_start
    self._trial_to_run[trial] = self._create_run(trial)
                                ^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/ray/tune/logger/aim.py", line 91, in _create_run
    run = Run(
          ^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/ext/exception_resistant.py", line 70, in wrapper
    _SafeModeConfig.exception_callback(e, func)
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/ext/exception_resistant.py", line 47, in reraise_exception
    raise e
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/ext/exception_resistant.py", line 68, in wrapper
    return func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/run.py", line 859, in __init__
    super().__init__(run_hash, repo=repo, read_only=read_only, experiment=experiment, force_resume=force_resume)
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/run.py", line 308, in __init__
    self._checkins = RunStatusReporter(self.hash, LocalFileManager(self.repo.path))
                     ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/reporter/__init__.py", line 439, in __init__
    self.flush(block=True)
  File "/root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/reporter/__init__.py", line 671, in flush
    for flag_name in flag_names:
RuntimeError: dictionary keys changed during iteration

The failure is located in /root/data/anaconda3/lib/python3.11/site-packages/aim/sdk/reporter/__init__.py:

    def flush(
        self,
        flag_name: Optional[str] = None,
        block: bool = True,
    ) -> None:
        """
        Flush the last check-in.
        If `flag_name` is specified, only the check-ins for the given flag
        will be flushed.
        Otherwise, all the check-ins will be flushed. In this case, the order
        of (active) check-ins (per flag name) will be preserved.
        """
        logger.debug(f"notifying {self}")

        with self.reporter_lock:
            flag_names = [flag_name] if flag_name is not None else self.timed_tasks
            with self.flush_condition:
                for flag_name in flag_names:
                    logger.debug(f"flushing {flag_name}")
                    # We add a new task with the highest priority to flush the
                    # last check-in. This task will be processed by the writer
                    # thread immediately.
                    self._schedule(TimedTask(when=0, flag_name=flag_name))

                # As there may be no flag names at all, the queue may be
                # untouched. In this case, we need to notify the writer thread
                # explicitly.
                with self.refresh_condition:
                    self.refresh_condition.notify()

                # If `block` is set, we wait until the writer thread finishes
                # flushing the last check-in.
                if block:
                    logger.debug("blocking until the writer finishes...")
                    self.flush_condition.wait()
                    logger.debug("done")
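
For context: CPython raises this particular RuntimeError when a dict's keys are replaced (one removed, another added) while the dict is being iterated. In flush(), when flag_name is None, flag_names is bound to self.timed_tasks itself rather than to a copy, so any concurrent mutation of that dict (presumably by the writer thread or by _schedule() registering tasks) can trip the check mid-loop. A minimal, self-contained sketch of the failure mode (illustrative only, not aim's actual code):

# Minimal sketch of the failure mode (illustrative only, not aim's actual code):
# replacing a key in a dict while iterating over that same dict raises the
# exact error seen in the traceback above.
timed_tasks = {"new": "task-a", "progress": "task-b"}  # stands in for self.timed_tasks

flag_names = timed_tasks  # the `else` branch aliases the dict; it does not copy it

try:
    for flag_name in flag_names:
        # stand-in for whatever removes one entry and registers another while
        # the loop is still walking the original dict
        timed_tasks.pop("new", None)
        timed_tasks["flush"] = "task-c"
except RuntimeError as exc:
    print(exc)  # dictionary keys changed during iteration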

To reproduce

ray=2.31.0
aim=3.22.0

import argparse
import logging
import os.path as osp
import tempfile
import time
from functools import partial
from random import random

import numpy as np
import torch
import yaml
from ray import train, tune, init
from ray.train import Checkpoint
from ray.tune.logger.aim import AimLoggerCallback
from ray.tune.schedulers import HyperBandForBOHB
from ray.tune.search.bohb import TuneBOHB
from tabulate import tabulate

from data import build_dataloaders
from utils import setup_logger, setup_seed, str2dict, str2list


def my_train(config):

    setup_seed(777) 
    
    growth_rate = config['a'] # [1.0, 10.0]
    saturation_level = config['b'] # [1.0, 10.0]
    noise_level = config['c'] # [0.01, 0.1]
    noise_wo = config['d'] # {1.0, 0.0}
    
    x_values = np.linspace(0, 10, 12)
    
    start = 0
    checkpoint = train.get_checkpoint()
    if checkpoint:
        with checkpoint.as_directory() as checkpoint_dir:
            checkpoint_dict = torch.load(osp.join(checkpoint_dir, "checkpoint.pt"))
            start = checkpoint_dict["epoch"] + 1
    
    best_mAP = -1.0
    
    for epoch in range(start, len(x_values)):
        setup_seed(epoch)
        x = x_values[epoch]
        base_performance = 1 / (1 + np.exp(-growth_rate * (x - saturation_level)))
        random_noise = np.random.normal(0, noise_level)
        performance = base_performance + random_noise * (1.0 - noise_wo)
        
        time.sleep(random()*5.0)
        best_mAP = max(best_mAP, performance)
        
        with tempfile.TemporaryDirectory() as tempdir:
            torch.save(
                {"epoch": epoch},
                osp.join(tempdir, "checkpoint.pt"),
            )
            train.report(metrics={'mAP': performance, 'best_mAP':best_mAP}, checkpoint=Checkpoint.from_directory(tempdir))




if __name__ == '__main__':
    # init(local_mode=True)
    lg, savedir = setup_logger(exp='tune_tst')
    
    config = {
            "a": tune.uniform(1.0, 10.0),
            "b": tune.uniform(1.0, 10.0),
            "c": tune.uniform(0.01, 0.1),
            "d": tune.choice([1.0, 0.0])
        }
    algo = TuneBOHB(
        points_to_evaluate=[
                            {"a": 2.59095, "b": 5.6506, "c": 0.0967916, "d": 0.0},
                            {"a": 1.0, "b": 1.0, "c": 0.01, "d": 0.0},
                            {"a": 1.0, "b": 1.0, "c": 0.1, "d": 1.0},
                            ],
        seed=0,
        max_concurrent=4) 
      
    sche = HyperBandForBOHB(
        time_attr="training_iteration",
        max_t=12,
        reduction_factor=2)
    
    tuner = tune.Tuner(
        tune.with_resources(my_train, {"cpu": 0.1}),    
        param_space=config,
        tune_config=tune.TuneConfig(num_samples=50, metric='best_mAP', mode='max', search_alg=algo, scheduler=sche, reuse_actors=True),
        run_config=train.RunConfig(name="tst", storage_path=osp.abspath(savedir), callbacks=[AimLoggerCallback()], failure_config=train.FailureConfig(fail_fast=True)))
    results = tuner.fit()
    
    lg.info(results.get_best_result().config)
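
Note that the script imports setup_logger, setup_seed, str2dict, str2list, and build_dataloaders from local utils/data modules that are not included in this report; only setup_seed and setup_logger are actually called. For anyone trying to run the reproduction, minimal stand-ins could look like the following (these are guesses at the helpers' behavior, not code from the report):

# Hypothetical stand-ins for the missing local helpers; only the two functions
# the script actually calls are sketched, and their exact behavior is a guess.
import logging
import os
import random as _random  # aliased to avoid clashing with `from random import random` above

import numpy as np
import torch

def setup_seed(seed):
    # seed the common RNGs for reproducibility
    _random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)

def setup_logger(exp):
    # create an output directory and return a basic logger plus its path
    savedir = os.path.join("runs", exp)
    os.makedirs(savedir, exist_ok=True)
    logging.basicConfig(level=logging.INFO)
    return logging.getLogger(exp), savedir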

Expected behavior

I can solve this problem by replacing self.timed_tasks with list(self.timed_tasks.keys()), but will there be any side effects?

with self.reporter_lock:
    flag_names = [flag_name] if flag_name is not None else list(self.timed_tasks.keys())  # was: self.timed_tasks
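
For what it's worth, a hedged sketch of why the snapshot avoids the error (hypothetical helper names, not aim's code): list(self.timed_tasks.keys()) materializes the key list up front, so later insertions or removals in timed_tasks no longer affect the loop. The main behavioral difference appears to be that flags registered after the snapshot is taken are not flushed in that call; they are picked up by the next flush().

# Hedged sketch of the proposed fix (hypothetical names, same idea as the
# snippet above): iterate over a snapshot of the keys so that concurrent
# insertions or removals in the dict cannot invalidate the loop.
timed_tasks = {"new": object(), "progress": object()}

def schedule_flush(flag_name):
    # stand-in for self._schedule(TimedTask(when=0, flag_name=flag_name)),
    # which may add or replace entries in timed_tasks
    timed_tasks[flag_name + ":flush"] = object()

for flag_name in list(timed_tasks.keys()):  # snapshot taken before the loop
    schedule_flush(flag_name)

print(sorted(timed_tasks))  # the dict grew during the loop, yet no RuntimeError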
GuHongyang added the help wanted and type / bug labels on Jul 8, 2024
@mihran113 (Contributor)

Hey @GuHongyang! Thanks a lot for opening the issue. The strange thing is that I couldn't reproduce this on my end with Python 3.11, aim 3.22, and ray 3.32.
But in any case this is a situation where it's better to be safe than sorry, so it would be nice if you could open a PR with the fix you've proposed, as it won't have any side effects.

@VassilisVassiliadis

@mihran113 we had this happen a few times, so I opened a PR here: #3256

It's just a reiteration of what @GuHongyang wrote in the bug description.
