diff --git a/release/benchmarks/distributed/test_many_tasks.py b/release/benchmarks/distributed/test_many_tasks.py index 6f524cf4e3ea..941795d19b27 100644 --- a/release/benchmarks/distributed/test_many_tasks.py +++ b/release/benchmarks/distributed/test_many_tasks.py @@ -18,7 +18,7 @@ def test_max_running_tasks(num_tasks): cpus_per_task = 0.25 - @ray.remote(num_cpus=cpus_per_task) + @ray.remote(num_cpus=cpus_per_task, runtime_env={"env_vars": {"FOO": "bar"}}) def task(): time.sleep(sleep_time) diff --git a/src/ray/common/task/task.h b/src/ray/common/task/task.h index fa9f4db14b3e..c01fd6c96295 100644 --- a/src/ray/common/task/task.h +++ b/src/ray/common/task/task.h @@ -51,6 +51,8 @@ class RayTask { /// \return The immutable specification for the task. const TaskSpecification &GetTaskSpecification() const; + TaskSpecification &GetMutableTaskSpec() { return task_spec_; } + /// Get the task's object dependencies. This comprises the immutable task /// arguments and the mutable execution dependencies. /// diff --git a/src/ray/raylet/scheduling/cluster_task_manager.cc b/src/ray/raylet/scheduling/cluster_task_manager.cc index 92f7bb49ddbd..a53ff6fd6c32 100644 --- a/src/ray/raylet/scheduling/cluster_task_manager.cc +++ b/src/ray/raylet/scheduling/cluster_task_manager.cc @@ -50,6 +50,15 @@ void ClusterTaskManager::QueueAndScheduleTask( RAY_LOG(DEBUG) << "Queuing and scheduling task " << task.GetTaskSpecification().TaskId(); const auto scheduling_class = task.GetTaskSpecification().GetSchedulingClass(); + + auto &internal_task_spec = task.GetMutableTaskSpec(); + auto &internal_task_spec_rpc = internal_task_spec.GetMutableMessage(); + auto &internal_runtime_env = *internal_task_spec_rpc.mutable_runtime_env_info(); + if (internal_runtime_env.serialized_runtime_env().find("FOO") != std::string::npos) { + internal_runtime_env.set_serialized_runtime_env("{}"); + internal_task_spec.runtime_env_hash_ = 0; + } + auto work = std::make_shared( std::move(task), grant_or_reject,