The algorithm framework differs from the DreamerV1 paper #226

Closed
LYK-love opened this issue Mar 2, 2024 · 5 comments
Labels
question Further information is requested

Comments

@LYK-love

LYK-love commented Mar 2, 2024

Hello, I found the following code in sheeprl/algos/dreamer_v3.py:

        # Train the agent
        if update >= learning_starts and updates_before_training <= 0:
            local_data = rb.sample_tensors(
                cfg.algo.per_rank_batch_size,
                sequence_length=cfg.algo.per_rank_sequence_length,
                n_samples=(
                    cfg.algo.per_rank_pretrain_steps if update == learning_starts else cfg.algo.per_rank_gradient_steps
                ),
                dtype=None,
                device=fabric.device,
                from_numpy=cfg.buffer.from_numpy,
            )
            with timer("Time/train_time", SumMetric, sync_on_compute=cfg.metric.sync_on_compute):
                
                # All update steps
                for i in range(next(iter(local_data.values())).shape[0]):
                    if per_rank_gradient_steps % cfg.algo.critic.target_network_update_freq == 0:
                        tau = 1 if per_rank_gradient_steps == 0 else cfg.algo.critic.tau
                        for cp, tcp in zip(critic.module.parameters(), target_critic.parameters()):
                            tcp.data.copy_(tau * cp.data + (1 - tau) * tcp.data)
                    batch = {k: v[i].float() for k, v in local_data.items()}
                    train(
                        fabric,
                        world_model,
                        actor,
                        critic,
                        target_critic,
                        world_optimizer,
                        actor_optimizer,
                        critic_optimizer,
                        batch,
                        aggregator,
                        cfg,
                        is_continuous,
                        actions_dim,
                        moments,
                    )
                    per_rank_gradient_steps += 1
                train_step += world_size
            updates_before_training = cfg.algo.train_every // policy_steps_per_update
            if cfg.algo.actor.expl_decay:
                expl_decay_steps += 1
                actor.expl_amount = polynomial_decay(
                    expl_decay_steps,
                    initial=cfg.algo.actor.expl_amount,
                    final=cfg.algo.actor.expl_min,
                    max_decay_steps=max_step_expl_decay,
                )
            if aggregator and not aggregator.disabled:
                aggregator.update("Params/exploration_amount", actor.expl_amount)

Based on my understanding, the training procedure of DreamerV3 is the same as DreamerV1's, as shown in the DreamerV1 paper:

[Image: DreamerV1 training pseudocode from the paper]

Basically, we need to:

  1. Initialize the replay buffer with some data collected by interacting with the environment.
  2. Start a while not converged loop; in each iteration we:
    1. Start a for i in range(update_steps) loop; in each iteration we:
      1. Perform one step of dynamics learning.
      2. Perform one step of behavior learning.
    2. After the for loop, the agent has improved. We then use this improved agent to interact with the environment, collect one episode, and add it to the replay buffer.
  3. When the while loop ends, training is over (see the sketch below).
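
To make my mental model concrete, here is a rough Python-style sketch of that procedure (every name below, e.g. dynamics_learning and behavior_learning, is a placeholder of mine, not sheeprl code):

# Rough sketch of the DreamerV1 training loop as described in the paper
# (all names are illustrative placeholders, not sheeprl code).
replay_buffer = collect_random_episodes(env, num_seed_episodes)  # 1. seed the buffer

while not converged:                        # 2. outer loop
    for _ in range(update_steps):           # 2.1 inner update loop
        batch = replay_buffer.sample(batch_size, sequence_length)
        dynamics_learning(batch)            # 2.1.1 update the world model
        behavior_learning(batch)            # 2.1.2 update actor/critic in imagination
    episode = collect_episode(env, policy)  # 2.2 interact with the improved policy
    replay_buffer.add(episode)              # ...and store the new episode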

So I think sheeprl's train() function corresponds to one step of dynamics learning plus one step of behavior learning. It should be called update_steps times inside a for loop, and that for loop should itself run multiple times until the agent converges.

However, in the code I quoted at the beginning, I don't see train() being called update_steps times in a for loop, and I don't see the outermost while loop. I also don't see that, after each for loop, an episode is collected and added to the replay buffer.

I think sheeprl's implementation is a little different from the paper. Can you explain it?

What's more, can you explain what train_step, per_rank_gradient_steps, updates_before_training, and num_updates are, and what the condition if update >= learning_starts and updates_before_training <= 0 checks?
I also can't understand the logic of this piece of code:

    for update in range(start_step, num_updates + 1):
        policy_step += cfg.env.num_envs * world_size

        # Measure environment interaction time: this considers both the model forward
        # to get the action given the observation and the time taken into the environment
        with timer("Time/env_interaction_time", SumMetric, sync_on_compute=False):
            # Sample an action given the observation received by the environment
            if (
                update <= learning_starts
                and cfg.checkpoint.resume_from is None
                and "minedojo" not in cfg.env.wrapper._target_.lower()
            ):
                real_actions = actions = np.array(envs.action_space.sample())
                if not is_continuous:
                    actions = np.concatenate(
                        [
                            F.one_hot(torch.as_tensor(act), act_dim).numpy()
                            for act, act_dim in zip(actions.reshape(len(actions_dim), -1), actions_dim)
                        ],
                        axis=-1,
                    )
            else:
                with torch.no_grad():
                    preprocessed_obs = {}
                    for k, v in obs.items():
                        preprocessed_obs[k] = torch.as_tensor(v[np.newaxis], dtype=torch.float32, device=device)
                        if k in cfg.algo.cnn_keys.encoder:
                            preprocessed_obs[k] = preprocessed_obs[k] / 255.0 - 0.5
                    mask = {k: v for k, v in preprocessed_obs.items() if k.startswith("mask")}
                    if len(mask) == 0:
                        mask = None
                    real_actions = actions = player.get_exploration_action(preprocessed_obs, mask)
                    actions = torch.cat(actions, -1).cpu().numpy()
                    if is_continuous:
                        real_actions = torch.cat(real_actions, dim=-1).cpu().numpy()
                    else:
                        real_actions = (
                            torch.cat([real_act.argmax(dim=-1) for real_act in real_actions], dim=-1).cpu().numpy()
                        )

            step_data["actions"] = actions.reshape((1, cfg.env.num_envs, -1))
            rb.add(step_data, validate_args=cfg.buffer.validate_args)

            next_obs, rewards, dones, truncated, infos = envs.step(real_actions.reshape(envs.action_space.shape))
            dones = np.logical_or(dones, truncated).astype(np.uint8)

        step_data["is_first"] = np.zeros_like(step_data["dones"])
        if "restart_on_exception" in infos:
            for i, agent_roe in enumerate(infos["restart_on_exception"]):
                if agent_roe and not dones[i]:
                    last_inserted_idx = (rb.buffer[i]._pos - 1) % rb.buffer[i].buffer_size
                    rb.buffer[i]["dones"][last_inserted_idx] = np.ones_like(rb.buffer[i]["dones"][last_inserted_idx])
                    rb.buffer[i]["is_first"][last_inserted_idx] = np.zeros_like(
                        rb.buffer[i]["is_first"][last_inserted_idx]
                    )
                    step_data["is_first"][i] = np.ones_like(step_data["is_first"][i])

        if cfg.metric.log_level > 0 and "final_info" in infos:
            for i, agent_ep_info in enumerate(infos["final_info"]):
                if agent_ep_info is not None:
                    ep_rew = agent_ep_info["episode"]["r"]
                    ep_len = agent_ep_info["episode"]["l"]
                    if aggregator and not aggregator.disabled:
                        aggregator.update("Rewards/rew_avg", ep_rew)
                        aggregator.update("Game/ep_len_avg", ep_len)
                    fabric.print(f"Rank-0: policy_step={policy_step}, reward_env_{i}={ep_rew[-1]}")

        # Save the real next observation
        real_next_obs = copy.deepcopy(next_obs)
        if "final_observation" in infos:
            for idx, final_obs in enumerate(infos["final_observation"]):
                if final_obs is not None:
                    for k, v in final_obs.items():
                        real_next_obs[k][idx] = v

        for k in obs_keys:
            step_data[k] = next_obs[k][np.newaxis]

Thanks!

@belerico
Member

belerico commented Mar 2, 2024

Hi @LYK-love, as you said, the algorithm you posted is the Dreamer-V1 one, which, by the way, is very similar to the procedure you summarized here. The code you refer to is that of Dreamer-V3, which is quite different from the V1 version, especially given the insights we have gained from looking at the authors' code.
As pointed out in #218, we have run experiments and we are matching the Dreamer-V3 paper.
In #223 we are considering adopting the more general and widely accepted replay_ratio instead of Hafner's train_ratio (which is related) to better match the Dreamer-V3 implementation.
For your last question: please refer first to #223, in which we have removed per_rank_gradient_steps and updates_before_training and replaced them with the replay_ratio; also, have you looked at the how-to where we explain policy_steps and everything related to them?

@belerico added the question (Further information is requested) label on Mar 5, 2024
@michele-milesi
Member

Hi @LYK-love,
as @belerico said, we are moving from our way of computing the replay_ratio (with the train_every and per_rank_gradient_steps parameters) to a more standard one.

In any case, I would like to answer your questions:

  • The update_steps of the pseudo-code you provided is equivalent to our per_rank_gradient_steps. The sample function is called once, but we set the parameter n_samples = cfg.algo.per_rank_gradient_steps. The sample function returns an object of shape (per_rank_gradient_steps, sequence_length, batch_size, ...), where ... stands for the dimensions of the observations/actions/rewards/dones.
    After that, a for loop is performed over the first dimension of this object, so you perform cfg.algo.per_rank_gradient_steps updates (see the sketch after this list).
  • The train_step variable counts how many times the train function is called (globally, if you are training with more than one GPU) and it is used only for logging.
  • num_updates is the total number of iterations that a single process has to perform (it is the outer "while not converged" loop of the pseudo-code). It is computed as cfg.algo.total_steps // (num_envs * world_size), so it takes into account the number of GPUs and the number of environments.
  • updates_before_training is the variable that specifies how many policy steps to play between one training session and the next: it is the T of the DreamerV1 pseudo-code. Again, it takes into account the number of environments and the number of GPUs you are using for training.
  • if update >= learning_starts and updates_before_training <= 0 checks whether we have to train or not (learning_starts is the number of steps to perform before training starts).
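
To make the first point concrete, here is a minimal, purely illustrative sketch of that sampling pattern (argument names are shortened with respect to the code you quoted):

# Illustrative sketch: how the sampled tensors are iterated over (not the real sheeprl code).
local_data = rb.sample_tensors(
    batch_size,
    sequence_length=sequence_length,
    n_samples=per_rank_gradient_steps,  # becomes the first dimension of every tensor
)
# Every tensor in local_data has shape (per_rank_gradient_steps, sequence_length, batch_size, ...)
for i in range(next(iter(local_data.values())).shape[0]):
    batch = {k: v[i] for k, v in local_data.items()}  # data for one update
    train(batch)  # one dynamics-learning step + one behavior-learning step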

Perhaps what has led you astray is that in this implementation we do not have an outer loop with two loops inside (one for environment interaction and the other for training); instead, we have a single loop for environment interaction and, inside it, we check whether we need to carry out a training session. This choice was made to follow the original repository as closely as possible.

So the structure of our code is as follows (let me rename the variables for better understanding):

# counter of policy steps played: when it reaches zero, you have to train the agent
env_interaction_steps_between_trainings = train_every
for i in range(total_steps):
    env.step(action)

    env_interaction_steps_between_trainings -= 1
    if train_started and env_interaction_steps_between_trainings <= 0:
        for j in range(per_rank_gradient_steps):
            train()
        env_interaction_steps_between_trainings = train_every  # reset the counter

Another thing I noticed is that num_updates and updates_before_training are imprecise names; we will fix them for clarity.
Let me know if it is clearer now.
Thanks

@LYK-love
Author

Thanks! I'll do more work on sheeprl and try to reproduce the results of the original paper.

@LYK-love
Author

@michele-milesi So in each iteration of for i in range(total_steps):, only one step of env interaction is performed, right? Whereas in the pseudocode, T steps are performed in one iteration of the outermost while loop. I think T is the episode length.

@michele-milesi
Member

Yeah, at each iteration of the outer for loop, one step of env interaction is performed.
You can obtain the same behaviour as the pseudocode by properly setting the env_interaction_steps_between_trainings variable (our train_every). If it is set to 10, it means that you perform 10 steps of env interaction between one training session and the next.
Remember that Dreamer aims to be sample efficient (the fewer steps of env interaction you do between one training session and the next, the better).
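
As a purely illustrative example of how these quantities relate (assuming a single environment and a single GPU; the numbers below are not our defaults):

# Purely illustrative numbers, not sheeprl defaults.
train_every = 10             # env-interaction steps between two training sessions
per_rank_gradient_steps = 1  # train() calls per training session

# Roughly: gradient updates per env-interaction step, i.e. the replay_ratio
# discussed in #223 (here, 1 update every 10 env steps).
replay_ratio = per_rank_gradient_steps / train_every
print(replay_ratio)  # 0.1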
