From 8de7f0fbc6f0c6619b486d537523b6720d78b06d Mon Sep 17 00:00:00 2001 From: Dennis Kovalenko Date: Sun, 24 Dec 2023 23:02:40 +0400 Subject: [PATCH] Optimize broadcast_motion_walker and explain --- src/backend/cdb/cdbmutate.c | 66 ++++++++++++++-------------------- src/backend/commands/explain.c | 28 +++++++-------- 2 files changed, 39 insertions(+), 55 deletions(-) diff --git a/src/backend/cdb/cdbmutate.c b/src/backend/cdb/cdbmutate.c index 1efd75d6f1ae..da53216adaf4 100644 --- a/src/backend/cdb/cdbmutate.c +++ b/src/backend/cdb/cdbmutate.c @@ -74,8 +74,10 @@ typedef struct ApplyMotionState plan_tree_base_prefix base; /* Required prefix for * plan_tree_walker/mutator */ int nextMotionID; + int numsegments; int sliceDepth; bool containMotionNodes; + Node *from; HTAB *planid_subplans; /* hash table for InitPlanItem */ } ApplyMotionState; @@ -94,16 +96,6 @@ typedef struct List *cursorPositions; } pre_dispatch_function_evaluation_context; -typedef struct RowsMutatorState { - plan_tree_base_prefix base; - Node *sliceRoot; - int scaleFactor; -} RowsMutatorState; - -static bool broadcast_motion_walker(Node *node, RowsMutatorState *state); - -static bool rows_number_walker(Node *node, RowsMutatorState *state); - static Node *planner_make_plan_constant(struct PlannerInfo *root, Node *n, bool is_SRI); static Node *pre_dispatch_function_evaluation_mutator(Node *node, @@ -376,50 +368,50 @@ get_partitioned_policy_from_flow(Plan *plan) GP_POLICY_DEFAULT_NUMSEGMENTS()); } -/* - Make a single traversal to fixup rows number in slices which are - below Broadcast Motion nodes. - */ + static bool -broadcast_motion_walker(Node *node, RowsMutatorState *state) +rows_number_walker(Node *node, ApplyMotionState *context) { if (node == NULL) return false; - if (is_plan_node(node) && - ((Plan *) node)->flow != NULL && - ((Plan *) node)->flow->req_move == MOVEMENT_BROADCAST) + if (is_plan_node(node)) { - state->scaleFactor = ((Plan *) node)->flow->numsegments; - plan_tree_walker(state->sliceRoot, rows_number_walker, state); + Plan *plan = (Plan *) node; + + if (plan->flow != NULL && plan->flow->req_move == MOVEMENT_BROADCAST) + return true; + + plan->plan_rows *= context->numsegments; } - else if (IsA(node, Motion) || IsA(node, SubPlan)) - state->sliceRoot = node; - return plan_tree_walker(node, broadcast_motion_walker, state); + return plan_tree_walker(node, rows_number_walker, context); } + static bool -rows_number_walker(Node *node, RowsMutatorState *state) +broadcast_motion_walker(Node *node, ApplyMotionState *context) { - Plan *currentPlan; - if (node == NULL) return false; if (is_plan_node(node)) { - if (((Plan *) node)->flow != NULL && - ((Plan *) node)->flow->req_move == MOVEMENT_BROADCAST) - return true; + Plan *plan = (Plan *) node; - currentPlan = (Plan *) node; - currentPlan->plan_rows *= state->scaleFactor; + if (plan->flow != NULL && plan->flow->req_move == MOVEMENT_BROADCAST) + { + context->numsegments = plan->flow->numsegments; + (void )rows_number_walker(context->from, context); + } } + else if (IsA(node, Motion) || IsA(node, SubPlan)) + context->from = node; - return plan_tree_walker(node, rows_number_walker, state); + return plan_tree_walker(node, broadcast_motion_walker, context); } + /* ------------------------------------------------------------------------- * Function apply_motion() and apply_motion_mutator() add motion nodes to a * top-level Plan tree as directed by the Flow nodes in the plan. @@ -444,26 +436,22 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query) InitPlanItem *item; GpPolicyType targetPolicyType = POLICYTYPE_ENTRY; ApplyMotionState state; - RowsMutatorState broadcast_motion_walker_state; HASHCTL ctl; HASH_SEQ_STATUS status; bool needToAssignDirectDispatchContentIds = false; bool bringResultToDispatcher = false; int numsegments = getgpsegmentCount(); - /* Initialize broadcast motion walker context */ - - planner_init_plan_tree_base(&broadcast_motion_walker_state.base, root); - broadcast_motion_walker_state.sliceRoot = (Node*)(plan); - /* Initialize mutator context. */ planner_init_plan_tree_base(&state.base, root); /* error on attempt to * descend into subplan * plan */ state.nextMotionID = 1; /* Start at 1 so zero will mean "unassigned". */ + state.numsegments = numsegments; state.sliceDepth = 0; state.containMotionNodes = false; + state.from = (Node *) plan; memset(&ctl, 0, sizeof(ctl)); ctl.keysize = sizeof(int); ctl.entrysize = sizeof(InitPlanItem); @@ -742,7 +730,7 @@ apply_motion(PlannerInfo *root, Plan *plan, Query *query) Insist(focusPlan(plan, false, false)); } - plan_tree_walker((Node*)plan, broadcast_motion_walker, &broadcast_motion_walker_state); + (void) plan_tree_walker((Node *) plan, broadcast_motion_walker, &state); result = (Plan *) apply_motion_mutator((Node *) plan, &state); diff --git a/src/backend/commands/explain.c b/src/backend/commands/explain.c index 66f1c0361f7f..8107446e7e0f 100644 --- a/src/backend/commands/explain.c +++ b/src/backend/commands/explain.c @@ -1213,29 +1213,19 @@ ExplainNode(PlanState *planstate, List *ancestors, char *skip_outer_msg = NULL; int motion_recv; int motion_snd; - - /* - * We will divide planner estimates by this factor to produce per-segment - * estimates. In those cases when a command is executed on a single - * segment, save_currentSlice->gangSize will equal 0, so we use - * scaleFactor == 1 instead. save_currentSlice can be null if we use - * utility mode. - */ - float scaleFactor = (save_currentSlice && save_currentSlice->gangSize) ? - (float) save_currentSlice->gangSize : - 1.0; + float scaleFactor = 1.0; /* we will divide planner estimates by this factor to produce + per-segment estimates */ + Slice *parentSlice = NULL; /* Remember who called us. */ parentplanstate = es->parentPlanState; es->parentPlanState = planstate; - if (nodeTag(plan) == T_ModifyTable && CdbPathLocus_IsReplicated(*(plan->flow))) + if (save_currentSlice != NULL && save_currentSlice->gangSize > 0) { - Assert(es->pstmt->planGen == PLANGEN_PLANNER); /* T_ModifyTable can be - * produce by planner - * only */ - scaleFactor = 1.0; + scaleFactor = (float) save_currentSlice->gangSize; } + if (plan->flow != NULL && CdbPathLocus_IsSegmentGeneral(*(plan->flow))) { /* Replicated table has full data on every segment */ @@ -1249,6 +1239,12 @@ ExplainNode(PlanState *planstate, List *ancestors, break; case T_ModifyTable: sname = "ModifyTable"; + + if (plan->flow != NULL && CdbPathLocus_IsReplicated(*(plan->flow))) + { + scaleFactor = 1.0; + } + switch (((ModifyTable *) plan)->operation) { case CMD_INSERT: