Skip to content

Commit a6ba11e

Browse files
authored
DQ Fallback when udf with layers (#26687)
1 parent 8336376 commit a6ba11e

File tree

3 files changed

+59
-6
lines changed

3 files changed

+59
-6
lines changed

ydb/library/yql/providers/dq/provider/exec/yql_dq_exectransformer.cpp

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -818,7 +818,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
818818
bool fallbackFlag = BuildUploadList(uploadList, localRun, explorer, typeEnv, files);
819819

820820
if (fallbackFlag) {
821-
YQL_CLOG(DEBUG, ProviderDq) << "Fallback: " << NCommon::ExprToPrettyString(ctx, *input);
821+
YQL_CLOG(TRACE, ProviderDq) << "Fallback: " << NCommon::ExprToPrettyString(ctx, *input);
822822
return Fallback();
823823
} else {
824824
*lambda = SerializeRuntimeNode(root, typeEnv);
@@ -921,6 +921,33 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
921921
}
922922
}
923923

924+
bool hasErrors = false;
925+
auto fallback = [&ctx, &hasErrors](const TExprNode& n, const TString& msg) {
926+
auto issues = TIssues{TIssue(ctx.GetPosition(n.Pos()), msg).SetCode(TIssuesIds::DQ_GATEWAY_NEED_FALLBACK_ERROR, TSeverityIds::S_WARNING)};
927+
ctx.AssociativeIssues.emplace(&n, std::move(issues));
928+
hasErrors = true;
929+
};
930+
931+
VisitExpr(*input.Get(), [&fallback, &hasErrors](const TExprNode& n) {
932+
if (TCoScriptUdf::Match(&n) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(n.Head().Content()))) {
933+
fallback(n, TStringBuilder() << "Cannot execute system python udf " << n.Content() << " in DQ");
934+
return false;
935+
}
936+
if ((TCoScriptUdf::Match(&n) && n.ChildrenSize() > 4) || (TCoUdf::Match(&n) && n.ChildrenSize() == 8)) {
937+
for (const auto& setting: n.Child(TCoScriptUdf::Match(&n) ? 4 : 7)->Children()) {
938+
YQL_ENSURE(setting->Head().IsAtom());
939+
if (setting->Head().Content() == "layers") {
940+
fallback(n, TStringBuilder() << "Cannot execute udf " << n.Head().Content() << " with layers in DQ");
941+
return false;
942+
}
943+
}
944+
}
945+
return !hasErrors;
946+
});
947+
if (hasErrors) {
948+
return Fallback();
949+
}
950+
924951
THashMap<TString, TString> secureParams;
925952
NCommon::FillSecureParams(resInput, *State->TypeCtx, secureParams);
926953

@@ -1074,7 +1101,7 @@ class TDqExecTransformer: public TExecTransformerBase, TCounters
10741101
return IGraphTransformer::TStatus(IGraphTransformer::TStatus::Error);
10751102
}
10761103

1077-
YQL_CLOG(DEBUG, ProviderDq) << "Fallback from gateway: " << NCommon::ExprToPrettyString(ctx, *input);
1104+
YQL_CLOG(TRACE, ProviderDq) << "Fallback from gateway: " << NCommon::ExprToPrettyString(ctx, *input);
10781105
TIssue warning(ctx.GetPosition(input->Pos()), "DQ cannot execute the query");
10791106
warning.Severity = TSeverityIds::S_INFO;
10801107
ctx.IssueManager.RaiseIssue(warning);

ydb/library/yql/providers/dq/provider/yql_dq_recapture.cpp

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -202,8 +202,17 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase {
202202
Scan(*node.Child(i), ctx, good, visited);
203203
}
204204
}
205-
}
206-
else if (TCoScriptUdf::Match(&node)) {
205+
} else if (TCoScriptUdf::Match(&node)) {
206+
if (node.ChildrenSize() > 4) {
207+
for (const auto& setting: node.Child(4)->Children()) {
208+
YQL_ENSURE(setting->Head().IsAtom());
209+
if (setting->Head().Content() == "layers") {
210+
AddInfo(ctx, TStringBuilder() << "Cannot execute udf " << node.Head().Content() << " with layers in DQ");
211+
good = false;
212+
}
213+
}
214+
}
215+
207216
if (good && TCoScriptUdf::Match(&node) && NKikimr::NMiniKQL::IsSystemPython(NKikimr::NMiniKQL::ScriptTypeFromStr(node.Head().Content()))) {
208217
AddInfo(ctx, TStringBuilder() << "system python udf");
209218
good = false;
@@ -213,8 +222,15 @@ class TDqsRecaptureTransformer : public TSyncTransformerBase {
213222
Scan(*node.Child(i), ctx, good, visited);
214223
}
215224
}
216-
}
217-
else {
225+
} else if (TCoUdf::Match(&node) && node.ChildrenSize() == 8) {
226+
for (const auto& setting: node.Child(7)->Children()) {
227+
YQL_ENSURE(setting->Head().IsAtom());
228+
if (setting->Head().Content() == "layers") {
229+
AddInfo(ctx, TStringBuilder() << "Cannot execute udf " << node.Head().Content() << " with layers in DQ");
230+
good = false;
231+
}
232+
}
233+
} else {
218234
for (size_t i = 0; i != node.ChildrenSize() && good; ++i) {
219235
Scan(*node.Child(i), ctx, good, visited);
220236
}

ydb/library/yql/providers/dq/provider/yql_dq_validate.cpp

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,16 @@ std::shared_ptr<TIssue> TDqExecutionValidator::ValidateDqStage(const TExprNode&
5858
stageInfo.Issue = MakeErrorPtr(ctx, *n, TStringBuilder() << "Cannot execute system python udf " << n->Content() << " in DQ");
5959
hasErrors = true;
6060
}
61+
if ((TCoScriptUdf::Match(n.Get()) && n->ChildrenSize() > 4) || (TCoUdf::Match(n.Get()) && n->ChildrenSize() == 8)) {
62+
for (const auto& setting: n->Child(TCoScriptUdf::Match(n.Get()) ? 4 : 7)->Children()) {
63+
YQL_ENSURE(setting->Head().IsAtom());
64+
if (setting->Head().Content() == "layers") {
65+
stageInfo.Issue = MakeErrorPtr(ctx, *n, TStringBuilder() << "Cannot execute udf " << n->Head().Content() << " with layers in DQ");
66+
hasErrors = true;
67+
}
68+
}
69+
}
70+
6171
if (!typeCtx->ForceDq && TDqReadWrapBase::Match(n.Get())) {
6272
auto readNode = n->Child(0);
6373
auto dataSourceName = readNode->Child(1)->Child(0)->Content();

0 commit comments

Comments
 (0)