Skip to content

Commit

Permalink
Kusto-phase2, updated make-series operator
Browse files Browse the repository at this point in the history
  • Loading branch information
kashwy committed Aug 26, 2023
1 parent d261b29 commit 322a0aa
Show file tree
Hide file tree
Showing 2 changed files with 76 additions and 121 deletions.
194 changes: 75 additions & 119 deletions src/Parsers/Kusto/ParserKQLMakeSeries.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ namespace DB

bool ParserKQLMakeSeries :: parseAggregationColumns(AggregationColumns & aggregation_columns, Pos & pos)
{
std::unordered_set<String> allowed_aggregation
std::unordered_set<String> allowed_aggregation
({
"avg",
"avgif",
Expand Down Expand Up @@ -54,14 +54,14 @@ bool ParserKQLMakeSeries :: parseAggregationColumns(AggregationColumns & aggrega
{
alias = std::move(first_token);
aggregation_fun = String(pos->begin,pos->end);
++pos;
}
else
aggregation_fun = std::move(first_token);

if (allowed_aggregation.find(aggregation_fun) == allowed_aggregation.end())
return false;

++pos;
if (open_bracket.ignore(pos, expected))
column = String(pos->begin,pos->end);
else
Expand Down Expand Up @@ -148,9 +148,10 @@ bool ParserKQLMakeSeries :: parseFromToStepClause(FromToStepClause & from_to_ste
return true;
}


void ParserKQLMakeSeries :: makeNumericSeries(KQLMakeSeries & kql_make_series, const uint32_t & max_depth)
void ParserKQLMakeSeries :: makeSeries(KQLMakeSeries & kql_make_series, const uint32_t & max_depth)
{
const uint64_t era_diff = 62135596800; // this magic number is the differicen is second form 0001-01-01 (Azure start time ) and 1970-01-01 (CH start time)

String start_str, end_str;
String sub_query, main_query;

Expand All @@ -169,164 +170,123 @@ void ParserKQLMakeSeries :: makeNumericSeries(KQLMakeSeries & kql_make_series, c

String bin_str, start, end;

if (!start_str.empty()) // has from
{
bin_str = std::format(" toFloat64({0}) + (toInt64(((toFloat64({1}) - toFloat64({0})) / {2}) ) * {2}) AS {1}_ali ",
start_str, axis_column, step);
start = std::format("toUInt64({})", start_str);
}
else
{
bin_str = std::format(" toFloat64(toInt64((toFloat64({0}) ) / {1}) * {1}) AS {0}_ali ",
axis_column, step);
}

auto sub_sub_query = std::format(" (Select {0}, {1}, {2} FROM {3} GROUP BY {0}, {4}_ali ORDER BY {4}_ali) ", group_expression, subquery_columns, bin_str, table_name, axis_column);

if (!end_str.empty())
end = std::format("toUInt64({})", end_str);
uint64_t diff = 0;
String axis_column_format;
String axis_str;

String range, condition;
if (!start_str.empty() && !end_str.empty())
{
range = std::format("range({},{}, toUInt64({}))", start, end, step);
condition = std::format("{0}_ali >= {1} and {0}_ali <= {2}", axis_column, start, end);
}
else if (start_str.empty() && !end_str.empty())
auto get_group_expression_alias = [&]
{
range = std::format("range(low, {} , toUInt64({}))", end, step);
condition = std::format("{}_ali <= {}", axis_column, end);
}
else if (!start_str.empty() && end_str.empty())
{
range = std::format("range({}, high, toUInt64({}))", start, step);
condition = std::format("{}_ali >= {}", axis_column, start);
}
else
{
range = std::format("range(low, high, toUInt64({}))", step);
condition = "1"; //true
}
std::vector<String> group_expression_tokens;
Tokens tokens(group_expression.c_str(), group_expression.c_str() + group_expression.size());
IParser::Pos pos(tokens, max_depth);
while (!pos->isEnd())
{
if (String(pos->begin, pos->end) == "AS")
{
if (!group_expression_tokens.empty())
group_expression_tokens.pop_back();
++pos;
group_expression_tokens.push_back(String(pos->begin, pos->end));
}
else
group_expression_tokens.push_back(String(pos->begin, pos->end));
++pos;
}
String res;
for (auto token : group_expression_tokens)
res = res + token + " ";
return res;
};

auto range_len = std::format("length({})", range);
main_query = std::format("{} ", group_expression);
auto group_expression_alias = get_group_expression_alias();

auto axis_and_agg_alias_list = axis_column;
auto final_axis_agg_alias_list =std::format("tupleElement(zipped,1) AS {}",axis_column); //tupleElement(pp,2) as PriceAvg ,tupleElement(pp,1)
int idx = 2;
for (auto agg_column : aggregation_columns)
if (from_to_step.is_timespan)
{
String agg_group_column = std::format("arrayConcat(groupArrayIf ({}_ali,{}) as ga, arrayMap(x -> ({}),range(0,toUInt32 ({} - length(ga) < 0 ? 0 : {} - length(ga)),1) )) as {}",
agg_column.alias, condition, agg_column.default_value, range_len, range_len, agg_column.alias);
main_query +=", " + agg_group_column;

axis_and_agg_alias_list +=", " + agg_column.alias;
final_axis_agg_alias_list += std::format(", tupleElement(zipped,{}) AS {}", idx, agg_column.alias);
axis_column_format = std::format("toFloat64(toDateTime64({}, 9, 'UTC'))", axis_column);
}

auto axis_str = std::format("arrayDistinct(arrayConcat(groupArrayIf({0}_ali, {1}), arrayMap( x->(toFloat64(x)), {2})) ) as {0}",
axis_column, condition,range);

main_query += ", " + axis_str;
auto sub_group_by = std::format("{}", group_expression);

sub_query = std::format("( SELECT toUInt64(min({}_ali)) AS low, toUInt64(max({}_ali))+ {} AS high, arraySort(arrayZip({})) as zipped, {} FROM {} GROUP BY {} )",
axis_column, axis_column,step, axis_and_agg_alias_list,main_query,sub_sub_query, sub_group_by);

main_query = std::format("{},{}", group_expression, final_axis_agg_alias_list);

kql_make_series.sub_query = std::move(sub_query);
kql_make_series.main_query = std::move(main_query);
}

void ParserKQLMakeSeries :: makeTimeSeries(KQLMakeSeries & kql_make_series, const uint32_t & max_depth)
{
const uint64_t era_diff = 62135596800; // this magic number is the differicen is second form 0001-01-01 (Azure start time ) and 1970-01-01 (CH start time)

String start_str, end_str;
String sub_query, main_query;

auto & aggregation_columns = kql_make_series.aggregation_columns;
auto & from_to_step = kql_make_series.from_to_step;
auto & subquery_columns = kql_make_series.subquery_columns;
auto & axis_column = kql_make_series.axis_column;
auto & group_expression = kql_make_series.group_expression;
auto step = from_to_step.step;

if (!kql_make_series.from_to_step.from_str.empty())
start_str = getExprFromToken(kql_make_series.from_to_step.from_str, max_depth);

if (!kql_make_series.from_to_step.to_str.empty())
end_str = getExprFromToken(from_to_step.to_str, max_depth);

String bin_str, start, end;
else
axis_column_format = std::format("toFloat64({})", axis_column);

uint64_t diff = 0;
if (!start_str.empty()) // has from
{
bin_str = std::format(" toFloat64(toDateTime64({0}, 9, 'UTC')) + (toInt64(((toFloat64(toDateTime64({1}, 9, 'UTC')) - toFloat64(toDateTime64({0}, 9, 'UTC'))) / {2}) ) * {2}) AS {1}_ali ",
start_str, axis_column, step);
start = std::format("toUInt64(toDateTime64({},9,'UTC'))", start_str);
bin_str = std::format(" toFloat64({0}) + (toInt64((({1} - toFloat64({0})) / {2}) ) * {2}) AS {3}_ali ",
start_str, axis_column_format, step, axis_column);
start = std::format("toUInt64({})", start_str);
}
else
{
bin_str = std::format(" toInt64((toFloat64(toDateTime64({0}, 9, 'UTC')) + {1}) / {2}) * {2} AS {0}_ali ",
axis_column, era_diff, step);
diff = era_diff;
if (from_to_step.is_timespan)
diff = era_diff;
bin_str = std::format(" toFloat64(toInt64(({0} + {1}) / {2}) * {2}) AS {3}_ali ", axis_column_format, diff, step, axis_column);
}

auto sub_sub_query = std::format(" (Select {0}, {1}, {2} FROM {3} GROUP BY {0}, {4}_ali ORDER BY {4}_ali) ", group_expression, subquery_columns, bin_str, table_name, axis_column);

if (!end_str.empty())
end = std::format("toUInt64(toDateTime64({}, 9, 'UTC'))", end_str);
end = std::format("toUInt64({})", end_str);

String range, condition;

if (!start_str.empty() && !end_str.empty())
{
range = std::format("range({},{}, toUInt64({}))", start, end, step);
condition = std::format("{0}_ali >= {1} and {0}_ali <= {2}", axis_column, start, end);
range = std::format("range({}, {}, toUInt64({}))", start, end, step);
condition = std::format("where toInt64({0}) >= {1} and toInt64({0}) < {2}", axis_column_format, start, end);
}
else if (start_str.empty() && !end_str.empty())
{
range = std::format("range(low, {} + {}, toUInt64({}))", end, era_diff, step);
condition = std::format("{0}_ali - {1} < {2}", axis_column, era_diff, end);
range = std::format("range(low, {} + {}, toUInt64({}))", end, diff, step);
condition = std::format("where toInt64({0}) - {1} < {2}", axis_column_format, diff, end);
}
else if (!start_str.empty() && end_str.empty())
{
range = std::format("range({}, high, toUInt64({}))", start, step);
condition = std::format("{}_ali >= {}", axis_column, start);
condition = std::format("where toInt64({}) >= {}", axis_column_format, start);
}
else
{
range = std::format("range(low, high, toUInt64({}))", step);
condition = "1"; //true
condition = " ";
}

auto range_len = std::format("length({})", range);
main_query = std::format("{} ", group_expression);

String sub_sub_query;
if (group_expression.empty())
sub_sub_query = std::format(" (Select {0}, {1} FROM {2} {4} GROUP BY {3}_ali ORDER BY {3}_ali) ", subquery_columns, bin_str, table_name, axis_column, condition);
else
sub_sub_query = std::format(" (Select {0}, {1}, {2} FROM {3} {5} GROUP BY {0}, {4}_ali ORDER BY {4}_ali) ", group_expression, subquery_columns, bin_str, table_name, axis_column, condition);

if (!group_expression.empty())
main_query = std::format("{} ", group_expression_alias);

auto axis_and_agg_alias_list = axis_column;
auto final_axis_agg_alias_list =std::format("tupleElement(zipped,1) AS {}",axis_column); //tupleElement(pp,2) as PriceAvg ,tupleElement(pp,1)
int idx = 2;
for (auto agg_column : aggregation_columns)
{
String agg_group_column = std::format("arrayConcat(groupArrayIf ({}_ali,{}) as ga, arrayMap(x -> ({}),range(0,toUInt32 ({} - length(ga) < 0 ? 0 : {} - length(ga)),1) )) as {}",
agg_column.alias, condition, agg_column.default_value, range_len, range_len, agg_column.alias);
main_query +=", " + agg_group_column;
String agg_group_column = std::format("arrayConcat(groupArray ({}_ali) as ga, arrayMap(x -> ({}),range(0,toUInt32 ({} - length(ga) < 0 ? 0 : {} - length(ga)),1) )) as {}",
agg_column.alias, agg_column.default_value, range_len, range_len, agg_column.alias);
main_query = main_query.empty() ? agg_group_column : main_query + ", " + agg_group_column;

axis_and_agg_alias_list +=", " + agg_column.alias;
final_axis_agg_alias_list += std::format(", tupleElement(zipped,{}) AS {}", idx, agg_column.alias);
}
auto axis_str = std::format("arrayDistinct(arrayConcat(groupArrayIf(toDateTime64({0}_ali - {1},9,'UTC'), {2}), arrayMap( x->(toDateTime64(x - {1} ,9,'UTC')), {3}) )) as {0}",
axis_column, diff, condition,range);

if (from_to_step.is_timespan)
axis_str = std::format("arrayDistinct(arrayConcat(groupArray(toDateTime64({0}_ali - {1},9,'UTC')), arrayMap( x->(toDateTime64(x - {1} ,9,'UTC')), {2}) )) as {0}",
axis_column, diff, range);
else
axis_str = std::format("arrayDistinct(arrayConcat(groupArray({0}_ali), arrayMap( x->(toFloat64(x)), {1}) )) as {0}",
axis_column, range);

main_query += ", " + axis_str;
auto sub_group_by = std::format("{}", group_expression);
auto sub_group_by = group_expression.empty()? "" : std::format("GROUP BY {}", group_expression_alias);

sub_query = std::format("( SELECT toUInt64(min({}_ali)) AS low, toUInt64(max({}_ali))+ {} AS high, arraySort(arrayZip({})) as zipped, {} FROM {} GROUP BY {} )",
sub_query = std::format("( SELECT toUInt64(min({}_ali)) AS low, toUInt64(max({}_ali))+ {} AS high, arraySort(arrayZip({})) as zipped, {} FROM {} {} )",
axis_column, axis_column,step, axis_and_agg_alias_list, main_query, sub_sub_query, sub_group_by);

main_query = std::format("{},{}", group_expression, final_axis_agg_alias_list);
if (group_expression.empty())
main_query = std::format("{}", final_axis_agg_alias_list);
else
main_query = std::format("{},{}", group_expression_alias, final_axis_agg_alias_list);

kql_make_series.sub_query = std::move(sub_query);
kql_make_series.main_query = std::move(main_query);
Expand Down Expand Up @@ -387,10 +347,7 @@ bool ParserKQLMakeSeries :: parseImpl(Pos & pos, ASTPtr & node, Expected & expec
subquery_columns += ", "+ column_str;
}

if (from_to_step.is_timespan)
makeTimeSeries(kql_make_series, pos.max_depth);
else
makeNumericSeries(kql_make_series, pos.max_depth);
makeSeries(kql_make_series, pos.max_depth);

Tokens token_subquery(kql_make_series.sub_query.c_str(), kql_make_series.sub_query.c_str() + kql_make_series.sub_query.size());
IParser::Pos pos_subquery(token_subquery, pos.max_depth);
Expand All @@ -407,6 +364,5 @@ bool ParserKQLMakeSeries :: parseImpl(Pos & pos, ASTPtr & node, Expected & expec

pos = begin;
return true;

}
}
3 changes: 1 addition & 2 deletions src/Parsers/Kusto/ParserKQLMakeSeries.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,7 @@ class ParserKQLMakeSeries : public ParserKQLBase
String main_query;
};

void makeNumericSeries(KQLMakeSeries & kql_make_series, const uint32_t & max_depth);
void makeTimeSeries(KQLMakeSeries & kql_make_series, const uint32_t & max_depth);
void makeSeries(KQLMakeSeries & kql_make_series, const uint32_t & max_depth);
bool parseAggregationColumns(AggregationColumns & aggregation_columns, Pos & pos);
bool parseFromToStepClause(FromToStepClause & from_to_step, Pos & pos);
const char * getName() const override { return "KQL project"; }
Expand Down

0 comments on commit 322a0aa

Please sign in to comment.