diff --git a/benchmark/local_infinity/knn/knn_query_benchmark.cpp b/benchmark/local_infinity/knn/knn_query_benchmark.cpp index 942395e4a1..719ada6937 100644 --- a/benchmark/local_infinity/knn/knn_query_benchmark.cpp +++ b/benchmark/local_infinity/knn/knn_query_benchmark.cpp @@ -216,7 +216,7 @@ int main(int argc, char *argv[]) { auto select_rowid_expr = new FunctionExpr(); select_rowid_expr->func_name_ = "row_id"; output_columns->emplace_back(select_rowid_expr); - auto result = infinity->Search(db_name, table_name, search_expr, nullptr, output_columns); + auto result = infinity->Search(db_name, table_name, search_expr, nullptr, nullptr, nullptr, output_columns); { auto &cv = result.result_table_->GetDataBlockById(0)->column_vectors; auto &column = *cv[0]; diff --git a/example/hybrid_search.py b/example/hybrid_search.py index 11020b1cf2..da41583ab7 100644 --- a/example/hybrid_search.py +++ b/example/hybrid_search.py @@ -16,8 +16,8 @@ This example is to connect local infinity instance, create table, insert data, search the data """ -import infinity_embedded as infinity -#import infinity +# import infinity_embedded as infinity +import infinity import sys try: @@ -101,12 +101,12 @@ .match_text("body", "blooms", 10) .filter("year < 2024") .fusion( - method="match_tensor", topn=2, + method="match_tensor", topn=3, fusion_params={"field": "tensor", "data_type": "float", - "data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]}, - params={"filter": "year < 2024"} + "data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]} ) .to_pl() + # .explain(explain_type=infinity.table.ExplainType.UnOpt) ) print(result) diff --git a/example/search_with_limit_offset.py b/example/search_with_limit_offset.py new file mode 100644 index 0000000000..ede57f24f1 --- /dev/null +++ b/example/search_with_limit_offset.py @@ -0,0 +1,73 @@ +# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +''' +This example is to connect local infinity instance, create table, insert data, search the data +''' + +# import infinity_embedded as infinity +import infinity +import sys + +try: + # Use infinity_embedded module to open a local directory + # infinity_instance = infinity.connect("/var/infinity") + + # Use infinity module to connect a remote server + infinity_instance = infinity.connect(infinity.common.NetworkAddress("127.0.0.1", 23817)) + + # 'default_db' is the default database + db_instance = infinity_instance.get_database("default_db") + + # Drop my_table if it already exists + db_instance.drop_table("my_table", infinity.common.ConflictType.Ignore) + + # Create a table named "my_table" + table_instance = db_instance.create_table("my_table", { + "num": {"type": "integer"}, + "body": {"type": "varchar"}, + "vec": {"type": "vector, 4, float"}, + }) + + # Insert 3 rows of data into the 'my_table' + table_instance.insert( + [ + { + "num": 1, + "body": r"unnecessary and harmful", + "vec": [1.0, 1.2, 0.8, 0.9], + }, + { + "num": 2, + "body": r"Office for Harmful Blooms", + "vec": [4.0, 4.2, 4.3, 4.5], + }, + { + "num": 3, + "body": r"A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.", + "vec": [4.0, 4.2, 4.3, 4.2], + }, + ] + ) + + result = table_instance.output(["num", "vec", "_similarity"]).match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", + "cosine", 3).limit(2).offset(1).to_pl() + print(result) + infinity_instance.disconnect() + + print('test done') + sys.exit(0) +except Exception as e: + print(str(e)) + sys.exit(-1) diff --git a/src/embedded_infinity/wrap_infinity.cpp b/src/embedded_infinity/wrap_infinity.cpp index 07d7dbc6f7..c911cf1fd1 100644 --- a/src/embedded_infinity/wrap_infinity.cpp +++ b/src/embedded_infinity/wrap_infinity.cpp @@ -43,6 +43,7 @@ import table_def; import third_party; import logger; import query_options; +import defer_op; namespace infinity { @@ -1218,44 +1219,46 @@ WrapQueryResult WrapSearch(Infinity &instance, WrapParsedExpr *limit_expr, WrapParsedExpr *offset_expr) { SearchExpr *search_expr = nullptr; + DeferFn defer_fn1([&]() { + if (search_expr != nullptr) { + delete search_expr; + search_expr = nullptr; + } + }); if (wrap_search_expr != nullptr) { Status status; search_expr = dynamic_cast(wrap_search_expr->GetParsedExpr(status)); if (status.code_ != ErrorCode::kOk) { - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } ParsedExpr *filter = nullptr; + DeferFn defer_fn2([&]() { + if (filter != nullptr) { + delete filter; + filter = nullptr; + } + }); if (where_expr != nullptr) { Status status; filter = where_expr->GetParsedExpr(status); if (status.code_ != ErrorCode::kOk) { - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } Vector *output_columns = nullptr; - - if (select_list.empty()) { - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; + DeferFn defer_fn3([&]() { + if(output_columns != nullptr) { + SizeT output_column_len = output_columns->size(); + for(SizeT i = 0; i < output_column_len; ++ i) { + if ((*output_columns)[i] != nullptr) { + delete (*output_columns)[i]; + (*output_columns)[i] = nullptr; + } + } } + }); + if (select_list.empty()) { return WrapQueryResult(ErrorCode::kEmptySelectFields, "[error] Select fields are empty"); } else { output_columns = new Vector(); @@ -1264,30 +1267,47 @@ WrapQueryResult WrapSearch(Infinity &instance, Status status; output_columns->emplace_back(select_list[i].GetParsedExpr(status)); if (status.code_ != ErrorCode::kOk) { - if (output_columns != nullptr) { - for (SizeT j = 0; j <= i; ++j) { - if ((*output_columns)[j] != nullptr) { - delete (*output_columns)[j]; - (*output_columns)[j] = nullptr; - } - } - delete output_columns; - output_columns = nullptr; - } - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } } - auto query_result = instance.Search(db_name, table_name, search_expr, filter, output_columns); + ParsedExpr *limit = nullptr; + DeferFn defer_fn4([&]() { + if (limit != nullptr) { + delete limit; + limit = nullptr; + } + }); + if (limit_expr != nullptr) { + Status status; + limit = limit_expr->GetParsedExpr(status); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); + } + } + + ParsedExpr *offset = nullptr; + DeferFn defer_fn5([&]() { + if (offset != nullptr) { + delete offset; + offset = nullptr; + } + }); + if (offset_expr != nullptr) { + Status status; + offset = offset_expr->GetParsedExpr(status); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); + } + } + + auto query_result = instance.Search(db_name, table_name, search_expr, filter, limit, offset, output_columns); + search_expr = nullptr; + filter = nullptr; + limit = nullptr; + offset = nullptr; + output_columns = nullptr; if (!query_result.IsOk()) { return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg()); } @@ -1306,63 +1326,63 @@ WrapQueryResult WrapExplain(Infinity &instance, WrapSearchExpr *wrap_search_expr, WrapParsedExpr *wrap_filter) { SearchExpr *search_expr = nullptr; + DeferFn defer_fn1([&]() { + if (search_expr != nullptr) { + delete search_expr; + search_expr = nullptr; + } + }); if (wrap_search_expr != nullptr) { Status status; search_expr = dynamic_cast(wrap_search_expr->GetParsedExpr(status)); if (status.code_ != ErrorCode::kOk) { - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } ParsedExpr *filter = nullptr; + DeferFn defer_fn2([&]() { + if (filter != nullptr) { + delete filter; + filter = nullptr; + } + }); if (wrap_filter != nullptr) { Status status; filter = wrap_filter->GetParsedExpr(status); if (status.code_ != ErrorCode::kOk) { - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } - Vector *output_columns = new Vector(); - output_columns->reserve(wrap_output_columns.size()); - for (SizeT i = 0; i < wrap_output_columns.size(); ++i) { - Status status; - output_columns->emplace_back(wrap_output_columns[i].GetParsedExpr(status)); - if (status.code_ != ErrorCode::kOk) { - if (output_columns != nullptr) { - for (SizeT j = 0; j <= i; ++j) { - if ((*output_columns)[j] != nullptr) { - delete (*output_columns)[j]; - (*output_columns)[j] = nullptr; - } + Vector *output_columns = nullptr; + DeferFn defer_fn3([&]() { + if(output_columns != nullptr) { + SizeT output_column_len = output_columns->size(); + for(SizeT i = 0; i < output_column_len; ++ i) { + if ((*output_columns)[i] != nullptr) { + delete (*output_columns)[i]; + (*output_columns)[i] = nullptr; } - delete output_columns; - output_columns = nullptr; - } - if (filter != nullptr) { - delete filter; - filter = nullptr; } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; + } + }); + if (wrap_output_columns.empty()) { + return WrapQueryResult(ErrorCode::kEmptySelectFields, "[error] Select fields are empty"); + } else { + output_columns = new Vector(); + output_columns->reserve(wrap_output_columns.size()); + for (SizeT i = 0; i < wrap_output_columns.size(); ++i) { + Status status; + output_columns->emplace_back(wrap_output_columns[i].GetParsedExpr(status)); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); } - return WrapQueryResult(status.code_, status.msg_->c_str()); } } - auto query_result = instance.Explain(db_name, table_name, explain_type, search_expr, filter, output_columns); - + auto query_result = instance.Explain(db_name, table_name, explain_type, search_expr, filter, nullptr, nullptr, output_columns); + search_expr = nullptr; + filter = nullptr; + output_columns = nullptr; if (!query_result.IsOk()) { return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg()); } diff --git a/src/executor/explain_physical_plan.cpp b/src/executor/explain_physical_plan.cpp index 6d731e0f1d..20071d1fde 100644 --- a/src/executor/explain_physical_plan.cpp +++ b/src/executor/explain_physical_plan.cpp @@ -2422,6 +2422,35 @@ void ExplainPhysicalPlan::Explain(const PhysicalMatch *match_node, SharedPtr>> &result, i64 intent_size) { + String explain_header_str; + if (intent_size != 0) { + explain_header_str = String(intent_size - 2, ' ') + "-> MatchSparseScan "; + } else { + explain_header_str = "MatchSparseScan "; + } + explain_header_str += "(" + std::to_string(match_sparse_node->node_id()) + ")"; + result->emplace_back(MakeShared(explain_header_str)); + + // Table index + String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(match_sparse_node->table_index()); + result->emplace_back(MakeShared(table_index)); + + // Output columns + String output_columns = String(intent_size, ' ') + " - output columns: ["; + SizeT column_count = match_sparse_node->GetOutputNames()->size(); + if (column_count == 0) { + String error_message = "No column in match sparse node."; + UnrecoverableError(error_message); + } + for (SizeT idx = 0; idx < column_count - 1; ++idx) { + output_columns += match_sparse_node->GetOutputNames()->at(idx) + ", "; + } + output_columns += match_sparse_node->GetOutputNames()->back(); + output_columns += "]"; + result->emplace_back(MakeShared(output_columns)); +} + void ExplainPhysicalPlan::Explain(const PhysicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size) { String explain_header_str; if (intent_size != 0) { diff --git a/src/executor/explain_physical_plan.cppm b/src/executor/explain_physical_plan.cppm index 2c8b7a4de9..3144fe1bc0 100644 --- a/src/executor/explain_physical_plan.cppm +++ b/src/executor/explain_physical_plan.cppm @@ -66,6 +66,7 @@ import physical_match; import physical_match_tensor_scan; import physical_fusion; import physical_merge_aggregate; +import physical_match_sparse_scan; export module explain_physical_plan; @@ -107,10 +108,7 @@ public: static void Explain(const PhysicalCreateTable *create_node, SharedPtr>> &result, i64 intent_size = 0); - static void Explain(const PhysicalCreateIndexPrepare *create_index, - SharedPtr>> &result, - - i64 intent_size = 0); + static void Explain(const PhysicalCreateIndexPrepare *create_index, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalCreateCollection *create_node, SharedPtr>> &result, i64 intent_size = 0); @@ -148,15 +146,10 @@ public: static void Explain(const PhysicalSink *flush_node, SharedPtr>> &result, i64 intent_size = 0); - static void Explain(const PhysicalParallelAggregate *parallel_aggregate_node, - SharedPtr>> &result, - - i64 intent_size = 0); - - static void Explain(const PhysicalMergeParallelAggregate *merge_parallel_aggregate_node, - SharedPtr>> &result, + static void Explain(const PhysicalParallelAggregate *parallel_aggregate_node, SharedPtr>> &result, i64 intent_size = 0); - i64 intent_size = 0); + static void + Explain(const PhysicalMergeParallelAggregate *merge_parallel_aggregate_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalIntersect *intersect_node, SharedPtr>> &result, i64 intent_size = 0); @@ -178,10 +171,12 @@ public: static void Explain(const PhysicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const PhysicalMatchSparseScan *match_sparse_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const PhysicalMergeMatchTensor *merge_match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalFusion *fusion_node, SharedPtr>> &result, i64 intent_size = 0); - + static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr>> &result, i64 intent_size = 0); }; diff --git a/src/executor/operator/physical_fusion.cppm b/src/executor/operator/physical_fusion.cppm index f75bbd3a0f..36a96da495 100644 --- a/src/executor/operator/physical_fusion.cppm +++ b/src/executor/operator/physical_fusion.cppm @@ -55,9 +55,7 @@ public: SharedPtr>> GetOutputTypes() const override { return output_types_; } SizeT TaskletCount() override { - String error_message = "Not implement: TaskletCount not Implement"; - UnrecoverableError(error_message); - return 0; + return 1; } void FillingTableRefs(HashMap> &table_refs) override { diff --git a/src/executor/operator/physical_match.cppm b/src/executor/operator/physical_match.cppm index a148c166c4..091f74365d 100644 --- a/src/executor/operator/physical_match.cppm +++ b/src/executor/operator/physical_match.cppm @@ -63,9 +63,7 @@ public: SharedPtr>> GetOutputTypes() const final; SizeT TaskletCount() override { - String error_message = "Not implement: TaskletCount not Implement"; - UnrecoverableError(error_message); - return 0; + return 1; } void FillingTableRefs(HashMap> &table_refs) override { diff --git a/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm b/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm index 187a7a37a0..2883c0ee7c 100644 --- a/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm +++ b/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm @@ -59,6 +59,10 @@ public: Vector>> PlanWithIndex(Vector>> &block_groups, i64 parallel_count); + u64 table_index() const { + return table_index_; + } + private: template void ExecuteInner(QueryContext *query_context, diff --git a/src/main/infinity.cpp b/src/main/infinity.cpp index 005a08ec2f..e900e02e53 100644 --- a/src/main/infinity.cpp +++ b/src/main/infinity.cpp @@ -65,9 +65,7 @@ namespace infinity { u64 Infinity::GetSessionId() { return session_->session_id(); } -void Infinity::Hello() { - fmt::print("hello infinity\n"); -} +void Infinity::Hello() { fmt::print("hello infinity\n"); } void Infinity::LocalInit(const String &path) { LocalFileSystem fs; @@ -107,7 +105,7 @@ SharedPtr Infinity::RemoteConnect() { SharedPtr infinity_ptr = MakeShared(); SessionManager *session_mgr = InfinityContext::instance().session_manager(); SharedPtr remote_session = session_mgr->CreateRemoteSession(); - if(remote_session == nullptr) { + if (remote_session == nullptr) { return nullptr; } infinity_ptr->session_ = std::move(remote_session); @@ -428,13 +426,13 @@ QueryResult Infinity::CreateTable(const String &db_name, ToLower(create_table_info->table_name_); create_table_info->column_defs_ = std::move(column_defs); - for(ColumnDef* column_def_ptr: create_table_info->column_defs_) { + for (ColumnDef *column_def_ptr : create_table_info->column_defs_) { ToLower(column_def_ptr->name_); } create_table_info->constraints_ = std::move(constraints); create_table_info->conflict_type_ = create_table_options.conflict_type_; create_table_info->properties_ = std::move(create_table_options.properties_); - for(InitParameter* parameter_ptr: create_table_info->properties_) { + for (InitParameter *parameter_ptr : create_table_info->properties_) { ToLower(parameter_ptr->param_name_); ToLower(parameter_ptr->param_value_); } @@ -603,7 +601,7 @@ QueryResult Infinity::CreateIndex(const String &db_name, ToLower(create_index_info->index_name_); ToLower(index_info_ptr->column_name_); - for(InitParameter* init_param_ptr: *index_info_ptr->index_param_list_) { + for (InitParameter *init_param_ptr : *index_info_ptr->index_param_list_) { ToLower(init_param_ptr->param_name_); ToLower(init_param_ptr->param_value_); } @@ -666,7 +664,6 @@ QueryResult Infinity::ShowIndex(const String &db_name, const String &table_name, show_statement->index_name_ = index_name_internal; - show_statement->show_type_ = ShowStmtType::kIndex; QueryResult result = query_context_ptr->QueryStatement(show_statement.get()); return result; @@ -854,7 +851,7 @@ QueryResult Infinity::Insert(const String &db_name, const String &table_name, Ve ToLower(insert_statement->table_name_); insert_statement->columns_ = columns; - for(String& column_name: *insert_statement->columns_) { + for (String &column_name : *insert_statement->columns_) { ToLower(column_name); } insert_statement->values_ = values; @@ -890,7 +887,8 @@ QueryResult Infinity::Import(const String &db_name, const String &table_name, co return result; } -QueryResult Infinity::Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options) { +QueryResult +Infinity::Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options) { UniquePtr query_context_ptr = MakeUnique(session_.get()); query_context_ptr->Init(InfinityContext::instance().config(), InfinityContext::instance().task_scheduler(), @@ -960,7 +958,7 @@ QueryResult Infinity::Update(const String &db_name, const String &table_name, Pa // TODO: to lower expression identifier string update_statement->where_expr_ = filter; update_statement->update_expr_array_ = update_list; - for(UpdateExpr* update_expr_ptr: *update_statement->update_expr_array_) { + for (UpdateExpr *update_expr_ptr : *update_statement->update_expr_array_) { ToLower(update_expr_ptr->column_name); } QueryResult result = query_context_ptr->QueryStatement(update_statement.get()); @@ -972,6 +970,8 @@ QueryResult Infinity::Explain(const String &db_name, ExplainType explain_type, SearchExpr *search_expr, ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, Vector *output_columns) { UniquePtr query_context_ptr = MakeUnique(session_.get()); @@ -1000,6 +1000,8 @@ QueryResult Infinity::Explain(const String &db_name, select_statement->select_list_ = output_columns; select_statement->where_expr_ = filter; select_statement->search_expr_ = search_expr; + select_statement->limit_expr_ = limit; + select_statement->offset_expr_ = offset; explain_statment->statement_ = select_statement; @@ -1007,8 +1009,13 @@ QueryResult Infinity::Explain(const String &db_name, return result; } -QueryResult -Infinity::Search(const String &db_name, const String &table_name, SearchExpr *search_expr, ParsedExpr *filter, Vector *output_columns) { +QueryResult Infinity::Search(const String &db_name, + const String &table_name, + SearchExpr *search_expr, + ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, + Vector *output_columns) { UniquePtr query_context_ptr = MakeUnique(session_.get()); query_context_ptr->Init(InfinityContext::instance().config(), InfinityContext::instance().task_scheduler(), @@ -1032,6 +1039,8 @@ Infinity::Search(const String &db_name, const String &table_name, SearchExpr *se select_statement->select_list_ = output_columns; select_statement->where_expr_ = filter; select_statement->search_expr_ = search_expr; + select_statement->limit_expr_ = limit; + select_statement->offset_expr_ = offset; QueryResult result = query_context_ptr->QueryStatement(select_statement.get()); return result; diff --git a/src/main/infinity.cppm b/src/main/infinity.cppm index d2968515f7..ce73b4d522 100644 --- a/src/main/infinity.cppm +++ b/src/main/infinity.cppm @@ -141,7 +141,8 @@ public: QueryResult Import(const String &db_name, const String &table_name, const String &path, ImportOptions import_options); - QueryResult Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options); + QueryResult + Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options); QueryResult Delete(const String &db_name, const String &table_name, ParsedExpr *filter); @@ -152,10 +153,17 @@ public: ExplainType explain_type, SearchExpr *search_expr, ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, Vector *output_columns); - QueryResult - Search(const String &db_name, const String &table_name, SearchExpr *search_expr, ParsedExpr *filter, Vector *output_columns); + QueryResult Search(const String &db_name, + const String &table_name, + SearchExpr *search_expr, + ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, + Vector *output_columns); QueryResult Optimize(const String &db_name, const String &table_name, OptimizeOptions optimize_options = OptimizeOptions{}); diff --git a/src/network/http/http_search.cpp b/src/network/http/http_search.cpp index 9fbe64e5ab..ca1565acfe 100644 --- a/src/network/http/http_search.cpp +++ b/src/network/http/http_search.cpp @@ -59,6 +59,8 @@ void HTTPSearch::Process(Infinity *infinity_ptr, response["error_message"] = "HTTP Body isn't json object"; } UniquePtr filter{}; + UniquePtr limit{}; + UniquePtr offset{}; UniquePtr search_expr{}; Vector *output_columns{nullptr}; DeferFn defer_fn([&]() { @@ -101,6 +103,28 @@ void HTTPSearch::Process(Infinity *infinity_ptr, if (!filter) { return; } + } else if (IsEqual(key, "limit")) { + + if (limit) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one limit field."; + return; + } + limit = ParseFilter(elem.value(), http_status, response); + if (!limit) { + return; + } + } else if (IsEqual(key, "offset")) { + + if (offset) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one offset field."; + return; + } + offset = ParseFilter(elem.value(), http_status, response); + if (!offset) { + return; + } } else if (IsEqual(key, "search")) { if (search_expr) { response["error_code"] = ErrorCode::kInvalidExpression; @@ -118,7 +142,8 @@ void HTTPSearch::Process(Infinity *infinity_ptr, } } - const QueryResult result = infinity_ptr->Search(db_name, table_name, search_expr.release(), filter.release(), output_columns); + const QueryResult result = + infinity_ptr->Search(db_name, table_name, search_expr.release(), filter.release(), limit.release(), offset.release(), output_columns); output_columns = nullptr; if (result.IsOk()) { @@ -168,6 +193,8 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, response["error_message"] = "HTTP Body isn't json object"; } UniquePtr filter{}; + UniquePtr limit{}; + UniquePtr offset{}; UniquePtr search_expr{}; Vector *output_columns{nullptr}; DeferFn defer_fn([&]() { @@ -179,7 +206,7 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, output_columns = nullptr; } }); - ExplainType explain_type = ExplainType::kInvalid; + ExplainType explain_type = ExplainType::kInvalid; for (const auto &elem : input_json.items()) { String key = elem.key(); ToLower(key); @@ -211,6 +238,28 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, if (filter == nullptr) { return; } + } else if (IsEqual(key, "limit")) { + + if (limit) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one limit field."; + return; + } + limit = ParseFilter(elem.value(), http_status, response); + if (!limit) { + return; + } + } else if (IsEqual(key, "offset")) { + + if (offset) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one offset field."; + return; + } + offset = ParseFilter(elem.value(), http_status, response); + if (!offset) { + return; + } } else if (IsEqual(key, "search")) { if (search_expr) { response["error_code"] = ErrorCode::kInvalidExpression; @@ -247,7 +296,14 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, } } - const QueryResult result = infinity_ptr->Explain(db_name, table_name, explain_type, search_expr.release(), filter.release(), output_columns); + const QueryResult result = infinity_ptr->Explain(db_name, + table_name, + explain_type, + search_expr.release(), + filter.release(), + limit.release(), + offset.release(), + output_columns); output_columns = nullptr; if (result.IsOk()) { @@ -325,15 +381,15 @@ Vector *HTTPSearch::ParseOutput(const nlohmann::json &output_list, return nullptr; } - if(output_expr_str == "_row_id" or output_expr_str == "_similarity" or output_expr_str == "_distance" or output_expr_str == "_score") { + if (output_expr_str == "_row_id" or output_expr_str == "_similarity" or output_expr_str == "_distance" or output_expr_str == "_score") { auto parsed_expr = new FunctionExpr(); - if(output_expr_str == "_row_id") { + if (output_expr_str == "_row_id") { parsed_expr->func_name_ = "row_id"; - } else if(output_expr_str == "_similarity") { + } else if (output_expr_str == "_similarity") { parsed_expr->func_name_ = "similarity"; - } else if(output_expr_str == "_distance") { + } else if (output_expr_str == "_distance") { parsed_expr->func_name_ = "distance"; - } else if(output_expr_str == "_score") { + } else if (output_expr_str == "_score") { parsed_expr->func_name_ = "score"; } output_columns->emplace_back(parsed_expr); diff --git a/src/network/infinity_thrift_service.cpp b/src/network/infinity_thrift_service.cpp index b2aca4e6f7..18d7c3f544 100644 --- a/src/network/infinity_thrift_service.cpp +++ b/src/network/infinity_thrift_service.cpp @@ -564,25 +564,49 @@ void InfinityThriftService::Select(infinity_thrift_rpc::SelectResponse &response } } - // TODO: - // ParsedExpr *offset; - // offset = new ParsedExpr(); + // Limit + ParsedExpr *limit = nullptr; + DeferFn defer_fn7([&]() { + if (limit != nullptr) { + delete limit; + limit = nullptr; + } + }); + if (request.__isset.limit_expr == true) { + limit = GetParsedExprFromProto(parsed_expr_status, request.limit_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } - // limit - // ParsedExpr *limit = nullptr; - // if (request.__isset.limit_expr == true) { - // limit = GetParsedExprFromProto(request.limit_expr); - // } + // Offset + ParsedExpr *offset = nullptr; + DeferFn defer_fn8([&]() { + if (offset != nullptr) { + delete offset; + offset = nullptr; + } + }); + if (request.__isset.offset_expr == true) { + offset = GetParsedExprFromProto(parsed_expr_status, request.offset_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } // auto end2 = std::chrono::steady_clock::now(); // phase_2_duration_ += end2 - start2; // // auto start3 = std::chrono::steady_clock::now(); - const QueryResult result = infinity->Search(request.db_name, request.table_name, search_expr, filter, output_columns); + const QueryResult result = infinity->Search(request.db_name, request.table_name, search_expr, filter, limit, offset, output_columns); output_columns = nullptr; filter = nullptr; search_expr = nullptr; + limit = nullptr; + offset = nullptr; // auto end3 = std::chrono::steady_clock::now(); // // phase_3_duration_ += end3 - start3; @@ -635,21 +659,21 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons Vector *output_columns = new Vector(); output_columns->reserve(request.select_list.size()); + DeferFn defer_fn1([&]() { + if (output_columns != nullptr) { + for (auto &expr_ptr : *output_columns) { + delete expr_ptr; + expr_ptr = nullptr; + } + delete output_columns; + output_columns = nullptr; + } + }); Status parsed_expr_status; for (auto &expr : request.select_list) { auto parsed_expr = GetParsedExprFromProto(parsed_expr_status, expr); if (!parsed_expr_status.ok()) { - - if (output_columns != nullptr) { - for (auto &expr_ptr : *output_columns) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete output_columns; - output_columns = nullptr; - } - if (parsed_expr != nullptr) { delete parsed_expr; parsed_expr = nullptr; @@ -663,43 +687,44 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons // search expr SearchExpr *search_expr = nullptr; + DeferFn defer_fn5([&]() { + if (search_expr != nullptr) { + delete search_expr; + search_expr = nullptr; + } + }); if (request.__isset.search_expr) { auto search_expr_list = new Vector(); + DeferFn defer_fn2([&]() { + if (search_expr_list != nullptr) { + for (auto &expr_ptr : *search_expr_list) { + delete expr_ptr; + expr_ptr = nullptr; + } + delete search_expr_list; + search_expr_list = nullptr; + } + }); + SizeT match_expr_count = request.search_expr.match_exprs.size(); SizeT fusion_expr_count = request.search_expr.fusion_exprs.size(); SizeT total_expr_count = match_expr_count + fusion_expr_count; search_expr_list->reserve(total_expr_count); for (SizeT idx = 0; idx < match_expr_count; ++idx) { Status status; - auto match_expr = GetGenericMatchExprFromProto(status, request.search_expr.match_exprs[idx]); - if (!status.ok()) { - if (output_columns != nullptr) { - for (auto &expr_ptr : *output_columns) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete output_columns; - output_columns = nullptr; - } - - if (search_expr_list != nullptr) { - for (auto &expr_ptr : *search_expr_list) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete search_expr_list; - search_expr_list = nullptr; - } - + ParsedExpr *match_expr = GetGenericMatchExprFromProto(status, request.search_expr.match_exprs[idx]); + DeferFn defer_fn3([&]() { if (match_expr != nullptr) { delete match_expr; match_expr = nullptr; } - + }); + if (!status.ok()) { ProcessStatus(response, status); return; } search_expr_list->emplace_back(match_expr); + match_expr = nullptr; } for (SizeT idx = 0; idx < fusion_expr_count; ++idx) { @@ -709,6 +734,7 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons search_expr = new SearchExpr(); search_expr->SetExprs(search_expr_list); + search_expr_list = nullptr; } // filter @@ -716,16 +742,6 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons if (request.__isset.where_expr == true) { filter = GetParsedExprFromProto(parsed_expr_status, request.where_expr); if (!parsed_expr_status.ok()) { - - if (output_columns != nullptr) { - for (auto &expr_ptr : *output_columns) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete output_columns; - output_columns = nullptr; - } - if (search_expr != nullptr) { delete search_expr; search_expr = nullptr; @@ -741,19 +757,46 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons } } - // TODO: - // ParsedExpr *offset; - // offset = new ParsedExpr(); + // Limit + ParsedExpr *limit = nullptr; + DeferFn defer_fn7([&]() { + if (limit != nullptr) { + delete limit; + limit = nullptr; + } + }); + if (request.__isset.limit_expr == true) { + limit = GetParsedExprFromProto(parsed_expr_status, request.limit_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } - // limit - // ParsedExpr *limit = nullptr; - // if (request.__isset.limit_expr == true) { - // limit = GetParsedExprFromProto(request.limit_expr); - // } + // Offset + ParsedExpr *offset = nullptr; + DeferFn defer_fn8([&]() { + if (offset != nullptr) { + delete offset; + offset = nullptr; + } + }); + if (request.__isset.offset_expr == true) { + offset = GetParsedExprFromProto(parsed_expr_status, request.offset_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } // Explain type auto explain_type = GetExplainTypeFromProto(request.explain_type); - const QueryResult result = infinity->Explain(request.db_name, request.table_name, explain_type, search_expr, filter, output_columns); + const QueryResult result = infinity->Explain(request.db_name, request.table_name, explain_type, search_expr, filter, limit, offset, output_columns); + output_columns = nullptr; + search_expr = nullptr; + filter = nullptr; + limit = nullptr; + offset = nullptr; if (result.IsOk()) { auto &columns = response.column_fields; diff --git a/src/planner/bound_select_statement.cpp b/src/planner/bound_select_statement.cpp index 0e1c40ac80..1b957abbae 100644 --- a/src/planner/bound_select_statement.cpp +++ b/src/planner/bound_select_statement.cpp @@ -329,6 +329,12 @@ SharedPtr BoundSelectStatement::BuildPlan(QueryContext *query_conte root = std::move(match_knn_nodes[0]); } + if (limit_expression_.get() != nullptr) { + auto limit = MakeShared(bind_context->GetNewLogicalNodeId(), limit_expression_, offset_expression_); + limit->set_left_node(root); + root = limit; + } + auto project = MakeShared(bind_context->GetNewLogicalNodeId(), projection_expressions_, projection_index_); project->set_left_node(root); root = std::move(project); diff --git a/src/planner/explain_logical_plan.cpp b/src/planner/explain_logical_plan.cpp index 4857892f75..f4a54f141b 100644 --- a/src/planner/explain_logical_plan.cpp +++ b/src/planner/explain_logical_plan.cpp @@ -2299,6 +2299,14 @@ void ExplainLogicalPlan::Explain(const LogicalMatch *match_node, SharedPtr>> &result, i64 intent_size) { + IStringStream iss(match_node->ToString(intent_size)); + String line; + while (std::getline(iss, line)) { + result->emplace_back(MakeShared(std::move(line))); + } +} + void ExplainLogicalPlan::Explain(const LogicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size) { IStringStream iss(match_tensor_node->ToString(intent_size)); String line; diff --git a/src/planner/explain_logical_plan.cppm b/src/planner/explain_logical_plan.cppm index de3a8a0d11..8d9eca9028 100644 --- a/src/planner/explain_logical_plan.cppm +++ b/src/planner/explain_logical_plan.cppm @@ -47,6 +47,7 @@ import logical_export; import logical_flush; import logical_optimize; import logical_match; +import logical_match_sparse_scan; import logical_match_tensor_scan; import logical_fusion; import base_expression; @@ -115,6 +116,8 @@ public: static void Explain(const LogicalMatch *match_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const LogicalMatchSparseScan *match_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const LogicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const LogicalFusion *fusion_node, SharedPtr>> &result, i64 intent_size = 0); diff --git a/src/unit_test/main/infinity.cpp b/src/unit_test/main/infinity.cpp index b09c6fd61d..ddea6d6177 100644 --- a/src/unit_test/main/infinity.cpp +++ b/src/unit_test/main/infinity.cpp @@ -203,7 +203,7 @@ TEST_F(InfinityTest, test1) { SearchExpr * search_expr = nullptr; - result = infinity->Search("default_db", "table1", search_expr, nullptr, output_columns); + result = infinity->Search("default_db", "table1", search_expr, nullptr, nullptr, nullptr, output_columns); SharedPtr data_block = result.result_table_->GetDataBlockById(0); EXPECT_EQ(data_block->row_count(), 1); Value value = data_block->GetValue(0, 0); diff --git a/src/unit_test/main/table.cpp b/src/unit_test/main/table.cpp index 12e46cc03e..880f2fc5d5 100644 --- a/src/unit_test/main/table.cpp +++ b/src/unit_test/main/table.cpp @@ -85,7 +85,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_ast = infinity->Explain(db_name, table_name, ExplainType::kAst, nullptr, nullptr, output_columns); + QueryResult explain_ast = infinity->Explain(db_name, table_name, ExplainType::kAst, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_ast.IsOk()); // fmt::print("AST: {}\n", explain_ast.ToString()); } @@ -96,7 +96,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_unopt = infinity->Explain(db_name, table_name, ExplainType::kUnOpt, nullptr, nullptr, output_columns); + QueryResult explain_unopt = infinity->Explain(db_name, table_name, ExplainType::kUnOpt, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_unopt.IsOk()); // fmt::print("Unoptimized logical plan: {}\n", explain_unopt.ToString()); } @@ -107,7 +107,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_opt = infinity->Explain(db_name, table_name, ExplainType::kOpt, nullptr, nullptr, output_columns); + QueryResult explain_opt = infinity->Explain(db_name, table_name, ExplainType::kOpt, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_opt.IsOk()); // fmt::print("Optimized logical plan: {}\n", explain_opt.ToString()); } @@ -118,7 +118,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_phy = infinity->Explain(db_name, table_name, ExplainType::kPhysical, nullptr, nullptr, output_columns); + QueryResult explain_phy = infinity->Explain(db_name, table_name, ExplainType::kPhysical, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_phy.IsOk()); // fmt::print("Physical plan: {}\n", explain_phy.ToString()); } @@ -129,7 +129,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_fragment = infinity->Explain(db_name, table_name, ExplainType::kFragment, nullptr, nullptr, output_columns); + QueryResult explain_fragment = infinity->Explain(db_name, table_name, ExplainType::kFragment, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_fragment.IsOk()); // fmt::print("Fragment: {}\n", explain_fragment.ToString()); } @@ -140,7 +140,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_pipeline = infinity->Explain(db_name, table_name, ExplainType::kPipeline, nullptr, nullptr, output_columns); + QueryResult explain_pipeline = infinity->Explain(db_name, table_name, ExplainType::kPipeline, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_pipeline.IsOk()); // fmt::print("Pipeline: {}\n", explain_pipeline.ToString()); }