Skip to content

Commit

Permalink
Merge pull request ClickHouse#57430 from ClickHouse/non-ready-set-ttl
Browse files Browse the repository at this point in the history
Non ready set in TTL WHERE.
  • Loading branch information
KochetovNicolai authored Feb 9, 2024
2 parents effaace + 97d1eb1 commit 862c6cd
Show file tree
Hide file tree
Showing 27 changed files with 320 additions and 105 deletions.
1 change: 1 addition & 0 deletions src/Interpreters/Set.h
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ class Set
/// Gives read-only access to the stored `set_elements_types`.
const DataTypes & getElementsTypes() const
{
    return set_elements_types;
}

/// True when `fill_set_elements` is set, or when `set_elements` is non-empty and its
/// first column has exactly as many rows as `data.getTotalRowCount()` reports.
bool hasExplicitSetElements() const
{
    if (fill_set_elements)
        return true;

    /// Nothing stored: `front()` must not be touched.
    if (set_elements.empty())
        return false;

    return set_elements.front()->size() == data.getTotalRowCount();
}
/// True when at least one column is stored in `set_elements`.
bool hasSetElements() const
{
    const bool nothing_stored = set_elements.empty();
    return !nothing_stored;
}
/// Returns a copy of the stored element columns.
/// `checkIsCreated()` is invoked first, before anything is copied.
Columns getSetElements() const
{
    checkIsCreated();

    return {
        set_elements.begin(),
        set_elements.end()
    };
}

void checkColumnsNumber(size_t num_key_columns) const;
Expand Down
28 changes: 28 additions & 0 deletions src/Processors/QueryPlan/CreatingSetsStep.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,34 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::Subqueries subque
query_plan.unitePlans(std::move(creating_sets), std::move(plans));
}

/// Combines `pipeline` with pipelines built for the given `subqueries`.
///
/// A subquery contributes a pipeline only when its `get()` returns null and
/// `build(context)` yields a non-null plan. The main pipeline always comes first;
/// the whole list is then passed to CreatingSetsStep::updatePipeline and its result returned.
QueryPipelineBuilderPtr addCreatingSetsTransform(QueryPipelineBuilderPtr pipeline, PreparedSets::Subqueries subqueries, ContextPtr context)
{
    /// The header has to be read before `pipeline` is moved into `builders` below.
    DataStreams streams;
    streams.emplace_back(DataStream{pipeline->getHeader()});

    QueryPipelineBuilders builders;
    builders.reserve(subqueries.size() + 1);
    builders.push_back(std::move(pipeline));

    auto optimization_settings = QueryPlanOptimizationSettings::fromContext(context);
    auto build_settings = BuildQueryPipelineSettings::fromContext(context);

    for (auto & subquery_set : subqueries)
    {
        /// A non-null `get()` means there is nothing to build for this entry.
        if (!subquery_set->get())
        {
            /// `build` may return null; such entries are skipped as well.
            if (auto set_plan = subquery_set->build(context))
            {
                streams.emplace_back(set_plan->getCurrentDataStream());
                builders.emplace_back(set_plan->buildQueryPipeline(optimization_settings, build_settings));
            }
        }
    }

    CreatingSetsStep creating_sets(streams);
    return creating_sets.updatePipeline(std::move(builders), build_settings);
}

std::vector<std::unique_ptr<QueryPlan>> DelayedCreatingSetsStep::makePlansForSets(DelayedCreatingSetsStep && step)
{
std::vector<std::unique_ptr<QueryPlan>> plans;
Expand Down
2 changes: 2 additions & 0 deletions src/Processors/QueryPlan/CreatingSetsStep.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ void addCreatingSetsStep(QueryPlan & query_plan, PreparedSets::Subqueries subque

void addCreatingSetsStep(QueryPlan & query_plan, PreparedSetsPtr prepared_sets, ContextPtr context);

QueryPipelineBuilderPtr addCreatingSetsTransform(QueryPipelineBuilderPtr pipeline, PreparedSets::Subqueries subqueries, ContextPtr context);

}
5 changes: 3 additions & 2 deletions src/Processors/TTL/ITTLAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ namespace ErrorCodes
}

