Skip to content

Commit

Permalink
Use memray callback in test_mem_utilization
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwhite committed Aug 30, 2024
1 parent 81eb040 commit 11cd529
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 6 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/slow-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ jobs:
- name: Install
run: |
python -m pip install -e .[test]
python -m pip install -e .[test] memray
- name: Run tests
run: |
Expand Down
33 changes: 28 additions & 5 deletions cubed/tests/test_mem_utilization.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import sys
from functools import partial, reduce

import pandas as pd
import pytest

import cubed
Expand All @@ -14,9 +15,13 @@
from cubed.core.optimization import multiple_inputs_optimize_dag
from cubed.diagnostics.history import HistoryCallback
from cubed.diagnostics.mem_warn import MemoryWarningCallback
from cubed.diagnostics.memray import MemrayCallback
from cubed.runtime.create import create_executor
from cubed.tests.utils import LITHOPS_LOCAL_CONFIG

pd.set_option("display.max_columns", None)


ALLOWED_MEM = 2_000_000_000

EXECUTORS = {}
Expand Down Expand Up @@ -107,15 +112,16 @@ def test_tril(tmp_path, spec, executor):


@pytest.mark.slow
@pytest.mark.parametrize("optimize_graph", [False, True])
def test_add(tmp_path, spec, executor, optimize_graph):
    """Check memory utilization of adding two arrays of 200MB chunks.

    Parametrized over ``optimize_graph`` so memory bounds are verified both
    with and without graph optimization applied before execution.
    """
    # The diff paste retained the stale pre-change signature and call lines
    # alongside the new ones; this is the post-change version only.
    a = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    b = cubed.random.random(
        (10000, 10000), chunks=(5000, 5000), spec=spec
    )  # 200MB chunks
    c = xp.add(a, b)
    run_operation(tmp_path, executor, "add", c, optimize_graph=optimize_graph)


@pytest.mark.slow
Expand Down Expand Up @@ -305,17 +311,27 @@ def test_sum_partial_reduce(tmp_path, spec, executor):
# Internal functions


def run_operation(tmp_path, executor, name, result_array, *, optimize_function=None):
# result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False)
def run_operation(
tmp_path,
executor,
name,
result_array,
*,
optimize_graph=True,
optimize_function=None,
):
# result_array.visualize(f"cubed-{name}-unoptimized", optimize_graph=False, show_hidden=True)
# result_array.visualize(f"cubed-{name}", optimize_function=optimize_function)
hist = HistoryCallback()
mem_warn = MemoryWarningCallback()
memray = MemrayCallback()
# use store=None to write to temporary zarr
cubed.to_zarr(
result_array,
store=None,
executor=executor,
callbacks=[hist, mem_warn],
callbacks=[hist, mem_warn, memray],
optimize_graph=optimize_graph,
optimize_function=optimize_function,
)

Expand All @@ -328,6 +344,13 @@ def run_operation(tmp_path, executor, name, result_array, *, optimize_function=N
# check change in peak memory is no more than projected mem
assert (df["peak_measured_mem_delta_mb_max"] <= df["projected_mem_mb"]).all()

# check memray peak memory allocated is no more than projected mem
for op_name, stats in memray.stats.items():
assert (
stats.peak_memory_allocated
<= df.query(f"name=='{op_name}'")["projected_mem_mb"].item() * 1_000_000
), f"projected mem exceeds memray's peak allocated for {op_name}"

# check projected_mem_utilization does not exceed 1
# except on processes executor that runs multiple tasks in a process
if (
Expand Down

0 comments on commit 11cd529

Please sign in to comment.