diff --git a/csrc/aio.cpp b/csrc/aio.cpp index 30d9560..0796c34 100644 --- a/csrc/aio.cpp +++ b/csrc/aio.cpp @@ -1,5 +1,3 @@ -#include -#include #include "aio.h" AIOAsyncIO::AIOAsyncIO(unsigned int n_entries) @@ -128,9 +126,14 @@ void AIOAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned l this->n_read_events++; } -void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) { +void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned) { if (t.is_cuda()) { - t = t.to(torch::kCPU); + if (pinned.has_value()) { + pinned.value().copy_(t); + t = pinned.value(); + } else { + t = t.to(torch::kCPU); + } } void *buffer = t.data_ptr(); size_t n_bytes = t.numel() * t.element_size(); diff --git a/csrc/async_file_io.cpp b/csrc/async_file_io.cpp index 4896e87..918dfc8 100644 --- a/csrc/async_file_io.cpp +++ b/csrc/async_file_io.cpp @@ -8,8 +8,8 @@ void AsyncFileWriter::write(size_t buffer, size_t n_bytes, unsigned long long of this->aio->write(this->fd, ptr, n_bytes, offset, callback); } -void AsyncFileWriter::write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback) { - this->aio->write_tensor(this->fd, tensor, offset, callback); +void AsyncFileWriter::write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback, std::optional pinned) { + this->aio->write_tensor(this->fd, tensor, offset, callback, pinned); } diff --git a/csrc/pthread_backend.cpp b/csrc/pthread_backend.cpp index b2799ee..0fc0ab3 100644 --- a/csrc/pthread_backend.cpp +++ b/csrc/pthread_backend.cpp @@ -80,14 +80,17 @@ void PthreadAsyncIO::synchronize() { void PthreadAsyncIO::register_file(int fd) {} -void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) { +void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned) { auto fut = this->pool.submit_task( - [fd, t, offset] { + [fd, t, offset, pinned] { torch::Tensor cpu_tensor; if (t.is_cuda()) { - cpu_tensor = t.to(torch::kCPU); - } else { - cpu_tensor = t; + if (pinned.has_value()) { + pinned.value().copy_(t); + cpu_tensor = pinned.value(); + } else { + cpu_tensor = t.to(torch::kCPU); + } } void *buf = cpu_tensor.data_ptr(); size_t n_bytes = cpu_tensor.numel() * cpu_tensor.element_size(); diff --git a/csrc/py_api.cpp b/csrc/py_api.cpp index fced1ab..a305ac2 100644 --- a/csrc/py_api.cpp +++ b/csrc/py_api.cpp @@ -29,6 +29,6 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) py::class_(m, "AsyncFileWriter") .def(py::init(), py::arg("fd"), py::arg("n_entries"), py::arg("backend") = "aio") .def("write", &AsyncFileWriter::write, py::arg("buffer"), py::arg("n_bytes"), py::arg("offset"), py::arg("callback") = py::none()) - .def("write_tensor", &AsyncFileWriter::write_tensor, py::arg("tensor"), py::arg("offset"), py::arg("callback") = py::none()) + .def("write_tensor", &AsyncFileWriter::write_tensor, py::arg("tensor"), py::arg("offset"), py::arg("callback") = py::none(), py::arg("pinned") = py::none()) .def("synchronize", &AsyncFileWriter::synchronize); } \ No newline at end of file diff --git a/csrc/uring.cpp b/csrc/uring.cpp index 8cc3afc..8cd3dc0 100644 --- a/csrc/uring.cpp +++ b/csrc/uring.cpp @@ -99,9 +99,14 @@ void UringAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned this->n_read_events++; } -void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) { +void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned) { if (t.is_cuda()) { - t = t.to(torch::kCPU); + if (pinned.has_value()) { + pinned.value().copy_(t); + t = pinned.value(); + } else { + t = t.to(torch::kCPU); + } } void *buffer = t.data_ptr(); size_t n_bytes = t.numel() * t.element_size(); diff --git a/include/aio.h b/include/aio.h index c5300ae..a4aee4e 100644 --- a/include/aio.h +++ b/include/aio.h @@ -2,6 +2,8 @@ #include #include +#include +#include #include "asyncio.h" class AIOAsyncIO : public AsyncIO @@ -30,5 +32,5 @@ class AIOAsyncIO : public AsyncIO void synchronize(); void register_file(int fd); - void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback); + void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned); }; \ No newline at end of file diff --git a/include/async_file_io.h b/include/async_file_io.h index 0e1d73b..d12e4fe 100644 --- a/include/async_file_io.h +++ b/include/async_file_io.h @@ -1,6 +1,7 @@ #pragma once #include #include +#include #include "asyncio.h" #include "backend.h" @@ -18,7 +19,7 @@ class AsyncFileWriter public: AsyncFileWriter(int fd, unsigned int n_entries, const std::string &backend); void write(size_t buffer, size_t n_bytes, unsigned long long offset, callback_t callback); - void write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback); + void write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback, std::optional pinned); void synchronize(); ~AsyncFileWriter(); diff --git a/include/asyncio.h b/include/asyncio.h index 88e726f..68a501a 100644 --- a/include/asyncio.h +++ b/include/asyncio.h @@ -48,5 +48,5 @@ class AsyncIO virtual void synchronize() = 0; virtual void register_file(int fd) = 0; - virtual void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) = 0; + virtual void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned) = 0; }; \ No newline at end of file diff --git a/include/pthread_backend.h b/include/pthread_backend.h index 8e9a9b7..97ce561 100644 --- a/include/pthread_backend.h +++ b/include/pthread_backend.h @@ -39,5 +39,5 @@ class PthreadAsyncIO : public AsyncIO void register_file(int fd); - void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback); + void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned); }; \ No newline at end of file diff --git a/include/uring.h b/include/uring.h index 2377944..6f95215 100644 --- a/include/uring.h +++ b/include/uring.h @@ -26,5 +26,5 @@ class UringAsyncIO : public AsyncIO void synchronize(); void register_file(int fd); - void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback); + void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional pinned); }; \ No newline at end of file diff --git a/tensornvme/_C/__init__.pyi b/tensornvme/_C/__init__.pyi index 014bfa4..5bd330e 100644 --- a/tensornvme/_C/__init__.pyi +++ b/tensornvme/_C/__init__.pyi @@ -22,5 +22,5 @@ def probe_backend(backend: str) -> bool: ... class AsyncFileWriter: def __init__(self, fd: int, n_entries: int, backend: str = "aio") -> None: ... def write(self, buffer: int, n_bytes: int, offset: int, callback: Optional[Callable[[], None]] = None) -> None: ... - def write_tensor(self, tensor: Tensor, offset: int, callback: Optional[Callable[[], None]] = None) -> None: ... + def write_tensor(self, tensor: Tensor, offset: int, callback: Optional[Callable[[], None]] = None, pinned: Optional[Tensor] = None) -> None: ... def synchronize(self) -> None: ... diff --git a/tensornvme/async_file_io.py b/tensornvme/async_file_io.py index 4ab976d..48fd66a 100644 --- a/tensornvme/async_file_io.py +++ b/tensornvme/async_file_io.py @@ -1,7 +1,7 @@ import ctypes from functools import partial from torch import Tensor -from typing import List +from typing import List, Optional from io import IOBase from tensornvme._C import AsyncFileWriter as AsyncFileWriterC @@ -31,9 +31,9 @@ def write_raw(self, py_ref: object, buffer: int, n_bytes: int, offset: int) -> N self.io.write(buffer, n_bytes, offset, partial(AsyncFileWriter.gc_callback, self.buffers, len(self.buffers) - 1)) self.offset += n_bytes - def write_tensor(self, tensor: Tensor) -> None: + def write_tensor(self, tensor: Tensor, pinned: Optional[Tensor] = None) -> None: self.buffers.append(tensor) # append before callback is called - self.io.write_tensor(tensor, self.offset, partial(AsyncFileWriter.gc_callback, self.buffers, len(self.buffers) - 1)) + self.io.write_tensor(tensor, self.offset, partial(AsyncFileWriter.gc_callback, self.buffers, len(self.buffers) - 1), pinned) self.offset += tensor.numel() * tensor.element_size() @staticmethod