diff --git a/.gitignore b/.gitignore index 26dbcda736cd..52f03c37522c 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,5 @@ # The build output should clearly not be checked in +*test-output.xml /bazel-* /python/ray/core /python/ray/pickle5_files/ @@ -11,7 +12,7 @@ /thirdparty/pkg/ /build/java .jar - +/dashboard/client/build # Files generated by flatc should be ignored /src/ray/gcs/format/*_generated.h /src/ray/object_manager/format/*_generated.h diff --git a/ci/travis/test-wheels.sh b/ci/travis/test-wheels.sh index 7fd671332460..e763bb076d8e 100755 --- a/ci/travis/test-wheels.sh +++ b/ci/travis/test-wheels.sh @@ -22,7 +22,6 @@ if [ -z "${BUILD_DIR}" ]; then fi TEST_DIR="${BUILD_DIR}/python/ray/tests" TEST_SCRIPTS=("$TEST_DIR/test_microbenchmarks.py" "$TEST_DIR/test_basic.py") -UI_TEST_SCRIPT="${BUILD_DIR}/python/ray/tests/test_webui.py" function retry { local n=1 @@ -77,9 +76,6 @@ if [[ "$platform" == "linux" ]]; then for SCRIPT in "${TEST_SCRIPTS[@]}"; do retry "$PYTHON_EXE" "$SCRIPT" done - - # Run the UI test to make sure that the packaged UI works. - retry "$PYTHON_EXE" "$UI_TEST_SCRIPT" done # Check that the other wheels are present. @@ -118,12 +114,6 @@ elif [[ "$platform" == "macosx" ]]; then for SCRIPT in "${TEST_SCRIPTS[@]}"; do retry "$PYTHON_EXE" "$SCRIPT" done - - if (( $(echo "$PY_MM >= 3.0" | bc) )); then - # Run the UI test to make sure that the packaged UI works. - retry "$PYTHON_EXE" "$UI_TEST_SCRIPT" - fi - done elif [ "${platform}" = windows ]; then echo "WARNING: Wheel testing not yet implemented for Windows." diff --git a/dashboard/agent.py b/dashboard/agent.py index 73a1bd94f89d..110cb9a1e22f 100644 --- a/dashboard/agent.py +++ b/dashboard/agent.py @@ -38,6 +38,7 @@ class DashboardAgent(object): def __init__(self, redis_address, + dashboard_agent_port, redis_password=None, temp_dir=None, log_dir=None, @@ -51,6 +52,7 @@ def __init__(self, self.redis_password = redis_password self.temp_dir = temp_dir self.log_dir = log_dir + self.dashboard_agent_port = dashboard_agent_port self.metrics_export_port = metrics_export_port self.node_manager_port = node_manager_port self.object_store_name = object_store_name @@ -59,7 +61,8 @@ def __init__(self, assert self.node_id, "Empty node id (RAY_NODE_ID)." self.ip = ray._private.services.get_node_ip_address() self.server = aiogrpc.server(options=(("grpc.so_reuseport", 0), )) - self.grpc_port = self.server.add_insecure_port("[::]:0") + self.grpc_port = self.server.add_insecure_port( + f"[::]:{self.dashboard_agent_port}") logger.info("Dashboard agent grpc address: %s:%s", self.ip, self.grpc_port) self.aioredis_client = None @@ -186,6 +189,11 @@ async def _check_parent(): required=True, type=int, help="The port to expose metrics through Prometheus.") + parser.add_argument( + "--dashboard-agent-port", + required=True, + type=int, + help="The port on which the dashboard agent will receive GRPCs.") parser.add_argument( "--node-manager-port", required=True, @@ -288,6 +296,7 @@ async def _check_parent(): agent = DashboardAgent( args.redis_address, + args.dashboard_agent_port, redis_password=args.redis_password, temp_dir=temp_dir, log_dir=log_dir, diff --git a/dashboard/dashboard.py b/dashboard/dashboard.py index f8b4b3c5ffd8..1e35cb66a302 100644 --- a/dashboard/dashboard.py +++ b/dashboard/dashboard.py @@ -3,7 +3,6 @@ except ImportError: print("The dashboard requires aiohttp to run.") import sys - sys.exit(1) import argparse diff --git a/dashboard/datacenter.py b/dashboard/datacenter.py index 23a239f29faa..109bdc13e41e 100644 --- a/dashboard/datacenter.py +++ b/dashboard/datacenter.py @@ -111,15 +111,13 @@ async def get_node_info(cls, node_id): node_physical_stats = DataSource.node_physical_stats.get(node_id, {}) node_stats = DataSource.node_stats.get(node_id, {}) node = DataSource.nodes.get(node_id, {}) - + node_ip = DataSource.node_id_to_ip.get(node_id) # Merge node log count information into the payload - log_info = DataSource.ip_and_pid_to_logs.get(node_physical_stats["ip"], - {}) + log_info = DataSource.ip_and_pid_to_logs.get(node_ip, {}) node_log_count = 0 for entries in log_info.values(): node_log_count += len(entries) - error_info = DataSource.ip_and_pid_to_errors.get( - node_physical_stats["ip"], {}) + error_info = DataSource.ip_and_pid_to_errors.get(node_ip, {}) node_err_count = 0 for entries in error_info.values(): node_err_count += len(entries) diff --git a/dashboard/modules/logical_view/test_logical_view_head.py b/dashboard/modules/logical_view/test_logical_view_head.py index f4118da51e9a..f9ffebfdbe1a 100644 --- a/dashboard/modules/logical_view/test_logical_view_head.py +++ b/dashboard/modules/logical_view/test_logical_view_head.py @@ -33,9 +33,8 @@ class InfeasibleActor: foo_actors = [Foo.remote(4), Foo.remote(5)] infeasible_actor = InfeasibleActor.remote() # noqa results = [actor.do_task.remote() for actor in foo_actors] # noqa - assert (wait_until_server_available(ray_start_with_dashboard["webui_url"]) - is True) webui_url = ray_start_with_dashboard["webui_url"] + assert wait_until_server_available(webui_url) webui_url = format_web_url(webui_url) timeout_seconds = 5 @@ -75,5 +74,66 @@ class InfeasibleActor: raise Exception(f"Timed out while testing, {ex_stack}") +def test_kill_actor(ray_start_with_dashboard): + @ray.remote + class Actor: + def __init__(self): + pass + + def f(self): + ray.show_in_dashboard("test") + return os.getpid() + + a = Actor.remote() + worker_pid = ray.get(a.f.remote()) # noqa + + webui_url = ray_start_with_dashboard["webui_url"] + assert wait_until_server_available(webui_url) + webui_url = format_web_url(webui_url) + + def actor_killed(pid): + """Check For the existence of a unix pid.""" + try: + os.kill(pid, 0) + except OSError: + return True + else: + return False + + def get_actor(): + resp = requests.get(f"{webui_url}/logical/actor_groups") + resp.raise_for_status() + actor_groups_resp = resp.json() + assert actor_groups_resp["result"] is True, actor_groups_resp["msg"] + actor_groups = actor_groups_resp["data"]["actorGroups"] + actor = actor_groups["Actor"]["entries"][0] + return actor + + def kill_actor_using_dashboard(actor): + resp = requests.get( + webui_url + "/logical/kill_actor", + params={ + "actorId": actor["actorId"], + "ipAddress": actor["ipAddress"], + "port": actor["port"] + }) + resp.raise_for_status() + resp_json = resp.json() + assert resp_json["result"] is True, "msg" in resp_json + + start = time.time() + last_exc = None + while time.time() - start <= 10: + try: + actor = get_actor() + kill_actor_using_dashboard(actor) + last_exc = None + break + except (KeyError, AssertionError) as e: + last_exc = e + time.sleep(.1) + assert last_exc is None + + if __name__ == "__main__": sys.exit(pytest.main(["-v", __file__])) diff --git a/dashboard/modules/reporter/reporter_agent.py b/dashboard/modules/reporter/reporter_agent.py index d1e53b6441a8..01d17a8011cc 100644 --- a/dashboard/modules/reporter/reporter_agent.py +++ b/dashboard/modules/reporter/reporter_agent.py @@ -94,23 +94,15 @@ async def GetProfilingStats(self, request, context): return reporter_pb2.GetProfilingStatsReply( profiling_stats=profiling_stats, std_out=stdout, std_err=stderr) - async def ReportMetrics(self, request, context): - # NOTE: Exceptions are not propagated properly - # when we don't catch them here. + async def ReportOCMetrics(self, request, context): + # This function receives a GRPC containing OpenCensus (OC) metrics + # from a Ray process, then exposes those metrics to Prometheus. try: - metrcs_description_required = ( - self._metrics_agent.record_metrics_points( - request.metrics_points)) - except Exception as e: - logger.error(e) + self._metrics_agent.record_metric_points_from_protobuf( + request.metrics) + except Exception: logger.error(traceback.format_exc()) - - # If metrics description is missing, we should notify cpp processes - # that we need them. Cpp processes will then report them to here. - # We need it when (1) a new metric is reported (application metric) - # (2) a reporter goes down and restarted (currently not implemented). - return reporter_pb2.ReportMetricsReply( - metrcs_description_required=metrcs_description_required) + return reporter_pb2.ReportOCMetricsReply() @staticmethod def _get_cpu_percent(): @@ -125,8 +117,7 @@ def _get_gpu_usage(): try: gpus = gpustat.new_query().gpus except Exception as e: - logger.debug( - "gpustat failed to retrieve GPU information: {}".format(e)) + logger.debug(f"gpustat failed to retrieve GPU information: {e}") for gpu in gpus: # Note the keys in this dict have periods which throws # off javascript so we change .s to _s @@ -233,12 +224,8 @@ def _get_all_stats(self): "cmdline": self._get_raylet_cmdline(), } - async def _perform_iteration(self): + async def _perform_iteration(self, aioredis_client): """Get any changes to the log files and push updates to Redis.""" - aioredis_client = await aioredis.create_redis_pool( - address=self._dashboard_agent.redis_address, - password=self._dashboard_agent.redis_password) - while True: try: stats = self._get_all_stats() @@ -249,5 +236,8 @@ async def _perform_iteration(self): reporter_consts.REPORTER_UPDATE_INTERVAL_MS / 1000) async def run(self, server): + aioredis_client = await aioredis.create_redis_pool( + address=self._dashboard_agent.redis_address, + password=self._dashboard_agent.redis_password) reporter_pb2_grpc.add_ReporterServiceServicer_to_server(self, server) - await self._perform_iteration() + await self._perform_iteration(aioredis_client) diff --git a/dashboard/modules/tune/tune_head.py b/dashboard/modules/tune/tune_head.py index 3f10e5df6dc3..5d9736b22b5b 100644 --- a/dashboard/modules/tune/tune_head.py +++ b/dashboard/modules/tune/tune_head.py @@ -130,7 +130,7 @@ async def collect(self): # search through all the sub_directories in log directory analysis = Analysis(str(self._logdir)) - df = analysis.dataframe(metric="episode_reward_mean", mode="max") + df = analysis.dataframe(metric=None, mode=None) if len(df) == 0 or "trial_id" not in df.columns: return diff --git a/python/build-wheel-macos.sh b/python/build-wheel-macos.sh index 04fefa1f86f6..a60b1d5d6a39 100755 --- a/python/build-wheel-macos.sh +++ b/python/build-wheel-macos.sh @@ -39,7 +39,8 @@ source "$HOME"/.nvm/nvm.sh nvm use node # Build the dashboard so its static assets can be included in the wheel. -pushd python/ray/dashboard/client +# TODO(mfitton): switch this back when deleting old dashboard code. +pushd python/ray/new_dashboard/client npm ci npm run build popd diff --git a/python/build-wheel-manylinux1.sh b/python/build-wheel-manylinux1.sh index 4855b5830bc6..5972d0101615 100755 --- a/python/build-wheel-manylinux1.sh +++ b/python/build-wheel-manylinux1.sh @@ -35,7 +35,8 @@ nvm install node nvm use node # Build the dashboard so its static assets can be included in the wheel. -pushd python/ray/dashboard/client +# TODO(mfitton): switch this back when deleting old dashboard code. +pushd python/ray/new_dashboard/client npm ci npm run build popd diff --git a/python/ray/_private/services.py b/python/ray/_private/services.py index 10b9803de3e4..17651be6f4f0 100644 --- a/python/ray/_private/services.py +++ b/python/ray/_private/services.py @@ -495,10 +495,18 @@ def preexec_fn(): process.kill() raise + def _get_stream_name(stream): + if stream is not None: + try: + return stream.name + except AttributeError: + return str(stream) + return None + return ProcessInfo( process=process, - stdout_file=stdout_file.name if stdout_file is not None else None, - stderr_file=stderr_file.name if stderr_file is not None else None, + stdout_file=_get_stream_name(stdout_file), + stderr_file=_get_stream_name(stderr_file), use_valgrind=use_valgrind, use_gdb=use_gdb, use_valgrind_profiler=use_valgrind_profiler, @@ -1037,12 +1045,7 @@ def start_dashboard(require_dashboard, raise ValueError( f"The given dashboard port {port} is already in use") - if "RAY_USE_NEW_DASHBOARD" in os.environ: - dashboard_dir = "new_dashboard" - else: - dashboard_dir = "dashboard" - logdir = None - + dashboard_dir = "new_dashboard" dashboard_filepath = os.path.join(RAY_PATH, dashboard_dir, "dashboard.py") command = [ sys.executable, @@ -1058,12 +1061,12 @@ def start_dashboard(require_dashboard, if redis_password: command += ["--redis-password", redis_password] - webui_dependencies_present = True + dashboard_dependencies_present = True try: import aiohttp # noqa: F401 import grpc # noqa: F401 except ImportError: - webui_dependencies_present = False + dashboard_dependencies_present = False warning_message = ( "Failed to start the dashboard. The dashboard requires Python 3 " "as well as 'pip install aiohttp grpcio'.") @@ -1071,8 +1074,7 @@ def start_dashboard(require_dashboard, raise ImportError(warning_message) else: logger.warning(warning_message) - - if webui_dependencies_present: + if dashboard_dependencies_present: process_info = start_ray_process( command, ray_constants.PROCESS_TYPE_DASHBOARD, @@ -1319,12 +1321,13 @@ def start_raylet(redis_address, sys.executable, "-u", os.path.join(RAY_PATH, "new_dashboard/agent.py"), - "--redis-address={}".format(redis_address), - "--metrics-export-port={}".format(metrics_export_port), - "--node-manager-port={}".format(node_manager_port), - "--object-store-name={}".format(plasma_store_name), - "--raylet-name={}".format(raylet_name), - "--temp-dir={}".format(temp_dir), + f"--redis-address={redis_address}", + f"--metrics-export-port={metrics_export_port}", + f"--dashboard-agent-port={metrics_agent_port}", + f"--node-manager-port={node_manager_port}", + f"--object-store-name={plasma_store_name}", + f"--raylet-name={raylet_name}", + f"--temp-dir={temp_dir}", ] if redis_password is not None and len(redis_password) != 0: @@ -1357,9 +1360,8 @@ def start_raylet(redis_address, if start_initial_python_workers_for_first_job: command.append("--num_initial_python_workers_for_first_job={}".format( resource_spec.num_cpus)) - if "RAY_USE_NEW_DASHBOARD" in os.environ: - command.append("--agent_command={}".format( - subprocess.list2cmdline(agent_command))) + command.append("--agent_command={}".format( + subprocess.list2cmdline(agent_command))) if config.get("plasma_store_as_thread"): # command related to the plasma store command += [ diff --git a/python/ray/node.py b/python/ray/node.py index 51dc143bd62d..ff3dd4d5b6b1 100644 --- a/python/ray/node.py +++ b/python/ray/node.py @@ -626,19 +626,14 @@ def start_dashboard(self, require_dashboard): if we fail to start the dashboard. Otherwise it will print a warning if we fail to start the dashboard. """ - if "RAY_USE_NEW_DASHBOARD" in os.environ: - stdout_file, stderr_file = None, None - else: - stdout_file, stderr_file = self.get_log_file_handles( - "dashboard", unique=True) self._webui_url, process_info = ray._private.services.start_dashboard( require_dashboard, self._ray_params.dashboard_host, self.redis_address, self._temp_dir, self._logs_dir, - stdout_file=stdout_file, - stderr_file=stderr_file, + stdout_file=subprocess.DEVNULL, # Avoid hang(fd inherit) + stderr_file=subprocess.DEVNULL, # Avoid hang(fd inherit) redis_password=self._ray_params.redis_password, fate_share=self.kernel_fate_share, port=self._ray_params.dashboard_port) @@ -828,9 +823,6 @@ def start_ray_processes(self): ) self.start_plasma_store(plasma_directory, object_store_memory) self.start_raylet(plasma_directory, object_store_memory) - if "RAY_USE_NEW_DASHBOARD" not in os.environ: - self.start_reporter() - if self._ray_params.include_log_monitor: self.start_log_monitor() diff --git a/python/ray/tests/BUILD b/python/ray/tests/BUILD index 9fc69cfd2f16..2739caebe0c9 100644 --- a/python/ray/tests/BUILD +++ b/python/ray/tests/BUILD @@ -93,7 +93,6 @@ py_test_module_list( "test_queue.py", "test_ray_init.py", "test_tempfile.py", - "test_webui.py", ], size = "small", extra_srcs = SRCS, diff --git a/python/ray/tests/test_metrics.py b/python/ray/tests/test_metrics.py index e17c77ed0acd..5b97adc93ce7 100644 --- a/python/ray/tests/test_metrics.py +++ b/python/ray/tests/test_metrics.py @@ -1,27 +1,19 @@ import os -import json import grpc -import pytest import requests import time -import numpy as np import ray from ray.core.generated import node_manager_pb2 from ray.core.generated import node_manager_pb2_grpc -from ray.core.generated import reporter_pb2 -from ray.core.generated import reporter_pb2_grpc -from ray.dashboard.memory import (ReferenceType, decode_object_ref_if_needed, - MemoryTableEntry, MemoryTable, SortingType) from ray.test_utils import (RayTestTimeoutException, - wait_until_succeeded_without_exception, - wait_until_server_available, wait_for_condition) + wait_until_succeeded_without_exception) import psutil # We must import psutil after ray because we bundle it with ray. def test_worker_stats(shutdown_only): - addresses = ray.init(num_cpus=1, include_dashboard=True) + ray.init(num_cpus=1, include_dashboard=True) raylet = ray.nodes()[0] num_cpus = raylet["Resources"]["CPU"] raylet_address = "{}:{}".format(raylet["NodeManagerAddress"], @@ -104,8 +96,6 @@ def f(self): # Check that the rest of the processes are workers, 1 for each CPU. assert len(reply.workers_stats) == num_cpus + 1 - views = [view.view_name for view in reply.view_data] - assert "local_available_resource" in views # Check that all processes are Python. pids = [worker.pid for worker in reply.workers_stats] processes = [ @@ -119,248 +109,6 @@ def f(self): or "runner" in process or "ray" in process) break - # Test kill_actor. - def actor_killed(PID): - """Check For the existence of a unix pid.""" - try: - os.kill(PID, 0) - except OSError: - return True - else: - return False - - assert (wait_until_server_available(addresses["webui_url"]) is True) - - webui_url = addresses["webui_url"] - webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1") - for worker in reply.workers_stats: - if worker.is_driver: - continue - requests.get( - webui_url + "/api/kill_actor", - params={ - "actor_id": ray.utils.binary_to_hex( - worker.core_worker_stats.actor_id), - "ip_address": worker.core_worker_stats.ip_address, - "port": worker.core_worker_stats.port - }) - timeout_seconds = 20 - start_time = time.time() - while True: - if time.time() - start_time > timeout_seconds: - raise RayTestTimeoutException("Timed out while killing actors") - if all( - actor_killed(worker.pid) for worker in reply.workers_stats - if not worker.is_driver): - break - - -def test_raylet_info_endpoint(shutdown_only): - addresses = ray.init(include_dashboard=True, num_cpus=6) - - @ray.remote - def f(): - return "test" - - @ray.remote(num_cpus=1) - class ActorA: - def __init__(self): - pass - - @ray.remote(resources={"CustomResource": 1}) - class ActorB: - def __init__(self): - pass - - @ray.remote(num_cpus=2) - class ActorC: - def __init__(self): - self.children = [ActorA.remote(), ActorB.remote()] - - def local_store(self): - self.local_storage = [f.remote() for _ in range(10)] - - def remote_store(self): - self.remote_storage = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) - - def getpid(self): - return os.getpid() - - c = ActorC.remote() - actor_pid = ray.get(c.getpid.remote()) - c.local_store.remote() - c.remote_store.remote() - - assert (wait_until_server_available(addresses["webui_url"]) is True) - - start_time = time.time() - while True: - time.sleep(1) - try: - webui_url = addresses["webui_url"] - webui_url = webui_url.replace("127.0.0.1", "http://127.0.0.1") - response = requests.get(webui_url + "/api/raylet_info") - response.raise_for_status() - try: - raylet_info = response.json() - except Exception as ex: - print("failed response: {}".format(response.text)) - raise ex - actor_groups = raylet_info["result"]["actorGroups"] - try: - assert len(actor_groups.keys()) == 3 - c_actor_info = actor_groups["ActorC"]["entries"][0] - assert c_actor_info["numObjectRefsInScope"] == 13 - assert c_actor_info["numLocalObjects"] == 10 - break - except AssertionError: - if time.time() > start_time + 30: - raise Exception("Timed out while waiting for actor info \ - or object store info update.") - except requests.exceptions.ConnectionError: - if time.time() > start_time + 30: - raise Exception( - "Timed out while waiting for dashboard to start.") - - def cpu_resources(actor_info): - cpu_resources = 0 - for slot in actor_info["usedResources"]["CPU"]["resourceSlots"]: - cpu_resources += slot["allocation"] - return cpu_resources - - assert cpu_resources(c_actor_info) == 2 - assert c_actor_info["numExecutedTasks"] == 4 - - profiling_id = requests.get( - webui_url + "/api/launch_profiling", - params={ - "node_id": ray.nodes()[0]["NodeID"], - "pid": actor_pid, - "duration": 5 - }).json()["result"] - start_time = time.time() - while True: - # Sometimes some startup time is required - if time.time() - start_time > 30: - raise RayTestTimeoutException( - "Timed out while collecting profiling stats.") - profiling_info = requests.get( - webui_url + "/api/check_profiling_status", - params={ - "profiling_id": profiling_id, - }).json() - status = profiling_info["result"]["status"] - assert status in ("finished", "pending", "error") - if status in ("finished", "error"): - break - time.sleep(1) - - -def test_raylet_infeasible_tasks(shutdown_only): - """ - This test creates an actor that requires 5 GPUs - but a ray cluster only has 3 GPUs. As a result, - the new actor should be an infeasible actor. - """ - addresses = ray.init(num_gpus=3) - - @ray.remote(num_gpus=5) - class ActorRequiringGPU: - def __init__(self): - pass - - ActorRequiringGPU.remote() - - def test_infeasible_actor(ray_addresses): - assert (wait_until_server_available(addresses["webui_url"]) is True) - webui_url = ray_addresses["webui_url"].replace("127.0.0.1", - "http://127.0.0.1") - raylet_info = requests.get(webui_url + "/api/raylet_info").json() - actor_info = raylet_info["result"]["actorGroups"] - assert len(actor_info) == 1 - - _, infeasible_actor_info = actor_info.popitem() - assert infeasible_actor_info["entries"][0]["state"] == -2 - - assert (wait_until_succeeded_without_exception( - test_infeasible_actor, - (AssertionError, requests.exceptions.ConnectionError), - addresses, - timeout_ms=30000, - retry_interval_ms=1000) is True) - - -def test_raylet_pending_tasks(shutdown_only): - # Make sure to specify num_cpus. Otherwise, the test can be broken - # when the number of cores is less than the number of spawned actors. - addresses = ray.init(num_gpus=3, num_cpus=4) - - @ray.remote(num_gpus=1) - class ActorRequiringGPU: - def __init__(self): - pass - - @ray.remote - class ParentActor: - def __init__(self): - self.a = [ActorRequiringGPU.remote() for i in range(4)] - - # If we do not get ParentActor actor handler, reference counter will - # terminate ParentActor. - parent_actor = ParentActor.remote() - assert parent_actor is not None - - def test_pending_actor(ray_addresses): - assert (wait_until_server_available(addresses["webui_url"]) is True) - webui_url = ray_addresses["webui_url"].replace("127.0.0.1", - "http://127.0.0.1") - raylet_info = requests.get(webui_url + "/api/raylet_info").json() - actor_info = raylet_info["result"]["actors"] - assert len(actor_info) == 1 - _, infeasible_actor_info = actor_info.popitem() - wait_until_succeeded_without_exception( - test_pending_actor, - (AssertionError, requests.exceptions.ConnectionError), - addresses, - timeout_ms=30000, - retry_interval_ms=1000) - - -@pytest.mark.skipif( - os.environ.get("TRAVIS") is None, - reason="This test requires password-less sudo due to py-spy requirement.") -def test_profiling_info_endpoint(shutdown_only): - ray.init(num_cpus=1) - - redis_client = ray.worker.global_worker.redis_client - - node_ip = ray.nodes()[0]["NodeManagerAddress"] - - while True: - reporter_port = redis_client.get("REPORTER_PORT:{}".format(node_ip)) - if reporter_port: - break - - reporter_channel = grpc.insecure_channel("{}:{}".format( - node_ip, int(reporter_port))) - reporter_stub = reporter_pb2_grpc.ReporterServiceStub(reporter_channel) - - @ray.remote(num_cpus=1) - class ActorA: - def __init__(self): - pass - - def getpid(self): - return os.getpid() - - a = ActorA.remote() - actor_pid = ray.get(a.getpid.remote()) - - reply = reporter_stub.GetProfilingStats( - reporter_pb2.GetProfilingStatsRequest(pid=actor_pid, duration=10)) - profiling_stats = json.loads(reply.profiling_stats) - assert profiling_stats is not None - def test_multi_node_metrics_export_port_discovery(ray_start_cluster): NUM_NODES = 3 @@ -390,438 +138,7 @@ def test_prometheus_endpoint(): test_prometheus_endpoint, (requests.exceptions.ConnectionError, )) -# This variable is used inside test_memory_dashboard. -# It is defined as a global variable to be used across all nested test -# functions. We use it because memory table is updated every one second, -# and we need to have a way to verify if the test is running with a fresh -# new memory table. -prev_memory_table = MemoryTable([]).__dict__()["group"] - - -def test_memory_dashboard(shutdown_only): - """Test Memory table. - - These tests verify examples in this document. - https://docs.ray.io/en/master/memory-management.html#debugging-using-ray-memory - """ - addresses = ray.init(num_cpus=2) - webui_url = addresses["webui_url"].replace("127.0.0.1", "http://127.0.0.1") - assert (wait_until_server_available(addresses["webui_url"]) is True) - - def get_memory_table(): - memory_table = requests.get(webui_url + "/api/memory_table").json() - return memory_table["result"] - - def memory_table_ready(): - """Wait until the new fresh memory table is ready.""" - global prev_memory_table - memory_table = get_memory_table() - is_ready = memory_table["group"] != prev_memory_table - prev_memory_table = memory_table["group"] - return is_ready - - def stop_memory_table(): - requests.get(webui_url + "/api/stop_memory_table").json() - - def test_local_reference(): - @ray.remote - def f(arg): - return arg - - # a and b are local references. - a = ray.put(None) # Noqa F841 - b = f.remote(None) # Noqa F841 - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - group = memory_table["group"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 2 - for table in group.values(): - for entry in table["entries"]: - assert ( - entry["reference_type"] == ReferenceType.LOCAL_REFERENCE) - stop_memory_table() - return True - - def test_object_pinned_in_memory(): - - a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) - b = ray.get(a) # Noqa F841 - del a - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - group = memory_table["group"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 1 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 0 - for table in group.values(): - for entry in table["entries"]: - assert ( - entry["reference_type"] == ReferenceType.PINNED_IN_MEMORY) - stop_memory_table() - return True - - def test_pending_task_references(): - @ray.remote - def f(arg): - time.sleep(1) - - a = ray.put(np.zeros(200 * 1024, dtype=np.uint8)) - b = f.remote(a) - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 1 - assert summary["total_used_by_pending_task"] == 1 - assert summary["total_local_ref_count"] == 1 - # Make sure the function f is done before going to the next test. - # Otherwise, the memory table will be corrupted because the - # task f won't be done when the next test is running. - ray.get(b) - stop_memory_table() - return True - - def test_serialized_object_ref_reference(): - @ray.remote - def f(arg): - time.sleep(1) - - a = ray.put(None) - b = f.remote([a]) # Noqa F841 - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 1 - assert summary["total_local_ref_count"] == 2 - # Make sure the function f is done before going to the next test. - # Otherwise, the memory table will be corrupted because the - # task f won't be done when the next test is running. - ray.get(b) - stop_memory_table() - return True - - def test_captured_object_ref_reference(): - a = ray.put(None) - b = ray.put([a]) # Noqa F841 - del a - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - assert summary["total_captured_in_objects"] == 1 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 1 - stop_memory_table() - return True - - def test_actor_handle_reference(): - @ray.remote - class Actor: - pass - - a = Actor.remote() # Noqa F841 - b = Actor.remote() # Noqa F841 - c = Actor.remote() # Noqa F841 - - wait_for_condition(memory_table_ready) - memory_table = get_memory_table() - summary = memory_table["summary"] - group = memory_table["group"] - assert summary["total_captured_in_objects"] == 0 - assert summary["total_pinned_in_memory"] == 0 - assert summary["total_used_by_pending_task"] == 0 - assert summary["total_local_ref_count"] == 0 - assert summary["total_actor_handles"] == 3 - for table in group.values(): - for entry in table["entries"]: - assert (entry["reference_type"] == ReferenceType.ACTOR_HANDLE) - stop_memory_table() - return True - - # These tests should be retried because it takes at least one second - # to get the fresh new memory table. It is because memory table is updated - # Whenever raylet and node info is renewed which takes 1 second. - wait_for_condition( - test_local_reference, timeout=30000, retry_interval_ms=1000) - - wait_for_condition( - test_object_pinned_in_memory, timeout=30000, retry_interval_ms=1000) - - wait_for_condition( - test_pending_task_references, timeout=30000, retry_interval_ms=1000) - - wait_for_condition( - test_serialized_object_ref_reference, - timeout=30000, - retry_interval_ms=1000) - - wait_for_condition( - test_captured_object_ref_reference, - timeout=30000, - retry_interval_ms=1000) - - wait_for_condition( - test_actor_handle_reference, timeout=30000, retry_interval_ms=1000) - - -"""Memory Table Unit Test""" - -NODE_ADDRESS = "127.0.0.1" -IS_DRIVER = True -PID = 1 -OBJECT_ID = "7wpsIhgZiBz/////AQAAyAEAAAA=" -ACTOR_ID = "fffffffffffffffff66d17ba010000c801000000" -DECODED_ID = decode_object_ref_if_needed(OBJECT_ID) -OBJECT_SIZE = 100 - - -def build_memory_entry(*, - local_ref_count, - pinned_in_memory, - submitted_task_reference_count, - contained_in_owned, - object_size, - pid, - object_id=OBJECT_ID, - node_address=NODE_ADDRESS): - object_ref = { - "objectId": object_id, - "callSite": "(task call) /Users:458", - "objectSize": object_size, - "localRefCount": local_ref_count, - "pinnedInMemory": pinned_in_memory, - "submittedTaskRefCount": submitted_task_reference_count, - "containedInOwned": contained_in_owned - } - return MemoryTableEntry( - object_ref=object_ref, - node_address=node_address, - is_driver=IS_DRIVER, - pid=pid) - - -def build_local_reference_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=1, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_used_by_pending_task_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=2, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_captured_in_object_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[OBJECT_ID], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_actor_handle_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=1, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address, - object_id=ACTOR_ID) - - -def build_pinned_in_memory_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS): - return build_memory_entry( - local_ref_count=0, - pinned_in_memory=True, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=object_size, - pid=pid, - node_address=node_address) - - -def build_entry(object_size=OBJECT_SIZE, - pid=PID, - node_address=NODE_ADDRESS, - reference_type=ReferenceType.PINNED_IN_MEMORY): - if reference_type == ReferenceType.USED_BY_PENDING_TASK: - return build_used_by_pending_task_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.LOCAL_REFERENCE: - return build_local_reference_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.PINNED_IN_MEMORY: - return build_pinned_in_memory_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.ACTOR_HANDLE: - return build_actor_handle_entry( - pid=pid, object_size=object_size, node_address=node_address) - elif reference_type == ReferenceType.CAPTURED_IN_OBJECT: - return build_captured_in_object_entry( - pid=pid, object_size=object_size, node_address=node_address) - - -def test_invalid_memory_entry(): - memory_entry = build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=OBJECT_SIZE, - pid=PID) - assert memory_entry.is_valid() is False - memory_entry = build_memory_entry( - local_ref_count=0, - pinned_in_memory=False, - submitted_task_reference_count=0, - contained_in_owned=[], - object_size=-1, - pid=PID) - assert memory_entry.is_valid() is False - - -def test_valid_reference_memory_entry(): - memory_entry = build_local_reference_entry() - assert memory_entry.reference_type == ReferenceType.LOCAL_REFERENCE - assert memory_entry.object_ref == ray.ObjectRef( - decode_object_ref_if_needed(OBJECT_ID)) - assert memory_entry.is_valid() is True - - -def test_reference_type(): - # pinned in memory - memory_entry = build_pinned_in_memory_entry() - assert memory_entry.reference_type == ReferenceType.PINNED_IN_MEMORY - - # used by pending task - memory_entry = build_used_by_pending_task_entry() - assert memory_entry.reference_type == ReferenceType.USED_BY_PENDING_TASK - - # captued in object - memory_entry = build_captured_in_object_entry() - assert memory_entry.reference_type == ReferenceType.CAPTURED_IN_OBJECT - - # actor handle - memory_entry = build_actor_handle_entry() - assert memory_entry.reference_type == ReferenceType.ACTOR_HANDLE - - -def test_memory_table_summary(): - entries = [ - build_pinned_in_memory_entry(), - build_used_by_pending_task_entry(), - build_captured_in_object_entry(), - build_actor_handle_entry(), - build_local_reference_entry(), - build_local_reference_entry() - ] - memory_table = MemoryTable(entries) - assert len(memory_table.group) == 1 - assert memory_table.summary["total_actor_handles"] == 1 - assert memory_table.summary["total_captured_in_objects"] == 1 - assert memory_table.summary["total_local_ref_count"] == 2 - assert memory_table.summary[ - "total_object_size"] == len(entries) * OBJECT_SIZE - assert memory_table.summary["total_pinned_in_memory"] == 1 - assert memory_table.summary["total_used_by_pending_task"] == 1 - - -def test_memory_table_sort_by_pid(): - unsort = [1, 3, 2] - entries = [build_entry(pid=pid) for pid in unsort] - memory_table = MemoryTable(entries, sort_by_type=SortingType.PID) - sort = sorted(unsort) - for pid, entry in zip(sort, memory_table.table): - assert pid == entry.pid - - -def test_memory_table_sort_by_reference_type(): - unsort = [ - ReferenceType.USED_BY_PENDING_TASK, ReferenceType.LOCAL_REFERENCE, - ReferenceType.LOCAL_REFERENCE, ReferenceType.PINNED_IN_MEMORY - ] - entries = [ - build_entry(reference_type=reference_type) for reference_type in unsort - ] - memory_table = MemoryTable( - entries, sort_by_type=SortingType.REFERENCE_TYPE) - sort = sorted(unsort) - for reference_type, entry in zip(sort, memory_table.table): - assert reference_type == entry.reference_type - - -def test_memory_table_sort_by_object_size(): - unsort = [312, 214, -1, 1244, 642] - entries = [build_entry(object_size=object_size) for object_size in unsort] - memory_table = MemoryTable(entries, sort_by_type=SortingType.OBJECT_SIZE) - sort = sorted(unsort) - for object_size, entry in zip(sort, memory_table.table): - assert object_size == entry.object_size - - -def test_group_by(): - node_second = "127.0.0.2" - node_first = "127.0.0.1" - entries = [ - build_entry(node_address=node_second, pid=2), - build_entry(node_address=node_second, pid=1), - build_entry(node_address=node_first, pid=2), - build_entry(node_address=node_first, pid=1) - ] - memory_table = MemoryTable(entries) - - # Make sure it is correctly grouped - assert node_first in memory_table.group - assert node_second in memory_table.group - - # make sure pid is sorted in the right order. - for group_key, group_memory_table in memory_table.group.items(): - pid = 1 - for entry in group_memory_table.table: - assert pid == entry.pid - pid += 1 - - if __name__ == "__main__": - import pytest import sys + import pytest sys.exit(pytest.main(["-v", __file__])) diff --git a/python/ray/tests/test_webui.py b/python/ray/tests/test_webui.py deleted file mode 100644 index 011993af3e77..000000000000 --- a/python/ray/tests/test_webui.py +++ /dev/null @@ -1,50 +0,0 @@ -import re -import sys -import time - -import pytest -import requests - -import ray - - -@pytest.mark.skipif( - sys.version_info < (3, 5, 3), reason="requires python3.5.3 or higher") -def test_get_webui(shutdown_only): - addresses = ray.init(include_dashboard=True, num_cpus=1) - webui_url = addresses["webui_url"] - assert ray.get_dashboard_url() == webui_url - - assert re.match(r"^(localhost|\d+\.\d+\.\d+\.\d+):\d+$", webui_url) - - start_time = time.time() - while True: - try: - node_info = requests.get("http://" + webui_url + - "/api/node_info").json() - break - except requests.exceptions.ConnectionError: - if time.time() > start_time + 30: - error_log = None - out_log = None - with open( - "{}/logs/dashboard.out".format( - addresses["session_dir"]), "r") as f: - out_log = f.read() - with open( - "{}/logs/dashboard.err".format( - addresses["session_dir"]), "r") as f: - error_log = f.read() - raise Exception( - "Timed out while waiting for dashboard to start. " - "Dashboard output log: {}\n" - "Dashboard error log: {}\n".format(out_log, error_log)) - assert node_info["error"] is None - assert node_info["result"] is not None - assert isinstance(node_info["timestamp"], float) - - -if __name__ == "__main__": - import pytest - import sys - sys.exit(pytest.main(["-v", __file__]))