diff --git a/.azure-pipelines/integration-test.yml b/.azure-pipelines/integration-test.yml index ae0becb90..491fe0773 100644 --- a/.azure-pipelines/integration-test.yml +++ b/.azure-pipelines/integration-test.yml @@ -31,7 +31,7 @@ jobs: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release .. + cmake -DCMAKE_BUILD_TYPE=Release -DBYPASS_PEERMEM_CHECK=ON -DBYPASS_GPU_CHECK=ON -DUSE_CUDA=ON .. make -j workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.azure-pipelines/multi-nodes-test.yml b/.azure-pipelines/multi-nodes-test.yml index 12bad0da5..a7597dfa2 100644 --- a/.azure-pipelines/multi-nodes-test.yml +++ b/.azure-pipelines/multi-nodes-test.yml @@ -26,7 +26,7 @@ jobs: targetType: 'inline' script: | mkdir build && cd build - cmake -DCMAKE_BUILD_TYPE=Release -DBYPASS_PEERMEM_CHECK=ON .. + cmake -DCMAKE_BUILD_TYPE=Release -DBYPASS_PEERMEM_CHECK=ON -DBYPASS_GPU_CHECK=ON -DUSE_CUDA=ON .. make -j make pylib-copy workingDirectory: '$(System.DefaultWorkingDirectory)' diff --git a/.github/workflows/codeql-analysis.yml b/.github/workflows/codeql-analysis.yml index f26f8701b..27cbaf5af 100644 --- a/.github/workflows/codeql-analysis.yml +++ b/.github/workflows/codeql-analysis.yml @@ -45,7 +45,7 @@ jobs: - name: Build run: | - cmake -DBYPASS_PEERMEM_CHECK=ON . + cmake -DBYPASS_PEERMEM_CHECK=ON -DBYPASS_GPU_CHECK=ON -DUSE_CUDA=ON . make -j - name: Perform CodeQL Analysis diff --git a/.github/workflows/lint.yml b/.github/workflows/lint.yml index aaffe9578..0c1babcdd 100644 --- a/.github/workflows/lint.yml +++ b/.github/workflows/lint.yml @@ -20,7 +20,7 @@ jobs: - name: Run cpplint run: | - CPPSOURCES=$(find ./ -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)' -not -path "./build/*") + CPPSOURCES=$(find ./src ./include ./python ./test -regextype posix-extended -regex '.*\.(c|cpp|h|hpp|cc|cxx|cu)') clang-format -style=file --verbose --Werror --dry-run ${CPPSOURCES} pylint: diff --git a/CMakeLists.txt b/CMakeLists.txt index 5c32d47ee..d311f5a42 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -9,80 +9,125 @@ set(MSCCLPP_SOVERSION ${MSCCLPP_MAJOR}) set(MSCCLPP_VERSION "${MSCCLPP_MAJOR}.${MSCCLPP_MINOR}.${MSCCLPP_PATCH}") cmake_minimum_required(VERSION 3.25) -project(mscclpp LANGUAGES CUDA CXX) -set(CMAKE_CXX_STANDARD 17) -set(CMAKE_CUDA_STANDARD 17) -set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra") -set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Xcompiler -Wall,-Wextra") - -list(APPEND CMAKE_MODULE_PATH ${PROJECT_SOURCE_DIR}/cmake) +enable_language(CXX) -# Format targets -include(${PROJECT_SOURCE_DIR}/cmake/AddFormatTargets.cmake) +list(APPEND CMAKE_MODULE_PATH ${CMAKE_CURRENT_SOURCE_DIR}/cmake) # Options option(ENABLE_TRACE "Enable tracing" OFF) option(USE_NPKIT "Use NPKIT" ON) option(BUILD_TESTS "Build tests" ON) option(BUILD_PYTHON_BINDINGS "Build Python bindings" ON) -option(ALLOW_GDRCOPY "Use GDRCopy, if available" OFF) +option(USE_CUDA "Use NVIDIA/CUDA." OFF) +option(USE_ROCM "Use AMD/ROCm." OFF) +option(BYPASS_GPU_CHECK "Bypass GPU check." OFF) option(BYPASS_PEERMEM_CHECK "Bypass checking nvidia_peermem" OFF) -# Find CUDAToolkit. 
Set CUDA flags based on the detected CUDA version
-find_package(CUDAToolkit REQUIRED)
-if(CUDAToolkit_FOUND)
+if(BYPASS_GPU_CHECK)
+  if(USE_CUDA)
+    message("Bypassing GPU check: using NVIDIA/CUDA.")
+    find_package(CUDAToolkit REQUIRED)
+  elseif(USE_ROCM)
+    message("Bypassing GPU check: using AMD/ROCm.")
+    # Temporary fix for ROCm 5.6
+    set(CMAKE_PREFIX_PATH "/opt/rocm;${CMAKE_PREFIX_PATH}")
+    find_package(hip REQUIRED)
+  else()
+    message(FATAL_ERROR "Bypassing GPU check: neither NVIDIA/CUDA nor AMD/ROCm is specified.")
+  endif()
+else()
+  # Detect GPUs
+  include(CheckNvidiaGpu)
+  include(CheckAmdGpu)
+  if(NVIDIA_FOUND AND AMD_FOUND)
+    message("Detected NVIDIA/CUDA and AMD/ROCm: prioritizing NVIDIA/CUDA.")
+    set(USE_CUDA ON)
+    set(USE_ROCM OFF)
+  elseif(NVIDIA_FOUND)
+    message("Detected NVIDIA/CUDA.")
+    set(USE_CUDA ON)
+    set(USE_ROCM OFF)
+  elseif(AMD_FOUND)
+    message("Detected AMD/ROCm.")
+    set(USE_CUDA OFF)
+    set(USE_ROCM ON)
+  else()
+    message(FATAL_ERROR "Neither NVIDIA/CUDA nor AMD/ROCm is found.")
+  endif()
+endif()
+
+# Declare project
+set(CMAKE_CXX_STANDARD 17)
+set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wall -Wextra")
+if(USE_CUDA)
+  set(CMAKE_CUDA_STANDARD 17)
+  set(CMAKE_CUDA_FLAGS "${CMAKE_CUDA_FLAGS} -Xcompiler -Wall,-Wextra")
+  project(mscclpp LANGUAGES CXX CUDA)
+
+  # CUDA 11 or higher is required
   if(CUDAToolkit_VERSION_MAJOR LESS 11)
     message(FATAL_ERROR "CUDA 11 or higher is required but detected ${CUDAToolkit_VERSION}")
   endif()
+  # Set CUDA architectures
   if(CUDAToolkit_VERSION_MAJOR GREATER_EQUAL 11)
     set(CMAKE_CUDA_ARCHITECTURES 80)
   endif()
+  # Hopper architecture
   if(CUDAToolkit_VERSION_MAJOR GREATER_EQUAL 12)
     set(CMAKE_CUDA_ARCHITECTURES ${CMAKE_CUDA_ARCHITECTURES} 90)
   endif()
-endif()
-set(CUDA_LIBRARIES CUDA::cudart CUDA::cuda_driver)
-
-# Find if nvidia_peermem is installed and loaded
-if(NOT BYPASS_PEERMEM_CHECK)
-  execute_process(COMMAND sh -c "lsmod | grep nvidia_peermem"
-    RESULT_VARIABLE lsmod_result
-    OUTPUT_VARIABLE lsmod_output)
-  if(NOT lsmod_result EQUAL 0)
-    message(FATAL_ERROR "nvidia_peermem is not installed or not loaded.")
+
+  set(GPU_LIBRARIES CUDA::cudart CUDA::cuda_driver)
+  set(GPU_INCLUDE_DIRS ${CUDAToolkit_INCLUDE_DIRS})
+
+  # Find if nvidia_peermem is installed and loaded
+  if(NOT BYPASS_PEERMEM_CHECK)
+    execute_process(COMMAND sh -c "lsmod | grep nvidia_peermem"
+      RESULT_VARIABLE lsmod_result
+      OUTPUT_VARIABLE lsmod_output)
+    if(NOT lsmod_result EQUAL 0)
+      message(FATAL_ERROR "nvidia_peermem is not installed or not loaded.")
+    endif()
   endif()
+else()
+  set(CMAKE_HIP_STANDARD 17)
+  set(CMAKE_HIP_FLAGS "${CMAKE_HIP_FLAGS} -Wall -Wextra")
+  project(mscclpp LANGUAGES CXX HIP)
+
+  set(CMAKE_HIP_ARCHITECTURES gfx90a gfx941 gfx942)
+
+  set(GPU_LIBRARIES hip::host)
+  set(GPU_INCLUDE_DIRS ${hip_INCLUDE_DIRS})
 endif()
 
+# Format targets
+include(${PROJECT_SOURCE_DIR}/cmake/AddFormatTargets.cmake)
+
 # Find ibverbs and libnuma
 find_package(IBVerbs REQUIRED)
 find_package(NUMA REQUIRED)
 
-# Find optional packages
-if(ALLOW_GDRCOPY)
-  find_package(GDRCopy)
-endif()
-
 add_library(mscclpp_obj OBJECT)
 target_include_directories(mscclpp_obj PRIVATE
-  ${CUDAToolkit_INCLUDE_DIRS}
+  ${GPU_INCLUDE_DIRS}
   ${IBVERBS_INCLUDE_DIRS}
-  ${NUMA_INCLUDE_DIRS}
-  ${GDRCOPY_INCLUDE_DIRS})
-target_link_libraries(mscclpp_obj PRIVATE ${CUDA_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES} ${GDRCOPY_LIBRARIES})
+  ${NUMA_INCLUDE_DIRS})
+target_link_libraries(mscclpp_obj PRIVATE ${GPU_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES})
 set_target_properties(mscclpp_obj PROPERTIES 
LINKER_LANGUAGE CXX POSITION_INDEPENDENT_CODE 1 VERSION ${MSCCLPP_VERSION} SOVERSION ${MSCCLPP_SOVERSION})
+if(USE_CUDA)
+  target_compile_definitions(mscclpp_obj PRIVATE USE_CUDA)
+elseif(USE_ROCM)
+  target_compile_definitions(mscclpp_obj PRIVATE USE_ROCM)
+endif()
 if(ENABLE_TRACE)
   target_compile_definitions(mscclpp_obj PRIVATE ENABLE_TRACE)
 endif()
 if(USE_NPKIT)
   target_compile_definitions(mscclpp_obj PRIVATE ENABLE_NPKIT)
 endif()
-if(ALLOW_GDRCOPY AND GDRCOPY_FOUND)
-  target_compile_definitions(mscclpp_obj PRIVATE MSCCLPP_USE_GDRCOPY)
-  target_link_libraries(mscclpp_obj PRIVATE MSCCLPP::gdrcopy)
-endif()
 
 # libmscclpp
 add_library(mscclpp SHARED)
@@ -103,7 +148,7 @@ install(TARGETS mscclpp_static ARCHIVE DESTINATION lib)
 
 # Tests
-if (BUILD_TESTS)
+if(BUILD_TESTS)
   enable_testing() # Called here to allow ctest from the build directory
   add_subdirectory(test)
 endif()
diff --git a/cmake/CheckAmdGpu.cmake b/cmake/CheckAmdGpu.cmake
new file mode 100644
index 000000000..3b26bfa5e
--- /dev/null
+++ b/cmake/CheckAmdGpu.cmake
@@ -0,0 +1,25 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+set(AMD_FOUND "FALSE")
+
+set(CMAKE_PREFIX_PATH "/opt/rocm;${CMAKE_PREFIX_PATH}")
+# Temporary fix for ROCm 5.6
+set(ENV{amd_comgr_DIR} "/opt/rocm/lib/cmake/amd_comgr")
+set(ENV{AMDDeviceLibs_DIR} "/opt/rocm/lib/cmake/AMDDeviceLibs")
+
+find_package(hip QUIET)
+
+if(NOT hip_FOUND)
+  return()
+endif()
+
+enable_language(HIP)
+
+set(CHECK_SRC "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check_amd_gpu.hip")
+
+try_run(RUN_RESULT COMPILE_SUCCESS SOURCES ${CHECK_SRC})
+
+if(COMPILE_SUCCESS AND RUN_RESULT EQUAL 0)
+  set(AMD_FOUND "TRUE")
+endif()
diff --git a/cmake/CheckNvidiaGpu.cmake b/cmake/CheckNvidiaGpu.cmake
new file mode 100644
index 000000000..adc42ea00
--- /dev/null
+++ b/cmake/CheckNvidiaGpu.cmake
@@ -0,0 +1,36 @@
+# Copyright (c) Microsoft Corporation.
+# Licensed under the MIT license.
+
+set(NVIDIA_FOUND "FALSE")
+
+find_package(CUDAToolkit)
+
+if(NOT CUDAToolkit_FOUND)
+  return()
+endif()
+
+set(CMAKE_CUDA_ARCHITECTURES "60")
+if(NOT CMAKE_CUDA_COMPILER)
+  # In case the CUDA Toolkit directory is not in the PATH
+  find_program(CUDA_COMPILER
+    NAMES nvcc
+    PATHS ${CUDAToolkit_BIN_DIR})
+  if(NOT CUDA_COMPILER)
+    message(WARNING "Could not find nvcc in ${CUDAToolkit_BIN_DIR}")
+    unset(CMAKE_CUDA_ARCHITECTURES)
+    return()
+  endif()
+  set(CMAKE_CUDA_COMPILER "${CUDA_COMPILER}")
+endif()
+enable_language(CUDA)
+
+set(CHECK_SRC "${CMAKE_CURRENT_SOURCE_DIR}/cmake/check_nvidia_gpu.cu")
+
+try_run(RUN_RESULT COMPILE_SUCCESS SOURCES ${CHECK_SRC})
+
+if(COMPILE_SUCCESS AND RUN_RESULT EQUAL 0)
+  set(NVIDIA_FOUND "TRUE")
+else()
+  unset(CMAKE_CUDA_ARCHITECTURES)
+  unset(CMAKE_CUDA_COMPILER)
+endif()
diff --git a/cmake/check_amd_gpu.hip b/cmake/check_amd_gpu.hip
new file mode 100644
index 000000000..7537f7edc
--- /dev/null
+++ b/cmake/check_amd_gpu.hip
@@ -0,0 +1,15 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include <hip/hip_runtime.h>
+
+__global__ void kernel() {}
+
+int main() {
+  int cnt;
+  hipError_t err = hipGetDeviceCount(&cnt);
+  if (err != hipSuccess || cnt == 0) {
+    return 1;
+  }
+  return 0;
+}
diff --git a/cmake/check_nvidia_gpu.cu b/cmake/check_nvidia_gpu.cu
new file mode 100644
index 000000000..672e70f28
--- /dev/null
+++ b/cmake/check_nvidia_gpu.cu
@@ -0,0 +1,15 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#include <cuda_runtime.h>
+
+__global__ void kernel() {}
+
+int main() {
+  int cnt;
+  cudaError_t err = cudaGetDeviceCount(&cnt);
+  if (err != cudaSuccess || cnt == 0) {
+    return 1;
+  }
+  return 0;
+}
diff --git a/include/mscclpp/atomic_device.hpp b/include/mscclpp/atomic_device.hpp
new file mode 100644
index 000000000..1a8dd4c38
--- /dev/null
+++ b/include/mscclpp/atomic_device.hpp
@@ -0,0 +1,65 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#ifndef MSCCLPP_ATOMIC_DEVICE_HPP_
+#define MSCCLPP_ATOMIC_DEVICE_HPP_
+
+#include "device.hpp"
+
+#if defined(MSCCLPP_DEVICE_CUDA)
+#include <cuda/atomic>
+#endif // defined(MSCCLPP_DEVICE_CUDA)
+
+namespace mscclpp {
+
+#if defined(MSCCLPP_DEVICE_CUDA)
+
+constexpr cuda::memory_order memoryOrderRelaxed = cuda::memory_order_relaxed;
+constexpr cuda::memory_order memoryOrderAcquire = cuda::memory_order_acquire;
+constexpr cuda::memory_order memoryOrderRelease = cuda::memory_order_release;
+constexpr cuda::memory_order memoryOrderAcqRel = cuda::memory_order_acq_rel;
+constexpr cuda::memory_order memoryOrderSeqCst = cuda::memory_order_seq_cst;
+
+template <typename T>
+MSCCLPP_HOST_DEVICE_INLINE T atomicLoad(T* ptr, cuda::memory_order memoryOrder) {
+  return cuda::atomic_ref<T, cuda::thread_scope_system>{*ptr}.load(memoryOrder);
+}
+
+template <typename T>
+MSCCLPP_HOST_DEVICE_INLINE void atomicStore(T* ptr, const T& val, cuda::memory_order memoryOrder) {
+  cuda::atomic_ref<T, cuda::thread_scope_system>{*ptr}.store(val, memoryOrder);
+}
+
+template <typename T>
+MSCCLPP_HOST_DEVICE_INLINE T atomicFetchAdd(T* ptr, const T& val, cuda::memory_order memoryOrder) {
+  return cuda::atomic_ref<T, cuda::thread_scope_system>{*ptr}.fetch_add(val, memoryOrder);
+}
+
+#elif defined(MSCCLPP_DEVICE_HIP)
+
+constexpr auto memoryOrderRelaxed = __ATOMIC_RELAXED;
+constexpr auto memoryOrderAcquire = __ATOMIC_ACQUIRE;
+constexpr auto memoryOrderRelease = __ATOMIC_RELEASE;
+constexpr auto memoryOrderAcqRel = __ATOMIC_ACQ_REL;
+constexpr auto memoryOrderSeqCst = __ATOMIC_SEQ_CST;
+
+template <typename T>
+MSCCLPP_HOST_DEVICE_INLINE T atomicLoad(const T* ptr, int memoryOrder) {
+  return __atomic_load_n(ptr, memoryOrder);
+}
+
+template <typename T>
+MSCCLPP_HOST_DEVICE_INLINE void atomicStore(T* ptr, const T& val, int memoryOrder) {
+  __atomic_store_n(ptr, val, memoryOrder);
+}
+
+template <typename T>
+MSCCLPP_HOST_DEVICE_INLINE T atomicFetchAdd(T* ptr, const T& val, int memoryOrder) {
+  return __atomic_fetch_add(ptr, val, memoryOrder);
+}
+
+#endif // defined(MSCCLPP_DEVICE_HIP)
+
+} // namespace mscclpp
+
+#endif // MSCCLPP_ATOMIC_DEVICE_HPP_
diff --git a/include/mscclpp/concurrency.hpp b/include/mscclpp/concurrency_device.hpp
similarity index 83%
rename from include/mscclpp/concurrency.hpp
rename to include/mscclpp/concurrency_device.hpp
index 61299d891..c443a73c4 100644
--- a/include/mscclpp/concurrency.hpp
+++ b/include/mscclpp/concurrency_device.hpp
@@ -1,10 +1,10 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.
 
-#ifndef MSCCLPP_CONCURRENCY_HPP_
-#define MSCCLPP_CONCURRENCY_HPP_
+#ifndef MSCCLPP_CONCURRENCY_DEVICE_HPP_
+#define MSCCLPP_CONCURRENCY_DEVICE_HPP_
 
-#include <mscclpp/poll.hpp>
+#include "poll_device.hpp"
 
 namespace mscclpp {
 
@@ -17,12 +17,12 @@ struct DeviceSyncer {
   /// Destroy the DeviceSyncer object.
   ~DeviceSyncer() = default;
 
-#ifdef __CUDACC__
+#if defined(MSCCLPP_DEVICE_COMPILE)
   /// Synchronize all threads inside a kernel. Guarantee that all previous work of all threads in cooperating blocks is
   /// finished.
   /// @param blockNum The number of blocks that will synchronize.
   /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative.
-  __forceinline__ __device__ void sync(int blockNum, int64_t maxSpinCount = 100000000) {
+  MSCCLPP_DEVICE_INLINE void sync(int blockNum, int64_t maxSpinCount = 100000000) {
     unsigned int maxOldCnt = blockNum - 1;
     __syncthreads();
     if (blockNum == 1) return;
@@ -47,7 +47,7 @@ struct DeviceSyncer {
     // the flag is flipped.
     __syncthreads();
   }
-#endif
+#endif // defined(MSCCLPP_DEVICE_COMPILE)
 
  private:
   /// The flag to indicate whether the barrier is reached by the latest thread.
@@ -60,4 +60,4 @@ struct DeviceSyncer {
 
 } // namespace mscclpp
 
-#endif // MSCCLPP_CONCURRENCY_HPP_
+#endif // MSCCLPP_CONCURRENCY_DEVICE_HPP_
diff --git a/include/mscclpp/core.hpp b/include/mscclpp/core.hpp
index 9920826ba..f55ac3dd8 100644
--- a/include/mscclpp/core.hpp
+++ b/include/mscclpp/core.hpp
@@ -13,10 +13,11 @@
 #include 
 #include 
 #include 
-#include 
 #include 
 #include 
 
+#include "errors.hpp"
+
 namespace mscclpp {
 
 #define MSCCLPP_UNIQUE_ID_BYTES 128
@@ -114,7 +115,7 @@ class TcpBootstrap : public Bootstrap {
  private:
   // The interal implementation.
-  struct Impl;
+  class Impl;
 
   // Pointer to the internal implementation.
   std::unique_ptr<Impl> pimpl_;
 
@@ -400,6 +401,8 @@ class Endpoint {
 /// Represents a connection between two processes.
 class Connection {
  public:
+  virtual ~Connection() = default;
+
   /// Write data from a source @ref RegisteredMemory to a destination @ref RegisteredMemory.
   ///
   /// @param dst The destination @ref RegisteredMemory.
@@ -524,6 +527,8 @@ class Context {
 
 /// A base class for objects that can be set up during @ref Communicator::setup().
 struct Setuppable {
+  virtual ~Setuppable() = default;
+
   /// Called inside @ref Communicator::setup() before any call to @ref endSetup() of any @ref Setuppable object that is
   /// being set up within the same @ref Communicator::setup() call.
   ///
diff --git a/include/mscclpp/device.hpp b/include/mscclpp/device.hpp
new file mode 100644
index 000000000..600a8f855
--- /dev/null
+++ b/include/mscclpp/device.hpp
@@ -0,0 +1,29 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#ifndef MSCCLPP_DEVICE_HPP_
+#define MSCCLPP_DEVICE_HPP_
+
+#if defined(__HIP_PLATFORM_AMD__)
+#include <hip/hip_runtime.h>
+#endif // defined(__HIP_PLATFORM_AMD__)
+
+#if (defined(__NVCC__) || defined(__HIP_PLATFORM_AMD__))
+
+#define MSCCLPP_DEVICE_COMPILE
+#define MSCCLPP_DEVICE_INLINE __forceinline__ __device__
+#define MSCCLPP_HOST_DEVICE_INLINE __forceinline__ __host__ __device__
+#if defined(__HIP_PLATFORM_AMD__)
+#define MSCCLPP_DEVICE_HIP
+#else // !defined(__HIP_PLATFORM_AMD__)
+#define MSCCLPP_DEVICE_CUDA
+#endif // !defined(__HIP_PLATFORM_AMD__)
+
+#else // !(defined(__NVCC__) || defined(__HIP_PLATFORM_AMD__))
+
+#define MSCCLPP_HOST_COMPILE
+#define MSCCLPP_HOST_DEVICE_INLINE inline
+
+#endif // !(defined(__NVCC__) || defined(__HIP_PLATFORM_AMD__))
+
+#endif // MSCCLPP_DEVICE_HPP_
diff --git a/include/mscclpp/errors.hpp b/include/mscclpp/errors.hpp
index 0e9c7ecc0..4e90c8d84 100644
--- a/include/mscclpp/errors.hpp
+++ b/include/mscclpp/errors.hpp
@@ -4,9 +4,6 @@
 #ifndef MSCCLPP_ERRORS_HPP_
 #define MSCCLPP_ERRORS_HPP_
 
-#include <cuda.h>
-#include <cuda_runtime.h>
-
 #include <stdexcept>
 
 namespace mscclpp {
 
@@ -77,14 +74,14 @@ class SysError : public BaseError {
 /// An error from a CUDA runtime library call.
 class CudaError : public BaseError {
  public:
-  CudaError(const std::string& message, cudaError_t errorCode);
+  CudaError(const std::string& message, int errorCode);
   virtual ~CudaError() = default;
 };
 
 /// An error from a CUDA driver library call.
class CuError : public BaseError { public: - CuError(const std::string& message, CUresult errorCode); + CuError(const std::string& message, int errorCode); virtual ~CuError() = default; }; diff --git a/include/mscclpp/fifo.hpp b/include/mscclpp/fifo.hpp index be31b80a0..138d0ff7e 100644 --- a/include/mscclpp/fifo.hpp +++ b/include/mscclpp/fifo.hpp @@ -7,8 +7,8 @@ #include #include #include -#include -#include + +#include "fifo_device.hpp" namespace mscclpp { diff --git a/include/mscclpp/fifo_device.hpp b/include/mscclpp/fifo_device.hpp index c48cef274..5806ca510 100644 --- a/include/mscclpp/fifo_device.hpp +++ b/include/mscclpp/fifo_device.hpp @@ -4,9 +4,14 @@ #ifndef MSCCLPP_FIFO_DEVICE_HPP_ #define MSCCLPP_FIFO_DEVICE_HPP_ -#include +#include -#include "poll.hpp" +#include "device.hpp" + +#if defined(MSCCLPP_DEVICE_COMPILE) +#include "atomic_device.hpp" +#include "poll_device.hpp" +#endif // defined(MSCCLPP_DEVICE_COMPILE) namespace mscclpp { @@ -33,15 +38,14 @@ struct alignas(16) ProxyTrigger { /// tail as there is usually enough space for device threads to push their work into. /// struct FifoDeviceHandle { -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Push a trigger to the FIFO. /// /// @param trigger The trigger to push. /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. /// @return The new head of the FIFO. - __forceinline__ __device__ uint64_t push(ProxyTrigger trigger, int64_t maxSpinCount = 1000000) { - uint64_t curFifoHead = - cuda::atomic_ref{*this->head}.fetch_add(1, cuda::memory_order_relaxed); + MSCCLPP_DEVICE_INLINE uint64_t push(ProxyTrigger trigger, int64_t maxSpinCount = 1000000) { + uint64_t curFifoHead = atomicFetchAdd(this->head, (uint64_t)1, memoryOrderRelaxed); // make the last bit intentionally non-zero so that we can safely poll. Don't worry, we will change it back in host // side @@ -53,18 +57,21 @@ struct FifoDeviceHandle { // As atomic access is slow, we first check using the bare pointer and then use the atomic load if the // condition is not met. if (curFifoHead >= size + *(this->tailReplica)) { - OR_POLL_MAYBE_JAILBREAK( - (curFifoHead >= size + cuda::atomic_ref{*this->tailReplica}.load( - cuda::memory_order_relaxed)), - (cuda::atomic_ref{this->triggers[curFifoHead % size].fst}.load( - cuda::memory_order_relaxed) != 0), - maxSpinCount); + OR_POLL_MAYBE_JAILBREAK((curFifoHead >= size + atomicLoad(this->tailReplica, memoryOrderRelaxed)), + (atomicLoad(&(this->triggers[curFifoHead % size].fst), memoryOrderRelaxed) != 0), + maxSpinCount); } - ProxyTrigger* triggerPtr = (ProxyTrigger*)&(this->triggers[curFifoHead % size]); + ProxyTrigger* triggerPtr = &(this->triggers[curFifoHead % size]); // store with memory order release so that the while loop does not go pass this. - asm volatile("st.global.release.cta.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd)); +#if defined(MSCCLPP_DEVICE_CUDA) + asm volatile("st.global.release.sys.v2.u64 [%0], {%1,%2};" ::"l"(triggerPtr), "l"(trigger.fst), "l"(trigger.snd)); +#else // !defined(MSCCLPP_DEVICE_CUDA) + // TODO: both atomic and clang built-ins are buggy here + triggerPtr->fst = trigger.fst; + triggerPtr->snd = trigger.snd; +#endif // !defined(MSCCLPP_DEVICE_CUDA) return curFifoHead; } @@ -73,17 +80,14 @@ struct FifoDeviceHandle { /// /// @param curFifoHead The current head of the FIFO. /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. 
- __forceinline__ __device__ void sync(uint64_t curFifoHead, int64_t maxSpinCount = 1000000) { + MSCCLPP_DEVICE_INLINE void sync(uint64_t curFifoHead, int64_t maxSpinCount = 1000000) { // Same as push but in this case checking the fist condition is probably faster since for tail to be pushed we need // to wait for cudaMemcpy to be done. - OR_POLL_MAYBE_JAILBREAK( - (curFifoHead >= - cuda::atomic_ref{*this->tailReplica}.load(cuda::memory_order_relaxed)), - (cuda::atomic_ref{this->triggers[curFifoHead % size].fst}.load( - cuda::memory_order_relaxed) != 0), - maxSpinCount); + OR_POLL_MAYBE_JAILBREAK((curFifoHead >= atomicLoad(this->tailReplica, memoryOrderRelaxed)), + (atomicLoad(&(this->triggers[curFifoHead % size].fst), memoryOrderRelaxed) != 0), + maxSpinCount); } -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) /// The FIFO buffer that is allocated on the host via `cudaHostAlloc()`. ProxyTrigger* triggers; diff --git a/include/mscclpp/gpu.hpp b/include/mscclpp/gpu.hpp new file mode 100644 index 000000000..d3d48ce1f --- /dev/null +++ b/include/mscclpp/gpu.hpp @@ -0,0 +1,80 @@ +// Copyright (c) Microsoft Corporation. +// Licensed under the MIT license. + +#ifndef MSCCLPP_GPU_HPP_ +#define MSCCLPP_GPU_HPP_ + +#if defined(__HIP_PLATFORM_AMD__) + +#include + +using cudaError_t = hipError_t; +using cudaGraph_t = hipGraph_t; +using cudaGraphExec_t = hipGraphExec_t; +using cudaDeviceProp = hipDeviceProp_t; +using cudaStream_t = hipStream_t; +using cudaStreamCaptureMode = hipStreamCaptureMode; +using cudaMemcpyKind = hipMemcpyKind; +using cudaIpcMemHandle_t = hipIpcMemHandle_t; + +using CUresult = hipError_t; +using CUdeviceptr = hipDeviceptr_t; + +constexpr auto cudaSuccess = hipSuccess; +constexpr auto cudaStreamNonBlocking = hipStreamNonBlocking; +constexpr auto cudaStreamCaptureModeGlobal = hipStreamCaptureModeGlobal; +constexpr auto cudaStreamCaptureModeRelaxed = hipStreamCaptureModeRelaxed; +constexpr auto cudaHostAllocMapped = hipHostMallocMapped; +constexpr auto cudaHostAllocWriteCombined = hipHostMallocWriteCombined; +constexpr auto cudaMemcpyDefault = hipMemcpyDefault; +constexpr auto cudaMemcpyDeviceToDevice = hipMemcpyDeviceToDevice; +constexpr auto cudaMemcpyHostToDevice = hipMemcpyHostToDevice; +constexpr auto cudaMemcpyDeviceToHost = hipMemcpyDeviceToHost; +constexpr auto cudaIpcMemLazyEnablePeerAccess = hipIpcMemLazyEnablePeerAccess; + +#ifndef CUDA_SUCCESS +#define CUDA_SUCCESS hipSuccess +#endif // CUDA_SUCCESS + +#define cudaGetErrorString(...) hipGetErrorString(__VA_ARGS__) +#define cudaGetDevice(...) hipGetDevice(__VA_ARGS__) +#define cudaGetDeviceCount(...) hipGetDeviceCount(__VA_ARGS__) +#define cudaGetDeviceProperties(...) hipGetDeviceProperties(__VA_ARGS__) +#define cudaGetLastError(...) hipGetLastError(__VA_ARGS__) +#define cudaSetDevice(...) hipSetDevice(__VA_ARGS__) +#define cudaDeviceSynchronize(...) hipDeviceSynchronize(__VA_ARGS__) +#define cudaDeviceGetPCIBusId(...) hipDeviceGetPCIBusId(__VA_ARGS__) +#define cudaHostAlloc(...) hipHostMalloc(__VA_ARGS__) +#define cudaMalloc(...) hipMalloc(__VA_ARGS__) +#define cudaFree(...) hipFree(__VA_ARGS__) +#define cudaFreeHost(...) hipHostFree(__VA_ARGS__) +#define cudaMemset(...) hipMemset(__VA_ARGS__) +#define cudaMemsetAsync(...) hipMemsetAsync(__VA_ARGS__) +#define cudaMemcpy(...) hipMemcpy(__VA_ARGS__) +#define cudaMemcpyAsync(...) hipMemcpyAsync(__VA_ARGS__) +#define cudaMemcpyToSymbol(...) hipMemcpyToSymbol(__VA_ARGS__) +#define cudaStreamCreateWithFlags(...) 
hipStreamCreateWithFlags(__VA_ARGS__) +#define cudaStreamSynchronize(...) hipStreamSynchronize(__VA_ARGS__) +#define cudaStreamBeginCapture(...) hipStreamBeginCapture(__VA_ARGS__) +#define cudaStreamEndCapture(...) hipStreamEndCapture(__VA_ARGS__) +#define cudaStreamDestroy(...) hipStreamDestroy(__VA_ARGS__) +#define cudaGraphInstantiate(...) hipGraphInstantiate(__VA_ARGS__) +#define cudaGraphLaunch(...) hipGraphLaunch(__VA_ARGS__) +#define cudaGraphDestroy(...) hipGraphDestroy(__VA_ARGS__) +#define cudaGraphExecDestroy(...) hipGraphExecDestroy(__VA_ARGS__) +#define cudaThreadExchangeStreamCaptureMode(...) hipThreadExchangeStreamCaptureMode(__VA_ARGS__) +#define cudaIpcGetMemHandle(...) hipIpcGetMemHandle(__VA_ARGS__) +#define cudaIpcOpenMemHandle(...) hipIpcOpenMemHandle(__VA_ARGS__) +#define cudaIpcCloseMemHandle(...) hipIpcCloseMemHandle(__VA_ARGS__) + +#define cuGetErrorString(...) hipDrvGetErrorString(__VA_ARGS__) +#define cuMemGetAddressRange(...) hipMemGetAddressRange(__VA_ARGS__) + +#else + +#include +#include + +#endif + +#endif // MSCCLPP_GPU_HPP_ diff --git a/include/mscclpp/cuda_utils.hpp b/include/mscclpp/gpu_utils.hpp similarity index 87% rename from include/mscclpp/cuda_utils.hpp rename to include/mscclpp/gpu_utils.hpp index 2cadfef6c..154f87723 100644 --- a/include/mscclpp/cuda_utils.hpp +++ b/include/mscclpp/gpu_utils.hpp @@ -1,14 +1,14 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef MSCCLPP_CUDA_UTILS_HPP_ -#define MSCCLPP_CUDA_UTILS_HPP_ - -#include +#ifndef MSCCLPP_GPU_UTILS_HPP_ +#define MSCCLPP_GPU_UTILS_HPP_ #include #include -#include + +#include "errors.hpp" +#include "gpu.hpp" /// Throw @ref mscclpp::CudaError if @p cmd does not return cudaSuccess. /// @param cmd The command to execute. @@ -67,6 +67,21 @@ T* cudaCalloc(size_t nelem) { return ptr; } +template +T* cudaExtCalloc(size_t nelem) { + AvoidCudaGraphCaptureGuard cgcGuard; + T* ptr; + CudaStreamWithFlags stream(cudaStreamNonBlocking); +#if defined(__HIP_PLATFORM_AMD__) && (__HIP_PLATFORM_AMD__ == 1) + MSCCLPP_CUDATHROW(hipExtMallocWithFlags((void**)&ptr, nelem * sizeof(T), hipDeviceMallocUncached)); +#else + MSCCLPP_CUDATHROW(cudaMalloc(&ptr, nelem * sizeof(T))); +#endif + MSCCLPP_CUDATHROW(cudaMemsetAsync(ptr, 0, nelem * sizeof(T), stream)); + MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream)); + return ptr; +} + /// A wrapper of cudaHostAlloc that sets the allocated memory to zero. /// @tparam T Type of each element in the allocated memory. /// @param nelem Number of elements to allocate. @@ -136,6 +151,15 @@ std::shared_ptr allocSharedCuda(size_t count = 1) { return detail::safeAlloc, CudaDeleter, std::shared_ptr>(count); } +/// Allocates memory on the device and returns a std::shared_ptr to it. The memory is zeroed out. +/// @tparam T Type of each element in the allocated memory. +/// @param count Number of elements to allocate. +/// @return A std::shared_ptr to the allocated memory. +template +std::shared_ptr allocExtSharedCuda(size_t count = 1) { + return detail::safeAlloc, CudaDeleter, std::shared_ptr>(count); +} + /// Unique device pointer that will call cudaFree on destruction. /// @tparam T Type of each element in the allocated memory. template @@ -150,6 +174,15 @@ UniqueCudaPtr allocUniqueCuda(size_t count = 1) { return detail::safeAlloc, CudaDeleter, UniqueCudaPtr>(count); } +/// Allocates memory on the device and returns a std::unique_ptr to it. The memory is zeroed out. +/// @tparam T Type of each element in the allocated memory. 
+/// @param count Number of elements to allocate. +/// @return A std::unique_ptr to the allocated memory. +template +UniqueCudaPtr allocExtUniqueCuda(size_t count = 1) { + return detail::safeAlloc, CudaDeleter, UniqueCudaPtr>(count); +} + /// Allocates memory with cudaHostAlloc, constructs an object of type T in it and returns a std::shared_ptr to it. /// @tparam T Type of the object to construct. /// @tparam Args Types of the arguments to pass to the constructor. @@ -238,4 +271,4 @@ void memcpyCuda(T* dst, const T* src, size_t count, cudaMemcpyKind kind = cudaMe } // namespace mscclpp -#endif // MSCCLPP_CUDA_UTILS_HPP_ +#endif // MSCCLPP_GPU_UTILS_HPP_ diff --git a/include/mscclpp/packet.hpp b/include/mscclpp/packet_device.hpp similarity index 50% rename from include/mscclpp/packet.hpp rename to include/mscclpp/packet_device.hpp index 6f95bb092..91246067e 100644 --- a/include/mscclpp/packet.hpp +++ b/include/mscclpp/packet_device.hpp @@ -1,15 +1,20 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef MSCCLPP_PACKET_HPP_ -#define MSCCLPP_PACKET_HPP_ +#ifndef MSCCLPP_PACKET_DEVICE_HPP_ +#define MSCCLPP_PACKET_DEVICE_HPP_ -#include "poll.hpp" +#include "device.hpp" + +#if defined(MSCCLPP_DEVICE_COMPILE) +#include "atomic_device.hpp" +#include "poll_device.hpp" +#endif // defined(MSCCLPP_DEVICE_COMPILE) namespace mscclpp { /// LL (low latency) protocol packet. -union LLPacket { +union alignas(16) LLPacket { // Assume data is written with an atomicity of 8 bytes (IB/RDMA). struct { uint32_t data1; @@ -18,67 +23,76 @@ union LLPacket { uint32_t flag2; }; - struct { - uint64_t x; - uint64_t y; - } vec; - - uint64_t v[2]; +#if defined(MSCCLPP_DEVICE_COMPILE) + ulonglong2 raw_; -#ifdef __CUDACC__ - __forceinline__ __device__ LLPacket() {} + MSCCLPP_DEVICE_INLINE LLPacket() {} /// Write 8 bytes of data to the packet. /// @param val1 The first 4-byte data to write. /// @param val2 The second 4-byte data to write. /// @param flag The flag to write. - __forceinline__ __device__ void write(uint32_t val1, uint32_t val2, uint32_t flag) { - asm volatile("st.volatile.global.v4.u32 [%0], {%1,%2,%3,%4};" ::"l"(v), "r"(val1), "r"(flag), "r"(val2), "r"(flag)); + MSCCLPP_DEVICE_INLINE void write(uint32_t val1, uint32_t val2, uint32_t flag) { +#if defined(MSCCLPP_DEVICE_CUDA) + asm volatile("st.volatile.global.v4.u32 [%0], {%1,%2,%3,%4};" ::"l"(&raw_), "r"(val1), "r"(flag), "r"(val2), + "r"(flag)); +#else // !defined(MSCCLPP_DEVICE_CUDA) + uint4 reg = make_uint4(val1, flag, val2, flag); + ulonglong2* p = reinterpret_cast(®); + // TODO: clang built-ins are buggy here + atomicStore(&(raw_.x), p->x, memoryOrderRelaxed); + atomicStore(&(raw_.y), p->y, memoryOrderRelaxed); +#endif } /// Write 8 bytes of data to the packet. /// @param val The 8-byte data to write. /// @param flag The flag to write. - __forceinline__ __device__ void write(uint64_t val, uint32_t flag) { - asm volatile("st.volatile.global.v4.u32 [%0], {%1,%2,%3,%4};" ::"l"(v), "r"((uint32_t)val), "r"(flag), - "r"((uint32_t)(val >> 32)), "r"(flag)); - } + MSCCLPP_DEVICE_INLINE void write(uint64_t val, uint32_t flag) { write((uint32_t)val, (uint32_t)(val >> 32), flag); } /// Helper of @ref read(). /// @param flag The flag to read. /// @param data The 8-byte data read. /// @return True if the flag is not equal to the given flag. 
- __forceinline__ __device__ bool readOnce(uint32_t flag, uint2& data) const { + MSCCLPP_DEVICE_INLINE bool readOnce(uint32_t flag, uint2& data) const { +#if defined(MSCCLPP_DEVICE_CUDA) uint32_t flag1, flag2; asm volatile("ld.volatile.global.v4.u32 {%0,%1,%2,%3}, [%4];" : "=r"(data.x), "=r"(flag1), "=r"(data.y), "=r"(flag2) - : "l"(v)); + : "l"(&raw_)); return (flag1 != flag) || (flag2 != flag); +#else // !defined(MSCCLPP_DEVICE_CUDA) + ulonglong2 reg; + // TODO: clang built-ins are buggy here + reg.x = atomicLoad(&(raw_.x), memoryOrderRelaxed); + reg.y = atomicLoad(&(raw_.y), memoryOrderRelaxed); + uint4* ptr = reinterpret_cast(®); + data.x = ptr->x; + data.y = ptr->z; + return (ptr->y != flag) || (ptr->w != flag); +#endif } /// Read 8 bytes of data from the packet. /// @param flag The flag to read. /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. /// @return The 8-byte data read. - __forceinline__ __device__ uint2 read(uint32_t flag, int64_t maxSpinCount = 100000000) const { + MSCCLPP_DEVICE_INLINE uint2 read(uint32_t flag, int64_t maxSpinCount = 100000000) const { uint2 data; POLL_MAYBE_JAILBREAK(readOnce(flag, data), maxSpinCount); return data; } /// Clear the packet. - __forceinline__ __device__ void clear() { - vec.x = 0; - vec.y = 0; - } -#endif // __CUDACC__ + MSCCLPP_DEVICE_INLINE void clear() { raw_ = make_ulonglong2(0, 0); } +#endif // defined(MSCCLPP_DEVICE_COMPILE) }; -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Read from the origin and write to the target buffer. -__forceinline__ __device__ void putPackets(void* targetPtr, uint64_t targetOffset, const void* originPtr, - uint64_t originOffset, uint64_t originBytes, uint32_t threadId, - uint32_t numThreads, uint32_t flag) { +MSCCLPP_DEVICE_INLINE void putPackets(void* targetPtr, uint64_t targetOffset, const void* originPtr, + uint64_t originOffset, uint64_t originBytes, uint32_t threadId, + uint32_t numThreads, uint32_t flag) { // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes const uint32_t* originBase = (const uint32_t*)((const char*)originPtr + originOffset); LLPacket* targetBase = (LLPacket*)((char*)targetPtr + targetOffset); @@ -90,9 +104,9 @@ __forceinline__ __device__ void putPackets(void* targetPtr, uint64_t targetOffse } /// Read from the target buffer and write to the origin. 
-__forceinline__ __device__ void getPackets(const void* targetPtr, uint64_t targetOffset, void* originPtr, - uint64_t originOffset, uint64_t originBytes, uint32_t threadId, - uint32_t numThreads, uint32_t flag) { +MSCCLPP_DEVICE_INLINE void getPackets(const void* targetPtr, uint64_t targetOffset, void* originPtr, + uint64_t originOffset, uint64_t originBytes, uint32_t threadId, + uint32_t numThreads, uint32_t flag) { // Offsets should be aligned to 8 bytes & size should be a multiple of 8 bytes const LLPacket* targetBase = (const LLPacket*)((const char*)targetPtr + targetOffset); uint2* originBase = (uint2*)((char*)originPtr + originOffset); @@ -102,8 +116,8 @@ __forceinline__ __device__ void getPackets(const void* targetPtr, uint64_t targe originBase[i] = pkt->read(flag); } } -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) }; // namespace mscclpp -#endif // MSCCLPP_PACKET_HPP_ +#endif // MSCCLPP_PACKET_DEVICE_HPP_ diff --git a/include/mscclpp/poll.hpp b/include/mscclpp/poll_device.hpp similarity index 70% rename from include/mscclpp/poll.hpp rename to include/mscclpp/poll_device.hpp index ea111d9de..0cdb6b019 100644 --- a/include/mscclpp/poll.hpp +++ b/include/mscclpp/poll_device.hpp @@ -1,28 +1,22 @@ // Copyright (c) Microsoft Corporation. // Licensed under the MIT license. -#ifndef MSCCLPP_POLL_HPP_ -#define MSCCLPP_POLL_HPP_ +#ifndef MSCCLPP_POLL_DEVICE_HPP_ +#define MSCCLPP_POLL_DEVICE_HPP_ -#ifdef __CUDACC__ +#include "device.hpp" + +#if defined(MSCCLPP_DEVICE_COMPILE) #include +#if defined(MSCCLPP_DEVICE_HIP) +extern "C" __device__ void __assert_fail(const char *__assertion, const char *__file, unsigned int __line, + const char *__function); +#else // !defined(MSCCLPP_DEVICE_HIP) extern "C" __device__ void __assert_fail(const char *__assertion, const char *__file, unsigned int __line, const char *__function) __THROW; - -// If a spin is stuck, escape from it and set status to 1. -#define POLL_MAYBE_JAILBREAK_ESCAPE(__cond, __max_spin_cnt, __status) \ - do { \ - int64_t __spin_cnt = 0; \ - __status = 0; \ - while (__cond) { \ - if (__max_spin_cnt >= 0 && __spin_cnt++ == __max_spin_cnt) { \ - __status = 1; \ - break; \ - } \ - } \ - } while (0); +#endif // !defined(MSCCLPP_DEVICE_HIP) // If a spin is stuck, print a warning and keep spinning. 
#define POLL_MAYBE_JAILBREAK(__cond, __max_spin_cnt) \ @@ -52,6 +46,6 @@ extern "C" __device__ void __assert_fail(const char *__assertion, const char *__ } \ } while (0); -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) -#endif // MSCCLPP_POLL_HPP_ +#endif // MSCCLPP_POLL_DEVICE_HPP_ diff --git a/include/mscclpp/proxy.hpp b/include/mscclpp/proxy.hpp index c403a86de..655263c83 100644 --- a/include/mscclpp/proxy.hpp +++ b/include/mscclpp/proxy.hpp @@ -6,7 +6,8 @@ #include #include -#include + +#include "fifo.hpp" namespace mscclpp { diff --git a/include/mscclpp/proxy_channel.hpp b/include/mscclpp/proxy_channel.hpp index 016a4f24c..fa161494f 100644 --- a/include/mscclpp/proxy_channel.hpp +++ b/include/mscclpp/proxy_channel.hpp @@ -4,10 +4,10 @@ #ifndef MSCCLPP_PROXY_CHANNEL_HPP_ #define MSCCLPP_PROXY_CHANNEL_HPP_ -#include -#include -#include -#include +#include "core.hpp" +#include "proxy.hpp" +#include "proxy_channel_device.hpp" +#include "semaphore.hpp" namespace mscclpp { diff --git a/include/mscclpp/proxy_channel_device.hpp b/include/mscclpp/proxy_channel_device.hpp index 35c5b6fea..c4cbc6ec2 100644 --- a/include/mscclpp/proxy_channel_device.hpp +++ b/include/mscclpp/proxy_channel_device.hpp @@ -47,12 +47,12 @@ union ChannelTrigger { uint64_t reserved : MSCCLPP_BITS_FIFO_RESERVED; } fields; -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Default constructor. - __forceinline__ __device__ ChannelTrigger() {} + MSCCLPP_DEVICE_INLINE ChannelTrigger() {} /// Copy constructor. - __forceinline__ __device__ ChannelTrigger(ProxyTrigger value) : value(value) {} + MSCCLPP_DEVICE_INLINE ChannelTrigger(ProxyTrigger value) : value(value) {} /// Constructor. /// @param type The type of the trigger. @@ -62,8 +62,8 @@ union ChannelTrigger { /// @param srcOffset The offset into the source memory region. /// @param bytes The bytes of the transfer. /// @param semaphoreId The ID of the semaphore. - __forceinline__ __device__ ChannelTrigger(TriggerType type, MemoryId dst, uint64_t dstOffset, MemoryId src, - uint64_t srcOffset, uint64_t bytes, int semaphoreId) { + MSCCLPP_DEVICE_INLINE ChannelTrigger(TriggerType type, MemoryId dst, uint64_t dstOffset, MemoryId src, + uint64_t srcOffset, uint64_t bytes, int semaphoreId) { value.fst = ((srcOffset << MSCCLPP_BITS_SIZE) + bytes); value.snd = ((((((((semaphoreId << MSCCLPP_BITS_TYPE) + (uint64_t)type) << MSCCLPP_BITS_REGMEM_HANDLE) + dst) << MSCCLPP_BITS_REGMEM_HANDLE) + @@ -71,7 +71,7 @@ union ChannelTrigger { << MSCCLPP_BITS_OFFSET) + dstOffset); } -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) }; struct ProxyChannelDeviceHandle { @@ -83,15 +83,14 @@ struct ProxyChannelDeviceHandle { // can produce for and the sole proxy thread consumes it. FifoDeviceHandle fifo_; -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Push a @ref TriggerData to the FIFO. /// @param dst The destination memory region. /// @param dstOffset The offset into the destination memory region. /// @param src The source memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. 
- __forceinline__ __device__ void put(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, - uint64_t size) { + MSCCLPP_DEVICE_INLINE void put(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, uint64_t size) { fifo_.push(ChannelTrigger(TriggerData, dst, dstOffset, src, srcOffset, size, semaphoreId_).value); } @@ -100,14 +99,12 @@ struct ProxyChannelDeviceHandle { /// @param src The source memory region. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. - __forceinline__ __device__ void put(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void put(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { put(dst, offset, src, offset, size); } /// Push a @ref TriggerFlag to the FIFO. - __forceinline__ __device__ void signal() { - fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, semaphoreId_).value); - } + MSCCLPP_DEVICE_INLINE void signal() { fifo_.push(ChannelTrigger(TriggerFlag, 0, 0, 0, 0, 1, semaphoreId_).value); } /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param dst The destination memory region. @@ -115,8 +112,8 @@ struct ProxyChannelDeviceHandle { /// @param src The source memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, - uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignal(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint64_t size) { fifo_.push(ChannelTrigger(TriggerData | TriggerFlag, dst, dstOffset, src, srcOffset, size, semaphoreId_).value); } @@ -125,7 +122,7 @@ struct ProxyChannelDeviceHandle { /// @param src The source memory region. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignal(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { putWithSignal(dst, offset, src, offset, size); } @@ -135,8 +132,8 @@ struct ProxyChannelDeviceHandle { /// @param src The source memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignalAndFlush(MemoryId dst, uint64_t dstOffset, MemoryId src, - uint64_t srcOffset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(MemoryId dst, uint64_t dstOffset, MemoryId src, uint64_t srcOffset, + uint64_t size) { uint64_t curFifoHead = fifo_.push( ChannelTrigger(TriggerData | TriggerFlag | TriggerSync, dst, dstOffset, src, srcOffset, size, semaphoreId_) .value); @@ -148,25 +145,25 @@ struct ProxyChannelDeviceHandle { /// @param src The source memory region. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignalAndFlush(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(MemoryId dst, MemoryId src, uint64_t offset, uint64_t size) { putWithSignalAndFlush(dst, offset, src, offset, size); } /// Push a @ref TriggerSync to the FIFO. 
- __forceinline__ __device__ void flush() { + MSCCLPP_DEVICE_INLINE void flush() { uint64_t curFifoHead = fifo_.push(ChannelTrigger(TriggerSync, 0, 0, 0, 0, 1, semaphoreId_).value); fifo_.sync(curFifoHead); } /// Check if the proxy channel has been signaled. /// @return true if the proxy channel has been signaled. - __forceinline__ __device__ bool poll() { return semaphore_.poll(); } + MSCCLPP_DEVICE_INLINE bool poll() { return semaphore_.poll(); } /// Wait for the proxy channel to be signaled. /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. - __forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) { semaphore_.wait(maxSpinCount); } + MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 10000000) { semaphore_.wait(maxSpinCount); } -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) }; struct SimpleProxyChannelDeviceHandle { @@ -174,62 +171,62 @@ struct SimpleProxyChannelDeviceHandle { MemoryId dst_; MemoryId src_; -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Push a @ref TriggerData to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. - __forceinline__ __device__ void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void put(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { proxyChan_.put(dst_, dstOffset, src_, srcOffset, size); } /// Push a @ref TriggerData to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. - __forceinline__ __device__ void put(uint64_t offset, uint64_t size) { put(offset, offset, size); } + MSCCLPP_DEVICE_INLINE void put(uint64_t offset, uint64_t size) { put(offset, offset, size); } /// Push a @ref TriggerFlag to the FIFO. - __forceinline__ __device__ void signal() { proxyChan_.signal(); } + MSCCLPP_DEVICE_INLINE void signal() { proxyChan_.signal(); } /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignal(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { proxyChan_.putWithSignal(dst_, dstOffset, src_, srcOffset, size); } /// Push a @ref TriggerData and a @ref TriggerFlag at the same time to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); } + MSCCLPP_DEVICE_INLINE void putWithSignal(uint64_t offset, uint64_t size) { putWithSignal(offset, offset, size); } /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param dstOffset The offset into the destination memory region. /// @param srcOffset The offset into the source memory region. /// @param size The size of the transfer. 
- __forceinline__ __device__ void putWithSignalAndFlush(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(uint64_t dstOffset, uint64_t srcOffset, uint64_t size) { proxyChan_.putWithSignalAndFlush(dst_, dstOffset, src_, srcOffset, size); } /// Push a @ref TriggerData, a @ref TriggerFlag, and a @ref TriggerSync at the same time to the FIFO. /// @param offset The common offset into the destination and source memory regions. /// @param size The size of the transfer. - __forceinline__ __device__ void putWithSignalAndFlush(uint64_t offset, uint64_t size) { + MSCCLPP_DEVICE_INLINE void putWithSignalAndFlush(uint64_t offset, uint64_t size) { putWithSignalAndFlush(offset, offset, size); } /// Push a @ref TriggerSync to the FIFO. - __forceinline__ __device__ void flush() { proxyChan_.flush(); } + MSCCLPP_DEVICE_INLINE void flush() { proxyChan_.flush(); } /// Check if the proxy channel has been signaled. /// @return true if the proxy channel has been signaled. - __forceinline__ __device__ bool poll() { return proxyChan_.poll(); } + MSCCLPP_DEVICE_INLINE bool poll() { return proxyChan_.poll(); } /// Wait for the proxy channel to be signaled. /// @param maxSpinCount The maximum number of spin counts before asserting. Never assert if negative. - __forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) { proxyChan_.wait(maxSpinCount); } -#endif // __CUDACC__ + MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 10000000) { proxyChan_.wait(maxSpinCount); } +#endif // defined(MSCCLPP_DEVICE_COMPILE) }; } // namespace mscclpp diff --git a/include/mscclpp/semaphore.hpp b/include/mscclpp/semaphore.hpp index 7ad3ec6be..5f1800990 100644 --- a/include/mscclpp/semaphore.hpp +++ b/include/mscclpp/semaphore.hpp @@ -5,10 +5,10 @@ #define MSCCLPP_SEMAPHORE_HPP_ #include -#include -#include -#include -#include + +#include "core.hpp" +#include "gpu_utils.hpp" +#include "semaphore_device.hpp" namespace mscclpp { @@ -125,7 +125,7 @@ class SmDevice2DeviceSemaphore : public BaseSemaphore SmDevice2DeviceSemaphore(Communicator& communicator, std::shared_ptr connection); /// Constructor. - SmDevice2DeviceSemaphore() = default; + SmDevice2DeviceSemaphore() = delete; /// Device-side handle for @ref SmDevice2DeviceSemaphore. using DeviceHandle = SmDevice2DeviceSemaphoreDeviceHandle; diff --git a/include/mscclpp/semaphore_device.hpp b/include/mscclpp/semaphore_device.hpp index 4ed5fbeec..cd455078a 100644 --- a/include/mscclpp/semaphore_device.hpp +++ b/include/mscclpp/semaphore_device.hpp @@ -4,32 +4,33 @@ #ifndef MSCCLPP_SEMAPHORE_DEVICE_HPP_ #define MSCCLPP_SEMAPHORE_DEVICE_HPP_ -#include +#include "device.hpp" -#include "poll.hpp" +#if defined(MSCCLPP_DEVICE_COMPILE) +#include "atomic_device.hpp" +#include "poll_device.hpp" +#endif // defined(MSCCLPP_DEVICE_COMPILE) namespace mscclpp { /// Device-side handle for @ref Host2DeviceSemaphore. struct Host2DeviceSemaphoreDeviceHandle { -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Poll if the host has signaled. /// @return true if the host has signaled. - __forceinline__ __device__ bool poll() { - bool signaled = (cuda::atomic_ref{*inboundSemaphoreId}.load( - cuda::memory_order_acquire) > (*expectedInboundSemaphoreId)); + MSCCLPP_DEVICE_INLINE bool poll() { + bool signaled = (atomicLoad(inboundSemaphoreId, memoryOrderAcquire) > (*expectedInboundSemaphoreId)); if (signaled) (*expectedInboundSemaphoreId) += 1; return signaled; } /// Wait for the host to signal. 
- __forceinline__ __device__ void wait(int64_t maxSpinCount = 100000000) { + MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 100000000) { (*expectedInboundSemaphoreId) += 1; - POLL_MAYBE_JAILBREAK((cuda::atomic_ref{*inboundSemaphoreId}.load( - cuda::memory_order_acquire) < (*expectedInboundSemaphoreId)), + POLL_MAYBE_JAILBREAK((atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId)), maxSpinCount); } -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) uint64_t* inboundSemaphoreId; uint64_t* expectedInboundSemaphoreId; @@ -37,21 +38,19 @@ struct Host2DeviceSemaphoreDeviceHandle { /// Device-side handle for @ref SmDevice2DeviceSemaphore. struct SmDevice2DeviceSemaphoreDeviceHandle { -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Poll if the remote device has signaled. /// @return true if the remote device has signaled. - __forceinline__ __device__ bool poll() { - bool signaled = (cuda::atomic_ref{*inboundSemaphoreId}.load( - cuda::memory_order_acquire) > (*expectedInboundSemaphoreId)); + MSCCLPP_DEVICE_INLINE bool poll() { + bool signaled = (atomicLoad(inboundSemaphoreId, memoryOrderAcquire) > (*expectedInboundSemaphoreId)); if (signaled) (*expectedInboundSemaphoreId) += 1; return signaled; } /// Wait for the remote device to signal. - __forceinline__ __device__ void wait(int64_t maxSpinCount = 100000000) { + MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 100000000) { (*expectedInboundSemaphoreId) += 1; - POLL_MAYBE_JAILBREAK((cuda::atomic_ref{*inboundSemaphoreId}.load( - cuda::memory_order_acquire) < (*expectedInboundSemaphoreId)), + POLL_MAYBE_JAILBREAK((atomicLoad(inboundSemaphoreId, memoryOrderAcquire) < (*expectedInboundSemaphoreId)), maxSpinCount); } @@ -60,12 +59,11 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { /// This function guarantees that all the memory operation before this function is completed before the remote /// semaphore is signaled. /// - __forceinline__ __device__ void signal() { + MSCCLPP_DEVICE_INLINE void signal() { // This fence ensures that preceding writes are visible on the peer GPU before the incremented // `outboundSemaphoreId` is visible. semaphoreIncrement(); - cuda::atomic_ref{*remoteInboundSemaphoreId}.store(semaphoreGetLocal(), - cuda::memory_order_seq_cst); + atomicStore(remoteInboundSemaphoreId, semaphoreGetLocal(), memoryOrderSeqCst); } /// Signal the remote device. @@ -73,12 +71,11 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { /// This function is a relaxed version of signal() and provides no guarantee on the completion of memory operations. /// User requires to call proper fencing before using this function. /// - __forceinline__ __device__ void relaxedSignal() { + MSCCLPP_DEVICE_INLINE void relaxedSignal() { // This fence ensures that preceding writes are visible on the peer GPU before the incremented // `outboundSemaphoreId` is visible. semaphoreIncrement(); - cuda::atomic_ref{*remoteInboundSemaphoreId}.store(semaphoreGetLocal(), - cuda::memory_order_relaxed); + atomicStore(remoteInboundSemaphoreId, semaphoreGetLocal(), memoryOrderRelaxed); } /// Signal the remote device for copied packets. @@ -87,17 +84,17 @@ struct SmDevice2DeviceSemaphoreDeviceHandle { /// intended to be used with @ref putPackets() and @ref getPackets() that use flags inside packets to indicate the /// completion of copies. 
/// - __forceinline__ __device__ void signalPacket() { + MSCCLPP_DEVICE_INLINE void signalPacket() { semaphoreIncrement(); *remoteInboundSemaphoreId = semaphoreGetLocal(); } /// Increase the counter of the local semaphore. - __forceinline__ __device__ void semaphoreIncrement() { *outboundSemaphoreId += 1; } + MSCCLPP_DEVICE_INLINE void semaphoreIncrement() { *outboundSemaphoreId += 1; } /// Get the value of the local semaphore. - __forceinline__ __device__ uint64_t semaphoreGetLocal() const { return *outboundSemaphoreId; } -#endif // __CUDACC__ + MSCCLPP_DEVICE_INLINE uint64_t semaphoreGetLocal() const { return *outboundSemaphoreId; } +#endif // defined(MSCCLPP_DEVICE_COMPILE) uint64_t* inboundSemaphoreId; uint64_t* outboundSemaphoreId; diff --git a/include/mscclpp/sm_channel.hpp b/include/mscclpp/sm_channel.hpp index a1d1daf2b..1a759968b 100644 --- a/include/mscclpp/sm_channel.hpp +++ b/include/mscclpp/sm_channel.hpp @@ -4,11 +4,12 @@ #ifndef MSCCLPP_SM_CHANNEL_HPP_ #define MSCCLPP_SM_CHANNEL_HPP_ -#include -#include -#include #include +#include "core.hpp" +#include "semaphore.hpp" +#include "sm_channel_device.hpp" + namespace mscclpp { /// Channel for accessing peer memory directly from SM. diff --git a/include/mscclpp/sm_channel_device.hpp b/include/mscclpp/sm_channel_device.hpp index e4e945c1c..29993f8e8 100644 --- a/include/mscclpp/sm_channel_device.hpp +++ b/include/mscclpp/sm_channel_device.hpp @@ -4,36 +4,17 @@ #ifndef MSCCLPP_SM_CHANNEL_DEVICE_HPP_ #define MSCCLPP_SM_CHANNEL_DEVICE_HPP_ -#include "packet.hpp" -#include "poll.hpp" #include "semaphore_device.hpp" +#if defined(MSCCLPP_DEVICE_COMPILE) +#include "packet_device.hpp" +#endif // defined(MSCCLPP_DEVICE_COMPILE) namespace mscclpp { -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) namespace Element { -/// Load an element from DRAM. -/// -/// @param v The value to be loaded. -/// @param p The address of the value to be loaded. -/// -template -__forceinline__ __device__ void load(T& v, const T* p) { - v = *p; -} - -/// Write an element on DRAM. -/// -/// @param p The address of the value to be written. -/// @param v The value to be written. -/// -template -__forceinline__ __device__ void store(T* p, const T& v) { - *p = v; -} - /// Copy aligned elements from the source memory to the destination memory. /// /// This function is intended to be collectively called by multiple threads. Each thread copies a part of @@ -47,18 +28,19 @@ __forceinline__ __device__ void store(T* p, const T& v) { /// @param numThreads The total number of threads that run this function. /// template -__forceinline__ __device__ void copy(T* dst, T* src, uint64_t numElems, uint32_t threadId, uint32_t numThreads) { +MSCCLPP_DEVICE_INLINE void copy(T* dst, T* src, uint64_t numElems, uint32_t threadId, uint32_t numThreads) { T reg; for (size_t i = threadId; i < numElems; i += numThreads) { // Load to register first. - load(reg, src + i); - store(dst + i, reg); + reg = src[i]; + // Then store to destination. + dst[i] = reg; } } } // namespace Element -#endif // __CUDACC__ +#endif // defined(MSCCLPP_DEVICE_COMPILE) /// Channel for accessing peer memory directly from SM. struct SmChannelDeviceHandle { @@ -67,16 +49,14 @@ struct SmChannelDeviceHandle { void* dst_; void* getPacketBuffer_; -#ifdef __CUDACC__ +#if defined(MSCCLPP_DEVICE_COMPILE) /// Load a value from the remote memory. /// @tparam T The type of the value to be loaded. /// @param index The index of the value to be loaded. The offset in bytes is calculated as index * sizeof(T). 
/// @return The value loaded. template - __forceinline__ __device__ T read(uint64_t index) { - T v; - Element::load(v, (T*)dst_ + index); - return v; + MSCCLPP_DEVICE_INLINE T read(uint64_t index) { + return *(reinterpret_cast(dst_) + index); } /// Write a value to the remote memory. @@ -84,14 +64,13 @@ struct SmChannelDeviceHandle { /// @param index The index of the value to be written. The offset in bytes is calculated as index * sizeof(T). /// @param v The value to be written. template - __forceinline__ __device__ void write(uint64_t index, const T& v) { - Element::store((T*)dst_ + index, v); + MSCCLPP_DEVICE_INLINE void write(uint64_t index, const T& v) { + *(reinterpret_cast(dst_) + index) = v; } /// this is a helper for copy function template - __forceinline__ __device__ void copy_helper(void* dst, void* src, uint64_t bytes, uint32_t threadId, - uint32_t numThreads) { + MSCCLPP_DEVICE_INLINE void copy_helper(void* dst, void* src, uint64_t bytes, uint32_t threadId, uint32_t numThreads) { int* dstInt = reinterpret_cast(dst); int* srcInt = reinterpret_cast(src); const uintptr_t dstPtr = reinterpret_cast(dst); @@ -130,7 +109,7 @@ struct SmChannelDeviceHandle { /// @param numThreads The total number of threads that run this function. /// template - __forceinline__ __device__ void copy(void* dst, void* src, uint64_t bytes, uint32_t threadId, uint32_t numThreads) { + MSCCLPP_DEVICE_INLINE void copy(void* dst, void* src, uint64_t bytes, uint32_t threadId, uint32_t numThreads) { if (Alignment == 4) { copy_helper(dst, src, bytes, threadId, numThreads); } else if (Alignment == 8) { @@ -157,8 +136,8 @@ struct SmChannelDeviceHandle { /// @param numThreads The total number of threads that run this function. /// template - __forceinline__ __device__ void put(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes, - uint32_t threadId, uint32_t numThreads) { + MSCCLPP_DEVICE_INLINE void put(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes, uint32_t threadId, + uint32_t numThreads) { copy((char*)dst_ + targetOffset, (char*)src_ + originOffset, originBytes, threadId, numThreads); } @@ -178,8 +157,8 @@ struct SmChannelDeviceHandle { /// @param numThreads The total number of threads that run this function. /// template - __forceinline__ __device__ void get(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes, - uint32_t threadId, uint32_t numThreads) { + MSCCLPP_DEVICE_INLINE void get(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes, uint32_t threadId, + uint32_t numThreads) { // Note that `dst` and `src` are swapped for `get()`. copy((char*)src_ + originOffset, (char*)dst_ + targetOffset, originBytes, threadId, numThreads); @@ -199,7 +178,7 @@ struct SmChannelDeviceHandle { /// @param numThreads The total number of threads that run this function. /// template - __forceinline__ __device__ void put(uint64_t offset, uint64_t bytes, uint32_t threadId, uint32_t numThreads) { + MSCCLPP_DEVICE_INLINE void put(uint64_t offset, uint64_t bytes, uint32_t threadId, uint32_t numThreads) { put(offset, offset, bytes, threadId, numThreads); } @@ -217,7 +196,7 @@ struct SmChannelDeviceHandle { /// @param numThreads The total number of threads that run this function. 
   ///
   template <int Alignment>
-  __forceinline__ __device__ void get(uint64_t offset, uint64_t bytes, uint32_t threadId, uint32_t numThreads) {
+  MSCCLPP_DEVICE_INLINE void get(uint64_t offset, uint64_t bytes, uint32_t threadId, uint32_t numThreads) {
     get<Alignment>(offset, offset, bytes, threadId, numThreads);
   }

@@ -233,8 +212,8 @@ struct SmChannelDeviceHandle {
   /// the `threadIdx` in CUDA.
   /// @param numThreads The total number of threads that run this function.
   ///
-  __forceinline__ __device__ void putPackets(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes,
-                                             uint32_t threadId, uint32_t numThreads, uint32_t flag) {
+  MSCCLPP_DEVICE_INLINE void putPackets(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes,
+                                        uint32_t threadId, uint32_t numThreads, uint32_t flag) {
     mscclpp::putPackets(dst_, targetOffset, src_, originOffset, originBytes, threadId, numThreads, flag);
   }

@@ -249,8 +228,8 @@ struct SmChannelDeviceHandle {
   /// the `threadIdx` in CUDA.
   /// @param numThreads The total number of threads that run this function.
   ///
-  __forceinline__ __device__ void getPackets(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes,
-                                             uint32_t threadId, uint32_t numThreads, uint32_t flag) {
+  MSCCLPP_DEVICE_INLINE void getPackets(uint64_t targetOffset, uint64_t originOffset, uint64_t originBytes,
+                                        uint32_t threadId, uint32_t numThreads, uint32_t flag) {
     mscclpp::getPackets(getPacketBuffer_, targetOffset, src_, originOffset, originBytes, threadId, numThreads, flag);
   }

@@ -259,14 +238,14 @@ struct SmChannelDeviceHandle {
   /// This function guarantees that all the memory operation before this function is completed before the remote
   /// semaphore is signaled.
   ///
-  __forceinline__ __device__ void signal() { semaphore_.signal(); }
+  MSCCLPP_DEVICE_INLINE void signal() { semaphore_.signal(); }

   /// Signal the remote semaphore.
   ///
   /// This function is a relaxed version of signal() and provides no guarantee on the completion of memory operations.
   /// The user is required to apply proper fencing before using this function.
   ///
-  __forceinline__ __device__ void relaxedSignal() { semaphore_.relaxedSignal(); }
+  MSCCLPP_DEVICE_INLINE void relaxedSignal() { semaphore_.relaxedSignal(); }

   /// Signal the remote semaphore for copied packets.
   ///
@@ -274,22 +253,22 @@ struct SmChannelDeviceHandle {
   /// intended to be used with @ref putPackets() and @ref getPackets() that use flags inside packets to indicate the
   /// completion of copies.
   ///
-  __forceinline__ __device__ void signalPacket() { semaphore_.signalPacket(); }
+  MSCCLPP_DEVICE_INLINE void signalPacket() { semaphore_.signalPacket(); }

   /// Increase the counter of the local semaphore.
-  __forceinline__ __device__ void semaphoreIncrement() { semaphore_.semaphoreIncrement(); }
+  MSCCLPP_DEVICE_INLINE void semaphoreIncrement() { semaphore_.semaphoreIncrement(); }

   /// Read the counter of the local semaphore.
-  __forceinline__ __device__ uint64_t semaphoreGetLocal() const { return semaphore_.semaphoreGetLocal(); }
+  MSCCLPP_DEVICE_INLINE uint64_t semaphoreGetLocal() const { return semaphore_.semaphoreGetLocal(); }

   /// Check if the remote semaphore has signaled.
   /// @return true if the remote semaphore has signaled.
-  __forceinline__ __device__ bool poll() { return semaphore_.poll(); }
+  MSCCLPP_DEVICE_INLINE bool poll() { return semaphore_.poll(); }

   /// Wait for the remote semaphore to send a signal.
   /// @param maxSpinCount The maximum number of spins before asserting. Never assert if negative.
-  __forceinline__ __device__ void wait(int64_t maxSpinCount = 10000000) { semaphore_.wait(maxSpinCount); }
-#endif  // __CUDACC__
+  MSCCLPP_DEVICE_INLINE void wait(int64_t maxSpinCount = 10000000) { semaphore_.wait(maxSpinCount); }
+#endif  // defined(MSCCLPP_DEVICE_COMPILE)
 };

 }  // namespace mscclpp
diff --git a/python/mscclpp/CMakeLists.txt b/python/mscclpp/CMakeLists.txt
index 22fd318e9..0fe510c80 100644
--- a/python/mscclpp/CMakeLists.txt
+++ b/python/mscclpp/CMakeLists.txt
@@ -9,6 +9,6 @@ FetchContent_MakeAvailable(nanobind)
 file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cpp)
 nanobind_add_module(mscclpp_py ${SOURCES})
 set_target_properties(mscclpp_py PROPERTIES OUTPUT_NAME _mscclpp)
-target_link_libraries(mscclpp_py PRIVATE mscclpp_static)
-target_include_directories(mscclpp_py PRIVATE ${CUDAToolkit_INCLUDE_DIRS})
+target_link_libraries(mscclpp_py PRIVATE ${GPU_LIBRARIES} mscclpp_static)
+target_include_directories(mscclpp_py PRIVATE ${GPU_INCLUDE_DIRS})
 install(TARGETS mscclpp_py LIBRARY DESTINATION .)
diff --git a/python/mscclpp/core_py.cpp b/python/mscclpp/core_py.cpp
index d1df8fd31..4e92f8841 100644
--- a/python/mscclpp/core_py.cpp
+++ b/python/mscclpp/core_py.cpp
@@ -50,9 +50,9 @@ void register_core(nb::module_& m) {
            nb::arg("data"), nb::arg("size"), nb::arg("peer"), nb::arg("tag"))
       .def("all_gather", &Bootstrap::allGather, nb::arg("allData"), nb::arg("size"))
       .def("barrier", &Bootstrap::barrier)
-      .def("send", (void (Bootstrap::*)(const std::vector<char>&, int, int)) & Bootstrap::send, nb::arg("data"),
-           nb::arg("peer"), nb::arg("tag"))
-      .def("recv", (void (Bootstrap::*)(std::vector<char>&, int, int)) & Bootstrap::recv, nb::arg("data"),
+      .def("send", static_cast<void (Bootstrap::*)(const std::vector<char>&, int, int)>(&Bootstrap::send),
+           nb::arg("data"), nb::arg("peer"), nb::arg("tag"))
+      .def("recv", static_cast<void (Bootstrap::*)(std::vector<char>&, int, int)>(&Bootstrap::recv), nb::arg("data"),
            nb::arg("peer"), nb::arg("tag"));

   nb::class_<UniqueId>(m, "UniqueId");
diff --git a/python/mscclpp/error_py.cpp b/python/mscclpp/error_py.cpp
index 7b148dd5f..18d4b834a 100644
--- a/python/mscclpp/error_py.cpp
+++ b/python/mscclpp/error_py.cpp
@@ -31,10 +31,10 @@ void register_error(nb::module_& m) {
       .def(nb::init<const std::string&, int>(), nb::arg("message"), nb::arg("errorCode"));

   nb::class_<CudaError, BaseError>(m, "CudaError")
-      .def(nb::init<const std::string&, cudaError_t>(), nb::arg("message"), nb::arg("errorCode"));
+      .def(nb::init<const std::string&, int>(), nb::arg("message"), nb::arg("errorCode"));

   nb::class_<CuError, BaseError>(m, "CuError")
-      .def(nb::init<const std::string&, CUresult>(), nb::arg("message"), nb::arg("errorCode"));
+      .def(nb::init<const std::string&, int>(), nb::arg("message"), nb::arg("errorCode"));

   nb::class_<IbError, BaseError>(m, "IbError")
       .def(nb::init<const std::string&, int>(), nb::arg("message"), nb::arg("errorCode"));
diff --git a/python/mscclpp_benchmark/allreduce.cu b/python/mscclpp_benchmark/allreduce.cu
index b4623afff..e86047283 100644
--- a/python/mscclpp_benchmark/allreduce.cu
+++ b/python/mscclpp_benchmark/allreduce.cu
@@ -3,7 +3,7 @@

 #include

-#include <mscclpp/packet.hpp>
+#include <mscclpp/packet_device.hpp>
 #include
 #include
diff --git a/python/test/CMakeLists.txt b/python/test/CMakeLists.txt
index 356934536..cf705841c 100644
--- a/python/test/CMakeLists.txt
+++ b/python/test/CMakeLists.txt
@@ -9,5 +9,5 @@ FetchContent_MakeAvailable(nanobind)
 file(GLOB_RECURSE SOURCES CONFIGURE_DEPENDS *.cpp)
 nanobind_add_module(mscclpp_py_test ${SOURCES})
 set_target_properties(mscclpp_py_test PROPERTIES OUTPUT_NAME _ext)
-target_link_libraries(mscclpp_py_test PRIVATE mscclpp_static)
-target_include_directories(mscclpp_py_test PRIVATE ${CUDAToolkit_INCLUDE_DIRS})
+target_link_libraries(mscclpp_py_test PRIVATE ${GPU_LIBRARIES} mscclpp_static)
+target_include_directories(mscclpp_py_test PRIVATE ${GPU_INCLUDE_DIRS})
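Editor's note: the patch consistently swaps `__forceinline__ __device__` and the `__CUDACC__` guards for MSCCLPP_DEVICE_INLINE and MSCCLPP_DEVICE_COMPILE, but the header defining those macros is not part of this excerpt. A minimal sketch of what such a header can look like, assuming NVCC defines __CUDACC__ and hipcc defines __HIP_PLATFORM_AMD__ (file contents and exact macro set are assumptions, not the library's confirmed definitions):

    // Device-portability macros (hypothetical sketch, not from this patch).
    #if defined(__CUDACC__) || defined(__HIP_PLATFORM_AMD__)
    #define MSCCLPP_DEVICE_COMPILE                            // a device-capable compiler pass
    #define MSCCLPP_DEVICE_INLINE __forceinline__ __device__  // accepted by both nvcc and hipcc
    #if defined(__HIP_PLATFORM_AMD__)
    #define MSCCLPP_DEVICE_HIP
    #else
    #define MSCCLPP_DEVICE_CUDA
    #endif
    #else
    #define MSCCLPP_HOST_COMPILE                              // plain host compiler (g++, clang++)
    #endif

With guards expressed this way, device-only members such as SmChannelDeviceHandle::read()/write() simply compile away in host-only translation units, regardless of which GPU toolchain is in use.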
diff --git a/python/test/_cpp/proxy_test.cpp b/python/test/_cpp/proxy_test.cpp
index 4a1a0f754..6ad0df72b 100644
--- a/python/test/_cpp/proxy_test.cpp
+++ b/python/test/_cpp/proxy_test.cpp
@@ -1,7 +1,6 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.

-#include
 #include
 #include
 #include
@@ -9,8 +8,8 @@
 #include
 #include
 #include
-#include <mscclpp/cuda_utils.hpp>
 #include
+#include <mscclpp/gpu_utils.hpp>
 #include
 #include
 #include
diff --git a/python/test/simple_proxy_channel_test.cu b/python/test/simple_proxy_channel_test.cu
index bfe59623a..0a7542768 100644
--- a/python/test/simple_proxy_channel_test.cu
+++ b/python/test/simple_proxy_channel_test.cu
@@ -1,7 +1,7 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.

-#include
+#include
 #include

 // be careful about using channels[my_rank] as it is invalid and it is there just for simplicity of indexing
diff --git a/src/cuda_utils.cc b/src/cuda_utils.cc
index 5b1449eb0..2e0d4a1b0 100644
--- a/src/cuda_utils.cc
+++ b/src/cuda_utils.cc
@@ -1,7 +1,7 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>

 namespace mscclpp {

@@ -9,11 +9,11 @@ AvoidCudaGraphCaptureGuard::AvoidCudaGraphCaptureGuard() : mode_(cudaStreamCaptu
   MSCCLPP_CUDATHROW(cudaThreadExchangeStreamCaptureMode(&mode_));
 }

-AvoidCudaGraphCaptureGuard::~AvoidCudaGraphCaptureGuard() { cudaThreadExchangeStreamCaptureMode(&mode_); }
+AvoidCudaGraphCaptureGuard::~AvoidCudaGraphCaptureGuard() { (void)cudaThreadExchangeStreamCaptureMode(&mode_); }

 CudaStreamWithFlags::CudaStreamWithFlags(unsigned int flags) {
   MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&stream_, flags));
 }
-CudaStreamWithFlags::~CudaStreamWithFlags() { cudaStreamDestroy(stream_); }
+CudaStreamWithFlags::~CudaStreamWithFlags() { (void)cudaStreamDestroy(stream_); }

 }  // namespace mscclpp
diff --git a/src/debug.cc b/src/debug.cc
index 8af75daa0..aa97b09db 100644
--- a/src/debug.cc
+++ b/src/debug.cc
@@ -6,12 +6,12 @@

 #include "debug.h"

-#include <cuda_runtime.h>
 #include
 #include
 #include

 #include
+#include <mscclpp/gpu.hpp>
 #include
 #include
@@ -181,7 +181,7 @@ void mscclppDebugLog(mscclppDebugLogLevel level, unsigned long flags, const char
   int cudaDev;
   if (!(level == MSCCLPP_LOG_TRACE && flags == MSCCLPP_CALL)) {
-    cudaGetDevice(&cudaDev);
+    MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDev));
   }

   char buffer[1024];
diff --git a/src/errors.cc b/src/errors.cc
index c3b3ac707..537b3fc27 100644
--- a/src/errors.cc
+++ b/src/errors.cc
@@ -3,6 +3,7 @@

 #include
 #include
+#include <mscclpp/gpu.hpp>

 #include "api.h"

@@ -42,13 +43,15 @@ MSCCLPP_API_CPP SysError::SysError(const std::string& message, int errorCode) :
   message_ = message + " (System failure: " + std::strerror(errorCode) + ")";
 }

-MSCCLPP_API_CPP CudaError::CudaError(const std::string& message, cudaError_t errorCode) : BaseError(errorCode) {
-  message_ = message + " (Cuda failure: " + cudaGetErrorString(errorCode) + ")";
+MSCCLPP_API_CPP CudaError::CudaError(const std::string& message, int errorCode) : BaseError(errorCode) {
+  message_ = message + " (Cuda failure: " + cudaGetErrorString(static_cast<cudaError_t>(errorCode)) + ")";
 }

-MSCCLPP_API_CPP CuError::CuError(const std::string& message, CUresult errorCode) : BaseError(errorCode) {
+MSCCLPP_API_CPP CuError::CuError(const std::string& message, int errorCode) : BaseError(errorCode) {
   const char* errStr;
-  cuGetErrorString(errorCode, &errStr);
+  if (cuGetErrorString(static_cast<CUresult>(errorCode), &errStr) != CUDA_SUCCESS) {
+    errStr = "failed to get error string";
+  }
   message_ = message + " (Cu failure: " + errStr + ")";
 }
diff --git a/src/fifo.cc b/src/fifo.cc
index c0bdedba8..4255bcdcd 100644
--- a/src/fifo.cc
+++ b/src/fifo.cc
@@ -1,11 +1,11 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.

-#include <cuda/atomic>
-#include <mscclpp/cuda_utils.hpp>
 #include
+#include <mscclpp/gpu_utils.hpp>

 #include "api.h"
+#include "atomic.hpp"

 namespace mscclpp {

@@ -43,15 +43,13 @@ MSCCLPP_API_CPP ProxyTrigger Fifo::poll() {
   ProxyTrigger trigger;
   ProxyTrigger* ptr = &pimpl->triggers.get()[pimpl->hostTail % pimpl->size];
   // we are loading fst first. if fst is non-zero then snd is also valid
-  trigger.fst = cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{ptr->fst}.load(cuda::memory_order_acquire);
+  trigger.fst = atomicLoad(&(ptr->fst), memoryOrderAcquire);
   trigger.snd = ptr->snd;

   return trigger;
 }

 MSCCLPP_API_CPP void Fifo::pop() {
-  cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{pimpl->triggers.get()[pimpl->hostTail % pimpl->size].fst}.store(
-      0, cuda::memory_order_release);
-
+  atomicStore(&(pimpl->triggers.get()[pimpl->hostTail % pimpl->size].fst), uint64_t{0}, memoryOrderRelease);
   (pimpl->hostTail)++;
 }
diff --git a/src/include/atomic.hpp b/src/include/atomic.hpp
new file mode 100644
index 000000000..d7f61fec4
--- /dev/null
+++ b/src/include/atomic.hpp
@@ -0,0 +1,17 @@
+// Copyright (c) Microsoft Corporation.
+// Licensed under the MIT license.
+
+#ifndef MSCCLPP_ATOMIC_HPP_
+#define MSCCLPP_ATOMIC_HPP_
+
+#if defined(USE_CUDA)
+#define MSCCLPP_DEVICE_CUDA
+#include <mscclpp/atomic_device.hpp>
+#undef MSCCLPP_DEVICE_CUDA
+#else  // !defined(USE_CUDA)
+#define MSCCLPP_DEVICE_HIP
+#include <mscclpp/atomic_device.hpp>
+#undef MSCCLPP_DEVICE_HIP
+#endif  // !defined(USE_CUDA)
+
+#endif  // MSCCLPP_ATOMIC_HPP_
diff --git a/src/include/connection.hpp b/src/include/connection.hpp
index fffdc2086..47b154758 100644
--- a/src/include/connection.hpp
+++ b/src/include/connection.hpp
@@ -4,9 +4,8 @@
 #ifndef MSCCLPP_CONNECTION_HPP_
 #define MSCCLPP_CONNECTION_HPP_

-#include <cuda_runtime.h>
-
 #include
+#include <mscclpp/gpu.hpp>

 #include "communicator.hpp"
 #include "context.hpp"
diff --git a/src/include/context.hpp b/src/include/context.hpp
index 6468b1d33..39a699560 100644
--- a/src/include/context.hpp
+++ b/src/include/context.hpp
@@ -5,7 +5,7 @@
 #define MSCCLPP_CONTEXT_HPP_

 #include
-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>

 #include
 #include
diff --git a/src/include/registered_memory.hpp b/src/include/registered_memory.hpp
index 3804bfd62..11cd30231 100644
--- a/src/include/registered_memory.hpp
+++ b/src/include/registered_memory.hpp
@@ -4,10 +4,9 @@
 #ifndef MSCCLPP_REGISTERED_MEMORY_HPP_
 #define MSCCLPP_REGISTERED_MEMORY_HPP_

-#include <cuda_runtime.h>
-
 #include
 #include
+#include <mscclpp/gpu.hpp>

 #include "communicator.hpp"
 #include "ib.hpp"
diff --git a/src/npkit/npkit.cc b/src/npkit/npkit.cc
index 4bee4597e..466806d1f 100644
--- a/src/npkit/npkit.cc
+++ b/src/npkit/npkit.cc
@@ -3,11 +3,11 @@

 #include "npkit.h"

-#include
 #include
 #include
 #include
+#include

 uint64_t NpKit::rank_ = 0;
diff --git a/src/npkit/npkit.h b/src/npkit/npkit.h
index 6af98be84..21ba928ae 100644
--- a/src/npkit/npkit.h
+++ b/src/npkit/npkit.h
@@ -4,7 +4,7 @@
 #ifndef NPKIT_H_
 #define NPKIT_H_

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>
 #include
 #include
diff --git a/src/numa.cc b/src/numa.cc
index a1d4129d1..d72c99ee7 100644
--- a/src/numa.cc
+++ b/src/numa.cc
@@ -4,7 +4,7 @@
 #include
 #include

-#include <cuda_runtime.h>
+#include <mscclpp/gpu.hpp>

 #include "api.h"
diff --git a/src/proxy.cc b/src/proxy.cc
index 3fe3b1645..b3d2b6859 100644
--- a/src/proxy.cc
+++ b/src/proxy.cc
@@ -3,7 +3,7 @@

 #include
 #include
-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>
 #include
 #include
 #include
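Editor's note: fifo.cc above (and semaphore.cc below) replace direct `cuda::atomic_ref` usage with atomicLoad/atomicStore plus memoryOrderAcquire/memoryOrderRelease constants, routed through the new src/include/atomic.hpp shim. The wrapped header (restored above as mscclpp/atomic_device.hpp, my best reading of the damaged text) is not shown in this excerpt; a plausible shape for such wrappers, assuming libcu++'s cuda::atomic_ref on the CUDA side and the GCC/Clang __atomic builtins on the HIP side (names and signatures here are illustrative, not the library's confirmed API):

    #if defined(MSCCLPP_DEVICE_CUDA)
    #include <cuda/atomic>
    constexpr cuda::memory_order memoryOrderAcquire = cuda::memory_order_acquire;
    constexpr cuda::memory_order memoryOrderRelease = cuda::memory_order_release;

    template <typename T>
    T atomicLoad(T* ptr, cuda::memory_order order) {
      // System scope: host threads and GPUs must observe each other's writes.
      return cuda::atomic_ref<T, cuda::thread_scope_system>{*ptr}.load(order);
    }

    template <typename T>
    void atomicStore(T* ptr, const T& val, cuda::memory_order order) {
      cuda::atomic_ref<T, cuda::thread_scope_system>{*ptr}.store(val, order);
    }
    #else  // HIP path: fall back to the compiler's __atomic builtins.
    constexpr int memoryOrderAcquire = __ATOMIC_ACQUIRE;
    constexpr int memoryOrderRelease = __ATOMIC_RELEASE;

    template <typename T>
    T atomicLoad(T* ptr, int order) { return __atomic_load_n(ptr, order); }

    template <typename T>
    void atomicStore(T* ptr, const T& val, int order) { __atomic_store_n(ptr, val, order); }
    #endif

The payoff is that call sites like `atomicLoad(&(ptr->fst), memoryOrderAcquire)` compile unchanged under both toolchains.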
diff --git a/src/registered_memory.cc b/src/registered_memory.cc
index 9c35e1443..6d5fd79f5 100644
--- a/src/registered_memory.cc
+++ b/src/registered_memory.cc
@@ -3,10 +3,8 @@

 #include "registered_memory.hpp"

-#include <cuda_runtime.h>
-
 #include
-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>

 #include "api.h"
 #include "context.hpp"
diff --git a/src/semaphore.cc b/src/semaphore.cc
index 6605be99f..7dec60c3d 100644
--- a/src/semaphore.cc
+++ b/src/semaphore.cc
@@ -1,10 +1,10 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.

-#include <cuda/atomic>
 #include

 #include "api.h"
+#include "atomic.hpp"
 #include "debug.h"

 namespace mscclpp {

@@ -21,7 +21,7 @@ static NonblockingFuture<RegisteredMemory> setupInboundSemaphoreId(Communicator&

 MSCCLPP_API_CPP Host2DeviceSemaphore::Host2DeviceSemaphore(Communicator& communicator,
                                                            std::shared_ptr<Connection> connection)
-    : BaseSemaphore(allocUniqueCuda<uint64_t>(), allocUniqueCuda<uint64_t>(), std::make_unique<uint64_t>()),
+    : BaseSemaphore(allocExtUniqueCuda<uint64_t>(), allocExtUniqueCuda<uint64_t>(), std::make_unique<uint64_t>()),
       connection_(connection) {
   INFO(MSCCLPP_INIT, "Creating a Host2Device semaphore for %s transport from %d to %d",
        connection->getTransportName().c_str(), communicator.bootstrap()->getRank(),
@@ -67,8 +67,8 @@ MSCCLPP_API_CPP void Host2HostSemaphore::signal() {
 }

 MSCCLPP_API_CPP bool Host2HostSemaphore::poll() {
-  bool signaled = (cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*(uint64_t*)localInboundSemaphore_.get()}.load(
-                       cuda::memory_order_acquire) > (*expectedInboundSemaphore_));
+  bool signaled =
+      (atomicLoad((uint64_t*)localInboundSemaphore_.get(), memoryOrderAcquire) > (*expectedInboundSemaphore_));
   if (signaled) (*expectedInboundSemaphore_) += 1;
   return signaled;
 }

@@ -76,8 +76,7 @@ MSCCLPP_API_CPP bool Host2HostSemaphore::poll() {
 MSCCLPP_API_CPP void Host2HostSemaphore::wait(int64_t maxSpinCount) {
   (*expectedInboundSemaphore_) += 1;
   int64_t spinCount = 0;
-  while (cuda::atomic_ref<uint64_t, cuda::thread_scope_system>{*(uint64_t*)localInboundSemaphore_.get()}.load(
-             cuda::memory_order_acquire) < (*expectedInboundSemaphore_)) {
+  while (atomicLoad((uint64_t*)localInboundSemaphore_.get(), memoryOrderAcquire) < (*expectedInboundSemaphore_)) {
     if (maxSpinCount >= 0 && spinCount++ == maxSpinCount) {
       throw Error("Host2HostSemaphore::wait timed out", ErrorCode::Timeout);
     }
@@ -86,7 +85,7 @@ MSCCLPP_API_CPP void Host2HostSemaphore::wait(int64_t maxSpinCount) {

 MSCCLPP_API_CPP SmDevice2DeviceSemaphore::SmDevice2DeviceSemaphore(Communicator& communicator,
                                                                    std::shared_ptr<Connection> connection)
-    : BaseSemaphore(allocUniqueCuda<uint64_t>(), allocUniqueCuda<uint64_t>(), allocUniqueCuda<uint64_t>()) {
+    : BaseSemaphore(allocExtUniqueCuda<uint64_t>(), allocExtUniqueCuda<uint64_t>(), allocExtUniqueCuda<uint64_t>()) {
   INFO(MSCCLPP_INIT, "Creating a Device2Device semaphore for %s transport from %d to %d",
        connection->getTransportName().c_str(), communicator.bootstrap()->getRank(),
        communicator.remoteRankOf(*connection));
diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 076798966..d27f4e3fb 100644
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -3,11 +3,16 @@

 find_package(MPI)

-set(TEST_LIBS_COMMON mscclpp ${CUDA_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES} ${GDRCOPY_LIBRARIES})
+set(TEST_LIBS_COMMON mscclpp ${GPU_LIBRARIES} ${NUMA_LIBRARIES} ${IBVERBS_LIBRARIES})
 set(TEST_LIBS_GTEST GTest::gtest_main GTest::gmock_main)
-set(TEST_INC_COMMON PRIVATE ${PROJECT_SOURCE_DIR}/include ${CUDAToolkit_INCLUDE_DIRS})
+set(TEST_INC_COMMON PRIVATE ${PROJECT_SOURCE_DIR}/include ${GPU_INCLUDE_DIRS})
 set(TEST_INC_INTERNAL PRIVATE ${PROJECT_SOURCE_DIR}/src/include)

+if(USE_ROCM)
+  file(GLOB_RECURSE CU_SOURCES CONFIGURE_DEPENDS *.cu)
+  set_source_files_properties(${CU_SOURCES} PROPERTIES LANGUAGE HIP)
+endif()
+
 function(add_test_executable name sources)
   add_executable(${name} ${sources})
   target_link_libraries(${name} ${TEST_LIBS_COMMON} MPI::MPI_CXX)
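Editor's note: from here on, the tests swap allocSharedCuda/allocUniqueCuda for allocExtSharedCuda/allocExtUniqueCuda. My reading (not stated in the patch itself) is that the "Ext" variants allocate device memory that stays safely accessible to peer GPUs and the host on AMD hardware (e.g., uncached fine-grained allocations via hipExtMallocWithFlags), while degenerating to ordinary device allocations on CUDA. A hypothetical usage sketch, with the header name taken from the include renames above:

    #include <cstdint>
    #include <memory>
    #include <mscclpp/gpu_utils.hpp>  // assumed location of the alloc helpers

    void allocateChannelBuffers(size_t nElem) {
      // Shared-ownership device buffer usable across peer GPUs on both vendors.
      std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
      // Unique-ownership variant, e.g. for a semaphore counter word.
      auto counter = mscclpp::allocExtUniqueCuda<uint64_t>();
      (void)buff;
      (void)counter;
    }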
diff --git a/test/allgather_test_cpp.cu b/test/allgather_test_cpp.cu
index fcebd7590..2f56b221d 100644
--- a/test/allgather_test_cpp.cu
+++ b/test/allgather_test_cpp.cu
@@ -74,7 +74,12 @@ __device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyCh
     if ((remoteRank % nranksPerNode) == ((rank - i + nranksPerNode) % nranksPerNode)) {
       if ((threadIdx.x % 32) == 0) proxyChan.wait();
     }
+#if defined(__HIP_PLATFORM_AMD__)
+    // NOTE: we actually need a group barrier here for better performance, but __syncthreads() is still correct.
+    __syncthreads();
+#else
     asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nranksPerNode - 1) * 32) : "memory");
+#endif
   }
 }

@@ -237,7 +242,7 @@ void setupMscclppConnections(int rank, int world_size, mscclpp::Communicator& co
   }

   if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>)) {
-    std::unexpected();
+    throw std::runtime_error("unexpected error");
   }

   CUDACHECK(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(),
                                sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>) * proxyChannels.size()));
diff --git a/test/allgather_test_host_offloading.cu b/test/allgather_test_host_offloading.cu
index 76e6b6311..7f50994b9 100644
--- a/test/allgather_test_host_offloading.cu
+++ b/test/allgather_test_host_offloading.cu
@@ -2,8 +2,8 @@
 // Licensed under the MIT license.

 #include
-#include <mscclpp/cuda_utils.hpp>
 #include
+#include <mscclpp/gpu_utils.hpp>
 #include
 #include
 #include
diff --git a/test/mp_unit/communicator_tests.cu b/test/mp_unit/communicator_tests.cu
index e37093574..30727667d 100644
--- a/test/mp_unit/communicator_tests.cu
+++ b/test/mp_unit/communicator_tests.cu
@@ -3,7 +3,7 @@

 #include

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>
 #include

 #include "mp_unit_tests.hpp"
diff --git a/test/mp_unit/ib_tests.cu b/test/mp_unit/ib_tests.cu
index d32f102c3..e878154d7 100644
--- a/test/mp_unit/ib_tests.cu
+++ b/test/mp_unit/ib_tests.cu
@@ -3,7 +3,7 @@

 #include

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>

 #include "infiniband/verbs.h"
 #include "mp_unit_tests.hpp"

@@ -118,6 +118,8 @@ __global__ void kernelMemoryConsistency(uint64_t* data, volatile uint64_t* curIt
                                         uint64_t nelem, uint64_t maxIter) {
   if (blockIdx.x != 0) return;

+  __shared__ int errs[1024];
+
   constexpr int FlagWrong = 1;
   constexpr int FlagAbort = 2;

@@ -165,15 +167,18 @@ __global__ void kernelMemoryConsistency(uint64_t* data, volatile uint64_t* curIt
 #endif
     }
   }
+
+  errs[threadIdx.x] = err;
   __threadfence();
   __syncthreads();

-  // Shuffle err
-  for (int i = 16; i > 0; i /= 2) {
-    err += __shfl_xor_sync(0xffffffff, err, i);
+  // Check if any error is detected.
+  int total_err = 0;
+  for (size_t i = 0; i < blockDim.x; ++i) {
+    total_err += errs[i];
   }

-  if (err > 0) {
+  if (total_err > 0) {
     // Exit if any error is detected.
     return;
   }
diff --git a/test/mp_unit/mp_unit_tests.hpp b/test/mp_unit/mp_unit_tests.hpp
index c3a3b1efe..de1f4b6b4 100644
--- a/test/mp_unit/mp_unit_tests.hpp
+++ b/test/mp_unit/mp_unit_tests.hpp
@@ -7,7 +7,7 @@
 #include
 #include

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>
 #include
 #include
 #include
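Editor's note on the ib_tests.cu change above: the old butterfly reduction via `__shfl_xor_sync` assumed a 32-lane warp and a CUDA-only intrinsic, and it only combined error flags within a single warp. The patch replaces it with a shared-memory tally over the whole thread block, which is vendor-neutral. The same pattern as a standalone kernel, for reference (an illustrative sketch, assuming blockDim.x <= 1024; not code from the patch):

    __global__ void anyErrorInBlock(const int* perThreadErr, int* result) {
      __shared__ int errs[1024];                   // one slot per thread in the block
      errs[threadIdx.x] = perThreadErr[threadIdx.x];
      __syncthreads();                             // all slots written before any thread reads

      int total = 0;
      for (unsigned i = 0; i < blockDim.x; ++i) {  // every thread sums every slot
        total += errs[i];
      }
      if (threadIdx.x == 0) *result = (total > 0);
    }

This trades a few warp-shuffle instructions for an O(blockDim.x) loop, which is acceptable in a correctness test and compiles identically under nvcc and hipcc.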
diff --git a/test/mp_unit/proxy_channel_tests.cu b/test/mp_unit/proxy_channel_tests.cu
index ae0ea4c68..21dbe064d 100644
--- a/test/mp_unit/proxy_channel_tests.cu
+++ b/test/mp_unit/proxy_channel_tests.cu
@@ -2,7 +2,7 @@
 // Licensed under the MIT license.

 #include

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>

 #include "mp_unit_tests.hpp"

@@ -152,7 +152,7 @@ void ProxyChannelOneToOneTest::testPingPong(bool useIbOnly, bool waitWithPoll) {
   const int nElem = 4 * 1024 * 1024;

   std::vector<mscclpp::SimpleProxyChannel> proxyChannels;
-  std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
+  std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
   setupMeshConnections(proxyChannels, useIbOnly, buff.get(), nElem * sizeof(int));

   std::vector<DeviceHandle<mscclpp::SimpleProxyChannel>> proxyChannelHandles;
@@ -211,7 +211,7 @@ __global__ void kernelProxyLLPingPong(int* buff, mscclpp::LLPacket* putPktBuf, m
   int threadId = threadIdx.x + blockIdx.x * blockDim.x;
   int numThreads = blockDim.x * gridDim.x;
   int flusher = 0;
-  const size_t nPkt = nElem / 2;
+  const int nPkt = nElem / 2;

   for (int i = 0; i < nTries; i++) {
     uint64_t flag = (uint64_t)i + 1;
@@ -269,11 +269,11 @@ void ProxyChannelOneToOneTest::testPacketPingPong(bool useIbOnly) {
   const int nElem = 4 * 1024 * 1024;

   std::vector<mscclpp::SimpleProxyChannel> proxyChannels;
-  std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
+  std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);

   const size_t nPacket = (nElem * sizeof(int) + sizeof(uint64_t) - 1) / sizeof(uint64_t);
-  auto putPacketBuffer = mscclpp::allocSharedCuda<mscclpp::LLPacket>(nPacket);
-  auto getPacketBuffer = mscclpp::allocSharedCuda<mscclpp::LLPacket>(nPacket);
+  auto putPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);
+  auto getPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);

   setupMeshConnections(proxyChannels, useIbOnly, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
                        getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));
@@ -336,11 +336,11 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) {
   const int nElem = 4 * 1024 * 1024;

   std::vector<mscclpp::SimpleProxyChannel> proxyChannels;
-  std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
+  std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);

   const size_t nPacket = (nElem * sizeof(int) + sizeof(uint64_t) - 1) / sizeof(uint64_t);
-  auto putPacketBuffer = mscclpp::allocSharedCuda<mscclpp::LLPacket>(nPacket);
-  auto getPacketBuffer = mscclpp::allocSharedCuda<mscclpp::LLPacket>(nPacket);
+  auto putPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);
+  auto getPacketBuffer = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(nPacket);

   setupMeshConnections(proxyChannels, useIbOnly, putPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket),
                        getPacketBuffer.get(), nPacket * sizeof(mscclpp::LLPacket));
@@ -362,7 +362,7 @@ void ProxyChannelOneToOneTest::testPacketPingPongPerf(bool useIbOnly) {

   auto* testInfo = ::testing::UnitTest::GetInstance()->current_test_info();
   const std::string testName = std::string(testInfo->test_suite_name()) + "." + std::string(testInfo->name());
-  const int nTries = 1000;
+  const int nTries = 1000000;

   // Warm-up
   kernelProxyLLPingPong
diff --git a/test/mp_unit/sm_channel_tests.cu b/test/mp_unit/sm_channel_tests.cu
index 37b3ce637..ea5241053 100644
--- a/test/mp_unit/sm_channel_tests.cu
+++ b/test/mp_unit/sm_channel_tests.cu
@@ -123,7 +123,7 @@ TEST_F(SmChannelOneToOneTest, PutPingPong) {
   const int nElem = 4 * 1024 * 1024;

   std::vector<mscclpp::SmChannel> smChannels;
-  std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
+  std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
   setupMeshConnections(smChannels, buff.get(), nElem * sizeof(int));
   std::vector<DeviceHandle<mscclpp::SmChannel>> deviceHandles(smChannels.size());
   std::transform(smChannels.begin(), smChannels.end(), deviceHandles.begin(),
@@ -202,7 +202,7 @@ TEST_F(SmChannelOneToOneTest, GetPingPong) {
   const int nElem = 4 * 1024 * 1024;

   std::vector<mscclpp::SmChannel> smChannels;
-  std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
+  std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
   setupMeshConnections(smChannels, buff.get(), nElem * sizeof(int));
   std::vector<DeviceHandle<mscclpp::SmChannel>> deviceHandles(smChannels.size());
   std::transform(smChannels.begin(), smChannels.end(), deviceHandles.begin(),
@@ -289,8 +289,8 @@ TEST_F(SmChannelOneToOneTest, PacketPingPong) {
   const int nElem = 4 * 1024 * 1024;

   std::vector<mscclpp::SmChannel> smChannels;
-  std::shared_ptr<int> buff = mscclpp::allocSharedCuda<int>(nElem);
-  std::shared_ptr<int> intermBuff = mscclpp::allocSharedCuda<int>(nElem * 2);
+  std::shared_ptr<int> buff = mscclpp::allocExtSharedCuda<int>(nElem);
+  std::shared_ptr<int> intermBuff = mscclpp::allocExtSharedCuda<int>(nElem * 2);
   setupMeshConnections(smChannels, buff.get(), nElem * sizeof(int), intermBuff.get(), nElem * 2 * sizeof(int));
   std::vector<DeviceHandle<mscclpp::SmChannel>> deviceHandles(smChannels.size());
   std::transform(smChannels.begin(), smChannels.end(), deviceHandles.begin(),
diff --git a/test/mscclpp-test/CMakeLists.txt b/test/mscclpp-test/CMakeLists.txt
index d77c01dde..cbbdfea65 100644
--- a/test/mscclpp-test/CMakeLists.txt
+++ b/test/mscclpp-test/CMakeLists.txt
@@ -5,6 +5,9 @@ FetchContent_Declare(json URL https://github.com/nlohmann/json/releases/download
 FetchContent_MakeAvailable(json)

 function(add_mscclpp_test_executable name sources)
+  if(USE_ROCM)
+    set_source_files_properties(${sources} PROPERTIES LANGUAGE HIP)
+  endif()
   add_executable(${name} ${sources} common.cc)
   target_link_libraries(${name} ${TEST_LIBS_COMMON} MPI::MPI_CXX nlohmann_json::nlohmann_json)
   target_include_directories(${name} ${TEST_INC_COMMON})
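Editor's note: the allgather test that follows derives warp indices from a WARP_SIZE macro instead of a hard-coded 32, because AMD wavefronts are 64 lanes wide on the targeted gfx9 parts while NVIDIA warps are 32. The same idea expressed as a small self-contained helper (a sketch only; the patch itself uses a plain preprocessor macro):

    #if defined(__HIP_PLATFORM_AMD__) && (__HIP_PLATFORM_AMD__ == 1)
    constexpr int kWarpSize = 64;  // AMD wavefront width (e.g. gfx90a/gfx94x)
    #else
    constexpr int kWarpSize = 32;  // NVIDIA warp width
    #endif

    __device__ inline int warpIdInBlock() { return threadIdx.x / kWarpSize; }
    __device__ inline bool isWarpLeader() { return (threadIdx.x % kWarpSize) == 0; }

Keeping all per-warp index math behind one constant is what lets kernels such as allgather0 below assign "one warp per remote rank" correctly on both vendors.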
diff --git a/test/mscclpp-test/allgather_test.cu b/test/mscclpp-test/allgather_test.cu
index ca050826e..529826f32 100644
--- a/test/mscclpp-test/allgather_test.cu
+++ b/test/mscclpp-test/allgather_test.cu
@@ -1,14 +1,18 @@
 // Copyright (c) Microsoft Corporation.
 // Licensed under the MIT license.

-#include <cuda_runtime.h>
-
 #include
-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>
 #include

 #include "common.hpp"

+#if defined(__HIP_PLATFORM_AMD__) && (__HIP_PLATFORM_AMD__ == 1)
+#define WARP_SIZE 64
+#else
+#define WARP_SIZE 32
+#endif
+
 namespace {
 auto isUsingHostOffload = [](int kernelNum) { return kernelNum == 3; };
 constexpr uint64_t MAGIC = 0xdeadbeef;
@@ -22,7 +26,7 @@ __constant__ DeviceHandle<mscclpp::ProxyChannel> constRawProxyChan[16];

 __constant__ DeviceHandle<mscclpp::SmChannel> constSmChans[8];

 __global__ void allgather0(int rank, size_t nelemsPerGPU) {
-  int warpId = threadIdx.x / 32;
+  int warpId = threadIdx.x / WARP_SIZE;

   // Each warp is responsible for one of the remote ranks
   DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan = constProxyChans[warpId];
@@ -31,14 +35,16 @@ __global__ void allgather0(int rank, size_t nelemsPerGPU) {

   // this thread's role is a sender role
   // put your data asynchronously
-  if (threadIdx.x % 32 == 0) proxyChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
+  if (threadIdx.x % WARP_SIZE == 0) {
+    proxyChan.putWithSignal(rank * nelemsPerGPU * sizeof(int), nelemsPerGPU * sizeof(int));
+  }
   // make sure everyone has put their data before some thread randomly blocks everyone else in signal
   __syncthreads();
   // push with flag and sync to make sure the data is received
-  if (threadIdx.x % 32 == 0) proxyChan.flush();
+  if (threadIdx.x % WARP_SIZE == 0) proxyChan.flush();
   // this thread's role is a receiver role. wait on the semaphore to make sure the data is ready
-  if (threadIdx.x % 32 == 0) proxyChan.wait();
+  if (threadIdx.x % WARP_SIZE == 0) proxyChan.wait();
 }

 __device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nRanksPerNode,
@@ -52,14 +58,19 @@ __device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyCh
   for (int i = 1; i < nRanksPerNode; i++) {
     if ((remoteRank % nRanksPerNode) == ((rank + i) % nRanksPerNode)) {
       // put your data to GPU (rank+i) % nRanksPerNode and signal in one call
-      if (flushAfterSignal && (threadIdx.x % 32) == 0) proxyChan.putWithSignalAndFlush(offset, size);
-      if (!flushAfterSignal && (threadIdx.x % 32) == 0) proxyChan.putWithSignal(offset, size);
+      if (flushAfterSignal && (threadIdx.x % WARP_SIZE) == 0) proxyChan.putWithSignalAndFlush(offset, size);
+      if (!flushAfterSignal && (threadIdx.x % WARP_SIZE) == 0) proxyChan.putWithSignal(offset, size);
     }
     // wait for the data from GPU (rank-i) % nRanksPerNode to arrive
     if ((remoteRank % nRanksPerNode) == ((rank - i + nRanksPerNode) % nRanksPerNode)) {
-      if ((threadIdx.x % 32) == 0) proxyChan.wait();
+      if ((threadIdx.x % WARP_SIZE) == 0) proxyChan.wait();
     }
-    asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nRanksPerNode - 1) * 32) : "memory");
+#if defined(__HIP_PLATFORM_AMD__) && (__HIP_PLATFORM_AMD__ == 1)
+    // NOTE: we actually need a group barrier here for better performance, but __syncthreads() is still correct.
+    __syncthreads();
+#else
+    asm volatile("bar.sync %0, %1;" ::"r"(11), "r"((nRanksPerNode - 1) * WARP_SIZE) : "memory");
+#endif
   }
 }

@@ -112,7 +123,7 @@ __device__ void localAllGatherSm(int rank, int nRanksPerNode, int startRankChunk
 }

 __global__ void allgather1(int rank, int nRanksPerNode, size_t nelemsPerGPU) {
-  int warpId = threadIdx.x / 32;
+  int warpId = threadIdx.x / WARP_SIZE;
   int remoteRank = (warpId < rank) ? warpId : warpId + 1;
   // Each warp is responsible for one of the remote ranks
@@ -123,7 +134,7 @@ __global__ void allgather1(int rank, int nRanksPerNode, size_t nelemsPerGPU) {
 }

 __global__ void allgather2(int rank, int worldSize, int nRanksPerNode, size_t nelemsPerGPU) {
-  int warpId = threadIdx.x / 32;
+  int warpId = threadIdx.x / WARP_SIZE;
   int remoteRank = (warpId < rank) ? warpId : warpId + 1;
   // Each warp is responsible for one of the remote ranks
@@ -150,10 +161,10 @@ __global__ void allgather2(int rank, int worldSize, int nRanksPerNode, size_t ne
   // cross-node exchange
   if (remoteRank % nRanksPerNode == rank % nRanksPerNode) {
     // opposite side
-    if ((threadIdx.x % 32) == 0)
+    if ((threadIdx.x % WARP_SIZE) == 0)
       proxyChan.putWithSignal(rank * nelemsPerGPU * sizeof(int),
                               (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
-    if ((threadIdx.x % 32) == 0) proxyChan.wait();
+    if ((threadIdx.x % WARP_SIZE) == 0) proxyChan.wait();
   }

   // sync here to make sure IB flush does not block the CUDA IPC traffic
   // need to flush ib channel here to avoid cq overflow. since we won't change the send buffer after send, we don't need
   // to flush for IPC channel.
   if (remoteRank % nRanksPerNode == rank % nRanksPerNode) {
-    if ((threadIdx.x % 32) == 0) proxyChan.flush();
+    if ((threadIdx.x % WARP_SIZE) == 0) proxyChan.flush();
   }
   __syncthreads();

@@ -176,15 +187,15 @@ __global__ void allgather2(int rank, int worldSize, int nRanksPerNode, size_t ne
   // cross-node exchange
   if (remoteRank % nRanksPerNode == rank % nRanksPerNode) {
     // opposite side
-    if ((threadIdx.x % 32) == 0)
+    if ((threadIdx.x % WARP_SIZE) == 0)
       proxyChan.putWithSignal((rank * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
                               nelemsPerGPU / pipelineSize * sizeof(int));
-    if ((threadIdx.x % 32) == 0) proxyChan.wait();
+    if ((threadIdx.x % WARP_SIZE) == 0) proxyChan.wait();
   }
   __syncthreads();

   if (remoteRank % nRanksPerNode == rank % nRanksPerNode) {
-    if ((threadIdx.x % 32) == 0) proxyChan.flush();
+    if ((threadIdx.x % WARP_SIZE) == 0) proxyChan.flush();
   }
   __syncthreads();

@@ -198,7 +209,7 @@ __global__ void allgather2(int rank, int worldSize, int nRanksPerNode, size_t ne
 }

 __global__ void allgather3() {
-  int warpId = threadIdx.x / 32;
+  int warpId = threadIdx.x / WARP_SIZE;

   // Each warp is responsible for one of the remote ranks
   DeviceHandle<mscclpp::ProxyChannel> proxyChan = constRawProxyChan[warpId];
@@ -214,7 +225,7 @@ __global__ void allgather3() {
     // wait for the work to be done in cpu side
     proxyChan.fifo_.sync(currentFifoHead);
   }
-  if (tid % 32 == 0) {
+  if (tid % WARP_SIZE == 0) {
     proxyChan.wait();
   }
 }
@@ -378,7 +389,7 @@ void AllGatherTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
     nThreads = 1024;
   } else {
     nBlocks = 1;
-    nThreads = 32 * (worldSize - 1);
+    nThreads = WARP_SIZE * (worldSize - 1);
   }
   if (kernelNum == 0) {
     allgather0<<<nBlocks, nThreads, 0, stream>>>(rank, paramCount_);
@@ -394,7 +405,7 @@ void AllGatherTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
 }

 void AllGatherTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
-  if (sendBuff.size() != 1) std::unexpected();
+  if (sendBuff.size() != 1) throw std::runtime_error("unexpected error");
   int rank = args.rank;
   std::vector<int> dataHost(std::max(sendCount_, recvCount_), 0);
   for (size_t i = 0; i < recvCount_; i++) {
@@ -469,7 +480,7 @@ class AllGatherTestEngine : public BaseTestEngine {

 AllGatherTestEngine::AllGatherTestEngine(const TestArgs& args) : BaseTestEngine(args, "allgather") {}

 void AllGatherTestEngine::allocateBuffer() {
-  sendBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
+  sendBuff_ = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
   expectedBuff_ = std::shared_ptr<int[]>(new int[args_.maxBytes / sizeof(int)]);
 }

@@ -478,7 +489,7 @@ void AllGatherTestEngine::setupConnections() {
   if (!isUsingHostOffload(args_.kernelNum)) {
     setupMeshConnections(devProxyChannels, sendBuff_.get(), args_.maxBytes);
     if (devProxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>)) {
-      std::unexpected();
+      throw std::runtime_error("unexpected error");
     }
     CUDATHROW(cudaMemcpyToSymbol(constProxyChans, devProxyChannels.data(),
                                  sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>) * devProxyChannels.size()));
@@ -486,7 +497,7 @@ void AllGatherTestEngine::setupConnections() {
     setupMeshConnections(smChannels_, sendBuff_.get(), args_.maxBytes);
     std::vector<DeviceHandle<mscclpp::SmChannel>> smChannelHandles(smChannels_.size());
     if (smChannels_.size() > sizeof(constSmChans) / sizeof(DeviceHandle<mscclpp::SmChannel>)) {
-      std::unexpected();
+      throw std::runtime_error("unexpected error");
     }
     std::transform(smChannels_.begin(), smChannels_.end(), smChannelHandles.begin(),
                    [](const mscclpp::SmChannel& smChannel) { return mscclpp::deviceHandle(smChannel); });
@@ -508,7 +519,7 @@ void AllGatherTestEngine::setupConnections() {
     });
     auto proxyChannels = service->proxyChannels();
     if (proxyChannels.size() > sizeof(constRawProxyChan) / sizeof(DeviceHandle<mscclpp::ProxyChannel>)) {
-      std::unexpected();
+      throw std::runtime_error("unexpected error");
     }
     CUDATHROW(cudaMemcpyToSymbol(constRawProxyChan, proxyChannels.data(),
                                  sizeof(DeviceHandle<mscclpp::ProxyChannel>) * proxyChannels.size()));
diff --git a/test/mscclpp-test/allreduce_test.cu b/test/mscclpp-test/allreduce_test.cu
index 73f23a1b1..ff1ab37b1 100644
--- a/test/mscclpp-test/allreduce_test.cu
+++ b/test/mscclpp-test/allreduce_test.cu
@@ -2,8 +2,8 @@
 // Licensed under the MIT license.
 #include

-#include <mscclpp/cuda_utils.hpp>
-#include <mscclpp/packet.hpp>
+#include <mscclpp/gpu_utils.hpp>
+#include <mscclpp/packet_device.hpp>
 #include

 #include "common.hpp"

@@ -1000,7 +1000,7 @@ void AllReduceTestColl::runColl(const TestArgs& args, cudaStream_t stream) {
 }

 void AllReduceTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
-  if (sendBuff.size() != 1) std::unexpected();
+  if (sendBuff.size() != 1) throw std::runtime_error("unexpected error");
   const int rank = args.rank;
   const int worldSize = args.totalRanks;
   std::vector<int> dataHost(std::max(sendCount_, recvCount_), rank);
@@ -1091,30 +1091,30 @@ bool AllReduceTestEngine::isUsePacket() const { return (args_.kernelNum == 2 ||

 bool AllReduceTestEngine::isInPlace() const { return (args_.kernelNum != 2 && args_.kernelNum != 6); }

 void AllReduceTestEngine::allocateBuffer() {
-  inputBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
-  resultBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
+  inputBuff_ = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
+  resultBuff_ = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
   inputBuff = inputBuff_.get();
   resultBuff = resultBuff_.get();

   if (args_.kernelNum == 0 || args_.kernelNum == 1 || args_.kernelNum == 3 || args_.kernelNum == 4) {
-    scratchBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
+    scratchBuff_ = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
     scratchBuff = scratchBuff_.get();
   } else if (args_.kernelNum == 2) {
     const size_t nPacket = (args_.maxBytes + sizeof(uint64_t) - 1) / sizeof(uint64_t);
     // 2x for double-buffering
     const size_t scratchBuffNelem = nPacket * std::max(args_.nRanksPerNode - 1, 1) * 2;
-    scratchPacketBuff_ = mscclpp::allocSharedCuda<mscclpp::LLPacket>(scratchBuffNelem);
+    scratchPacketBuff_ = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(scratchBuffNelem);
     scratchPacketBuff = scratchPacketBuff_.get();
     const size_t packetBuffNelem = nPacket * 2;
-    putPacketBuff_ = mscclpp::allocSharedCuda<mscclpp::LLPacket>(packetBuffNelem);
-    getPacketBuff_ = mscclpp::allocSharedCuda<mscclpp::LLPacket>(packetBuffNelem);
+    putPacketBuff_ = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(packetBuffNelem);
+    getPacketBuff_ = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(packetBuffNelem);
     putPacketBuff = putPacketBuff_.get();
     getPacketBuff = getPacketBuff_.get();
   } else if (args_.kernelNum == 6) {
     const size_t nPacket = (args_.maxBytes + sizeof(uint64_t) - 1) / sizeof(uint64_t);
     // 2x for double-buffering, scratchBuff used to store original data and reduced results
     const size_t scratchBuffNelem = nPacket * 2 /*original data & reduced result */ * 2 /* double buffering*/;
-    scratchPacketBuff_ = mscclpp::allocSharedCuda<mscclpp::LLPacket>(scratchBuffNelem);
+    scratchPacketBuff_ = mscclpp::allocExtSharedCuda<mscclpp::LLPacket>(scratchBuffNelem);
     scratchPacketBuff = scratchPacketBuff_.get();
   }

@@ -1149,10 +1149,10 @@ void AllReduceTestEngine::setupConnections() {
                          scratchPacketBuffBytes);
     if (smOutOfPlaceChannels_.size() > sizeof(constSmOutOfPlaceChans) / sizeof(DeviceHandle<mscclpp::SmChannel>)) {
-      std::unexpected();
+      throw std::runtime_error("unexpected error");
     }
     if (proxyChannels.size() > sizeof(constDevFstRoundChans) / sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>)) {
-      std::unexpected();
+      throw std::runtime_error("unexpected error");
     }

     std::vector<DeviceHandle<mscclpp::SmChannel>> smChannelDeviceHandles(smOutOfPlaceChannels_.size());
@@ -1169,7 +1169,7 @@ void AllReduceTestEngine::setupConnections() {
     // Send data from local inputBuff to remote scratchBuff (out-of-place)
     setupMeshConnections(fstRoundChannels, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes);
     if (fstRoundChannels.size() > sizeof(constDevFstRoundChans) / sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>)) {
-      std::unexpected();
+      throw std::runtime_error("unexpected error");
error"); } CUDATHROW(cudaMemcpyToSymbol(constDevFstRoundChans, fstRoundChannels.data(), sizeof(DeviceHandle) * fstRoundChannels.size())); @@ -1177,14 +1177,14 @@ void AllReduceTestEngine::setupConnections() { // Send data from local inputBuff to remote inputBuff (in-place) setupMeshConnections(sndRoundChannels, inputBuff_.get(), args_.maxBytes); if (sndRoundChannels.size() > sizeof(constDevSndRoundChans) / sizeof(DeviceHandle)) { - std::unexpected(); + std::runtime_error("unexpected error"); } CUDATHROW(cudaMemcpyToSymbol(constDevSndRoundChans, sndRoundChannels.data(), sizeof(DeviceHandle) * sndRoundChannels.size())); setupMeshConnections(smOutOfPlaceChannels_, inputBuff_.get(), args_.maxBytes, scratchBuff_.get(), args_.maxBytes); if (smOutOfPlaceChannels_.size() > sizeof(constSmOutOfPlaceChans) / sizeof(DeviceHandle)) { - std::unexpected(); + std::runtime_error("unexpected error"); } std::vector> smChannelDeviceHandles(smOutOfPlaceChannels_.size()); getChannelDeviceHandle(smOutOfPlaceChannels_, smChannelDeviceHandles); @@ -1193,7 +1193,7 @@ void AllReduceTestEngine::setupConnections() { setupMeshConnections(smInPlaceChannels_, inputBuff_.get(), args_.maxBytes); if (smInPlaceChannels_.size() > sizeof(constSmInPlaceChans) / sizeof(DeviceHandle)) { - std::unexpected(); + std::runtime_error("unexpected error"); } smChannelDeviceHandles.resize(smInPlaceChannels_.size()); getChannelDeviceHandle(smInPlaceChannels_, smChannelDeviceHandles); @@ -1204,7 +1204,7 @@ void AllReduceTestEngine::setupConnections() { args_.maxBytes, ChannelSemantic::GET); if (smOutputPlaceGetChannels_.size() > sizeof(constSmOutOfPlaceGetChans) / sizeof(DeviceHandle)) { - std::unexpected(); + std::runtime_error("unexpected error"); } smChannelDeviceHandles.resize(smOutputPlaceGetChannels_.size()); getChannelDeviceHandle(smOutputPlaceGetChannels_, smChannelDeviceHandles); diff --git a/test/mscclpp-test/alltoall_test.cu b/test/mscclpp-test/alltoall_test.cu index 88f6fb4cc..8bb0b5a6d 100644 --- a/test/mscclpp-test/alltoall_test.cu +++ b/test/mscclpp-test/alltoall_test.cu @@ -2,7 +2,7 @@ // Licensed under the MIT license. #include -#include +#include #include "common.hpp" @@ -75,7 +75,7 @@ void AllToAllTestColl::runColl(const TestArgs& args, cudaStream_t stream) { } void AllToAllTestColl::initData(const TestArgs& args, std::vector sendBuff, void* expectedBuff) { - if (sendBuff.size() != 1) std::unexpected(); + if (sendBuff.size() != 1) std::runtime_error("unexpected error"); const int rank = args.rank; std::vector dataHost(recvCount_, 0); // For rank 0, the data is 0, 1, 2 ... recvCount_ - 1, for rank 1, the data is recvCount_, recvCount_ + 1, ... 
@@ -139,8 +139,8 @@ class AllToAllTestEngine : public BaseTestEngine {

 AllToAllTestEngine::AllToAllTestEngine(const TestArgs& args) : BaseTestEngine(args, "alltoall") { inPlace_ = false; }

 void AllToAllTestEngine::allocateBuffer() {
-  sendBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
-  recvBuff_ = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
+  sendBuff_ = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
+  recvBuff_ = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
   expectedBuff_ = std::shared_ptr<int[]>(new int[args_.maxBytes / sizeof(int)]);

   localSendBuff = sendBuff_.get();
@@ -152,7 +152,7 @@ void AllToAllTestEngine::setupConnections() {
   setupMeshConnections(proxyChannels, sendBuff_.get(), args_.maxBytes, recvBuff_.get(), args_.maxBytes);

   if (proxyChannels.size() > sizeof(constProxyChans) / sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>)) {
-    std::unexpected();
+    throw std::runtime_error("unexpected error");
   }
   CUDATHROW(cudaMemcpyToSymbol(constProxyChans, proxyChannels.data(),
                                sizeof(DeviceHandle<mscclpp::SimpleProxyChannel>) * proxyChannels.size()));
diff --git a/test/mscclpp-test/common.cc b/test/mscclpp-test/common.cc
index a8e533ab9..c5653b3fc 100644
--- a/test/mscclpp-test/common.cc
+++ b/test/mscclpp-test/common.cc
@@ -3,7 +3,6 @@

 #include "common.hpp"

-#include
 #include
 #include
 #include
diff --git a/test/mscclpp-test/sendrecv_test.cu b/test/mscclpp-test/sendrecv_test.cu
index 46cc0658f..1170dc4ad 100644
--- a/test/mscclpp-test/sendrecv_test.cu
+++ b/test/mscclpp-test/sendrecv_test.cu
@@ -5,8 +5,8 @@
 #include
 #include
 #include
-#include <mscclpp/cuda_utils.hpp>
-#include <mscclpp/packet.hpp>
+#include <mscclpp/gpu_utils.hpp>
+#include <mscclpp/packet_device.hpp>
 #include
 #include
 #include
@@ -87,7 +87,7 @@ std::vector<KernelRestriction> SendRecvTestColl::getKernelRestrictions() {

 void SendRecvTestColl::initData(const TestArgs& args, std::vector<void*> sendBuff, void* expectedBuff) {
   int rank = args.rank;
-  if (sendBuff.size() != 1) std::unexpected();
+  if (sendBuff.size() != 1) throw std::runtime_error("unexpected error");
   MSCCLPP_CUDATHROW(cudaMemset(sendBuff[0], 0, sendCount_ * typeSize_));

   // TODO: The type should not be limited to int.
@@ -137,8 +137,8 @@ class SendRecvTestEngine : public BaseTestEngine {

 SendRecvTestEngine::SendRecvTestEngine(const TestArgs& args) : BaseTestEngine(args, "sendrecv") { inPlace_ = false; }

 void SendRecvTestEngine::allocateBuffer() {
-  std::shared_ptr<int> sendBuff = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
-  std::shared_ptr<int> recvBuff = mscclpp::allocSharedCuda<int>(args_.maxBytes / sizeof(int));
+  std::shared_ptr<int> sendBuff = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
+  std::shared_ptr<int> recvBuff = mscclpp::allocExtSharedCuda<int>(args_.maxBytes / sizeof(int));
   devicePtrs_.push_back(sendBuff);
   devicePtrs_.push_back(recvBuff);
diff --git a/test/unit/cuda_utils_tests.cc b/test/unit/cuda_utils_tests.cc
index 0fabd9740..c2f565967 100644
--- a/test/unit/cuda_utils_tests.cc
+++ b/test/unit/cuda_utils_tests.cc
@@ -3,7 +3,7 @@

 #include

-#include <mscclpp/cuda_utils.hpp>
+#include <mscclpp/gpu_utils.hpp>

 TEST(CudaUtilsTest, AllocShared) {
   auto p1 = mscclpp::allocSharedCuda<int>();
diff --git a/test/unit/fifo_tests.cu b/test/unit/fifo_tests.cu
index 567592117..0cfe03e1e 100644
--- a/test/unit/fifo_tests.cu
+++ b/test/unit/fifo_tests.cu
@@ -3,8 +3,8 @@

 #include

-#include <mscclpp/cuda_utils.hpp>
 #include
+#include <mscclpp/gpu_utils.hpp>
 #include
 #include
diff --git a/test/unit/numa_tests.cc b/test/unit/numa_tests.cc
index af09261a2..dfa63a74a 100644
--- a/test/unit/numa_tests.cc
+++ b/test/unit/numa_tests.cc
@@ -3,7 +3,7 @@

 #include

-#include <cuda_runtime.h>
+#include <mscclpp/gpu.hpp>
 #include

 TEST(NumaTest, Basic) {