diff --git a/python/kvikio/_lib/kvikio_cxx_api.pxd b/python/kvikio/_lib/kvikio_cxx_api.pxd
index 83c0a934bb..afb644f7e6 100644
--- a/python/kvikio/_lib/kvikio_cxx_api.pxd
+++ b/python/kvikio/_lib/kvikio_cxx_api.pxd
@@ -1,4 +1,4 @@
-# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
 # See file LICENSE for terms.
 
 # distutils: language = c++
@@ -12,12 +12,23 @@ from libcpp.utility cimport pair
 from libcpp.vector cimport vector
 
 
+cdef extern from "cuda.h":
+    ctypedef void* CUstream
+
+
 cdef extern from "<future>" namespace "std" nogil:
     cdef cppclass future[T]:
         future() except +
         T get() except +
 
 
+cdef extern from "<kvikio/stream.hpp>" namespace "kvikio" nogil:
+    cdef cppclass StreamFuture:
+        StreamFuture() except +
+        StreamFuture(StreamFuture&&) except +
+        size_t check_bytes_done() except +
+
+
 cdef extern from "<kvikio/utils.hpp>" namespace "kvikio" nogil:
     bool is_future_done[T](const T& future) except +
 
@@ -97,3 +108,17 @@ cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
             size_t file_offset,
             size_t devPtr_offset
         ) except +
+        StreamFuture read_async(
+            void* devPtr_base,
+            size_t size,
+            size_t file_offset,
+            size_t devPtr_offset,
+            CUstream stream
+        ) except +
+        StreamFuture write_async(
+            void* devPtr_base,
+            size_t size,
+            size_t file_offset,
+            size_t devPtr_offset,
+            CUstream stream
+        ) except +
diff --git a/python/kvikio/_lib/libkvikio.pyx b/python/kvikio/_lib/libkvikio.pyx
index aeeb2e3fc8..c9c539877f 100644
--- a/python/kvikio/_lib/libkvikio.pyx
+++ b/python/kvikio/_lib/libkvikio.pyx
@@ -1,4 +1,4 @@
-# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
 # See file LICENSE for terms.
 
 # distutils: language = c++
@@ -12,7 +12,22 @@ from libcpp.utility cimport move, pair
 
 from . cimport kvikio_cxx_api
 from .arr cimport Array
-from .kvikio_cxx_api cimport FileHandle, future, is_future_done
+from .kvikio_cxx_api cimport CUstream, FileHandle, StreamFuture, future, is_future_done
+
+
+cdef class IOFutureStream:
+    """Wrap a C++ StreamFuture in a Python object"""
+    cdef StreamFuture _handle
+
+    def check_bytes_done(self) -> int:
+        return self._handle.check_bytes_done()
+
+
+cdef IOFutureStream _wrap_stream_future(StreamFuture &fut):
+    """Wrap a C++ StreamFuture in an `IOFutureStream` instance"""
+    ret = IOFutureStream()
+    ret._handle = move(fut)
+    return ret
 
 
 cdef class IOFuture:
@@ -165,6 +180,32 @@ cdef class CuFile:
             dev_offset,
         )
 
+    def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
+                   st: uintptr_t) -> IOFutureStream:
+        """Call `FileHandle.read_async` with the raw `CUstream` handle `st`"""
+        cdef CUstream stream = <CUstream>st
+        cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False)
+        return _wrap_stream_future(self._handle.read_async(
+            <void*>info.first,
+            info.second,
+            file_offset,
+            dev_offset,
+            stream,
+        ))
+
+    def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
+                    st: uintptr_t) -> IOFutureStream:
+        """Call `FileHandle.write_async` with the raw `CUstream` handle `st`"""
+        cdef CUstream stream = <CUstream>st
+        cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False)
+        return _wrap_stream_future(self._handle.write_async(
+            <void*>info.first,
+            info.second,
+            file_offset,
+            dev_offset,
+            stream,
+        ))
+
 
 cdef class DriverProperties:
     cdef kvikio_cxx_api.DriverProperties _handle
diff --git a/python/kvikio/cufile.py b/python/kvikio/cufile.py
index c5ef010e67..7bba7bc566 100644
--- a/python/kvikio/cufile.py
+++ b/python/kvikio/cufile.py
@@ -1,4 +1,4 @@
-# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
+# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
 # See file LICENSE for terms.
 
 import pathlib
@@ -7,6 +7,27 @@
 from ._lib import libkvikio  # type: ignore
 
 
+class IOFutureStream:
+    """Future for CuFile async IO
+
+    This class shouldn't be used directly, instead non-blocking async IO operations
+    such as `CuFile.raw_read_async` and `CuFile.raw_write_async` return an instance
+    of this class.
+
+    The instance must be kept alive until all data has been read or written. One
+    way to do this is by calling `IOFutureStream.check_bytes_done()`, which will
+    synchronize the associated stream and return the number of bytes read or written.
+    """
+
+    __slots__ = "_handle"
+
+    def __init__(self, handle):
+        self._handle = handle
+
+    def check_bytes_done(self) -> int:
+        return self._handle.check_bytes_done()
+
+
 class IOFuture:
     """Future for CuFile IO
 
@@ -260,8 +281,88 @@ def write(
         """
         return self.pwrite(buf, size, file_offset, task_size).get()
 
