Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Do not merge this!!! #13

Draft
wants to merge 4 commits into
base: bigo-22.5.1.2079-liang
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
#include <Server/HTTP/HTTPServer.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <filesystem>
#include <Interpreters/StorageDistributedTasksBuilder.h>

#include "config_core.h"
#include "Common/config_version.h"
Expand Down
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ if (TARGET ch_contrib::hdfs)
endif()

add_headers_and_sources(dbms Storages/Cache)
add_headers_and_sources(dbms Storages/DistributedShuffle)
if (TARGET ch_contrib::hivemetastore)
add_headers_and_sources(dbms Storages/Hive)
endif()
Expand Down Expand Up @@ -241,6 +242,7 @@ add_object_library(clickhouse_databases_mysql Databases/MySQL)
add_object_library(clickhouse_disks Disks)
add_object_library(clickhouse_interpreters Interpreters)
add_object_library(clickhouse_interpreters_access Interpreters/Access)
add_object_library(clickhouse_interpreters_ast_rewriters Interpreters/ASTRewriters)
add_object_library(clickhouse_interpreters_mysql Interpreters/MySQL)
add_object_library(clickhouse_interpreters_clusterproxy Interpreters/ClusterProxy)
add_object_library(clickhouse_interpreters_jit Interpreters/JIT)
Expand Down
3 changes: 2 additions & 1 deletion src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,8 @@
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely") \
M(ClearTimeoutShuffleStorageSession, "Number of sessions cleared by timeout") \

