diff --git a/examples/cpp/07_streams/main.cpp b/examples/cpp/07_streams/main.cpp index 04cff4c24..36d0ab631 100644 --- a/examples/cpp/07_streams/main.cpp +++ b/examples/cpp/07_streams/main.cpp @@ -25,34 +25,75 @@ int main(int argc, const char **argv) { ab[i] = 0; } - occa::kernel addVectors; - occa::memory o_a, o_b, o_ab; + // The default stream + occa::stream streamA = occa::getStream(); + + // Another, new stream + occa::stream streamB = occa::createStream(); - occa::stream streamA, streamB; + occa::memory o_a = occa::malloc(entries); + occa::memory o_b = occa::malloc(entries); + occa::memory o_ab = occa::malloc(entries); - streamA = occa::getStream(); - streamB = occa::createStream(); - - o_a = occa::malloc(entries); - o_b = occa::malloc(entries); - o_ab = occa::malloc(entries); - - addVectors = occa::buildKernel("addVectors.okl", + occa::kernel addVectors = occa::buildKernel("addVectors.okl", "addVectors"); - o_a.copyFrom(a); - o_b.copyFrom(b); + // Pass this property to make copies non-blocking on the host. + occa::json async_copy({{"async", true}}); + + // These copies will be submitted to the current + // stream, which is streamA--the default stream. + o_a.copyFrom(a,async_copy); + o_b.copyFrom(b,async_copy); + + // Waits the copies in streamA to complete + streamA.finish(); + + // **IMPORTANT** + // Operating on overlaping memory regions simultaneously + // from different streams, without appropriate + // synchronization, leads to undefined behavior. + + // Create *non-overlapping* memory slices for use by + // kernels launched in separate streams. + occa::memory o_a1 = o_a.slice(0); // First half of a + occa::memory o_a2 = o_a.slice(entries/2); // Second half of a + + occa::memory o_b1 = o_b.slice(0); + occa::memory o_b2 = o_b.slice(entries/2); + + occa::memory o_ab1 = o_ab.slice(0); + occa::memory o_ab2 = o_ab.slice(entries/2); occa::setStream(streamA); - addVectors(entries, o_a, o_b, o_ab); + // This kernel launch is submitted to streamA. + // It operates on the first half of each vector. + addVectors(entries/2, o_a1, o_b1, o_ab1); occa::setStream(streamB); - addVectors(entries, o_a, o_b, o_ab); + // This kernel launch is submitted to streamB. + // It operates on the second half of each vector. + addVectors(entries/2, o_a2, o_b2, o_ab2); - o_ab.copyTo(ab); + // The copy below will be submitted to streamB; + // however, we need to wait for the kernel + // submitted to streamA to finish since the + // entire vector is copied. + streamA.finish(); + o_ab.copyTo(ab,async_copy); - for (int i = 0; i < entries; ++i) + // Wait for streamB to finish + streamB.finish(); + + // Verify the results + for (int i = 0; i < entries; ++i) { std::cout << i << ": " << ab[i] << '\n'; + } + for (int i = 0; i < entries; ++i) { + if (!occa::areBitwiseEqual(ab[i], a[i] + b[i])) { + throw 1; + } + } delete [] a; delete [] b; diff --git a/include/occa/c/device.h b/include/occa/c/device.h index 7f89aeb22..c9cdbf0f1 100644 --- a/include/occa/c/device.h +++ b/include/occa/c/device.h @@ -27,6 +27,8 @@ occaUDim_t occaDeviceMemoryAllocated(occaDevice device); void occaDeviceFinish(occaDevice device); +void occaDeviceFinishAll(occaDevice device); + bool occaDeviceHasSeparateMemorySpace(occaDevice device); //---[ Stream ]------------------------- diff --git a/include/occa/c/stream.h b/include/occa/c/stream.h new file mode 100644 index 000000000..48022a487 --- /dev/null +++ b/include/occa/c/stream.h @@ -0,0 +1,14 @@ + +ifndef OCCA_C_STREAM_HEADER +#define OCCA_C_STREAM_HEADER + +#include +#include + +OCCA_START_EXTERN_C + +void occaStreamFinish(occaStream stream); + +OCCA_END_EXTERN_C + +#endif diff --git a/include/occa/core/device.hpp b/include/occa/core/device.hpp index 371e5364c..3d901ef76 100644 --- a/include/occa/core/device.hpp +++ b/include/occa/core/device.hpp @@ -349,13 +349,26 @@ namespace occa { * @startDoc{finish} * * Description: - * Finishes any asynchronous operation queued up on the device, such as - * [[async memory allocations|device.malloc]] or [[kernel calls|kernel.operator_parentheses]]. + * Waits for all asynchronous operations, such as + * [[async memory allocations|device.malloc]] or [[kernel calls|kernel.operator_parentheses]], + * submitted to the current stream on this device to complete. * * @endDoc */ void finish(); + /** + * @startDoc{finish} + * + * Description: + * Waits for all asynchronous operations, such as + * [[async memory allocations|device.malloc]] or [[kernel calls|kernel.operator_parentheses]], + * submitted to all streams on this device to complete. + * + * @endDoc + */ + void finishAll(); + /** * @startDoc{hasSeparateMemorySpace} * diff --git a/include/occa/core/stream.hpp b/include/occa/core/stream.hpp index 0075836b5..b72310d6b 100644 --- a/include/occa/core/stream.hpp +++ b/include/occa/core/stream.hpp @@ -136,6 +136,17 @@ namespace occa { * @endDoc */ void free(); + + /** + * @startDoc{finish} + * + * Description: + * Waits for all asynchronous operations, such as memory allocations + * or kernel calls, submitted to this device to complete. + * + * @endDoc + */ + void finish(); }; std::ostream& operator << (std::ostream &out, diff --git a/scripts/build/Make.fortran_rules b/scripts/build/Make.fortran_rules index 31eb21621..1bb39cf0e 100644 --- a/scripts/build/Make.fortran_rules +++ b/scripts/build/Make.fortran_rules @@ -8,6 +8,7 @@ $(fObjPath)/occa_device_m.o: $(fObjPath)/occa_types_m.o $(fObjPath)/occa_memory_m.o: $(fObjPath)/occa_types_m.o $(fObjPath)/occa_kernel_m.o: $(fObjPath)/occa_types_m.o $(fObjPath)/occa_kernelBuilder_m.o: $(fObjPath)/occa_types_m.o +$(fObjPath)/occa_stream_m.o: $(fObjPath)/occa_types_m.o $(fObjPath)/occa_uva_m.o: $(fObjPath)/occa_types_m.o $(fObjPath)/occa_scope_m.o: $(fObjPath)/occa_types_m.o $(fObjPath)/occa_json_m.o: $(fObjPath)/occa_types_m.o @@ -20,6 +21,7 @@ $(fObjPath)/occa_m.o: $(fObjPath)/fc_string_m.o \ $(fObjPath)/occa_memory_m.o \ $(fObjPath)/occa_kernel_m.o \ $(fObjPath)/occa_kernelBuilder_m.o \ + $(fObjPath)/occa_stream_m.o \ $(fObjPath)/occa_uva_m.o \ $(fObjPath)/occa_scope_m.o \ $(fObjPath)/occa_json_m.o diff --git a/src/c/device.cpp b/src/c/device.cpp index c6759a364..0f46b0359 100644 --- a/src/c/device.cpp +++ b/src/c/device.cpp @@ -70,6 +70,10 @@ void occaDeviceFinish(occaDevice device) { occa::c::device(device).finish(); } +void occaDeviceFinishAll(occaDevice device) { + occa::c::device(device).finishAll(); +} + bool occaDeviceHasSeparateMemorySpace(occaDevice device) { return (int) occa::c::device(device).hasSeparateMemorySpace(); } diff --git a/src/c/stream.cpp b/src/c/stream.cpp new file mode 100644 index 000000000..a7782a375 --- /dev/null +++ b/src/c/stream.cpp @@ -0,0 +1,11 @@ +#include +#include +#include + +OCCA_START_EXTERN_C + +void occaStreamFinish(occaStream stream) { + occa::c::stream(stream).finish(); +} + +OCCA_END_EXTERN_C diff --git a/src/core/device.cpp b/src/core/device.cpp index c1c9524d8..e4fd62e68 100644 --- a/src/core/device.cpp +++ b/src/core/device.cpp @@ -245,6 +245,12 @@ namespace occa { } } + void device::finishAll() { + if (modeDevice) { + modeDevice->finishAll(); + } + } + bool device::hasSeparateMemorySpace() { return (modeDevice && modeDevice->hasSeparateMemorySpace()); diff --git a/src/core/stream.cpp b/src/core/stream.cpp index b41bb0eb2..9d0a27582 100644 --- a/src/core/stream.cpp +++ b/src/core/stream.cpp @@ -98,6 +98,10 @@ namespace occa { modeStream = NULL; } + void stream::finish() { + if(modeStream) modeStream->finish(); + } + std::ostream& operator << (std::ostream &out, const occa::stream &stream) { out << stream.properties(); diff --git a/src/fortran/occa_device_m.f90 b/src/fortran/occa_device_m.f90 index 074d3c171..de2a44933 100644 --- a/src/fortran/occa_device_m.f90 +++ b/src/fortran/occa_device_m.f90 @@ -93,6 +93,13 @@ subroutine occaDeviceFinish(device) bind(C, name="occaDeviceFinish") type(occaDevice), value :: device end subroutine + ! void occaDeviceFinishAll(occaDevice device); + subroutine occaDeviceFinishAll(device) bind(C, name="occaDeviceFinishAll") + import occaDevice + implicit none + type(occaDevice), value :: device + end subroutine + ! bool occaDeviceHasSeparateMemorySpace(occaDevice device); logical(kind=C_bool) function occaDeviceHasSeparateMemorySpace(device) & bind(C, name="occaDeviceHasSeparateMemorySpace") diff --git a/src/fortran/occa_m.f90 b/src/fortran/occa_m.f90 index ca120dd1b..e84259f8e 100644 --- a/src/fortran/occa_m.f90 +++ b/src/fortran/occa_m.f90 @@ -8,6 +8,7 @@ module occa use occa_memory_m use occa_kernel_m use occa_kernelBuilder_m + use occa_stream_m use occa_scope_m use occa_json_m diff --git a/src/fortran/occa_stream_m.f90 b/src/fortran/occa_stream_m.f90 new file mode 100644 index 000000000..dc277b10c --- /dev/null +++ b/src/fortran/occa_stream_m.f90 @@ -0,0 +1,16 @@ +module occa_stream_m +! occa/c/device.h + +use occa_types_m +implicit none + +interface + ! void occaStreamFinish(occaStream stream); + subroutine occaStreamFinish(stream) bind(C, name="occaStreamFinish") + import occaStream + implicit none + type(occaStream), value :: stream + end subroutine +end interface + +end module occa_stream_m \ No newline at end of file diff --git a/src/occa/internal/core/device.cpp b/src/occa/internal/core/device.cpp index 4991e17c8..9638e01a7 100644 --- a/src/occa/internal/core/device.cpp +++ b/src/occa/internal/core/device.cpp @@ -80,6 +80,16 @@ namespace occa { streamTagRing.removeRef(streamTag); } + void modeDevice_t::finish() const { + currentStream.getModeStream()->finish(); + } + + void modeDevice_t::finishAll() const { + for(auto* stream : streams) { + if(stream) stream->finish(); + } + } + hash_t modeDevice_t::versionedHash() const { return (occa::hash(settings()["version"]) ^ hash()); diff --git a/src/occa/internal/core/device.hpp b/src/occa/internal/core/device.hpp index aa9abf0d7..7b5d1b4d8 100644 --- a/src/occa/internal/core/device.hpp +++ b/src/occa/internal/core/device.hpp @@ -58,11 +58,12 @@ namespace occa { void addStreamTagRef(modeStreamTag_t *streamTag); void removeStreamTagRef(modeStreamTag_t *streamTag); + void finish() const; + void finishAll() const; + //---[ Virtual Methods ]------------ virtual ~modeDevice_t() = 0; - virtual void finish() const = 0; - virtual bool hasSeparateMemorySpace() const = 0; hash_t versionedHash() const; diff --git a/src/occa/internal/core/stream.hpp b/src/occa/internal/core/stream.hpp index cc34d649e..1f2fcc61a 100644 --- a/src/occa/internal/core/stream.hpp +++ b/src/occa/internal/core/stream.hpp @@ -24,6 +24,7 @@ namespace occa { //---[ Virtual Methods ]------------ virtual ~modeStream_t() = 0; + virtual void finish() = 0; //================================== }; } diff --git a/src/occa/internal/modes/cuda/device.cpp b/src/occa/internal/modes/cuda/device.cpp index 248d77930..d99a67f47 100644 --- a/src/occa/internal/modes/cuda/device.cpp +++ b/src/occa/internal/modes/cuda/device.cpp @@ -88,11 +88,6 @@ namespace occa { } } - void device::finish() const { - OCCA_CUDA_ERROR("Device: Finish", - cuStreamSynchronize(getCuStream())); - } - bool device::hasSeparateMemorySpace() const { return true; } diff --git a/src/occa/internal/modes/cuda/device.hpp b/src/occa/internal/modes/cuda/device.hpp index 63fb678e8..9965bcaf7 100644 --- a/src/occa/internal/modes/cuda/device.hpp +++ b/src/occa/internal/modes/cuda/device.hpp @@ -30,8 +30,6 @@ namespace occa { device(const occa::json &properties_); virtual ~device(); - virtual void finish() const; - virtual bool hasSeparateMemorySpace() const; virtual hash_t hash() const; diff --git a/src/occa/internal/modes/cuda/stream.cpp b/src/occa/internal/modes/cuda/stream.cpp index 5ba20e7a2..e9f14448f 100644 --- a/src/occa/internal/modes/cuda/stream.cpp +++ b/src/occa/internal/modes/cuda/stream.cpp @@ -18,5 +18,10 @@ namespace occa { ); } } + + void stream::finish() { + OCCA_CUDA_ERROR("Stream: Finish", + cuStreamSynchronize(cuStream)); + } } } diff --git a/src/occa/internal/modes/cuda/stream.hpp b/src/occa/internal/modes/cuda/stream.hpp index 0887e73e8..7b00aa192 100644 --- a/src/occa/internal/modes/cuda/stream.hpp +++ b/src/occa/internal/modes/cuda/stream.hpp @@ -20,6 +20,7 @@ namespace occa { bool isWrapped_=false); virtual ~stream(); + void finish() override; }; } } diff --git a/src/occa/internal/modes/dpcpp/device.cpp b/src/occa/internal/modes/dpcpp/device.cpp index 7f39f6316..45d2e99ec 100644 --- a/src/occa/internal/modes/dpcpp/device.cpp +++ b/src/occa/internal/modes/dpcpp/device.cpp @@ -50,11 +50,6 @@ namespace occa setCompilerLinkerOptions(kernelProps); } - void device::finish() const - { - getDpcppStream(currentStream).finish(); - } - hash_t device::hash() const { if (!hash_.initialized) diff --git a/src/occa/internal/modes/dpcpp/device.hpp b/src/occa/internal/modes/dpcpp/device.hpp index 93a4e0ab3..b9d676acb 100644 --- a/src/occa/internal/modes/dpcpp/device.hpp +++ b/src/occa/internal/modes/dpcpp/device.hpp @@ -25,8 +25,6 @@ namespace occa device(const occa::json &properties_); virtual ~device() = default; - virtual void finish() const override; - virtual inline bool hasSeparateMemorySpace() const override { return true; } virtual hash_t hash() const override; diff --git a/src/occa/internal/modes/dpcpp/stream.hpp b/src/occa/internal/modes/dpcpp/stream.hpp index 229866bbe..de4a8b953 100644 --- a/src/occa/internal/modes/dpcpp/stream.hpp +++ b/src/occa/internal/modes/dpcpp/stream.hpp @@ -18,7 +18,7 @@ namespace occa { virtual ~stream()=default; - void finish(); + void finish() override; occa::dpcpp::streamTag memcpy(void *dest, const void *src, occa::udim_t num_bytes); }; diff --git a/src/occa/internal/modes/hip/device.cpp b/src/occa/internal/modes/hip/device.cpp index ce492c9bc..7b06770ea 100644 --- a/src/occa/internal/modes/hip/device.cpp +++ b/src/occa/internal/modes/hip/device.cpp @@ -85,11 +85,6 @@ namespace occa { device::~device() { } - void device::finish() const { - OCCA_HIP_ERROR("Device: Finish", - hipStreamSynchronize(getHipStream())); - } - bool device::hasSeparateMemorySpace() const { return true; } diff --git a/src/occa/internal/modes/hip/device.hpp b/src/occa/internal/modes/hip/device.hpp index 76c4f357b..ed3df60ce 100644 --- a/src/occa/internal/modes/hip/device.hpp +++ b/src/occa/internal/modes/hip/device.hpp @@ -25,8 +25,6 @@ namespace occa { device(const occa::json &properties_); virtual ~device(); - virtual void finish() const; - virtual bool hasSeparateMemorySpace() const; virtual hash_t hash() const; diff --git a/src/occa/internal/modes/hip/stream.cpp b/src/occa/internal/modes/hip/stream.cpp index 7bbcb91d9..f8253e44b 100644 --- a/src/occa/internal/modes/hip/stream.cpp +++ b/src/occa/internal/modes/hip/stream.cpp @@ -17,5 +17,10 @@ namespace occa { hipStreamDestroy(hipStream)); } } + + void stream::finish() { + OCCA_HIP_ERROR("Stream: Finish", + hipStreamSynchronize(hipStream)); + } } } diff --git a/src/occa/internal/modes/hip/stream.hpp b/src/occa/internal/modes/hip/stream.hpp index 17f6e6300..cc8ee1f71 100644 --- a/src/occa/internal/modes/hip/stream.hpp +++ b/src/occa/internal/modes/hip/stream.hpp @@ -18,6 +18,7 @@ namespace occa { bool isWrapped_=false); virtual ~stream(); + void finish() override; }; } } diff --git a/src/occa/internal/modes/metal/device.cpp b/src/occa/internal/modes/metal/device.cpp index d2d5feea0..261c5e1b9 100644 --- a/src/occa/internal/modes/metal/device.cpp +++ b/src/occa/internal/modes/metal/device.cpp @@ -41,13 +41,6 @@ namespace occa { metalDevice.free(); } - void device::finish() const { - metal::stream &stream = ( - *((metal::stream*) (currentStream.getModeStream())) - ); - stream.metalCommandQueue.finish(); - } - bool device::hasSeparateMemorySpace() const { return true; } diff --git a/src/occa/internal/modes/metal/device.hpp b/src/occa/internal/modes/metal/device.hpp index d8991fbf0..92938b6c6 100644 --- a/src/occa/internal/modes/metal/device.hpp +++ b/src/occa/internal/modes/metal/device.hpp @@ -22,8 +22,6 @@ namespace occa { device(const occa::json &properties_); virtual ~device(); - virtual void finish() const; - virtual bool hasSeparateMemorySpace() const; virtual hash_t hash() const; diff --git a/src/occa/internal/modes/metal/stream.cpp b/src/occa/internal/modes/metal/stream.cpp index f7e830ca3..0a137cc5d 100644 --- a/src/occa/internal/modes/metal/stream.cpp +++ b/src/occa/internal/modes/metal/stream.cpp @@ -15,5 +15,9 @@ namespace occa { metalCommandQueue.free(); } } + + void stream::finish() { + metalCommandQueue.finish(); + } } } diff --git a/src/occa/internal/modes/metal/stream.hpp b/src/occa/internal/modes/metal/stream.hpp index 47ea01248..82406a638 100644 --- a/src/occa/internal/modes/metal/stream.hpp +++ b/src/occa/internal/modes/metal/stream.hpp @@ -18,6 +18,7 @@ namespace occa { bool isWrapped_=false); virtual ~stream(); + void finish() override; }; } } diff --git a/src/occa/internal/modes/opencl/device.cpp b/src/occa/internal/modes/opencl/device.cpp index f1314a632..16be70241 100644 --- a/src/occa/internal/modes/opencl/device.cpp +++ b/src/occa/internal/modes/opencl/device.cpp @@ -65,11 +65,6 @@ namespace occa { } } - void device::finish() const { - OCCA_OPENCL_ERROR("Device: Finish", - clFinish(getCommandQueue())); - } - bool device::hasSeparateMemorySpace() const { return true; } diff --git a/src/occa/internal/modes/opencl/device.hpp b/src/occa/internal/modes/opencl/device.hpp index 334240800..8b4937bec 100644 --- a/src/occa/internal/modes/opencl/device.hpp +++ b/src/occa/internal/modes/opencl/device.hpp @@ -23,8 +23,6 @@ namespace occa { device(const occa::json &properties_); virtual ~device(); - virtual void finish() const; - virtual bool hasSeparateMemorySpace() const; virtual hash_t hash() const; diff --git a/src/occa/internal/modes/opencl/stream.cpp b/src/occa/internal/modes/opencl/stream.cpp index 4122dd49c..dd0239d8d 100644 --- a/src/occa/internal/modes/opencl/stream.cpp +++ b/src/occa/internal/modes/opencl/stream.cpp @@ -13,5 +13,10 @@ namespace occa { OCCA_OPENCL_ERROR("Device: Freeing cl_command_queue", clReleaseCommandQueue(commandQueue)); } + + void stream::finish() { + OCCA_OPENCL_ERROR("Stream: finish", + clFinish(commandQueue)); + } } } diff --git a/src/occa/internal/modes/opencl/stream.hpp b/src/occa/internal/modes/opencl/stream.hpp index 8f8471a90..fe165973c 100644 --- a/src/occa/internal/modes/opencl/stream.hpp +++ b/src/occa/internal/modes/opencl/stream.hpp @@ -17,6 +17,7 @@ namespace occa { cl_command_queue commandQueue_); virtual ~stream(); + void finish() override; }; } } diff --git a/src/occa/internal/modes/serial/device.cpp b/src/occa/internal/modes/serial/device.cpp index 8bdd246b4..ce3df6586 100644 --- a/src/occa/internal/modes/serial/device.cpp +++ b/src/occa/internal/modes/serial/device.cpp @@ -17,8 +17,6 @@ namespace occa { device::~device() {} - void device::finish() const {} - bool device::hasSeparateMemorySpace() const { return false; } diff --git a/src/occa/internal/modes/serial/device.hpp b/src/occa/internal/modes/serial/device.hpp index bcf549b09..8f7e8f026 100644 --- a/src/occa/internal/modes/serial/device.hpp +++ b/src/occa/internal/modes/serial/device.hpp @@ -13,8 +13,6 @@ namespace occa { device(const occa::json &properties_); virtual ~device(); - virtual void finish() const; - virtual bool hasSeparateMemorySpace() const; virtual hash_t hash() const; diff --git a/src/occa/internal/modes/serial/stream.cpp b/src/occa/internal/modes/serial/stream.cpp index db15ec533..63a77c284 100644 --- a/src/occa/internal/modes/serial/stream.cpp +++ b/src/occa/internal/modes/serial/stream.cpp @@ -7,5 +7,6 @@ namespace occa { modeStream_t(modeDevice_, properties_) {} stream::~stream() {} + void stream::finish() {} } } diff --git a/src/occa/internal/modes/serial/stream.hpp b/src/occa/internal/modes/serial/stream.hpp index 0584ecc91..dede81008 100644 --- a/src/occa/internal/modes/serial/stream.hpp +++ b/src/occa/internal/modes/serial/stream.hpp @@ -12,6 +12,7 @@ namespace occa { const occa::json &properties_); virtual ~stream(); + void finish() override; }; } }