Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a function for stream to wait for a tag #746

Merged
merged 3 commits into from
Apr 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions examples/cpp/19_stream_tags/.gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
main
main.o
main_c
4 changes: 4 additions & 0 deletions examples/cpp/19_stream_tags/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
compile_cpp_example_with_modes(stream_tags main.cpp)

add_custom_target(cpp_example_stream_tags_okl ALL COMMAND ${CMAKE_COMMAND} -E copy ${CMAKE_CURRENT_SOURCE_DIR}/powerOfPi2.okl powerOfPi2.okl)
add_dependencies(examples_cpp_stream_tags cpp_example_stream_tags_okl)
27 changes: 27 additions & 0 deletions examples/cpp/19_stream_tags/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@

PROJ_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))

ifndef OCCA_DIR
include $(PROJ_DIR)/../../../scripts/build/Makefile
else
include ${OCCA_DIR}/scripts/build/Makefile
endif

#---[ COMPILATION ]-------------------------------
headers = $(wildcard $(incPath)/*.hpp) $(wildcard $(incPath)/*.tpp)
sources = $(wildcard $(srcPath)/*.cpp)

objects = $(subst $(srcPath)/,$(objPath)/,$(sources:.cpp=.o))

executables: ${PROJ_DIR}/main

${PROJ_DIR}/main: $(objects) $(headers) ${PROJ_DIR}/main.cpp
$(compiler) $(compilerFlags) -o ${PROJ_DIR}/main $(flags) $(objects) ${PROJ_DIR}/main.cpp $(paths) $(linkerFlags)

$(objPath)/%.o:$(srcPath)/%.cpp $(wildcard $(subst $(srcPath)/,$(incPath)/,$(<:.cpp=.hpp))) $(wildcard $(subst $(srcPath)/,$(incPath)/,$(<:.cpp=.tpp)))
$(compiler) $(compilerFlags) -o $@ $(flags) -c $(paths) $<

clean:
rm -f $(objPath)/*;
rm -f ${PROJ_DIR}/main;
#=================================================
28 changes: 28 additions & 0 deletions examples/cpp/19_stream_tags/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Example: Events

GPU devices introduce `streams`, which potentially allow parallel queueing of instructions

`Stream tags` are used to query and manage (synchronize) those streams

This example shows how to setup `occa::streamTag` to manage jobs in different streams

# Compiling the Example

```bash
make
```

## Usage

```
> ./main --help

Usage: ./main [OPTIONS]

Example showing the use of multiple non-blocking streams in a device

Options:
-d, --device Device properties (default: "{mode: 'CUDA', device_id: 0}")
-h, --help Print usage
-v, --verbose Compile kernels in verbose mode
```
92 changes: 92 additions & 0 deletions examples/cpp/19_stream_tags/main.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
#include <iostream>

#include <occa.hpp>

//---[ Internal Tools ]-----------------
// Note: These headers are not officially supported
// Please don't rely on it outside of the occa examples
#include <occa/internal/utils/cli.hpp>
//======================================


occa::json parseArgs(int argc, const char **argv);

int main(int argc, const char **argv) {
occa::json args = parseArgs(argc, argv);

occa::setDevice(occa::json::parse(args["options/device"]));

int entries = 1<<20;
int block = 64;
int group = 1;

float *a = new float[entries];
for (int i = 0; i < entries; i++)
a[i] = 0.f;

occa::memory o_a = occa::malloc<float>(entries);
o_a.copyFrom(a);

occa::json kernelProps({
{"defines/block", block},
{"defines/group", group},
{"serial/include_std", true},
});
occa::kernel powerOfPi2 = occa::buildKernel("powerOfPi2.okl",
"powerOfPi2",
kernelProps);
occa::json streamProps({
{"nonblocking", true},
});
occa::stream stream_a = occa::createStream(streamProps);
occa::stream stream_b = occa::createStream(streamProps);

occa::setStream(stream_a);
powerOfPi2(o_a, entries);
occa::streamTag tag_a = occa::tagStream();

// set stream_b to wait for the job(s) to be finished in stream_a
occa::streamWait(stream_b, tag_a);

occa::setStream(stream_b);
powerOfPi2(o_a, entries);
occa::streamTag tag_b = occa::tagStream();

// set the device to wait for stream_b to finish
occa::waitFor(tag_b);

o_a.copyTo(a);

const float tol = 1e-3;
for (auto i = 0; i < entries; i++) {
if (fabs(a[i] - 3.14159) > tol) {
std::cerr << "Invalid output value: " << a[i] << " in " << i << std::endl;
return -1;
}
}
return 0;
}


occa::json parseArgs(int argc, const char **argv) {
occa::cli::parser parser;
parser
.withDescription(
"Example showing the use of multiple device streams"
)
.addOption(
occa::cli::option('d', "device",
"Device properties (default: \"{mode: 'CUDA', device_id: 0}\")")
.withArg()
.withDefaultValue("{mode: 'CUDA', device_id: 0}")
)
.addOption(
occa::cli::option('v', "verbose",
"Compile kernels in verbose mode")
);

occa::json args = parser.parseArgs(argc, argv);
occa::settings()["kernel/verbose"] = args["options/verbose"];

return args;
}
10 changes: 10 additions & 0 deletions examples/cpp/19_stream_tags/powerOfPi2.okl
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
@kernel void powerOfPi2(float* x,
int entries) {
for (int g = 0; g < group; g++; @outer) {
for (int i = 0; i < block; ++i; @inner) {
for (int j=i+g*block; j < entries; j+=block*group) {
x[j] = pow(3.14159f,x[j]);
}
}
}
}
1 change: 1 addition & 0 deletions examples/cpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ add_subdirectory(14_cuda_interop)
add_subdirectory(17_memory_pool)

add_subdirectory(18_nonblocking_streams)
add_subdirectory(19_stream_tags)
add_subdirectory(20_native_dpcpp_kernel)
add_subdirectory(30_device_function)

Expand Down
2 changes: 2 additions & 0 deletions include/occa/c/base.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ occaStreamTag occaTagStream();

void occaWaitForTag(occaStreamTag tag);

void occaStreamWaitForTag(occaStream stream, occaStreamTag tag);

double occaTimeBetweenTags(occaStreamTag startTag,
occaStreamTag endTag);

Expand Down
1 change: 1 addition & 0 deletions include/occa/core/base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ namespace occa {
stream createStream(const occa::json &props = occa::json());
stream getStream();
void setStream(stream s);
void streamWait(stream s, streamTag tag);

streamTag tagStream();

Expand Down
12 changes: 12 additions & 0 deletions include/occa/core/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
namespace occa {
class modeStream_t; class stream;
class modeDevice_t; class device;
class streamTag;

/**
* @startDoc{stream}
Expand Down Expand Up @@ -148,6 +149,17 @@ namespace occa {
*/
void finish();

/**
* @startDoc{waitFor}
*
* Description:
* Waits for any operations submitted before a given streamg tag recording
* to complete where the stream tag may be created on a different stream.
*
* @endDoc
*/
void waitFor(occa::streamTag tag);

/**
* @startDoc{unwrap}
*
Expand Down
4 changes: 4 additions & 0 deletions src/c/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@
occa::waitFor(occa::c::streamTag(tag));
}

void occaStreamWaitForTag(occaStream stream, occaStreamTag tag) {
occa::streamWait(occa::c::stream(stream), occa::c::streamTag(tag));

Check warning on line 79 in src/c/base.cpp

View check run for this annotation

Codecov / codecov/patch

src/c/base.cpp#L78-L79

Added lines #L78 - L79 were not covered by tests
}

double occaTimeBetweenTags(occaStreamTag startTag,
occaStreamTag endTag) {
return occa::timeBetween(occa::c::streamTag(startTag),
Expand Down
4 changes: 4 additions & 0 deletions src/core/base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,10 @@
return getDevice().tagStream();
}

void streamWait(stream s, streamTag tag) {
s.waitFor(tag);

Check warning on line 98 in src/core/base.cpp

View check run for this annotation

Codecov / codecov/patch

src/core/base.cpp#L97-L98

Added lines #L97 - L98 were not covered by tests
}

memoryPool createMemoryPool(const occa::json &props) {
return getDevice().createMemoryPool(props);
}
Expand Down
5 changes: 5 additions & 0 deletions src/core/stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include <occa/core/device.hpp>
#include <occa/internal/core/device.hpp>
#include <occa/internal/core/stream.hpp>
#include <occa/internal/core/streamTag.hpp>

namespace occa {
stream::stream() :
Expand Down Expand Up @@ -102,6 +103,10 @@
if(modeStream) modeStream->finish();
}

void stream::waitFor(occa::streamTag tag) {
if(modeStream) modeStream->waitFor(tag);

Check warning on line 107 in src/core/stream.cpp

View check run for this annotation

Codecov / codecov/patch

src/core/stream.cpp#L106-L107

Added lines #L106 - L107 were not covered by tests
}

void* stream::unwrap() {
OCCA_ERROR(
"stream::unwrap: stream is uninitialized or has been free'd",
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/api/metal/commandQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace occa {
void freeLastCommandBuffer();

event_t createEvent() const;
void waitForEvent(const event_t &event);

void clearCommandBuffer(void *commandBufferObj);
void setLastCommandBuffer(void *commandBufferObj);
Expand Down
10 changes: 10 additions & 0 deletions src/occa/internal/api/metal/commandQueue.mm
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@
lastCommandBufferObj);
}

void commandQueue_t::waitForEvent(const event_t &event) {
if (lastCommandBufferObj) {
id<MTLEvent> metalEvent = (__bridge id<MTLEvent>) event.eventObj;
id<MTLCommandBuffer> metalCommandBuffer = (
(__bridge id<MTLCommandBuffer>) lastCommandBufferObj
);
[metalCommandBuffer encodeWaitForEvent:metalEvent value:event.signalValue];
}
}

void commandQueue_t::clearCommandBuffer(void *commandBufferObj) {
if (commandBufferObj == lastCommandBufferObj) {
freeLastCommandBuffer();
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/api/metal/event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ namespace occa {
int eventId;
void *commandBufferObj;
double eventTime;
int signalValue;

event_t();

Expand Down
13 changes: 11 additions & 2 deletions src/occa/internal/api/metal/event.mm
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,17 @@
eventObj(eventObj_),
eventId(eventId_),
commandBufferObj(commandBufferObj_),
eventTime(0) {
eventTime(0),
signalValue(1) {
// If there are no active command buffers, use the current time
if (!commandBufferObj) {
eventTime = occa::sys::currentTime();
} else {
id<MTLEvent> metalEvent = (__bridge id<MTLEvent>) eventObj;
id<MTLCommandBuffer> metalCommandBuffer = (
(__bridge id<MTLCommandBuffer>) commandBufferObj
);
[metalCommandBuffer encodeSignalEvent:metalEvent value:signalValue];
}
}

Expand All @@ -37,14 +44,16 @@
eventId(other.eventId),
eventObj(other.eventObj),
commandBufferObj(other.commandBufferObj),
eventTime(other.eventTime) {}
eventTime(other.eventTime),
signalValue(other.signalValue) {}

event_t& event_t::operator = (const event_t &other) {
commandQueue = other.commandQueue;
eventId = other.eventId;
eventObj = other.eventObj;
commandBufferObj = other.commandBufferObj;
eventTime = other.eventTime;
signalValue = other.signalValue;
return *this;
}

Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/api/metal/polyfill.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@
event_t commandQueue_t::createEvent() const {
return event_t();
}
void commandQueue_t::waitForEvent(const event_t &event) {}

Check warning on line 76 in src/occa/internal/api/metal/polyfill.cpp

View check run for this annotation

Codecov / codecov/patch

src/occa/internal/api/metal/polyfill.cpp#L76

Added line #L76 was not covered by tests

void commandQueue_t::clearCommandBuffer(void *commandBufferObj) {}
void commandQueue_t::setLastCommandBuffer(void *commandBufferObj) {}
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/core/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ namespace occa {

//---[ Virtual Methods ]------------
virtual void finish() = 0;
virtual void waitFor(streamTag tag) = 0;

virtual void* unwrap() = 0;
//==================================
Expand Down
4 changes: 4 additions & 0 deletions src/occa/internal/modes/cuda/polyfill.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,10 @@ namespace occa {
inline CUresult cuStreamSynchronize(CUstream hStream) {
return OCCA_CUDA_IS_NOT_ENABLED;
}

inline CUresult cuStreamWaitEvent(CUstream hStream, CUevent hEvent, unsigned int Flags) {
return OCCA_CUDA_IS_NOT_ENABLED;
}
}

#endif
Expand Down
9 changes: 9 additions & 0 deletions src/occa/internal/modes/cuda/stream.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#include <occa/internal/modes/cuda/stream.hpp>
#include <occa/internal/modes/cuda/streamTag.hpp>
#include <occa/internal/modes/cuda/utils.hpp>

namespace occa {
Expand All @@ -24,6 +25,14 @@ namespace occa {
cuStreamSynchronize(cuStream));
}

void stream::waitFor(occa::streamTag tag) {
occa::cuda::streamTag *cuTag = (
dynamic_cast<occa::cuda::streamTag*>(tag.getModeStreamTag())
);
OCCA_CUDA_ERROR("Stream: waitFor",
cuStreamWaitEvent(cuStream, cuTag->cuEvent, 0));
}

void* stream::unwrap() {
return static_cast<void*>(&cuStream);
}
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/modes/cuda/stream.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace occa {

virtual ~stream();
void finish() override;
void waitFor(occa::streamTag tag) override;

void* unwrap() override;
};
Expand Down
1 change: 1 addition & 0 deletions src/occa/internal/modes/dpcpp/polyfill.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -295,6 +295,7 @@ class queue {
}

sycl::event ext_oneapi_submit_barrier() { return sycl::event();}
sycl::event ext_oneapi_submit_barrier( const std::vector<sycl::event> &waitList ) { return sycl::event(); }
};

inline void* malloc_device(size_t num_bytes,
Expand Down
Loading