diff --git a/.github/workflows/slow-tests.yml b/.github/workflows/slow-tests.yml index 1cf63ab9..f95761b1 100644 --- a/.github/workflows/slow-tests.yml +++ b/.github/workflows/slow-tests.yml @@ -40,7 +40,7 @@ jobs: - name: Install run: | - python -m pip install -e .[test] + python -m pip install -e .[test] memray - name: Run tests run: | diff --git a/cubed/tests/test_mem_utilization.py b/cubed/tests/test_mem_utilization.py index 290f8da7..2bee8c40 100644 --- a/cubed/tests/test_mem_utilization.py +++ b/cubed/tests/test_mem_utilization.py @@ -4,6 +4,7 @@ import sys from functools import partial, reduce +import pandas as pd import pytest import cubed @@ -14,9 +15,13 @@ from cubed.core.optimization import multiple_inputs_optimize_dag from cubed.diagnostics.history import HistoryCallback from cubed.diagnostics.mem_warn import MemoryWarningCallback +from cubed.diagnostics.memray import MemrayCallback from cubed.runtime.create import create_executor from cubed.tests.utils import LITHOPS_LOCAL_CONFIG +pd.set_option("display.max_columns", None) + + ALLOWED_MEM = 2_000_000_000 EXECUTORS = {} @@ -107,7 +112,8 @@ def test_tril(tmp_path, spec, executor): @pytest.mark.slow -def test_add(tmp_path, spec, executor): +@pytest.mark.parametrize("optimize_graph", [False, True]) +def test_add(tmp_path, spec, executor, optimize_graph): a = cubed.random.random( (10000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks @@ -115,7 +121,7 @@ def test_add(tmp_path, spec, executor): (10000, 10000), chunks=(5000, 5000), spec=spec ) # 200MB chunks c = xp.add(a, b) - run_operation(tmp_path, executor, "add", c) + run_operation(tmp_path, executor, "add", c, optimize_graph=optimize_graph) @pytest.mark.slow @@ -305,17 +311,27 @@ def test_sum_partial_reduce(tmp_path, spec, executor): # Internal functions -def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None): - # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False) +def run_operation( + tmp_path, + executor, + name, + result_array, + *, + optimize_graph=True, + optimize_function=None, +): + # result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False, show_hidden=True) # result_array.visualize(f"cubed-{name}", optimize_function=optimize_function) hist = HistoryCallback() mem_warn = MemoryWarningCallback() + memray = MemrayCallback() # use store=None to write to temporary zarr cubed.to_zarr( result_array, store=None, executor=executor, - callbacks=[hist, mem_warn], + callbacks=[hist, mem_warn, memray], + optimize_graph=optimize_graph, optimize_function=optimize_function, ) @@ -328,6 +344,13 @@ def run_operation(tmp_path, executor, name, result_array, *, optimize_function=N # check change in peak memory is no more than projected mem assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all() + # check memray peak memory allocated is no more than projected mem + for op_name, stats in memray.stats.items(): + assert ( + stats.peak_memory_allocated + <= df.query(f"name=='{op_name}'")["projected_mem_mb"].item() * 1_000_000 + ), f"projected mem exceeds memray's peak allocated for {op_name}" + # check projected_mem_utilization does not exceed 1 # except on processes executor that runs multiple tasks in a process if (