Skip to content

Commit

Permalink
LOG_ERROR message before raise recoverable error (#1217)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

Refactor code: LOG_ERROR message before raise recoverable error

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored May 18, 2024
1 parent 7c9ec07 commit 735252b
Show file tree
Hide file tree
Showing 96 changed files with 1,191 additions and 402 deletions.
18 changes: 5 additions & 13 deletions docs/references/benchmark.md
Original file line number Diff line number Diff line change
Expand Up @@ -128,20 +128,12 @@ options:
### Enwiki

> - 33000000 documents
> - 10000 `OR` queries generated based on the dataset. All terms are extracted from the dataset and very rare(occurrence < 100) terms are excluded. The number of terms of each query match the weight `[0.03, 0.15, 0.25, 0.25, 0.15, 0.08, 0.04, 0.03, 0.02]`.
| | Time to insert & build index | Time to import & build index | Latency(ms)(mean_time, max_time, p95_time) |
| ----------------- | ---------------------------- | ---------------------------- | ------------------------------------------ |
| **Elasticsearch** | 2289 s | N/A | 7.27, 326.31, 14.75 |
| **Infinity** | 2321 s | 944 s | 1.54, 812.55, 3.51 |


| Python clients | Infinity(qps, RES, vCPU) | Elasticsearch(qps, RES, vCPU) |
| -------------- | ------------------------ | ----------------------------- |
| 1 | 636, 9G, 0.9 | 213, 20G, 3 |
| 4 | 1938, 9G, 3.2 | 672, 21G, 8.5 |
| 8 | 3294, 9G, 5.7 | 1174, 21G, 10 |
> - 100000 `OR` queries generated based on the dataset. All terms are extracted from the dataset and very rare(occurrence < 100) terms are excluded. The number of terms of each query match the weight `[0.03, 0.15, 0.25, 0.25, 0.15, 0.08, 0.04, 0.03, 0.02]`.
| | Time to insert & build index | Time to import & build index | P95 Latency(ms)| QPS (8 python clients) | Memory | vCPU |
| ----------------- | ---------------------------- | ---------------------------- | ---------------| -----------------------| --------| ----- |
| **Elasticsearch** | 2289 s | N/A | 14.75 | 1174 | 21.0GB | 10.0 |
| **Infinity** | 2321 s | 944 s | 3.51 | 3294 | 9.0GB | 5.7 |

---

Expand Down
25 changes: 19 additions & 6 deletions src/executor/explain_physical_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import statement_common;
import flush_statement;
import common_query_filter;
import table_entry;
import logger;

namespace infinity {

Expand Down Expand Up @@ -1481,23 +1482,33 @@ void ExplainPhysicalPlan::Explain(const PhysicalShow *show_node, SharedPtr<Vecto
}

void ExplainPhysicalPlan::Explain(const PhysicalUnionAll *, SharedPtr<Vector<SharedPtr<String>>> &, i64) {
RecoverableError(Status::NotSupport("Not implemented"));
Status status = Status::NotSupport("Not implemented");
LOG_ERROR(status.message());
RecoverableError(status);
}

void ExplainPhysicalPlan::Explain(const PhysicalDummyScan *, SharedPtr<Vector<SharedPtr<String>>> &, i64) {
UnrecoverableError("Not implement: PhysicalDummyScan");
Status status = Status::NotSupport("Not implemented");
LOG_ERROR(status.message());
RecoverableError(status);
}

void ExplainPhysicalPlan::Explain(const PhysicalHashJoin *, SharedPtr<Vector<SharedPtr<String>>> &, i64) {
UnrecoverableError("Not implement: PhysicalHashJoin");
Status status = Status::NotSupport("Not implemented");
LOG_ERROR(status.message());
RecoverableError(status);
}

void ExplainPhysicalPlan::Explain(const PhysicalSortMergeJoin *, SharedPtr<Vector<SharedPtr<String>>> &, i64) {
UnrecoverableError("Not implement: PhysicalSortMergeJoin");
Status status = Status::NotSupport("Not implemented");
LOG_ERROR(status.message());
RecoverableError(status);
}

void ExplainPhysicalPlan::Explain(const PhysicalIndexJoin *, SharedPtr<Vector<SharedPtr<String>>> &, i64) {
UnrecoverableError("Not implement: PhysicalIndexJoin");
Status status = Status::NotSupport("Not implemented");
LOG_ERROR(status.message());
RecoverableError(status);
}

void ExplainPhysicalPlan::Explain(const PhysicalDelete *delete_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
Expand Down Expand Up @@ -1657,7 +1668,9 @@ void ExplainPhysicalPlan::Explain(const PhysicalExport *export_node, SharedPtr<V
}

void ExplainPhysicalPlan::Explain(const PhysicalAlter *create_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
RecoverableError(Status::NotSupport("Not implemented"));
Status status = Status::NotSupport("Not implemented");
LOG_ERROR(status.message());
RecoverableError(status);
}

void ExplainPhysicalPlan::Explain(const PhysicalCreateView *create_node, SharedPtr<Vector<SharedPtr<String>>> &result, i64 intent_size) {
Expand Down
13 changes: 10 additions & 3 deletions src/executor/expression/expression_evaluator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import third_party;
import infinity_exception;
import expression_type;
import bound_cast_func;
import logger;

namespace infinity {

Expand Down Expand Up @@ -68,7 +69,9 @@ void ExpressionEvaluator::Execute(const SharedPtr<AggregateExpression> &expr,
SharedPtr<ExpressionState> &state,
SharedPtr<ColumnVector> &output_column_vector) {
if (in_aggregate_) {
RecoverableError(Status::RecursiveAggregate(expr->ToString()));
Status status = Status::RecursiveAggregate(expr->ToString());
LOG_ERROR(status.message());
RecoverableError(status);
}
in_aggregate_ = true;
SharedPtr<ExpressionState> &child_state = state->Children()[0];
Expand All @@ -80,7 +83,9 @@ void ExpressionEvaluator::Execute(const SharedPtr<AggregateExpression> &expr,
this->Execute(child_expr, child_state, child_output_col);

if (expr->aggregate_function_.return_type_ != *output_column_vector->data_type()) {
RecoverableError(Status::DataTypeMismatch(expr->aggregate_function_.return_type_.ToString(), output_column_vector->data_type()->ToString()));
Status status = Status::DataTypeMismatch(expr->aggregate_function_.return_type_.ToString(), output_column_vector->data_type()->ToString());
LOG_ERROR(status.message());
RecoverableError(status);
}

auto data_state = state->agg_state_;
Expand Down Expand Up @@ -183,7 +188,9 @@ void ExpressionEvaluator::Execute(const SharedPtr<ReferenceExpression> &expr,
}

void ExpressionEvaluator::Execute(const SharedPtr<InExpression> &, SharedPtr<ExpressionState> &, SharedPtr<ColumnVector> &) {
RecoverableError(Status::NotSupport("IN execution isn't implemented yet."));
Status status = Status::NotSupport("IN execution isn't implemented yet.");
LOG_ERROR(status.message());
RecoverableError(status);
}

} // namespace infinity
9 changes: 7 additions & 2 deletions src/executor/expression/expression_state.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import status;
import default_values;
import internal_types;
import data_type;
import logger;

namespace infinity {

Expand Down Expand Up @@ -70,7 +71,9 @@ SharedPtr<ExpressionState> ExpressionState::CreateState(const SharedPtr<BaseExpr

SharedPtr<ExpressionState> ExpressionState::CreateState(const SharedPtr<AggregateExpression> &agg_expr, char *agg_state, const AggregateFlag agg_flag) {
if (agg_expr->arguments().size() != 1) {
RecoverableError(Status::FunctionArgsError(agg_expr->ToString()));
Status status = Status::FunctionArgsError(agg_expr->ToString());
LOG_ERROR(status.message());
RecoverableError(status);
}

SharedPtr<ExpressionState> result = MakeShared<ExpressionState>();
Expand Down Expand Up @@ -112,7 +115,9 @@ SharedPtr<ExpressionState> ExpressionState::CreateState(const SharedPtr<CaseExpr

SharedPtr<ExpressionState> ExpressionState::CreateState(const SharedPtr<CastExpression> &cast_expr) {
if (cast_expr->arguments().size() != 1) {
RecoverableError(Status::FunctionArgsError(cast_expr->ToString()));
Status status = Status::FunctionArgsError(cast_expr->ToString());
LOG_ERROR(status.message());
RecoverableError(status);
}

SharedPtr<ExpressionState> result = MakeShared<ExpressionState>();
Expand Down
4 changes: 3 additions & 1 deletion src/executor/fragment_builder.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,9 @@ void FragmentBuilder::BuildExplain(PhysicalOperator *phys_op, PlanFragment *curr
switch (explain_op->explain_type()) {

case ExplainType::kAnalyze: {
RecoverableError(Status::NotSupport("Not implement: Query analyze"));
Status status = Status::NotSupport("Not implement: Query analyze");
LOG_ERROR(status.message());
RecoverableError(status);
}
case ExplainType::kAst:
case ExplainType::kUnOpt:
Expand Down
8 changes: 6 additions & 2 deletions src/executor/operator/physical_aggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,9 @@ void PhysicalAggregate::GroupByInputTable(const SharedPtr<DataTable> &input_tabl
SharedPtr<DataType> input_type = input_table->GetColumnTypeById(column_id);
SharedPtr<DataType> output_type = grouped_input_table->GetColumnTypeById(column_id);
if (*input_type != *output_type) {
RecoverableError(Status::DataTypeMismatch(input_type->ToString(), output_type->ToString()));
Status status = Status::DataTypeMismatch(input_type->ToString(), output_type->ToString());
LOG_ERROR(status.message());
RecoverableError(status);
}
types.emplace_back(input_type);
}
Expand Down Expand Up @@ -357,7 +359,9 @@ void PhysicalAggregate::GenerateGroupByResult(const SharedPtr<DataTable> &input_
SharedPtr<DataType> input_type = input_table->GetColumnTypeById(column_id);
SharedPtr<DataType> output_type = output_table->GetColumnTypeById(column_id);
if (*input_type != *output_type) {
RecoverableError(Status::DataTypeMismatch(input_type->ToString(), output_type->ToString()));
Status status = Status::DataTypeMismatch(input_type->ToString(), output_type->ToString());
LOG_ERROR(status.message());
RecoverableError(status);
}
types.emplace_back(input_type);
}
Expand Down
45 changes: 34 additions & 11 deletions src/executor/operator/physical_command.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import config;
import status;
import infinity_exception;
import variables;
import logger;

namespace infinity {

Expand All @@ -56,16 +57,22 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
switch(session_var) {
case SessionVariable::kEnableProfile: {
if (set_command->value_type() != SetVarType::kBool) {
RecoverableError(Status::DataTypeMismatch("Boolean", set_command->value_type_str()));
Status status = Status::DataTypeMismatch("Boolean", set_command->value_type_str());
LOG_ERROR(status.message());
RecoverableError(status);
}
query_context->current_session()->SessionVariables()->enable_profile_ = set_command->value_bool();
return true;
}
case SessionVariable::kInvalid: {
RecoverableError(Status::InvalidCommand(fmt::format("Unknown session variable: {}", set_command->var_name())));
Status status = Status::InvalidCommand(fmt::format("Unknown session variable: {}", set_command->var_name()));
LOG_ERROR(status.message());
RecoverableError(status);
}
default: {
RecoverableError(Status::InvalidCommand(fmt::format("Session variable: {} is read-only", set_command->var_name())));
Status status = Status::InvalidCommand(fmt::format("Session variable: {} is read-only", set_command->var_name()));
LOG_ERROR(status.message());
RecoverableError(status);
}
}
break;
Expand All @@ -75,16 +82,22 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
switch(global_var) {
case GlobalVariable::kProfileRecordCapacity: {
if (set_command->value_type() != SetVarType::kInteger) {
RecoverableError(Status::DataTypeMismatch("Integer", set_command->value_type_str()));
Status status = Status::DataTypeMismatch("Integer", set_command->value_type_str());
LOG_ERROR(status.message());
RecoverableError(status);
}
query_context->storage()->catalog()->ResizeProfileHistory(set_command->value_int());
return true;
}
case GlobalVariable::kInvalid: {
RecoverableError(Status::InvalidCommand(fmt::format("unknown global variable {}", set_command->var_name())));
Status status = Status::InvalidCommand(fmt::format("unknown global variable {}", set_command->var_name()));
LOG_ERROR(status.message());
RecoverableError(status);
}
default: {
RecoverableError(Status::InvalidCommand(fmt::format("Global variable: {} is read-only", set_command->var_name())));
Status status = Status::InvalidCommand(fmt::format("Global variable: {} is read-only", set_command->var_name()));
LOG_ERROR(status.message());
RecoverableError(status);
}
}
break;
Expand Down Expand Up @@ -130,22 +143,30 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
return true;
}

RecoverableError(Status::SetInvalidVarValue("log level", "trace, debug, info, warning, error, critical"));
Status status = Status::SetInvalidVarValue("log level", "trace, debug, info, warning, error, critical");
LOG_ERROR(status.message());
RecoverableError(status);
break;
}
case GlobalOptionIndex::kInvalid: {
RecoverableError(Status::InvalidCommand(fmt::format("Unknown config: {}", set_command->var_name())));
Status status = Status::InvalidCommand(fmt::format("Unknown config: {}", set_command->var_name()));
LOG_ERROR(status.message());
RecoverableError(status);
break;
}
default: {
RecoverableError(Status::InvalidCommand(fmt::format("Config {} is read-only", set_command->var_name())));
Status status = Status::InvalidCommand(fmt::format("Config {} is read-only", set_command->var_name()));
LOG_ERROR(status.message());
RecoverableError(status);
break;
}
}
break;
}
default: {
RecoverableError(Status::InvalidCommand("Invalid set command scope, neither session nor global"));
Status status = Status::InvalidCommand("Invalid set command scope, neither session nor global");
LOG_ERROR(status.message());
RecoverableError(status);
}
}

Expand All @@ -156,7 +177,9 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
ExportCmd *export_command = (ExportCmd *)(command_info_.get());
auto profiler_record = query_context->current_session()->GetProfileRecord(export_command->file_no());
if (profiler_record == nullptr) {
RecoverableError(Status::DataNotExist(fmt::format("The record does not exist: {}", export_command->file_no())));
Status status = Status::DataNotExist(fmt::format("The record does not exist: {}", export_command->file_no()));
LOG_ERROR(status.message());
RecoverableError(status);
}
LocalFileSystem fs;
FileWriter file_writer(fs, export_command->file_name(), 128);
Expand Down
1 change: 1 addition & 0 deletions src/executor/operator/physical_create_index_finish.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ bool PhysicalCreateIndexFinish::Execute(QueryContext *query_context, OperatorSta
auto *txn = query_context->GetTxn();
auto status = txn->CreateIndexFinish(*db_name_, *table_name_, index_base_);
if (!status.ok()) {
LOG_ERROR(status.message());
RecoverableError(status);
}
operator_state->SetComplete();
Expand Down
10 changes: 7 additions & 3 deletions src/executor/operator/physical_explain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import value;
import status;
import infinity_exception;
import logical_type;
import logger;

namespace infinity {

Expand All @@ -54,7 +55,9 @@ void PhysicalExplain::Init() {
switch (explain_type_) {
case ExplainType::kAnalyze: {
output_names_->emplace_back("Query Analyze");
RecoverableError(Status::NotSupport("Not implement: Query analyze"));
Status status = Status::NotSupport("Not implement: Query analyze");
LOG_ERROR(status.message());
RecoverableError(status);
}
case ExplainType::kAst: {
output_names_->emplace_back("Abstract Syntax Tree");
Expand Down Expand Up @@ -102,8 +105,9 @@ bool PhysicalExplain::Execute(QueryContext *, OperatorState *operator_state) {

switch (explain_type_) {
case ExplainType::kAnalyze: {
title = "Query Analyze";
RecoverableError(Status::NotSupport("Not implement: Query analyze"));
Status status = Status::NotSupport("Not implement: Query analyze");
LOG_ERROR(status.message());
RecoverableError(status);
}
case ExplainType::kAst: {
title = "Abstract Syntax Tree";
Expand Down
5 changes: 4 additions & 1 deletion src/executor/operator/physical_fusion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ import third_party;
import infinity_exception;
import value;
import internal_types;
import logger;

namespace infinity {

Expand All @@ -67,7 +68,9 @@ bool PhysicalFusion::Execute(QueryContext *query_context, OperatorState *operato
return false;
}
if (fusion_expr_->method_.compare("rrf") != 0) {
RecoverableError(Status::NotSupport(fmt::format("Fusion method {} is not implemented.", fusion_expr_->method_)));
Status status = Status::NotSupport(fmt::format("Fusion method {} is not implemented.", fusion_expr_->method_));
LOG_ERROR(status.message());
RecoverableError(status);
}
SizeT rank_constant = 60;
if (fusion_expr_->options_.get() != nullptr) {
Expand Down
Loading

0 comments on commit 735252b

Please sign in to comment.