Skip to content

Commit

Permalink
Merge pull request #401 from ClibMouse/Kusto-p3-summarize-opt
Browse files Browse the repository at this point in the history
Kusto-phase3: Optimizing the 'summarize' operator
  • Loading branch information
kashwy authored Nov 1, 2023
2 parents e28bf33 + ee4bd0f commit c4b45a3
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 7 deletions.
19 changes: 19 additions & 0 deletions src/Parsers/Kusto/KQL_ReleaseNote.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,25 @@
```
print lookup_contains('test_rocksDB',1);
```
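- Optimized the 'summarize' operator: when `summarize` follows `project` and/or `where`, the generated SQL is now a single flat SELECT instead of a nested subquery.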

KQL:
```
events_dist
| project original_time, ip
| where unixtime_milliseconds_todatetime(original_time) > ago(1h)
| summarize ip_count = count(*) by ip
```
Optimized SQL:
```
SELECT
ip,
count(*) AS ip_count
FROM events_dist
WHERE kql_todatetime(fromUnixTimestamp64Milli(original_time, 'UTC')) > (now64(9, 'UTC') + (-1 * toIntervalNanosecond(3600000000000)))
GROUP BY ip
```


# July, 2023
## Features
Expand Down
12 changes: 7 additions & 5 deletions src/Parsers/Kusto/ParserKQLQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ const std::unordered_map<std::string, ParserKQLQuery::KQLOperatorDataFlowState>
{"order by", {"order by", false, false, false, 4}},
{"table", {"table", false, false, false, 3}},
{"print", {"print", false, true, false, 3}},
{"summarize", {"summarize", true, true, false, 3}},
{"summarize", {"summarize", false, true, false, 3}},
{"make-series", {"make-series", true, true, false, 5}},
{"mv-expand", {"mv-expand", true, true, false, 5}},
{"count", {"count", true, true, false, 3}},
Expand Down Expand Up @@ -629,9 +629,10 @@ bool ParserKQLQuery::executeImpl(Pos & pos, ASTPtr & node, Expected & expected)
limit_clause = op_str;
else if (op == "order by" || op == "sort by")
order_clause = order_clause.empty() ? op_str : order_clause + "," + op_str;
return op == "project" || op == "where" || op == "filter" || op == "limit" || op == "take" ||op == "order by" || op == "sort by";
};

set_main_query_clause(last_op, last_pos);
bool last_op_processed = set_main_query_clause(last_op, last_pos);

operation_pos.pop_back();

Expand Down Expand Up @@ -672,9 +673,6 @@ bool ParserKQLQuery::executeImpl(Pos & pos, ASTPtr & node, Expected & expected)
return false;
}

if (!kql_operator_p->parse(npos, node, expected))
return false;

auto set_query_clause = [&](const String & op_str, const String & op_clause)
{
auto parser = getOperator(op_str);
Expand All @@ -695,6 +693,10 @@ bool ParserKQLQuery::executeImpl(Pos & pos, ASTPtr & node, Expected & expected)
|| (!where_clause.empty() && !set_query_clause("where", where_clause))
|| (!limit_clause.empty() && !set_query_clause("limit", limit_clause)))
return false;

if (!last_op_processed)
if (!kql_operator_p->parse(npos, node, expected))
return false;
}

if (auto * select_query = node->as<ASTSelectQuery>(); !select_query->select())
Expand Down
26 changes: 24 additions & 2 deletions src/Parsers/Kusto/ParserKQLSummarize.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <memory>
#include <queue>
#include <vector>
#include <sstream>
#include <string>

namespace DB
{
Expand Down Expand Up @@ -201,7 +203,27 @@ bool ParserKQLSummarize::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
expr_columns = expr_columns + "," + expr_aggregation;
}

String converted_columns = getExprFromToken(expr_columns, pos.max_depth);
std::unordered_map <String,String> alias_map;
auto select_expr = node->as<ASTSelectQuery>()->select();
if (select_expr)
{
std::ranges::for_each(
node->as<ASTSelectQuery>()->select()->children,
[&alias_map](const ASTPtr & expression)
{
if (const auto alias = expression->tryGetAlias(); !alias.empty())
{
alias_map[alias] = expression->getColumnNameWithoutAlias();
}
});
}
String converted_columns_raw = getExprFromToken(expr_columns, pos.max_depth);
std::istringstream column_stream(converted_columns_raw);
String converted_columns;
for (String value; std::getline(column_stream, value, ',');) {
converted_columns += alias_map.contains(value) ? alias_map[value] + " as " + value : value;
converted_columns +=",";
}

Tokens token_converted_columns(converted_columns.c_str(), converted_columns.c_str() + converted_columns.size());
IParser::Pos pos_converted_columns(token_converted_columns, pos.max_depth);
Expand All @@ -210,7 +232,7 @@ bool ParserKQLSummarize::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;

node->as<ASTSelectQuery>()->setExpression(ASTSelectQuery::Expression::SELECT, std::move(select_expression_list));

node->as<ASTSelectQuery>()->setExpression(ASTSelectQuery::Expression::ORDER_BY, nullptr);
if (groupby)
{
String converted_groupby = getExprFromToken(expr_groupby, pos.max_depth);
Expand Down
12 changes: 12 additions & 0 deletions src/Parsers/tests/KQL/gtest_KQL_AggregateFunctions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,18 @@ INSTANTIATE_TEST_SUITE_P(ParserKQLQuery_Aggregate, ParserTest,
{
"Customers | summarize x = hll(Education), y = hll(Occupation) | project xy = hll_merge(x, y) | project dcount_hll(xy);",
"SELECT uniqCombined64Merge(18)(xy) AS Column1\nFROM\n(\n SELECT uniqCombined64MergeState(18)(arrayJoin([x, y])) AS xy\n FROM\n (\n SELECT\n uniqCombined64State(18)(Education) AS x,\n uniqCombined64State(18)(Occupation) AS y\n FROM Customers\n )\n)"
},
{
"events_dist | project original_time, ip | where unixtime_milliseconds_todatetime(original_time) > ago(1h) | summarize ip_count = count(*) by ip;",
"SELECT\n ip,\n count(*) AS ip_count\nFROM events_dist\nWHERE kql_todatetime(fromUnixTimestamp64Milli(original_time, 'UTC')) > (now64(9, 'UTC') + (-1 * toIntervalNanosecond(3600000000000)))\nGROUP BY ip"
},
{
"events_dist | project abc = ip ,original_time | where unixtime_milliseconds_todatetime(original_time) > ago(1h) | summarize ip_count = count(*) by abc;",
"SELECT\n ip AS abc,\n count(*) AS ip_count\nFROM events_dist\nWHERE kql_todatetime(fromUnixTimestamp64Milli(original_time, 'UTC')) > (now64(9, 'UTC') + (-1 * toIntervalNanosecond(3600000000000)))\nGROUP BY abc"
},
{
"events_dist | where unixtime_milliseconds_todatetime(original_time) > ago(1h) | summarize ip_count = count(*) by ip | summarize avg(ip_count);",
"SELECT avg(ip_count) AS avg_ip_count\nFROM\n(\n SELECT\n ip,\n count(*) AS ip_count\n FROM events_dist\n WHERE kql_todatetime(fromUnixTimestamp64Milli(original_time, 'UTC')) > (now64(9, 'UTC') + (-1 * toIntervalNanosecond(3600000000000)))\n GROUP BY ip\n)"
}
})));

Expand Down
8 changes: 8 additions & 0 deletions tests/queries/0_stateless/02366_kql_summarize.reference
Original file line number Diff line number Diff line change
Expand Up @@ -139,3 +139,11 @@ Management abcd defg Stephanie Cox 33
3
7
7
-- summarize optimization --
Skilled Manual 2
Professional 3
Management abcd defg 1
Skilled Manual 2
Professional 3
Management abcd defg 1
2
5 changes: 5 additions & 0 deletions tests/queries/0_stateless/02366_kql_summarize.sql
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,8 @@ Customers | summarize x = hll(Education) | project dcount_hll(x);
Customers | summarize y = hll(Occupation) | project dcount_hll(y);
Customers | summarize x = hll(Education), y = hll(Occupation) | project xy = hll_merge(x, y) | project dcount_hll(xy);
Customers | summarize x = hll(Education), y = hll(Occupation) | summarize xy = hll_merge(x, y) | project dcount_hll(xy);

print '-- summarize optimization --';
Customers|project Occupation, Age |where Age > 30 | summarize occ_count = count(*) by Occupation;
Customers|project prefession = Occupation, Age | where Age > 30 | summarize occ_count = count(*) by prefession;
Customers|project prefession = Occupation, Age |where Age > 30 | summarize occ_count = count(*) by prefession |summarize avg(occ_count);

0 comments on commit c4b45a3

Please sign in to comment.