Skip to content

25-1-analytics [KQP] Splitted hashfunc into hashv1 and columnshardhashv1 #19725

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions ydb/core/kqp/executer_actor/kqp_tasks_graph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -550,11 +550,11 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
BuildUnionAllChannels(tasksGraph, stageInfo, inputIdx, inputStageInfo, outputIdx, enableSpilling, log);
break;
case NKqpProto::TKqpPhyConnection::kHashShuffle: {
ui32 hashKind = NHashKind::EUndefined;
std::optional<EHashShuffleFuncType> hashKind;
auto forceSpilling = input.GetHashShuffle().GetUseSpilling();
switch (input.GetHashShuffle().GetHashKindCase()) {
case NKqpProto::TKqpPhyCnHashShuffle::kHashV1: {
hashKind = NHashKind::EHashV1;
hashKind = EHashShuffleFuncType::HashV1;
break;
}
case NKqpProto::TKqpPhyCnHashShuffle::kColumnShardHashV1: {
Expand All @@ -579,13 +579,15 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
);

inputStageInfo.Meta.HashParamsByOutput[outputIdx] = columnShardHashV1Params;
hashKind = NHashKind::EColumnShardHashV1;
hashKind = EHashShuffleFuncType::ColumnShardHashV1;
break;
}
default: {
Y_ENSURE(false, "undefined type of hash for shuffle");
}
}

