diff --git a/src/executor/operator/physical_export.cpp b/src/executor/operator/physical_export.cpp index c706c14e95..acfc810097 100644 --- a/src/executor/operator/physical_export.cpp +++ b/src/executor/operator/physical_export.cpp @@ -32,7 +32,7 @@ import column_def; import block_entry; import column_vector; import value; -import local_file_system; +import virtual_store; import file_system; import file_system_type; import defer_op; @@ -102,12 +102,18 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta SizeT select_column_count = select_columns.size(); - LocalFileSystem fs; - auto [file_handle, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock); + String parent_path = LocalStore::GetParentPath(file_path_); + if(!parent_path.empty()) { + Status create_status = LocalStore::MakeDirectory(parent_path); + if(!create_status.ok()) { + RecoverableError(create_status); + } + } + + auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite); if (!status.ok()) { RecoverableError(status); } - DeferFn file_defer([&]() { fs.Close(*file_handle); }); if (header_) { // Output CSV header @@ -138,7 +144,7 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta header += '\n'; } } - fs.Write(*file_handle, header.c_str(), header.size()); + file_handle->Append(header.c_str(), header.size()); } SizeT offset = offset_; @@ -220,17 +226,14 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) { ++file_no_; - fs.Close(*file_handle); String new_file_path = fmt::format("{}.part{}", file_path_, file_no_); - auto result = fs.OpenFile(new_file_path, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock); - if (!result.second.ok()) { - RecoverableError(result.second); + auto [new_file_handle, new_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite); + if (!new_status.ok()) { + RecoverableError(new_status); } - file_handle = std::move(result.first); + file_handle = std::move(new_file_handle); } - - fs.Write(*file_handle, line.c_str(), line.size()); - + file_handle->Append(line.c_str(), line.size()); ++row_count; if (limit_ != 0 && row_count == limit_) { return row_count; @@ -238,6 +241,7 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta } } } + LOG_DEBUG(fmt::format("Export to CSV, db {}, table {}, file: {}, row: {}", schema_name_, table_name_, file_path_, row_count)); return row_count; } @@ -260,6 +264,14 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS SizeT select_column_count = select_columns.size(); + String parent_path = LocalStore::GetParentPath(file_path_); + if(!parent_path.empty()) { + Status create_status = LocalStore::MakeDirectory(parent_path); + if(!create_status.ok()) { + RecoverableError(create_status); + } + } + auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite); if (!status.ok()) { RecoverableError(status); @@ -348,6 +360,7 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS } if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) { ++file_no_; + String new_file_path = fmt::format("{}.part{}", file_path_, file_no_); auto [part_file_handle, part_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite); if (!part_status.ok()) { @@ -359,6 +372,7 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS // LOG_DEBUG(line_json.dump()); String to_write = line_json.dump() + "\n"; file_handle->Append(to_write.c_str(), to_write.size()); + ++row_count; if (limit_ != 0 && row_count == limit_) { return row_count; @@ -399,12 +413,18 @@ SizeT PhysicalExport::ExportToFVECS(QueryContext *query_context, ExportOperatorS i32 dimension = embedding_type_info->Dimension(); - LocalFileSystem fs; - auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock); + String parent_path = LocalStore::GetParentPath(file_path_); + if(!parent_path.empty()) { + Status create_status = LocalStore::MakeDirectory(parent_path); + if(!create_status.ok()) { + RecoverableError(create_status); + } + } + + auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite); if (!status.ok()) { RecoverableError(status); } - DeferFn file_defer([&]() { fs.Close(*file_handler); }); SizeT offset = offset_; SizeT row_count{0}; @@ -438,17 +458,17 @@ SizeT PhysicalExport::ExportToFVECS(QueryContext *query_context, ExportOperatorS if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) { ++file_no_; - fs.Close(*file_handler); String new_file_path = fmt::format("{}.part{}", file_path_, file_no_); - auto result = fs.OpenFile(new_file_path, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock); - if (!result.second.ok()) { - RecoverableError(result.second); + auto [new_file_handle, new_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite); + if (!new_status.ok()) { + RecoverableError(new_status); } - file_handler = std::move(result.first); + file_handle = std::move(new_file_handle); } - fs.Write(*file_handler, &dimension, sizeof(dimension)); - fs.Write(*file_handler, embedding.data(), embedding.size_bytes()); + file_handle->Append(&dimension, sizeof(dimension)); + file_handle->Append(embedding.data(), embedding.size_bytes()); + ++row_count; if (limit_ != 0 && row_count == limit_) { return row_count; @@ -488,6 +508,14 @@ SizeT PhysicalExport::ExportToPARQUET(QueryContext *query_context, ExportOperato SharedPtr<::arrow::io::FileOutputStream> file_stream; SharedPtr<::parquet::arrow::FileWriter> file_writer; + String parent_path = LocalStore::GetParentPath(file_path_); + if(!parent_path.empty()) { + Status create_status = LocalStore::MakeDirectory(parent_path); + if(!create_status.ok()) { + RecoverableError(create_status); + } + } + file_stream = ::arrow::io::FileOutputStream::Open(file_path_, pool).ValueOrDie(); file_writer = ::parquet::arrow::FileWriter::Open(*schema, pool, file_stream, ::parquet::default_writer_properties()).ValueOrDie(); diff --git a/src/executor/operator/physical_import.cpp b/src/executor/operator/physical_import.cpp index cf0f7fc6d0..421cc00312 100644 --- a/src/executor/operator/physical_import.cpp +++ b/src/executor/operator/physical_import.cpp @@ -40,7 +40,6 @@ import expression_state; import data_block; import logger; import third_party; -import local_file_system; import defer_op; import txn_store; @@ -137,17 +136,17 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat RecoverableError(status); } - LocalFileSystem fs; - - auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock); - if (!status.ok()) { - UnrecoverableError(status.message()); + auto [file_handle, status_open] = LocalStore::Open(file_path_, FileAccessMode::kRead); + if (!status_open.ok()) { + UnrecoverableError(status_open.message()); } - DeferFn defer_fn([&]() { fs.Close(*file_handler); }); i32 dimension = 0; - i64 nbytes = fs.Read(*file_handler, &dimension, sizeof(dimension)); - fs.Seek(*file_handler, 0); + auto [nbytes, status_read] = file_handle->Read(&dimension, sizeof(dimension)); + if (!status_read.ok()) { + RecoverableError(status_read); + } + file_handle->Seek(0); if (nbytes == 0) { // file is empty auto result_msg = MakeUnique("IMPORT 0 Rows"); @@ -164,7 +163,10 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dimension, embedding_info->Dimension())); RecoverableError(status); } - SizeT file_size = fs.GetFileSize(*file_handler); + i64 file_size = file_handle->FileSize(); + if (file_size == -1) { + UnrecoverableError("Can't get file size"); + } SizeT row_size = dimension * sizeof(FloatT) + sizeof(dimension); if (file_size % row_size != 0) { String error_message = "Weird file size."; @@ -183,14 +185,17 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat auto buf_ptr = static_cast(buffer_handle.GetDataMut()); while (true) { i32 dim; - nbytes = fs.Read(*file_handler, &dim, sizeof(dimension)); + auto [nbytes, status_read] = file_handle->Read(&dim, sizeof(dimension)); + if(!status_read.ok()) { + RecoverableError(status_read); + } if (dim != dimension or nbytes != sizeof(dimension)) { - Status status = + Status status_error = Status::ImportFileFormatError(fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dim, dimension)); - RecoverableError(status); + RecoverableError(status_error); } ptr_t dst_ptr = buf_ptr + block_entry->row_count() * sizeof(FloatT) * dimension; - fs.Read(*file_handler, dst_ptr, sizeof(FloatT) * dimension); + file_handle->Read(dst_ptr, sizeof(FloatT) * dimension); block_entry->IncreaseRowCount(1); ++row_idx; @@ -239,17 +244,17 @@ void PhysicalImport::ImportBVECS(QueryContext *query_context, ImportOperatorStat RecoverableError(status); } - LocalFileSystem fs; - - auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock); - if (!status.ok()) { - UnrecoverableError(status.message()); + auto [file_handle, status_open] = LocalStore::Open(file_path_, FileAccessMode::kRead); + if (!status_open.ok()) { + UnrecoverableError(status_open.message()); } - DeferFn defer_fn([&]() { fs.Close(*file_handler); }); i32 dimension = 0; - i64 nbytes = fs.Read(*file_handler, &dimension, sizeof(dimension)); - fs.Seek(*file_handler, 0); + auto [nbytes, status_read] = file_handle->Read(&dimension, sizeof(dimension)); + if (!status_read.ok()) { + RecoverableError(status_read); + } + file_handle->Seek(0); if (nbytes == 0) { // file is empty auto result_msg = MakeUnique("IMPORT 0 Rows"); @@ -266,7 +271,10 @@ void PhysicalImport::ImportBVECS(QueryContext *query_context, ImportOperatorStat fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dimension, embedding_info->Dimension())); RecoverableError(status); } - SizeT file_size = fs.GetFileSize(*file_handler); + i64 file_size = file_handle->FileSize(); + if (file_size == -1) { + UnrecoverableError("Can't get file size"); + } SizeT row_size = dimension * sizeof(u8) + sizeof(dimension); if (file_size % row_size != 0) { String error_message = "Weird file size."; @@ -287,13 +295,17 @@ void PhysicalImport::ImportBVECS(QueryContext *query_context, ImportOperatorStat UniquePtr u8_buffer = MakeUniqueForOverwrite(sizeof(u8) * dimension); while (true) { i32 dim; - nbytes = fs.Read(*file_handler, &dim, sizeof(dimension)); + auto [nbytes, status] = file_handle->Read(&dim, sizeof(dimension)); + if (!status.ok()) { + RecoverableError(status); + } + if (dim != dimension or nbytes != sizeof(dimension)) { - Status status = + Status error_status = Status::ImportFileFormatError(fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dim, dimension)); - RecoverableError(status); + RecoverableError(error_status); } - fs.Read(*file_handler, u8_buffer.get(), sizeof(u8) * dimension); + file_handle->Read(u8_buffer.get(), sizeof(u8) * dimension); u8 *dst_ptr = reinterpret_cast(buf_ptr + block_entry->row_count() * sizeof(u8) * dimension); for (i32 i = 0; i < dimension; ++i) { @@ -386,8 +398,7 @@ void PhysicalImport::ImportCSR(QueryContext *query_context, ImportOperatorState RecoverableError(status); } - LocalFileSystem fs; - auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock); + auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kRead); if (!status.ok()) { UnrecoverableError(status.message()); } @@ -395,31 +406,33 @@ void PhysicalImport::ImportCSR(QueryContext *query_context, ImportOperatorState i64 nrow = 0; i64 ncol = 0; i64 nnz = 0; - file_handler->Read(&nrow, sizeof(nrow)); - file_handler->Read(&ncol, sizeof(ncol)); - file_handler->Read(&nnz, sizeof(nnz)); + file_handle->Read(&nrow, sizeof(nrow)); + file_handle->Read(&ncol, sizeof(ncol)); + file_handle->Read(&nnz, sizeof(nnz)); - SizeT file_size = fs.GetFileSize(*file_handler); - if (file_size != 3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32) + nnz * sizeof(FloatT)) { + i64 file_size = file_handle->FileSize(); + if ((SizeT)file_size != 3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32) + nnz * sizeof(FloatT)) { String error_message = "Invalid CSR file format."; UnrecoverableError(error_message); } i64 prev_off = 0; - file_handler->Read(&prev_off, sizeof(i64)); + file_handle->Read(&prev_off, sizeof(i64)); if (prev_off != 0) { String error_message = "Invalid CSR file format."; UnrecoverableError(error_message); } - auto [idx_reader, idx_status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock); + auto [idx_file_handle, idx_status] = LocalStore::Open(file_path_, FileAccessMode::kRead); if (!idx_status.ok()) { UnrecoverableError(idx_status.message()); } - fs.Seek(*idx_reader, 3 * sizeof(i64) + (nrow + 1) * sizeof(i64)); - auto [data_reader, data_status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock); + + idx_file_handle->Seek(3 * sizeof(i64) + (nrow + 1) * sizeof(i64)); + auto [data_file_handle, data_status] = LocalStore::Open(file_path_, FileAccessMode::kRead); if (!data_status.ok()) { UnrecoverableError(data_status.message()); } - fs.Seek(*data_reader, 3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32)); + + data_file_handle->Seek(3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32)); //------------------------------------------------------------------------------------------------------------------------ @@ -435,13 +448,13 @@ void PhysicalImport::ImportCSR(QueryContext *query_context, ImportOperatorState i64 row_id = 0; while (true) { i64 off = 0; - file_handler->Read(&off, sizeof(i64)); + file_handle->Read(&off, sizeof(i64)); i64 nnz = off - prev_off; SizeT data_len = sparse_info->DataSize(nnz); auto tmp_indice_ptr = MakeUnique(sizeof(i32) * nnz); auto data_ptr = MakeUnique(data_len); - idx_reader->Read(tmp_indice_ptr.get(), sizeof(i32) * nnz); - data_reader->Read(data_ptr.get(), data_len); + idx_file_handle->Read(tmp_indice_ptr.get(), sizeof(i32) * nnz); + data_file_handle->Read(data_ptr.get(), data_len); auto indice_ptr = ConvertCSRIndice(std::move(tmp_indice_ptr), sparse_info.get(), nnz); auto value = Value::MakeSparse(nnz, std::move(indice_ptr), std::move(data_ptr), sparse_info); @@ -637,17 +650,21 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat void PhysicalImport::ImportJSON(QueryContext *query_context, ImportOperatorState *import_op_state) { nlohmann::json json_arr; { - LocalFileSystem fs; - auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock); + auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kRead); if (!status.ok()) { UnrecoverableError(status.message()); } - DeferFn file_defer([&]() { fs.Close(*file_handler); }); - SizeT file_size = fs.GetFileSize(*file_handler); + i64 file_size = file_handle->FileSize(); + if (file_size == -1) { + UnrecoverableError("Can't get file size"); + } String json_str(file_size, 0); - SizeT read_n = file_handler->Read(json_str.data(), file_size); - if (read_n != file_size) { + auto [read_n, status_read] = file_handle->Read(json_str.data(), file_size); + if(!status_read.ok()) { + UnrecoverableError(status_read.message()); + } + if ((i64)read_n != file_size) { String error_message = fmt::format("Read file size {} doesn't match with file size {}.", read_n, file_size); UnrecoverableError(error_message); } diff --git a/src/storage/knn_index/knn_diskann/diskann_index_data.cppm b/src/storage/knn_index/knn_diskann/diskann_index_data.cppm index ed1dc85637..d270c2f642 100644 --- a/src/storage/knn_index/knn_diskann/diskann_index_data.cppm +++ b/src/storage/knn_index/knn_diskann/diskann_index_data.cppm @@ -190,9 +190,9 @@ private: SharedPtr &train_data_ids) { train_data_handle.Append(&this->train_size_, sizeof(u32)); train_data_handle.Append(&this->dimension_, sizeof(u32)); - train_data_handle.Append(&this->train_size_, sizeof(u32)); + train_ids_handle.Append(&this->train_size_, sizeof(u32)); u32 const_one = 1; - train_data_handle.Append(&const_one, sizeof(u32)); + train_ids_handle.Append(&const_one, sizeof(u32)); for (u32 i = 0; i < this->train_size_; i++) { train_data_handle.Append(train_data.get() + i * this->dimension_, sizeof(VectorDataType) * this->dimension_); train_ids_handle.Append(train_data_ids.get() + i, sizeof(SizeT)); diff --git a/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm b/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm index f246d41e85..79a1ebfd96 100644 --- a/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm +++ b/src/storage/knn_index/knn_diskann/inner/diskann_mem_data_store.cppm @@ -98,8 +98,8 @@ public: } } - void Load(FileHandler &load_file_handler) {} - void Save(FileHandler &save_file_handler) {} + void Load(LocalFileHandle &load_file_handle) {} + void Save(LocalFileHandle &save_file_handle) {} void PopulateData(DataType *vectors, SizeT num_pts) { memset(data_, 0, num_pts * aligned_dim_ * sizeof(DataType));