Skip to content

Commit

Permalink
add new helper function to calculate chunksize
Browse files Browse the repository at this point in the history
  • Loading branch information
KedoKudo committed Sep 17, 2024
1 parent d6ce446 commit 6f2419d
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 1 deletion.
20 changes: 20 additions & 0 deletions src/imars3d/backend/util/functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,26 @@ def clamp_max_workers(max_workers: Union[int, None]) -> int:
return result


def calculate_chunksize(num_elements: int, max_workers: Union[int, None] = None, scale_factor: int = 4) -> int:
    """Calculate an optimal chunk size for multiprocessing.

    Splitting the workload into roughly ``workers * scale_factor`` chunks
    keeps every worker busy while amortizing per-task dispatch overhead.

    Parameters
    ----------
    num_elements:
        Total number of elements to process.
    max_workers:
        Number of worker processes to use. When ``None``, falls back to the
        clamped system default from :func:`clamp_max_workers`.
    scale_factor:
        Tuning factor controlling how many chunks each worker receives on
        average (default 4). Must be at least 1.

    Returns
    -------
    int
        Suggested chunk size, never smaller than 1.

    Raises
    ------
    ValueError
        If ``scale_factor`` is less than 1 (a value of 0 would otherwise
        surface as an opaque ``ZeroDivisionError``; negative values would
        silently collapse the result to 1).
    """
    if scale_factor < 1:
        raise ValueError(f"scale_factor must be >= 1, got {scale_factor}")

    # Resolve the effective worker count (clamp_max_workers guarantees >= 1).
    workers = clamp_max_workers(max_workers)

    # Floor-divide the workload over all worker/scale slots, flooring the
    # result at 1 so tiny workloads still yield a valid chunk size.
    return max(1, num_elements // (workers * scale_factor))


def to_time_str(value: datetime = datetime.now()) -> str:
"""
Convert the supplied datetime to a formatted string.
Expand Down
48 changes: 47 additions & 1 deletion tests/unit/backend/util/test_util.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
# package imports
from imars3d.backend.util.functions import clamp_max_workers, to_time_str
from imars3d.backend.util.functions import clamp_max_workers, to_time_str, calculate_chunksize

# third party imports
import pytest
from unittest.mock import patch

# standard imports
from datetime import datetime
Expand All @@ -13,6 +14,51 @@ def test_clamp_max_workers():
assert clamp_max_workers(-10) >= 1


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_small_number_of_elements(mock_cpu_count):
    """A workload smaller than the worker pool floors to a chunk size of 1."""
    # 10 elements spread over the worker/scale slots is below 1 per slot,
    # so the function must return its minimum of 1.
    assert calculate_chunksize(10, None) == 1


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_large_number_of_elements(mock_cpu_count):
    """A large workload divides evenly across workers and the scale factor."""
    total = 10000
    # 8 mocked CPUs clamp to 6 workers; the default scale factor is 4.
    expected = max(1, total // (6 * 4))
    assert calculate_chunksize(total, None) == expected


@patch("multiprocessing.cpu_count", return_value=4)
def test_chunksize_with_different_cpu_count(mock_cpu_count):
    """The chunk size tracks the mocked CPU count when max_workers is None."""
    total = 10000
    # 4 mocked CPUs clamp to 2 workers (cpu_count - 2); scale factor stays 4.
    expected = max(1, total // (2 * 4))
    assert calculate_chunksize(total, None) == expected


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_max_workers(mock_cpu_count):
    """An explicit max_workers overrides the CPU-derived worker count."""
    total = 10000
    workers = 4
    # The manual worker count is used directly with the default scale factor of 4.
    expected = max(1, total // (workers * 4))
    assert calculate_chunksize(total, workers) == expected


@patch("multiprocessing.cpu_count", return_value=8)
def test_chunksize_with_custom_scale_factor(mock_cpu_count):
    """A custom scale_factor changes the divisor used for the chunk size."""
    total = 10000
    # 8 mocked CPUs clamp to 6 workers; scale factor is overridden to 2.
    expected = max(1, total // (6 * 2))
    assert calculate_chunksize(total, None, scale_factor=2) == expected


@pytest.mark.parametrize(
"timestamp",
[
Expand Down

0 comments on commit 6f2419d

Please sign in to comment.