Skip to content

Commit

Permalink
[feature] add debug log (#55)
Browse files Browse the repository at this point in the history
  • Loading branch information
ver217 authored Nov 27, 2024
1 parent a411b55 commit a4d34bf
Show file tree
Hide file tree
Showing 14 changed files with 161 additions and 44 deletions.
26 changes: 16 additions & 10 deletions csrc/aio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ void AIOAsyncIO::get_event(WaitType wt)
std::unique_ptr<io_event> events(new io_event[this->max_nr]);
int num_events;

if(wt == WAIT)
if (wt == WAIT)
num_events = io_getevents(io_ctx, this->min_nr, this->max_nr, events.get(), &(this->timeout)); /* 获得异步I/O event个数 */
else
num_events = io_getevents(io_ctx, 0, this->max_nr, events.get(), &(this->timeout)); /* 获得异步I/O event个数 */
Expand All @@ -48,7 +48,7 @@ void AIOAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long
{
struct iocb iocb
{
}; //建立一个异步I/O需求
}; // 建立一个异步I/O需求
struct iocb *iocbs = &iocb;
auto *data = new IOData(WRITE, callback);

Expand All @@ -64,7 +64,7 @@ void AIOAsyncIO::read(int fd, void *buffer, size_t n_bytes, unsigned long long o
{
struct iocb iocb
{
}; //建立一个异步I/O需求
}; // 建立一个异步I/O需求
struct iocb *iocbs = &iocb;
auto *data = new IOData(READ, callback);

Expand Down Expand Up @@ -98,7 +98,7 @@ void AIOAsyncIO::writev(int fd, const iovec *iov, unsigned int iovcnt, unsigned
{
struct iocb iocb
{
}; //建立一个异步I/O需求
}; // 建立一个异步I/O需求
struct iocb *iocbs = &iocb;
auto *data = new IOData(WRITE, callback, iov);

Expand All @@ -114,7 +114,7 @@ void AIOAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned l
{
struct iocb iocb
{
}; //建立一个异步I/O需求
}; // 建立一个异步I/O需求
struct iocb *iocbs = &iocb;
auto *data = new IOData(READ, callback, iov);

Expand All @@ -126,12 +126,17 @@ void AIOAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned l
this->n_read_events++;
}

void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
if (t.is_cuda()) {
if (pinned.has_value()) {
void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned)
{
if (t.is_cuda())
{
if (pinned.has_value())
{
pinned.value().copy_(t);
t = pinned.value();
} else {
}
else
{
t = t.to(torch::kCPU);
}
}
Expand All @@ -141,4 +146,5 @@ void AIOAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset
}

// No-op stubs: H2D registration/synchronization and task accounting are only
// meaningful for the pthread backend's debug bookkeeping; the libaio backend
// needs none of it. (The scraped diff duplicated sync_h2d — one definition kept.)
void AIOAsyncIO::register_h2d(unsigned int num_tensors) {}
void AIOAsyncIO::sync_h2d() {}
void AIOAsyncIO::register_tasks(unsigned int num_tasks) {}
14 changes: 11 additions & 3 deletions csrc/async_file_io.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@ void AsyncFileWriter::write(size_t buffer, size_t n_bytes, unsigned long long of
this->aio->write(this->fd, ptr, n_bytes, offset, callback);
}

// Forward a tensor write to the underlying asyncio backend at the writer's fd.
// `pinned` optionally supplies a pre-pinned host buffer used to stage CUDA
// tensors before the write. (Diff residue removed: the scrape contained both
// the pre- and post-change signature lines; only one definition is valid.)
void AsyncFileWriter::write_tensor(torch::Tensor tensor, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned)
{
    this->aio->write_tensor(this->fd, tensor, offset, callback, pinned);
}

void AsyncFileWriter::register_h2d(unsigned int num_tensors) {
void AsyncFileWriter::register_h2d(unsigned int num_tensors)
{
this->aio->register_h2d(num_tensors);
}

void AsyncFileWriter::sync_h2d() {
void AsyncFileWriter::sync_h2d()
{
this->aio->sync_h2d();
}

Expand All @@ -28,4 +31,9 @@ void AsyncFileWriter::synchronize()
// Destroys the writer and releases the owned backend instance.
// NOTE(review): `aio` is a raw owning pointer — a std::unique_ptr<AsyncIO>
// member would express this ownership without changing the public interface;
// confirm against the header before migrating.
AsyncFileWriter::~AsyncFileWriter()
{
    delete aio;
}

void AsyncFileWriter::register_tasks(unsigned int num_tasks)
{
this->aio->register_tasks(num_tasks);
}
57 changes: 40 additions & 17 deletions csrc/backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,25 +41,32 @@ void probe_asyncio(const std::string &backend)
try
{
std::unique_ptr<AsyncIO> aio;
if (backend == "uring") {
if (backend == "uring")
{
#ifndef DISABLE_URING
aio.reset(new UringAsyncIO(2));
#else
throw std::runtime_error("backend uring is not installed\n");
#endif
} else if (backend == "aio") {
}
else if (backend == "aio")
{
#ifndef DISABLE_AIO
aio.reset(new AIOAsyncIO(2));
#else
throw std::runtime_error("backend aio is not installed\n");
#endif
} else if (backend == "pthread") {
}
else if (backend == "pthread")
{
#ifndef DISABLE_PTHREAD
aio.reset(new PthreadAsyncIO(2));
#else
throw std::runtime_error("backend pthread is not installed\n");
#endif
} else {
}
else
{
throw std::runtime_error("unknown backend");
}

Expand Down Expand Up @@ -119,25 +126,40 @@ bool probe_backend(const std::string &backend)
}
}

// Read the backend override from the TENSORNVME_BACKEND environment variable.
// Returns the variable's value, or "" when it is unset (no override).
// (Diff residue removed: the scrape interleaved the pre- and post-change
// opening lines of this function; this is the post-change version.)
std::string get_default_backend()
{
    const char *env = getenv("TENSORNVME_BACKEND");
    if (env == nullptr)
    {
        return std::string("");
    }
    return std::string(env);
}

// Returns true iff the TENSORNVME_DEBUG environment variable holds a truthy
// value: "1" or "true", compared case-insensitively. Unset means debugging off.
// (Diff residue removed: the scrape interleaved the pre- and post-change
// opening lines of this function; this is the post-change version.)
bool get_debug_flag()
{
    const char *env_ = getenv("TENSORNVME_DEBUG");
    if (env_ == nullptr)
    {
        return false;
    }
    std::string env(env_);
    // Lowercase first so "TRUE", "True", etc. are all accepted.
    std::transform(env.begin(), env.end(), env.begin(),
                   [](unsigned char c)
                   { return std::tolower(c); });
    return env == "1" || env == "true";
}

// Returns the debug-log file path from TENSORNVME_DEBUG_LOG, or "" when the
// variable is unset (callers then route debug output to stdout instead).
std::string get_debug_log()
{
    if (const char *path = getenv("TENSORNVME_DEBUG_LOG"))
    {
        return std::string(path);
    }
    return std::string("");
}

AsyncIO *create_asyncio(unsigned int n_entries, std::string backend)
{
std::unordered_set<std::string> backends = get_backends();
Expand All @@ -147,18 +169,19 @@ AsyncIO *create_asyncio(unsigned int n_entries, std::string backend)
if (backends.empty())
throw std::runtime_error("No asyncio backend is installed");

if (default_backend.size() > 0) { // priority 1: environ is set
if (is_debugging) {
if (default_backend.size() > 0)
{ // priority 1: environ is set
if (is_debugging)
{
std::cout << "[backend] backend is overwritten by environ TENSORNVME_BACKEND from " << backend << " to " << default_backend << std::endl;
}
backend = default_backend;
} else if (backend.size() > 0) { // priority 2: backend is set
}
else if (backend.size() > 0)
{ // priority 2: backend is set
if (backends.find(backend) == backends.end())
throw std::runtime_error("Unsupported backend: " + backend);
}
if (is_debugging) {
std::cout << "[backend] using backend: " << backend << std::endl;
}

if (!probe_backend(backend))
throw std::runtime_error("Backend \"" + backend + "\" is not install correctly");
Expand Down
54 changes: 51 additions & 3 deletions csrc/pthread_backend.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,35 @@
#include "pthread_backend.h"
#include <fstream>

// Append a single line of text to `filename`, creating the file if necessary.
// Serves as the sink for debug messages when TENSORNVME_DEBUG_LOG is set.
void write_file(const std::string &filename, const std::string &content)
{
    std::ofstream sink;
    sink.open(filename, std::ios::app);
    sink << content << std::endl;
    sink.close();
}

void PthreadAsyncIO::write(int fd, void *buffer, size_t n_bytes, unsigned long long offset, callback_t callback)
{
auto fut = this->pool.submit_task(
[fd, buffer, n_bytes, offset]
[this, fd, buffer, n_bytes, offset]
{
return pwrite(fd, buffer, n_bytes, offset);
auto val = pwrite(fd, buffer, n_bytes, offset);
if (this->is_debug)
{
auto cur_tasks = this->tasks_in_progress.fetch_sub(1);
if (cur_tasks == 1)
{
if (this->debug_log.empty())
{
std::cout << "All tasks are completed" << std::endl;
}
else
{
write_file(this->debug_log, "All tasks are completed");
}
}
}
return val;
});
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
}
Expand Down Expand Up @@ -144,7 +168,31 @@ void PthreadAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long of
}
void *buf = cpu_tensor.data_ptr();
size_t n_bytes = cpu_tensor.numel() * cpu_tensor.element_size();
return pwrite(fd, buf, n_bytes, offset);
auto val = pwrite(fd, buf, n_bytes, offset);
if (this->is_debug)
{
auto cur_tasks = this->tasks_in_progress.fetch_sub(1);
if (cur_tasks == 1)
{
if (this->debug_log.empty())
{
std::cout << "All tasks are completed" << std::endl;
}
else
{
write_file(this->debug_log, "All tasks are completed");
}
}
}
return val;
});
this->write_fut.push_back(std::make_tuple(std::move(fut), callback));
}

// Seed the outstanding-task counter before a batch of submissions, so the
// debug path in write()/write_tensor() can announce when the last task
// finishes. Does nothing unless debug mode is enabled.
void PthreadAsyncIO::register_tasks(unsigned int num_tasks)
{
    if (!this->is_debug)
        return; // counter is only consulted on the debug path
    this->tasks_in_progress.store(num_tasks);
}
3 changes: 2 additions & 1 deletion csrc/py_api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,6 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
.def("write_tensor", &AsyncFileWriter::write_tensor, py::arg("tensor"), py::arg("offset"), py::arg("callback") = py::none(), py::arg("pinned") = py::none())
.def("synchronize", &AsyncFileWriter::synchronize)
.def("sync_h2d", &AsyncFileWriter::sync_h2d)
.def("register_h2d", &AsyncFileWriter::register_h2d, py::arg("num_tensors"));
.def("register_h2d", &AsyncFileWriter::register_h2d, py::arg("num_tensors"))
.def("register_tasks", &AsyncFileWriter::register_tasks, py::arg("num_tasks"));
}
25 changes: 17 additions & 8 deletions csrc/uring.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ UringAsyncIO::~UringAsyncIO()
void UringAsyncIO::get_event(WaitType wt)
{
io_uring_cqe *cqe;
if (wt == WAIT){
if (wt == WAIT)
{
io_uring_wait_cqe(&this->ring, &cqe);
}
else{
else
{
int ret = io_uring_peek_cqe(&this->ring, &cqe);
if (ret != 0) return;
if (ret != 0)
return;
}

std::unique_ptr<IOData> data(static_cast<IOData *>(io_uring_cqe_get_data(cqe)));
Expand Down Expand Up @@ -99,12 +102,17 @@ void UringAsyncIO::readv(int fd, const iovec *iov, unsigned int iovcnt, unsigned
this->n_read_events++;
}

void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned) {
if (t.is_cuda()) {
if (pinned.has_value()) {
void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned)
{
if (t.is_cuda())
{
if (pinned.has_value())
{
pinned.value().copy_(t);
t = pinned.value();
} else {
}
else
{
t = t.to(torch::kCPU);
}
}
Expand All @@ -114,4 +122,5 @@ void UringAsyncIO::write_tensor(int fd, torch::Tensor t, unsigned long long offs
}

// No-op stubs: H2D registration/synchronization and task accounting are only
// meaningful for the pthread backend's debug bookkeeping; the io_uring backend
// needs none of it. (The scraped diff duplicated sync_h2d — one definition kept.)
void UringAsyncIO::register_h2d(unsigned int num_tensors) {}
void UringAsyncIO::sync_h2d() {}
void UringAsyncIO::register_tasks(unsigned int num_tasks) {}
1 change: 1 addition & 0 deletions include/aio.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,4 +35,5 @@ class AIOAsyncIO : public AsyncIO

void register_file(int fd);
void write_tensor(int fd, torch::Tensor t, unsigned long long offset, callback_t callback, std::optional<torch::Tensor> pinned);
void register_tasks(unsigned int num_tasks);
};
1 change: 1 addition & 0 deletions include/async_file_io.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class AsyncFileWriter
void synchronize();
void register_h2d(unsigned int num_tensors);
void sync_h2d();
void register_tasks(unsigned int num_tasks);
~AsyncFileWriter();

private:
Expand Down
1 change: 1 addition & 0 deletions include/asyncio.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class AsyncIO
virtual void sync_write_events() = 0;
virtual void sync_read_events() = 0;
virtual void register_h2d(unsigned int num_tensors) = 0;
virtual void register_tasks(unsigned int num_tasks) = 0;
virtual void sync_h2d() = 0;
virtual void synchronize() = 0;

Expand Down
2 changes: 2 additions & 0 deletions include/backend.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,5 @@ std::string get_default_backend();
bool get_debug_flag();

AsyncIO *create_asyncio(unsigned int n_entries, std::string backend);

std::string get_debug_log();
Loading

0 comments on commit a4d34bf

Please sign in to comment.