+    def raw_read_async(
+        self,
+        buf,
+        stream,
+        size: Optional[int] = None,
+        file_offset: int = 0,
+        dev_offset: int = 0,
+    ) -> IOFutureStream:
+        """Reads specified bytes from the file into the device memory asynchronously
+
+        This is an async version of `.raw_read` that doesn't use threads and
+        does not support host memory.
+
+        Parameters
+        ----------
+        buf: buffer-like or array-like
+            Device buffer to read into.
+        stream: int
+            Raw CUDA stream handle (e.g. `cupy.cuda.Stream().ptr`) used for the read.
+        size: int, optional
+            Size in bytes to read.
+        file_offset: int, optional
+            Offset in the file to read from.
+        dev_offset: int, optional
+            Offset in `buf` to read into.
+
+        Returns
+        -------
+        IOFutureStream
+            Future whose `.check_bytes_done()` returns the number of bytes
+            that were successfully read. The instance must be kept alive until
+            all data has been read from disk. One way to do this is by calling
+            `IOFutureStream.check_bytes_done()`, which will synchronize the associated
+            stream and return the number of bytes read.
+        """
+        handle = self._handle.read_async(buf, size, file_offset, dev_offset, stream)
+        return IOFutureStream(handle)
+
+    def raw_write_async(
+        self,
+        buf,
+        stream,
+        size: Optional[int] = None,
+        file_offset: int = 0,
+        dev_offset: int = 0,
+    ) -> IOFutureStream:
+        """Writes specified bytes from the device memory into the file asynchronously
+
+        This is an async version of `.raw_write` that doesn't use threads and
+        does not support host memory.
+
+        Parameters
+        ----------
+        buf: buffer-like or array-like
+            Device buffer to write from.
+        stream: int
+            Raw CUDA stream handle (e.g. `cupy.cuda.Stream().ptr`) used for the write.
+        size: int, optional
+            Size in bytes to write.
+        file_offset: int, optional
+            Offset in the file to write at.
+        dev_offset: int, optional
+            Offset in `buf` to write from.
+
+        Returns
+        -------
+        IOFutureStream
+            Future whose `.check_bytes_done()` returns the number of bytes
+            that were successfully written. The instance must be kept alive until
+            all data has been written to disk. One way to do this is by calling
+            `IOFutureStream.check_bytes_done()`, which will synchronize the associated
+            stream and return the number of bytes written.
+        """
+        handle = self._handle.write_async(buf, size, file_offset, dev_offset, stream)
+        return IOFutureStream(handle)
+
     def raw_read(
-        self, buf, size: Optional[int] = None, file_offset: int = 0, dev_offset: int = 0
+        self,
+        buf,
+        size: Optional[int] = None,
+        file_offset: int = 0,
+        dev_offset: int = 0,
     ) -> int:
         """Reads specified bytes from the file into the device memory
 
@@ -297,7 +398,11 @@ def raw_read(
         return self._handle.read(buf, size, file_offset, dev_offset)
 
     def raw_write(
-        self, buf, size: Optional[int] = None, file_offset: int = 0, dev_offset: int = 0
+        self,
+        buf,
+        size: Optional[int] = None,
+        file_offset: int = 0,
+        dev_offset: int = 0,
     ) -> int:
         """Writes specified bytes from the device memory into the file
 
diff --git a/python/tests/test_async_io.py b/python/tests/test_async_io.py
new file mode 100644
index 0000000000..27a9f22e05
--- /dev/null
+++ b/python/tests/test_async_io.py
@@ -0,0 +1,59 @@
+# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
+# See file LICENSE for terms.
+
+import os
+
+import cupy
+import pytest
+
+import kvikio
+import kvikio.defaults
+
+
+def check_bit_flags(x: int, y: int) -> bool:
+    """Check that the bits set in `y` are also set in `x`"""
+    return x & y == y
+
+
+@pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10])
+def test_read_write(tmp_path, size):
+    """Test basic read/write"""
+    filename = tmp_path / "test-file"
+
+    stream = cupy.cuda.Stream()
+
+    # Write file
+    a = cupy.arange(size)
+    f = kvikio.CuFile(filename, "w")
+    assert not f.closed
+    assert check_bit_flags(f.open_flags(), os.O_WRONLY)
+    assert f.raw_write_async(a, stream.ptr).check_bytes_done() == a.nbytes
+
+    # Try to read file opened in write-only mode
+    with pytest.raises(RuntimeError, match="unsupported file open flags"):
+        # The exception is raised when we call the raw_read_async API.
+        future_stream = f.raw_read_async(a, stream.ptr)
+        future_stream.check_bytes_done()
+
+    # Close file
+    f.close()
+    assert f.closed
+
+    # Read file into a new array and compare
+    b = cupy.empty_like(a)
+    c = cupy.empty_like(a)
+    f = kvikio.CuFile(filename, "r+")
+    assert check_bit_flags(f.open_flags(), os.O_RDWR)
+
+    future_stream = f.raw_read_async(b, stream.ptr)
+    future_stream2 = f.raw_write_async(b, stream.ptr)
+    future_stream3 = f.raw_read_async(c, stream.ptr)
+    assert (
+        future_stream.check_bytes_done()
+        == future_stream2.check_bytes_done()
+        == future_stream3.check_bytes_done()
+        == b.nbytes
+    )
+    assert (a == b).all()
+    assert (a == c).all()
+    f.close()