Skip to content

Commit

Permalink
Update and test shard writer.
Browse files Browse the repository at this point in the history
  • Loading branch information
aliddell committed Dec 10, 2024
1 parent dfc3228 commit dbed6e7
Show file tree
Hide file tree
Showing 7 changed files with 224 additions and 45 deletions.
83 changes: 64 additions & 19 deletions src/streaming/shard.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,21 @@
#include "vectorized.file.writer.hh"

#include <algorithm>
#include <filesystem>
#include <limits>

namespace fs = std::filesystem;

#ifdef max
#undef max
#endif

zarr::ShardWriter::ShardWriter(std::shared_ptr<ThreadPool> thread_pool,
zarr::ShardWriter::ShardWriter(std::string_view file_path,
uint32_t chunks_before_flush,
uint32_t chunks_per_shard)
: thread_pool_(thread_pool)
: file_path_(file_path)
, chunks_before_flush_{ chunks_before_flush }
, chunks_per_shard_{ chunks_per_shard }
, chunks_flushed_{ 0 }
, cumulative_size_{ 0 }
, file_offset_{ 0 }
Expand All @@ -37,33 +41,74 @@ zarr::ShardWriter::add_chunk(ChunkBufferPtr buffer, uint32_t index_in_shard)

chunks_.push_back(buffer);
if (chunks_.size() == chunks_before_flush_) {
auto job = [this](std::string& err) -> bool {
std::vector<std::span<std::byte>> buffers;
buffers.reserve(chunks_.size() + 1);
for (const auto& chunk : chunks_) {
buffers.push_back(std::span(*chunk));
}
buffers.push_back(std::span(index_table_));
// VectorizedFileWriter writer()


chunks_.clear();
cv_.notify_all();
return true;
};

EXPECT(thread_pool_->push_job(job),
"Failed to push job to thread pool.");
flush_();
}
}

bool
zarr::ShardWriter::flush_()
{
std::vector<std::span<std::byte>> buffers;
buffers.reserve(chunks_.size() + 1);
for (const auto& chunk : chunks_) {
buffers.emplace_back(*chunk);
}
buffers.emplace_back(index_table_);

try {
VectorizedFileWriter writer(file_path_);
writer.write_vectors(buffers, file_offset_);
} catch (const std::exception& exc) {
LOG_ERROR("Failed to write chunk: ", std::string(exc.what()));
return false;
}

chunks_flushed_ += chunks_.size();
chunks_.clear();
chunks_.reserve(chunks_before_flush_);
file_offset_ = cumulative_size_;

cv_.notify_all();

return true;
}

void
zarr::ShardWriter::set_offset_extent_(uint32_t shard_internal_index,
uint64_t offset,
uint64_t size)
{
EXPECT(shard_internal_index < chunks_per_shard_,
"Shard internal index ",
shard_internal_index,
" out of bounds");

auto* index_table_u64 = reinterpret_cast<uint64_t*>(index_table_.data());
const auto index = 2 * shard_internal_index;
index_table_u64[index] = offset;
index_table_u64[index + 1] = size;
}

bool
zarr::finalize_shard_writer(std::unique_ptr<ShardWriter>&& writer)
{
if (writer == nullptr) {
LOG_INFO("Writer is null. Nothing to finalize.");
return true;
}

if (!writer->flush_()) {
return false;
}

// resize file if necessary
const auto file_size = fs::file_size(writer->file_path_);
const auto expected_size = writer->cumulative_size_ +
writer->chunks_per_shard_ * 2 * sizeof(uint64_t);
if (file_size > expected_size) {
fs::resize_file(writer->file_path_, expected_size);
}

writer.reset();
return true;
}
28 changes: 13 additions & 15 deletions src/streaming/shard.writer.hh
Original file line number Diff line number Diff line change
Expand Up @@ -3,49 +3,47 @@
#include "thread.pool.hh"

#include <condition_variable>
#include <latch>
#include <mutex>
#include <vector>

#ifdef min
#undef min
#endif
#include <atomic>

