Skip to content

Commit

Permalink
Initial Python Interface for cufile Async IO (#376)
Browse files Browse the repository at this point in the history
Hi there,

Thanks for this great repository! I want to use cuFile async IO in my research project and came across this kvikio repo. Initial support was added in #259 and is tracked in #204, but the Python interface hasn't been done yet. So I exported `write_async` and `read_async` to the `CuFile` Python class and added a test case. This will be very helpful for my project, where I want to run PyTorch training computation while simultaneously loading tensors from SSDs. I created this PR in the hope that it is useful for your repository as well and helps keep the Python interface current.

Please let me know your thoughts. Thank you.

Best Regards,
Kun

Authors:
  - Kun Wu (https://github.com/K-Wu)
  - Mads R. B. Kristensen (https://github.com/madsbk)

Approvers:
  - Mads R. B. Kristensen (https://github.com/madsbk)

URL: #376
  • Loading branch information
K-Wu authored May 7, 2024
1 parent 8081fbb commit 6ed7bcc
Show file tree
Hide file tree
Showing 4 changed files with 227 additions and 6 deletions.
27 changes: 26 additions & 1 deletion python/kvikio/_lib/kvikio_cxx_api.pxd
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
Expand All @@ -12,12 +12,23 @@ from libcpp.utility cimport pair
from libcpp.vector cimport vector


# Minimal declaration of the CUDA stream handle type. Only an opaque pointer
# typedef is declared here; nothing else from "cuda.h" is exposed to Cython.
cdef extern from "cuda.h":
    ctypedef void* CUstream


# Subset of `std::future` exposed to Cython: default construction and `get()`.
# Both are declared `except +` so that a C++ exception is translated into a
# Python exception instead of terminating the process.
cdef extern from "<future>" namespace "std" nogil:
    cdef cppclass future[T]:
        future() except +
        T get() except +


# kvikio's `StreamFuture`, the type returned by the `read_async` and
# `write_async` declarations of the file handle.
cdef extern from "<kvikio/stream.hpp>" namespace "kvikio" nogil:
    cdef cppclass StreamFuture:
        StreamFuture() except +
        # Move constructor; no copy constructor is declared in this binding.
        StreamFuture(StreamFuture&&) except +
        # The Python-level documentation describes this call as synchronizing
        # the associated stream and returning the number of bytes transferred.
        size_t check_bytes_done() except +


cdef extern from "<kvikio/utils.hpp>" namespace "kvikio" nogil:
bool is_future_done[T](const T& future) except +

Expand Down Expand Up @@ -97,3 +108,17 @@ cdef extern from "<kvikio/file_handle.hpp>" namespace "kvikio" nogil:
size_t file_offset,
size_t devPtr_offset
) except +
StreamFuture read_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +
StreamFuture write_async(
void* devPtr_base,
size_t size,
size_t file_offset,
size_t devPtr_offset,
CUstream stream
) except +
43 changes: 41 additions & 2 deletions python/kvikio/_lib/libkvikio.pyx
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2021-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

# distutils: language = c++
Expand All @@ -12,7 +12,22 @@ from libcpp.utility cimport move, pair

from . cimport kvikio_cxx_api
from .arr cimport Array
from .kvikio_cxx_api cimport FileHandle, future, is_future_done
from .kvikio_cxx_api cimport CUstream, FileHandle, StreamFuture, future, is_future_done


cdef class IOFutureStream:
    """Wrap a C++ StreamFuture in a Python object"""
    # The wrapped C++ future; it is move-assigned by `_wrap_stream_future`.
    cdef StreamFuture _handle

    def check_bytes_done(self) -> int:
        """Return the value of `check_bytes_done()` of the wrapped C++ future"""
        return self._handle.check_bytes_done()


cdef IOFutureStream _wrap_stream_future(StreamFuture &fut):
    """Wrap a C++ `StreamFuture` in an `IOFutureStream` instance

    The future is moved into the returned object, which leaves `fut` in a
    moved-from state.
    """
    ret = IOFutureStream()
    ret._handle = move(fut)
    return ret


cdef class IOFuture:
Expand Down Expand Up @@ -165,6 +180,30 @@ cdef class CuFile:
dev_offset,
)

def read_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
               st: uintptr_t) -> IOFutureStream:
    """Call the C++ `read_async` of the underlying handle and wrap its result

    `buf` and `size` are resolved to a (pointer, size) pair by `_parse_buffer`.
    `st` is an integer that is reinterpreted as a `CUstream` handle. The
    returned `IOFutureStream` owns the C++ `StreamFuture` produced by the call.
    """
    # Reinterpret the integer address as the opaque CUDA stream handle.
    stream = <CUstream>st
    # NOTE(review): the trailing `False` presumably disables host memory (the
    # Python-level docs say host memory is unsupported) -- confirm against
    # `_parse_buffer`.
    cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False)
    return _wrap_stream_future(self._handle.read_async(
        <void*>info.first,
        info.second,
        file_offset,
        dev_offset,
        stream,
    ))

