Fix export directory issue (#1966)
### What problem does this PR solve?

Exporting to a file whose parent directory does not exist currently fails, because the export operators open the target path without creating the missing directory first. This PR derives the parent directory of `file_path_` and creates it before opening the file in the CSV, JSONL, FVECS, and PARQUET export paths, and it also refactors the export/import file I/O from `LocalFileSystem` to the `LocalStore` / `virtual_store` interface.

Issue link: #1956
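
The fix follows the same pattern in each of the four export paths (ExportToCSV, ExportToJSONL, ExportToFVECS, ExportToPARQUET): take the parent directory of the target path, create it if it is missing, then open the file. A minimal standalone sketch of that pattern, using `std::filesystem` and `std::ofstream` as stand-ins for the project's `LocalStore::GetParentPath` / `LocalStore::MakeDirectory` / `LocalStore::Open` wrappers:

```cpp
#include <filesystem>
#include <fstream>
#include <stdexcept>
#include <string>
#include <system_error>

namespace fs = std::filesystem;

// Sketch only: std::filesystem stands in for Infinity's LocalStore wrapper.
// Create the parent directory (if any) before opening the export file, so the
// export no longer fails when the target directory does not exist yet.
std::ofstream OpenExportFile(const std::string &file_path) {
    fs::path parent = fs::path(file_path).parent_path();
    if (!parent.empty()) {
        std::error_code ec;
        fs::create_directories(parent, ec); // no-op when it already exists
        if (ec) {
            throw std::runtime_error("Cannot create directory: " + parent.string());
        }
    }
    std::ofstream out(file_path, std::ios::binary);
    if (!out) {
        throw std::runtime_error("Cannot open file: " + file_path);
    }
    return out;
}

int main() {
    // Hypothetical demo path, for illustration only.
    auto out = OpenExportFile("/tmp/infinity_export_demo/part/out.csv");
    out << "c1,c2\n1,2\n";
    return 0;
}
```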

### Type of change

- [x] Bug Fix (non-breaking change which fixes an issue)
- [x] Refactoring

Signed-off-by: Jin Hai <[email protected]>
JinHai-CN authored Oct 4, 2024
1 parent aa64d3a commit 0fd78f0
Showing 4 changed files with 120 additions and 75 deletions.
74 changes: 51 additions & 23 deletions src/executor/operator/physical_export.cpp
@@ -32,7 +32,7 @@ import column_def;
import block_entry;
import column_vector;
import value;
import local_file_system;
import virtual_store;
import file_system;
import file_system_type;
import defer_op;
@@ -102,12 +102,18 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta

SizeT select_column_count = select_columns.size();

LocalFileSystem fs;
auto [file_handle, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
String parent_path = LocalStore::GetParentPath(file_path_);
if(!parent_path.empty()) {
Status create_status = LocalStore::MakeDirectory(parent_path);
if(!create_status.ok()) {
RecoverableError(create_status);
}
}

auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite);
if (!status.ok()) {
RecoverableError(status);
}
DeferFn file_defer([&]() { fs.Close(*file_handle); });

if (header_) {
// Output CSV header
@@ -138,7 +144,7 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta
header += '\n';
}
}
fs.Write(*file_handle, header.c_str(), header.size());
file_handle->Append(header.c_str(), header.size());
}

SizeT offset = offset_;
@@ -220,24 +226,22 @@ SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorSta

if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) {
++file_no_;
fs.Close(*file_handle);
String new_file_path = fmt::format("{}.part{}", file_path_, file_no_);
auto result = fs.OpenFile(new_file_path, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
if (!result.second.ok()) {
RecoverableError(result.second);
auto [new_file_handle, new_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite);
if (!new_status.ok()) {
RecoverableError(new_status);
}
file_handle = std::move(result.first);
file_handle = std::move(new_file_handle);
}

fs.Write(*file_handle, line.c_str(), line.size());

file_handle->Append(line.c_str(), line.size());
++row_count;
if (limit_ != 0 && row_count == limit_) {
return row_count;
}
}
}
}

LOG_DEBUG(fmt::format("Export to CSV, db {}, table {}, file: {}, row: {}", schema_name_, table_name_, file_path_, row_count));
return row_count;
}
@@ -260,6 +264,14 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS

SizeT select_column_count = select_columns.size();

String parent_path = LocalStore::GetParentPath(file_path_);
if(!parent_path.empty()) {
Status create_status = LocalStore::MakeDirectory(parent_path);
if(!create_status.ok()) {
RecoverableError(create_status);
}
}

auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite);
if (!status.ok()) {
RecoverableError(status);
@@ -348,6 +360,7 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS
}
if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) {
++file_no_;

String new_file_path = fmt::format("{}.part{}", file_path_, file_no_);
auto [part_file_handle, part_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite);
if (!part_status.ok()) {
@@ -359,6 +372,7 @@ SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorS
// LOG_DEBUG(line_json.dump());
String to_write = line_json.dump() + "\n";
file_handle->Append(to_write.c_str(), to_write.size());

++row_count;
if (limit_ != 0 && row_count == limit_) {
return row_count;
@@ -399,12 +413,18 @@ SizeT PhysicalExport::ExportToFVECS(QueryContext *query_context, ExportOperatorS

i32 dimension = embedding_type_info->Dimension();

LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
String parent_path = LocalStore::GetParentPath(file_path_);
if(!parent_path.empty()) {
Status create_status = LocalStore::MakeDirectory(parent_path);
if(!create_status.ok()) {
RecoverableError(create_status);
}
}

auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kWrite);
if (!status.ok()) {
RecoverableError(status);
}
DeferFn file_defer([&]() { fs.Close(*file_handler); });

SizeT offset = offset_;
SizeT row_count{0};
@@ -438,17 +458,17 @@ SizeT PhysicalExport::ExportToFVECS(QueryContext *query_context, ExportOperatorS

if (row_count > 0 && this->row_limit_ != 0 && (row_count % this->row_limit_) == 0) {
++file_no_;
fs.Close(*file_handler);
String new_file_path = fmt::format("{}.part{}", file_path_, file_no_);
auto result = fs.OpenFile(new_file_path, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
if (!result.second.ok()) {
RecoverableError(result.second);
auto [new_file_handle, new_status] = LocalStore::Open(new_file_path, FileAccessMode::kWrite);
if (!new_status.ok()) {
RecoverableError(new_status);
}
file_handler = std::move(result.first);
file_handle = std::move(new_file_handle);
}

fs.Write(*file_handler, &dimension, sizeof(dimension));
fs.Write(*file_handler, embedding.data(), embedding.size_bytes());
file_handle->Append(&dimension, sizeof(dimension));
file_handle->Append(embedding.data(), embedding.size_bytes());

++row_count;
if (limit_ != 0 && row_count == limit_) {
return row_count;
@@ -488,6 +508,14 @@ SizeT PhysicalExport::ExportToPARQUET(QueryContext *query_context, ExportOperato
SharedPtr<::arrow::io::FileOutputStream> file_stream;
SharedPtr<::parquet::arrow::FileWriter> file_writer;

String parent_path = LocalStore::GetParentPath(file_path_);
if(!parent_path.empty()) {
Status create_status = LocalStore::MakeDirectory(parent_path);
if(!create_status.ok()) {
RecoverableError(create_status);
}
}

file_stream = ::arrow::io::FileOutputStream::Open(file_path_, pool).ValueOrDie();
file_writer = ::parquet::arrow::FileWriter::Open(*schema, pool, file_stream, ::parquet::default_writer_properties()).ValueOrDie();

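
For reference, the `.partN` rotation that ExportToCSV and ExportToFVECS keep after the refactor (open `<path>.part<N>` once `row_limit_` rows have been written to the current file) reduces to the following sketch, with standard streams standing in for `LocalStore::Open` / `FileHandle::Append`:

```cpp
#include <cstddef>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

// Sketch of the part-file rotation above: rows go to file_path first, then to
// file_path.part1, file_path.part2, ... after every `row_limit` rows.
void ExportLines(const std::string &file_path,
                 const std::vector<std::string> &lines,
                 std::size_t row_limit) {
    std::ofstream out(file_path, std::ios::binary);
    if (!out) throw std::runtime_error("Cannot open " + file_path);

    std::size_t row_count = 0;
    std::size_t file_no = 0;
    for (const std::string &line : lines) {
        if (row_count > 0 && row_limit != 0 && row_count % row_limit == 0) {
            ++file_no; // same rotation condition as the operator code above
            out.close();
            std::string part_path = file_path + ".part" + std::to_string(file_no);
            out.open(part_path, std::ios::binary);
            if (!out) throw std::runtime_error("Cannot open " + part_path);
        }
        out << line << '\n';
        ++row_count;
    }
}
```
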
113 changes: 65 additions & 48 deletions src/executor/operator/physical_import.cpp
@@ -40,7 +40,6 @@ import expression_state;
import data_block;
import logger;
import third_party;
import local_file_system;
import defer_op;
import txn_store;

@@ -137,17 +136,17 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat
RecoverableError(status);
}

LocalFileSystem fs;

auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
if (!status.ok()) {
UnrecoverableError(status.message());
auto [file_handle, status_open] = LocalStore::Open(file_path_, FileAccessMode::kRead);
if (!status_open.ok()) {
UnrecoverableError(status_open.message());
}
DeferFn defer_fn([&]() { fs.Close(*file_handler); });

i32 dimension = 0;
i64 nbytes = fs.Read(*file_handler, &dimension, sizeof(dimension));
fs.Seek(*file_handler, 0);
auto [nbytes, status_read] = file_handle->Read(&dimension, sizeof(dimension));
if (!status_read.ok()) {
RecoverableError(status_read);
}
file_handle->Seek(0);
if (nbytes == 0) {
// file is empty
auto result_msg = MakeUnique<String>("IMPORT 0 Rows");
@@ -164,7 +163,10 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat
fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dimension, embedding_info->Dimension()));
RecoverableError(status);
}
SizeT file_size = fs.GetFileSize(*file_handler);
i64 file_size = file_handle->FileSize();
if (file_size == -1) {
UnrecoverableError("Can't get file size");
}
SizeT row_size = dimension * sizeof(FloatT) + sizeof(dimension);
if (file_size % row_size != 0) {
String error_message = "Weird file size.";
@@ -183,14 +185,17 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat
auto buf_ptr = static_cast<ptr_t>(buffer_handle.GetDataMut());
while (true) {
i32 dim;
nbytes = fs.Read(*file_handler, &dim, sizeof(dimension));
auto [nbytes, status_read] = file_handle->Read(&dim, sizeof(dimension));
if(!status_read.ok()) {
RecoverableError(status_read);
}
if (dim != dimension or nbytes != sizeof(dimension)) {
Status status =
Status status_error =
Status::ImportFileFormatError(fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dim, dimension));
RecoverableError(status);
RecoverableError(status_error);
}
ptr_t dst_ptr = buf_ptr + block_entry->row_count() * sizeof(FloatT) * dimension;
fs.Read(*file_handler, dst_ptr, sizeof(FloatT) * dimension);
file_handle->Read(dst_ptr, sizeof(FloatT) * dimension);
block_entry->IncreaseRowCount(1);
++row_idx;
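
The `.fvecs` rows consumed here are an `i32` dimension followed by that many `float32` values; the code above rejects rows whose dimension differs from the column definition and files whose size is not a multiple of the row size. A hedged standalone reader sketch of that layout, using plain iostreams rather than the project's `LocalStore` API:

```cpp
#include <cstddef>
#include <cstdint>
#include <fstream>
#include <stdexcept>
#include <string>
#include <vector>

// Sketch of the .fvecs layout: each row is an int32 dimension followed by
// `dimension` float32 values; every row must match the expected dimension.
std::vector<std::vector<float>> ReadFvecs(const std::string &path, std::int32_t expected_dim) {
    std::ifstream in(path, std::ios::binary | std::ios::ate);
    if (!in) throw std::runtime_error("Cannot open " + path);

    const std::int64_t file_size = static_cast<std::int64_t>(in.tellg());
    const std::int64_t row_size =
        sizeof(std::int32_t) + expected_dim * static_cast<std::int64_t>(sizeof(float));
    if (file_size % row_size != 0) throw std::runtime_error("Weird file size.");
    in.seekg(0);

    std::vector<std::vector<float>> rows;
    std::int32_t dim = 0;
    while (in.read(reinterpret_cast<char *>(&dim), sizeof(dim))) {
        if (dim != expected_dim) throw std::runtime_error("Dimension mismatch.");
        std::vector<float> row(static_cast<std::size_t>(dim));
        if (!in.read(reinterpret_cast<char *>(row.data()), dim * sizeof(float))) {
            throw std::runtime_error("Truncated row.");
        }
        rows.push_back(std::move(row));
    }
    return rows;
}
```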

@@ -239,17 +244,17 @@ void PhysicalImport::ImportBVECS(QueryContext *query_context, ImportOperatorStat
RecoverableError(status);
}

LocalFileSystem fs;

auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
if (!status.ok()) {
UnrecoverableError(status.message());
auto [file_handle, status_open] = LocalStore::Open(file_path_, FileAccessMode::kRead);
if (!status_open.ok()) {
UnrecoverableError(status_open.message());
}
DeferFn defer_fn([&]() { fs.Close(*file_handler); });

i32 dimension = 0;
i64 nbytes = fs.Read(*file_handler, &dimension, sizeof(dimension));
fs.Seek(*file_handler, 0);
auto [nbytes, status_read] = file_handle->Read(&dimension, sizeof(dimension));
if (!status_read.ok()) {
RecoverableError(status_read);
}
file_handle->Seek(0);
if (nbytes == 0) {
// file is empty
auto result_msg = MakeUnique<String>("IMPORT 0 Rows");
@@ -266,7 +271,10 @@ void PhysicalImport::ImportBVECS(QueryContext *query_context, ImportOperatorStat
fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dimension, embedding_info->Dimension()));
RecoverableError(status);
}
SizeT file_size = fs.GetFileSize(*file_handler);
i64 file_size = file_handle->FileSize();
if (file_size == -1) {
UnrecoverableError("Can't get file size");
}
SizeT row_size = dimension * sizeof(u8) + sizeof(dimension);
if (file_size % row_size != 0) {
String error_message = "Weird file size.";
@@ -287,13 +295,17 @@ void PhysicalImport::ImportBVECS(QueryContext *query_context, ImportOperatorStat
UniquePtr<u8[]> u8_buffer = MakeUniqueForOverwrite<u8[]>(sizeof(u8) * dimension);
while (true) {
i32 dim;
nbytes = fs.Read(*file_handler, &dim, sizeof(dimension));
auto [nbytes, status] = file_handle->Read(&dim, sizeof(dimension));
if (!status.ok()) {
RecoverableError(status);
}

if (dim != dimension or nbytes != sizeof(dimension)) {
Status status =
Status error_status =
Status::ImportFileFormatError(fmt::format("Dimension in file ({}) doesn't match with table definition ({}).", dim, dimension));
RecoverableError(status);
RecoverableError(error_status);
}
fs.Read(*file_handler, u8_buffer.get(), sizeof(u8) * dimension);
file_handle->Read(u8_buffer.get(), sizeof(u8) * dimension);

u8 *dst_ptr = reinterpret_cast<u8 *>(buf_ptr + block_entry->row_count() * sizeof(u8) * dimension);
for (i32 i = 0; i < dimension; ++i) {
@@ -386,40 +398,41 @@ void PhysicalImport::ImportCSR(QueryContext *query_context, ImportOperatorState
RecoverableError(status);
}

LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
}

i64 nrow = 0;
i64 ncol = 0;
i64 nnz = 0;
file_handler->Read(&nrow, sizeof(nrow));
file_handler->Read(&ncol, sizeof(ncol));
file_handler->Read(&nnz, sizeof(nnz));
file_handle->Read(&nrow, sizeof(nrow));
file_handle->Read(&ncol, sizeof(ncol));
file_handle->Read(&nnz, sizeof(nnz));

SizeT file_size = fs.GetFileSize(*file_handler);
if (file_size != 3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32) + nnz * sizeof(FloatT)) {
i64 file_size = file_handle->FileSize();
if ((SizeT)file_size != 3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32) + nnz * sizeof(FloatT)) {
String error_message = "Invalid CSR file format.";
UnrecoverableError(error_message);
}
i64 prev_off = 0;
file_handler->Read(&prev_off, sizeof(i64));
file_handle->Read(&prev_off, sizeof(i64));
if (prev_off != 0) {
String error_message = "Invalid CSR file format.";
UnrecoverableError(error_message);
}
auto [idx_reader, idx_status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
auto [idx_file_handle, idx_status] = LocalStore::Open(file_path_, FileAccessMode::kRead);
if (!idx_status.ok()) {
UnrecoverableError(idx_status.message());
}
fs.Seek(*idx_reader, 3 * sizeof(i64) + (nrow + 1) * sizeof(i64));
auto [data_reader, data_status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);

idx_file_handle->Seek(3 * sizeof(i64) + (nrow + 1) * sizeof(i64));
auto [data_file_handle, data_status] = LocalStore::Open(file_path_, FileAccessMode::kRead);
if (!data_status.ok()) {
UnrecoverableError(data_status.message());
}
fs.Seek(*data_reader, 3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32));

data_file_handle->Seek(3 * sizeof(i64) + (nrow + 1) * sizeof(i64) + nnz * sizeof(i32));

//------------------------------------------------------------------------------------------------------------------------

@@ -435,13 +448,13 @@ void PhysicalImport::ImportCSR(QueryContext *query_context, ImportOperatorState
i64 row_id = 0;
while (true) {
i64 off = 0;
file_handler->Read(&off, sizeof(i64));
file_handle->Read(&off, sizeof(i64));
i64 nnz = off - prev_off;
SizeT data_len = sparse_info->DataSize(nnz);
auto tmp_indice_ptr = MakeUnique<char[]>(sizeof(i32) * nnz);
auto data_ptr = MakeUnique<char[]>(data_len);
idx_reader->Read(tmp_indice_ptr.get(), sizeof(i32) * nnz);
data_reader->Read(data_ptr.get(), data_len);
idx_file_handle->Read(tmp_indice_ptr.get(), sizeof(i32) * nnz);
data_file_handle->Read(data_ptr.get(), data_len);
auto indice_ptr = ConvertCSRIndice(std::move(tmp_indice_ptr), sparse_info.get(), nnz);

auto value = Value::MakeSparse(nnz, std::move(indice_ptr), std::move(data_ptr), sparse_info);
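
ImportCSR above expects a fixed on-disk layout: an `i64` triple (`nrow`, `ncol`, `nnz`), then `nrow + 1` `i64` row offsets whose first entry must be 0, then `nnz` `i32` column indices, then `nnz` `float32` values, and it validates the file size against exactly that sum. A hedged sketch of reading and checking the header with plain iostreams (not the project's `LocalStore` API):

```cpp
#include <cstdint>
#include <fstream>
#include <stdexcept>
#include <string>

struct CsrHeader {
    std::int64_t nrow{0};
    std::int64_t ncol{0};
    std::int64_t nnz{0};
};

// Sketch of the CSR header check: header + offsets + indices + values must
// account for the whole file, mirroring the file_size validation above.
CsrHeader ReadCsrHeader(const std::string &path) {
    std::ifstream in(path, std::ios::binary | std::ios::ate);
    if (!in) throw std::runtime_error("Cannot open " + path);
    const std::int64_t file_size = static_cast<std::int64_t>(in.tellg());
    in.seekg(0);

    CsrHeader h;
    in.read(reinterpret_cast<char *>(&h.nrow), sizeof(h.nrow));
    in.read(reinterpret_cast<char *>(&h.ncol), sizeof(h.ncol));
    in.read(reinterpret_cast<char *>(&h.nnz), sizeof(h.nnz));
    if (!in) throw std::runtime_error("Invalid CSR file format.");

    const std::int64_t expected = static_cast<std::int64_t>(3 * sizeof(std::int64_t))
                                + (h.nrow + 1) * static_cast<std::int64_t>(sizeof(std::int64_t))
                                + h.nnz * static_cast<std::int64_t>(sizeof(std::int32_t))
                                + h.nnz * static_cast<std::int64_t>(sizeof(float));
    if (file_size != expected) throw std::runtime_error("Invalid CSR file format.");
    return h;
}
```
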
@@ -637,17 +650,21 @@ void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorStat
void PhysicalImport::ImportJSON(QueryContext *query_context, ImportOperatorState *import_op_state) {
nlohmann::json json_arr;
{
LocalFileSystem fs;
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
auto [file_handle, status] = LocalStore::Open(file_path_, FileAccessMode::kRead);
if (!status.ok()) {
UnrecoverableError(status.message());
}
DeferFn file_defer([&]() { fs.Close(*file_handler); });

SizeT file_size = fs.GetFileSize(*file_handler);
i64 file_size = file_handle->FileSize();
if (file_size == -1) {
UnrecoverableError("Can't get file size");
}
String json_str(file_size, 0);
SizeT read_n = file_handler->Read(json_str.data(), file_size);
if (read_n != file_size) {
auto [read_n, status_read] = file_handle->Read(json_str.data(), file_size);
if(!status_read.ok()) {
UnrecoverableError(status_read.message());
}
if ((i64)read_n != file_size) {
String error_message = fmt::format("Read file size {} doesn't match with file size {}.", read_n, file_size);
UnrecoverableError(error_message);
}