namespace ProfileEvents
{
Expand Down
3 changes: 3 additions & 0 deletions src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,9 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, allow_experimental_object_type, false, "Allow Object and JSON data types", 0) \
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
M(Bool, count_distinct_optimization, false, "Rewrite count distinct to subquery of group by", 0) \
M(String, use_cluster_for_distributed_shuffle, "", "If you want to run the join and group by in distributed shuffle mode, set it as one of the available cluster.", 0) \
M(Bool, enable_distribute_shuffle, false, "Enable shuffle join", 0) \
M(UInt64, shuffle_storage_session_timeout, 1800, "How long a session can be alive before expired by timeout", 0) \
M(Bool, throw_on_unsupported_query_inside_transaction, true, "Throw exception if unsupported query is used inside transaction", 0) \
M(TransactionsWaitCSNMode, wait_changes_become_visible_after_commit_mode, TransactionsWaitCSNMode::WAIT_UNKNOWN, "Wait for committed changes to become actually visible in the latest snapshot", 0) \
M(Bool, throw_if_no_data_to_insert, true, "Enables or disables empty INSERTs, enabled by default", 0) \
Expand Down
234 changes: 234 additions & 0 deletions src/Interpreters/ASTRewriters/ASTAnalyzeUtil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
#include <algorithm>
#include <memory>
#include <sstream>
#include <Core/NamesAndTypes.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Interpreters/ASTRewriters/ASTAnalyzeUtil.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST_fwd.h>
#include <Poco/StringTokenizer.h>
namespace DB
{

/// Builds a human-readable, single-line description of the column:
/// all three name variants followed by the name of its data type.
String ColumnWithDetailNameAndType::toString() const
{
    WriteBufferFromOwnString out;
    out << "full_name: " << full_name;
    out << ", short_name: " << short_name;
    out << ", alias_name: " << alias_name;
    out << ", data_type: " << type->getName();
    return out.str();
}

/// Converts detailed column descriptions into a NamesAndTypesList.
/// Each resulting pair is (short_name, type); full_name and alias_name are dropped.
/// The order of `columns` is preserved.
NamesAndTypesList ColumnWithDetailNameAndType::toNamesAndTypesList(const std::vector<ColumnWithDetailNameAndType> & columns)
{
    /// Fill the result directly instead of building a temporary std::list
    /// and copying every pair a second time.
    NamesAndTypesList res;
    for (const auto & col : columns)
        res.emplace_back(col.short_name, col.type);
    return res;
}

/// Assigns a generated alias to every column that has none yet and whose
/// full name differs from its short name. The alias is the full name with
/// every '.' replaced by '_'. Columns that already carry an alias are untouched.
void ColumnWithDetailNameAndType::makeAliasByFullName(std::vector<ColumnWithDetailNameAndType> & columns)
{
    for (auto & col : columns)
    {
        const bool has_alias = !col.alias_name.empty();
        if (has_alias || col.full_name == col.short_name)
            continue;

        col.alias_name = col.full_name;
        std::replace(col.alias_name.begin(), col.alias_name.end(), '.', '_');
    }
}

std::vector<String> ColumnWithDetailNameAndType::splitedFullName() const
{
Poco::StringTokenizer splitter(full_name, ".");
std::vector<String> res;
for (const auto & token : splitter)
{
res.push_back(token);
}
return res;
}

bool ASTAnalyzeUtil::hasGroupByRecursively(const ASTPtr & ast)
{
return hasGroupByRecursively(ast.get());
}
/// Returns true if `ast`, or any query nested in it, has a GROUP BY clause.
///
/// Handled node types: INSERT ... SELECT, SELECT ... UNION lists, plain SELECT
/// (its own GROUP BY first, then its table list), table lists, single table
/// elements (only their subquery is inspected) and subqueries.
/// A null `ast` and any other node type yield false.
bool ASTAnalyzeUtil::hasGroupByRecursively(const IAST * ast)
{
    if (!ast)
        return false;

    if (const auto * insert_ast = ast->as<ASTInsertQuery>())
        return hasGroupByRecursively(insert_ast->select.get());

    if (const auto * select_with_union = ast->as<ASTSelectWithUnionQuery>())
    {
        for (const auto & child : select_with_union->list_of_selects->children)
        {
            if (hasGroupByRecursively(child.get()))
                return true;
        }
        return false;
    }

    if (const auto * select_ast = ast->as<ASTSelectQuery>())
    {
        if (select_ast->groupBy() != nullptr)
            return true;
        /// Descend into the FROM clause. Recursing into groupBy() here would be
        /// pointless: it is known to be null at this point, so nested subqueries
        /// would never be visited.
        return hasGroupByRecursively(select_ast->tables().get());
    }

    if (const auto * tables_ast = ast->as<ASTTablesInSelectQuery>())
    {
        for (const auto & child : tables_ast->children)
        {
            if (hasGroupByRecursively(child.get()))
                return true;
        }
        return false;
    }

    if (const auto * table_element = ast->as<ASTTablesInSelectQueryElement>())
    {
        /// An element may have no table expression (presumably the ARRAY JOIN case),
        /// so guard both the pointer and the cast before dereferencing.
        if (!table_element->table_expression)
            return false;
        const auto * table_expr = table_element->table_expression->as<ASTTableExpression>();
        return table_expr && hasGroupByRecursively(table_expr->subquery.get());
    }

    if (const auto * subquery = ast->as<ASTSubquery>())
    {
        for (const auto & child : subquery->children)
        {
            if (hasGroupByRecursively(child.get()))
                return true;
        }
    }

    return false;
}


bool ASTAnalyzeUtil::hasGroupBy(const ASTPtr & ast)
{
return hasGroupBy(ast.get());
}

/// Returns true if the top-level query has a GROUP BY clause (no descent into subqueries).
///
/// - SELECT ... UNION list: only a list with exactly one SELECT is considered;
///   the answer is then that of its single SELECT. Any other list size yields false.
/// - Plain SELECT: true when its GROUP BY expression is set.
/// - Null `ast` or any other node type: false.
bool ASTAnalyzeUtil::hasGroupBy(const IAST * ast)
{
    /// Tolerate null like the recursive variant does, instead of dereferencing it.
    if (!ast)
        return false;

    if (const auto * select_with_union_ast = ast->as<ASTSelectWithUnionQuery>())
    {
        const auto & selects = select_with_union_ast->list_of_selects;
        /// `!= 1` also rejects an empty list, which would otherwise index out of bounds.
        if (!selects || selects->children.size() != 1)
            return false;
        return hasGroupBy(selects->children[0].get());
    }

    if (const auto * select_ast = ast->as<ASTSelectQuery>())
        return select_ast->groupBy() != nullptr;

    return false;
}

bool ASTAnalyzeUtil::hasAggregationColumn(const ASTPtr & ast)
{
return hasAggregationColumn(ast.get());
}
bool ASTAnalyzeUtil::hasAggregationColumn(const IAST * ast)
{
if (const auto * select_ast = ast->as<ASTSelectQuery>())
{
const auto * select_list = select_ast->select()->as<ASTExpressionList>();
for (const auto & child : select_list->children)
{
if (const auto * function = child->as<ASTFunction>())
{
if (function->name == "count" || function->name == "avg" || function->name == "sum")
{
return true;
}
}
}
}
return false;
}

bool ASTAnalyzeUtil::hasAggregationColumnRecursively(const ASTPtr & ast)
{
return hasAggregationColumnRecursively(ast.get());
}

/// Returns true if `ast`, or any query nested in it, has an aggregation column
/// as defined by hasAggregationColumn().
///
/// Handled node types: INSERT ... SELECT, SELECT ... UNION lists, plain SELECT
/// (its own select list first, then its table list), table lists, single table
/// elements (only their subquery is inspected) and subqueries.
/// A null `ast` and any other node type yield false.
bool ASTAnalyzeUtil::hasAggregationColumnRecursively(const IAST * ast)
{
    if (!ast)
        return false;

    if (const auto * insert_ast = ast->as<ASTInsertQuery>())
        return hasAggregationColumnRecursively(insert_ast->select.get());

    if (const auto * select_with_union_ast = ast->as<ASTSelectWithUnionQuery>())
    {
        for (const auto & child : select_with_union_ast->list_of_selects->children)
        {
            if (hasAggregationColumnRecursively(child.get()))
                return true;
        }
        return false;
    }

    if (const auto * select_ast = ast->as<ASTSelectQuery>())
    {
        if (hasAggregationColumn(select_ast))
            return true;
        return hasAggregationColumnRecursively(select_ast->tables().get());
    }

    if (const auto * tables_ast = ast->as<ASTTablesInSelectQuery>())
    {
        for (const auto & child : tables_ast->children)
        {
            if (hasAggregationColumnRecursively(child.get()))
                return true;
        }
        return false;
    }

    if (const auto * table_element = ast->as<ASTTablesInSelectQueryElement>())
    {
        /// An element may have no table expression (presumably the ARRAY JOIN case),
        /// so guard both the pointer and the cast before dereferencing.
        if (!table_element->table_expression)
            return false;
        const auto * table_expr = table_element->table_expression->as<ASTTableExpression>();
        return table_expr && hasAggregationColumnRecursively(table_expr->subquery.get());
    }

    if (const auto * subquery = ast->as<ASTSubquery>())
    {
        for (const auto & child : subquery->children)
        {
            if (hasAggregationColumnRecursively(child.get()))
                return true;
        }
    }

    return false;
}

/// Returns the name a table expression can be referred to by.
///
/// - table function: its alias (possibly empty);
/// - subquery: its alias (possibly empty);
/// - table identifier: its alias, or the short table name when no alias is set;
/// - anything else: an empty string.
String ASTAnalyzeUtil::tryGetTableExpressionAlias(const ASTTableExpression * table_expr)
{
    if (table_expr->table_function)
        return table_expr->table_function->as<ASTFunction>()->tryGetAlias();

    if (table_expr->subquery)
        return table_expr->subquery->as<ASTSubquery>()->tryGetAlias();

    if (!table_expr->database_and_table_name)
        return String();

    const auto * table_identifier = table_expr->database_and_table_name->as<ASTTableIdentifier>();
    if (!table_identifier)
        return String();

    String alias = table_identifier->tryGetAlias();
    if (alias.empty())
        alias = table_identifier->shortName();
    return alias;
}

}
53 changes: 53 additions & 0 deletions src/Interpreters/ASTRewriters/ASTAnalyzeUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <DataTypes/IDataType.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/IAST_fwd.h>