def write_async(self, buf, size: Optional[int], file_offset: int, dev_offset: int,
                st: uintptr_t) -> IOFutureStream:
    """Call the C++ `write_async` of the underlying handle and wrap its result

    `buf` and `size` are resolved to a (pointer, size) pair by `_parse_buffer`.
    `st` is an integer that is reinterpreted as a `CUstream` handle. The
    returned `IOFutureStream` owns the C++ `StreamFuture` produced by the call.
    """
    # Reinterpret the integer address as the opaque CUDA stream handle.
    stream = <CUstream>st
    # NOTE(review): the trailing `False` presumably disables host memory (the
    # Python-level docs say host memory is unsupported) -- confirm against
    # `_parse_buffer`.
    cdef pair[uintptr_t, size_t] info = _parse_buffer(buf, size, False)
    return _wrap_stream_future(self._handle.write_async(
        <void*>info.first,
        info.second,
        file_offset,
        dev_offset,
        stream,
    ))


cdef class DriverProperties:
cdef kvikio_cxx_api.DriverProperties _handle
Expand Down
105 changes: 102 additions & 3 deletions python/kvikio/cufile.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright (c) 2022-2023, NVIDIA CORPORATION. All rights reserved.
# Copyright (c) 2022-2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import pathlib
Expand All @@ -7,6 +7,27 @@
from ._lib import libkvikio # type: ignore


class IOFutureStream:
    """Future for CuFile async IO

    This class shouldn't be used directly, instead non-blocking async IO operations
    such as `CuFile.raw_read_async` and `CuFile.raw_write_async` return an instance
    of this class.

    The instance must be kept alive until the operation has finished, i.e. until
    all data has been read from or written to disk. One way to ensure this is to
    call `IOFutureStream.check_bytes_done()`, which will synchronize the
    associated stream and return the number of bytes read or written.
    """

    # A tuple makes it explicit that there is exactly one slot named "_handle".
    __slots__ = ("_handle",)

    def __init__(self, handle):
        """Store the low-level future object that this instance delegates to"""
        self._handle = handle

    def check_bytes_done(self) -> int:
        """Return the number of bytes read or written

        Delegates to `check_bytes_done()` of the wrapped handle.
        """
        return self._handle.check_bytes_done()


class IOFuture:
"""Future for CuFile IO
Expand Down Expand Up @@ -260,8 +281,82 @@ def write(
"""
return self.pwrite(buf, size, file_offset, task_size).get()

def raw_read_async(
    self,
    buf,
    stream,
    size: Optional[int] = None,
    file_offset: int = 0,
    dev_offset: int = 0,
) -> IOFutureStream:
    """Reads specified bytes from the file into the device memory asynchronously

    This is an async version of `.raw_read` that doesn't use threads and
    does not support host memory.

    Parameters
    ----------
    buf: buffer-like or array-like
        Device buffer to read into.
    stream: int
        Raw handle (address) of the CUDA stream on which the read operation
        is performed asynchronously, e.g. `cupy.cuda.Stream().ptr`.
    size: int, optional
        Size in bytes to read.
    file_offset: int, optional
        Offset in the file to read from.
    dev_offset: int, optional
        Offset into the device buffer `buf`.

    Returns
    -------
    IOFutureStream
        Future whose `.check_bytes_done()` returns the number of bytes that
        were successfully read. The instance must be kept alive until
        all data has been read from disk. One way to do this, is by calling
        `IOFutureStream.check_bytes_done()`, which will synchronize the associated
        stream and return the number of bytes read.
    """
    # Wrap the low-level future so the returned object really is the
    # `IOFutureStream` promised by the annotation and the docstring.
    return IOFutureStream(
        self._handle.read_async(buf, size, file_offset, dev_offset, stream)
    )

