Skip to content

Commit

Permalink
Refactor file storage, part11 (#1972)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

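Continues the file-storage refactoring. This part removes the remaining
imports of `file_system`, `file_system_type` and `local_file_system`,
replaces `LocalFileSystem` instances with static `LocalStore` calls
(`MmapFile`, `MunmapFile`, `Merge`, `DeleteFile`, `Open`), and renames
`StreamIO` to the read-only `StreamReader`, whose `Init` now returns a
`Status` instead of raising an error itself.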

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Oct 6, 2024
1 parent 82bd47d commit 19c5aa6
Show file tree
Hide file tree
Showing 55 changed files with 439 additions and 467 deletions.
2 changes: 0 additions & 2 deletions benchmark/local_infinity/knn/hnsw_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ import hnsw_alg;
import vec_store_type;
import compilation_config;
import virtual_store;
import file_system;
import file_system_type;
import status;
import hnsw_common;
import infinity_exception;
Expand Down
1 change: 0 additions & 1 deletion benchmark/local_infinity/knn/hnsw_benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#include "CLI11.hpp"

import stl;
import file_system;
import virtual_store;
import infinity_exception;
import abstract_file_handle;
Expand Down
2 changes: 0 additions & 2 deletions benchmark/local_infinity/sparse/bmp_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
#include <stdexcept>

import stl;
import file_system;
import virtual_store;
import infinity_exception;
import file_system_type;
import compilation_config;
import third_party;
import profiler;
Expand Down
2 changes: 0 additions & 2 deletions benchmark/local_infinity/sparse/sparse_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,8 @@
#include <stdexcept>

import stl;
import file_system;
import virtual_store;
import infinity_exception;
import file_system_type;
import compilation_config;
import third_party;
import profiler;
Expand Down
1 change: 0 additions & 1 deletion benchmark/local_infinity/sparse/sparse_benchmark_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
import stl;
import virtual_store;
import infinity_exception;
import file_system_type;
import third_party;
import infinity_exception;
import sparse_util;
Expand Down
1 change: 0 additions & 1 deletion src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import query_context;
import operator_state;

import profiler;
import local_file_system;
import file_writer;
import table_def;
import data_table;
Expand Down
9 changes: 3 additions & 6 deletions src/executor/operator/physical_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,10 @@ import value;
import catalog;
import catalog_delta_entry;
import build_fast_rough_filter_task;
import stream_io;
import stream_reader;
import parser_assert;
import virtual_store;
import abstract_file_handle;
import file_system_type;

namespace infinity {

Expand Down Expand Up @@ -577,9 +576,7 @@ void PhysicalImport::ImportCSV(QueryContext *query_context, ImportOperatorState
}

void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorState *import_op_state) {
StreamIO stream_io;
stream_io.Init(file_path_, FileFlags::READ_FLAG);
DeferFn file_defer([&]() { stream_io.Close(); });
UniquePtr<StreamReader> stream_reader = LocalStore::OpenStreamReader(file_path_);

Txn *txn = query_context->GetTxn();
u64 segment_id = Catalog::GetNextSegmentID(table_entry_);
Expand All @@ -595,7 +592,7 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat
SizeT row_count{0};
while (true) {
String json_str;
if (stream_io.ReadLine(json_str)) {
if (stream_reader->ReadLine(json_str)) {
nlohmann::json line_json = nlohmann::json::parse(json_str);
try {
JSONLRowHandler(line_json, column_vectors);
Expand Down
1 change: 0 additions & 1 deletion src/storage/buffer/file_worker/bmp_index_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import bmp_util;
import bmp_alg;
import abstract_bmp;
import virtual_store;
import file_system_type;
import persistence_manager;
import abstract_file_handle;

Expand Down
1 change: 0 additions & 1 deletion src/storage/buffer/file_worker/file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import utility;
import infinity_exception;
import local_file_handle;
import third_party;
import file_system_type;
import defer_op;
import status;
import virtual_store;
Expand Down
1 change: 0 additions & 1 deletion src/storage/buffer/file_worker/hnsw_file_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ import embedding_info;
import create_index_info;
import internal_types;
import abstract_hnsw;
import file_system_type;
import virtual_store;
import persistence_manager;
import abstract_file_handle;
Expand Down
10 changes: 7 additions & 3 deletions src/storage/data_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,8 @@ void DataTable::UnionWith(const SharedPtr<DataTable> &other) {
UnrecoverableError(error_message);
}
if (this->data_blocks_.size() != other->data_blocks_.size()) {
String error_message = fmt::format("Can't union two table with different block count {}:{}.", this->data_blocks_.size(), other->data_blocks_.size());
String error_message =
fmt::format("Can't union two table with different block count {}:{}.", this->data_blocks_.size(), other->data_blocks_.size());
UnrecoverableError(error_message);
}
SizeT block_count = this->data_blocks_.size();
Expand All @@ -105,7 +106,9 @@ void DataTable::Append(const SharedPtr<DataBlock> &data_block) {
UpdateRowCount(data_block->row_count());
}

SharedPtr<DataTable> DataTable::Make(SharedPtr<TableDef> table_def_ptr, TableType type) { return MakeShared<DataTable>(std::move(table_def_ptr), type); }
SharedPtr<DataTable> DataTable::Make(SharedPtr<TableDef> table_def_ptr, TableType type) {
return MakeShared<DataTable>(std::move(table_def_ptr), type);
}

SharedPtr<DataTable> DataTable::MakeResultTable(const Vector<SharedPtr<ColumnDef>> &column_defs) {
SharedPtr<TableDef> result_table_def_ptr = TableDef::Make(nullptr, nullptr, column_defs);
Expand All @@ -119,7 +122,8 @@ SharedPtr<DataTable> DataTable::MakeEmptyResultTable() {

SharedPtr<DataTable> DataTable::MakeSummaryResultTable(u64 count, u64 sum) {
Vector<SharedPtr<ColumnDef>> column_defs;
column_defs.emplace_back(MakeShared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kBigInt, nullptr), "count", std::set<ConstraintType>()));
column_defs.emplace_back(
MakeShared<ColumnDef>(0, std::make_shared<DataType>(LogicalType::kBigInt, nullptr), "count", std::set<ConstraintType>()));
column_defs.emplace_back(MakeShared<ColumnDef>(1, std::make_shared<DataType>(LogicalType::kBigInt, nullptr), "sum", std::set<ConstraintType>()));
SharedPtr<TableDef> result_table_def_ptr = MakeShared<TableDef>(nullptr, nullptr, column_defs);
SharedPtr<DataTable> result_table = Make(result_table_def_ptr, TableType::kResult);
Expand Down
5 changes: 3 additions & 2 deletions src/storage/invertedindex/disk_segment_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import logger;
import persistence_manager;
import infinity_context;
import persist_result_handler;
import virtual_store;

namespace infinity {

Expand Down Expand Up @@ -64,7 +65,7 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St
// Empty posting
return;
}
int rc = fs_.MmapFile(posting_file, data_ptr_, data_len_);
i32 rc = LocalStore::MmapFile(posting_file, data_ptr_, data_len_);
assert(rc == 0);
if (rc != 0) {
Status status = Status::MmapFileError(posting_file);
Expand All @@ -91,7 +92,7 @@ DiskIndexSegmentReader::~DiskIndexSegmentReader() {
if (nullptr != pm) {
posting_file = posting_file_obj_;
}
int rc = fs_.MunmapFile(posting_file);
i32 rc = LocalStore::MunmapFile(posting_file);
assert(rc == 0);
if (rc != 0) {
Status status = Status::MunmapFileError(posting_file);
Expand Down
2 changes: 0 additions & 2 deletions src/storage/invertedindex/disk_segment_reader.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import index_segment_reader;
import dict_reader;
import file_reader;
import posting_list_format;
import local_file_system;
import internal_types;
import term_meta;

Expand All @@ -44,7 +43,6 @@ private:
String dict_file_{};
u8 *data_ptr_{};
SizeT data_len_{};
LocalFileSystem fs_{};
};

} // namespace infinity
23 changes: 8 additions & 15 deletions src/storage/invertedindex/memory_indexer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,13 @@ import invert_task;
import third_party;
import ring;
import external_sort_merger;
import local_file_system;
import file_writer;
import term_meta;
import fst;
import posting_list_format;
import dict_reader;
import file_reader;
import logger;
import file_system;
import file_system_type;
import vector_with_lock;
import infinity_exception;
import mmap;
Expand Down Expand Up @@ -266,7 +263,6 @@ void MemoryIndexer::Dump(bool offline, bool spill) {
CommitSync(100);
}

LocalFileSystem fs;
String posting_file = Path(index_dir_) / (base_name_ + POSTING_SUFFIX + (spill ? SPILL_SUFFIX : ""));
String dict_file = Path(index_dir_) / (base_name_ + DICT_SUFFIX + (spill ? SPILL_SUFFIX : ""));
String column_length_file = Path(index_dir_) / (base_name_ + LENGTH_SUFFIX + (spill ? SPILL_SUFFIX : ""));
Expand Down Expand Up @@ -309,17 +305,16 @@ void MemoryIndexer::Dump(bool offline, bool spill) {
posting_file_writer->Sync();
dict_file_writer->Sync();
fst_builder.Finish();
fs.AppendFile(tmp_dict_file, tmp_fst_file);
fs.DeleteFile(tmp_fst_file);
LocalStore::Merge(tmp_dict_file, tmp_fst_file);
LocalStore::DeleteFile(tmp_fst_file);
}
auto [file_handle, status] = fs.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock);
auto [file_handle, status] = LocalStore::Open(tmp_column_length_file, FileAccessMode::kWrite);
if (!status.ok()) {
UnrecoverableError(status.message());
}

Vector<u32> &column_length_array = column_lengths_.UnsafeVec();
fs.Write(*file_handle, &column_length_array[0], sizeof(column_length_array[0]) * column_length_array.size());
fs.Close(*file_handle);
file_handle->Append(&column_length_array[0], sizeof(column_length_array[0]) * column_length_array.size());
if (use_object_cache) {
PersistResultHandler handler(pm);
PersistWriteResult result1 = pm->Persist(posting_file, tmp_posting_file, false);
Expand Down Expand Up @@ -394,7 +389,6 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtr<SortMergerTermTuple<TermTuple
auto &term_tuple_list_queue = merger->TermTupleListQueue();
Path path = Path(index_dir_) / base_name_;
String index_prefix = path.string();
LocalFileSystem fs;

PersistenceManager *pm = InfinityContext::instance().persistence_manager();
bool use_object_cache = pm != nullptr;
Expand Down Expand Up @@ -486,17 +480,16 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtr<SortMergerTermTuple<TermTuple
posting_file_writer->Sync();
dict_file_writer->Sync();
fst_builder.Finish();
fs.AppendFile(tmp_dict_file, tmp_fst_file);
fs.DeleteFile(tmp_fst_file);
LocalStore::Merge(tmp_dict_file, tmp_fst_file);
LocalStore::DeleteFile(tmp_fst_file);

auto [file_handler, status] = fs.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock);
auto [file_handle, status] = LocalStore::Open(tmp_column_length_file, FileAccessMode::kWrite);
if (!status.ok()) {
UnrecoverableError(status.message());
}

Vector<u32> &unsafe_column_lengths = column_lengths_.UnsafeVec();
fs.Write(*file_handler, &unsafe_column_lengths[0], sizeof(unsafe_column_lengths[0]) * unsafe_column_lengths.size());
fs.Close(*file_handler);
file_handle->Append(&unsafe_column_lengths[0], sizeof(unsafe_column_lengths[0]) * unsafe_column_lengths.size());
if (use_object_cache) {
PersistResultHandler handler(pm);
PersistWriteResult result1 = pm->Persist(posting_file, tmp_posting_file, false);
Expand Down
1 change: 0 additions & 1 deletion src/storage/invertedindex/segment_posting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import index_defines;
import internal_types;
import file_reader;

import file_system;
import third_party;

module segment_posting;
Expand Down
2 changes: 0 additions & 2 deletions src/storage/invertedindex/segment_posting.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ import index_defines;
import internal_types;
import file_reader;

import file_system;

export module segment_posting;

namespace infinity {
Expand Down
2 changes: 0 additions & 2 deletions src/storage/io/file_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ module;
module file_reader;

import stl;
import file_system;
import file_system_type;
import status;
import infinity_exception;
import third_party;
Expand Down
31 changes: 10 additions & 21 deletions src/storage/io/stream_io.cpp → src/storage/io/stream_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,48 +16,37 @@ module;

#include <fstream>

module stream_io;
module stream_reader;

import stl;
import logger;
import status;
import file_system_type;
import infinity_exception;
import third_party;

namespace infinity {

StreamIO::~StreamIO() = default;

void StreamIO::Init(const String& file_name, u8 flags) {
bool reader_ = flags & FileFlags::READ_FLAG;
bool writer_ = flags & FileFlags::WRITE_FLAG;
if (reader_ && writer_) {
file_.open(file_name, std::ios::in | std::ios::out);
} else if (reader_) {
file_.open(file_name, std::ios::in);
} else if (writer_) {
file_.open(file_name, std::ios::out);
} else {
Status status = Status::InvalidCommand("Not reachable");
RecoverableError(status);
}
StreamReader::~StreamReader() {
Close();
}

Status StreamReader::Init(const String& file_name) {
file_.open(file_name);
if (!file_.is_open()) {
Status status = Status::IOError(fmt::format("{} can't open", file_name));
RecoverableError(status);
return Status::IOError(fmt::format("{} can't open", file_name));
}
return Status::OK();
}

bool StreamIO::ReadLine(String& line) {
bool StreamReader::ReadLine(String& line) {
if(getline(file_, line)) {
return true;
} else {
return false;
}
}

void StreamIO::Close() {
void StreamReader::Close() {
file_.close();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved.
// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -16,27 +16,25 @@ module;

#include <fstream>

export module stream_io;
export module stream_reader;

import stl;
import status;

namespace infinity {

export class StreamIO {
export class StreamReader {

public:
StreamIO() = default;
~StreamIO();
StreamReader() = default;
~StreamReader();

void Init(const String& file_name, u8 flags);
Status Init(const String& file_name);
bool ReadLine(String& line);
void Close();

private:
std::fstream file_;
bool reader_{false};
bool writer_{false};
std::ifstream file_;
};

} // namespace infinity
Loading

0 comments on commit 19c5aa6

Please sign in to comment.