namespace DB
{
/// Description of a column that keeps several name variants next to its data type.
struct ColumnWithDetailNameAndType
{
/// Qualified name; splitedFullName() treats '.' as the separator between its parts.
String full_name;
/// Name used for the column when converting to NamesAndTypesList.
String short_name;
/// Optional alias; may be generated from full_name by makeAliasByFullName().
String alias_name;
/// Data type of the column.
DataTypePtr type;
/// Returns a single-line, human-readable dump of all names and the type name.
String toString() const;

/// For every column without an alias whose full_name differs from short_name,
/// sets alias_name to full_name with each '.' replaced by '_'.
static void makeAliasByFullName(std::vector<ColumnWithDetailNameAndType> & columns);
/// Builds a list of (short_name, type) pairs, preserving the order of `columns`.
static NamesAndTypesList toNamesAndTypesList(const std::vector<ColumnWithDetailNameAndType> & columns);
/// Returns full_name split on '.'.
std::vector<String> splitedFullName() const;
};
using ColumnWithDetailNameAndTypes = std::vector<ColumnWithDetailNameAndType>;
/// Stateless helpers that answer simple structural questions about a query AST.
/// Every check comes in two flavours: one taking an ASTPtr and one taking a raw
/// IAST pointer; the ASTPtr overloads simply forward to the raw-pointer ones.
class ASTAnalyzeUtil
{
public:
    /// Does the top-level query have a GROUP BY clause?
    /// A UNION list is only considered when it consists of a single SELECT.
    static bool hasGroupBy(const ASTPtr & ast);
    static bool hasGroupBy(const IAST * ast);