def raw_write_async(
    self,
    buf,
    stream,
    size: Optional[int] = None,
    file_offset: int = 0,
    dev_offset: int = 0,
) -> IOFutureStream:
    """Writes specified bytes from the device memory into the file asynchronously

    This is an async version of `.raw_write` that doesn't use threads and
    does not support host memory.

    Parameters
    ----------
    buf: buffer-like or array-like
        Device buffer to write from.
    stream: int
        Raw handle (address) of the CUDA stream on which the write operation
        is performed asynchronously, e.g. `cupy.cuda.Stream().ptr`.
    size: int, optional
        Size in bytes to write.
    file_offset: int, optional
        Offset in the file to write to.
    dev_offset: int, optional
        Offset into the device buffer `buf`.

    Returns
    -------
    IOFutureStream
        Future whose `.check_bytes_done()` returns the number of bytes that
        were successfully written. The instance must be kept alive until
        all data has been written to disk. One way to do this, is by calling
        `IOFutureStream.check_bytes_done()`, which will synchronize the associated
        stream and return the number of bytes written.
    """
    # Wrap the low-level future so the returned object really is the
    # `IOFutureStream` promised by the annotation and the docstring.
    return IOFutureStream(
        self._handle.write_async(buf, size, file_offset, dev_offset, stream)
    )

def raw_read(
self, buf, size: Optional[int] = None, file_offset: int = 0, dev_offset: int = 0
self,
buf,
size: Optional[int] = None,
file_offset: int = 0,
dev_offset: int = 0,
) -> int:
"""Reads specified bytes from the file into the device memory
Expand Down Expand Up @@ -297,7 +392,11 @@ def raw_read(
return self._handle.read(buf, size, file_offset, dev_offset)

def raw_write(
self, buf, size: Optional[int] = None, file_offset: int = 0, dev_offset: int = 0
self,
buf,
size: Optional[int] = None,
file_offset: int = 0,
dev_offset: int = 0,
) -> int:
"""Writes specified bytes from the device memory into the file
Expand Down
58 changes: 58 additions & 0 deletions python/tests/test_async_io.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved.
# See file LICENSE for terms.

import os

import cupy
import pytest

import kvikio
import kvikio.defaults


def check_bit_flags(x: int, y: int) -> bool:
    """Return True when every bit that is set in `y` is also set in `x`"""
    # Bits required by `y` that `x` lacks; none missing means `y` is covered.
    missing = y & ~x
    return missing == 0


@pytest.mark.parametrize("size", [1, 10, 100, 1000, 1024, 4096, 4096 * 10])
def test_read_write(tmp_path, size):
    """Test basic read/write using the stream-ordered async API"""
    filename = tmp_path / "test-file"

    stream = cupy.cuda.Stream()

    # Write file
    a = cupy.arange(size)
    f = kvikio.CuFile(filename, "w")
    assert not f.closed
    assert check_bit_flags(f.open_flags(), os.O_WRONLY)
    assert f.raw_write_async(a, stream.ptr).check_bytes_done() == a.nbytes

    # Try to read file opened in write-only mode
    with pytest.raises(RuntimeError, match="unsupported file open flags"):
        # The exception is raised when we call the raw_read_async API.
        future_stream = f.raw_read_async(a, stream.ptr)
        future_stream.check_bytes_done()

    # Close file
    f.close()
    assert f.closed

    # Read file into a new array and compare
    b = cupy.empty_like(a)
    c = cupy.empty_like(a)
    f = kvikio.CuFile(filename, "r+")
    try:
        assert check_bit_flags(f.open_flags(), os.O_RDWR)

        # All three operations are enqueued on the same stream before any
        # of the futures is synchronized.
        future_stream = f.raw_read_async(b, stream.ptr)
        future_stream2 = f.raw_write_async(b, stream.ptr)
        future_stream3 = f.raw_read_async(c, stream.ptr)
        assert (
            future_stream.check_bytes_done()
            == future_stream2.check_bytes_done()
            == future_stream3.check_bytes_done()
            == b.nbytes
        )
        # Reduce on the array itself rather than iterating it element by
        # element with the builtin `all`.
        assert (a == b).all()
        assert (a == c).all()
    finally:
        # The second handle was previously never closed.
        f.close()
    assert f.closed

0 comments on commit 6ed7bcc

Please sign in to comment.