Add support of CTE with modifying DML operations on replicated tables (#1168)

Several issues could occur while running queries with writable CTEs that
modify replicated tables. This patch proposes a solution for the main aspects of
handling a modifying CTE over replicated tables. The issues appeared because the
major interfaces could not handle locus type CdbLocusType_Replicated and flow
type FLOW_REPLICATED, which are set for the result of a modifying operation
over a replicated table inside the adjust_modifytable_flow function.

1. Choosing Join locus type

The planner failed to form a plan when trying to choose the correct final locus
type for a join between a modifying CTE over a replicated table and any other
relation. It either failed with errors, because the cdbpath_motion_for_join
function could not choose a proper join locus, or produced an invalid or
suboptimal plan when handling the Replicated locus. Therefore, this patch
reconsiders the join logic for the Replicated locus. The main principle for
deciding the join locus of Replicated with other loci is that the slice which
performs the modifying operation on the replicated table must be executed on
all segments with data. We also have to keep in mind that redistributing or
broadcasting a Replicated locus leads to data duplication. The joins are
performed in the following way:

SegmentGeneral + Replicated

When a join between CdbLocusType_Replicated and CdbLocusType_SegmentGeneral
occurred, the planner failed on the assert that checks whether a common
UPDATE/DELETE ... FROM/USING (a modifying operation with a join) on a replicated
table takes place (the root->upd_del_replicated_table == 0 condition in the
cdbpath_motion_for_join function). This assert was added by the author without
proper test coverage, and currently there is no case in which this check can be
true.

Without the assert the planner produced a valid plan, but it could cut the
number of segments in the final locus. The Replicated locus was not handled
correctly in this context, so proper logic that takes the writable CTE case
into account had to be added.

This patch allows joins between CdbLocusType_Replicated and
CdbLocusType_SegmentGeneral in the following way: when SegmentGeneral's number
of segments is greater than or equal to Replicated's, the final join locus type
becomes Replicated as well. Otherwise, both the Replicated and SegmentGeneral
parts are gathered to SingleQE so that the join is performed on a single
segment.

Replicated + General
The logic related to General locus remained unchanged. The join locus type
becomes Replicated.

Replicated + SingleQE
If the join between CdbLocusType_Replicated and CdbLocusType_Entry or
CdbLocusType_SingleQE takes place, the Replicated locus will be brought to Entry
or to SingleQE respectively.

Replicated + Partitioned
If a join between CdbLocusType_Replicated and CdbLocusType_Hashed or
CdbLocusType_Strewn takes place, the number of segments for the Replicated locus
equals the number of segments for the other locus, and it is not an outer join,
the join is performed with the Hashed or Strewn locus, respectively. Otherwise,
both parts are brought to SingleQE.

Replicated + Replicated
Join between 2 parts with Replicated locus leads to final join locus being also
Replicated.
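
The join rules listed above can be summarised as a small decision function. The
following is only an illustrative sketch, not the planner code: the LocusKind
enum and the replicated_join_locus helper are hypothetical names invented for
this example, and the "not an outer join" condition is modelled as a plain
boolean flag.

```c
#include <assert.h>
#include <stdbool.h>

/* Simplified stand-ins for the CdbLocusType_* values named in the text. */
typedef enum
{
    LOCUS_ENTRY,
    LOCUS_SINGLEQE,
    LOCUS_GENERAL,
    LOCUS_SEGMENT_GENERAL,
    LOCUS_REPLICATED,
    LOCUS_HASHED,
    LOCUS_STREWN
} LocusKind;

/*
 * Hypothetical helper: return the locus kind of a join between a Replicated
 * path (the result of a modifying CTE) and another path, following the rules
 * described in the commit message.
 */
LocusKind
replicated_join_locus(int replicated_numsegments,
                      LocusKind other_kind,
                      int other_numsegments,
                      bool is_outer_join)
{
    assert(replicated_numsegments > 0);

    switch (other_kind)
    {
        case LOCUS_SEGMENT_GENERAL:
            /* Stay Replicated only if SegmentGeneral covers all segments. */
            return other_numsegments >= replicated_numsegments
                ? LOCUS_REPLICATED : LOCUS_SINGLEQE;
        case LOCUS_GENERAL:
        case LOCUS_REPLICATED:
            return LOCUS_REPLICATED;
        case LOCUS_ENTRY:
            return LOCUS_ENTRY;
        case LOCUS_SINGLEQE:
            return LOCUS_SINGLEQE;
        case LOCUS_HASHED:
        case LOCUS_STREWN:
            /* A local join needs equal segment counts and no outer join. */
            if (!is_outer_join && other_numsegments == replicated_numsegments)
                return other_kind;
            return LOCUS_SINGLEQE;
    }
    return LOCUS_SINGLEQE;      /* not reached for valid input */
}
```

For instance, under these assumptions a Replicated path on 3 segments joined
with a Hashed path on 2 segments ends up at SingleQE, because a local join
would not see the whole replicated result on every participating segment.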

2. UNION ALL which includes Replicated path

Two issues occurred here. When UNION ALL contained a path with locus Replicated
and another path propagated to fewer segments than the Replicated one, the
set_append_path_locus function aligned the UNION target locus to the smallest
number of segments among the operands, which reduced the number of segments of
the Replicated locus. This behaviour is invalid, because a Replicated locus must
be executed on all segments with data. Therefore, this patch does not align the
final UNION locus when the target locus is Replicated.

When UNION ALL was executed with operands, one of which had locus Replicated
and the other locus SegmentGeneral, the planner failed with an assertion error
from cdbpath_create_motion_path. The final locus of the UNION was decided to be
Replicated, and therefore all other arguments had to become Replicated as well.
The issue was that the cdbpath_create_motion_path function forbade moving a
SegmentGeneral path to a Replicated locus.

This patch solves the issue by leaving the SegmentGeneral locus unchanged when
SegmentGeneral's number of segments is greater than or equal to Replicated's.
In this case no motion is needed. Otherwise, the SegmentGeneral path is
broadcast from a single segment.
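
The two UNION ALL rules can be sketched as follows. This is a simplified
model, not the actual set_append_path_locus or cdbpath_create_motion_path
logic: the SegGeneralAction enum and both function names are hypothetical, and
the operands are reduced to their segment counts.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical actions for a SegmentGeneral operand of the UNION ALL. */
typedef enum
{
    SEGGEN_NO_MOTION,
    SEGGEN_BROADCAST_FROM_SINGLE_SEGMENT
} SegGeneralAction;

/*
 * Segment count of the UNION ALL target locus. A Replicated target keeps its
 * own segment count; any other target is aligned to the smallest operand.
 */
int
union_target_numsegments(bool target_is_replicated,
                         int target_numsegments,
                         int smallest_operand_numsegments)
{
    assert(target_numsegments > 0 && smallest_operand_numsegments > 0);

    if (target_is_replicated)
        return target_numsegments;
    return smallest_operand_numsegments < target_numsegments
        ? smallest_operand_numsegments : target_numsegments;
}

/*
 * What happens to a SegmentGeneral operand when the UNION ALL target locus
 * is Replicated.
 */
SegGeneralAction
seggeneral_under_replicated_union(int seggeneral_numsegments,
                                  int replicated_numsegments)
{
    if (seggeneral_numsegments >= replicated_numsegments)
        return SEGGEN_NO_MOTION;
    return SEGGEN_BROADCAST_FROM_SINGLE_SEGMENT;
}
```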

3. Volatile functions handling

When a query had quals that were applied to the modifying CTE and contained
volatile functions, the final plan became invalid, because its execution
produced different tuple sets on different segments (whereas the tuple set of a
Replicated locus is expected to be the same everywhere). That could produce
inconsistent results in joins and SubPlans. The issue is not limited to quals;
it can also occur for volatile targetlists or joinquals.

This patch solves the issue by completely prohibiting volatile functions
applied to a plan or path with locus Replicated. The functions
turn_volatile_seggen_to_singleqe, create_agg_subplan, create_groupingsets_plan,
create_modifytable_plan, create_motion_path, make_subplan were extended with a
condition that checks whether the given locus type is Replicated and there are
volatile functions over it. If the condition is satisfied, a proper error is
thrown.

The changes cover volatile targetlists, returning lists, plan quals, join quals,
and HAVING clauses.
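
The shape of the added check can be sketched as below. It is a minimal model
under stated assumptions: the VOLATILE_IN_* flags and the function name are
hypothetical, and the real checks call contain_volatile_functions on the
corresponding expression lists inside the planner functions listed above.

```c
#include <stdbool.h>

/* Hypothetical flags: which parts of the plan contain volatile functions. */
enum
{
    VOLATILE_IN_TARGETLIST = 1 << 0,
    VOLATILE_IN_RETURNING  = 1 << 1,
    VOLATILE_IN_QUALS      = 1 << 2,
    VOLATILE_IN_JOINQUALS  = 1 << 3,
    VOLATILE_IN_HAVING     = 1 << 4
};

/*
 * Returns true when planning must stop with an error: the locus is
 * Replicated and at least one of the covered clause kinds is volatile.
 * Other locus types are handled by the existing logic and never trigger
 * this particular check.
 */
bool
volatile_over_replicated_rejected(bool locus_is_replicated,
                                  unsigned volatile_clauses)
{
    return locus_is_replicated && volatile_clauses != 0;
}
```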

4. Replicated locus with different number of segments inside SubPlans

Another issue solved by this patch occurred when the modifying CTE was
referenced inside a SubPlan. In this case the
cdbllize_decorate_subplans_with_motions and fix_outer_query_motions_mutator
functions tried to broadcast an already replicated plan, which could lead to
data duplication. Therefore, these functions must be prevented from broadcasting
a result with the Replicated locus.

This patch modifies both functions by adding a condition that decides whether
planning goes on or an error is thrown. If the plan's locus type is
CdbLocusType_Replicated and its numsegments is equal to the number of segments
of the target distribution, the broadcast doesn't occur. If the number of
segments is different, an error is thrown.
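
A rough sketch of this decision, modelled on the fix_outer_query_motions_mutator
change in this commit (which additionally allows an explicit gather to a single
QE), could look like this. The enum and function names are hypothetical and the
flows are reduced to a singleton flag and two segment counts.

```c
#include <stdbool.h>

/* Hypothetical outcomes for a SubPlan whose flow is Replicated. */
typedef enum
{
    SUBPLAN_GATHER_TO_SINGLE_QE,   /* explicit gather is still allowed */
    SUBPLAN_OMIT_MOTION,           /* data is already where it is needed */
    SUBPLAN_ERROR                  /* "could not parallelize SubPlan" */
} ReplicatedSubplanAction;

/*
 * Decide what to do with the motion above a Replicated SubPlan, given the
 * flow of the parent plan that consumes its result.
 */
ReplicatedSubplanAction
replicated_subplan_action(bool target_is_singleton,
                          int subplan_numsegments,
                          int target_numsegments)
{
    if (target_is_singleton)
        return SUBPLAN_GATHER_TO_SINGLE_QE;
    if (subplan_numsegments == target_numsegments)
        return SUBPLAN_OMIT_MOTION;
    return SUBPLAN_ERROR;
}
```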

5. Executor with sorted Explicit Gather Motion

The execMotionSortedReceiver function had an assert preventing Explicit Gather
from working for sorted data. Since nothing prevents it from working correctly,
this patch removes the assert, and that case now works.

Since the planner in GPDB 7 was considerably reworked, there are a lot of
changes from the original commit:
1. Changes for ParallelizeCorrelatedSubPlanMutator and ParallelizeSubplan are
moved to cdbllize_decorate_subplans_with_motions.
2. Additional related fix to fix_outer_query_motions_mutator, disabling
broadcasts for Replicated tables. Explicit Gather is still allowed, and motions
to the same number of segments are omitted.
3. Fix for apply_motion is not needed anymore (original case 1).
4. Original fix for cdbpath_create_motion_path is not needed, already fixed
(corresponds to original case 3).
5. In cdbpath_motion_for_join, the whole case was missing; added the original
GPDB 6 version from the patch. Also removed the asserts.
6. Additional fix for create_motion_plan for fixing outer query behavior, now
MOTIONTYPE_OUTER_QUERY is not lost.
7. Fix for set_append_path_locus slightly reworked because the condition is
more complicated on GPDB 7.
8. Checks for volatile functions are moved to different places. Quals are now
checked in cdbpath_create_motion_path, returning lists in create_modifytable_plan,
target lists in make_subplan, HAVING clauses in create_agg_plan and
create_groupingsets_plan.
9. Test output is updated for GPDB 7 (different plans). Also GPDB 7 managed to
correctly plan a query that was failing before using Explicit Gather Motion:
```
explain (costs off)
with cte as (
    insert into with_dml_dr_seg2
    select i, i * 100 from generate_series(1,6) i
    returning i, j
) select * from t1
where t1.i in (select i from cte)
order by 1;
```

Co-Authored-By: Alexey Gordeev <[email protected]>
(cherry picked from commit 76964c0)
bimboterminator1 authored and silent-observer committed Dec 26, 2024
1 parent cc52dc5 commit 9a9bbb9
Showing 9 changed files with 2,711 additions and 95 deletions.
40 changes: 35 additions & 5 deletions src/backend/cdb/cdbllize.c
@@ -790,13 +790,21 @@ cdbllize_decorate_subplans_with_motions(PlannerInfo *root, Plan *plan)
 if (!subplan->flow)
     elog(ERROR, "subplan is missing Flow information");

+/*
+ * Broadcasting Replicated locus leads to data duplicates.
+ */
+if (subplan->flow->locustype == CdbLocusType_Replicated &&
+    subplan->flow->numsegments != context.currentPlanFlow->numsegments)
+    elog(ERROR, "could not parallelize SubPlan");
+
 /*
  * If the subquery result is not available where the outer query needs it,
  * we have to add a Motion node to redistribute it.
  */
 if (subplan->flow->locustype != CdbLocusType_OuterQuery &&
     subplan->flow->locustype != CdbLocusType_SegmentGeneral &&
-    subplan->flow->locustype != CdbLocusType_General)
+    subplan->flow->locustype != CdbLocusType_General &&
+    subplan->flow->locustype != CdbLocusType_Replicated)
 {
     subplan = fix_subplan_motion(root, subplan, context.currentPlanFlow);

@@ -878,6 +886,7 @@ fix_outer_query_motions_mutator(Node *node, decorate_subplans_with_motions_conte
 if (IsA(plan, Motion))
 {
     Motion *motion = (Motion *) plan;
+    bool shouldOmit = false;

     /* sanity check: Sub plan must have flow */
     Assert(motion->plan.lefttree->flow);
@@ -942,14 +951,35 @@ fix_outer_query_motions_mutator(Node *node, decorate_subplans_with_motions_conte
     elog(ERROR, "unexpected Flow type in parent of a SubPlan");
 }

+if (plan->lefttree->flow->flotype == FLOW_REPLICATED)
+{
+    /*
+     * Broadcasting Replicated locus leads to data duplication.
+     * We can only Explicit Gather it to a single QE or we can
+     * omit this motion if number of segments is equal.
+     */
+    if (context->currentPlanFlow->flotype == FLOW_SINGLETON)
+    {
+        motion->motionType = MOTIONTYPE_GATHER_SINGLE;
+    }
+    else if (plan->lefttree->flow->numsegments == context->currentPlanFlow->numsegments)
+    {
+        shouldOmit = true;
+    }
+    else
+        elog(ERROR, "could not parallelize SubPlan");
+}
+
 /*
  * For non-top slice, if this motion is QE singleton and subplan's locus
  * is CdbLocusType_SegmentGeneral, omit this motion.
  */
-if (context->sliceDepth > 0 &&
-    context->currentPlanFlow->flotype == FLOW_SINGLETON &&
-    context->currentPlanFlow->segindex == 0 &&
-    motion->plan.lefttree->flow->locustype == CdbLocusType_SegmentGeneral)
+shouldOmit |= context->sliceDepth > 0 &&
+    context->currentPlanFlow->flotype == FLOW_SINGLETON &&
+    context->currentPlanFlow->segindex == 0 &&
+    motion->plan.lefttree->flow->locustype == CdbLocusType_SegmentGeneral;
+
+if (shouldOmit)
 {
     /*
      * Omit this motion. If there were any InitPlans attached to it,
253 changes: 167 additions & 86 deletions src/backend/cdb/cdbpath.c
@@ -330,6 +330,10 @@ cdbpath_create_motion_path(PlannerInfo *root,
 if (!bms_is_empty(PATH_REQ_OUTER(subpath)))
     return NULL;

+if (CdbPathLocus_IsReplicated(subpath->locus) &&
+    contain_volatile_functions((Node *) subpath->pathtarget->exprs))
+    goto invalid_motion_request;
+
 /*
  * Data is only available on segments, to distingush it with
  * CdbLocusType_General, adding a motion to indicated this
@@ -1347,21 +1351,6 @@ cdbpath_motion_for_join(PlannerInfo *root,
     }
 }

-/*
- * Locus type Replicated can only be generated by join operation.
- * And in the function cdbpathlocus_join there is a rule:
- * <any locus type> join <Replicated> => any locus type
- * Proof by contradiction, it shows that when code arrives here,
- * it is impossible that any of the two input paths' locus
- * is Replicated. So we add two asserts here.
- */
-Assert(!CdbPathLocus_IsReplicated(outer.locus));
-Assert(!CdbPathLocus_IsReplicated(inner.locus));
-
-if (CdbPathLocus_IsReplicated(outer.locus) ||
-    CdbPathLocus_IsReplicated(inner.locus))
-    goto fail;
-
 outer.has_wts = cdbpath_contains_wts(outer.path);
 inner.has_wts = cdbpath_contains_wts(inner.path);

@@ -1743,100 +1732,182 @@ cdbpath_motion_for_join(PlannerInfo *root,
other = &outer;
}

Assert(CdbPathLocus_IsBottleneck(other->locus) ||
CdbPathLocus_IsPartitioned(other->locus));

/*
* For UPDATE/DELETE, replicated table can't guarantee a logic row has
* same ctid or item pointer on each copy. If we broadcast matched tuples
* to all segments, the segments may update the wrong tuples or can't
* find a valid tuple according to ctid or item pointer.
*
* So For UPDATE/DELETE on replicated table, we broadcast other path so
* all target tuples can be selected on all copys and then be updated
* locally.
*/
if (root->upd_del_replicated_table > 0 &&
bms_is_member(root->upd_del_replicated_table,
segGeneral->path->parent->relids))
if (CdbPathLocus_IsReplicated(other->locus))
{
/*
* For UPDATE on a replicated table, we have to do it
* everywhere so that for each segment, we have to collect
* all the information of other that is we should broadcast it
* The case, when UPDATE/DELETE operation on a replicated table
* also has join operand with Replicated locus, is unknown.
*/

/*
* FIXME: do we need to test other's movable?
*/
CdbPathLocus_MakeReplicated(&other->move_to,
CdbPathLocus_NumSegments(segGeneral->locus));
}
else if (CdbPathLocus_IsBottleneck(other->locus))
{
Assert(root->upd_del_replicated_table == 0);

int numsegments = CdbPathLocus_CommonSegments(segGeneral->locus,
other->locus);

/*
* if the locus type is equal and segment count is unequal,
* we will dispatch the one on more segments to the other
* Replicated locus corresponds to the result of
* the CTE with modifying DML operation over a replicated
* table inside. In case when SegmentGeneral locus is
* propagated at more number of segments than Replicated locus
* is, it is appropriate to reduce SegmentGeneral's number to
* perform join on segments. Otherwise, perform join at
* SingleQE.
*/
numsegments = CdbPathLocus_CommonSegments(segGeneral->locus,
other->locus);
segGeneral->move_to = other->locus;
segGeneral->move_to.numsegments = numsegments;
if (segGeneral->locus.numsegments >= other->locus.numsegments)
{
segGeneral->locus.numsegments = numsegments;
return other->locus;
}
else
{
CdbPathLocus_MakeSingleQE(&segGeneral->move_to, numsegments);
CdbPathLocus_MakeSingleQE(&other->move_to, numsegments);
}
}
else
{
/*
* This branch handles for partitioned other locus
* hashed, hashoj, strewn
*/
Assert(CdbPathLocus_IsPartitioned(other->locus));

Assert(CdbPathLocus_IsBottleneck(other->locus) ||
CdbPathLocus_IsPartitioned(other->locus));

if (!segGeneral->ok_to_replicate)
/*
* For UPDATE/DELETE, replicated table can't guarantee a logic row has
* same ctid or item pointer on each copy. If we broadcast matched tuples
* to all segments, the segments may update the wrong tuples or can't
* find a valid tuple according to ctid or item pointer.
*
* So For UPDATE/DELETE on replicated table, we broadcast other path so
* all target tuples can be selected on all copys and then be updated
* locally.
*/
if (root->upd_del_replicated_table > 0 &&
bms_is_member(root->upd_del_replicated_table,
segGeneral->path->parent->relids))
{
if (!try_redistribute(root, segGeneral,
other, redistribution_clauses))
{
/*
* FIXME: do we need to test movable?
*/
CdbPathLocus_MakeSingleQE(&segGeneral->move_to,
CdbPathLocus_NumSegments(segGeneral->locus));
CdbPathLocus_MakeSingleQE(&other->move_to,
CdbPathLocus_NumSegments(other->locus));
}
/*
* For UPDATE on a replicated table, we have to do it
* everywhere so that for each segment, we have to collect
* all the information of other that is we should broadcast it
*/

/*
* FIXME: do we need to test other's movable?
*/
CdbPathLocus_MakeReplicated(&other->move_to,
CdbPathLocus_NumSegments(segGeneral->locus));
}
else if (CdbPathLocus_IsBottleneck(other->locus))
{
/*
* if the locus type is equal and segment count is unequal,
* we will dispatch the one on more segments to the other
*/
numsegments = CdbPathLocus_CommonSegments(segGeneral->locus,
other->locus);
segGeneral->move_to = other->locus;
segGeneral->move_to.numsegments = numsegments;
}
else
{
/*
* If all other's segments have segGeneral stored, then no motion
* is needed.
*
* A sql to reach here:
* select * from d2 a join r1 b using (c1);
* where d2 is a replicated table on 2 segment,
* r1 is a random table on 1 segments.
*/
if (CdbPathLocus_NumSegments(segGeneral->locus) >=
CdbPathLocus_NumSegments(other->locus))
return other->locus;
else
* This branch handles for partitioned other locus
* hashed, hashoj, strewn
*/
Assert(CdbPathLocus_IsPartitioned(other->locus));

if (!segGeneral->ok_to_replicate)
{
if (!try_redistribute(root, segGeneral,
other, redistribution_clauses))
other, redistribution_clauses))
{
numsegments = CdbPathLocus_CommonSegments(segGeneral->locus,
other->locus);
/*
* FIXME: do we need to test movable?
*/
CdbPathLocus_MakeSingleQE(&segGeneral->move_to, numsegments);
CdbPathLocus_MakeSingleQE(&other->move_to, numsegments);
* FIXME: do we need to test movable?
*/
CdbPathLocus_MakeSingleQE(&segGeneral->move_to,
CdbPathLocus_NumSegments(segGeneral->locus));
CdbPathLocus_MakeSingleQE(&other->move_to,
CdbPathLocus_NumSegments(other->locus));
}
}
else
{
/*
* If all other's segments have segGeneral stored, then no motion
* is needed.
*
* A sql to reach here:
* select * from d2 a join r1 b using (c1);
* where d2 is a replicated table on 2 segment,
* r1 is a random table on 1 segments.
*/
if (CdbPathLocus_NumSegments(segGeneral->locus) >=
CdbPathLocus_NumSegments(other->locus))
return other->locus;
else
{
if (!try_redistribute(root, segGeneral,
other, redistribution_clauses))
{
numsegments = CdbPathLocus_CommonSegments(segGeneral->locus,
other->locus);
/*
* FIXME: do we need to test movable?
*/
CdbPathLocus_MakeSingleQE(&segGeneral->move_to, numsegments);
CdbPathLocus_MakeSingleQE(&other->move_to, numsegments);
}
}
}
}
}
}
}
else if (CdbPathLocus_IsReplicated(outer.locus) ||
CdbPathLocus_IsReplicated(inner.locus))
{
/*
* Replicated paths shouldn't occur except ones including
* modifying CTEs with DML operations on replicated table.
*/
Assert(root->upd_del_replicated_table == 0);

CdbpathMfjRel *replicated;
CdbpathMfjRel *other;

if (CdbPathLocus_IsReplicated(outer.locus))
{
replicated = &outer;
other = &inner;
}
else
{
replicated = &inner;
other = &outer;
}

int numsegments = CdbPathLocus_CommonSegments(replicated->locus,
other->locus);

/*
* If Replicated locus is joined with Partitioned locus group
* it will be possible to perform join locally (if number of segments
* is equal). Otherwise, join must be performed at single segment.
*/
if (CdbPathLocus_IsBottleneck(other->locus))
CdbPathLocus_MakeSimple(&replicated->move_to,
other->locus.locustype, numsegments);
else if (CdbPathLocus_IsPartitioned(other->locus))
{
if (replicated->ok_to_replicate &&
CdbPathLocus_NumSegments(replicated->locus) ==
CdbPathLocus_NumSegments(other->locus))
return other->locus;
else
{
CdbPathLocus_MakeSingleQE(&replicated->move_to, numsegments);
CdbPathLocus_MakeSingleQE(&other->move_to, numsegments);
}
}
}
/*
* Is either source confined to a single process? NB: Motion to a single
* process (qDisp or qExec) is the only motion in which we may use Merge
@@ -2629,7 +2700,9 @@ create_split_update_path(PlannerInfo *root, Index rti, GpPolicy *policy, Path *s
 Path *
 turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node)
 {
-    if ((CdbPathLocus_IsSegmentGeneral(path->locus) || CdbPathLocus_IsGeneral(path->locus)) &&
+    if ((CdbPathLocus_IsSegmentGeneral(path->locus) ||
+         CdbPathLocus_IsGeneral(path->locus) ||
+         CdbPathLocus_IsReplicated(path->locus)) &&
         (contain_volatile_functions(node) || IsA(path, LimitPath)))
     {
         CdbPathLocus singleQE;
@@ -2647,6 +2720,14 @@ turn_volatile_seggen_to_singleqe(PlannerInfo *root, Path *path, Node *node)
         getgpsegmentCount());
     return path;
 }
+else if (CdbPathLocus_IsReplicated(path->locus))
+{
+    /*
+     * Replicated locus is not supported yet in context of
+     * volatile functions handling.
+     */
+    elog(ERROR, "could not devise a plan");
+}

 CdbPathLocus_MakeSingleQE(&singleQE,
     CdbPathLocus_NumSegments(path->locus));
3 changes: 2 additions & 1 deletion src/backend/executor/nodeMotion.c
@@ -433,7 +433,8 @@ execMotionSortedReceiver(MotionState *node)
 Motion *motion = (Motion *) node->ps.plan;
 EState *estate = node->ps.state;

-AssertState(motion->motionType == MOTIONTYPE_GATHER &&
+AssertState((motion->motionType == MOTIONTYPE_GATHER ||
+             motion->motionType == MOTIONTYPE_GATHER_SINGLE) &&
             motion->sendSorted &&
             hp != NULL);
