Skip to content

Commit

Permalink
Redo #7620: Fix merge command when insert value does not have source …
Browse files Browse the repository at this point in the history
…distributed column (#7627)

Related to issue #7619, #7620
Merge command fails when source query is single sharded and source and
target are co-located and insert is not using distribution key of
source.

Example
```
CREATE TABLE source (id integer);
CREATE TABLE target (id integer );

-- let's distribute both table on id field
SELECT create_distributed_table('source', 'id');
SELECT create_distributed_table('target', 'id');

MERGE INTO target t
  USING ( SELECT 1 AS somekey
          FROM source
        WHERE source.id = 1) s
  ON t.id = s.somekey
  WHEN NOT MATCHED
  THEN INSERT (id)
    VALUES (s.somekey)

ERROR:  MERGE INSERT must use the source table distribution column value
HINT:  MERGE INSERT must use the source table distribution column value
```

Author's Opinion: If join is not between source and target distributed
column, we should not force user to use source distributed column while
inserting value of target distributed column.

Fix: If user is not using distributed key of source for insertion let's
not push down query to workers and don't force user to use source
distributed column if it is not part of join.

This reverts commit fa4fc0b.

Co-authored-by: paragjain <[email protected]>
  • Loading branch information
JelteF and LordParag authored Jun 17, 2024
1 parent fa4fc0b commit aaaf637
Show file tree
Hide file tree
Showing 4 changed files with 470 additions and 32 deletions.
29 changes: 21 additions & 8 deletions src/backend/distributed/planner/merge_planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -182,14 +182,6 @@ CreateRouterMergePlan(Oid targetRelationId, Query *originalQuery, Query *query,
return distributedPlan;
}

Var *insertVar =
FetchAndValidateInsertVarIfExists(targetRelationId, originalQuery);
if (insertVar &&
!IsDistributionColumnInMergeSource((Expr *) insertVar, originalQuery, true))
{
ereport(ERROR, (errmsg("MERGE INSERT must use the source table "
"distribution column value")));
}

Job *job = RouterJob(originalQuery, plannerRestrictionContext,
&distributedPlan->planningError);
Expand Down Expand Up @@ -1124,6 +1116,27 @@ DeferErrorIfRoutableMergeNotSupported(Query *query, List *rangeTableList,
"repartitioning")));
return deferredError;
}


/*
* If execution has reached this point, it indicates that the query can be delegated to the worker.
* However, before proceeding with this delegation, we need to confirm that the user is utilizing
* the distribution column of the source table in the Insert variable.
* If this is not the case, we should refrain from pushing down the query.
* This is just a deffered error which will be handle by caller.
*/

Var *insertVar =
FetchAndValidateInsertVarIfExists(targetRelationId, query);
if (insertVar &&
!IsDistributionColumnInMergeSource((Expr *) insertVar, query, true))
{
ereport(DEBUG1, (errmsg(
"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied")));
return DeferredError(ERRCODE_FEATURE_NOT_SUPPORTED,
"MERGE INSERT must use the source table distribution column value for push down to workers. Otherwise, repartitioning will be applied",
NULL, NULL);
}
return NULL;
}

Expand Down
Loading

0 comments on commit aaaf637

Please sign in to comment.