Skip to content

Commit

Permalink
[Feature]: Exports table to JSONL or CSV file. (#1245)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Issue link: #1175

### Type of change

- [x] New Feature (non-breaking change which adds functionality)

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored May 25, 2024
1 parent 0f9b55f commit 3baee70
Show file tree
Hide file tree
Showing 33 changed files with 478 additions and 75 deletions.
11 changes: 9 additions & 2 deletions benchmark/local_infinity/sparse/sparse_benchmark.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ SparseMatrix DecodeSparseDataset(const Path &data_path) {
if (!fs.Exists(data_path)) {
throw std::runtime_error(fmt::format("Data path: {} does not exist.", data_path.string()));
}
UniquePtr<FileHandler> file_handler = fs.OpenFile(data_path.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
auto [file_handler, status] = fs.OpenFile(data_path.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
if(!status.ok()) {
throw std::runtime_error(fmt::format("Can't open file: {}, reason: {}", data_path.string(), status.message()));
}
i64 nrow = 0;
i64 ncol = 0;
i64 nnz = 0;
Expand Down Expand Up @@ -73,7 +76,11 @@ Pair<UniquePtr<u32[]>, UniquePtr<f32[]>> DecodeGroundtruth(const Path &groundtru
if (!fs.Exists(groundtruth_path)) {
throw std::runtime_error(fmt::format("Groundtruth path: {} does not exist.", groundtruth_path.string()));
}
UniquePtr<FileHandler> file_handler = fs.OpenFile(groundtruth_path.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
auto [file_handler, status] = fs.OpenFile(groundtruth_path.string(), FileFlags::READ_FLAG, FileLockType::kNoLock);
if(!status.ok()) {
throw std::runtime_error(fmt::format("Can't open file: {}, reason: {}", groundtruth_path.string(), status.message()));
}

SizeT file_size = fs.GetFileSize(*file_handler);
if (file_size != sizeof(u32) * 2 + (sizeof(u32) + sizeof(float)) * (query_n * top_k)) {
throw std::runtime_error("Invalid groundtruth file format");
Expand Down
10 changes: 7 additions & 3 deletions src/common/status.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -433,15 +433,15 @@ Status Status::IOError(const String &detailed_info) {
}

Status Status::DuplicatedFile(const String &filename) {
return Status(ErrorCode::kDuplicatedFile, MakeUnique<String>(fmt::format("Duplicated file: {}", filename)));
return Status(ErrorCode::kDuplicatedFile, MakeUnique<String>(fmt::format("File already existed: {}", filename)));
}

Status Status::ConfigFileError(const String &path, const String &detailed_info) {
return Status(ErrorCode::kConfigFileError, MakeUnique<String>(fmt::format("Config file: {}, {}", path, detailed_info)));
}

Status Status::LockFileExists(const String &path) {
return Status(ErrorCode::kLockFileExists, MakeUnique<String>(fmt::format("Lock file: is existed", path)));
Status Status::LockFileError(const String &path, const String& error_msg) {
return Status(ErrorCode::kLockFileError, MakeUnique<String>(fmt::format("Lock file error: {}, {}", path, error_msg)));
}

Status Status::CatalogCorrupted(const String &path) {
Expand Down Expand Up @@ -484,6 +484,10 @@ Status Status::MunmapFileError(const String &detailed_info) {
return Status(ErrorCode::kMunmapFileError, MakeUnique<String>(fmt::format("munmap error: {}", detailed_info)));
}

// Builds the error status reported when OpenFile receives an unsupported
// combination of FileFlags bits (new with ErrorCode::kInvalidFileFlag = 7015).
Status Status::InvalidFileFlag(u8 flag) {
return Status(ErrorCode::kInvalidFileFlag, MakeUnique<String>(fmt::format("Invalid open file flag: {}", flag)));
}

// Builds the error status used when the number of columns in supplied data
// does not match the table definition; `detailed_info` carries the specifics.
Status Status::ColumnCountMismatch(const String &detailed_info) {
return Status(ErrorCode::kColumnCountMismatch, MakeUnique<String>(fmt::format("Column count mismatch: {}", detailed_info)));
}
Expand Down
6 changes: 4 additions & 2 deletions src/common/status.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ export enum class ErrorCode : long {
kIOError = 7001,
kDuplicatedFile = 7002,
kConfigFileError = 7003,
kLockFileExists = 7004,
kLockFileError = 7004,
kCatalogCorrupted = 7005,
kDataCorrupted = 7006,
kIndexCorrupted = 7007,
Expand All @@ -153,6 +153,7 @@ export enum class ErrorCode : long {
kParserError = 7012,
kMmapFileError = 7013,
kMunmapFileError = 7014,
kInvalidFileFlag = 7015,

// 8. meta error
kInvalidEntry = 8001,
Expand Down Expand Up @@ -280,7 +281,7 @@ public:
static Status IOError(const String &detailed_info);
static Status DuplicatedFile(const String &filename);
static Status ConfigFileError(const String &path, const String &detailed_info);
static Status LockFileExists(const String &path);
static Status LockFileError(const String &path, const String& error_msg);
static Status CatalogCorrupted(const String &path);
static Status DataCorrupted(const String &path);
static Status IndexCorrupted(const String &path);
Expand All @@ -291,6 +292,7 @@ public:
static Status ParserError(const String &detailed_info);
static Status MmapFileError(const String &detailed_info);
static Status MunmapFileError(const String &detailed_info);
static Status InvalidFileFlag(u8 flag);

// meta
static Status InvalidEntry();
Expand Down
155 changes: 149 additions & 6 deletions src/executor/operator/physical_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,165 @@

module;

import query_context;
import operator_state;
#include <string>

module physical_export;

import query_context;
import operator_state;
import logger;
import statement_common;
import third_party;
import column_def;
import block_entry;
import column_vector;
import value;
import local_file_system;
import file_system;
import file_system_type;
import defer_op;
import stl;

namespace infinity {

// No per-execution initialization is needed; all export state (target table,
// file path, format options, block index) is supplied via the constructor.
void PhysicalExport::Init() {}

bool PhysicalExport::Execute(QueryContext *, OperatorState *operator_state) {
operator_state->SetComplete();
bool PhysicalExport::Execute(QueryContext *query_context, OperatorState *operator_state) {
    // Dispatch to the writer matching the requested output format, then surface
    // the exported row count through the operator state for the sink to report.
    auto *export_state = static_cast<ExportOperatorState *>(operator_state);

    SizeT row_count{0};
    if (file_type_ == CopyFileType::kCSV) {
        row_count = ExportToCSV(query_context, export_state);
    } else if (file_type_ == CopyFileType::kJSONL) {
        row_count = ExportToJSONL(query_context, export_state);
    } else {
        // Planner should never hand us another format; treat it as fatal.
        UnrecoverableError("Not supported file type");
    }

    export_state->result_msg_ = MakeUnique<String>(fmt::format("EXPORT {} Rows", row_count));
    export_state->SetComplete();
    return true;
}

void PhysicalExport::ExportCSV(QueryContext *) {}
SizeT PhysicalExport::ExportToCSV(QueryContext *query_context, ExportOperatorState *export_op_state) {
    // Streams every visible block of the table into `file_path_` as CSV and
    // returns the number of data rows written (header line excluded).
    const Vector<SharedPtr<ColumnDef>> &column_defs = table_entry_->column_defs();
    SizeT column_count = column_defs.size();

    LocalFileSystem fs;
    auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
    if (!status.ok()) {
        RecoverableError(status);
    }
    DeferFn file_defer([&]() { fs.Close(*file_handler); });

    // RFC 4180-style quoting: a field containing the delimiter, a double quote
    // or a newline must be wrapped in quotes with embedded quotes doubled,
    // otherwise the emitted file cannot be parsed back unambiguously.
    auto escape_field = [this](const String &field) -> String {
        if (field.find(delimiter_) == String::npos && field.find('"') == String::npos && field.find('\n') == String::npos) {
            return field;
        }
        String quoted;
        quoted.reserve(field.size() + 2);
        quoted += '"';
        for (char c : field) {
            if (c == '"') {
                quoted += '"'; // double any embedded quote
            }
            quoted += c;
        }
        quoted += '"';
        return quoted;
    };

    if (header_) {
        // Optional first line carrying the column names.
        String header;
        for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) {
            ColumnDef *column_def = column_defs[column_idx].get();
            header += escape_field(column_def->name());
            header += (column_idx == column_count - 1) ? '\n' : delimiter_;
        }
        fs.Write(*file_handler, header.c_str(), header.size());
    }

    SizeT row_count{0};
    Map<SegmentID, SegmentSnapshot> &segment_block_index_ref = block_index_->segment_block_index_;
    for (auto &[segment_id, segment_snapshot] : segment_block_index_ref) {
        LOG_DEBUG(fmt::format("Export segment_id: {}", segment_id));
        SizeT block_count = segment_snapshot.block_map_.size();
        for (SizeT block_idx = 0; block_idx < block_count; ++block_idx) {
            LOG_DEBUG(fmt::format("Export block_idx: {}", block_idx));
            BlockEntry *block_entry = segment_snapshot.block_map_[block_idx];
            SizeT block_row_count = block_entry->row_count();

            // Materialize one ColumnVector per column; every column must agree
            // on the block's row count, or the stored data is inconsistent.
            Vector<ColumnVector> column_vectors;
            column_vectors.reserve(column_count);
            for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) {
                column_vectors.emplace_back(
                    block_entry->GetColumnBlockEntry(column_idx)->GetColumnVector(query_context->storage()->buffer_manager()));
                if (column_vectors[column_idx].Size() != block_row_count) {
                    UnrecoverableError("Unmatched row_count between block and block_column");
                }
            }

            for (SizeT row_idx = 0; row_idx < block_row_count; ++row_idx) {
                String line;
                for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) {
                    Value v = column_vectors[column_idx].GetValue(row_idx);
                    line += escape_field(v.ToString());
                    line += (column_idx == column_count - 1) ? '\n' : delimiter_;
                }
                fs.Write(*file_handler, line.c_str(), line.size());
                ++row_count;
            }
        }
    }
    LOG_DEBUG(fmt::format("Export to CSV, db {}, table {}, file: {}, row: {}", schema_name_, table_name_, file_path_, row_count));
    return row_count;
}

SizeT PhysicalExport::ExportToJSONL(QueryContext *query_context, ExportOperatorState *export_op_state) {
    // Streams every visible block of the table into `file_path_` as JSON Lines
    // (one JSON object per row). Returns the number of rows written.
    const Vector<SharedPtr<ColumnDef>> &column_defs = table_entry_->column_defs();
    SizeT column_count = column_defs.size();

    LocalFileSystem fs;
    auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::WRITE_FLAG | FileFlags::CREATE_FLAG, FileLockType::kWriteLock);
    if (!status.ok()) {
        RecoverableError(status);
    }
    DeferFn file_defer([&]() { fs.Close(*file_handler); });

    SizeT row_count{0};
    Map<SegmentID, SegmentSnapshot> &segment_block_index_ref = block_index_->segment_block_index_;
    for (auto &[segment_id, segment_snapshot] : segment_block_index_ref) {
        LOG_DEBUG(fmt::format("Export segment_id: {}", segment_id));
        SizeT block_count = segment_snapshot.block_map_.size();
        for (SizeT block_idx = 0; block_idx < block_count; ++block_idx) {
            LOG_DEBUG(fmt::format("Export block_idx: {}", block_idx));
            BlockEntry *block_entry = segment_snapshot.block_map_[block_idx];
            SizeT block_row_count = block_entry->row_count();

            // Materialize one ColumnVector per column; every column must agree
            // on the block's row count, or the stored data is inconsistent.
            Vector<ColumnVector> column_vectors;
            column_vectors.reserve(column_count);
            for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) {
                column_vectors.emplace_back(
                    block_entry->GetColumnBlockEntry(column_idx)->GetColumnVector(query_context->storage()->buffer_manager()));
                if (column_vectors[column_idx].Size() != block_row_count) {
                    UnrecoverableError("Unmatched row_count between block and block_column");
                }
            }

            for (SizeT row_idx = 0; row_idx < block_row_count; ++row_idx) {
                nlohmann::json line_json;
                for (SizeT column_idx = 0; column_idx < column_count; ++column_idx) {
                    ColumnDef *column_def = column_defs[column_idx].get();
                    Value v = column_vectors[column_idx].GetValue(row_idx);
                    v.AppendToJson(column_def->name(), line_json);
                }
                // Serialize once and reuse the string for both logging and the
                // file write (the original dumped the JSON twice per row).
                String to_write = line_json.dump();
                LOG_DEBUG(to_write);
                to_write += '\n';
                fs.Write(*file_handler, to_write.c_str(), to_write.size());
                ++row_count;
            }
        }
    }
    LOG_DEBUG(fmt::format("Export to JSONL, db {}, table {}, file: {}, row: {}", schema_name_, table_name_, file_path_, row_count));
    return row_count;
}

} // namespace infinity
15 changes: 11 additions & 4 deletions src/executor/operator/physical_export.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,25 @@ import infinity_exception;
import internal_types;
import statement_common;
import data_type;
import table_entry;
import block_index;

namespace infinity {

export class PhysicalExport : public PhysicalOperator {
public:
explicit PhysicalExport(u64 id,
TableEntry *table_entry,
String schema_name,
String table_name,
String file_path,
bool header,
char delimiter,
CopyFileType type,
SharedPtr<BlockIndex> block_index,
SharedPtr<Vector<LoadMeta>> load_metas)
: PhysicalOperator(PhysicalOperatorType::kExport, nullptr, nullptr, id, load_metas), file_type_(type), file_path_(std::move(file_path)),
table_name_(std::move(table_name)), schema_name_(std::move(schema_name)), header_(header), delimiter_(delimiter) {}
: PhysicalOperator(PhysicalOperatorType::kExport, nullptr, nullptr, id, load_metas), table_entry_(table_entry), file_type_(type), file_path_(std::move(file_path)),
table_name_(std::move(table_name)), schema_name_(std::move(schema_name)), header_(header), delimiter_(delimiter), block_index_(std::move(block_index)) {}

~PhysicalExport() override = default;

Expand All @@ -58,9 +62,9 @@ public:
return 0;
}

void ExportCSV(QueryContext *query_context);
SizeT ExportToCSV(QueryContext *query_context, ExportOperatorState *export_op_state);

void ExportJSON(QueryContext *query_context);
SizeT ExportToJSONL(QueryContext *query_context, ExportOperatorState *export_op_state);

inline CopyFileType FileType() const { return file_type_; }

Expand All @@ -78,12 +82,15 @@ private:
SharedPtr<Vector<String>> output_names_{};
SharedPtr<Vector<SharedPtr<DataType>>> output_types_{};

TableEntry *table_entry_{};
CopyFileType file_type_{CopyFileType::kCSV};
String file_path_{};
String table_name_{};
String schema_name_{"default_db"};
bool header_{false};
char delimiter_{','};

SharedPtr<BlockIndex> block_index_{};
};

} // namespace infinity
15 changes: 12 additions & 3 deletions src/executor/operator/physical_import.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,10 @@ void PhysicalImport::ImportFVECS(QueryContext *query_context, ImportOperatorStat

LocalFileSystem fs;

UniquePtr<FileHandler> file_handler = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
if(!status.ok()) {
UnrecoverableError(status.message());
}
DeferFn defer_fn([&]() { fs.Close(*file_handler); });

int dimension = 0;
Expand Down Expand Up @@ -275,7 +278,10 @@ void PhysicalImport::ImportCSV(QueryContext *query_context, ImportOperatorState

void PhysicalImport::ImportJSONL(QueryContext *query_context, ImportOperatorState *import_op_state) {
LocalFileSystem fs;
UniquePtr<FileHandler> file_handler = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
if(!status.ok()) {
UnrecoverableError(status.message());
}
DeferFn file_defer([&]() { fs.Close(*file_handler); });

SizeT file_size = fs.GetFileSize(*file_handler);
Expand Down Expand Up @@ -355,7 +361,10 @@ void PhysicalImport::ImportJSON(QueryContext *query_context, ImportOperatorState
nlohmann::json json_arr;
{
LocalFileSystem fs;
UniquePtr<FileHandler> file_handler = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
auto [file_handler, status] = fs.OpenFile(file_path_, FileFlags::READ_FLAG, FileLockType::kReadLock);
if(!status.ok()) {
UnrecoverableError(status.message());
}
DeferFn file_defer([&]() { fs.Close(*file_handler); });

SizeT file_size = fs.GetFileSize(*file_handler);
Expand Down
5 changes: 5 additions & 0 deletions src/executor/operator/physical_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,11 @@ void PhysicalSink::FillSinkStateFromLastOperatorState(MessageSinkState *message_
message_sink_state->message_ = std::move(import_output_state->result_msg_);
break;
}
case PhysicalOperatorType::kExport: {
auto *export_output_state = static_cast<ExportOperatorState *>(task_operator_state);
message_sink_state->message_ = std::move(export_output_state->result_msg_);
break;
}
case PhysicalOperatorType::kInsert: {
auto *insert_output_state = static_cast<InsertOperatorState *>(task_operator_state);
message_sink_state->message_ = std::move(insert_output_state->result_msg_);
Expand Down
1 change: 1 addition & 0 deletions src/executor/operator_state.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ export struct ImportOperatorState : public OperatorState {
// Export
// Per-task state for the EXPORT physical operator.
export struct ExportOperatorState : public OperatorState {
inline explicit ExportOperatorState() : OperatorState(PhysicalOperatorType::kExport) {}
// Completion summary (e.g. "EXPORT 10 Rows") set by PhysicalExport::Execute
// and moved into the message sink by PhysicalSink::FillSinkStateFromLastOperatorState.
UniquePtr<String> result_msg_{};
};

// Alter
Expand Down
2 changes: 2 additions & 0 deletions src/executor/physical_planner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,12 +540,14 @@ UniquePtr<PhysicalOperator> PhysicalPlanner::BuildImport(const SharedPtr<Logical
UniquePtr<PhysicalOperator> PhysicalPlanner::BuildExport(const SharedPtr<LogicalNode> &logical_operator) const {
LogicalExport *logical_export = (LogicalExport *)(logical_operator.get());
return MakeUnique<PhysicalExport>(logical_export->node_id(),
logical_export->table_entry(),
logical_export->schema_name(),
logical_export->table_name(),
logical_export->file_path(),
logical_export->header(),
logical_export->delimiter(),
logical_export->FileType(),
logical_export->block_index(),
logical_operator->load_metas());
}

Expand Down
Loading

0 comments on commit 3baee70

Please sign in to comment.