Y_ENSURE(hashKind.has_value(), "HashKind wasn't set!");
BuildHashShuffleChannels(
tasksGraph,
stageInfo,
Expand All @@ -595,7 +597,7 @@ void BuildKqpStageChannels(TKqpTasksGraph& tasksGraph, TStageInfo& stageInfo,
input.GetHashShuffle().GetKeyColumns(),
enableSpilling,
log,
hashKind,
hashKind.value(),
forceSpilling
);
break;
Expand Down Expand Up @@ -1128,12 +1130,15 @@ void FillOutputDesc(
}
hashPartitionDesc.SetPartitionsCount(output.PartitionsCount);

switch (output.HashKind) {
case NHashKind::EHashV1: {
Y_ENSURE(output.HashKind.has_value(), "HashKind wasn't set before the FillOutputDesc!");

switch (output.HashKind.value()) {
using enum EHashShuffleFuncType;
case HashV1: {
hashPartitionDesc.MutableHashV1();
break;
}
case NHashKind::EColumnShardHashV1: {
case ColumnShardHashV1: {
auto& columnShardHashV1Params = stageInfo.Meta.GetColumnShardHashV1Params(outputIdx);
LOG_DEBUG_S(
*TlsActivationContext,
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/kqp/host/kqp_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
#include <ydb/core/kqp/opt/kqp_statistics_transformer.h>
#include <ydb/core/kqp/opt/kqp_column_statistics_requester.h>
#include <ydb/core/kqp/opt/kqp_constant_folding_transformer.h>
#include <ydb/core/kqp/opt/kqp_opt_hash_func_propagate_transformer.h>
#include <ydb/core/kqp/opt/logical/kqp_opt_cbo.h>


Expand Down Expand Up @@ -336,6 +337,15 @@ class TKqpRunner : public IKqpRunner {
.AddTypeAnnotationTransformer(CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config))
.AddPostTypeAnnotation()
.Add(CreateKqpBuildPhysicalQueryTransformer(OptimizeCtx, BuildQueryCtx), "BuildPhysicalQuery")
.Add(CreateKqpTxsHashFuncPropagateTransformer(
CreateTypeAnnotationTransformer(
CreateKqpTypeAnnotationTransformer(Cluster, sessionCtx->TablesPtr(), *typesCtx, Config), *typesCtx
),
*typesCtx,
Config
),
"HashFuncPropagate"
)
.Add(CreateKqpStatisticsTransformer(OptimizeCtx, *typesCtx, Config, Pctx), "Statistics")
.Build(false);

Expand Down
239 changes: 239 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_hash_func_propagate_transformer.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
#include "kqp_opt_hash_func_propagate_transformer.h"

#include <ydb/library/yql/dq/opt/dq_opt_stat.h>
#include <yql/essentials/utils/log/log.h>
#include <yql/essentials/core/yql_expr_type_annotation.h>
#include <yql/essentials/core/services/yql_transform_pipeline.h>

#include <yql/essentials/core/yql_expr_optimize.h>

#include <ydb/library/yql/dq/common/dq_common.h>

using namespace NYql;
using namespace NYql::NNodes;
using namespace NKikimr::NKqp;
using namespace NYql::NDq;

// Returns the stages of `stages` in topological order: every stage is placed
// after all TDqPhyStage producers it reaches through its TDqConnection inputs.
TVector<TDqPhyStage> TopSortStages(const TDqPhyStageList& stages) {
    TVector<TDqPhyStage> ordered;
    ordered.reserve(stages.Size());
    THashSet<ui64 /*uniqueId*/> emitted;

    // Depth-first, post-order walk. The stage graph is assumed to be acyclic.
    std::function<void(const TDqPhyStage&)> visit = [&](const TDqPhyStage& current) {
        if (emitted.contains(current.Ref().UniqueId())) {
            return;
        }

        for (const auto& input : current.Inputs()) {
            auto maybeConnection = input.Maybe<TDqConnection>();
            if (!maybeConnection) {
                continue;
            }

            // NOTE: despite its name, `Output()` is the producer side feeding this input.
            auto maybeProducer = maybeConnection.Cast().Output().Stage().Maybe<TDqPhyStage>();
            if (maybeProducer) {
                visit(maybeProducer.Cast());
            }
        }

        // A stage is emitted only after all of its producers have been emitted.
        emitted.insert(current.Ref().UniqueId());
        ordered.push_back(current);
    };

    for (const auto& root : stages) {
        visit(root);
    }

    return ordered;
}

// Rebuilds `tx` so that every TDqCnHashShuffle input of every stage carries an explicit HashFunc value.
//
// Stages are processed in topological order (producers first). For each stage a hash function type
// is selected as follows:
//   1. if the stage program contains a TKqpTable node and shuffle elimination is enabled in `config`,
//      the column-shard hash type (ColumnShardHashShuffleFuncType or its default) is used;
//   2. otherwise, if the stage has TDqCnMap inputs whose producer stage was already processed,
//      the type of that producer is inherited (the last such input wins);
//   3. otherwise the generic type (HashShuffleFuncType or its default) is used.
// The selected type is written, as a string, into the HashFunc field of each hash-shuffle connection
// that feeds the stage.
//
// Parameters:
//   tx     - physical transaction to rewrite (not modified, a new node is built);
//   ctx    - expression context used to build the replacement nodes;
//   config - source of the hash function and shuffle elimination settings.
// Returns the rewritten transaction.
TMaybeNode<TKqpPhysicalTx> PropogateHashFuncToHashShuffles(
    const TKqpPhysicalTx& tx,
    TExprContext& ctx,
    const TKikimrConfiguration::TPtr& config
) {
    const auto topSortedStages = TopSortStages(tx.Stages());

    // These settings do not depend on the stage, so read them once instead of once per iteration.
    const bool enableShuffleElimination = config->OptShuffleElimination.Get().GetOrElse(config->DefaultEnableShuffleElimination);
    const auto defaultHashType = config->HashShuffleFuncType.Get().GetOrElse(config->DefaultHashShuffleFuncType);

    TVector<TDqPhyStage> newStages;
    newStages.reserve(topSortedStages.size());

    // Hash type chosen for each already processed stage, keyed by the ORIGINAL stage node id.
    THashMap<ui64, NDq::EHashShuffleFuncType> hashTypeByStageID;
    // Original stage node -> rebuilt stage node; used to re-point consumers to rebuilt producers.
    TNodeOnNodeOwnedMap stagesMap;
    for (const auto& stage : topSortedStages) {
        // Detect whether the stage program references a table; stop traversing on the first hit.
        bool isRead = false;
        VisitExpr(
            stage.Program().Body().Ptr(),
            [&isRead](const TExprNode::TPtr& node) {
                if (TKqpTable::Match(node.Get())) {
                    isRead = true;
                }
                return !isRead;
            }
        );

        auto stageHashType = defaultHashType;
        if (isRead && enableShuffleElimination) {
            stageHashType = config->ColumnShardHashShuffleFuncType.Get().GetOrElse(config->DefaultColumnShardHashShuffleFuncType);
        } else {
            for (const auto& input : stage.Inputs()) {
                if (auto maybeMap = input.Maybe<TDqCnMap>()) {
                    const ui64 inputStageID = maybeMap.Cast().Output().Stage().Ptr()->UniqueId();
                    // Single lookup instead of contains() followed by operator[].
                    if (const auto it = hashTypeByStageID.find(inputStageID); it != hashTypeByStageID.end()) {
                        stageHashType = it->second;
                    }
                }
            }
        }

        hashTypeByStageID[stage.Ptr()->UniqueId()] = stageHashType;

        // Original hash-shuffle connection node -> copy with HashFunc filled in.
        TNodeOnNodeOwnedMap stagesInputMap;
        for (const auto& input : stage.Inputs()) {
            auto maybeHashShuffle = input.Maybe<TDqCnHashShuffle>();
            if (!maybeHashShuffle) {
                continue;
            }

            auto hashShuffle = maybeHashShuffle.Cast();
            auto withHashFunc =
                Build<TDqCnHashShuffle>(ctx, hashShuffle.Pos())
                    .InitFrom(hashShuffle);

            if (!hashShuffle.UseSpilling().IsValid()) {
                // Workaround for a YQL bug: the next field cannot be initialized by index
                // if the previous one was not initialized, so give UseSpilling an explicit value.
                withHashFunc
                    .UseSpilling().Build(false);
            }

            withHashFunc
                .HashFunc()
                    .Build(ToString(stageHashType));

            stagesInputMap.emplace(input.Raw(), withHashFunc.Done().Ptr());
        }

        // First swap in the updated connections, then re-point them to the rebuilt producer stages.
        auto newStage = Build<TDqPhyStage>(ctx, stage.Pos())
            .InitFrom(stage)
            .Inputs(ctx.ReplaceNodes(ctx.ReplaceNodes(stage.Inputs().Ptr(), stagesInputMap), stagesMap))
            .Done();
        stagesMap.emplace(stage.Raw(), newStage.Ptr());
        newStages.emplace_back(std::move(newStage));
    }

    return
        Build<TKqpPhysicalTx>(ctx, tx.Pos())
            .InitFrom(tx)
            .Stages()
                .Add(newStages)
            .Build()
            .Results(ctx.ReplaceNodes(tx.Results().Ptr(), stagesMap))
        .Done();
}

// Synchronous transformer for a single TKqpPhysicalTx node: replaces it with the
// result of PropogateHashFuncToHashShuffles() built with the stored configuration.
class TKqpTxHashFuncPropagateTransformer : public TSyncTransformerBase {
public:
    TKqpTxHashFuncPropagateTransformer(const TKikimrConfiguration::TPtr& config)
        : Config(config)
    {}

    // Requires `input` to be a TKqpPhysicalTx. On success stores the rewritten
    // transaction into `output`; reports Error if no transaction was produced.
    IGraphTransformer::TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
        YQL_ENSURE(TExprBase(input).Maybe<TKqpPhysicalTx>());

        auto rewrittenTx = PropogateHashFuncToHashShuffles(
            TExprBase(input).Cast<TKqpPhysicalTx>(),
            ctx,
            Config
        );

        if (rewrittenTx) {
            output = rewrittenTx.Cast().Ptr();
            return IGraphTransformer::TStatus::Ok;
        }

        return TStatus::Error;
    }

    // The transformer keeps no state between runs, so there is nothing to reset.
    void Rewind() override {}

private:
    TKikimrConfiguration::TPtr Config;
};

// Factory for the per-transaction hash function propagation transformer.
TAutoPtr<IGraphTransformer> CreateKqpTxHashFuncPropagateTransformer(const TKikimrConfiguration::TPtr& config) {
    using TImpl = TKqpTxHashFuncPropagateTransformer;
    return THolder<IGraphTransformer>(new TImpl(config));
}

// Synchronous transformer for a whole TKqpPhysicalQuery: runs every transaction of the query
// through an internal pipeline (service transformers, type annotation, hash function propagation)
// and rebuilds the query from the transformed transactions.
class TKqpTxsHashFuncPropagateTransformer : public TSyncTransformerBase {
public:
    // typeAnnTransformer - type annotation transformer; ownership is taken by this object,
    //                      the internal pipeline only references it;
    // typesCtx           - type annotation context passed to the internal pipeline;
    // config             - configuration forwarded to the per-transaction transformer.
    TKqpTxsHashFuncPropagateTransformer(
        TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
        TTypeAnnotationContext& typesCtx,
        const TKikimrConfiguration::TPtr& config
    )
        : TypeAnnTransformer(std::move(typeAnnTransformer))
    {
        // NOTE(review): the stage label "Peephole" looks like a leftover from the peephole
        // transformer this class resembles; kept as is because it is a runtime-visible name.
        TxTransformer =
            TTransformationPipeline(&typesCtx)
                .AddServiceTransformers()
                .Add(*TypeAnnTransformer, "TypeAnnotation")
                .AddPostTypeAnnotation(/* forSubgraph */ true)
                .Add(CreateKqpTxHashFuncPropagateTransformer(config), "Peephole")
                .Build(false);
    }

    // Expects `input` to be a TKqpPhysicalQuery. Returns Error if it is not, or if any
    // transaction fails to transform; otherwise stores the rebuilt query into `output`.
    TStatus DoTransform(TExprNode::TPtr input, TExprNode::TPtr& output, TExprContext& ctx) final {
        if (!TKqpPhysicalQuery::Match(input.Get())) {
            return TStatus::Error;
        }

        TKqpPhysicalQuery query(input);

        TVector<TKqpPhysicalTx> txs;
        txs.reserve(query.Transactions().Size());
        for (const auto& tx : query.Transactions()) {
            auto expr = TransformTx(tx, ctx);
            if (!expr) {
                // TransformTx yields an empty node when the pipeline reports an error;
                // propagate the failure instead of calling Cast() on an empty node.
                return TStatus::Error;
            }
            txs.push_back(expr.Cast());
        }

        auto phyQuery = Build<TKqpPhysicalQuery>(ctx, query.Pos())
            .Transactions()
                .Add(txs)
            .Build()
            .Results(query.Results())
            .Settings(query.Settings())
            .Done();

        output = phyQuery.Ptr();
        return TStatus::Ok;
    }

    void Rewind() final {
        TxTransformer->Rewind();
    }

private:
    // Runs the internal pipeline over a single transaction until it reports Ok.
    // Returns an empty node if the pipeline reports Error.
    TMaybeNode<TKqpPhysicalTx> TransformTx(const TKqpPhysicalTx& tx, TExprContext& ctx) {
        // The pipeline is reused for every transaction, so reset it before each run.
        TxTransformer->Rewind();

        auto expr = tx.Ptr();

        while (true) {
            auto status = InstantTransform(*TxTransformer, expr, ctx);
            if (status == TStatus::Error) {
                return {};
            }
            if (status == TStatus::Ok) {
                break;
            }
        }
        return TKqpPhysicalTx(expr);
    }

    // Declared before TxTransformer on purpose: the pipeline references *TypeAnnTransformer
    // without owning it, and members are destroyed in reverse declaration order, so the
    // pipeline must be destroyed first.
    TAutoPtr<NYql::IGraphTransformer> TypeAnnTransformer;
    TAutoPtr<IGraphTransformer> TxTransformer;
};

// Factory for the query-level hash function propagation transformer.
// Ownership of `typeAnnTransformer` is handed over to the created object.
TAutoPtr<IGraphTransformer> NKikimr::NKqp::CreateKqpTxsHashFuncPropagateTransformer(
    TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
    TTypeAnnotationContext& typesCtx,
    const TKikimrConfiguration::TPtr& config
) {
    using TImpl = TKqpTxsHashFuncPropagateTransformer;
    return THolder<IGraphTransformer>(
        new TImpl(std::move(typeAnnTransformer), typesCtx, config)
    );
}
27 changes: 27 additions & 0 deletions ydb/core/kqp/opt/kqp_opt_hash_func_propagate_transformer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include "kqp_opt.h"

#include <ydb/core/kqp/common/kqp_yql.h>
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/yql_graph_transformer.h>
#include <yql/essentials/core/yql_expr_optimize.h>
#include <yql/essentials/core/yql_expr_type_annotation.h>
#include <yql/essentials/core/yql_opt_utils.h>


namespace NKikimr {
namespace NKqp {

// NOTE(review): using-directives in a header leak into every includer; the declaration
// below relies on them for the unqualified NYql names — confirm before removing.
using namespace NYql;
using namespace NYql::NNodes;
using namespace NOpt;

// Creates a transformer that expects a TKqpPhysicalQuery node, runs each of its
// transactions through type annotation followed by hash function propagation
// (filling the HashFunc field of hash-shuffle connections), and rebuilds the query.
//
// typeAnnTransformer - type annotation transformer used for the transactions;
//                      ownership is transferred to the created transformer.
// typesCtx           - type annotation context used to build the internal pipeline.
// config             - source of the hash function settings.
TAutoPtr<IGraphTransformer> CreateKqpTxsHashFuncPropagateTransformer(
TAutoPtr<NYql::IGraphTransformer> typeAnnTransformer,
TTypeAnnotationContext& typesCtx,
const TKikimrConfiguration::TPtr& config
);

}
}
15 changes: 14 additions & 1 deletion ydb/core/kqp/opt/kqp_query_plan.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,13 @@ class TxPlanSerializer {
keyColumns.AppendValue(TString(column.Value()));
}
}

auto& hashFunc = planNode.NodeInfo["HashFunc"];
if (hashShuffle.HashFunc().IsValid()) {
hashFunc = hashShuffle.HashFunc().Cast().StringValue();
} else {
hashFunc = "HashV1";
}
} else if (auto merge = connection.Maybe<TDqCnMerge>()) {
planNode.TypeName = "Merge";
auto& sortColumns = planNode.NodeInfo["SortColumns"];
Expand Down Expand Up @@ -2237,7 +2244,13 @@ struct TQueryPlanReconstructor {
result["Node Type"] = plan.GetMapSafe().at("Node Type").GetStringSafe();

if (plan.GetMapSafe().at("Node Type") == "HashShuffle") {
result["Node Type"] = TStringBuilder{} << "HashShuffle (KeyColumns: " << plan.GetMapSafe().at("KeyColumns") << ")";
TStringBuilder stringBuilder;
stringBuilder << "HashShuffle (" <<
"KeyColumns: " << plan.GetMapSafe().at("KeyColumns") << ", " <<
"HashFunc: " << plan.GetMapSafe().at("HashFunc")
<< ")";

result["Node Type"] = stringBuilder;
}

if (plan.GetMapSafe().contains("CTE Name")) {
Expand Down
1 change: 1 addition & 0 deletions ydb/core/kqp/opt/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ SRCS(
kqp_statistics_transformer.cpp
kqp_column_statistics_requester.cpp
kqp_constant_folding_transformer.cpp
kqp_opt_hash_func_propagate_transformer.cpp
)

PEERDIR(
Expand Down
Loading
Loading