Skip to content

Commit

Permalink
Merge pull request #116 from ClibMouse/Kusto-p3-tophitter
Browse files Browse the repository at this point in the history
Kusto p3 tophitter
  • Loading branch information
kashwy authored Oct 25, 2022
2 parents 6bf169a + c2d9078 commit 41db86e
Show file tree
Hide file tree
Showing 10 changed files with 392 additions and 42 deletions.
17 changes: 16 additions & 1 deletion src/Parsers/Kusto/KQL_ReleaseNote.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,27 @@
# October 25, 2022
## operator
- [count](https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/countoperator)

`Customers | count;`
`Customers | where Age< 30 | count;`
`Customers | where Age< 30 | limit 2 | count;`
`Customers | where Age< 30 | limit 2 | count | project Count;`

- [top](https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/topoperator)
`Customers | top 3 by Age;`
`Customers | top 3 by Age desc;`
`Customers | top 3 by Age asc | order by FirstName;`
`Customers | top 3 by FirstName desc nulls first;`
`Customers | top 3 by FirstName desc nulls last;`
`Customers | top 3 by Age | top 2 by FirstName;`

- [top-hitters](https://learn.microsoft.com/en-us/azure/data-explorer/kusto/query/tophittersoperator)
`Customers | top-hitters a = 2 of Age by extra;`
`Customers | top-hitters 2 of Age;`
`Customers | top-hitters 2 of Age by extra | top-hitters 2 of Age | order by Age;`
`Customers | top-hitters 2 of Age by extra | where Age > 30;`
`Customers | top-hitters 2 of Age by extra | where approximate_sum_extra < 200;`
`Customers | top-hitters 2 of Age | where approximate_count_Age > 2;`


# October 9, 2022

Expand Down
100 changes: 61 additions & 39 deletions src/Parsers/Kusto/ParserKQLQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/Kusto/ParserKQLTop.h>
#include <Parsers/Kusto/ParserKQLTopHitter.h>

namespace DB
{
Expand All @@ -34,6 +36,27 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
}

/// Table of the KQL operators known to the query parser, keyed by the operator keyword as it
/// is written in a query. Each value is a ParserKQLQuery::KQLOperatorDataFlowState initialised
/// positionally as {operator_name, need_input, gen_output, need_reinterpret, backspace_steps}.
/// Several keywords are aliases that share one operator_name ("where" -> "filter",
/// "take" -> "limit", "sort by" -> "order by").
/// Only "top" and "top-hitters" set need_reinterpret.
std::unordered_map<std::string, ParserKQLQuery::KQLOperatorDataFlowState> kql_parser =
{
{"filter", {"filter", false, false, false, 3}},
{"where", {"filter", false, false, false, 3}},
{"limit", {"limit", false, true, false, 3}},
{"take", {"limit", false, true, false, 3}},
{"project", {"project", false, false, false, 3}},
{"distinct", {"distinct", false, true, false, 3}},
{"extend", {"extend", true, true, false, 3}},
{"sort by", {"order by", false, false, false, 4}},
{"order by", {"order by", false, false, false, 4}},
{"table", {"table", false, false, false, 3}},
{"print", {"print", false, true, false, 3}},
{"summarize", {"summarize", true, true, false, 3}},
{"make-series", {"make-series", true, true, false, 5}},
{"mv-expand", {"mv-expand", true, true, false, 5}},
{"count", {"count", false, true, false, 3}},
{"top", {"top", false, true, true, 3}},
{"top-hitters", {"top-hitters", true, true, true, 5}},
};

bool ParserKQLBase::parseByString(const String expr, ASTPtr & node, const uint32_t max_depth)
{
Expected expected;
Expand Down Expand Up @@ -117,7 +140,7 @@ String ParserKQLBase::getExprFromPipe(Pos & pos)
++end;
}
--end;
return String(begin->begin, end->end);
return (begin <= end) ? String(begin->begin, end->end) : "";
}

String ParserKQLBase::getExprFromToken(Pos & pos)
Expand Down Expand Up @@ -175,7 +198,7 @@ String ParserKQLBase::getExprFromToken(Pos & pos)
return res;
}

std::unique_ptr<IParserBase> ParserKQLQuery::getOperator(String & op_name)
std::unique_ptr<ParserKQLBase> ParserKQLQuery::getOperator(String & op_name)
{
if (op_name == "filter" || op_name == "where")
return std::make_unique<ParserKQLFilter>();
Expand All @@ -201,45 +224,16 @@ std::unique_ptr<IParserBase> ParserKQLQuery::getOperator(String & op_name)
return std::make_unique<ParserKQLPrint>();
else if (op_name == "count")
return std::make_unique<ParserKQLCount>();
else if (op_name == "top")
return std::make_unique<ParserKQLTop>();
else if (op_name == "top-hitters")
return std::make_unique<ParserKQLTopHitters>();
else
return nullptr;
}

