diff --git a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h
index b08b17b5b44..df0857b166a 100644
--- a/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h
+++ b/include/oneapi/dpl/pstl/hetero/dpcpp/parallel_backend_sycl_merge.h
@@ -97,21 +97,6 @@ _split_point_t<_Index>
 __find_start_point_in(const _Rng1& __rng1, const _Index __rng1_from, _Index __rng1_to, const _Rng2& __rng2,
                       const _Index __rng2_from, _Index __rng2_to, const _Index __i_elem, _Compare __comp)
 {
-    assert(__rng1_from + __rng2_from <= __i_elem && __i_elem <= __rng1_to + __rng2_to);
-
-    if (__i_elem == 0)
-        return _split_point_t<_Index>{ 0, 0 };
-
-    if (__rng1_from == __rng1_to)
-        return _split_point_t<_Index>{ __rng1_from, __rng2_from + __i_elem };
-
-    if (__rng2_from == __rng2_to)
-        return _split_point_t<_Index>{ __rng1_from + __i_elem, __rng2_to };
-
-    // We shouldn't call this function with __i_elem == 0 because we a priory know that
-    // split point for this case is {0, 0}
-    assert(__i_elem > 0);
-
     // ----------------------- EXAMPLE ------------------------
     // Let's consider the following input data:
     //      rng1.size() = 10
@@ -160,11 +145,9 @@ __find_start_point_in(const _Rng1& __rng1, const _Index __rng1_from, _Index __rn
 
     _IndexSigned idx1_from = __rng1_from;
    _IndexSigned idx1_to = __rng1_to;
-    assert(idx1_from <= idx1_to);
 
     _IndexSigned idx2_from = __index_sum - (__rng1_to - 1);
     _IndexSigned idx2_to = __index_sum - __rng1_from + 1;
-    assert(idx2_from <= idx2_to);
 
     const _IndexSigned idx2_from_diff =
         idx2_from < (_IndexSigned)__rng2_from ? (_IndexSigned)__rng2_from - idx2_from : 0;
@@ -176,12 +159,6 @@ __find_start_point_in(const _Rng1& __rng1, const _Index __rng1_from, _Index __rn
     idx2_from = __index_sum - (idx1_to - 1);
     idx2_to = __index_sum - idx1_from + 1;
 
-    assert(idx1_from <= idx1_to);
-    assert(__rng1_from <= idx1_from && idx1_to <= __rng1_to);
-
-    assert(idx2_from <= idx2_to);
-    assert(__rng2_from <= idx2_from && idx2_to <= __rng2_to);
-
     ////////////////////////////////////////////////////////////////////////////////////
     // Run search of split point on diagonal
 
@@ -191,26 +168,14 @@ __find_start_point_in(const _Rng1& __rng1, const _Index __rng1_from, _Index __rn
     __it_t __diag_it_end(idx1_to);
 
     constexpr int kValue = 1;
-    const __it_t __res =
-        std::lower_bound(__diag_it_begin, __diag_it_end, kValue, [&](_Index __idx, const auto& __value) {
-            const auto __rng1_idx = __idx;
-            const auto __rng2_idx = __index_sum - __idx;
-
-            assert(__rng1_from <= __rng1_idx && __rng1_idx < __rng1_to);
-            assert(__rng2_from <= __rng2_idx && __rng2_idx < __rng2_to);
-            assert(__rng1_idx + __rng2_idx == __index_sum);
-
-            const auto __zero_or_one = __comp(__rng2[__rng2_idx], __rng1[__rng1_idx]);
-            return __zero_or_one < kValue;
-        });
-
-    const _split_point_t<_Index> __result{*__res, __index_sum - *__res + 1};
-    assert(__result.first + __result.second == __i_elem);
-
-    assert(__rng1_from <= __result.first && __result.first <= __rng1_to);
-    assert(__rng2_from <= __result.second && __result.second <= __rng2_to);
-
-    return __result;
+    const __it_t __res = std::lower_bound(__diag_it_begin, __diag_it_end, kValue,
+                                          [&__rng1, &__rng2, __index_sum, __comp](_Index __idx, const auto& __value) {
+                                              const auto __zero_or_one =
+                                                  __comp(__rng2[__index_sum - __idx], __rng1[__idx]);
+                                              return __zero_or_one < kValue;
+                                          });
+
+    return _split_point_t<_Index>{*__res, __index_sum - *__res + 1};
 }
 
 // Do serial merge of the data from rng1 (starting from start1) and rng2 (starting from start2) and writing
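Note: the search that survives this cleanup is the classic merge-path split-point computation. The following is a minimal host-side sketch of the same idea, assuming plain std::vector inputs; find_split_point is an illustrative name, not a oneDPL entry point. On the cross diagonal i1 + i2 == i the predicate comp(b[i2 - 1], a[i1]) flips from false to true at most once as i1 grows, so a plain bisection over i1 finds the first stable partition:

#include <cstddef>
#include <functional>
#include <utility>
#include <vector>

// Find (i1, i2) with i1 + i2 == i such that merging a[0..i1) with b[0..i2)
// produces exactly the first i outputs of the stable merge of a and b.
template <typename T, typename Compare = std::less<T>>
std::pair<std::size_t, std::size_t>
find_split_point(const std::vector<T>& a, const std::vector<T>& b, std::size_t i, Compare comp = {})
{
    std::size_t lo = i > b.size() ? i - b.size() : 0; // smallest feasible count taken from a
    std::size_t hi = i < a.size() ? i : a.size();     // largest feasible count taken from a
    while (lo < hi)
    {
        const std::size_t mid = lo + (hi - lo) / 2;
        // b[i2 - 1] < a[i1] means mid items from a already suffice: go left
        if (comp(b[i - mid - 1], a[mid]))
            hi = mid;
        else
            lo = mid + 1;
    }
    return {lo, i - lo};
}

The kernel above expresses the same bisection through std::lower_bound over a counting iterator, with the comparison result encoded as 0/1 against kValue = 1.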
@@ -247,17 +212,56 @@ __serial_merge(const _Rng1& __rng1, const _Rng2& __rng2, _Rng3& __rng3, const _I
     }
 }
 
-template <typename _IdType, typename _CustomName, typename _DiagonalsKernelName, typename _MergeKernelName1,
-          typename _MergeKernelName2>
+// Please see the comment for __parallel_for_submitter for optional kernel name explanation
+template <typename _IdType, typename _MergeKernelName>
 struct __parallel_merge_submitter;
 
-template <typename _IdType, typename _CustomName, typename... _DiagonalsKernelName, typename... _MergeKernelName1,
-          typename... _MergeKernelName2>
-struct __parallel_merge_submitter<_IdType, _CustomName, __internal::__optional_kernel_name<_DiagonalsKernelName...>,
-                                  __internal::__optional_kernel_name<_MergeKernelName1...>,
-                                  __internal::__optional_kernel_name<_MergeKernelName2...>>
+template <typename _IdType, typename... _MergeKernelName>
+struct __parallel_merge_submitter<_IdType, __internal::__optional_kernel_name<_MergeKernelName...>>
+{
+    template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare>
+    auto
+    operator()(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) const
+    {
+        const _IdType __n1 = __rng1.size();
+        const _IdType __n2 = __rng2.size();
+        const _IdType __n = __n1 + __n2;
+
+        assert(__n1 > 0 || __n2 > 0);
+
+        _PRINT_INFO_IN_DEBUG_MODE(__exec);
+
+        // Empirical number of values to process per work-item
+        const _IdType __chunk = __exec.queue().get_device().is_cpu() ? 128 : 4;
+
+        const _IdType __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __chunk);
+
+        auto __event = __exec.queue().submit(
+            [&__rng1, &__rng2, &__rng3, __comp, __chunk, __steps, __n1, __n2](sycl::handler& __cgh) {
+                oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3);
+                __cgh.parallel_for<_MergeKernelName...>(
                    sycl::range</*dim =*/1>(__steps), [=](sycl::item</*dim =*/1> __item_id) {
+                        const _IdType __i_elem = __item_id.get_linear_id() * __chunk;
+                        const auto __start = __find_start_point(__rng1, __rng2, __i_elem, __n1, __n2, __comp);
+                        __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __chunk, __n1,
+                                       __n2, __comp);
+                    });
+            });
+        // We should return the same thing in the second param of __future for compatibility
+        // with the return value of __parallel_merge_submitter_large::operator()
+        return __future(__event, __result_and_scratch_storage_base_ptr{});
+    }
+};
+
+template <typename _IdType, typename _CustomName, typename _DiagonalsKernelName, typename _MergeKernelName>
+struct __parallel_merge_submitter_large;
+
+template <typename _IdType, typename _CustomName, typename... _DiagonalsKernelName, typename... _MergeKernelName>
+struct __parallel_merge_submitter_large<_IdType, _CustomName,
+                                        __internal::__optional_kernel_name<_DiagonalsKernelName...>,
+                                        __internal::__optional_kernel_name<_MergeKernelName...>>
 {
-  protected:
+  private:
     struct nd_range_params
     {
         std::size_t base_diag_count = 0;
@@ -278,19 +282,13 @@ struct __parallel_merge_submitter<_IdType, _CustomName, __internal::__optional_k
 
         const std::size_t __n = __rng1.size() + __rng2.size();
 
-        constexpr std::size_t __slm_bank_size = 16; // TODO is it correct value? How to get it from hardware?
-
-        // Calculate how many data items we can read into one SLM bank
-        constexpr std::size_t __data_items_in_slm_bank =
-            oneapi::dpl::__internal::__dpl_ceiling_div(__slm_bank_size, sizeof(_RangeValueType));
-
         // Empirical number of values to process per work-item
-        const std::uint8_t __chunk = __exec.queue().get_device().is_cpu() ? 128 : __data_items_in_slm_bank;
+        const std::uint8_t __chunk = __exec.queue().get_device().is_cpu() ? 128 : 4;
 
         const _IdType __steps = oneapi::dpl::__internal::__dpl_ceiling_div(__n, __chunk);
 
-        const _IdType __base_diag_count = __use_base_diags ? 32 * 1'024 : 0;
+        const _IdType __base_diag_count = 32 * 1'024;
         const _IdType __steps_between_two_base_diags =
-            __use_base_diags ? oneapi::dpl::__internal::__dpl_ceiling_div(__steps, __base_diag_count) : 0;
+            oneapi::dpl::__internal::__dpl_ceiling_div(__steps, __base_diag_count);
 
         return {__base_diag_count, __steps_between_two_base_diags, __chunk, __steps};
     }
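The new small-size submitter launches one work-item per chunk of output, and each item is fully independent: it locates its own split point, then merges serially. Below is a host-side model of that partitioning, reusing the hypothetical find_split_point from the previous sketch; merge_by_chunks is likewise an illustrative name:

#include <algorithm>
#include <cstddef>
#include <vector>

template <typename T, typename Compare>
void
merge_by_chunks(const std::vector<T>& a, const std::vector<T>& b, std::vector<T>& out,
                std::size_t chunk, Compare comp)
{
    const std::size_t n = a.size() + b.size();
    const std::size_t steps = (n + chunk - 1) / chunk;        // __dpl_ceiling_div(__n, __chunk)
    for (std::size_t step = 0; step < steps; ++step)          // a parallel_for in the real kernel
    {
        const std::size_t i_elem = step * chunk;
        auto [i1, i2] = find_split_point(a, b, i_elem, comp); // per-item diagonal search
        // Serial merge of at most `chunk` outputs, mirroring __serial_merge
        for (std::size_t k = i_elem; k < std::min(i_elem + chunk, n); ++k)
        {
            if (i1 < a.size() && (i2 >= b.size() || !comp(b[i2], a[i1])))
                out[k] = a[i1++];
            else
                out[k] = b[i2++];
        }
    }
}

Every work-item repeats an O(log(min(n1, n2))) diagonal search over the full input; removing that redundancy for big inputs is exactly what the base diagonals of __parallel_merge_submitter_large are for.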
@@ -306,7 +304,8 @@ struct __parallel_merge_submitter<_IdType, _CustomName, __internal::__optional_k
         const _IdType __n2 = __rng2.size();
         const _IdType __n = __n1 + __n2;
 
-        sycl::event __event = __exec.queue().submit([&](sycl::handler& __cgh) {
+        return __exec.queue().submit([&__rng1, &__rng2, __comp, __nd_range_params, __base_diagonals_sp_global_storage,
+                                      __n1, __n2, __n](sycl::handler& __cgh) {
             oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2);
             auto __base_diagonals_sp_global_acc =
                 __base_diagonals_sp_global_storage.template __get_scratch_acc<sycl::access_mode::write>(
@@ -332,60 +331,28 @@ struct __parallel_merge_submitter<_IdType, _CustomName, __internal::__optional_k
                     __base_diagonals_sp_global_ptr[__global_idx] = __sp;
                 });
         });
-
-        return __event;
-    }
-
-    // Process parallel merge
-    template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare>
-    sycl::event
-    run_parallel_merge(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp,
-                       const nd_range_params& __nd_range_params) const
-    {
-        const _IdType __n1 = __rng1.size();
-        const _IdType __n2 = __rng2.size();
-
-        const auto __chunk = __nd_range_params.chunk;
-
-        sycl::event __event = __exec.queue().submit([&](sycl::handler& __cgh) {
-            oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3);
-
-            __cgh.parallel_for<_MergeKernelName1...>(
-                sycl::range</*dim =*/1>(__nd_range_params.steps), [=](sycl::item</*dim =*/1> __item_id) {
-                    auto __global_idx = __item_id.get_linear_id();
-                    const _IdType __i_elem = __global_idx * __chunk;
-
-                    if (__i_elem < __n1 + __n2)
-                    {
-                        _split_point_t<_IdType> __start = __find_start_point(__rng1, __rng2, __i_elem, __n1, __n2, __comp);
-                        __serial_merge(__rng1, __rng2, __rng3, __start.first, __start.second, __i_elem, __chunk, __n1, __n2,
-                                       __comp);
-                    }
-                });
-        });
-
-        return __event;
     }
 
     // Process parallel merge
     template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare,
               typename _Storage>
     sycl::event
-    run_parallel_merge(sycl::event __event, _ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2,
+    run_parallel_merge(const sycl::event& __event, _ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2,
                        _Range3&& __rng3, _Compare __comp, const nd_range_params& __nd_range_params,
                        const _Storage& __base_diagonals_sp_global_storage) const
     {
         const _IdType __n1 = __rng1.size();
         const _IdType __n2 = __rng2.size();
 
-        __event = __exec.queue().submit([&](sycl::handler& __cgh) {
+        return __exec.queue().submit([&__event, &__rng1, &__rng2, &__rng3, __comp, __nd_range_params,
+                                      __base_diagonals_sp_global_storage, __n1, __n2](sycl::handler& __cgh) {
             oneapi::dpl::__ranges::__require_access(__cgh, __rng1, __rng2, __rng3);
             auto __base_diagonals_sp_global_acc =
                 __base_diagonals_sp_global_storage.template __get_scratch_acc<sycl::access_mode::read>(__cgh);
 
             __cgh.depends_on(__event);
 
-            __cgh.parallel_for<_MergeKernelName2...>(
+            __cgh.parallel_for<_MergeKernelName...>(
                 sycl::range</*dim =*/1>(__nd_range_params.steps), [=](sycl::item</*dim =*/1> __item_id) {
                     auto __global_idx = __item_id.get_linear_id();
                     const _IdType __i_elem = __global_idx * __nd_range_params.chunk;
@@ -415,13 +382,9 @@ struct __parallel_merge_submitter<_IdType, _CustomName, __internal::__optional_k
                                    __nd_range_params.chunk, __n1, __n2, __comp);
                 });
         });
-
-        return __event;
     }
 
   public:
-    __parallel_merge_submitter(bool __use_base_diags) : __use_base_diags(__use_base_diags) {}
-
     template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare>
     auto
     operator()(_ExecutionPolicy&& __exec, _Range1&& __rng1, _Range2&& __rng2, _Range3&& __rng3, _Compare __comp) const
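The two kernels split the work: the diagonals kernel stores a split point on every base diagonal (one per group of steps_between consecutive chunks), and the merge kernel then restricts each chunk's search to the segment between its two bracketing base diagonals. A serial model of phase 1, again reusing the hypothetical find_split_point from the first sketch (eval_base_diagonals and its layout are a simplified assumption, not the library's exact storage scheme):

#include <cstddef>
#include <utility>
#include <vector>

template <typename T, typename Compare>
std::vector<std::pair<std::size_t, std::size_t>>
eval_base_diagonals(const std::vector<T>& a, const std::vector<T>& b,
                    std::size_t chunk, std::size_t steps, std::size_t steps_between, Compare comp)
{
    const std::size_t base_diag_count = (steps + steps_between - 1) / steps_between;
    std::vector<std::pair<std::size_t, std::size_t>> sp(base_diag_count + 1);
    sp.front() = {0, 0};                              // left border of the first group
    for (std::size_t g = 1; g < base_diag_count; ++g) // one work-item each in the real kernel
        sp[g] = find_split_point(a, b, g * steps_between * chunk, comp);
    sp.back() = {a.size(), b.size()};                 // right border of the last group
    return sp;
}

In phase 2, the item producing output position i_elem computes g = i_elem / (chunk * steps_between) and runs its diagonal search only inside the rectangle bounded by sp[g] and sp[g + 1]; that narrowed search is the job of __find_start_point_in.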
@@ -435,46 +398,47 @@ struct __parallel_merge_submitter<_IdType, _CustomName, __internal::__optional_k
 
         __result_and_scratch_storage_base_ptr __p_result_and_scratch_storage_base;
 
-        // Calculation of split points on each base diagonal
-        sycl::event __event;
-        if (__use_base_diags)
-        {
-            // Create storage for save split-points on each base diagonal + 1 (for the right base diagonal in the last work-group)
-            auto __p_base_diagonals_sp_global_storage =
-                new __result_and_scratch_storage<_ExecutionPolicy, _split_point_t<_IdType>>(
-                    __exec, 0, __nd_range_params.base_diag_count + 1);
-            __p_result_and_scratch_storage_base.reset(
-                static_cast<__result_and_scratch_storage_base*>(__p_base_diagonals_sp_global_storage));
-
-            __event = eval_split_points_for_groups(__exec, __rng1, __rng2, __comp, __nd_range_params,
-                                                   *__p_base_diagonals_sp_global_storage);
-
-            // Merge data using split points on each base diagonal
-            __event = run_parallel_merge(__event, __exec, __rng1, __rng2, __rng3, __comp, __nd_range_params,
-                                         *__p_base_diagonals_sp_global_storage);
-        }
-        else
-        {
-            // Merge data using split points on each base diagonal
-            __event = run_parallel_merge(__exec, __rng1, __rng2, __rng3, __comp, __nd_range_params);
-        }
+        // Create storage to save split-points on each base diagonal + 1 (for the right base diagonal in the last work-group)
+        auto __p_base_diagonals_sp_global_storage =
+            new __result_and_scratch_storage<_ExecutionPolicy, _split_point_t<_IdType>>(
+                __exec, 0, __nd_range_params.base_diag_count + 1);
+        __p_result_and_scratch_storage_base.reset(
+            static_cast<__result_and_scratch_storage_base*>(__p_base_diagonals_sp_global_storage));
+
+        sycl::event __event = eval_split_points_for_groups(__exec, __rng1, __rng2, __comp, __nd_range_params,
+                                                           *__p_base_diagonals_sp_global_storage);
+
+        // Merge data using split points on each base diagonal
+        __event = run_parallel_merge(__event, __exec, __rng1, __rng2, __rng3, __comp, __nd_range_params,
+                                     *__p_base_diagonals_sp_global_storage);
 
         return __future(std::move(__event), std::move(__p_result_and_scratch_storage_base));
     }
-
-  private:
-    const bool __use_base_diags = false;
 };
 
 template <typename... _Name>
-class __merge_kernel_name1;
+class __merge_kernel_name;
 
 template <typename... _Name>
-class __merge_kernel_name2;
+class __merge_kernel_name_large;
 
 template <typename... _Name>
 class __diagonals_kernel_name;
 
+template <typename _Tp>
+constexpr std::size_t
+__get_starting_size_limit_for_large_submitter()
+{
+    return 4 * 1'048'576; // 4 MB
+}
+
+template <>
+constexpr std::size_t
+__get_starting_size_limit_for_large_submitter<int>()
+{
+    return 16 * 1'048'576; // 16 MB
+}
+
 template <typename _ExecutionPolicy, typename _Range1, typename _Range2, typename _Range3, typename _Compare>
 auto
 __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy&& __exec, _Range1&& __rng1,
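The threshold that picks the submitter is now expressed per value type: merging int is cheap enough that the single-kernel submitter stays profitable up to a 4x larger input. A compile-time check of the constants introduced above (starting_size_limit is a stand-in name for the internal function, shown here only to make the arithmetic verifiable):

#include <cstddef>
#include <cstdint>
#include <limits>

template <typename _Tp>
constexpr std::size_t starting_size_limit() { return 4 * 1'048'576; }        // default limit

template <>
constexpr std::size_t starting_size_limit<int>() { return 16 * 1'048'576; }  // raised for int

static_assert(starting_size_limit<float>() == 4'194'304);
static_assert(starting_size_limit<int>() == 16'777'216);
// Mirrors the static_assert in __parallel_merge below: any __n routed to the
// small submitter must fit its 32-bit work-item index type.
static_assert(starting_size_limit<int>() <= std::numeric_limits<std::uint32_t>::max());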
@@ -482,38 +446,44 @@ __parallel_merge(oneapi::dpl::__internal::__device_backend_tag, _ExecutionPolicy
 {
     using _CustomName = oneapi::dpl::__internal::__policy_kernel_name<_ExecutionPolicy>;
 
-    const std::size_t __n = __rng1.size() + __rng2.size();
-
-    constexpr std::size_t __starting_size_limit_for_large_submitter = 4 * 1'048'576; // 4 MB
-    const bool __use_base_diags = __n >= __starting_size_limit_for_large_submitter;
+    using __value_type = oneapi::dpl::__internal::__value_t<_Range3>;
 
-    if (__n <= std::numeric_limits<std::uint32_t>::max())
+    const std::size_t __n = __rng1.size() + __rng2.size();
+    if (__n < __get_starting_size_limit_for_large_submitter<__value_type>())
     {
         using _WiIndex = std::uint32_t;
-        using _DiagonalsKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
-            __diagonals_kernel_name<_CustomName, _WiIndex>>;
-        using _MergeKernelName1 = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
-            __merge_kernel_name1<_CustomName, _WiIndex>>;
-        using _MergeKernelName2 = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
-            __merge_kernel_name2<_CustomName, _WiIndex>>;
-        return __parallel_merge_submitter<_WiIndex, _CustomName, _DiagonalsKernelName, _MergeKernelName1,
-                                          _MergeKernelName2>(__use_base_diags)(
+        static_assert(__get_starting_size_limit_for_large_submitter<__value_type>() <=
+                      std::numeric_limits<_WiIndex>::max());
+        using _MergeKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
+            __merge_kernel_name<_CustomName, _WiIndex>>;
+        return __parallel_merge_submitter<_WiIndex, _MergeKernelName>()(
             std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2),
             std::forward<_Range3>(__rng3), __comp);
     }
     else
     {
-        using _WiIndex = std::uint64_t;
-        using _DiagonalsKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
-            __diagonals_kernel_name<_CustomName, _WiIndex>>;
-        using _MergeKernelName1 = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
-            __merge_kernel_name1<_CustomName, _WiIndex>>;
-        using _MergeKernelName2 = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
-            __merge_kernel_name2<_CustomName, _WiIndex>>;
-        return __parallel_merge_submitter<_WiIndex, _CustomName, _DiagonalsKernelName, _MergeKernelName1,
-                                          _MergeKernelName2>(__use_base_diags)(
-            std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2),
-            std::forward<_Range3>(__rng3), __comp);
+        if (__n <= std::numeric_limits<std::uint32_t>::max())
+        {
+            using _WiIndex = std::uint32_t;
+            using _DiagonalsKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
+                __diagonals_kernel_name<_CustomName, _WiIndex>>;
+            using _MergeKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
+                __merge_kernel_name_large<_CustomName, _WiIndex>>;
+            return __parallel_merge_submitter_large<_WiIndex, _CustomName, _DiagonalsKernelName, _MergeKernelName>()(
+                std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2),
+                std::forward<_Range3>(__rng3), __comp);
+        }
+        else
+        {
+            using _WiIndex = std::uint64_t;
+            using _DiagonalsKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
+                __diagonals_kernel_name<_CustomName, _WiIndex>>;
+            using _MergeKernelName = oneapi::dpl::__par_backend_hetero::__internal::__kernel_name_provider<
+                __merge_kernel_name_large<_CustomName, _WiIndex>>;
+            return __parallel_merge_submitter_large<_WiIndex, _CustomName, _DiagonalsKernelName, _MergeKernelName>()(
+                std::forward<_ExecutionPolicy>(__exec), std::forward<_Range1>(__rng1), std::forward<_Range2>(__rng2),
+                std::forward<_Range3>(__rng3), __comp);
+        }
    }
 }
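In summary, the rewritten dispatch chooses among three instantiations. A compact sketch of its shape, using illustrative stubs and reusing starting_size_limit from the sketch above (the real code forwards the policy, ranges and comparator rather than printing):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <limits>

template <typename WiIndex> void run_small_submitter() { std::cout << "small, " << 8 * sizeof(WiIndex) << "-bit indices\n"; }
template <typename WiIndex> void run_large_submitter() { std::cout << "large, " << 8 * sizeof(WiIndex) << "-bit indices\n"; }

template <typename T>
void
dispatch_merge(std::size_t n)
{
    if (n < starting_size_limit<T>())
        run_small_submitter<std::uint32_t>(); // one kernel; 32 bits always suffice here
    else if (n <= std::numeric_limits<std::uint32_t>::max())
        run_large_submitter<std::uint32_t>(); // two kernels, still 32-bit indexable
    else
        run_large_submitter<std::uint64_t>(); // two kernels, needs 64-bit indices
}

Keeping the index type as narrow as the size allows is deliberate: the index participates in every diagonal computation, and 32-bit arithmetic is typically cheaper on GPUs than 64-bit.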
diff --git a/test/parallel_api/algorithm/alg.merge/merge.pass.cpp b/test/parallel_api/algorithm/alg.merge/merge.pass.cpp
index 34cba9f672a..2715256f3a1 100644
--- a/test/parallel_api/algorithm/alg.merge/merge.pass.cpp
+++ b/test/parallel_api/algorithm/alg.merge/merge.pass.cpp
@@ -97,24 +97,18 @@ struct test_merge_compare
     }
 };
 
-template <typename T, typename Generator1, typename Generator2>
+template <typename T, typename Generator1, typename Generator2, typename FStep>
 void
-test_merge_by_type(Generator1 generator1, Generator2 generator2)
+test_merge_by_type(Generator1 generator1, Generator2 generator2, size_t start_size, size_t max_size, FStep fstep)
 {
     using namespace std;
-    size_t max_size = 100000;
     Sequence<T> in1(max_size, generator1);
     Sequence<T> in2(max_size / 2, generator2);
     Sequence<T> out(in1.size() + in2.size());
     ::std::sort(in1.begin(), in1.end());
     ::std::sort(in2.begin(), in2.end());
 
-    size_t start_size = 0;
-#if TEST_DPCPP_BACKEND_PRESENT
-    start_size = 2;
-#endif
-
-    for (size_t size = start_size; size <= max_size; size = size <= 16 ? size + 1 : size_t(3.1415 * size)) {
+    for (size_t size = start_size; size <= max_size; size = fstep(size)) {
 #if !TEST_DPCPP_BACKEND_PRESENT
         invoke_on_all_policies<0>()(test_merge<T>(), in1.cbegin(), in1.cbegin() + size, in2.data(),
                                     in2.data() + size / 2, out.begin(), out.begin() + 1.5 * size);
@@ -139,6 +133,16 @@ test_merge_by_type(Generator1 generator1, Generator2 generator2)
     }
 }
 
+template <typename FStep>
+void
+test_merge_by_type(size_t start_size, size_t max_size, FStep fstep)
+{
+    test_merge_by_type<std::int32_t>([](size_t v) { return (v % 2 == 0 ? v : -v) * 3; }, [](size_t v) { return v * 2; },
+                                     start_size, max_size, fstep);
+#if !ONEDPL_FPGA_DEVICE
+    test_merge_by_type<float64_t>([](size_t v) { return float64_t(v); }, [](size_t v) { return float64_t(v - 100); },
+                                  start_size, max_size, fstep);
+#endif
+}
+
 template <typename T>
 struct test_non_const
 {
@@ -166,9 +170,24 @@ struct test_merge_tuple
 int
 main()
 {
-    test_merge_by_type<std::int32_t>([](size_t v) { return (v % 2 == 0 ? v : -v) * 3; }, [](size_t v) { return v * 2; });
-#if !ONEDPL_FPGA_DEVICE
-    test_merge_by_type<float64_t>([](size_t v) { return float64_t(v); }, [](size_t v) { return float64_t(v - 100); });
+#if TEST_DPCPP_BACKEND_PRESENT
+    const size_t start_size_small = 2;
+#else
+    const size_t start_size_small = 0;
+#endif
+    const size_t max_size_small = 100000;
+    auto fstep_small = [](std::size_t size) { return size <= 16 ? size + 1 : size_t(3.1415 * size); };
+    test_merge_by_type(start_size_small, max_size_small, fstep_small);
+
+    // Large data sizes (on GPU only)
+#if TEST_DPCPP_BACKEND_PRESENT
+    if (!TestUtils::get_test_queue().get_device().is_cpu())
+    {
+        const size_t start_size_large = 4'000'000;
+        const size_t max_size_large = 8'000'000;
+        auto fstep_large = [](std::size_t size) { return size + 2'000'000; };
+        test_merge_by_type(start_size_large, max_size_large, fstep_large);
+    }
 #endif
 
 #if !TEST_DPCPP_BACKEND_PRESENT