Skip to content

Commit

Permalink
[chore] add pinned mem buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
botbw committed Oct 29, 2024
1 parent 0ed8faa commit 8aa59d9
Show file tree
Hide file tree
Showing 12 changed files with 37 additions and 23 deletions.
11 changes: 7 additions & 4 deletions csrc/aio.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
#include <stdexcept>
#include <memory>
#include "aio.h"

AIOAsyncIO::AIOAsyncIO(unsigned int n_entries)
Expand Down Expand Up @@ -128,9 +126,14 @@ void AIOAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned l
this->n_read_events++;
}

void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) {
void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
if (t.is_cuda()) {
t = t.to(torch::kCPU);
if (pinned.has_value()) {
pinned.value().copy_(t);
t = pinned.value();
} else {
t = t.to(torch::kCPU);
}
}
void *buffer = t.data_ptr();
size_t n_bytes = t.numel() * t.element_size();
Expand Down
4 changes: 2 additions & 2 deletions csrc/async_file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ void AsyncFileWriter::write(size_t buffer, size_t n_bytes, unsigned long long of
this->aio->write(this->fd, ptr, n_bytes, offset, callback);
}

void AsyncFileWriter::write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback) {
this->aio->write_tensor(this->fd, tensor, offset, callback);
void AsyncFileWriter::write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
this->aio->write_tensor(this->fd, tensor, offset, callback, pinned);
}


Expand Down
13 changes: 8 additions & 5 deletions csrc/pthread_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,17 @@ void PthreadAsyncIO::synchronize() {

void PthreadAsyncIO::register_file(int fd) {}

void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) {
void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
auto fut = this->pool.submit_task(
[fd, t, offset] {
[fd, t, offset, pinned] {
torch::Tensor cpu_tensor;
if (t.is_cuda()) {
cpu_tensor = t.to(torch::kCPU);
} else {
cpu_tensor = t;
if (pinned.has_value()) {
pinned.value().copy_(t);
cpu_tensor = pinned.value();
} else {
cpu_tensor = t.to(torch::kCPU);
}
}
void *buf = cpu_tensor.data_ptr();
size_t n_bytes = cpu_tensor.numel() * cpu_tensor.element_size();
Expand Down
2 changes: 1 addition & 1 deletion csrc/py_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,6 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
py::class_<AsyncFileWriter>(m, "AsyncFileWriter")
.def(py::init<int, unsigned int, const std::string &>(), py::arg("fd"), py::arg("n_entries"), py::arg("backend") = "aio")
.def("write", &AsyncFileWriter::write, py::arg("buffer"), py::arg("n_bytes"), py::arg("offset"), py::arg("callback") = py::none())
.def("write_tensor", &AsyncFileWriter::write_tensor, py::arg("tensor"), py::arg("offset"), py::arg("callback") = py::none())
.def("write_tensor", &AsyncFileWriter::write_tensor, py::arg("tensor"), py::arg("offset"), py::arg("callback") = py::none(), py::arg("pinned") = py::none())
.def("synchronize", &AsyncFileWriter::synchronize);
}
9 changes: 7 additions & 2 deletions csrc/uring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,9 +99,14 @@ void UringAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned
this->n_read_events++;
}

void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) {
void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
if (t.is_cuda()) {
t = t.to(torch::kCPU);
if (pinned.has_value()) {
pinned.value().copy_(t);
t = pinned.value();
} else {
t = t.to(torch::kCPU);
}
}
void *buffer = t.data_ptr<float>();
size_t n_bytes = t.numel() * t.element_size();
Expand Down
4 changes: 3 additions & 1 deletion include/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

#include <libaio.h>
#include <torch/torch.h>
#include <stdexcept>
#include <memory>
#include "asyncio.h"

class AIOAsyncIO : public AsyncIO
Expand Down Expand Up @@ -30,5 +32,5 @@ class AIOAsyncIO : public AsyncIO
void synchronize();

void register_file(int fd);
void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback);
void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned);
};
3 changes: 2 additions & 1 deletion include/async_file_io.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <string>
#include <torch/torch.h>
#include <optional>

#include "asyncio.h"
#include "backend.h"
Expand All @@ -18,7 +19,7 @@ class AsyncFileWriter
public:
AsyncFileWriter(int fd, unsigned int n_entries, const std::string &backend);
void write(size_t buffer, size_t n_bytes, unsigned long long offset, callback_t callback);
void write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback);
void write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned);
void synchronize();
~AsyncFileWriter();

Expand Down
2 changes: 1 addition & 1 deletion include/asyncio.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,5 +48,5 @@ class AsyncIO
virtual void synchronize() = 0;

virtual void register_file(int fd) = 0;
virtual void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback) = 0;
virtual void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) = 0;
};
2 changes: 1 addition & 1 deletion include/pthread_backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,5 @@ class PthreadAsyncIO : public AsyncIO

void register_file(int fd);

void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback);
void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned);
};
2 changes: 1 addition & 1 deletion include/uring.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,5 @@ class UringAsyncIO : public AsyncIO
void synchronize();

void register_file(int fd);
void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback);
void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned);
};
2 changes: 1 addition & 1 deletion tensornvme/_C/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,5 @@ def probe_backend(backend: str) -> bool: ...
class AsyncFileWriter:
def __init__(self, fd: int, n_entries: int, backend: str = "aio") -> None: ...
def write(self, buffer: int, n_bytes: int, offset: int, callback: Optional[Callable[[], None]] = None) -> None: ...
def write_tensor(self, tensor: Tensor, offset: int, callback: Optional[Callable[[], None]] = None) -> None: ...
def write_tensor(self, tensor: Tensor, offset: int, callback: Optional[Callable[[], None]] = None, pinned: Optional[Tensor] = None) -> None: ...
def synchronize(self) -> None: ...
6 changes: 3 additions & 3 deletions tensornvme/async_file_io.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ctypes
from functools import partial
from torch import Tensor
from typing import List
from typing import List, Optional
from io import IOBase
from tensornvme._C import AsyncFileWriter as AsyncFileWriterC

Expand Down Expand Up @@ -31,9 +31,9 @@ def write_raw(self, py_ref: object, buffer: int, n_bytes: int, offset: int) -> N
self.io.write(buffer, n_bytes, offset, partial(AsyncFileWriter.gc_callback, self.buffers, len(self.buffers) - 1))
self.offset += n_bytes

def write_tensor(self, tensor: Tensor) -> None:
def write_tensor(self, tensor: Tensor, pinned: Optional[Tensor] = None) -> None:
self.buffers.append(tensor) # append before callback is called
self.io.write_tensor(tensor, self.offset, partial(AsyncFileWriter.gc_callback, self.buffers, len(self.buffers) - 1))
self.io.write_tensor(tensor, self.offset, partial(AsyncFileWriter.gc_callback, self.buffers, len(self.buffers) - 1), pinned)
self.offset += tensor.numel() * tensor.element_size()

@staticmethod
Expand Down

0 comments on commit 8aa59d9

Please sign in to comment.