From e4e88bd4df23581e16c3d126e9823f1b72f0be59 Mon Sep 17 00:00:00 2001 From: Clint Armstrong Date: Thu, 10 Nov 2022 10:28:12 -0500 Subject: [PATCH] fix: use current process binding to limit thread cores (#1633) Use the current processes bound cores to limit the possible cores that threads can be bound to. This allows core binding to work properly when the lotus-worker service is limited to certain CPUs by cgroups. --- .../src/stacked/vanilla/cores.rs | 105 +++++++++++++++--- 1 file changed, 92 insertions(+), 13 deletions(-) diff --git a/storage-proofs-porep/src/stacked/vanilla/cores.rs b/storage-proofs-porep/src/stacked/vanilla/cores.rs index 850b03a74..c7b6632f5 100644 --- a/storage-proofs-porep/src/stacked/vanilla/cores.rs +++ b/storage-proofs-porep/src/stacked/vanilla/cores.rs @@ -157,6 +157,7 @@ fn create_core_units( core_count: usize, group_count: usize, cores_per_unit: usize, + allowed_cores: &hwloc::CpuSet, ) -> Vec> { assert_eq!(0, core_count % group_count); // The number of cores that belong to a single group. @@ -176,7 +177,7 @@ fn create_core_units( let core_units = (0..unit_count) .map(|i| { (0..cores_per_unit) - .map(|j| { + .filter_map(|j| { // Every group gets a single unit assigned first. Only if all groups have // already one unit, a second one will be assigned if possible. This would then // be the second "round" of assignments. @@ -184,10 +185,14 @@ fn create_core_units( // The index of the core that is bound to a unit. let core_index = (j + i * group_size) % core_count + (round * cores_per_unit); assert!(core_index < core_count); - core_index + + allowed_cores + .is_set(core_index.try_into().ok()?) + .then_some(core_index) }) .collect::>() }) + .filter(|x| !x.is_empty()) .collect::>(); debug!("Core units: {:?}", core_units); core_units @@ -229,6 +234,15 @@ fn core_units(cores_per_unit: usize) -> Option>> { let all_cores = topo .objects_with_type(&ObjectType::Core) .expect("objects_with_type failed"); + + let allowed_cores = topo + .get_cpubind(hwloc::CpuBindFlags::empty()) + .unwrap_or_else(|| { + topo.object_at_root() + .allowed_cpuset() + .unwrap_or_else(hwloc::CpuSet::full) + }); + // The total number of physical cores, even across packages. let core_count = all_cores.len(); @@ -238,7 +252,8 @@ fn core_units(cores_per_unit: usize) -> Option>> { let group_count = get_shared_cache_count(&topo, core_depth, core_count); // The list of units the multicore SDR threads can be bound to. - let core_units = create_core_units(core_count, group_count, cores_per_unit); + let core_units = create_core_units(core_count, group_count, cores_per_unit, &allowed_cores); + // this needs to take the all_cores vec instead of just a core count Some( core_units .iter() @@ -283,13 +298,13 @@ mod tests { fn test_create_core_units() { fil_logger::maybe_init(); - let ci = create_core_units(18, 1, 4); + let ci = create_core_units(18, 1, 4, &(0..18).collect()); assert_eq!( ci, [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]] ); - let dc = create_core_units(32, 2, 4); + let dc = create_core_units(32, 2, 4, &(0..32).collect()); assert_eq!( dc, [ @@ -304,28 +319,28 @@ mod tests { ] ); - let amd = create_core_units(16, 4, 4); + let amd = create_core_units(16, 4, 4, &(0..16).collect()); assert_eq!( amd, [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11], [12, 13, 14, 15]] ); - let amd_not_filled = create_core_units(16, 4, 3); + let amd_not_filled = create_core_units(16, 4, 3, &(0..16).collect()); assert_eq!( amd_not_filled, [[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]] ); - let amd_not_filled = create_core_units(16, 4, 3); + let amd_not_filled = create_core_units(16, 4, 3, &(0..16).collect()); assert_eq!( amd_not_filled, [[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14]] ); - let intel = create_core_units(16, 2, 3); + let intel = create_core_units(16, 2, 3, &(0..16).collect()); assert_eq!(intel, [[0, 1, 2], [8, 9, 10], [3, 4, 5], [11, 12, 13]]); - let sp = create_core_units(48, 8, 3); + let sp = create_core_units(48, 8, 3, &(0..48).collect()); assert_eq!( sp, [ @@ -348,7 +363,7 @@ mod tests { ] ); - let sp_not_filled = create_core_units(48, 8, 4); + let sp_not_filled = create_core_units(48, 8, 4, &(0..48).collect()); assert_eq!( sp_not_filled, [ @@ -363,9 +378,73 @@ mod tests { ] ); - let laptop = create_core_units(4, 1, 2); + let laptop = create_core_units(4, 1, 2, &(0..4).collect()); assert_eq!(laptop, [[0, 1], [2, 3]]); - let laptop_not_filled = create_core_units(4, 1, 3); + let laptop_not_filled = create_core_units(4, 1, 3, &(0..4).collect()); assert_eq!(laptop_not_filled, [[0, 1, 2]]); + + let amd_limited_0 = create_core_units(16, 4, 4, &(0..8).collect()); + assert_eq!(amd_limited_0, [[0, 1, 2, 3], [4, 5, 6, 7]]); + + let amd_limited_1 = create_core_units(16, 4, 4, &(8..16).collect()); + assert_eq!(amd_limited_1, [[8, 9, 10, 11], [12, 13, 14, 15]]); + + let sp_limited_0 = create_core_units(48, 8, 3, &(0..24).collect()); + assert_eq!( + sp_limited_0, + [ + [0, 1, 2], + [6, 7, 8], + [12, 13, 14], + [18, 19, 20], + [3, 4, 5], + [9, 10, 11], + [15, 16, 17], + [21, 22, 23], + ] + ); + + let sp_limited_1 = create_core_units(48, 8, 3, &(24..48).collect()); + assert_eq!( + sp_limited_1, + [ + [24, 25, 26], + [30, 31, 32], + [36, 37, 38], + [42, 43, 44], + [27, 28, 29], + [33, 34, 35], + [39, 40, 41], + [45, 46, 47] + ] + ); + + let limited_group = create_core_units( + 16, + 4, + 4, + &vec![0, 1, 2, 4, 5, 6, 8, 9, 10, 12, 13, 14] + .into_iter() + .collect(), + ); + assert_eq!( + limited_group, + [[0, 1, 2], [4, 5, 6], [8, 9, 10], [12, 13, 14],] + ); + + let limited_non_continuous = create_core_units(48, 8, 3, &(0..12).chain(24..36).collect()); + assert_eq!( + limited_non_continuous, + [ + [0, 1, 2], + [6, 7, 8], + [24, 25, 26], + [30, 31, 32], + [3, 4, 5], + [9, 10, 11], + [27, 28, 29], + [33, 34, 35], + ] + ); } }