namespace zarr {
struct ChunkIndex
{
uint32_t buffer_idx;
uint64_t offset;
uint64_t size;
};

class ShardWriter
{
public:
using ChunkBufferPtr = std::vector<std::byte>*;

ShardWriter(std::shared_ptr<ThreadPool> thread_pool,
ShardWriter(std::string_view file_path,
uint32_t chunks_before_flush,
uint32_t chunks_per_shard);
~ShardWriter() = default;

void add_chunk(ChunkBufferPtr buffer, uint32_t chunk_buffer_index);
void add_chunk(ChunkBufferPtr buffer, uint32_t index_in_shard);

private:
uint32_t chunks_before_flush_;
uint32_t chunks_per_shard_;
uint32_t chunks_flushed_;
std::string file_path_;

std::vector<std::byte> index_table_;

std::shared_ptr<ThreadPool> thread_pool_;

std::mutex mutex_;
std::condition_variable cv_;

std::vector<ChunkBufferPtr> chunks_;
uint64_t cumulative_size_;
uint64_t file_offset_;

void set_offset_extent_(uint32_t shard_internal_index,
uint64_t offset,
uint64_t size);
[[nodiscard]] bool flush_();

friend bool finalize_shard_writer(std::unique_ptr<ShardWriter>&& writer);
};

bool
finalize_shard_writer(std::unique_ptr<ShardWriter>&& writer);
} // namespace zarr
15 changes: 8 additions & 7 deletions src/streaming/vectorized.file.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,11 @@ get_last_error_as_string()
}

size_t
get_sector_size(const std::string& path)
get_sector_size(std::string_view path)
{
// Get volume root path
char volume_path[MAX_PATH];
if (!GetVolumePathNameA(path.c_str(), volume_path, MAX_PATH)) {
if (!GetVolumePathNameA(path.data(), volume_path, MAX_PATH)) {
return 0;
}

Expand Down Expand Up @@ -65,7 +65,7 @@ get_last_error_as_string()
#endif
} // namespace

zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path)
zarr::VectorizedFileWriter::VectorizedFileWriter(std::string_view path)
{
#ifdef _WIN32
SYSTEM_INFO si;
Expand All @@ -77,7 +77,7 @@ zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path)
throw std::runtime_error("Failed to get sector size");
}

handle_ = CreateFileA(path.c_str(),
handle_ = CreateFileA(path.data(),
GENERIC_WRITE,
0, // No sharing
nullptr,
Expand All @@ -87,13 +87,14 @@ zarr::VectorizedFileWriter::VectorizedFileWriter(const std::string& path)
nullptr);
if (handle_ == INVALID_HANDLE_VALUE) {
auto err = get_last_error_as_string();
throw std::runtime_error("Failed to open file '" + path + "': " + err);
throw std::runtime_error("Failed to open file '" + std::string(path) +
"': " + err);
}
#else
page_size_ = sysconf(_SC_PAGESIZE);
fd_ = open(path.c_str(), O_WRONLY | O_CREAT, 0644);
fd_ = open(path.data(), O_WRONLY | O_CREAT, 0644);
if (fd_ < 0) {
throw std::runtime_error("Failed to open file: " + path);
throw std::runtime_error("Failed to open file: " + std::string(path));
}
#endif
}
Expand Down
9 changes: 6 additions & 3 deletions src/streaming/vectorized.file.writer.hh
Original file line number Diff line number Diff line change
@@ -1,12 +1,15 @@
#pragma once

#include <cstdint>
#include <mutex>
#include <span>
#include <string_view>
#include <vector>
#include <mutex>
#include <cstdint>

#ifdef _WIN32
#include <windows.h>
#undef min
#undef max
#else
#include <sys/uio.h>
#include <unistd.h>
Expand All @@ -17,7 +20,7 @@ namespace zarr {
class VectorizedFileWriter
{
public:
explicit VectorizedFileWriter(const std::string& path);
explicit VectorizedFileWriter(std::string_view path);
~VectorizedFileWriter();

bool write_vectors(const std::vector<std::span<std::byte>>& buffers,
Expand Down
2 changes: 1 addition & 1 deletion src/streaming/zarrv3.array.writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ zarr::ZarrV3ArrayWriter::ZarrV3ArrayWriter(
shards_ready_.resize(n_shards);
for (auto& shard : shards_ready_) {
shard.reset(
new ShardWriter(thread_pool_, chunks_before_flush, chunks_per_shard));
new ShardWriter("foo", chunks_before_flush, chunks_per_shard));
}
}

Expand Down
1 change: 1 addition & 0 deletions tests/unit-tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ set(tests
zarrv3-writer-write-ragged-append-dim
zarrv3-writer-write-ragged-internal-dim
vectorized-file-write
shard-writer-add-chunk-data-to-flush
)

foreach (name ${tests})
Expand Down
Loading

0 comments on commit dbed6e7

Please sign in to comment.