diff --git a/src/python/flux_k8s/directivebreakdown.py b/src/python/flux_k8s/directivebreakdown.py index 698eed6..31d0fe8 100644 --- a/src/python/flux_k8s/directivebreakdown.py +++ b/src/python/flux_k8s/directivebreakdown.py @@ -4,6 +4,7 @@ import copy import functools import math +import collections from flux_k8s.crd import DIRECTIVEBREAKDOWN_CRD @@ -41,19 +42,45 @@ def build_allocation_sets(breakdown_alloc_sets, nodes_per_nnf, hlist, min_alloc_ } ) elif alloc_set["allocationStrategy"] == AllocationStrategy.ACROSS_SERVERS.value: - nodecount_gcd = functools.reduce(math.gcd, nodes_per_nnf.values()) - server_alloc_set["allocationSize"] = math.ceil( - nodecount_gcd * alloc_set["minimumCapacity"] / len(hlist) - ) - # split lustre across every rabbit, weighting the split based on - # the number of the job's nodes associated with each rabbit - for rabbit_name in nodes_per_nnf: - storage_field.append( - { - "allocationCount": nodes_per_nnf[rabbit_name] / nodecount_gcd, - "name": rabbit_name, - } + if "count" in alloc_set.get("constraints", {}): + # a specific number of allocations is required (generally for MDTs) + count = alloc_set["constraints"]["count"] + server_alloc_set["allocationSize"] = math.ceil( + alloc_set["minimumCapacity"] / count + ) + # place the allocations on the rabbits with the most nodes allocated + # to this job (and therefore the largest storage allocations) + while count > 0: + # count may be greater than the rabbits available, so we may need + # to place multiple on a single rabbit (hence the outer while-loop) + for name, _ in collections.Counter(nodes_per_nnf).most_common( + count + ): + storage_field.append( + { + "allocationCount": 1, + "name": name, + } + ) + count -= 1 + if count == 0: + break + else: + nodecount_gcd = functools.reduce(math.gcd, nodes_per_nnf.values()) + server_alloc_set["allocationSize"] = math.ceil( + nodecount_gcd * alloc_set["minimumCapacity"] / len(hlist) ) + # split lustre across every rabbit, weighting the split based on + # the number of the job's nodes associated with each rabbit + for rabbit_name in nodes_per_nnf: + storage_field.append( + { + "allocationCount": int( + nodes_per_nnf[rabbit_name] / nodecount_gcd + ), + "name": rabbit_name, + } + ) # enforce the minimum allocation size server_alloc_set["allocationSize"] = max( server_alloc_set["allocationSize"], min_alloc_size * 1024**3