    /// Recursive variant of hasGroupBy(): additionally handles INSERT ... SELECT,
    /// every SELECT of a UNION list, and table-list / table-element / subquery nodes.
    /// A null `ast` yields false.
    static bool hasGroupByRecursively(const ASTPtr & ast);
    static bool hasGroupByRecursively(const IAST * ast);

    /// Does the select list of a SELECT query directly contain a call to
    /// `count`, `avg` or `sum`?
    static bool hasAggregationColumn(const ASTPtr & ast);
    static bool hasAggregationColumn(const IAST * ast);

    /// Recursive variant of hasAggregationColumn(): additionally handles
    /// INSERT ... SELECT, every SELECT of a UNION list, and the table list with
    /// its nested subqueries. A null `ast` yields false.
    static bool hasAggregationColumnRecursively(const ASTPtr & ast);
    static bool hasAggregationColumnRecursively(const IAST * ast);

    /// Alias of a table function or subquery; for a table identifier the alias,
    /// or its short name when no alias is set. Empty string otherwise.
    static String tryGetTableExpressionAlias(const ASTTableExpression * table_expr);
};


/// Hands out consecutive ids starting from 0.
/// NOTE(review): the counter is a plain integer with no synchronisation visible
/// here - confirm that instances are never shared between threads.
class ShuffleTableIdGenerator
{
public:
    ShuffleTableIdGenerator() = default;

    /// Returns the current id and advances the counter by one.
    UInt32 nextId()
    {
        const UInt32 current = id;
        ++id;
        return current;
    }

private:
    UInt32 id = 0;
};
using ShuffleTableIdGeneratorPtr = std::shared_ptr<ShuffleTableIdGenerator>;
}
Loading