bool ParserKQLQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserKQLQuery::getOperations(Pos & pos, Expected & expected, OperationsPos & operation_pos)
{
struct KQLOperatorDataFlowState
{
String operator_name;
bool need_input;
bool gen_output;
int8_t backspace_steps; // how many steps to last token of previous pipe
};

auto select_query = std::make_shared<ASTSelectQuery>();
node = select_query;
ASTPtr tables;

std::unordered_map<std::string, KQLOperatorDataFlowState> kql_parser =
{
{"filter", {"filter", false, false, 3}},
{"where", {"filter", false, false, 3}},
{"limit", {"limit", false, true, 3}},
{"take", {"limit", false, true, 3}},
{"project", {"project", false, false, 3}},
{"distinct", {"distinct", false, true, 3}},
{"extend", {"extend", true, true, 3}},
{"sort by", {"order by", false, false, 4}},
{"order by", {"order by", false, false, 4}},
{"table", {"table", false, false, 3}},
{"print", {"print", false, true, 3}},
{"summarize", {"summarize", true, true, 3}},
{"make-series", {"make-series", true, true, 5}},
{"mv-expand", {"mv-expand", true, true, 5}},
{"count", {"count", false, true, 3}},
};

std::vector<std::pair<String, Pos>> operation_pos;

String table_name(pos->begin, pos->end);

if (table_name == "print")
Expand Down Expand Up @@ -300,17 +294,45 @@ bool ParserKQLQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
else
++pos;
}
return true;
}

bool ParserKQLQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
auto select_query = std::make_shared<ASTSelectQuery>();
node = select_query;
ASTPtr tables;

OperationsPos operation_pos;

if (!getOperations(pos, expected, operation_pos))
return false;

auto kql_operator_str = operation_pos.back().first;
auto npos = operation_pos.back().second;
// if (!npos.isValid())
// return false;

auto kql_operator_p = getOperator(kql_operator_str);
if (!kql_operator_p)
return false;

String updated_query;
kql_operator_p->updatePipeLine(operation_pos, updated_query);

Tokens token_query(updated_query.c_str(), updated_query.c_str() + updated_query.size());
IParser::Pos pos_query(token_query, pos.max_depth);
if (!updated_query.empty())
{
operation_pos.clear();
if(!ParserKQLQuery::getOperations(pos_query, expected, operation_pos))
return false;
}

kql_operator_str = operation_pos.back().first;
kql_operator_p = getOperator(kql_operator_str);
if (!kql_operator_p)
return false;

auto npos = operation_pos.back().second;

