diff --git a/benchmark/local_infinity/knn/hnsw_benchmark.cpp b/benchmark/local_infinity/knn/hnsw_benchmark.cpp index 1fc2ba3aba..14c099ef72 100644 --- a/benchmark/local_infinity/knn/hnsw_benchmark.cpp +++ b/benchmark/local_infinity/knn/hnsw_benchmark.cpp @@ -21,8 +21,6 @@ import hnsw_alg; import vec_store_type; import compilation_config; import virtual_store; -import file_system; -import file_system_type; import status; import hnsw_common; import infinity_exception; diff --git a/benchmark/local_infinity/knn/hnsw_benchmark_util.h b/benchmark/local_infinity/knn/hnsw_benchmark_util.h index 1a3d83e6dc..b6e7259f37 100644 --- a/benchmark/local_infinity/knn/hnsw_benchmark_util.h +++ b/benchmark/local_infinity/knn/hnsw_benchmark_util.h @@ -17,7 +17,6 @@ #include "CLI11.hpp" import stl; -import file_system; import virtual_store; import infinity_exception; import abstract_file_handle; diff --git a/benchmark/local_infinity/sparse/bmp_benchmark.cpp b/benchmark/local_infinity/sparse/bmp_benchmark.cpp index 3ac42a1550..aff692fc98 100644 --- a/benchmark/local_infinity/sparse/bmp_benchmark.cpp +++ b/benchmark/local_infinity/sparse/bmp_benchmark.cpp @@ -18,10 +18,8 @@ #include import stl; -import file_system; import virtual_store; import infinity_exception; -import file_system_type; import compilation_config; import third_party; import profiler; diff --git a/benchmark/local_infinity/sparse/sparse_benchmark.cpp b/benchmark/local_infinity/sparse/sparse_benchmark.cpp index d67e194755..d2fb45c373 100644 --- a/benchmark/local_infinity/sparse/sparse_benchmark.cpp +++ b/benchmark/local_infinity/sparse/sparse_benchmark.cpp @@ -18,10 +18,8 @@ #include import stl; -import file_system; import virtual_store; import infinity_exception; -import file_system_type; import compilation_config; import third_party; import profiler; diff --git a/benchmark/local_infinity/sparse/sparse_benchmark_util.h b/benchmark/local_infinity/sparse/sparse_benchmark_util.h index f350df1a1d..63a894049a 100644 --- a/benchmark/local_infinity/sparse/sparse_benchmark_util.h +++ b/benchmark/local_infinity/sparse/sparse_benchmark_util.h @@ -19,7 +19,6 @@ import stl; import virtual_store; import infinity_exception; -import file_system_type; import third_party; import infinity_exception; import sparse_util; diff --git a/src/executor/operator/physical_command.cpp b/src/executor/operator/physical_command.cpp index 83c8ed42fd..88cee92fc9 100644 --- a/src/executor/operator/physical_command.cpp +++ b/src/executor/operator/physical_command.cpp @@ -30,7 +30,6 @@ import query_context; import operator_state; import profiler; -import local_file_system; import file_writer; import table_def; import data_table; diff --git a/src/executor/operator/physical_import.cpp b/src/executor/operator/physical_import.cpp index 7f3f97f4ef..d265ec710e 100644 --- a/src/executor/operator/physical_import.cpp +++ b/src/executor/operator/physical_import.cpp @@ -61,11 +61,10 @@ import value; import catalog; import catalog_delta_entry; import build_fast_rough_filter_task; -import stream_io; +import stream_reader; import parser_assert; import virtual_store; import abstract_file_handle; -import file_system_type; namespace infinity { @@ -577,9 +576,7 @@ void PhysicalImport::ImportCSV(QueryContext *query_context, ImportOperatorState } void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorState *import_op_state) { - StreamIO stream_io; - stream_io.Init(file_path_, FileFlags::READ_FLAG); - DeferFn file_defer([&]() { stream_io.Close(); }); + UniquePtr stream_reader = LocalStore::OpenStreamReader(file_path_); Txn *txn = query_context->GetTxn(); u64 segment_id = Catalog::GetNextSegmentID(table_entry_); @@ -595,7 +592,7 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat SizeT row_count{0}; while (true) { String json_str; - if (stream_io.ReadLine(json_str)) { + if (stream_reader->ReadLine(json_str)) { nlohmann::json line_json = nlohmann::json::parse(json_str); try { JSONLRowHandler(line_json, column_vectors); diff --git a/src/storage/buffer/file_worker/bmp_index_file_worker.cpp b/src/storage/buffer/file_worker/bmp_index_file_worker.cpp index c622fc79cb..cef07658b8 100644 --- a/src/storage/buffer/file_worker/bmp_index_file_worker.cpp +++ b/src/storage/buffer/file_worker/bmp_index_file_worker.cpp @@ -27,7 +27,6 @@ import bmp_util; import bmp_alg; import abstract_bmp; import virtual_store; -import file_system_type; import persistence_manager; import abstract_file_handle; diff --git a/src/storage/buffer/file_worker/file_worker.cpp b/src/storage/buffer/file_worker/file_worker.cpp index bfaf3ba662..1c1f6c43ff 100644 --- a/src/storage/buffer/file_worker/file_worker.cpp +++ b/src/storage/buffer/file_worker/file_worker.cpp @@ -23,7 +23,6 @@ import utility; import infinity_exception; import local_file_handle; import third_party; -import file_system_type; import defer_op; import status; import virtual_store; diff --git a/src/storage/buffer/file_worker/hnsw_file_worker.cpp b/src/storage/buffer/file_worker/hnsw_file_worker.cpp index e304ab6a64..b0d729ca10 100644 --- a/src/storage/buffer/file_worker/hnsw_file_worker.cpp +++ b/src/storage/buffer/file_worker/hnsw_file_worker.cpp @@ -33,7 +33,6 @@ import embedding_info; import create_index_info; import internal_types; import abstract_hnsw; -import file_system_type; import virtual_store; import persistence_manager; import abstract_file_handle; diff --git a/src/storage/data_table.cpp b/src/storage/data_table.cpp index 2f2c5c7a11..ec7a17daac 100644 --- a/src/storage/data_table.cpp +++ b/src/storage/data_table.cpp @@ -89,7 +89,8 @@ void DataTable::UnionWith(const SharedPtr &other) { UnrecoverableError(error_message); } if (this->data_blocks_.size() != other->data_blocks_.size()) { - String error_message = fmt::format("Can't union two table with different block count {}:{}.", this->data_blocks_.size(), other->data_blocks_.size()); + String error_message = + fmt::format("Can't union two table with different block count {}:{}.", this->data_blocks_.size(), other->data_blocks_.size()); UnrecoverableError(error_message); } SizeT block_count = this->data_blocks_.size(); @@ -105,7 +106,9 @@ void DataTable::Append(const SharedPtr &data_block) { UpdateRowCount(data_block->row_count()); } -SharedPtr DataTable::Make(SharedPtr table_def_ptr, TableType type) { return MakeShared(std::move(table_def_ptr), type); } +SharedPtr DataTable::Make(SharedPtr table_def_ptr, TableType type) { + return MakeShared(std::move(table_def_ptr), type); +} SharedPtr DataTable::MakeResultTable(const Vector> &column_defs) { SharedPtr result_table_def_ptr = TableDef::Make(nullptr, nullptr, column_defs); @@ -119,7 +122,8 @@ SharedPtr DataTable::MakeEmptyResultTable() { SharedPtr DataTable::MakeSummaryResultTable(u64 count, u64 sum) { Vector> column_defs; - column_defs.emplace_back(MakeShared(0, std::make_shared(LogicalType::kBigInt, nullptr), "count", std::set())); + column_defs.emplace_back( + MakeShared(0, std::make_shared(LogicalType::kBigInt, nullptr), "count", std::set())); column_defs.emplace_back(MakeShared(1, std::make_shared(LogicalType::kBigInt, nullptr), "sum", std::set())); SharedPtr result_table_def_ptr = MakeShared(nullptr, nullptr, column_defs); SharedPtr result_table = Make(result_table_def_ptr, TableType::kResult); diff --git a/src/storage/invertedindex/disk_segment_reader.cpp b/src/storage/invertedindex/disk_segment_reader.cpp index ae6de08302..82ff7b8dea 100644 --- a/src/storage/invertedindex/disk_segment_reader.cpp +++ b/src/storage/invertedindex/disk_segment_reader.cpp @@ -37,6 +37,7 @@ import logger; import persistence_manager; import infinity_context; import persist_result_handler; +import virtual_store; namespace infinity { @@ -64,7 +65,7 @@ DiskIndexSegmentReader::DiskIndexSegmentReader(const String &index_dir, const St // Empty posting return; } - int rc = fs_.MmapFile(posting_file, data_ptr_, data_len_); + i32 rc = LocalStore::MmapFile(posting_file, data_ptr_, data_len_); assert(rc == 0); if (rc != 0) { Status status = Status::MmapFileError(posting_file); @@ -91,7 +92,7 @@ DiskIndexSegmentReader::~DiskIndexSegmentReader() { if (nullptr != pm) { posting_file = posting_file_obj_; } - int rc = fs_.MunmapFile(posting_file); + i32 rc = LocalStore::MunmapFile(posting_file); assert(rc == 0); if (rc != 0) { Status status = Status::MunmapFileError(posting_file); diff --git a/src/storage/invertedindex/disk_segment_reader.cppm b/src/storage/invertedindex/disk_segment_reader.cppm index 0663d6f888..75d82b1bd4 100644 --- a/src/storage/invertedindex/disk_segment_reader.cppm +++ b/src/storage/invertedindex/disk_segment_reader.cppm @@ -23,7 +23,6 @@ import index_segment_reader; import dict_reader; import file_reader; import posting_list_format; -import local_file_system; import internal_types; import term_meta; @@ -44,7 +43,6 @@ private: String dict_file_{}; u8 *data_ptr_{}; SizeT data_len_{}; - LocalFileSystem fs_{}; }; } // namespace infinity \ No newline at end of file diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp index 2c5ab63a93..77ef575deb 100644 --- a/src/storage/invertedindex/memory_indexer.cpp +++ b/src/storage/invertedindex/memory_indexer.cpp @@ -44,7 +44,6 @@ import invert_task; import third_party; import ring; import external_sort_merger; -import local_file_system; import file_writer; import term_meta; import fst; @@ -52,8 +51,6 @@ import posting_list_format; import dict_reader; import file_reader; import logger; -import file_system; -import file_system_type; import vector_with_lock; import infinity_exception; import mmap; @@ -266,7 +263,6 @@ void MemoryIndexer::Dump(bool offline, bool spill) { CommitSync(100); } - LocalFileSystem fs; String posting_file = Path(index_dir_) / (base_name_ + POSTING_SUFFIX + (spill ? SPILL_SUFFIX : "")); String dict_file = Path(index_dir_) / (base_name_ + DICT_SUFFIX + (spill ? SPILL_SUFFIX : "")); String column_length_file = Path(index_dir_) / (base_name_ + LENGTH_SUFFIX + (spill ? SPILL_SUFFIX : "")); @@ -309,17 +305,16 @@ void MemoryIndexer::Dump(bool offline, bool spill) { posting_file_writer->Sync(); dict_file_writer->Sync(); fst_builder.Finish(); - fs.AppendFile(tmp_dict_file, tmp_fst_file); - fs.DeleteFile(tmp_fst_file); + LocalStore::Merge(tmp_dict_file, tmp_fst_file); + LocalStore::DeleteFile(tmp_fst_file); } - auto [file_handle, status] = fs.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock); + auto [file_handle, status] = LocalStore::Open(tmp_column_length_file, FileAccessMode::kWrite); if (!status.ok()) { UnrecoverableError(status.message()); } Vector &column_length_array = column_lengths_.UnsafeVec(); - fs.Write(*file_handle, &column_length_array[0], sizeof(column_length_array[0]) * column_length_array.size()); - fs.Close(*file_handle); + file_handle->Append(&column_length_array[0], sizeof(column_length_array[0]) * column_length_array.size()); if (use_object_cache) { PersistResultHandler handler(pm); PersistWriteResult result1 = pm->Persist(posting_file, tmp_posting_file, false); @@ -394,7 +389,6 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtrTermTupleListQueue(); Path path = Path(index_dir_) / base_name_; String index_prefix = path.string(); - LocalFileSystem fs; PersistenceManager *pm = InfinityContext::instance().persistence_manager(); bool use_object_cache = pm != nullptr; @@ -486,17 +480,16 @@ void MemoryIndexer::TupleListToIndexFile(UniquePtrSync(); dict_file_writer->Sync(); fst_builder.Finish(); - fs.AppendFile(tmp_dict_file, tmp_fst_file); - fs.DeleteFile(tmp_fst_file); + LocalStore::Merge(tmp_dict_file, tmp_fst_file); + LocalStore::DeleteFile(tmp_fst_file); - auto [file_handler, status] = fs.OpenFile(tmp_column_length_file, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kNoLock); + auto [file_handle, status] = LocalStore::Open(tmp_column_length_file, FileAccessMode::kWrite); if (!status.ok()) { UnrecoverableError(status.message()); } Vector &unsafe_column_lengths = column_lengths_.UnsafeVec(); - fs.Write(*file_handler, &unsafe_column_lengths[0], sizeof(unsafe_column_lengths[0]) * unsafe_column_lengths.size()); - fs.Close(*file_handler); + file_handle->Append(&unsafe_column_lengths[0], sizeof(unsafe_column_lengths[0]) * unsafe_column_lengths.size()); if (use_object_cache) { PersistResultHandler handler(pm); PersistWriteResult result1 = pm->Persist(posting_file, tmp_posting_file, false); diff --git a/src/storage/invertedindex/segment_posting.cpp b/src/storage/invertedindex/segment_posting.cpp index 8577255353..a7af313bd7 100644 --- a/src/storage/invertedindex/segment_posting.cpp +++ b/src/storage/invertedindex/segment_posting.cpp @@ -24,7 +24,6 @@ import index_defines; import internal_types; import file_reader; -import file_system; import third_party; module segment_posting; diff --git a/src/storage/invertedindex/segment_posting.cppm b/src/storage/invertedindex/segment_posting.cppm index 22b39955e5..f85ec38a88 100644 --- a/src/storage/invertedindex/segment_posting.cppm +++ b/src/storage/invertedindex/segment_posting.cppm @@ -23,8 +23,6 @@ import index_defines; import internal_types; import file_reader; -import file_system; - export module segment_posting; namespace infinity { diff --git a/src/storage/io/file_reader.cpp b/src/storage/io/file_reader.cpp index 503d942f29..f843a955c0 100644 --- a/src/storage/io/file_reader.cpp +++ b/src/storage/io/file_reader.cpp @@ -21,8 +21,6 @@ module; module file_reader; import stl; -import file_system; -import file_system_type; import status; import infinity_exception; import third_party; diff --git a/src/storage/io/stream_io.cpp b/src/storage/io/stream_reader.cpp similarity index 55% rename from src/storage/io/stream_io.cpp rename to src/storage/io/stream_reader.cpp index 1996f90b9e..48fb5690ad 100644 --- a/src/storage/io/stream_io.cpp +++ b/src/storage/io/stream_reader.cpp @@ -16,40 +16,29 @@ module; #include -module stream_io; +module stream_reader; import stl; import logger; import status; -import file_system_type; import infinity_exception; import third_party; namespace infinity { -StreamIO::~StreamIO() = default; - -void StreamIO::Init(const String& file_name, u8 flags) { - bool reader_ = flags & FileFlags::READ_FLAG; - bool writer_ = flags & FileFlags::WRITE_FLAG; - if (reader_ && writer_) { - file_.open(file_name, std::ios::in | std::ios::out); - } else if (reader_) { - file_.open(file_name, std::ios::in); - } else if (writer_) { - file_.open(file_name, std::ios::out); - } else { - Status status = Status::InvalidCommand("Not reachable"); - RecoverableError(status); - } +StreamReader::~StreamReader() { + Close(); +} +Status StreamReader::Init(const String& file_name) { + file_.open(file_name); if (!file_.is_open()) { - Status status = Status::IOError(fmt::format("{} can't open", file_name)); - RecoverableError(status); + return Status::IOError(fmt::format("{} can't open", file_name)); } + return Status::OK(); } -bool StreamIO::ReadLine(String& line) { +bool StreamReader::ReadLine(String& line) { if(getline(file_, line)) { return true; } else { @@ -57,7 +46,7 @@ bool StreamIO::ReadLine(String& line) { } } -void StreamIO::Close() { +void StreamReader::Close() { file_.close(); } diff --git a/src/storage/io/stream_io.cppm b/src/storage/io/stream_reader.cppm similarity index 72% rename from src/storage/io/stream_io.cppm rename to src/storage/io/stream_reader.cppm index 6587e61045..5f5a46112c 100644 --- a/src/storage/io/stream_io.cppm +++ b/src/storage/io/stream_reader.cppm @@ -1,4 +1,4 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// Copyright(C) 2024 InfiniFlow, Inc. All rights reserved. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -16,27 +16,25 @@ module; #include -export module stream_io; +export module stream_reader; import stl; import status; namespace infinity { -export class StreamIO { +export class StreamReader { public: - StreamIO() = default; - ~StreamIO(); + StreamReader() = default; + ~StreamReader(); - void Init(const String& file_name, u8 flags); + Status Init(const String& file_name); bool ReadLine(String& line); void Close(); private: - std::fstream file_; - bool reader_{false}; - bool writer_{false}; + std::ifstream file_; }; } // namespace infinity \ No newline at end of file diff --git a/src/storage/io/virtual_store.cpp b/src/storage/io/virtual_store.cpp index 9fa520df37..3f71c4430f 100644 --- a/src/storage/io/virtual_store.cpp +++ b/src/storage/io/virtual_store.cpp @@ -14,13 +14,14 @@ module; -#include -#include -#include #include #include +#include #include +#include +#include #include +#include module virtual_store; @@ -31,6 +32,7 @@ import logger; import infinity_exception; import default_values; import abstract_file_handle; +import stream_reader; namespace infinity { @@ -98,7 +100,7 @@ Status RemoteStore::UnInit() { return Status::OK(); } -Tuple, Status> LocalStore::Open(const String& path, FileAccessMode access_mode) { +Tuple, Status> LocalStore::Open(const String &path, FileAccessMode access_mode) { i32 fd = -1; switch (access_mode) { case FileAccessMode::kRead: { @@ -117,13 +119,21 @@ Tuple, Status> LocalStore::Open(const String& path, F break; } } - if(fd == -1) { + if (fd == -1) { String error_message = fmt::format("File open failed: {}", strerror(errno)); return {nullptr, Status::IOError(error_message)}; } return {MakeUnique(fd, path, access_mode), Status::OK()}; } +UniquePtr LocalStore::OpenStreamReader(const String& path) { + auto res = MakeUnique(); + Status status = res->Init(path); + if(!status.ok()) { + RecoverableError(status); + } + return res; +} // For local disk filesystem, such as temp file, disk cache and WAL bool LocalStore::Exists(const String &path) { @@ -162,8 +172,8 @@ Status LocalStore::DeleteFile(const String &file_name) { } Status LocalStore::MakeDirectory(const String &path) { - if(LocalStore::Exists(path)) { - if(std::filesystem::is_directory(path)) { + if (LocalStore::Exists(path)) { + if (std::filesystem::is_directory(path)) { return Status::OK(); } else { String error_message = fmt::format("Exists file: {}", path); @@ -302,20 +312,18 @@ Tuple>, Status> LocalStore::ListDirectory(const Strin return {file_array, Status::OK()}; } -SizeT LocalStore::GetFileSize(const String& path) { - if(!std::filesystem::path(path).is_absolute()) { +SizeT LocalStore::GetFileSize(const String &path) { + if (!std::filesystem::path(path).is_absolute()) { String error_message = fmt::format("{} isn't absolute path.", path); UnrecoverableError(error_message); } return std::filesystem::file_size(path); } -String LocalStore::GetParentPath(const String& path) { - return Path(path).parent_path().string(); -} +String LocalStore::GetParentPath(const String &path) { return Path(path).parent_path().string(); } SizeT LocalStore::GetDirectorySize(const String &path) { - if(!std::filesystem::path(path).is_absolute()) { + if (!std::filesystem::path(path).is_absolute()) { String error_message = fmt::format("{} isn't absolute path.", path); UnrecoverableError(error_message); } @@ -335,4 +343,65 @@ String LocalStore::ConcatenatePath(const String &dir_path, const String &file_pa return full_path.string(); } +std::mutex LocalStore::mtx_; +HashMap LocalStore::mapped_files_; + +i32 LocalStore::MmapFile(const String &file_path, u8 *&data_ptr, SizeT &data_len) { + if (!std::filesystem::path(file_path).is_absolute()) { + String error_message = fmt::format("{} isn't absolute path.", file_path); + UnrecoverableError(error_message); + } + data_ptr = nullptr; + data_len = 0; + std::lock_guard lock(mtx_); + auto it = mapped_files_.find(file_path); + if (it != mapped_files_.end()) { + auto &mmap_info = it->second; + data_ptr = mmap_info.data_ptr_; + data_len = mmap_info.data_len_; + mmap_info.rc_++; + return 0; + } + long len_f = std::filesystem::file_size(file_path); + if (len_f == 0) + return -1; + i32 f = open(file_path.c_str(), O_RDONLY); + void *tmpd = mmap(NULL, len_f, PROT_READ, MAP_SHARED, f, 0); + if (tmpd == MAP_FAILED) + return -1; + close(f); + i32 rc = madvise(tmpd, + len_f, + MADV_NORMAL +#if defined(linux) || defined(__linux) || defined(__linux__) + | MADV_DONTDUMP +#endif + ); + if (rc < 0) + return -1; + data_ptr = (u8 *)tmpd; + data_len = len_f; + mapped_files_.emplace(file_path, MmapInfo{data_ptr, data_len, 1}); + return 0; +} + +i32 LocalStore::MunmapFile(const String &file_path) { + if (!std::filesystem::path(file_path).is_absolute()) { + String error_message = fmt::format("{} isn't absolute path.", file_path); + UnrecoverableError(error_message); + } + std::lock_guard lock(mtx_); + auto it = mapped_files_.find(file_path); + if (it == mapped_files_.end()) { + return -1; + } + auto &mmap_info = it->second; + mmap_info.rc_--; + if (mmap_info.rc_ == 0) { + munmap(mmap_info.data_ptr_, mmap_info.data_len_); + mapped_files_.erase(it); + } + return 0; +} + } // namespace infinity diff --git a/src/storage/io/virtual_store.cppm b/src/storage/io/virtual_store.cppm index d2501940e0..96bc0e5b7b 100644 --- a/src/storage/io/virtual_store.cppm +++ b/src/storage/io/virtual_store.cppm @@ -22,9 +22,16 @@ import third_party; import local_file_handle; import virtual_storage_type; import abstract_file_handle; +import stream_reader; namespace infinity { +export struct MmapInfo { + u8 *data_ptr_{}; + SizeT data_len_{}; + SizeT rc_{}; +}; + // Only one instance; export class RemoteStore { public: @@ -43,6 +50,7 @@ private: export class LocalStore { public: static Tuple, Status> Open(const String& path, FileAccessMode access_mode); + static UniquePtr OpenStreamReader(const String& path); static bool IsRegularFile(const String& path); static bool Exists(const String& path); static Status DeleteFile(const String& path); @@ -57,6 +65,12 @@ public: static String GetParentPath(const String& path); static SizeT GetDirectorySize(const String &path); static String ConcatenatePath(const String &dir_path, const String &file_path); + static i32 MmapFile(const String &file_path, u8 *&data_ptr, SizeT &data_len); + static i32 MunmapFile(const String &file_path); + +private: + static std::mutex mtx_; + static HashMap mapped_files_; }; } diff --git a/src/storage/knn_index/emvb/emvb_index.cpp b/src/storage/knn_index/emvb/emvb_index.cpp index d5c1ccf438..5e9adb0021 100644 --- a/src/storage/knn_index/emvb/emvb_index.cpp +++ b/src/storage/knn_index/emvb/emvb_index.cpp @@ -31,7 +31,6 @@ import status; import logger; import third_party; import infinity_exception; -import file_system; import internal_types; import segment_entry; import block_entry; diff --git a/src/storage/knn_index/knn_diskann/diskann_index_data.cppm b/src/storage/knn_index/knn_diskann/diskann_index_data.cppm index 35cd46c32c..2a18e5ddd5 100644 --- a/src/storage/knn_index/knn_diskann/diskann_index_data.cppm +++ b/src/storage/knn_index/knn_diskann/diskann_index_data.cppm @@ -22,8 +22,6 @@ export module diskann_index_data; import stl; import index_base; -import file_system; -import file_system_type; import search_top_1; import kmeans_partition; import infinity_exception; diff --git a/src/storage/knn_index/knn_diskann/diskann_partition_and_pq.cppm b/src/storage/knn_index/knn_diskann/diskann_partition_and_pq.cppm index 430ae66e64..e75f799941 100644 --- a/src/storage/knn_index/knn_diskann/diskann_partition_and_pq.cppm +++ b/src/storage/knn_index/knn_diskann/diskann_partition_and_pq.cppm @@ -21,8 +21,6 @@ export module diskann_partition_and_pq; import stl; import third_party; -import file_system; -import file_system_type; import infinity_exception; import index_base; import vector_distance; diff --git a/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm b/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm index 79a1ebfd96..3a0e5bac3b 100644 --- a/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm +++ b/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm @@ -26,7 +26,6 @@ module; export module diskann_mem_data_store; import stl; -import file_system; import diskann_dist_func; import diskann_utils; import infinity_exception; diff --git a/src/storage/knn_index/knn_diskann/inner/diskann_utils.cppm b/src/storage/knn_index/knn_diskann/inner/diskann_utils.cppm index 96cdb67ff7..d03dbcb1b8 100644 --- a/src/storage/knn_index/knn_diskann/inner/diskann_utils.cppm +++ b/src/storage/knn_index/knn_diskann/inner/diskann_utils.cppm @@ -22,17 +22,12 @@ import stl; import third_party; import default_values; import infinity_exception; -import file_system; import logger; import third_party; export module diskann_utils; namespace infinity { -export void FileHandlerSeek(FileHandler &fh, i64 offset) { - auto &fs = fh.file_system_; - fs.Seek(fh, offset); -} export inline void AllocAligned(void **ptr, SizeT size, SizeT align) { *ptr = nullptr; diff --git a/src/storage/knn_index/knn_diskann/vamana_alg.cppm b/src/storage/knn_index/knn_diskann/vamana_alg.cppm index 7aa5295ea9..d011f7a635 100644 --- a/src/storage/knn_index/knn_diskann/vamana_alg.cppm +++ b/src/storage/knn_index/knn_diskann/vamana_alg.cppm @@ -25,7 +25,6 @@ import stl; import logger; import index_base; import local_file_handle; -import file_system_type; import infinity_exception; import knn_result_handler; import logical_type; diff --git a/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm b/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm index 037f1d8e7c..e56762d31f 100644 --- a/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm +++ b/src/storage/knn_index/knn_hnsw/abstract_hnsw.cppm @@ -28,7 +28,6 @@ import data_store; import vec_store_type; import dist_func_l2; import dist_func_ip; -import file_system; import hnsw_common; import column_def; import index_hnsw; diff --git a/src/storage/knn_index/knn_hnsw/hnsw_alg.cppm b/src/storage/knn_index/knn_hnsw/hnsw_alg.cppm index 6238170811..642bf82097 100644 --- a/src/storage/knn_index/knn_hnsw/hnsw_alg.cppm +++ b/src/storage/knn_index/knn_hnsw/hnsw_alg.cppm @@ -21,7 +21,6 @@ export module hnsw_alg; import stl; import local_file_handle; -import file_system_type; import infinity_exception; import knn_result_handler; import multivector_result_handler; diff --git a/src/storage/knn_index/knn_hnsw/hnsw_common.cppm b/src/storage/knn_index/knn_hnsw/hnsw_common.cppm index 0d28dcacab..ab626ac2ab 100644 --- a/src/storage/knn_index/knn_hnsw/hnsw_common.cppm +++ b/src/storage/knn_index/knn_hnsw/hnsw_common.cppm @@ -20,7 +20,6 @@ module; export module hnsw_common; import stl; -import file_system; import infinity_exception; import sparse_util; diff --git a/src/storage/knn_index/knn_ivf/ivf_index_data.cpp b/src/storage/knn_index/knn_ivf/ivf_index_data.cpp index d19a63598d..12967a82a2 100644 --- a/src/storage/knn_index/knn_ivf/ivf_index_data.cpp +++ b/src/storage/knn_index/knn_ivf/ivf_index_data.cpp @@ -23,7 +23,6 @@ import index_ivf; import ivf_index_storage; import column_def; import index_base; -import file_system; import embedding_info; import internal_types; import segment_entry; diff --git a/src/storage/knn_index/knn_ivf/ivf_index_storage.cpp b/src/storage/knn_index/knn_ivf/ivf_index_storage.cpp index 35f03e4ea2..377facc28f 100644 --- a/src/storage/knn_index/knn_ivf/ivf_index_storage.cpp +++ b/src/storage/knn_index/knn_ivf/ivf_index_storage.cpp @@ -24,7 +24,6 @@ import status; import logger; import third_party; import index_ivf; -import file_system; import column_vector; import internal_types; import logical_type; diff --git a/src/storage/knn_index/sparse/sparse_test_util.cppm b/src/storage/knn_index/sparse/sparse_test_util.cppm index 8679b9b7b3..fbf13f311b 100644 --- a/src/storage/knn_index/sparse/sparse_test_util.cppm +++ b/src/storage/knn_index/sparse/sparse_test_util.cppm @@ -22,7 +22,6 @@ export module sparse_test_util; import stl; import sparse_vector_distance; -import file_system; import linscan_alg; import sparse_util; import infinity_exception; diff --git a/src/storage/meta/catalog.cpp b/src/storage/meta/catalog.cpp index b87ef8f3eb..afe76d99a6 100644 --- a/src/storage/meta/catalog.cpp +++ b/src/storage/meta/catalog.cpp @@ -36,8 +36,6 @@ import special_function; import buffer_manager; import column_def; import virtual_store; -import file_system_type; -import file_system; import table_def; import table_entry_type; import meta_info; diff --git a/src/storage/meta/entry/block_version.cppm b/src/storage/meta/entry/block_version.cppm index 2454b3ca7e..8fd21cdee4 100644 --- a/src/storage/meta/entry/block_version.cppm +++ b/src/storage/meta/entry/block_version.cppm @@ -17,7 +17,6 @@ module; export module block_version; import stl; -import file_system; import local_file_handle; namespace infinity { diff --git a/src/storage/secondary_index/secondary_index_data.cpp b/src/storage/secondary_index/secondary_index_data.cpp index a76a78f978..c042341a76 100644 --- a/src/storage/secondary_index/secondary_index_data.cpp +++ b/src/storage/secondary_index/secondary_index_data.cpp @@ -24,7 +24,6 @@ import stl; import default_values; import index_base; import local_file_handle; -import file_system_type; import infinity_exception; import third_party; import secondary_index_pgm; diff --git a/src/storage/secondary_index/secondary_index_data.cppm b/src/storage/secondary_index/secondary_index_data.cppm index 3b70d59a55..27f1c10869 100644 --- a/src/storage/secondary_index/secondary_index_data.cppm +++ b/src/storage/secondary_index/secondary_index_data.cppm @@ -19,7 +19,6 @@ export module secondary_index_data; import stl; import default_values; import local_file_handle; -import file_system_type; import infinity_exception; import column_vector; import third_party; diff --git a/src/storage/storage.cpp b/src/storage/storage.cpp index 2bd1996b78..44fa27558a 100644 --- a/src/storage/storage.cpp +++ b/src/storage/storage.cpp @@ -98,7 +98,7 @@ void Storage::SetStorageMode(StorageMode target_mode) { LOG_INFO(fmt::format("Set storage from admin mode to un-init")); break; } - switch(config_ptr_->StorageType()) { + switch (config_ptr_->StorageType()) { case StorageType::kLocal: { // Not init remote store break; @@ -250,7 +250,7 @@ void Storage::SetStorageMode(StorageMode target_mode) { memory_index_tracer_.reset(); - switch(config_ptr_->StorageType()) { + switch (config_ptr_->StorageType()) { case StorageType::kLocal: { // Not init remote store break; @@ -335,7 +335,7 @@ void Storage::SetStorageMode(StorageMode target_mode) { memory_index_tracer_.reset(); - switch(config_ptr_->StorageType()) { + switch (config_ptr_->StorageType()) { case StorageType::kLocal: { // Not init remote store break; diff --git a/src/storage/storage.cppm b/src/storage/storage.cppm index b42127a875..756cc856ac 100644 --- a/src/storage/storage.cppm +++ b/src/storage/storage.cppm @@ -68,6 +68,7 @@ public: Config *config() const { return config_ptr_; } void CreateDefaultDB(); + private: Config *config_ptr_{}; UniquePtr new_catalog_{}; diff --git a/src/unit_test/storage/invertedindex/column_index_merger.cpp b/src/unit_test/storage/invertedindex/column_index_merger.cpp index 6a73a1061e..d08fd55ca6 100644 --- a/src/unit_test/storage/invertedindex/column_index_merger.cpp +++ b/src/unit_test/storage/invertedindex/column_index_merger.cpp @@ -14,7 +14,6 @@ import column_inverter; import index_defines; import column_index_reader; import posting_iterator; -import file_system; import file_writer; import term_meta; import index_defines; diff --git a/src/unit_test/storage/invertedindex/posting_merger.cpp b/src/unit_test/storage/invertedindex/posting_merger.cpp index 8b47ab74e9..ae400b33f3 100644 --- a/src/unit_test/storage/invertedindex/posting_merger.cpp +++ b/src/unit_test/storage/invertedindex/posting_merger.cpp @@ -13,8 +13,6 @@ import column_inverter; import index_defines; import column_index_reader; import posting_iterator; -import file_system; -import file_system_type; import virtual_store; import file_writer; import term_meta; diff --git a/src/unit_test/storage/io/byte_slice_write_read.cpp b/src/unit_test/storage/io/byte_slice_write_read.cpp index f0aa4599da..fbabe04fa0 100644 --- a/src/unit_test/storage/io/byte_slice_write_read.cpp +++ b/src/unit_test/storage/io/byte_slice_write_read.cpp @@ -42,7 +42,7 @@ class ByteSliceReaderWriterTest : public BaseTest { delete[] buffer; } - u8 *GetData(const ByteSliceList* list) { + u8 *GetData(const ByteSliceList *list) { u8 *buffer = new u8[list->GetTotalSize()]; SizeT n = 0; ByteSlice *slice = list->GetHead(); @@ -56,15 +56,15 @@ class ByteSliceReaderWriterTest : public BaseTest { return buffer; } - bool CheckListEq(const ByteSliceList* list_1, const ByteSliceList* list_2) { - if(list_1->GetTotalSize() != list_2->GetTotalSize()) { + bool CheckListEq(const ByteSliceList *list_1, const ByteSliceList *list_2) { + if (list_1->GetTotalSize() != list_2->GetTotalSize()) { return false; } auto buffer_1 = GetData(list_1); auto buffer_2 = GetData(list_2); bool eq = memcmp(buffer_1, buffer_2, list_1->GetTotalSize()) == 0; - + delete[] buffer_1; delete[] buffer_2; return eq; @@ -170,7 +170,6 @@ TEST_F(ByteSliceReaderWriterTest, TestDataConsistency) { // using namespace infinity; // LocalFileSystem local_file_system; // String path = String(GetFullTmpDir()) + "/test_byteslice_dump"; - // ByteSliceWriter writer; @@ -178,12 +177,12 @@ TEST_F(ByteSliceReaderWriterTest, TestDataConsistency) { // for (i = 0; i < 10000; i++) { // writer.WriteVLong(i); // } - + // auto filewriter = MakeShared(local_file_system, path, 128); // std::cout << writer.GetSize() << std::endl; -// writer.Dump(filewriter); +// writer.Dump(filewriter); // filewriter->Sync(); // ByteSliceWriter loader; diff --git a/src/unit_test/storage/io/file_write_read.cpp b/src/unit_test/storage/io/file_write_read.cpp index 341adf374a..b36037e4bf 100644 --- a/src/unit_test/storage/io/file_write_read.cpp +++ b/src/unit_test/storage/io/file_write_read.cpp @@ -54,7 +54,7 @@ TEST_F(FileWriteReadTest, test1) { LocalStore::DeleteFile(path); } -//write vint then read vint +// write vint then read vint TEST_F(FileWriteReadTest, test2) { using namespace infinity; String path = String(GetFullTmpDir()) + "/test_file2.abc"; @@ -73,7 +73,7 @@ TEST_F(FileWriteReadTest, test2) { LocalStore::DeleteFile(path); } -//hybrid datatype +// hybrid datatype TEST_F(FileWriteReadTest, test3) { using namespace infinity; String path = String(GetFullTmpDir()) + "/test_file3.abc"; @@ -107,15 +107,14 @@ TEST_F(FileWriteReadTest, test3) { LocalStore::DeleteFile(path); } - -//test total written bytes and GetFileSize() -//plus exceed case for reader/writer buffer +// test total written bytes and GetFileSize() +// plus exceed case for reader/writer buffer TEST_F(FileWriteReadTest, TestExceedWriterTotalSize) { using namespace infinity; String path = String(GetFullTmpDir()) + "/test_file_write_bytes.abc"; FileWriter file_writer(path, 128); - for(i32 i = 0; i < 1024; ++i) { + for (i32 i = 0; i < 1024; ++i) { file_writer.WriteInt(i); } @@ -137,15 +136,15 @@ TEST_F(FileWriteReadTest, TestExceedWriterTotalSize) { EXPECT_EQ(file_writer.TotalWrittenBytes(), 4 * 1024 + buffer.size()); } -//write byte in '0', '1'...'1023' -//read to '254', get pointer a, finish the read -//seek a, finish +// write byte in '0', '1'...'1023' +// read to '254', get pointer a, finish the read +// seek a, finish TEST_F(FileWriteReadTest, TestFilePointerSeek) { using namespace infinity; String path = String(GetFullTmpDir()) + "/test_file_write_bytes.abc"; FileWriter file_writer(path, 128); - for(i32 i = 0; i < 1024; i++) { + for (i32 i = 0; i < 1024; i++) { file_writer.WriteInt(i); } file_writer.Sync(); @@ -153,7 +152,7 @@ TEST_F(FileWriteReadTest, TestFilePointerSeek) { FileReader file_reader(path, 128); u64 a; for (i32 i = 0; i < 1024; ++i) { - if(i == 254) { + if (i == 254) { a = file_reader.GetFilePointer(); } EXPECT_EQ(file_reader.ReadInt(), i); @@ -162,21 +161,21 @@ TEST_F(FileWriteReadTest, TestFilePointerSeek) { file_reader.Seek(a); i32 exp = 254; - while(!file_reader.Finished()) { + while (!file_reader.Finished()) { EXPECT_EQ(file_reader.ReadInt(), exp); exp++; } EXPECT_EQ(exp, 1024); } -//test if ReFill works fine. +// test if ReFill works fine. TEST_F(FileWriteReadTest, TestFileReadOverflowBuffer) { using namespace infinity; String path = String(GetFullTmpDir()) + "/test_file_write_bytes.abc"; FileWriter file_writer(path, 128); String s; - for(i32 i = 0; i < 1000; i++) { + for (i32 i = 0; i < 1000; i++) { s += "abc"; } @@ -191,7 +190,7 @@ TEST_F(FileWriteReadTest, TestFileReadOverflowBuffer) { EXPECT_STREQ(s.c_str(), read_s.c_str()); } -//test all types of data of reader and writer +// test all types of data of reader and writer TEST_F(FileWriteReadTest, TestFileIODataTypes) { using namespace infinity; diff --git a/src/unit_test/storage/io/local_file_system.cpp b/src/unit_test/storage/io/local_file_system.cpp deleted file mode 100644 index 800fd96b8e..0000000000 --- a/src/unit_test/storage/io/local_file_system.cpp +++ /dev/null @@ -1,300 +0,0 @@ -// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// https://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include "gtest/gtest.h" -import base_test; - -import infinity_exception; - -import stl; -import global_resource_usage; -import third_party; -import logger; - -import file_system; -import local_file_system; -import file_writer; -import file_reader; -import infinity_context; -import file_system_type; - -using namespace infinity; - -class LocalFileSystemTest : public BaseTest {}; - -TEST_F(LocalFileSystemTest, file_write) { - using namespace infinity; - LocalFileSystem local_file_system; - String path = String(GetFullTmpDir()) + "/test_file2.abc"; - - auto [file_handler, status] = - local_file_system.OpenFile(path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status.ok()) { - UnrecoverableError(status.message()); - } - - SizeT len = 10; - UniquePtr data_array = MakeUnique(len); - for(SizeT i = 0; i < len; ++ i) { - data_array[i] = i + 1; - } - file_handler->Write(data_array.get(), len); - file_handler->Sync(); - file_handler->Close(); - local_file_system.DeleteFile(path); - EXPECT_FALSE(local_file_system.Exists(path)); -} - -TEST_F(LocalFileSystemTest, dir_ops) { - using namespace infinity; - LocalFileSystem local_file_system; - String dir = String(GetFullTmpDir()) + "/unit_test"; - String path = dir + "/test_file.test"; - - local_file_system.CreateDirectory(dir); - - auto [file_handler, status] = - local_file_system.OpenFile(path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status.ok()) { - UnrecoverableError(status.message()); - } - - SizeT len = 10; - UniquePtr data_array = MakeUnique(len); - for(SizeT i = 0; i < len; ++ i) { - data_array[i] = i + 1; - } - file_handler->Write(data_array.get(), len); - file_handler->Sync(); - file_handler->Close(); - - local_file_system.DeleteDirectory(dir); - EXPECT_FALSE(local_file_system.Exists(path)); - EXPECT_FALSE(local_file_system.Exists(dir)); -} - -TEST_F(LocalFileSystemTest, TestRead) { - using namespace infinity; - LocalFileSystem local_file_system; - String path = String(GetFullTmpDir()) + "/test_file_read.abc"; - - auto [file_handler, status] = - local_file_system.OpenFile(path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status.ok()) { - UnrecoverableError(status.message()); - } - - SizeT len = 10; - UniquePtr write_data = MakeUnique(len); - for(SizeT i = 0; i < len; ++i) { - write_data[i] = i + 1; - } - file_handler->Write(write_data.get(), len); - file_handler->Sync(); - file_handler->Close(); - - auto [read_handler, read_status] = - local_file_system.OpenFile(path, FileFlags::READ_FLAG, FileLockType::kNoLock); - if(!read_status.ok()) { - UnrecoverableError(read_status.message()); - } - - UniquePtr read_data = MakeUnique(len); - i64 read_len = read_handler->Read(read_data.get(), len); - read_handler->Close(); - - EXPECT_EQ(read_len, len); - for(SizeT i = 0; i < len; ++i) { - EXPECT_EQ(read_data[i], i + 1); - } - local_file_system.DeleteFile(path); - EXPECT_FALSE(local_file_system.Exists(path)); -} - -TEST_F(LocalFileSystemTest, TestRename) { - using namespace infinity; - LocalFileSystem local_file_system; - String old_path = String(GetFullTmpDir()) + "/test_file_old.abc"; - String new_path = String(GetFullTmpDir()) + "/test_file_new.abc"; - - auto [file_handler, status] = - local_file_system.OpenFile(old_path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status.ok()) { - UnrecoverableError(status.message()); - } - - SizeT len = 10; - UniquePtr data_array = MakeUnique(len); - for(SizeT i = 0; i < len; ++i) { - data_array[i] = i + 1; - } - file_handler->Write(data_array.get(), len); - file_handler->Sync(); - file_handler->Close(); - - local_file_system.Rename(old_path, new_path); - - EXPECT_FALSE(local_file_system.Exists(old_path)); - EXPECT_TRUE(local_file_system.Exists(new_path)); - - local_file_system.DeleteFile(new_path); - EXPECT_FALSE(local_file_system.Exists(new_path)); -} - -TEST_F(LocalFileSystemTest, TestTruncate) { - using namespace infinity; - LocalFileSystem local_file_system; - String path = String(GetFullTmpDir()) + "/test_file_truncate.abc"; - - auto [file_handler, status] = - local_file_system.OpenFile(path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status.ok()) { - UnrecoverableError(status.message()); - } - - SizeT initial_len = 20; - UniquePtr data_array = MakeUnique(initial_len); - for(SizeT i = 0; i < initial_len; ++i) { - data_array[i] = i + 1; - } - file_handler->Write(data_array.get(), initial_len); - file_handler->Sync(); - file_handler->Close(); - - local_file_system.Truncate(path, 10); - - auto [truncated_handler, truncate_status] = - local_file_system.OpenFile(path, FileFlags::READ_FLAG, FileLockType::kNoLock); - if(!truncate_status.ok()) { - UnrecoverableError(truncate_status.message()); - } - - UniquePtr truncated_data = MakeUnique(10); - i64 read_len = truncated_handler->Read(truncated_data.get(), 10); - truncated_handler->Close(); - - EXPECT_EQ(read_len, 10); - for(SizeT i = 0; i < 10; ++i) { - EXPECT_EQ(truncated_data[i], i + 1); - } - - local_file_system.DeleteFile(path); - EXPECT_FALSE(local_file_system.Exists(path)); -} - -TEST_F(LocalFileSystemTest, TestAppend) { - using namespace infinity; - LocalFileSystem local_file_system; - String dst_path = String(GetFullTmpDir()) + "/test_file_append_dst.abc"; - String src_path = String(GetFullTmpDir()) + "/test_file_append_src.abc"; - - auto [src_handler, src_status] = - local_file_system.OpenFile(src_path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!src_status.ok()) { - UnrecoverableError(src_status.message()); - } - - SizeT src_len = 10; - UniquePtr src_data = MakeUnique(src_len); - for(SizeT i = 0; i < src_len; ++i) { - src_data[i] = i + 1; - } - src_handler->Write(src_data.get(), src_len); - src_handler->Sync(); - src_handler->Close(); - - auto [dst_handler, dst_status] = - local_file_system.OpenFile(dst_path, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!dst_status.ok()) { - UnrecoverableError(dst_status.message()); - } - - SizeT dst_len = 5; - UniquePtr dst_data = MakeUnique(dst_len); - for(SizeT i = 0; i < dst_len; ++i) { - dst_data[i] = i + 10; - } - dst_handler->Write(dst_data.get(), dst_len); - dst_handler->Sync(); - dst_handler->Close(); - - local_file_system.AppendFile(dst_path, src_path); - - auto [appended_handler, append_status] = - local_file_system.OpenFile(dst_path, FileFlags::READ_FLAG, FileLockType::kNoLock); - if(!append_status.ok()) { - UnrecoverableError(append_status.message()); - } - - UniquePtr combined_data = MakeUnique(src_len + dst_len); - i64 read_len = appended_handler->Read(combined_data.get(), src_len + dst_len); - appended_handler->Close(); - - EXPECT_EQ(read_len, (i64)(src_len + dst_len)); - for(SizeT i = 0; i < dst_len; ++i) { - EXPECT_EQ(combined_data[i], i + 10); - } - for(SizeT i = dst_len; i < src_len + dst_len; ++i) { - EXPECT_EQ(combined_data[i], i - dst_len + 1); - } - - local_file_system.DeleteFile(src_path); - local_file_system.DeleteFile(dst_path); - EXPECT_FALSE(local_file_system.Exists(src_path)); - EXPECT_FALSE(local_file_system.Exists(dst_path)); -} - -TEST_F(LocalFileSystemTest, TestCleanDir) { - using namespace infinity; - LocalFileSystem local_file_system; - String dir = String(GetFullTmpDir()) + "/cleanup_test_dir"; - String file_path1 = dir + "/file1.txt"; - String file_path2 = dir + "/file2.txt"; - - local_file_system.CreateDirectory(dir); - - auto [file_handler1, status1] = - local_file_system.OpenFile(file_path1, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status1.ok()) { - UnrecoverableError(status1.message()); - } - SizeT len1 = 10; - UniquePtr data_array1 = MakeUnique(len1); - for(SizeT i = 0; i < len1; ++i) { - data_array1[i] = i + 1; - } - file_handler1->Write(data_array1.get(), len1); - file_handler1->Sync(); - file_handler1->Close(); - - auto [file_handler2, status2] = - local_file_system.OpenFile(file_path2, FileFlags::WRITE_FLAG | FileFlags::TRUNCATE_CREATE, FileLockType::kWriteLock); - if(!status2.ok()) { - UnrecoverableError(status2.message()); - } - SizeT len2 = 20; - UniquePtr data_array2 = MakeUnique(len2); - for(SizeT i = 0; i < len2; ++i) { - data_array2[i] = i + 11; - } - file_handler2->Write(data_array2.get(), len2); - file_handler2->Sync(); - file_handler2->Close(); - - local_file_system.CleanupDirectory(dir); - - EXPECT_FALSE(local_file_system.Exists(file_path1)); - EXPECT_FALSE(local_file_system.Exists(file_path2)); - EXPECT_TRUE(local_file_system.Exists(dir)); -} \ No newline at end of file diff --git a/src/unit_test/storage/io/stream_io.cpp b/src/unit_test/storage/io/stream_reader.cpp similarity index 67% rename from src/unit_test/storage/io/stream_io.cpp rename to src/unit_test/storage/io/stream_reader.cpp index cfe5a0dece..b9a2e23c57 100644 --- a/src/unit_test/storage/io/stream_io.cpp +++ b/src/unit_test/storage/io/stream_reader.cpp @@ -1,4 +1,5 @@ #include + import base_test; import infinity_exception; @@ -10,38 +11,36 @@ import logger; import file_writer; import file_reader; import infinity_context; -import stream_io; -import file_system_type; +import stream_reader; +import virtual_store; using namespace infinity; -class StreamIOTest : public BaseTest {}; +class StreamReaderTest : public BaseTest {}; -TEST_F(StreamIOTest, TestBasicStreamIO) { +TEST_F(StreamReaderTest, TestBasicStreamIO) { String path = String(GetFullTmpDir()) + "/test_streamio.abc"; FileWriter file_writer(path, 128); String lines[5]; - lines[0] = "hahahahha"; + lines[0] = "hahahahha"; lines[1] = "xixixixiix"; lines[2] = "huhuhuhu"; - lines[3]= "xuxuxuxuxxu"; + lines[3] = "xuxuxuxuxxu"; lines[4] = "ddddd"; - for(i64 i = 0; i < 5; i++) { + for (i64 i = 0; i < 5; i++) { file_writer.Write(lines[i].c_str(), lines[i].size()); file_writer.Write("\n", 1); } file_writer.Sync(); - StreamIO stream; - stream.Init(path, FileFlags::READ_FLAG); + UniquePtr stream = LocalStore::OpenStreamReader(path); i32 i = 0; String line; - while(stream.ReadLine(line)) { + while (stream->ReadLine(line)) { EXPECT_STREQ(line.c_str(), lines[i].c_str()); i++; } - stream.Close(); EXPECT_EQ(i, 5); } \ No newline at end of file diff --git a/src/unit_test/storage/io/virtual_store.cpp b/src/unit_test/storage/io/virtual_store.cpp new file mode 100644 index 0000000000..c0e4c5ce4c --- /dev/null +++ b/src/unit_test/storage/io/virtual_store.cpp @@ -0,0 +1,271 @@ +// Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "gtest/gtest.h" +import base_test; + +import infinity_exception; + +import stl; +import global_resource_usage; +import third_party; +import logger; + +import file_writer; +import file_reader; +import infinity_context; +import virtual_store; +import abstract_file_handle; + +using namespace infinity; + +class VirtualStoreTest : public BaseTest {}; + +TEST_F(VirtualStoreTest, file_write) { + using namespace infinity; + String path = String(GetFullTmpDir()) + "/test_file2.abc"; + + auto [file_handle, status] = LocalStore::Open(path, FileAccessMode::kWrite); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + + SizeT len = 10; + UniquePtr data_array = MakeUnique(len); + for (SizeT i = 0; i < len; ++i) { + data_array[i] = i + 1; + } + file_handle->Append(data_array.get(), len); + file_handle->Sync(); + LocalStore::DeleteFile(path); + EXPECT_FALSE(LocalStore::Exists(path)); +} + +TEST_F(VirtualStoreTest, dir_ops) { + using namespace infinity; + String dir = String(GetFullTmpDir()) + "/unit_test"; + String path = dir + "/test_file.test"; + + LocalStore::MakeDirectory(dir); + + auto [file_handle, status] = LocalStore::Open(path, FileAccessMode::kWrite); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + + SizeT len = 10; + UniquePtr data_array = MakeUnique(len); + for (SizeT i = 0; i < len; ++i) { + data_array[i] = i + 1; + } + file_handle->Append(data_array.get(), len); + file_handle->Sync(); + + LocalStore::RemoveDirectory(dir); + EXPECT_FALSE(LocalStore::Exists(path)); + EXPECT_FALSE(LocalStore::Exists(dir)); +} + +TEST_F(VirtualStoreTest, TestRead) { + using namespace infinity; + String path = String(GetFullTmpDir()) + "/test_file_read.abc"; + + auto [file_handle, open_write_status] = LocalStore::Open(path, FileAccessMode::kWrite); + if (!open_write_status.ok()) { + UnrecoverableError(open_write_status.message()); + } + + SizeT len = 10; + UniquePtr write_data = MakeUnique(len); + for (SizeT i = 0; i < len; ++i) { + write_data[i] = i + 1; + } + file_handle->Append(write_data.get(), len); + file_handle->Sync(); + + auto [read_handle, open_read_status] = LocalStore::Open(path, FileAccessMode::kRead); + if (!open_read_status.ok()) { + UnrecoverableError(open_read_status.message()); + } + + UniquePtr read_data = MakeUnique(len); + auto [read_len, read_status] = read_handle->Read(read_data.get(), len); + EXPECT_TRUE(read_status.ok()); + + EXPECT_EQ(read_len, len); + for (SizeT i = 0; i < len; ++i) { + EXPECT_EQ(read_data[i], i + 1); + } + LocalStore::DeleteFile(path); + EXPECT_FALSE(LocalStore::Exists(path)); +} + +TEST_F(VirtualStoreTest, TestRename) { + using namespace infinity; + String old_path = String(GetFullTmpDir()) + "/test_file_old.abc"; + String new_path = String(GetFullTmpDir()) + "/test_file_new.abc"; + + auto [file_handle, status] = LocalStore::Open(old_path, FileAccessMode::kWrite); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + + SizeT len = 10; + UniquePtr data_array = MakeUnique(len); + for (SizeT i = 0; i < len; ++i) { + data_array[i] = i + 1; + } + file_handle->Append(data_array.get(), len); + file_handle->Sync(); + + LocalStore::Rename(old_path, new_path); + + EXPECT_FALSE(LocalStore::Exists(old_path)); + EXPECT_TRUE(LocalStore::Exists(new_path)); + + LocalStore::DeleteFile(new_path); + EXPECT_FALSE(LocalStore::Exists(new_path)); +} + +TEST_F(VirtualStoreTest, TestTruncate) { + using namespace infinity; + String path = String(GetFullTmpDir()) + "/test_file_truncate.abc"; + + auto [file_handle, status] = LocalStore::Open(path, FileAccessMode::kWrite); + if (!status.ok()) { + UnrecoverableError(status.message()); + } + + SizeT initial_len = 20; + UniquePtr data_array = MakeUnique(initial_len); + for (SizeT i = 0; i < initial_len; ++i) { + data_array[i] = i + 1; + } + file_handle->Append(data_array.get(), initial_len); + file_handle->Sync(); + + LocalStore::Truncate(path, 10); + + auto [truncated_handle, truncate_status] = LocalStore::Open(path, FileAccessMode::kRead); + if (!truncate_status.ok()) { + UnrecoverableError(truncate_status.message()); + } + + UniquePtr truncated_data = MakeUnique(10); + auto [read_len, read_status] = truncated_handle->Read(truncated_data.get(), 10); + EXPECT_TRUE(read_status.ok()); + + EXPECT_EQ(read_len, 10); + for (SizeT i = 0; i < 10; ++i) { + EXPECT_EQ(truncated_data[i], i + 1); + } + + LocalStore::DeleteFile(path); + EXPECT_FALSE(LocalStore::Exists(path)); +} + +TEST_F(VirtualStoreTest, TestAppend) { + using namespace infinity; + String dst_path = String(GetFullTmpDir()) + "/test_file_append_dst.abc"; + String src_path = String(GetFullTmpDir()) + "/test_file_append_src.abc"; + + auto [src_handler, src_status] = LocalStore::Open(src_path, FileAccessMode::kWrite); + if (!src_status.ok()) { + UnrecoverableError(src_status.message()); + } + + SizeT src_len = 10; + UniquePtr src_data = MakeUnique(src_len); + for (SizeT i = 0; i < src_len; ++i) { + src_data[i] = i + 1; + } + src_handler->Append(src_data.get(), src_len); + src_handler->Sync(); + + auto [dst_handler, dst_status] = LocalStore::Open(dst_path, FileAccessMode::kWrite); + if (!dst_status.ok()) { + UnrecoverableError(dst_status.message()); + } + + SizeT dst_len = 5; + UniquePtr dst_data = MakeUnique(dst_len); + for (SizeT i = 0; i < dst_len; ++i) { + dst_data[i] = i + 10; + } + dst_handler->Append(dst_data.get(), dst_len); + dst_handler->Sync(); + + LocalStore::Merge(dst_path, src_path); + + auto [appended_handle, append_status] = LocalStore::Open(dst_path, FileAccessMode::kRead); + if (!append_status.ok()) { + UnrecoverableError(append_status.message()); + } + + UniquePtr combined_data = MakeUnique(src_len + dst_len); + auto [read_len, read_status] = appended_handle->Read(combined_data.get(), src_len + dst_len); + EXPECT_TRUE(read_status.ok()); + + EXPECT_EQ(read_len, (i64)(src_len + dst_len)); + for (SizeT i = 0; i < dst_len; ++i) { + EXPECT_EQ(combined_data[i], i + 10); + } + for (SizeT i = dst_len; i < src_len + dst_len; ++i) { + EXPECT_EQ(combined_data[i], i - dst_len + 1); + } + + LocalStore::DeleteFile(src_path); + LocalStore::DeleteFile(dst_path); + EXPECT_FALSE(LocalStore::Exists(src_path)); + EXPECT_FALSE(LocalStore::Exists(dst_path)); +} + +TEST_F(VirtualStoreTest, TestCleanDir) { + using namespace infinity; + String dir = String(GetFullTmpDir()) + "/cleanup_test_dir"; + String file_path1 = dir + "/file1.txt"; + String file_path2 = dir + "/file2.txt"; + + LocalStore::MakeDirectory(dir); + + auto [file_handler1, status1] = LocalStore::Open(file_path1, FileAccessMode::kWrite); + if (!status1.ok()) { + UnrecoverableError(status1.message()); + } + SizeT len1 = 10; + UniquePtr data_array1 = MakeUnique(len1); + for (SizeT i = 0; i < len1; ++i) { + data_array1[i] = i + 1; + } + file_handler1->Append(data_array1.get(), len1); + file_handler1->Sync(); + + auto [file_handler2, status2] = LocalStore::Open(file_path2, FileAccessMode::kWrite); + if (!status2.ok()) { + UnrecoverableError(status2.message()); + } + SizeT len2 = 20; + UniquePtr data_array2 = MakeUnique(len2); + for (SizeT i = 0; i < len2; ++i) { + data_array2[i] = i + 11; + } + file_handler2->Append(data_array2.get(), len2); + file_handler2->Sync(); + + LocalStore::CleanupDirectory(dir); + + EXPECT_FALSE(LocalStore::Exists(file_path1)); + EXPECT_FALSE(LocalStore::Exists(file_path2)); + EXPECT_TRUE(LocalStore::Exists(dir)); +} \ No newline at end of file diff --git a/src/unit_test/storage/knnindex/emvb_search/test_emvb.cpp b/src/unit_test/storage/knnindex/emvb_search/test_emvb.cpp index 76e8dde8ae..88c3109263 100644 --- a/src/unit_test/storage/knnindex/emvb_search/test_emvb.cpp +++ b/src/unit_test/storage/knnindex/emvb_search/test_emvb.cpp @@ -20,7 +20,6 @@ import stl; import emvb_search; import emvb_product_quantization; import emvb_shared_vec; -import file_system; import local_file_handle; namespace infinity { diff --git a/src/unit_test/storage/knnindex/knn_diskann/test_build_diskann.cpp b/src/unit_test/storage/knnindex/knn_diskann/test_build_diskann.cpp index c082b2bc63..4a328790ff 100644 --- a/src/unit_test/storage/knnindex/knn_diskann/test_build_diskann.cpp +++ b/src/unit_test/storage/knnindex/knn_diskann/test_build_diskann.cpp @@ -20,10 +20,7 @@ import base_test; import infinity_exception; import knn_diskann; import internal_types; -import file_system; -import file_system_type; import virtual_store; -import file_system_type; import index_base; import diskann_index_data; import abstract_file_handle; diff --git a/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw.cpp b/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw.cpp index 302b9c6911..1e4b083fd7 100644 --- a/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw.cpp +++ b/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw.cpp @@ -19,8 +19,6 @@ import base_test; import stl; import hnsw_alg; -import file_system; -import file_system_type; #pragma clang diagnostic push #pragma clang diagnostic ignored "-Wunused-variable" diff --git a/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw_sparse.cpp b/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw_sparse.cpp index ff9aefbf51..20f70676f2 100644 --- a/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw_sparse.cpp +++ b/src/unit_test/storage/knnindex/knn_hnsw/test_hnsw_sparse.cpp @@ -18,10 +18,8 @@ import base_test; import stl; import hnsw_alg; import vec_store_type; -import file_system; import hnsw_common; import sparse_util; -import file_system_type; import compilation_config; import infinity_exception; import third_party; diff --git a/src/unit_test/storage/knnindex/knn_hnsw/test_lvq.cpp b/src/unit_test/storage/knnindex/knn_hnsw/test_lvq.cpp index c7e68c3df8..464a1678ba 100644 --- a/src/unit_test/storage/knnindex/knn_hnsw/test_lvq.cpp +++ b/src/unit_test/storage/knnindex/knn_hnsw/test_lvq.cpp @@ -16,8 +16,6 @@ #include import base_test; -import file_system; -import file_system_type; import dist_func_l2; import data_store; import vec_store_type; diff --git a/src/unit_test/storage/knnindex/knn_sparse/test_bmp_index.cpp b/src/unit_test/storage/knnindex/knn_sparse/test_bmp_index.cpp index 484f8af595..4ab9963410 100644 --- a/src/unit_test/storage/knnindex/knn_sparse/test_bmp_index.cpp +++ b/src/unit_test/storage/knnindex/knn_sparse/test_bmp_index.cpp @@ -22,8 +22,6 @@ import bmp_util; import sparse_util; import third_party; import compilation_config; -import file_system; -import file_system_type; import sparse_test_util; import infinity_exception; import virtual_store; diff --git a/src/unit_test/storage/knnindex/knn_sparse/test_bp_reordering.cpp b/src/unit_test/storage/knnindex/knn_sparse/test_bp_reordering.cpp index 84eb09136c..c162029edb 100644 --- a/src/unit_test/storage/knnindex/knn_sparse/test_bp_reordering.cpp +++ b/src/unit_test/storage/knnindex/knn_sparse/test_bp_reordering.cpp @@ -20,10 +20,8 @@ import stl; import bp_reordering; import third_party; import infinity_exception; -import file_system; import virtual_store; import compilation_config; -import file_system_type; import sparse_util; import abstract_file_handle; import local_file_handle; diff --git a/src/unit_test/storage/meta/entry/block_version.cpp b/src/unit_test/storage/meta/entry/block_version.cpp index c5e399bd75..1efe89f5d4 100644 --- a/src/unit_test/storage/meta/entry/block_version.cpp +++ b/src/unit_test/storage/meta/entry/block_version.cpp @@ -23,11 +23,9 @@ import global_resource_usage; import third_party; import infinity_context; import block_version; -import file_system; import virtual_store; import virtual_storage_type; import abstract_file_handle; -import file_system_type; import buffer_manager; import version_file_worker; import column_vector; diff --git a/src/unit_test/storage/persistence/persistence_manager.cpp b/src/unit_test/storage/persistence/persistence_manager.cpp index 4263faf139..c94e94614d 100644 --- a/src/unit_test/storage/persistence/persistence_manager.cpp +++ b/src/unit_test/storage/persistence/persistence_manager.cpp @@ -5,7 +5,6 @@ import persistence_manager; import virtual_store; import virtual_storage_type; import abstract_file_handle; -import file_system_type; import third_party; import persist_result_handler;