From 0ba8617db773dbeb08555ce49df926576828a318 Mon Sep 17 00:00:00 2001 From: Jin Hai Date: Tue, 24 Sep 2024 16:09:46 +0800 Subject: [PATCH] Enable limit and offset expression of Python and HTTP API (#1905) ### What problem does this PR solve? 1. Fix limit and offset in Python API and HTTP API 2. Add an example to demonstrate how to use limit and offset in Python API. Issue link:#1903 #1896 ### Type of change - [x] Bug Fix (non-breaking change which fixes an issue) --------- Signed-off-by: Jin Hai --- .../knn/knn_query_benchmark.cpp | 2 +- example/hybrid_search.py | 10 +- example/search_with_limit_offset.py | 73 ++++++++ src/embedded_infinity/wrap_infinity.cpp | 172 ++++++++++-------- src/executor/explain_physical_plan.cpp | 29 +++ src/executor/explain_physical_plan.cppm | 21 +-- src/executor/operator/physical_fusion.cppm | 4 +- src/executor/operator/physical_match.cppm | 4 +- .../physical_match_sparse_scan.cppm | 4 + src/main/infinity.cpp | 35 ++-- src/main/infinity.cppm | 14 +- src/network/http/http_search.cpp | 72 +++++++- src/network/infinity_thrift_service.cpp | 161 ++++++++++------ src/planner/bound_select_statement.cpp | 6 + src/planner/explain_logical_plan.cpp | 8 + src/planner/explain_logical_plan.cppm | 3 + src/unit_test/main/infinity.cpp | 2 +- src/unit_test/main/table.cpp | 12 +- 18 files changed, 441 insertions(+), 191 deletions(-) create mode 100644 example/search_with_limit_offset.py diff --git a/benchmark/local_infinity/knn/knn_query_benchmark.cpp b/benchmark/local_infinity/knn/knn_query_benchmark.cpp index 942395e4a1..719ada6937 100644 --- a/benchmark/local_infinity/knn/knn_query_benchmark.cpp +++ b/benchmark/local_infinity/knn/knn_query_benchmark.cpp @@ -216,7 +216,7 @@ int main(int argc, char *argv[]) { auto select_rowid_expr = new FunctionExpr(); select_rowid_expr->func_name_ = "row_id"; output_columns->emplace_back(select_rowid_expr); - auto result = infinity->Search(db_name, table_name, search_expr, nullptr, output_columns); + auto result = infinity->Search(db_name, table_name, search_expr, nullptr, nullptr, nullptr, output_columns); { auto &cv = result.result_table_->GetDataBlockById(0)->column_vectors; auto &column = *cv[0]; diff --git a/example/hybrid_search.py b/example/hybrid_search.py index 11020b1cf2..da41583ab7 100644 --- a/example/hybrid_search.py +++ b/example/hybrid_search.py @@ -16,8 +16,8 @@ This example is to connect local infinity instance, create table, insert data, search the data """ -import infinity_embedded as infinity -#import infinity +# import infinity_embedded as infinity +import infinity import sys try: @@ -101,12 +101,12 @@ .match_text("body", "blooms", 10) .filter("year < 2024") .fusion( - method="match_tensor", topn=2, + method="match_tensor", topn=3, fusion_params={"field": "tensor", "data_type": "float", - "data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]}, - params={"filter": "year < 2024"} + "data": [[0.9, 0.0, 0.0, 0.0], [1.1, 0.0, 0.0, 0.0]]} ) .to_pl() + # .explain(explain_type=infinity.table.ExplainType.UnOpt) ) print(result) diff --git a/example/search_with_limit_offset.py b/example/search_with_limit_offset.py new file mode 100644 index 0000000000..ede57f24f1 --- /dev/null +++ b/example/search_with_limit_offset.py @@ -0,0 +1,73 @@ +# Copyright(C) 2023 InfiniFlow, Inc. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +''' +This example is to connect local infinity instance, create table, insert data, search the data +''' + +# import infinity_embedded as infinity +import infinity +import sys + +try: + # Use infinity_embedded module to open a local directory + # infinity_instance = infinity.connect("/var/infinity") + + # Use infinity module to connect a remote server + infinity_instance = infinity.connect(infinity.common.NetworkAddress("127.0.0.1", 23817)) + + # 'default_db' is the default database + db_instance = infinity_instance.get_database("default_db") + + # Drop my_table if it already exists + db_instance.drop_table("my_table", infinity.common.ConflictType.Ignore) + + # Create a table named "my_table" + table_instance = db_instance.create_table("my_table", { + "num": {"type": "integer"}, + "body": {"type": "varchar"}, + "vec": {"type": "vector, 4, float"}, + }) + + # Insert 3 rows of data into the 'my_table' + table_instance.insert( + [ + { + "num": 1, + "body": r"unnecessary and harmful", + "vec": [1.0, 1.2, 0.8, 0.9], + }, + { + "num": 2, + "body": r"Office for Harmful Blooms", + "vec": [4.0, 4.2, 4.3, 4.5], + }, + { + "num": 3, + "body": r"A Bloom filter is a space-efficient probabilistic data structure, conceived by Burton Howard Bloom in 1970, that is used to test whether an element is a member of a set.", + "vec": [4.0, 4.2, 4.3, 4.2], + }, + ] + ) + + result = table_instance.output(["num", "vec", "_similarity"]).match_dense("vec", [3.0, 2.8, 2.7, 3.1], "float", + "cosine", 3).limit(2).offset(1).to_pl() + print(result) + infinity_instance.disconnect() + + print('test done') + sys.exit(0) +except Exception as e: + print(str(e)) + sys.exit(-1) diff --git a/src/embedded_infinity/wrap_infinity.cpp b/src/embedded_infinity/wrap_infinity.cpp index 07d7dbc6f7..c911cf1fd1 100644 --- a/src/embedded_infinity/wrap_infinity.cpp +++ b/src/embedded_infinity/wrap_infinity.cpp @@ -43,6 +43,7 @@ import table_def; import third_party; import logger; import query_options; +import defer_op; namespace infinity { @@ -1218,44 +1219,46 @@ WrapQueryResult WrapSearch(Infinity &instance, WrapParsedExpr *limit_expr, WrapParsedExpr *offset_expr) { SearchExpr *search_expr = nullptr; + DeferFn defer_fn1([&]() { + if (search_expr != nullptr) { + delete search_expr; + search_expr = nullptr; + } + }); if (wrap_search_expr != nullptr) { Status status; search_expr = dynamic_cast(wrap_search_expr->GetParsedExpr(status)); if (status.code_ != ErrorCode::kOk) { - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } ParsedExpr *filter = nullptr; + DeferFn defer_fn2([&]() { + if (filter != nullptr) { + delete filter; + filter = nullptr; + } + }); if (where_expr != nullptr) { Status status; filter = where_expr->GetParsedExpr(status); if (status.code_ != ErrorCode::kOk) { - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } Vector *output_columns = nullptr; - - if (select_list.empty()) { - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; + DeferFn defer_fn3([&]() { + if(output_columns != nullptr) { + SizeT output_column_len = output_columns->size(); + for(SizeT i = 0; i < output_column_len; ++ i) { + if ((*output_columns)[i] != nullptr) { + delete (*output_columns)[i]; + (*output_columns)[i] = nullptr; + } + } } + }); + if (select_list.empty()) { return WrapQueryResult(ErrorCode::kEmptySelectFields, "[error] Select fields are empty"); } else { output_columns = new Vector(); @@ -1264,30 +1267,47 @@ WrapQueryResult WrapSearch(Infinity &instance, Status status; output_columns->emplace_back(select_list[i].GetParsedExpr(status)); if (status.code_ != ErrorCode::kOk) { - if (output_columns != nullptr) { - for (SizeT j = 0; j <= i; ++j) { - if ((*output_columns)[j] != nullptr) { - delete (*output_columns)[j]; - (*output_columns)[j] = nullptr; - } - } - delete output_columns; - output_columns = nullptr; - } - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } } - auto query_result = instance.Search(db_name, table_name, search_expr, filter, output_columns); + ParsedExpr *limit = nullptr; + DeferFn defer_fn4([&]() { + if (limit != nullptr) { + delete limit; + limit = nullptr; + } + }); + if (limit_expr != nullptr) { + Status status; + limit = limit_expr->GetParsedExpr(status); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); + } + } + + ParsedExpr *offset = nullptr; + DeferFn defer_fn5([&]() { + if (offset != nullptr) { + delete offset; + offset = nullptr; + } + }); + if (offset_expr != nullptr) { + Status status; + offset = offset_expr->GetParsedExpr(status); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); + } + } + + auto query_result = instance.Search(db_name, table_name, search_expr, filter, limit, offset, output_columns); + search_expr = nullptr; + filter = nullptr; + limit = nullptr; + offset = nullptr; + output_columns = nullptr; if (!query_result.IsOk()) { return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg()); } @@ -1306,63 +1326,63 @@ WrapQueryResult WrapExplain(Infinity &instance, WrapSearchExpr *wrap_search_expr, WrapParsedExpr *wrap_filter) { SearchExpr *search_expr = nullptr; + DeferFn defer_fn1([&]() { + if (search_expr != nullptr) { + delete search_expr; + search_expr = nullptr; + } + }); if (wrap_search_expr != nullptr) { Status status; search_expr = dynamic_cast(wrap_search_expr->GetParsedExpr(status)); if (status.code_ != ErrorCode::kOk) { - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } ParsedExpr *filter = nullptr; + DeferFn defer_fn2([&]() { + if (filter != nullptr) { + delete filter; + filter = nullptr; + } + }); if (wrap_filter != nullptr) { Status status; filter = wrap_filter->GetParsedExpr(status); if (status.code_ != ErrorCode::kOk) { - if (filter != nullptr) { - delete filter; - filter = nullptr; - } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; - } return WrapQueryResult(status.code_, status.msg_->c_str()); } } - Vector *output_columns = new Vector(); - output_columns->reserve(wrap_output_columns.size()); - for (SizeT i = 0; i < wrap_output_columns.size(); ++i) { - Status status; - output_columns->emplace_back(wrap_output_columns[i].GetParsedExpr(status)); - if (status.code_ != ErrorCode::kOk) { - if (output_columns != nullptr) { - for (SizeT j = 0; j <= i; ++j) { - if ((*output_columns)[j] != nullptr) { - delete (*output_columns)[j]; - (*output_columns)[j] = nullptr; - } + Vector *output_columns = nullptr; + DeferFn defer_fn3([&]() { + if(output_columns != nullptr) { + SizeT output_column_len = output_columns->size(); + for(SizeT i = 0; i < output_column_len; ++ i) { + if ((*output_columns)[i] != nullptr) { + delete (*output_columns)[i]; + (*output_columns)[i] = nullptr; } - delete output_columns; - output_columns = nullptr; - } - if (filter != nullptr) { - delete filter; - filter = nullptr; } - if (search_expr != nullptr) { - delete search_expr; - search_expr = nullptr; + } + }); + if (wrap_output_columns.empty()) { + return WrapQueryResult(ErrorCode::kEmptySelectFields, "[error] Select fields are empty"); + } else { + output_columns = new Vector(); + output_columns->reserve(wrap_output_columns.size()); + for (SizeT i = 0; i < wrap_output_columns.size(); ++i) { + Status status; + output_columns->emplace_back(wrap_output_columns[i].GetParsedExpr(status)); + if (status.code_ != ErrorCode::kOk) { + return WrapQueryResult(status.code_, status.msg_->c_str()); } - return WrapQueryResult(status.code_, status.msg_->c_str()); } } - auto query_result = instance.Explain(db_name, table_name, explain_type, search_expr, filter, output_columns); - + auto query_result = instance.Explain(db_name, table_name, explain_type, search_expr, filter, nullptr, nullptr, output_columns); + search_expr = nullptr; + filter = nullptr; + output_columns = nullptr; if (!query_result.IsOk()) { return WrapQueryResult(query_result.ErrorCode(), query_result.ErrorMsg()); } diff --git a/src/executor/explain_physical_plan.cpp b/src/executor/explain_physical_plan.cpp index 6d731e0f1d..20071d1fde 100644 --- a/src/executor/explain_physical_plan.cpp +++ b/src/executor/explain_physical_plan.cpp @@ -2422,6 +2422,35 @@ void ExplainPhysicalPlan::Explain(const PhysicalMatch *match_node, SharedPtr>> &result, i64 intent_size) { + String explain_header_str; + if (intent_size != 0) { + explain_header_str = String(intent_size - 2, ' ') + "-> MatchSparseScan "; + } else { + explain_header_str = "MatchSparseScan "; + } + explain_header_str += "(" + std::to_string(match_sparse_node->node_id()) + ")"; + result->emplace_back(MakeShared(explain_header_str)); + + // Table index + String table_index = String(intent_size, ' ') + " - table index: #" + std::to_string(match_sparse_node->table_index()); + result->emplace_back(MakeShared(table_index)); + + // Output columns + String output_columns = String(intent_size, ' ') + " - output columns: ["; + SizeT column_count = match_sparse_node->GetOutputNames()->size(); + if (column_count == 0) { + String error_message = "No column in match sparse node."; + UnrecoverableError(error_message); + } + for (SizeT idx = 0; idx < column_count - 1; ++idx) { + output_columns += match_sparse_node->GetOutputNames()->at(idx) + ", "; + } + output_columns += match_sparse_node->GetOutputNames()->back(); + output_columns += "]"; + result->emplace_back(MakeShared(output_columns)); +} + void ExplainPhysicalPlan::Explain(const PhysicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size) { String explain_header_str; if (intent_size != 0) { diff --git a/src/executor/explain_physical_plan.cppm b/src/executor/explain_physical_plan.cppm index 2c8b7a4de9..3144fe1bc0 100644 --- a/src/executor/explain_physical_plan.cppm +++ b/src/executor/explain_physical_plan.cppm @@ -66,6 +66,7 @@ import physical_match; import physical_match_tensor_scan; import physical_fusion; import physical_merge_aggregate; +import physical_match_sparse_scan; export module explain_physical_plan; @@ -107,10 +108,7 @@ public: static void Explain(const PhysicalCreateTable *create_node, SharedPtr>> &result, i64 intent_size = 0); - static void Explain(const PhysicalCreateIndexPrepare *create_index, - SharedPtr>> &result, - - i64 intent_size = 0); + static void Explain(const PhysicalCreateIndexPrepare *create_index, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalCreateCollection *create_node, SharedPtr>> &result, i64 intent_size = 0); @@ -148,15 +146,10 @@ public: static void Explain(const PhysicalSink *flush_node, SharedPtr>> &result, i64 intent_size = 0); - static void Explain(const PhysicalParallelAggregate *parallel_aggregate_node, - SharedPtr>> &result, - - i64 intent_size = 0); - - static void Explain(const PhysicalMergeParallelAggregate *merge_parallel_aggregate_node, - SharedPtr>> &result, + static void Explain(const PhysicalParallelAggregate *parallel_aggregate_node, SharedPtr>> &result, i64 intent_size = 0); - i64 intent_size = 0); + static void + Explain(const PhysicalMergeParallelAggregate *merge_parallel_aggregate_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalIntersect *intersect_node, SharedPtr>> &result, i64 intent_size = 0); @@ -178,10 +171,12 @@ public: static void Explain(const PhysicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const PhysicalMatchSparseScan *match_sparse_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const PhysicalMergeMatchTensor *merge_match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const PhysicalFusion *fusion_node, SharedPtr>> &result, i64 intent_size = 0); - + static void Explain(const PhysicalMergeAggregate *fusion_node, SharedPtr>> &result, i64 intent_size = 0); }; diff --git a/src/executor/operator/physical_fusion.cppm b/src/executor/operator/physical_fusion.cppm index f75bbd3a0f..36a96da495 100644 --- a/src/executor/operator/physical_fusion.cppm +++ b/src/executor/operator/physical_fusion.cppm @@ -55,9 +55,7 @@ public: SharedPtr>> GetOutputTypes() const override { return output_types_; } SizeT TaskletCount() override { - String error_message = "Not implement: TaskletCount not Implement"; - UnrecoverableError(error_message); - return 0; + return 1; } void FillingTableRefs(HashMap> &table_refs) override { diff --git a/src/executor/operator/physical_match.cppm b/src/executor/operator/physical_match.cppm index a148c166c4..091f74365d 100644 --- a/src/executor/operator/physical_match.cppm +++ b/src/executor/operator/physical_match.cppm @@ -63,9 +63,7 @@ public: SharedPtr>> GetOutputTypes() const final; SizeT TaskletCount() override { - String error_message = "Not implement: TaskletCount not Implement"; - UnrecoverableError(error_message); - return 0; + return 1; } void FillingTableRefs(HashMap> &table_refs) override { diff --git a/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm b/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm index 187a7a37a0..2883c0ee7c 100644 --- a/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm +++ b/src/executor/operator/physical_scan/physical_match_sparse_scan.cppm @@ -59,6 +59,10 @@ public: Vector>> PlanWithIndex(Vector>> &block_groups, i64 parallel_count); + u64 table_index() const { + return table_index_; + } + private: template void ExecuteInner(QueryContext *query_context, diff --git a/src/main/infinity.cpp b/src/main/infinity.cpp index 005a08ec2f..e900e02e53 100644 --- a/src/main/infinity.cpp +++ b/src/main/infinity.cpp @@ -65,9 +65,7 @@ namespace infinity { u64 Infinity::GetSessionId() { return session_->session_id(); } -void Infinity::Hello() { - fmt::print("hello infinity\n"); -} +void Infinity::Hello() { fmt::print("hello infinity\n"); } void Infinity::LocalInit(const String &path) { LocalFileSystem fs; @@ -107,7 +105,7 @@ SharedPtr Infinity::RemoteConnect() { SharedPtr infinity_ptr = MakeShared(); SessionManager *session_mgr = InfinityContext::instance().session_manager(); SharedPtr remote_session = session_mgr->CreateRemoteSession(); - if(remote_session == nullptr) { + if (remote_session == nullptr) { return nullptr; } infinity_ptr->session_ = std::move(remote_session); @@ -428,13 +426,13 @@ QueryResult Infinity::CreateTable(const String &db_name, ToLower(create_table_info->table_name_); create_table_info->column_defs_ = std::move(column_defs); - for(ColumnDef* column_def_ptr: create_table_info->column_defs_) { + for (ColumnDef *column_def_ptr : create_table_info->column_defs_) { ToLower(column_def_ptr->name_); } create_table_info->constraints_ = std::move(constraints); create_table_info->conflict_type_ = create_table_options.conflict_type_; create_table_info->properties_ = std::move(create_table_options.properties_); - for(InitParameter* parameter_ptr: create_table_info->properties_) { + for (InitParameter *parameter_ptr : create_table_info->properties_) { ToLower(parameter_ptr->param_name_); ToLower(parameter_ptr->param_value_); } @@ -603,7 +601,7 @@ QueryResult Infinity::CreateIndex(const String &db_name, ToLower(create_index_info->index_name_); ToLower(index_info_ptr->column_name_); - for(InitParameter* init_param_ptr: *index_info_ptr->index_param_list_) { + for (InitParameter *init_param_ptr : *index_info_ptr->index_param_list_) { ToLower(init_param_ptr->param_name_); ToLower(init_param_ptr->param_value_); } @@ -666,7 +664,6 @@ QueryResult Infinity::ShowIndex(const String &db_name, const String &table_name, show_statement->index_name_ = index_name_internal; - show_statement->show_type_ = ShowStmtType::kIndex; QueryResult result = query_context_ptr->QueryStatement(show_statement.get()); return result; @@ -854,7 +851,7 @@ QueryResult Infinity::Insert(const String &db_name, const String &table_name, Ve ToLower(insert_statement->table_name_); insert_statement->columns_ = columns; - for(String& column_name: *insert_statement->columns_) { + for (String &column_name : *insert_statement->columns_) { ToLower(column_name); } insert_statement->values_ = values; @@ -890,7 +887,8 @@ QueryResult Infinity::Import(const String &db_name, const String &table_name, co return result; } -QueryResult Infinity::Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options) { +QueryResult +Infinity::Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options) { UniquePtr query_context_ptr = MakeUnique(session_.get()); query_context_ptr->Init(InfinityContext::instance().config(), InfinityContext::instance().task_scheduler(), @@ -960,7 +958,7 @@ QueryResult Infinity::Update(const String &db_name, const String &table_name, Pa // TODO: to lower expression identifier string update_statement->where_expr_ = filter; update_statement->update_expr_array_ = update_list; - for(UpdateExpr* update_expr_ptr: *update_statement->update_expr_array_) { + for (UpdateExpr *update_expr_ptr : *update_statement->update_expr_array_) { ToLower(update_expr_ptr->column_name); } QueryResult result = query_context_ptr->QueryStatement(update_statement.get()); @@ -972,6 +970,8 @@ QueryResult Infinity::Explain(const String &db_name, ExplainType explain_type, SearchExpr *search_expr, ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, Vector *output_columns) { UniquePtr query_context_ptr = MakeUnique(session_.get()); @@ -1000,6 +1000,8 @@ QueryResult Infinity::Explain(const String &db_name, select_statement->select_list_ = output_columns; select_statement->where_expr_ = filter; select_statement->search_expr_ = search_expr; + select_statement->limit_expr_ = limit; + select_statement->offset_expr_ = offset; explain_statment->statement_ = select_statement; @@ -1007,8 +1009,13 @@ QueryResult Infinity::Explain(const String &db_name, return result; } -QueryResult -Infinity::Search(const String &db_name, const String &table_name, SearchExpr *search_expr, ParsedExpr *filter, Vector *output_columns) { +QueryResult Infinity::Search(const String &db_name, + const String &table_name, + SearchExpr *search_expr, + ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, + Vector *output_columns) { UniquePtr query_context_ptr = MakeUnique(session_.get()); query_context_ptr->Init(InfinityContext::instance().config(), InfinityContext::instance().task_scheduler(), @@ -1032,6 +1039,8 @@ Infinity::Search(const String &db_name, const String &table_name, SearchExpr *se select_statement->select_list_ = output_columns; select_statement->where_expr_ = filter; select_statement->search_expr_ = search_expr; + select_statement->limit_expr_ = limit; + select_statement->offset_expr_ = offset; QueryResult result = query_context_ptr->QueryStatement(select_statement.get()); return result; diff --git a/src/main/infinity.cppm b/src/main/infinity.cppm index d2968515f7..ce73b4d522 100644 --- a/src/main/infinity.cppm +++ b/src/main/infinity.cppm @@ -141,7 +141,8 @@ public: QueryResult Import(const String &db_name, const String &table_name, const String &path, ImportOptions import_options); - QueryResult Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options); + QueryResult + Export(const String &db_name, const String &table_name, Vector *columns, const String &path, ExportOptions export_options); QueryResult Delete(const String &db_name, const String &table_name, ParsedExpr *filter); @@ -152,10 +153,17 @@ public: ExplainType explain_type, SearchExpr *search_expr, ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, Vector *output_columns); - QueryResult - Search(const String &db_name, const String &table_name, SearchExpr *search_expr, ParsedExpr *filter, Vector *output_columns); + QueryResult Search(const String &db_name, + const String &table_name, + SearchExpr *search_expr, + ParsedExpr *filter, + ParsedExpr *limit, + ParsedExpr *offset, + Vector *output_columns); QueryResult Optimize(const String &db_name, const String &table_name, OptimizeOptions optimize_options = OptimizeOptions{}); diff --git a/src/network/http/http_search.cpp b/src/network/http/http_search.cpp index 9fbe64e5ab..ca1565acfe 100644 --- a/src/network/http/http_search.cpp +++ b/src/network/http/http_search.cpp @@ -59,6 +59,8 @@ void HTTPSearch::Process(Infinity *infinity_ptr, response["error_message"] = "HTTP Body isn't json object"; } UniquePtr filter{}; + UniquePtr limit{}; + UniquePtr offset{}; UniquePtr search_expr{}; Vector *output_columns{nullptr}; DeferFn defer_fn([&]() { @@ -101,6 +103,28 @@ void HTTPSearch::Process(Infinity *infinity_ptr, if (!filter) { return; } + } else if (IsEqual(key, "limit")) { + + if (limit) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one limit field."; + return; + } + limit = ParseFilter(elem.value(), http_status, response); + if (!limit) { + return; + } + } else if (IsEqual(key, "offset")) { + + if (offset) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one offset field."; + return; + } + offset = ParseFilter(elem.value(), http_status, response); + if (!offset) { + return; + } } else if (IsEqual(key, "search")) { if (search_expr) { response["error_code"] = ErrorCode::kInvalidExpression; @@ -118,7 +142,8 @@ void HTTPSearch::Process(Infinity *infinity_ptr, } } - const QueryResult result = infinity_ptr->Search(db_name, table_name, search_expr.release(), filter.release(), output_columns); + const QueryResult result = + infinity_ptr->Search(db_name, table_name, search_expr.release(), filter.release(), limit.release(), offset.release(), output_columns); output_columns = nullptr; if (result.IsOk()) { @@ -168,6 +193,8 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, response["error_message"] = "HTTP Body isn't json object"; } UniquePtr filter{}; + UniquePtr limit{}; + UniquePtr offset{}; UniquePtr search_expr{}; Vector *output_columns{nullptr}; DeferFn defer_fn([&]() { @@ -179,7 +206,7 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, output_columns = nullptr; } }); - ExplainType explain_type = ExplainType::kInvalid; + ExplainType explain_type = ExplainType::kInvalid; for (const auto &elem : input_json.items()) { String key = elem.key(); ToLower(key); @@ -211,6 +238,28 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, if (filter == nullptr) { return; } + } else if (IsEqual(key, "limit")) { + + if (limit) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one limit field."; + return; + } + limit = ParseFilter(elem.value(), http_status, response); + if (!limit) { + return; + } + } else if (IsEqual(key, "offset")) { + + if (offset) { + response["error_code"] = ErrorCode::kInvalidExpression; + response["error_message"] = "More than one offset field."; + return; + } + offset = ParseFilter(elem.value(), http_status, response); + if (!offset) { + return; + } } else if (IsEqual(key, "search")) { if (search_expr) { response["error_code"] = ErrorCode::kInvalidExpression; @@ -247,7 +296,14 @@ void HTTPSearch::Explain(Infinity *infinity_ptr, } } - const QueryResult result = infinity_ptr->Explain(db_name, table_name, explain_type, search_expr.release(), filter.release(), output_columns); + const QueryResult result = infinity_ptr->Explain(db_name, + table_name, + explain_type, + search_expr.release(), + filter.release(), + limit.release(), + offset.release(), + output_columns); output_columns = nullptr; if (result.IsOk()) { @@ -325,15 +381,15 @@ Vector *HTTPSearch::ParseOutput(const nlohmann::json &output_list, return nullptr; } - if(output_expr_str == "_row_id" or output_expr_str == "_similarity" or output_expr_str == "_distance" or output_expr_str == "_score") { + if (output_expr_str == "_row_id" or output_expr_str == "_similarity" or output_expr_str == "_distance" or output_expr_str == "_score") { auto parsed_expr = new FunctionExpr(); - if(output_expr_str == "_row_id") { + if (output_expr_str == "_row_id") { parsed_expr->func_name_ = "row_id"; - } else if(output_expr_str == "_similarity") { + } else if (output_expr_str == "_similarity") { parsed_expr->func_name_ = "similarity"; - } else if(output_expr_str == "_distance") { + } else if (output_expr_str == "_distance") { parsed_expr->func_name_ = "distance"; - } else if(output_expr_str == "_score") { + } else if (output_expr_str == "_score") { parsed_expr->func_name_ = "score"; } output_columns->emplace_back(parsed_expr); diff --git a/src/network/infinity_thrift_service.cpp b/src/network/infinity_thrift_service.cpp index b2aca4e6f7..18d7c3f544 100644 --- a/src/network/infinity_thrift_service.cpp +++ b/src/network/infinity_thrift_service.cpp @@ -564,25 +564,49 @@ void InfinityThriftService::Select(infinity_thrift_rpc::SelectResponse &response } } - // TODO: - // ParsedExpr *offset; - // offset = new ParsedExpr(); + // Limit + ParsedExpr *limit = nullptr; + DeferFn defer_fn7([&]() { + if (limit != nullptr) { + delete limit; + limit = nullptr; + } + }); + if (request.__isset.limit_expr == true) { + limit = GetParsedExprFromProto(parsed_expr_status, request.limit_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } - // limit - // ParsedExpr *limit = nullptr; - // if (request.__isset.limit_expr == true) { - // limit = GetParsedExprFromProto(request.limit_expr); - // } + // Offset + ParsedExpr *offset = nullptr; + DeferFn defer_fn8([&]() { + if (offset != nullptr) { + delete offset; + offset = nullptr; + } + }); + if (request.__isset.offset_expr == true) { + offset = GetParsedExprFromProto(parsed_expr_status, request.offset_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } // auto end2 = std::chrono::steady_clock::now(); // phase_2_duration_ += end2 - start2; // // auto start3 = std::chrono::steady_clock::now(); - const QueryResult result = infinity->Search(request.db_name, request.table_name, search_expr, filter, output_columns); + const QueryResult result = infinity->Search(request.db_name, request.table_name, search_expr, filter, limit, offset, output_columns); output_columns = nullptr; filter = nullptr; search_expr = nullptr; + limit = nullptr; + offset = nullptr; // auto end3 = std::chrono::steady_clock::now(); // // phase_3_duration_ += end3 - start3; @@ -635,21 +659,21 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons Vector *output_columns = new Vector(); output_columns->reserve(request.select_list.size()); + DeferFn defer_fn1([&]() { + if (output_columns != nullptr) { + for (auto &expr_ptr : *output_columns) { + delete expr_ptr; + expr_ptr = nullptr; + } + delete output_columns; + output_columns = nullptr; + } + }); Status parsed_expr_status; for (auto &expr : request.select_list) { auto parsed_expr = GetParsedExprFromProto(parsed_expr_status, expr); if (!parsed_expr_status.ok()) { - - if (output_columns != nullptr) { - for (auto &expr_ptr : *output_columns) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete output_columns; - output_columns = nullptr; - } - if (parsed_expr != nullptr) { delete parsed_expr; parsed_expr = nullptr; @@ -663,43 +687,44 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons // search expr SearchExpr *search_expr = nullptr; + DeferFn defer_fn5([&]() { + if (search_expr != nullptr) { + delete search_expr; + search_expr = nullptr; + } + }); if (request.__isset.search_expr) { auto search_expr_list = new Vector(); + DeferFn defer_fn2([&]() { + if (search_expr_list != nullptr) { + for (auto &expr_ptr : *search_expr_list) { + delete expr_ptr; + expr_ptr = nullptr; + } + delete search_expr_list; + search_expr_list = nullptr; + } + }); + SizeT match_expr_count = request.search_expr.match_exprs.size(); SizeT fusion_expr_count = request.search_expr.fusion_exprs.size(); SizeT total_expr_count = match_expr_count + fusion_expr_count; search_expr_list->reserve(total_expr_count); for (SizeT idx = 0; idx < match_expr_count; ++idx) { Status status; - auto match_expr = GetGenericMatchExprFromProto(status, request.search_expr.match_exprs[idx]); - if (!status.ok()) { - if (output_columns != nullptr) { - for (auto &expr_ptr : *output_columns) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete output_columns; - output_columns = nullptr; - } - - if (search_expr_list != nullptr) { - for (auto &expr_ptr : *search_expr_list) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete search_expr_list; - search_expr_list = nullptr; - } - + ParsedExpr *match_expr = GetGenericMatchExprFromProto(status, request.search_expr.match_exprs[idx]); + DeferFn defer_fn3([&]() { if (match_expr != nullptr) { delete match_expr; match_expr = nullptr; } - + }); + if (!status.ok()) { ProcessStatus(response, status); return; } search_expr_list->emplace_back(match_expr); + match_expr = nullptr; } for (SizeT idx = 0; idx < fusion_expr_count; ++idx) { @@ -709,6 +734,7 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons search_expr = new SearchExpr(); search_expr->SetExprs(search_expr_list); + search_expr_list = nullptr; } // filter @@ -716,16 +742,6 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons if (request.__isset.where_expr == true) { filter = GetParsedExprFromProto(parsed_expr_status, request.where_expr); if (!parsed_expr_status.ok()) { - - if (output_columns != nullptr) { - for (auto &expr_ptr : *output_columns) { - delete expr_ptr; - expr_ptr = nullptr; - } - delete output_columns; - output_columns = nullptr; - } - if (search_expr != nullptr) { delete search_expr; search_expr = nullptr; @@ -741,19 +757,46 @@ void InfinityThriftService::Explain(infinity_thrift_rpc::SelectResponse &respons } } - // TODO: - // ParsedExpr *offset; - // offset = new ParsedExpr(); + // Limit + ParsedExpr *limit = nullptr; + DeferFn defer_fn7([&]() { + if (limit != nullptr) { + delete limit; + limit = nullptr; + } + }); + if (request.__isset.limit_expr == true) { + limit = GetParsedExprFromProto(parsed_expr_status, request.limit_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } - // limit - // ParsedExpr *limit = nullptr; - // if (request.__isset.limit_expr == true) { - // limit = GetParsedExprFromProto(request.limit_expr); - // } + // Offset + ParsedExpr *offset = nullptr; + DeferFn defer_fn8([&]() { + if (offset != nullptr) { + delete offset; + offset = nullptr; + } + }); + if (request.__isset.offset_expr == true) { + offset = GetParsedExprFromProto(parsed_expr_status, request.offset_expr); + if (!parsed_expr_status.ok()) { + ProcessStatus(response, parsed_expr_status); + return; + } + } // Explain type auto explain_type = GetExplainTypeFromProto(request.explain_type); - const QueryResult result = infinity->Explain(request.db_name, request.table_name, explain_type, search_expr, filter, output_columns); + const QueryResult result = infinity->Explain(request.db_name, request.table_name, explain_type, search_expr, filter, limit, offset, output_columns); + output_columns = nullptr; + search_expr = nullptr; + filter = nullptr; + limit = nullptr; + offset = nullptr; if (result.IsOk()) { auto &columns = response.column_fields; diff --git a/src/planner/bound_select_statement.cpp b/src/planner/bound_select_statement.cpp index 0e1c40ac80..1b957abbae 100644 --- a/src/planner/bound_select_statement.cpp +++ b/src/planner/bound_select_statement.cpp @@ -329,6 +329,12 @@ SharedPtr BoundSelectStatement::BuildPlan(QueryContext *query_conte root = std::move(match_knn_nodes[0]); } + if (limit_expression_.get() != nullptr) { + auto limit = MakeShared(bind_context->GetNewLogicalNodeId(), limit_expression_, offset_expression_); + limit->set_left_node(root); + root = limit; + } + auto project = MakeShared(bind_context->GetNewLogicalNodeId(), projection_expressions_, projection_index_); project->set_left_node(root); root = std::move(project); diff --git a/src/planner/explain_logical_plan.cpp b/src/planner/explain_logical_plan.cpp index 4857892f75..f4a54f141b 100644 --- a/src/planner/explain_logical_plan.cpp +++ b/src/planner/explain_logical_plan.cpp @@ -2299,6 +2299,14 @@ void ExplainLogicalPlan::Explain(const LogicalMatch *match_node, SharedPtr>> &result, i64 intent_size) { + IStringStream iss(match_node->ToString(intent_size)); + String line; + while (std::getline(iss, line)) { + result->emplace_back(MakeShared(std::move(line))); + } +} + void ExplainLogicalPlan::Explain(const LogicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size) { IStringStream iss(match_tensor_node->ToString(intent_size)); String line; diff --git a/src/planner/explain_logical_plan.cppm b/src/planner/explain_logical_plan.cppm index de3a8a0d11..8d9eca9028 100644 --- a/src/planner/explain_logical_plan.cppm +++ b/src/planner/explain_logical_plan.cppm @@ -47,6 +47,7 @@ import logical_export; import logical_flush; import logical_optimize; import logical_match; +import logical_match_sparse_scan; import logical_match_tensor_scan; import logical_fusion; import base_expression; @@ -115,6 +116,8 @@ public: static void Explain(const LogicalMatch *match_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const LogicalMatchSparseScan *match_node, SharedPtr>> &result, i64 intent_size = 0); + static void Explain(const LogicalMatchTensorScan *match_tensor_node, SharedPtr>> &result, i64 intent_size = 0); static void Explain(const LogicalFusion *fusion_node, SharedPtr>> &result, i64 intent_size = 0); diff --git a/src/unit_test/main/infinity.cpp b/src/unit_test/main/infinity.cpp index b09c6fd61d..ddea6d6177 100644 --- a/src/unit_test/main/infinity.cpp +++ b/src/unit_test/main/infinity.cpp @@ -203,7 +203,7 @@ TEST_F(InfinityTest, test1) { SearchExpr * search_expr = nullptr; - result = infinity->Search("default_db", "table1", search_expr, nullptr, output_columns); + result = infinity->Search("default_db", "table1", search_expr, nullptr, nullptr, nullptr, output_columns); SharedPtr data_block = result.result_table_->GetDataBlockById(0); EXPECT_EQ(data_block->row_count(), 1); Value value = data_block->GetValue(0, 0); diff --git a/src/unit_test/main/table.cpp b/src/unit_test/main/table.cpp index 12e46cc03e..880f2fc5d5 100644 --- a/src/unit_test/main/table.cpp +++ b/src/unit_test/main/table.cpp @@ -85,7 +85,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_ast = infinity->Explain(db_name, table_name, ExplainType::kAst, nullptr, nullptr, output_columns); + QueryResult explain_ast = infinity->Explain(db_name, table_name, ExplainType::kAst, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_ast.IsOk()); // fmt::print("AST: {}\n", explain_ast.ToString()); } @@ -96,7 +96,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_unopt = infinity->Explain(db_name, table_name, ExplainType::kUnOpt, nullptr, nullptr, output_columns); + QueryResult explain_unopt = infinity->Explain(db_name, table_name, ExplainType::kUnOpt, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_unopt.IsOk()); // fmt::print("Unoptimized logical plan: {}\n", explain_unopt.ToString()); } @@ -107,7 +107,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_opt = infinity->Explain(db_name, table_name, ExplainType::kOpt, nullptr, nullptr, output_columns); + QueryResult explain_opt = infinity->Explain(db_name, table_name, ExplainType::kOpt, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_opt.IsOk()); // fmt::print("Optimized logical plan: {}\n", explain_opt.ToString()); } @@ -118,7 +118,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_phy = infinity->Explain(db_name, table_name, ExplainType::kPhysical, nullptr, nullptr, output_columns); + QueryResult explain_phy = infinity->Explain(db_name, table_name, ExplainType::kPhysical, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_phy.IsOk()); // fmt::print("Physical plan: {}\n", explain_phy.ToString()); } @@ -129,7 +129,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_fragment = infinity->Explain(db_name, table_name, ExplainType::kFragment, nullptr, nullptr, output_columns); + QueryResult explain_fragment = infinity->Explain(db_name, table_name, ExplainType::kFragment, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_fragment.IsOk()); // fmt::print("Fragment: {}\n", explain_fragment.ToString()); } @@ -140,7 +140,7 @@ TEST_F(InfinityTableTest, test1) { col2->names_.emplace_back(col2_name); output_columns->emplace_back(col2); - QueryResult explain_pipeline = infinity->Explain(db_name, table_name, ExplainType::kPipeline, nullptr, nullptr, output_columns); + QueryResult explain_pipeline = infinity->Explain(db_name, table_name, ExplainType::kPipeline, nullptr, nullptr, nullptr, nullptr, output_columns); EXPECT_TRUE(explain_pipeline.IsOk()); // fmt::print("Pipeline: {}\n", explain_pipeline.ToString()); }