if (operation_pos.size() == 1)
{
if (kql_operator_str == "print")
Expand Down
16 changes: 14 additions & 2 deletions src/Parsers/Kusto/ParserKQLQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

namespace DB
{
using OperationsPos = std::vector<std::pair<String, IParser::Pos>>;

class ParserKQLBase : public IParserBase
{
public:
Expand All @@ -14,13 +16,23 @@ class ParserKQLBase : public IParserBase
static bool setSubQuerySource(ASTPtr & select_query, ASTPtr & source, bool dest_is_subquery, bool src_is_subquery);
static bool parseSQLQueryByString(ParserPtr && parser, String & query, ASTPtr & select_node, int32_t max_depth);
bool parseByString(const String expr, ASTPtr & node, const uint32_t max_depth);
virtual bool updatePipeLine (OperationsPos & /*operations*/, String & /*query*/) {return false;}
};

class ParserKQLQuery : public IParserBase
{

public:
/// Per-operator description stored in the kql_parser table (one entry per operator keyword).
struct KQLOperatorDataFlowState
{
/// Canonical operator name; alias keywords such as "where"/"filter" share one name.
String operator_name;
/// NOTE(review): presumably whether the operator has to read from the preceding pipeline as a sub-query — confirm against ParserKQLQuery::parseImpl.
bool need_input;
/// NOTE(review): presumably whether the operator's result has to be wrapped for the following operator — confirm against ParserKQLQuery::parseImpl.
bool gen_output;
/// Set only for "top" and "top-hitters" in kql_parser.
/// NOTE(review): presumably marks operators that are rewritten into other operators via updatePipeLine — confirm.
bool need_reinterpret;
int8_t backspace_steps; // how many steps to last token of previous pipe
};
static bool getOperations(Pos & pos, Expected & expected, OperationsPos & operation_pos);
protected:
static std::unique_ptr<IParserBase> getOperator(String &op_name);
static std::unique_ptr<ParserKQLBase> getOperator(String &op_name);
const char * getName() const override { return "KQL query"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
Expand Down
57 changes: 57 additions & 0 deletions src/Parsers/Kusto/ParserKQLTop.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/Kusto/ParserKQLQuery.h>
#include <Parsers/Kusto/ParserKQLTop.h>
#include <format>

namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}

/// No-op parse step: ignores the token position, the AST node and the expected-set, and
/// always returns true.
/// NOTE(review): presumably nothing is left to parse here because `top` is rewritten into
/// `sort by ... | take ...` by updatePipeLine — confirm against ParserKQLQuery::parseImpl.
bool ParserKQLTop::parseImpl(Pos & /*pos*/, ASTPtr & /*node*/, Expected & /*expected*/)
{
return true;
}

/// Rewrites a trailing `top <limit> by <sort expression>` operator into the equivalent
/// `sort by <sort expression> | take <limit>` pipeline.
///
/// @param operations  Operators collected for the query. The last entry is taken to be the
///                    `top` operator, with its position on the first token after the keyword;
///                    the first entry's position marks the start of the query text.
/// @param query       Receives the rewritten KQL text: the original text up to the token two
///                    positions before the operand, followed by the replacement operators.
/// @return            Always true; malformed input is reported by throwing.
/// @throws Exception  SYNTAX_ERROR when nothing follows `top`, or when either the limit or the
///                    sort expression is missing (for example `top by Age` or `top 3 by`).
bool ParserKQLTop::updatePipeLine (OperationsPos & operations, String & query)
{
    Pos pos = operations.back().second;

    if (pos->isEnd() || pos->type == TokenType::PipeMark || pos->type == TokenType::Semicolon)
        throw Exception("Syntax error near top operator", ErrorCodes::SYNTAX_ERROR);

    /// Source text covered by the tokens [first, last]. Yields an empty string for an inverted
    /// range, so a clause with no tokens is reported as "missing" below instead of building a
    /// String from a begin pointer that lies after the end pointer.
    /// Positions are taken by value because dereferencing a Pos is done on non-const objects.
    const auto text_between = [](Pos first, Pos last)
    {
        return (first <= last) ? String(first->begin, last->end) : String();
    };

    /// Step back two tokens from the operand (intended: the `top` keyword and the pipe in front
    /// of it) so that prev_query ends right where the replacement operators are appended.
    Pos query_begin = operations.front().second;
    Pos query_end = pos;
    --query_end;
    --query_end;

    String prev_query(query_begin->begin, query_end->end);

    String limit_expr, sort_expr;
    Pos start_pos = pos;
    Pos end_pos = pos;
    while (!pos->isEnd() && pos->type != TokenType::PipeMark && pos->type != TokenType::Semicolon)
    {
        if (String(pos->begin, pos->end) == "by")
        {
            /// Everything before `by` is the limit; the sort expression starts right after it.
            Pos limit_end_pos = pos;
            --limit_end_pos;
            limit_expr = text_between(start_pos, limit_end_pos);
            start_pos = pos;
            ++start_pos;
        }
        end_pos = pos;
        ++pos;
    }

    sort_expr = text_between(start_pos, end_pos);
    if (limit_expr.empty() || sort_expr.empty())
        throw Exception("top operator need a by clause", ErrorCodes::SYNTAX_ERROR);

    query = std::format("{} sort by {} | take {}", prev_query, sort_expr, limit_expr);

    return true;
}

}
17 changes: 17 additions & 0 deletions src/Parsers/Kusto/ParserKQLTop.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <Parsers/IParserBase.h>
#include <Parsers/Kusto/ParserKQLQuery.h>

namespace DB
{

/// Parser object for the KQL `top` operator.
/// Overrides ParserKQLBase::updatePipeLine; the definition in ParserKQLTop.cpp rewrites the
/// operator as `sort by ... | take ...`.
class ParserKQLTop : public ParserKQLBase
{
protected:
/// Name reported for this parser.
const char * getName() const override { return "KQL top"; }
/// Defined in ParserKQLTop.cpp.
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
/// Builds the rewritten query text in `query` from the collected `operations`.
bool updatePipeLine (OperationsPos & operations, String & query) override;
};

}
76 changes: 76 additions & 0 deletions src/Parsers/Kusto/ParserKQLTopHitter.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
#include <Parsers/ExpressionListParsers.h>
#include <Parsers/Kusto/ParserKQLQuery.h>
#include <Parsers/Kusto/ParserKQLTopHitter.h>
#include <format>

namespace DB
{
namespace ErrorCodes
{
extern const int SYNTAX_ERROR;
}

/// No-op parse step: ignores the token position, the AST node and the expected-set, and
/// always returns true.
/// NOTE(review): presumably nothing is left to parse here because `top-hitters` is rewritten
/// into `summarize ... | sort by ... | take ...` by updatePipeLine — confirm against
/// ParserKQLQuery::parseImpl.
bool ParserKQLTopHitters::parseImpl(Pos & /*pos*/, ASTPtr & /*node*/, Expected & /*expected*/)
{
return true;
}

/// Rewrites a trailing `top-hitters <N> of <value> [by <summing>]` operator into an equivalent
/// pipeline of `summarize`, `sort by` and `take`:
///   - without `by`: `summarize approximate_count_<value> = count() by <value>
///                    | sort by approximate_count_<value> desc | take <N>`
///   - with `by`:    `summarize approximate_sum_<summing> = sum(<summing>) by <value>
///                    | sort by approximate_sum_<summing> desc | take <N>`
///
/// @param operations  Operators collected for the query. The last entry is taken to be the
///                    `top-hitters` operator, with its position on the first token after the
///                    keyword; the first entry's position marks the start of the query text.
/// @param query       Receives the rewritten KQL text.
/// @return            Always true; malformed input is reported by throwing.
/// @throws Exception  SYNTAX_ERROR when nothing follows `top-hitters`, or when the number of
///                    values or the value expression is missing (for example
///                    `top-hitters of Age` or `top-hitters 2 of by extra`).
bool ParserKQLTopHitters::updatePipeLine (OperationsPos & operations, String & query)
{
    Pos pos = operations.back().second;

    if (pos->isEnd() || pos->type == TokenType::PipeMark || pos->type == TokenType::Semicolon)
        throw Exception("Syntax error near top-hitters operator", ErrorCodes::SYNTAX_ERROR);

    /// Source text covered by the tokens [first, last]. Yields an empty string for an inverted
    /// range, so a clause with no tokens is reported as "missing" below instead of building a
    /// String from a begin pointer that lies after the end pointer.
    /// Positions are taken by value because dereferencing a Pos is done on non-const objects.
    const auto text_between = [](Pos first, Pos last)
    {
        return (first <= last) ? String(first->begin, last->end) : String();
    };

    /// Step back four tokens from the operand (presumably `top`, `-`, `hitters` and the pipe in
    /// front of them) so that prev_query ends right where the replacement operators are appended.
    Pos query_begin = operations.front().second;
    Pos query_end = pos;
    --query_end;
    --query_end;
    --query_end;
    --query_end;

    String prev_query(query_begin->begin, query_end->end);

    String number_of_values, value_expression, summing_expression;
    bool by_seen = false;
    Pos start_pos = pos;
    Pos end_pos = pos;
    while (!pos->isEnd() && pos->type != TokenType::PipeMark && pos->type != TokenType::Semicolon)
    {
        const String token(pos->begin, pos->end);
        if (token == "of")
        {
            /// Everything before `of` is the number of values.
            Pos number_end_pos = pos;
            --number_end_pos;
            number_of_values = text_between(start_pos, number_end_pos);
            start_pos = pos;
            ++start_pos;
        }
        else if (token == "by")
        {
            /// Everything between `of` and `by` is the value expression.
            Pos expr_end_pos = pos;
            --expr_end_pos;
            value_expression = text_between(start_pos, expr_end_pos);
            by_seen = true;
            start_pos = pos;
            ++start_pos;
        }
        end_pos = pos;
        ++pos;
    }

    /// The trailing tokens are the summing expression only when a `by` was actually present;
    /// tracking this explicitly keeps an empty value expression (`... of by x`) from being
    /// silently replaced by the summing expression.
    if (by_seen)
        summing_expression = text_between(start_pos, end_pos);
    else
        value_expression = text_between(start_pos, end_pos);

    if (number_of_values.empty() || value_expression.empty())
        throw Exception("top-hitter operator need a ValueExpression", ErrorCodes::SYNTAX_ERROR);

    if (summing_expression.empty())
        query = std::format("{0} summarize approximate_count_{1} = count() by {1} | sort by approximate_count_{1} desc | take {2} ", prev_query, value_expression, number_of_values);
    else
        query = std::format("{0} summarize approximate_sum_{1} = sum({1}) by {2} | sort by approximate_sum_{1} desc | take {3}", prev_query, summing_expression, value_expression, number_of_values);

    return true;
}

}
17 changes: 17 additions & 0 deletions src/Parsers/Kusto/ParserKQLTopHitter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
#pragma once

#include <Parsers/IParserBase.h>
#include <Parsers/Kusto/ParserKQLQuery.h>

namespace DB
{

/// Parser object for the KQL `top-hitters` operator.
/// Overrides ParserKQLBase::updatePipeLine; the definition in ParserKQLTopHitter.cpp rewrites
/// the operator as `summarize ... | sort by ... desc | take ...`.
class ParserKQLTopHitters : public ParserKQLBase
{
protected:
/// Name reported for this parser.
const char * getName() const override { return "KQL top-hitters"; }
/// Defined in ParserKQLTopHitter.cpp.
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
/// Builds the rewritten query text in `query` from the collected `operations`.
bool updatePipeLine (OperationsPos & operations, String & query) override;
};

}
Loading

0 comments on commit 41db86e

Please sign in to comment.