Various fixes to the DAOS tensorflow-io plugin. (#2)
General:
    Asserts were added and enabled after each DAOS event-related call in
    order to track down internal race conditions in the DAOS client code;
    see DAOS-10601.

    The DAOS_FILE structure documented behavior for the 'offset' field, but
    most of that behavior didn't need to be implemented.  Field 'offset' was
    removed while fixing the Append() and Tell() functions, leaving only a
    single field 'file' in DAOS_FILE, so the DAOS_FILE struct was removed as
    well.

    Typos in error messages were corrected.

File dfs_utils.cc:

In DFS::Setup():
   Code after the Connect() call replaced the detailed error status set in
   Connect() with a generic TF_NOT_FOUND error with no accompanying message.
   This cost several days of debugging to realize that the problem was not
   a missing object, but rather a connection failure to an unhealthy
   container.  The TF_NOT_FOUND has been removed, allowing the more
   detailed error messages in Connect() to be reported.

In ReadBuffer::ReadBuffer()
    By setting buffer_offset to ULONG_MAX, an uninitialized buffer will never
    be matched by CacheHit(), removing the need for a separate 'initialized'
    variable.  The 'valid' variable is also no longer needed; more on that
    below.

In ReadBuffer::~ReadBuffer()
    daos_event_fini() cannot be called on an event that is still in flight;
    it fails without doing anything.  daos_event_test() must first wait for
    any prior event to complete, otherwise the event delete that follows
    daos_event_fini() could corrupt the event queue.  Call the reworked
    WaitEvent() (see below) first to ensure that daos_event_fini() cleans
    up the event before it is deleted.

In ReadBuffer::FinalizeEvent()
    The same problem exists here as in ~ReadBuffer(): daos_event_fini() can't
    be called on an event that is still in flight.  However, FinalizeEvent()
    isn't actually needed; a call to dfs_file->buffers.clear() in Cleanup()
    accomplishes the same thing via the ~ReadBuffer() code, so FinalizeEvent()
    was removed.

ReadBuffer::WaitEvent()
    A WaitEvent() function is needed in several places to wait for any
    outstanding event to complete, but this routine manipulated 'valid',
    so it couldn't be used anywhere else.  The 'valid' code was removed so
    that the routine could become void and be called from multiple places.
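
    A sketch of the reworked shape.  FakeEvent and fake_event_test() are
    simplified stand-ins for daos_event_t and daos_event_test() (the real
    call takes a timeout such as DAOS_EQ_WAIT and a completion flag):

```cpp
#include <cassert>

// Simplified stand-in for daos_event_t.
struct FakeEvent {
  bool in_flight = false;
};

// Stand-in for daos_event_test(ev, DAOS_EQ_WAIT, &flag), which blocks
// until the event completes; here completion is immediate.
static void fake_event_test(FakeEvent* ev) { ev->in_flight = false; }

struct ReadBufferSketch {
  FakeEvent event;

  // void, and touches no 'valid' flag, so the destructor, ReadAsync(),
  // and CopyData() can all reuse it.
  void WaitEvent() {
    if (event.in_flight) fake_event_test(&event);
  }

  ~ReadBufferSketch() {
    WaitEvent();  // daos_event_fini() must never see an in-flight event
    // real code would call daos_event_fini(&event) here
  }
};
```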

ReadBuffer::AbortEvent()
    daos_event_abort() doesn't actually contain any logic to ask the server to
    abort an in-flight dfs_read() request.  In addition it is buggy: internal
    DAOS asserts were hit due to daos_event_abort() calls during I/O testing.
    The code was changed to instead use WaitEvent() to simply wait for a prior
    read to complete before issuing a new one, and AbortEvent() was removed.

ReadBuffer::ReadAsync()
    Both daos_event_fini() and daos_event_init() must be called on a
    daos_event_t structure before the event can be reused for another
    dfs_read() call.  These have been added.  The AbortEvent() call was
    replaced with a call to WaitEvent().  The code was changed to save the
    errno from a failed dfs_read() call in the event's ev_error field so
    that the error will be detected, and so a user cannot accidentally read
    trash data after a failed dfs_read() call.
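
    The reuse-and-error-capture pattern, with simplified stand-ins for
    daos_event_fini()/daos_event_init() and a failing dfs_read() (the real
    calls take an event queue handle and scatter-gather lists):

```cpp
#include <cassert>
#include <cerrno>

// Simplified stand-in for daos_event_t.
struct FakeEvent {
  bool initialized = false;
  int ev_error = 0;
};
static void fake_event_fini(FakeEvent* ev) { ev->initialized = false; }
static void fake_event_init(FakeEvent* ev) {
  ev->initialized = true;
  ev->ev_error = 0;
}
// 'fail' simulates dfs_read() failing at submission time.
static int fake_dfs_read(bool fail) { return fail ? EIO : 0; }

struct ReadBufferSketch {
  FakeEvent event;

  void ReadAsync(bool fail) {
    // Both fini and init are required before an event can be reused.
    fake_event_fini(&event);
    fake_event_init(&event);
    int rc = fake_dfs_read(fail);
    if (rc != 0) {
      // Park the errno in ev_error: the copy-out path checks it, so a
      // failed read can never hand back stale buffer contents.
      event.ev_error = rc;
    }
  }
};
```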

ReadBuffer::ReadSync()
    This function is no longer used, see below.

ReadBuffer::CopyData()
    The WaitEvent() call ensures that the thread blocks until any in-flight
    read request is done.  The event->ev_error field is used to detect I/O
    failure either at the time the dfs_read() is issued or in the reply, so
    the valid flag is no longer needed.

ReadBuffer::CopyFromCache()
    The TF_RandomAccessFile read() function allows for int64_t-sized reads, so
    change the return value here to int64_t.  If an I/O error occurred, then
    return -1 so that the caller function Read() can easily tell when there
    has been an I/O error.  Provide a detailed error message so that the user
    can tell what caused the error.
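
    A sketch of the new contract.  The names, parameters, and message format
    below are illustrative, not the plugin's exact ones:

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <vector>

// Reworked CopyFromCache() contract: int64_t return so that -1 can
// signal an I/O error; otherwise the number of bytes copied.
struct BufferSketch {
  uint64_t buffer_offset = 0;
  std::vector<char> data;
  int ev_error = 0;  // nonzero if the read filling this buffer failed

  int64_t CopyFromCache(char* ret, uint64_t curr_offset, size_t n,
                        std::string* err) {
    if (ev_error != 0) {
      // Detailed message so the user can tell what caused the error.
      *err = "dfs_read at offset " + std::to_string(buffer_offset) +
             " failed with error " + std::to_string(ev_error);
      return -1;  // caller Read() checks for < 0 and bails out
    }
    size_t start = static_cast<size_t>(curr_offset - buffer_offset);
    size_t count = std::min(n, data.size() - start);
    std::memcpy(ret, data.data() + start, count);
    return static_cast<int64_t>(count);
  }
};
```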

File dfs_filesystem.cc:

In DFSRandomAccessFile constructor:
    Added an assert() on the event queue creation.

In Cleanup():
    Replaced FinalizeEvent() code with a dfs_file->buffers.clear() call.
    Add asserts on dfs function calls.

In df_dfs_filesystem::Read():
    The loop "for (auto& read_buf : dfs_file->buffers)" was missing a break
    statement, so CacheHit was called 256 times for each curr_offset value.
    A break was added.

    Support was added for detecting a read error and returning -1.

    Since Read() is now a while loop, there is no reason to use ReadSync()
    specially for the first buffer.  The code was changed to use ReadAsync()
    for all readahead; CopyFromCache() will block until the first buffer's
    I/O is complete.  ReadSync() is now unused and has been removed.

    I could not determine a reason for the WaitEvent loop:
        if (curr_offset >= dfs_file->file_size)
    because I/O requests will never be started beyond EOF.  The loop
    was removed.

In DFSWritableFile:
   The Append() function had to make a dfs_get_size() call for each append
   to a file, adding a second round trip to the server for each append.  This
   is very expensive.  Member functions were added to cache the file size
   and update it locally as Append() operations are done.  Since the
   TensorFlow API only allows one writer, local caching is permissible.  Should
   there be an I/O error, the actual size of the file becomes unknown; the
   new member functions take that into account and call dfs_get_size() in
   those situations to reestablish the correct size of the file.
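
   The diff below shows the real get_file_size()/set_file_size()/
   unset_file_size() members; the caching behavior can be exercised in
   isolation with a stubbed dfs_get_size() (the stub and its call counter
   are test scaffolding, not plugin code):

```cpp
#include <cassert>
#include <cstdint>

// Test scaffolding: a stand-in for dfs_get_size() that counts round
// trips to the "server" and always reports an empty file.
static int g_size_fetches = 0;
static int stub_dfs_get_size(uint64_t* size) {
  ++g_size_fetches;
  *size = 0;
  return 0;
}

// Mirrors the size-caching scheme added to DFSWritableFile: fetch the
// size once, maintain it locally across appends, and re-fetch only
// after an I/O error makes the real size unknown.
struct WritableFileSketch {
  uint64_t file_size = 0;
  bool size_known = false;

  int get_file_size(uint64_t& size) {
    if (!size_known) {
      int rc = stub_dfs_get_size(&file_size);
      if (rc != 0) return rc;
      size_known = true;
    }
    size = file_size;
    return 0;
  }
  void set_file_size(uint64_t size) { file_size = size; size_known = true; }
  void unset_file_size() { size_known = false; }

  // Append n bytes; 'fail' simulates a failed dfs_write().
  int Append(uint64_t n, bool fail) {
    uint64_t cur;
    if (int rc = get_file_size(cur)) return rc;
    if (fail) {
      unset_file_size();  // actual file size is now unknown
      return -1;
    }
    set_file_size(cur + n);  // keeps Tell() correct with no round trip
    return 0;
  }
};
```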

In Append():
   The dfs_file->daos_file.offset field was not updated after an Append()
   operation completed successfully, so a subsequent Tell() call would return
   the size of the file before the last Append(), not after; the reported size
   was incorrect.  The code was changed to update the cached file size after
   each successful Append() operation.

In RenameFile():
    Similar to the Setup() case, the detailed error statuses set in Connect()
    were being hidden by a generic TF_NOT_FOUND error.  The generic error was
    removed.

Signed-off-by: Kevan Rehm <[email protected]>
krehm authored Jun 5, 2022
1 parent 79b9d18 commit fab7f15
Showing 3 changed files with 124 additions and 153 deletions.
143 changes: 87 additions & 56 deletions tensorflow_io/core/filesystems/dfs/dfs_filesystem.cc
@@ -1,27 +1,34 @@
#include "tensorflow_io/core/filesystems/dfs/dfs_utils.h"

#include <stdio.h>
+#undef NDEBUG
+#include <cassert>

namespace tensorflow {
namespace io {
namespace dfs {


// SECTION 1. Implementation for `TF_RandomAccessFile`
// ----------------------------------------------------------------------------
namespace tf_random_access_file {
typedef struct DFSRandomAccessFile {
std::string dfs_path;
dfs_t* daos_fs;
-  DAOS_FILE daos_file;
+  dfs_obj_t *daos_file;
std::vector<ReadBuffer> buffers;
daos_size_t file_size;
daos_handle_t mEventQueueHandle{};

DFSRandomAccessFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
: dfs_path(std::move(dfs_path)) {
daos_fs = file_system;
-    daos_file.file = obj;
+    daos_file = obj;
dfs_get_size(daos_fs, obj, &file_size);
size_t num_of_buffers;
size_t buff_size;
int rc = daos_eq_create(&mEventQueueHandle);
+    assert(rc == 0);

if (char* env_num_of_buffers = std::getenv("TF_IO_DAOS_NUM_OF_BUFFERS")) {
num_of_buffers = atoi(env_num_of_buffers);
@@ -42,12 +49,12 @@ typedef struct DFSRandomAccessFile {

void Cleanup(TF_RandomAccessFile* file) {
auto dfs_file = static_cast<DFSRandomAccessFile*>(file->plugin_file);
-  for (auto& buffer : dfs_file->buffers) {
-    buffer.FinalizeEvent();
-  }
+  dfs_file->buffers.clear();

-  daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
-  dfs_release(dfs_file->daos_file.file);
+  int rc = daos_eq_destroy(dfs_file->mEventQueueHandle, 0);
+  assert(rc == 0);
+  rc = dfs_release(dfs_file->daos_file);
+  assert(rc == 0);
dfs_file->daos_fs = nullptr;
delete dfs_file;
}
@@ -65,14 +72,19 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
int64_t total_bytes = 0;
size_t ret_size = offset + n;
while (curr_offset < ret_size && curr_offset < dfs_file->file_size) {
-    size_t read_bytes = 0;
+    int64_t read_bytes = 0;
for (auto& read_buf : dfs_file->buffers) {
if (read_buf.CacheHit(curr_offset)) {
read_bytes = read_buf.CopyFromCache(ret, ret_offset, curr_offset, n,
dfs_file->file_size, status);
+        break;
}
}

+    if (read_bytes < 0) {
+      return -1;
+    }

if (read_bytes > 0) {
curr_offset += read_bytes;
ret_offset += read_bytes;
@@ -81,30 +93,13 @@ int64_t Read(const TF_RandomAccessFile* file, uint64_t offset, size_t n,
continue;
}

-    size_t async_offset = curr_offset + BUFF_SIZE;
-    for (size_t i = 1; i < dfs_file->buffers.size(); i++) {
+    size_t async_offset = curr_offset;
+    for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
      if (async_offset > dfs_file->file_size) break;
      dfs_file->buffers[i].ReadAsync(dfs_file->daos_fs,
-                                     dfs_file->daos_file.file, async_offset);
+                                     dfs_file->daos_file, async_offset);
async_offset += BUFF_SIZE;
}

-    dfs_file->buffers[0].ReadSync(dfs_file->daos_fs, dfs_file->daos_file.file,
-                                  curr_offset);
-
-    read_bytes = dfs_file->buffers[0].CopyFromCache(
-        ret, ret_offset, curr_offset, n, dfs_file->file_size, status);
-
-    curr_offset += read_bytes;
-    ret_offset += read_bytes;
-    total_bytes += read_bytes;
-    n -= read_bytes;
-
-    if (curr_offset >= dfs_file->file_size) {
-      for (size_t i = 0; i < dfs_file->buffers.size(); i++) {
-        dfs_file->buffers[i].WaitEvent();
-      }
-    }
}

return total_bytes;
@@ -118,17 +113,42 @@ namespace tf_writable_file {
typedef struct DFSWritableFile {
std::string dfs_path;
dfs_t* daos_fs;
-  DAOS_FILE daos_file;
+  dfs_obj_t *daos_file;
+  daos_size_t file_size;
+  bool size_known;

DFSWritableFile(std::string dfs_path, dfs_t* file_system, dfs_obj_t* obj)
: dfs_path(std::move(dfs_path)) {
daos_fs = file_system;
-    daos_file.file = obj;
+    daos_file = obj;
+    size_known = false;
}

+  int get_file_size(daos_size_t &size) {
+    if (!size_known) {
+      int rc = dfs_get_size(daos_fs, daos_file, &file_size);
+      if (rc != 0) {
+        return rc;
+      }
+      size_known = true;
+    }
+    size = file_size;
+    return 0;
+  }
+
+  void set_file_size(daos_size_t size) {
+    file_size = size;
+    size_known = true;
+  }
+
+  void unset_file_size(void) {
+    size_known = false;
+  }
} DFSWritableFile;

void Cleanup(TF_WritableFile* file) {
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
-  dfs_release(dfs_file->daos_file.file);
+  dfs_release(dfs_file->daos_file);
dfs_file->daos_fs = nullptr;
delete dfs_file;
}
@@ -144,32 +164,44 @@ void Append(const TF_WritableFile* file, const char* buffer, size_t n,
wsgl.sg_nr = 1;
wsgl.sg_iovs = &iov;

-  daos_size_t size;
-  dfs_get_size(dfs_file->daos_fs, dfs_file->daos_file.file, &size);
-  dfs_file->daos_file.offset = size;
+  daos_size_t cur_file_size;
+  rc = dfs_file->get_file_size(cur_file_size);
+  if (rc != 0) {
+    TF_SetStatus(status, TF_INTERNAL, "Cannot determine file size");
+    return;
+  }

-  rc = dfs_write(dfs_file->daos_fs, dfs_file->daos_file.file, &wsgl,
-                 dfs_file->daos_file.offset, NULL);
+  rc = dfs_write(dfs_file->daos_fs, dfs_file->daos_file, &wsgl,
+                 cur_file_size, NULL);
if (rc) {
TF_SetStatus(status, TF_RESOURCE_EXHAUSTED, "");
+    dfs_file->unset_file_size();
return;
}

+  dfs_file->set_file_size(cur_file_size + n);
TF_SetStatus(status, TF_OK, "");
}

int64_t Tell(const TF_WritableFile* file, TF_Status* status) {
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);

-  TF_SetStatus(status, TF_OK, "");
+  daos_size_t cur_file_size;
+  int rc = dfs_file->get_file_size(cur_file_size);
+  if (rc != 0) {
+    TF_SetStatus(status, TF_INTERNAL, "Cannot determine file size");
+    return -1;
+  }

-  return dfs_file->daos_file.offset;
+  TF_SetStatus(status, TF_OK, "");
+  return cur_file_size;
}

void Close(const TF_WritableFile* file, TF_Status* status) {
auto dfs_file = static_cast<DFSWritableFile*>(file->plugin_file);
-  dfs_release(dfs_file->daos_file.file);
+  dfs_release(dfs_file->daos_file);
  dfs_file->daos_fs = nullptr;
-  dfs_file->daos_file.file = nullptr;
+  dfs_file->daos_file = nullptr;
TF_SetStatus(status, TF_OK, "");
}

@@ -206,7 +238,7 @@ void NewFile(const TF_Filesystem* filesystem, const char* path, File_Mode mode,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, file_path;
@@ -222,7 +254,7 @@ void NewWritableFile(const TF_Filesystem* filesystem, const char* path,
if (TF_GetCode(status) != TF_OK) return;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
file->plugin_file =
@@ -237,13 +269,13 @@ void NewRandomAccessFile(const TF_Filesystem* filesystem, const char* path,
if (TF_GetCode(status) != TF_OK) return;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
auto random_access_file =
new tf_random_access_file::DFSRandomAccessFile(path, daos->daos_fs, obj);
random_access_file->buffers[0].ReadAsync(
-      daos->daos_fs, random_access_file->daos_file.file, 0);
+      daos->daos_fs, random_access_file->daos_file, 0);
file->plugin_file = random_access_file;
TF_SetStatus(status, TF_OK, "");
}
@@ -255,7 +287,7 @@ void NewAppendableFile(const TF_Filesystem* filesystem, const char* path,
if (TF_GetCode(status) != TF_OK) return;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
file->plugin_file =
@@ -268,7 +300,7 @@ void PathExists(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, file;
@@ -288,7 +320,7 @@ void CreateDir(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, dir_path;
@@ -304,7 +336,7 @@ static void RecursivelyCreateDir(const TF_Filesystem* filesystem,
std::string pool, cont, dir_path;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
rc = daos->Setup(path, pool, cont, dir_path, status);
@@ -333,13 +365,13 @@ void DeleteFileSystemEntry(const TF_Filesystem* filesystem, const char* path,
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();

if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}

rc = daos->Setup(path, pool, cont, dir_path, status);
if (rc) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}

@@ -376,7 +408,7 @@ bool IsDir(const TF_Filesystem* filesystem, const char* path,
std::string pool, cont, file;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return is_dir;
}
rc = daos->Setup(path, pool, cont, file, status);
@@ -411,7 +443,7 @@ int64_t GetFileSize(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return -1;
}
std::string pool, cont, file;
@@ -448,7 +480,7 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
int allow_cont_creation = 1;
@@ -473,7 +505,6 @@ void RenameFile(const TF_Filesystem* filesystem, const char* src,

daos->Connect(pool_src, cont_src, allow_cont_creation, status);
if (TF_GetCode(status) != TF_OK) {
-    TF_SetStatus(status, TF_NOT_FOUND, "");
return;
}

@@ -558,7 +589,7 @@ void Stat(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return;
}
std::string pool, cont, dir_path;
@@ -598,7 +629,7 @@ int GetChildren(const TF_Filesystem* filesystem, const char* path,
int rc;
auto daos = static_cast<DFS*>(filesystem->plugin_filesystem)->Load();
if (!daos) {
-    TF_SetStatus(status, TF_INTERNAL, "Error initializng DAOS API");
+    TF_SetStatus(status, TF_INTERNAL, "Error initializing DAOS API");
return -1;
}
std::string pool, cont, dir_path;