ITTLAlgorithm::ITTLAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: description(description_)
const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: ttl_expressions(ttl_expressions_)
, description(description_)
, old_ttl_info(old_ttl_info_)
, current_time(current_time_)
, force(force_)
Expand Down
9 changes: 8 additions & 1 deletion src/Processors/TTL/ITTLAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
namespace DB
{

/// Executable expressions of a single TTL rule, passed to TTL algorithms
/// alongside the corresponding TTLDescription.
struct TTLExpressions
{
    /// Executed on a block to obtain the TTL column.
ExpressionActionsPtr expression;
    /// Executed on a block to obtain the WHERE filter column.
    /// Users of this struct test it for null before relying on it.
ExpressionActionsPtr where_expression;
};

/**
* Represents the actions, which are required to do
* with data, when TTL is expired: delete, aggregate, etc.
Expand All @@ -18,7 +24,7 @@ class ITTLAlgorithm
using TTLInfo = IMergeTreeDataPart::TTLInfo;
using MutableDataPartPtr = MergeTreeMutableDataPartPtr;

ITTLAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
ITTLAlgorithm(const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
virtual ~ITTLAlgorithm() = default;

virtual void execute(Block & block) = 0;
Expand All @@ -39,6 +45,7 @@ class ITTLAlgorithm
bool isTTLExpired(time_t ttl) const;
UInt32 getTimestampByIndex(const IColumn * column, size_t index) const;

const TTLExpressions ttl_expressions;
const TTLDescription description;
const TTLInfo old_ttl_info;
const time_t current_time;
Expand Down
11 changes: 6 additions & 5 deletions src/Processors/TTL/TTLAggregationAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@ namespace DB
{

TTLAggregationAlgorithm::TTLAggregationAlgorithm(
const TTLExpressions & ttl_expressions_,
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_,
const Block & header_,
const MergeTreeData & storage_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
, header(header_)
{
current_key_value.resize(description.group_by_keys.size());
Expand Down Expand Up @@ -75,8 +76,8 @@ void TTLAggregationAlgorithm::execute(Block & block)
const auto & column_names = header.getNames();
MutableColumns aggregate_columns = header.cloneEmptyColumns();

auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
auto where_column = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);

size_t rows_aggregated = 0;
size_t current_key_start = 0;
Expand Down Expand Up @@ -157,8 +158,8 @@ void TTLAggregationAlgorithm::execute(Block & block)
/// If some rows were aggregated we have to recalculate ttl info's
if (some_rows_were_aggregated)
{
auto ttl_column_after_aggregation = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto where_column_after_aggregation = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
auto ttl_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
auto where_column_after_aggregation = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);
for (size_t i = 0; i < block.rows(); ++i)
{
bool where_filter_passed = !where_column_after_aggregation || where_column_after_aggregation->getBool(i);
Expand Down
1 change: 1 addition & 0 deletions src/Processors/TTL/TTLAggregationAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ class TTLAggregationAlgorithm final : public ITTLAlgorithm
{
public:
TTLAggregationAlgorithm(
const TTLExpressions & ttl_expressions_,
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
Expand Down
5 changes: 3 additions & 2 deletions src/Processors/TTL/TTLColumnAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace DB
{

TTLColumnAlgorithm::TTLColumnAlgorithm(
const TTLExpressions & ttl_expressions_,
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
Expand All @@ -12,7 +13,7 @@ TTLColumnAlgorithm::TTLColumnAlgorithm(
const ExpressionActionsPtr & default_expression_,
const String & default_column_name_,
bool is_compact_part_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
, column_name(column_name_)
, default_expression(default_expression_)
, default_column_name(default_column_name_)
Expand Down Expand Up @@ -49,7 +50,7 @@ void TTLColumnAlgorithm::execute(Block & block)
if (default_column)
default_column = default_column->convertToFullColumnIfConst();

auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);

auto & column_with_type = block.getByName(column_name);
const IColumn * values_column = column_with_type.column.get();
Expand Down
1 change: 1 addition & 0 deletions src/Processors/TTL/TTLColumnAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ class TTLColumnAlgorithm final : public ITTLAlgorithm
{
public:
TTLColumnAlgorithm(
const TTLExpressions & ttl_expressions_,
const TTLDescription & description_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
Expand Down
10 changes: 5 additions & 5 deletions src/Processors/TTL/TTLDeleteAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ namespace DB
{

TTLDeleteAlgorithm::TTLDeleteAlgorithm(
const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_)
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
{
if (!isMinTTLExpired())
new_ttl_info = old_ttl_info;
Expand All @@ -19,8 +19,8 @@ void TTLDeleteAlgorithm::execute(Block & block)
if (!block || !isMinTTLExpired())
return;

auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto where_column = executeExpressionAndGetColumn(description.where_expression, block, description.where_result_column);
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
auto where_column = executeExpressionAndGetColumn(ttl_expressions.where_expression, block, description.where_result_column);

MutableColumns result_columns;
const auto & column_names = block.getNames();
Expand Down Expand Up @@ -54,7 +54,7 @@ void TTLDeleteAlgorithm::execute(Block & block)

void TTLDeleteAlgorithm::finalize(const MutableDataPartPtr & data_part) const
{
if (description.where_expression)
if (ttl_expressions.where_expression)
data_part->ttl_infos.rows_where_ttl[description.result_column] = new_ttl_info;
else
data_part->ttl_infos.table_ttl = new_ttl_info;
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/TTL/TTLDeleteAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ namespace DB
class TTLDeleteAlgorithm final : public ITTLAlgorithm
{
public:
TTLDeleteAlgorithm(const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);
TTLDeleteAlgorithm(const TTLExpressions & ttl_expressions_, const TTLDescription & description_, const TTLInfo & old_ttl_info_, time_t current_time_, bool force_);

void execute(Block & block) override;
void finalize(const MutableDataPartPtr & data_part) const override;
Expand Down
5 changes: 3 additions & 2 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ namespace DB
{

TTLUpdateInfoAlgorithm::TTLUpdateInfoAlgorithm(
const TTLExpressions & ttl_expressions_,
const TTLDescription & description_,
const TTLUpdateField ttl_update_field_,
const String ttl_update_key_,
const TTLInfo & old_ttl_info_,
time_t current_time_,
bool force_)
: ITTLAlgorithm(description_, old_ttl_info_, current_time_, force_)
: ITTLAlgorithm(ttl_expressions_, description_, old_ttl_info_, current_time_, force_)
, ttl_update_field(ttl_update_field_)
, ttl_update_key(ttl_update_key_)
{
Expand All @@ -21,7 +22,7 @@ void TTLUpdateInfoAlgorithm::execute(Block & block)
if (!block)
return;

auto ttl_column = executeExpressionAndGetColumn(description.expression, block, description.result_column);
auto ttl_column = executeExpressionAndGetColumn(ttl_expressions.expression, block, description.result_column);
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = ITTLAlgorithm::getTimestampByIndex(ttl_column.get(), i);
Expand Down
1 change: 1 addition & 0 deletions src/Processors/TTL/TTLUpdateInfoAlgorithm.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ class TTLUpdateInfoAlgorithm : public ITTLAlgorithm
{
public:
TTLUpdateInfoAlgorithm(
const TTLExpressions & ttl_expressions_,
const TTLDescription & description_,
const TTLUpdateField ttl_update_field_,
const String ttl_update_key_,
Expand Down
35 changes: 29 additions & 6 deletions src/Processors/Transforms/TTLCalcTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,24 @@
namespace DB
{

/// Builds the TTL expression and the WHERE expression of `ttl_descr` and appends
/// the subqueries of their prepared sets to `subqueries_for_sets`.
/// Returns both built expressions packed into TTLExpressions.
static TTLExpressions getExpressions(const TTLDescription & ttl_descr, PreparedSets::Subqueries & subqueries_for_sets, const ContextPtr & context)
{
    /// Appends all subqueries reported by `prepared_sets` to the output list.
    const auto collect_subqueries = [&subqueries_for_sets](const auto & prepared_sets)
    {
        auto found = prepared_sets->getSubqueries();
        subqueries_for_sets.insert(subqueries_for_sets.end(), found.begin(), found.end());
    };

    auto ttl = ttl_descr.buildExpression(context);
    /// The sets of the main expression are used without a null check.
    collect_subqueries(ttl.sets);

    auto ttl_where = ttl_descr.buildWhereExpression(context);
    /// The sets of the WHERE part are only used when the pointer is non-null.
    if (ttl_where.sets)
        collect_subqueries(ttl_where.sets);

    return TTLExpressions{ttl.expression, ttl_where.expression};
}

TTLCalcTransform::TTLCalcTransform(
const ContextPtr & context,
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
Expand All @@ -21,33 +38,39 @@ TTLCalcTransform::TTLCalcTransform(
{
const auto & rows_ttl = metadata_snapshot_->getRowsTTL();
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
rows_ttl, TTLUpdateField::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
getExpressions(rows_ttl, subqueries_for_sets, context), rows_ttl,
TTLUpdateField::TABLE_TTL, rows_ttl.result_column, old_ttl_infos.table_ttl, current_time_, force_));
}

for (const auto & where_ttl : metadata_snapshot_->getRowsWhereTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
where_ttl, TTLUpdateField::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));
getExpressions(where_ttl, subqueries_for_sets, context), where_ttl,
TTLUpdateField::ROWS_WHERE_TTL, where_ttl.result_column, old_ttl_infos.rows_where_ttl[where_ttl.result_column], current_time_, force_));

for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
group_by_ttl, TTLUpdateField::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));
getExpressions(group_by_ttl, subqueries_for_sets, context), group_by_ttl,
TTLUpdateField::GROUP_BY_TTL, group_by_ttl.result_column, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_));

if (metadata_snapshot_->hasAnyColumnTTL())
{
for (const auto & [name, description] : metadata_snapshot_->getColumnTTLs())
{
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
description, TTLUpdateField::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
getExpressions(description, subqueries_for_sets, context), description,
TTLUpdateField::COLUMNS_TTL, name, old_ttl_infos.columns_ttl[name], current_time_, force_));
}
}

for (const auto & move_ttl : metadata_snapshot_->getMoveTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
move_ttl, TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));
getExpressions(move_ttl, subqueries_for_sets, context), move_ttl,
TTLUpdateField::MOVES_TTL, move_ttl.result_column, old_ttl_infos.moves_ttl[move_ttl.result_column], current_time_, force_));

for (const auto & recompression_ttl : metadata_snapshot_->getRecompressionTTLs())
algorithms.emplace_back(std::make_unique<TTLUpdateInfoAlgorithm>(
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
getExpressions(recompression_ttl, subqueries_for_sets, context), recompression_ttl,
TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}

void TTLCalcTransform::consume(Chunk chunk)
Expand Down
4 changes: 4 additions & 0 deletions src/Processors/Transforms/TTLCalcTransform.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ class TTLCalcTransform : public IAccumulatingTransform
{
public:
TTLCalcTransform(
const ContextPtr & context,
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
Expand All @@ -23,6 +24,8 @@ class TTLCalcTransform : public IAccumulatingTransform
bool force_
);

/// Hands the collected subqueries over to the caller by moving them out of
/// `subqueries_for_sets`; the member is left in a moved-from state afterwards.
PreparedSets::Subqueries getSubqueries()
{
    PreparedSets::Subqueries taken(std::move(subqueries_for_sets));
    return taken;
}

String getName() const override { return "TTL_CALC"; }
Status prepare() override;

Expand All @@ -35,6 +38,7 @@ class TTLCalcTransform : public IAccumulatingTransform

private:
std::vector<TTLAlgorithmPtr> algorithms;
PreparedSets::Subqueries subqueries_for_sets;

/// ttl_infos and empty_columns are updating while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Expand Down
Loading

0 comments on commit 862c6cd

Please sign in to comment.