Clear minor warnings (#214)
Clear warnings from the clang compiler.
chhwang authored Nov 14, 2023
1 parent 0863e86 commit 3521fb0
Showing 9 changed files with 119 additions and 155 deletions.
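Most of the cleanup below follows two patterns: CUDA runtime calls whose cudaError_t results were previously ignored are now wrapped in the error-checking macros already used nearby (CUDACHECK, MSCCLPP_CUDATHROW), and parameters, locals, and special members that were never used are removed. As a rough sketch of the checking-macro pattern involved (illustrative only; it mirrors the CUCHECK macro deleted further down, not the repository's actual MSCCLPP_CUDATHROW definition):

// Illustrative sketch of a CUDA error-checking macro of the kind these tests
// use; the real CUDACHECK / MSCCLPP_CUDATHROW definitions may differ.
#include <cstdio>
#include <cstdlib>
#include <cuda_runtime.h>

#define CHECK_CUDA(cmd)                                                   \
  do {                                                                    \
    cudaError_t err = (cmd);                                              \
    if (err != cudaSuccess) {                                             \
      fprintf(stderr, "%s:%d CUDA failure '%s'\n", __FILE__, __LINE__,    \
              cudaGetErrorString(err));                                   \
      exit(EXIT_FAILURE);                                                 \
    }                                                                     \
  } while (false)

// Before: cudaGetDevice(&cudaDevice);              // result silently dropped
// After:  CHECK_CUDA(cudaGetDevice(&cudaDevice));  // result checked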
5 changes: 0 additions & 5 deletions include/mscclpp/core.hpp
@@ -551,11 +551,6 @@ class NonblockingFuture {
/// @param future The shared future to move.
NonblockingFuture(std::shared_future<T>&& future) : future(std::move(future)) {}

/// Copy constructor.
///
/// @param other The @ref NonblockingFuture to copy.
NonblockingFuture(const NonblockingFuture& other) = default;

/// Check if the value is ready to be retrieved.
///
/// @return True if the value is ready, false otherwise.
2 changes: 1 addition & 1 deletion python/test/_cpp/proxy_test.cpp
@@ -39,7 +39,7 @@ class MyProxyService {
semaphores_(semaphores),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
cudaGetDevice(&cudaDevice);
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice));
deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);
}

46 changes: 16 additions & 30 deletions test/allgather_test_cpp.cu
@@ -18,18 +18,6 @@

static int nranksPerNode = 8;

// Propagate errors up

#define MSCCLPPCHECK(call) \
do { \
mscclppResult_t res = call; \
if (res != mscclppSuccess && res != mscclppInProgress) { \
/* Print the back trace*/ \
printf("Failure at %s:%d -> %s\n", __FILE__, __LINE__, mscclppGetErrorString(res)); \
return res; \
} \
} while (0)

// Check CUDA RT calls
#define CUDACHECK(cmd) \
do { \
@@ -54,8 +42,7 @@ template <class T>
using DeviceHandle = mscclpp::DeviceHandle<T>;
__constant__ DeviceHandle<mscclpp::SimpleProxyChannel> constProxyChans[16];

__device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int remoteRank, size_t nelemsPerGPU) {
__device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, size_t nelemsPerGPU) {
// this allgather is really simple and implemented as an alltoall

// this thread's role is a sender role
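As the comment says, allgather0 is the degenerate alltoall form of allgather: every rank pushes its own chunk to every peer, then waits until every peer's chunk has arrived. A hand-wavy sketch of that schedule, where putChunk and waitChunk are hypothetical placeholders standing in for the proxy-channel calls:

// Hand-wavy sketch of an alltoall-style allgather; putChunk/waitChunk are
// hypothetical placeholders, not the SimpleProxyChannel API.
#include <cstddef>

void putChunk(int peer, size_t dstOffset, size_t bytes);  // assumed helper
void waitChunk(int peer);                                 // assumed helper

void allgatherAsAlltoall(int rank, int worldSize, size_t bytesPerRank) {
  size_t myOffset = rank * bytesPerRank;  // slot that holds this rank's data
  for (int peer = 0; peer < worldSize; ++peer) {
    if (peer == rank) continue;
    putChunk(peer, myOffset, bytesPerRank);  // send my chunk to each peer
  }
  for (int peer = 0; peer < worldSize; ++peer) {
    if (peer == rank) continue;
    waitChunk(peer);  // block until each peer's chunk has landed
  }
}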
@@ -70,8 +57,8 @@ __device__ void allgather0(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
if ((threadIdx.x % 32) == 0) proxyChan.wait();
}

__device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int nranksPerNode, int remoteRank, uint64_t offset, uint64_t size) {
__device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nranksPerNode,
int remoteRank, uint64_t offset, uint64_t size) {
// this allgather algorithm works as follows:
// Step 1: GPU rank i sends data to GPU rank (i+1) % nranksPerNode
// and waits for data from GPU rank (i-1) % nranksPerNode
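The comment above describes the ring schedule used by localAllGather (the remaining steps are collapsed in this view): in each step a rank forwards the chunk it received in the previous step to its right-hand neighbor while receiving the next chunk from its left-hand neighbor, so after nranksPerNode - 1 steps every rank holds every local chunk. A rough sketch of the chunk bookkeeping, with sendTo and recvFrom as hypothetical placeholders rather than this file's proxy-channel calls:

// Rough sketch of ring-allgather chunk bookkeeping; sendTo/recvFrom are
// hypothetical placeholders, not the proxy-channel API used in this test.
#include <cstddef>

void sendTo(int peer, size_t offset, size_t bytes);    // assumed helper
void recvFrom(int peer, size_t offset, size_t bytes);  // assumed helper

void ringLocalAllGather(int localRank, int nranksPerNode, size_t bytesPerRank) {
  int next = (localRank + 1) % nranksPerNode;
  int prev = (localRank - 1 + nranksPerNode) % nranksPerNode;
  for (int step = 0; step < nranksPerNode - 1; ++step) {
    // In step k, forward the chunk that originated k ranks behind us and
    // receive the chunk that originated k + 1 ranks behind us.
    int sendChunk = (localRank - step + nranksPerNode) % nranksPerNode;
    int recvChunk = (localRank - step - 1 + nranksPerNode) % nranksPerNode;
    sendTo(next, sendChunk * bytesPerRank, bytesPerRank);
    recvFrom(prev, recvChunk * bytesPerRank, bytesPerRank);
  }
}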
@@ -91,9 +78,9 @@ __device__ void localAllGather(DeviceHandle<mscclpp::SimpleProxyChannel> proxyCh
}
}

__device__ void allgather1(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int world_size,
int nranksPerNode, int remoteRank, size_t nelemsPerGPU) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
__device__ void allgather1(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan, int rank, int nranksPerNode,
int remoteRank, size_t nelemsPerGPU) {
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
if (remoteRank / nranksPerNode == rank / nranksPerNode)
if ((threadIdx.x % 32) == 0) proxyChan.flush();
@@ -116,7 +103,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// Step 1
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, rank * nelemsPerGPU * sizeof(int),
nelemsPerGPU * sizeof(int));
}
// cross-node exchange
@@ -134,7 +121,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// local allgather
int otherNghr = (rank + nranksPerNode) % world_size;
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
localAllGather(proxyChan, rank, nranksPerNode, remoteRank, otherNghr * nelemsPerGPU * sizeof(int),
(nelemsPerGPU * (pipelineSize - 1)) / pipelineSize * sizeof(int));
}

@@ -152,7 +139,7 @@ __device__ void allgather2(DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan,
// Step 3
// local allgather
if (remoteRank / nranksPerNode == rank / nranksPerNode) {
localAllGather(proxyChan, rank, world_size, nranksPerNode, remoteRank,
localAllGather(proxyChan, rank, nranksPerNode, remoteRank,
(otherNghr * nelemsPerGPU + (nelemsPerGPU * (pipelineSize - 1)) / pipelineSize) * sizeof(int),
nelemsPerGPU / pipelineSize * sizeof(int));
}
@@ -170,9 +157,9 @@ __global__ void kernel(int rank, int world_size, int nranksPerNode, size_t nelem
DeviceHandle<mscclpp::SimpleProxyChannel> proxyChan = constProxyChans[warpId];

if (kernel == 0)
allgather0(proxyChan, rank, world_size, remoteRank, nelemsPerGPU);
allgather0(proxyChan, rank, nelemsPerGPU);
else if (kernel == 1)
allgather1(proxyChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
allgather1(proxyChan, rank, nranksPerNode, remoteRank, nelemsPerGPU);
else if (kernel == 2)
allgather2(proxyChan, rank, world_size, nranksPerNode, remoteRank, nelemsPerGPU);
}
@@ -388,7 +375,6 @@ int main(int argc, const char* argv[]) {
}
ip_port = (char*)parsedArgs["ip_port"].c_str();

int thisNode = rankToNode(rank);
int cudaNum = rankToLocalRank(rank);
CUDACHECK(cudaSetDevice(cudaNum));

@@ -452,19 +438,19 @@
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
CUDACHECK(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, 32 * (world_size - 1), 0, stream>>>(rank, world_size, nranksPerNode, nelemsPerGPU, kernelNum);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
CUDACHECK(cudaStreamEndCapture(stream, &graph));
CUDACHECK(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

int cudagraphwarmup = 10;
if (rank == 0)
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
cudagraphiter);
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
CUDACHECK(cudaGraphLaunch(instance, stream));
}
CUDACHECK(cudaStreamSynchronize(stream));

@@ -477,7 +463,7 @@ int main(int argc, const char* argv[]) {
double t0, t1, ms, time_in_us;
t0 = getTime();
for (int i = 0; i < cudagraphlaunch; ++i) {
cudaGraphLaunch(instance, stream);
CUDACHECK(cudaGraphLaunch(instance, stream));
}
CUDACHECK(cudaStreamSynchronize(stream));

82 changes: 35 additions & 47 deletions test/allgather_test_host_offloading.cu
@@ -23,18 +23,6 @@ int nranksPerNode;
int rank;
int world_size;

// Propagate errors up

// Check CUDA RT calls
#define CUCHECK(cmd) \
do { \
cudaError_t err = cmd; \
if (err != cudaSuccess) { \
printf("%s:%d Cuda failure '%s'\n", __FILE__, __LINE__, cudaGetErrorString(err)); \
exit(EXIT_FAILURE); \
} \
} while (false)

// Measure current time in second.
static double getTime(void) {
struct timespec tspec;
@@ -45,8 +33,8 @@ static double getTime(void) {
return (tspec.tv_nsec / 1.0e9) + tspec.tv_sec;
}

__global__ void kernel(int r, int nranks, mscclpp::FifoDeviceHandle fifo,
mscclpp::Host2DeviceSemaphore::DeviceHandle* handles, int handleIndex) {
__global__ void kernel(int r, mscclpp::FifoDeviceHandle fifo, mscclpp::Host2DeviceSemaphore::DeviceHandle* handles,
int handleIndex) {
int tid = threadIdx.x;
__syncthreads();
// uint64_t tail;
@@ -75,8 +63,8 @@ void print_usage(const char* prog) {

void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSize, size_t nelemsPerGPU, int** data_h,
int** data_d) {
CUCHECK(cudaMalloc(data_d, dataSize));
CUCHECK(cudaMemset(*data_d, 0, dataSize));
MSCCLPP_CUDATHROW(cudaMalloc(data_d, dataSize));
MSCCLPP_CUDATHROW(cudaMemset(*data_d, 0, dataSize));

*data_h = new int[nelemsPerGPU * world_size];
for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
@@ -87,29 +75,29 @@ void initializeAndAllocateAllGatherData(int rank, int world_size, size_t dataSiz
(*data_h)[i] = 0;
}
}
CUCHECK(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
MSCCLPP_CUDATHROW(cudaMemcpy(*data_d, *data_h, dataSize, cudaMemcpyHostToDevice));
}

class MyProxyService {
private:
int deviceNumaNode_;
mscclpp::Proxy proxy_;
int dataSize_;
std::vector<mscclpp::RegisteredMemory> remoteMemories_;
mscclpp::RegisteredMemory localMemory_;
std::vector<std::shared_ptr<mscclpp::Host2HostSemaphore>> hostSemaphores_;
std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>> deviceSemaphores1_;
std::vector<std::shared_ptr<mscclpp::Host2DeviceSemaphore>> deviceSemaphores2_;
std::vector<std::shared_ptr<mscclpp::Connection>> connections_;
int dataSize_;
mscclpp::Proxy proxy_;
int deviceNumaNode_;

public:
MyProxyService(mscclpp::Communicator& comm, int* data_d, int dataSize)
: remoteMemories_(world_size),
: dataSize_(dataSize),
remoteMemories_(world_size),
connections_(world_size),
dataSize_(dataSize),
proxy_([&](mscclpp::ProxyTrigger triggerRaw) { return handleTrigger(triggerRaw); }, [&]() { bindThread(); }) {
int cudaDevice;
CUCHECK(cudaGetDevice(&cudaDevice));
MSCCLPP_CUDATHROW(cudaGetDevice(&cudaDevice));
deviceNumaNode_ = mscclpp::getDeviceNumaNode(cudaDevice);

int thisNode = rankToNode(rank);
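The member shuffle in MyProxyService above, together with the reordered constructor initializer list, is consistent with silencing clang's constructor-reorder diagnostic: non-static data members are initialized in declaration order, not in the order they appear in the mem-initializer list, and clang warns (the -Wreorder / -Wreorder-ctor family) when the two disagree. A minimal, hypothetical illustration of why that warning matters; this is not code from the repository:

// Hypothetical illustration of clang's constructor-reorder warning.
struct Widget {
  int a_;
  int b_;
  // Members are initialized in declaration order (a_ first, then b_), so the
  // list below draws the reorder warning, and a_ reads b_ before b_ is set.
  Widget(int v) : b_(v), a_(b_ + 1) {}
};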
@@ -237,7 +225,7 @@ int main(int argc, char* argv[]) {
MPI_Comm_free(&shmcomm);

int cudaNum = rankToLocalRank(rank);
CUCHECK(cudaSetDevice(cudaNum));
MSCCLPP_CUDATHROW(cudaSetDevice(cudaNum));

if (rank == 0) printf("Initializing MSCCL++\n");
auto bootstrap = std::make_shared<mscclpp::TcpBootstrap>(rank, world_size);
@@ -268,30 +256,30 @@
mscclpp::FifoDeviceHandle fifo = proxyService.fifo().deviceHandle();
if (rank == 0) printf("Testing the correctness of AllGather implementation\n");
cudaStream_t stream;
CUCHECK(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
MSCCLPP_CUDATHROW(cudaStreamCreateWithFlags(&stream, cudaStreamNonBlocking));
mscclpp::Host2DeviceSemaphore::DeviceHandle* deviceHandles1;
mscclpp::Host2DeviceSemaphore::DeviceHandle* deviceHandles2;

CUCHECK(cudaMalloc(&deviceHandles1, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
MSCCLPP_CUDATHROW(cudaMalloc(&deviceHandles1, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
for (int i = 0; i < world_size; ++i) {
if (i == rank) continue;
auto handle = proxyService.getDeviceHandle1(i);
CUCHECK(cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
MSCCLPP_CUDATHROW(cudaMemcpy(&deviceHandles1[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
}

CUCHECK(cudaMalloc(&deviceHandles2, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
MSCCLPP_CUDATHROW(cudaMalloc(&deviceHandles2, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle) * world_size));
for (int i = 0; i < world_size; ++i) {
if (i == rank) continue;
auto handle = proxyService.getDeviceHandle2(i);
CUCHECK(cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
MSCCLPP_CUDATHROW(cudaMemcpy(&deviceHandles2[i], &handle, sizeof(mscclpp::Host2DeviceSemaphore::DeviceHandle),
cudaMemcpyHostToDevice));
}

kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
CUCHECK(cudaStreamSynchronize(stream));
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles1, 1);
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

CUCHECK(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));
MSCCLPP_CUDATHROW(cudaMemcpy(data_h, data_d, dataSize, cudaMemcpyDeviceToHost));

for (size_t i = 0; i < nelemsPerGPU * world_size; i++) {
int val = i + 1;
@@ -307,14 +295,14 @@ int main(int argc, char* argv[]) {
double t0, t1, ms, time_in_us;
int iterwithoutcudagraph = 10;
if (rank == 0) printf("Running %d iterations of the kernel without CUDA graph\n", iterwithoutcudagraph);
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));
bootstrap->barrier();
t0 = getTime();
for (int i = 0; i < iterwithoutcudagraph; ++i) {
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles2, 2);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles2, 2);
}
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));
bootstrap->barrier();
t1 = getTime();
ms = (t1 - t0) * 1000.0;
@@ -327,22 +315,22 @@ int main(int argc, char* argv[]) {
if (rank == 0) printf("Capturing %d iterations of the kernel in a CUDA graph\n", cudagraphiter);
cudaGraph_t graph;
cudaGraphExec_t instance;
cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal);
MSCCLPP_CUDATHROW(cudaStreamBeginCapture(stream, cudaStreamCaptureModeGlobal));
for (int i = 0; i < cudagraphiter; ++i) {
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, world_size, fifo, deviceHandles2, 2);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles1, 1);
kernel<<<1, world_size, 0, stream>>>(rank, fifo, deviceHandles2, 2);
}
cudaStreamEndCapture(stream, &graph);
cudaGraphInstantiate(&instance, graph, NULL, NULL, 0);
MSCCLPP_CUDATHROW(cudaStreamEndCapture(stream, &graph));
MSCCLPP_CUDATHROW(cudaGraphInstantiate(&instance, graph, NULL, NULL, 0));

int cudagraphwarmup = 10;
if (rank == 0)
printf("Warming up %d iterations of the CUDA graph with %d iterations of the kernel\n", cudagraphwarmup,
cudagraphiter);
for (int i = 0; i < cudagraphwarmup; ++i) {
cudaGraphLaunch(instance, stream);
MSCCLPP_CUDATHROW(cudaGraphLaunch(instance, stream));
}
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

// measure runtime
int cudagraphlaunch = 10;
@@ -352,9 +340,9 @@ int main(int argc, char* argv[]) {
bootstrap->barrier();
t0 = getTime();
for (int i = 0; i < cudagraphlaunch; ++i) {
cudaGraphLaunch(instance, stream);
MSCCLPP_CUDATHROW(cudaGraphLaunch(instance, stream));
}
CUCHECK(cudaStreamSynchronize(stream));
MSCCLPP_CUDATHROW(cudaStreamSynchronize(stream));

t1 = getTime();
ms = (t1 - t0) * 1000.0;
