diff --git a/examples/split_placement/main_ppo_split.py b/examples/split_placement/main_ppo_split.py index 27a7b35..180078a 100644 --- a/examples/split_placement/main_ppo_split.py +++ b/examples/split_placement/main_ppo_split.py @@ -135,9 +135,9 @@ def main_task(config): from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role role_worker_mapping = { - Role.ActorRollout: ActorRolloutRefWorker, - Role.Critic: CriticWorker, - Role.RefPolicy: ActorRolloutRefWorker + Role.ActorRollout: ray.remote(ActorRolloutRefWorker), + Role.Critic: ray.remote(CriticWorker), + Role.RefPolicy: ray.remote(ActorRolloutRefWorker) } # NOTE: initialze two resource pool @@ -173,7 +173,7 @@ def main_task(config): from verl.workers.megatron_workers import RewardModelWorker else: raise NotImplementedError - role_worker_mapping[Role.RewardModel] = RewardModelWorker + role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker) mapping[Role.RewardModel] = critic_pool_id reward_fn = RewardManager(tokenizer=tokenizer, num_examine=0) diff --git a/verl/trainer/main_generation.py b/verl/trainer/main_generation.py index b0bc7d7..8c3bd92 100644 --- a/verl/trainer/main_generation.py +++ b/verl/trainer/main_generation.py @@ -14,7 +14,7 @@ """ Generate responses given a dataset of prompts """ - +import ray import numpy as np import hydra import os @@ -59,7 +59,7 @@ def main(config): if tokenizer.pad_token is None: tokenizer.pad_token = tokenizer.eos_token - ray_cls_with_init = RayClassWithInitArgs(cls=ActorRolloutRefWorker, config=config, role='rollout') + ray_cls_with_init = RayClassWithInitArgs(cls=ray.remote(ActorRolloutRefWorker), config=config, role='rollout') resource_pool = RayResourcePool(process_on_nodes=[config.trainer.n_gpus_per_node] * config.trainer.nnodes) wg = RayWorkerGroup(resource_pool=resource_pool, ray_cls_with_init=ray_cls_with_init) wg.init_model() diff --git a/verl/trainer/main_ppo.py b/verl/trainer/main_ppo.py index 325e9be..eb3fb40 100644 --- a/verl/trainer/main_ppo.py +++ b/verl/trainer/main_ppo.py @@ -136,9 +136,9 @@ def main_task(config): from verl.trainer.ppo.ray_trainer import ResourcePoolManager, Role role_worker_mapping = { - Role.ActorRollout: ActorRolloutRefWorker, - Role.Critic: CriticWorker, - Role.RefPolicy: ActorRolloutRefWorker + Role.ActorRollout: ray.remote(ActorRolloutRefWorker), + Role.Critic: ray.remote(CriticWorker), + Role.RefPolicy: ray.remote(ActorRolloutRefWorker) } global_pool_id = 'global_pool' @@ -164,7 +164,7 @@ def main_task(config): from verl.workers.megatron_workers import RewardModelWorker else: raise NotImplementedError - role_worker_mapping[Role.RewardModel] = RewardModelWorker + role_worker_mapping[Role.RewardModel] = ray.remote(RewardModelWorker) mapping[Role.RewardModel] = global_pool_id reward_fn = RewardManager(tokenizer=tokenizer, num_examine=0) diff --git a/verl/workers/fsdp_workers.py b/verl/workers/fsdp_workers.py index 45c4197..d1479f2 100644 --- a/verl/workers/fsdp_workers.py +++ b/verl/workers/fsdp_workers.py @@ -40,7 +40,6 @@ logger.setLevel(os.getenv('VERL_PPO_LOGGING_LEVEL', 'WARN')) -@ray.remote class ActorRolloutRefWorker(Worker): """ This worker can be instantiated as a standalone actor or a standalone rollout or a standalone reference policy @@ -434,7 +433,6 @@ def save_checkpoint(self, local_path, hdfs_path=None): offload_fsdp_param_and_grad(module=self.actor_module_fsdp, offload_grad=self._is_offload_grad) -@ray.remote class CriticWorker(Worker): def __init__(self, config): @@ -642,7 +640,6 @@ def save_checkpoint(self, local_path, hdfs_path=None): offload_fsdp_param_and_grad(module=self.critic_module, offload_grad=self._is_offload_grad) -@ray.remote class RewardModelWorker(Worker): """ Note that we only implement the reward model that is subclass of AutoModelForSequenceClassification. diff --git a/verl/workers/megatron_workers.py b/verl/workers/megatron_workers.py index f76c4d4..f3a56f4 100644 --- a/verl/workers/megatron_workers.py +++ b/verl/workers/megatron_workers.py @@ -60,7 +60,6 @@ def set_random_seed(seed): # os.environ['CUBLAS_WORKSPACE_CONFIG'] = ':4096:8' -@ray.remote class ActorRolloutRefWorker(MegatronWorker): """ This worker can be instantiated as a standalone actor or a standalone rollout or a standalone reference policy @@ -406,7 +405,6 @@ def save_checkpoint(self, checkpoint_path): pass -@ray.remote class CriticWorker(MegatronWorker): def __init__(self, config): @@ -575,7 +573,6 @@ def save_checkpoint(self, checkpoint_path): pass -@ray.remote class RewardModelWorker(MegatronWorker): """ Note that we only implement the reward model that is subclass of AutoModelForSequenceClassification.