Skip to content

Commit

Permalink
Optimize broadcast_motion_walker and explain
Browse files Browse the repository at this point in the history
  • Loading branch information
dnskvlnk committed Dec 24, 2023
1 parent cd341ba commit 8de7f0f
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 55 deletions.
66 changes: 27 additions & 39 deletions src/backend/cdb/cdbmutate.c
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,10 @@ typedef struct ApplyMotionState
plan_tree_base_prefix base; /* Required prefix for
* plan_tree_walker/mutator */
int nextMotionID;
int numsegments;
int sliceDepth;
bool containMotionNodes;
Node *from;
HTAB *planid_subplans; /* hash table for InitPlanItem */
} ApplyMotionState;

Expand All @@ -94,16 +96,6 @@ typedef struct
List *cursorPositions;
} pre_dispatch_function_evaluation_context;

typedef struct RowsMutatorState {
plan_tree_base_prefix base;
Node *sliceRoot;
int scaleFactor;
} RowsMutatorState;

static bool broadcast_motion_walker(Node *node, RowsMutatorState *state);

static bool rows_number_walker(Node *node, RowsMutatorState *state);

static Node *planner_make_plan_constant(struct PlannerInfo *root, Node *n, bool is_SRI);

static Node *pre_dispatch_function_evaluation_mutator(Node *node,
Expand Down Expand Up @@ -376,50 +368,50 @@ get_partitioned_policy_from_flow(Plan *plan)
GP_POLICY_DEFAULT_NUMSEGMENTS());
}

/*
Make a single traversal to fixup rows number in slices which are
below Broadcast Motion nodes.
*/

static bool
broadcast_motion_walker(Node *node, RowsMutatorState *state)
rows_number_walker(Node *node, ApplyMotionState *context)
{
if (node == NULL)
return false;

if (is_plan_node(node) &&
((Plan *) node)->flow != NULL &&
((Plan *) node)->flow->req_move == MOVEMENT_BROADCAST)
if (is_plan_node(node))
{
state->scaleFactor = ((Plan *) node)->flow->numsegments;
plan_tree_walker(state->sliceRoot, rows_number_walker, state);
Plan *plan = (Plan *) node;

if (plan->flow != NULL && plan->flow->req_move == MOVEMENT_BROADCAST)
return true;

plan->plan_rows *= context->numsegments;
}
else if (IsA(node, Motion) || IsA(node, SubPlan))
state->sliceRoot = node;

return plan_tree_walker(node, broadcast_motion_walker, state);
return plan_tree_walker(node, rows_number_walker, context);
}


static bool
rows_number_walker(Node *node, RowsMutatorState *state)
broadcast_motion_walker(Node *node, ApplyMotionState *context)
{
Plan *currentPlan;

if (node == NULL)
return false;

if (is_plan_node(node))
{
if (((Plan *) node)->flow != NULL &&
((Plan *) node)->flow->req_move == MOVEMENT_BROADCAST)
return true;
Plan *plan = (Plan *) node;

currentPlan = (Plan *) node;
currentPlan->plan_rows *= state->scaleFactor;
if (plan->flow != NULL && plan->flow->req_move == MOVEMENT_BROADCAST)
{
context->numsegments = plan->flow->numsegments;
(void )rows_number_walker(context->from, context);
}
}
else if (IsA(node, Motion) || IsA(node, SubPlan))
context->from = node;

return plan_tree_walker(node, rows_number_walker, state);
return plan_tree_walker(node, broadcast_motion_walker, context);
}


/* -------------------------------------------------------------------------
* Function apply_motion() and apply_motion_mutator() add motion nodes to a
* top-level Plan tree as directed by the Flow nodes in the plan.
Expand All @@ -444,26 +436,22 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
InitPlanItem *item;
GpPolicyType targetPolicyType = POLICYTYPE_ENTRY;
ApplyMotionState state;
RowsMutatorState broadcast_motion_walker_state;
HASHCTL ctl;
HASH_SEQ_STATUS status;
bool needToAssignDirectDispatchContentIds = false;
bool bringResultToDispatcher = false;
int numsegments = getgpsegmentCount();

/* Initialize broadcast motion walker context */

planner_init_plan_tree_base(&broadcast_motion_walker_state.base, root);
broadcast_motion_walker_state.sliceRoot = (Node*)(plan);

/* Initialize mutator context. */

planner_init_plan_tree_base(&state.base, root); /* error on attempt to
* descend into subplan
* plan */
state.nextMotionID = 1; /* Start at 1 so zero will mean "unassigned". */
state.numsegments = numsegments;
state.sliceDepth = 0;
state.containMotionNodes = false;
state.from = (Node *) plan;
memset(&ctl, 0, sizeof(ctl));
ctl.keysize = sizeof(int);
ctl.entrysize = sizeof(InitPlanItem);
Expand Down Expand Up @@ -742,7 +730,7 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query)
Insist(focusPlan(plan, false, false));
}

plan_tree_walker((Node*)plan, broadcast_motion_walker, &broadcast_motion_walker_state);
(void) plan_tree_walker((Node *) plan, broadcast_motion_walker, &state);

result = (Plan *) apply_motion_mutator((Node *) plan, &state);

Expand Down
28 changes: 12 additions & 16 deletions src/backend/commands/explain.c
Original file line number Diff line number Diff line change
Expand Up @@ -1213,29 +1213,19 @@ ExplainNode(PlanState *planstate, List *ancestors,
char *skip_outer_msg = NULL;
int motion_recv;
int motion_snd;

/*
* We will divide planner estimates by this factor to produce per-segment
* estimates. In those cases when a command is executed on a single
* segment, save_currentSlice->gangSize will equal 0, so we use
* scaleFactor == 1 instead. save_currentSlice can be null if we use
* utility mode.
*/
float scaleFactor = (save_currentSlice && save_currentSlice->gangSize) ?
(float) save_currentSlice->gangSize :
1.0;
float scaleFactor = 1.0; /* we will divide planner estimates by this factor to produce
per-segment estimates */
Slice *parentSlice = NULL;

/* Remember who called us. */
parentplanstate = es->parentPlanState;
es->parentPlanState = planstate;

if (nodeTag(plan) == T_ModifyTable && CdbPathLocus_IsReplicated(*(plan->flow)))
if (save_currentSlice != NULL && save_currentSlice->gangSize > 0)
{
Assert(es->pstmt->planGen == PLANGEN_PLANNER); /* T_ModifyTable can be
* produce by planner
* only */
scaleFactor = 1.0;
scaleFactor = (float) save_currentSlice->gangSize;
}

if (plan->flow != NULL && CdbPathLocus_IsSegmentGeneral(*(plan->flow)))
{
/* Replicated table has full data on every segment */
Expand All @@ -1249,6 +1239,12 @@ ExplainNode(PlanState *planstate, List *ancestors,
break;
case T_ModifyTable:
sname = "ModifyTable";

if (plan->flow != NULL && CdbPathLocus_IsReplicated(*(plan->flow)))
{
scaleFactor = 1.0;
}

switch (((ModifyTable *) plan)->operation)
{
case CMD_INSERT:
Expand Down

0 comments on commit 8de7f0f

Please sign in to comment.