Skip to content

Commit

Permalink
[Fix](partial update) Fix wrongly updating autoinc column in partial up…
Browse files Browse the repository at this point in the history
…date (apache#39996)

## Proposed changes

apache#38229 converts partial update to
upsert in some situations. But when the insert stmt has missing autoinc
value column, we can't do this transformation, otherwise the value in
autoinc column will be wrongly overwritten with newly generated values
instead of reading from old rows.
  • Loading branch information
bobhan1 authored and dataroaring committed Sep 12, 2024
1 parent db3016d commit 805a33f
Show file tree
Hide file tree
Showing 10 changed files with 165 additions and 79 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1364,7 +1364,7 @@ private void trySetPartialUpdate() throws UserException {
if (hasEmptyTargetColumns) {
return;
}
boolean hasMissingColExceptAutoInc = false;
boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
boolean exists = false;
for (Column insertCol : targetColumns) {
Expand All @@ -1377,16 +1377,16 @@ private void trySetPartialUpdate() throws UserException {
break;
}
}
if (!exists && !col.isAutoInc()) {
if (col.isKey()) {
if (!exists) {
if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
}
if (col.isVisible()) {
hasMissingColExceptAutoInc = true;
if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
}
if (!hasMissingColExceptAutoInc) {
if (!hasMissingColExceptAutoIncKey) {
return;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,7 @@ public static Plan normalizePlan(Plan plan, TableIf table, Optional<InsertComman
if (unboundLogicalSink.getColNames().isEmpty()) {
((UnboundTableSink<? extends Plan>) unboundLogicalSink).setPartialUpdate(false);
} else {
boolean hasMissingColExceptAutoInc = false;
boolean hasMissingColExceptAutoIncKey = false;
for (Column col : olapTable.getFullSchema()) {
Optional<String> insertCol = unboundLogicalSink.getColNames().stream()
.filter(c -> c.equalsIgnoreCase(col.getName())).findFirst();
Expand All @@ -296,11 +296,11 @@ public static Plan normalizePlan(Plan plan, TableIf table, Optional<InsertComman
+ " all ordinary columns referenced"
+ " by generated columns, missing: " + col.getName());
}
if (!col.isAutoInc() && !insertCol.isPresent() && col.isVisible()) {
hasMissingColExceptAutoInc = true;
if (!(col.isAutoInc() && col.isKey()) && !insertCol.isPresent() && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}
if (!hasMissingColExceptAutoInc) {
if (!hasMissingColExceptAutoIncKey) {
((UnboundTableSink<? extends Plan>) unboundLogicalSink).setPartialUpdate(false);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,13 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
if (isPartialUpdate && !destTable.getEnableUniqueKeyMergeOnWrite()) {
throw new UserException("Only unique key merge on write support partial update");
}

// try to convert to upsert if only has missing auto-increment key column
boolean hasMissingColExceptAutoIncKey = false;
if (taskInfo.getColumnExprDescs().descs.isEmpty()) {
isPartialUpdate = false;
}

HashSet<String> partialUpdateInputColumns = new HashSet<>();
if (isPartialUpdate) {
for (Column col : destTable.getFullSchema()) {
Expand All @@ -171,9 +178,16 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
break;
}
}
if (col.isKey() && !existInExpr) {
throw new UserException("Partial update should include all key columns, missing: " + col.getName());
if (!existInExpr) {
if (col.isKey() && !col.isAutoInc()) {
throw new UserException("Partial update should include all key columns, missing: "
+ col.getName());
}
if (!(col.isKey() && col.isAutoInc()) && col.isVisible()) {
hasMissingColExceptAutoIncKey = true;
}
}

if (!col.getGeneratedColumnsThatReferToThis().isEmpty()
&& col.getGeneratedColumnInfo() == null && !existInExpr) {
throw new UserException("Partial update should include"
Expand All @@ -185,6 +199,9 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
partialUpdateInputColumns.add(Column.DELETE_SIGN);
}
}
if (isPartialUpdate && !hasMissingColExceptAutoIncKey) {
isPartialUpdate = false;
}
// here we should be full schema to fill the descriptor table
for (Column col : destTable.getFullSchema()) {
if (isPartialUpdate && !partialUpdateInputColumns.contains(col.getName())) {
Expand Down Expand Up @@ -252,7 +269,7 @@ public TPipelineFragmentParams plan(TUniqueId loadId, int fragmentInstanceIdInde
// The load id will pass to csv reader to find the stream load context from new load stream manager
fileScanNode.setLoadInfo(loadId, taskInfo.getTxnId(), destTable, BrokerDesc.createForStreamLoad(),
fileGroup, fileStatus, taskInfo.isStrictMode(), taskInfo.getFileType(), taskInfo.getHiddenColumns(),
taskInfo.isPartialUpdate());
isPartialUpdate);
scanNode = fileScanNode;

scanNode.init(analyzer);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
doris3
doris4
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
102,doris8
103,doris9
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
104,"doris10"
105,"doris11"
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
2,888,888
3,888,888
Original file line number Diff line number Diff line change
Expand Up @@ -2,64 +2,72 @@
-- !select_1 --
doris1
doris2

-- !select_2 --
2

-- !select_3 --
doris1
doris2
doris3
doris4

-- !select_4 --
4

-- !select_1 --
doris1
doris2

-- !select_2 --
2
4

-- !select_3 --
"doris10"
"doris11"
doris1
doris2
doris3
doris4
doris5
doris7
doris8
doris9

-- !select_4 --
4
10

-- !select_1 --
doris1
doris2
-- !select_5 --
1 10 10 10
2 20 20 20
3 30 30 30
4 40 40 40

-- !select_2 --
2
-- !select_6 --
1 99 99 10
2 888 888 20
3 888 888 30
4 40 40 40

-- !select_3 --
-- !select_1 --
doris1
doris2
doris3
doris4

-- !select_4 --
4

-- !select_1 --
doris1
doris2

-- !select_2 --
2
4

-- !select_3 --
"doris10"
"doris11"
doris1
doris2
doris3
doris4
doris5
doris7
doris8
doris9

-- !select_4 --
4
10

-- !select_5 --
1 10 10 10
2 20 20 20
3 30 30 30
4 40 40 40

-- !select_6 --
1 99 99 10
2 888 888 20
3 888 888 30
4 40 40 40

Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ suite("test_delete_from_timeout","nonConcurrent") {
GetDebugPoint().disableDebugPointForAllBEs("PushHandler::_do_streaming_ingestion.try_lock_fail")
}

sql """delete from ${tableName} where col1 = "false" and col2 = "-9999782574499444.2" and col3 = "-25"; """
sql """delete from ${tableName} where col1 = "false" and col3 = "-25"; """
t1.join()
qt_sql "select * from ${tableName} order by col1, col2, col3;"

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
Expand All @@ -20,46 +19,100 @@ suite("test_partial_update_auto_inc") {
String db = context.config.getDbNameByFile(context.file)
sql "select 1;" // to create database

for (def use_mow : [false, true]) {
for (def use_nereids_planner : [false, true]) {
logger.info("current params: use_mow: ${use_mow}, use_nereids_planner: ${use_nereids_planner}")
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
sql "use ${db};"
for (def use_nereids_planner : [false, true]) {
logger.info("current params: use_nereids_planner: ${use_nereids_planner}")
connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) {
sql "use ${db};"

if (use_nereids_planner) {
sql """ set enable_nereids_planner=true; """
sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_planner = false; """
}
if (use_nereids_planner) {
sql """ set enable_nereids_planner=true; """
sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_planner = false; """
}

// create table
sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """
sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`name` varchar(65533) NOT NULL COMMENT "用户姓名" )
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "${use_mow}"); """
sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """
sql """ CREATE TABLE test_primary_key_partial_update_auto_inc (
`id` BIGINT NOT NULL AUTO_INCREMENT,
`name` varchar(65533) NOT NULL COMMENT "用户姓名" )
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """

sql """ set enable_unique_key_partial_update=true; """
sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris1"); """
sql """ set enable_unique_key_partial_update=false; """
sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris2"); """
sql "sync"
sql """ set enable_unique_key_partial_update=true; """
sql "sync"
// insert stmt only misses auto-inc key column
sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris1"); """
sql """ set enable_unique_key_partial_update=false; """
sql "sync"
sql """ insert into test_primary_key_partial_update_auto_inc(name) values("doris2"); """
// stream load only misses auto-inc key column
streamLoad {
table "test_primary_key_partial_update_auto_inc"
set 'partial_columns', 'true'
set 'column_separator', ','
set 'columns', 'name'
file 'partial_update_autoinc1.csv'
time 10000
}
qt_select_1 """ select name from test_primary_key_partial_update_auto_inc order by name; """
qt_select_2 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """

qt_select_1 """ select name from test_primary_key_partial_update_auto_inc order by name; """
qt_select_2 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """
sql """ set enable_unique_key_partial_update=true; """
sql "sync"
// insert stmt without column list
sql """ insert into test_primary_key_partial_update_auto_inc values(100,"doris5"); """
// insert stmt, column list include all visible columns
sql """ insert into test_primary_key_partial_update_auto_inc(id,name) values(102,"doris6"); """
sql """ set enable_unique_key_partial_update=false; """
sql "sync"
sql """ insert into test_primary_key_partial_update_auto_inc values(101, "doris7"); """
// stream load without column list
streamLoad {
table "test_primary_key_partial_update_auto_inc"
set 'partial_columns', 'true'
set 'column_separator', ','
file 'partial_update_autoinc2.csv'
time 10000
}
// stream load, column list include all visible columns
streamLoad {
table "test_primary_key_partial_update_auto_inc"
set 'partial_columns', 'true'
set 'column_separator', ','
set 'columns', 'id,name'
file 'partial_update_autoinc3.csv'
time 10000
}
qt_select_3 """ select name from test_primary_key_partial_update_auto_inc order by name; """
qt_select_4 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """
sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """

sql """ set enable_unique_key_partial_update=true; """
sql """ insert into test_primary_key_partial_update_auto_inc values(100,"doris3"); """
sql """ set enable_unique_key_partial_update=false; """
sql """ insert into test_primary_key_partial_update_auto_inc values(101, "doris4"); """
sql "sync"
qt_select_3 """ select name from test_primary_key_partial_update_auto_inc order by name; """
qt_select_4 """ select count(distinct id) from test_primary_key_partial_update_auto_inc; """

sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc """
sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc2 """
sql """ CREATE TABLE test_primary_key_partial_update_auto_inc2 (
`id` BIGINT NOT NULL,
`c1` int,
`c2` int,
`cid` BIGINT NOT NULL AUTO_INCREMENT)
UNIQUE KEY(`id`) DISTRIBUTED BY HASH(`id`) BUCKETS 1
PROPERTIES("replication_num" = "1", "enable_unique_key_merge_on_write" = "true"); """
sql "insert into test_primary_key_partial_update_auto_inc2 values(1,10,10,10),(2,20,20,20),(3,30,30,30),(4,40,40,40);"
order_qt_select_5 "select * from test_primary_key_partial_update_auto_inc2"
sql """ set enable_unique_key_partial_update=true; """
sql "sync;"
// insert stmt only misses auto-inc value column, its value should not change when do partial update
sql "insert into test_primary_key_partial_update_auto_inc2(id,c1,c2) values(1,99,99),(2,99,99);"
// stream load only misses auto-inc value column, its value should not change when do partial update
streamLoad {
table "test_primary_key_partial_update_auto_inc2"
set 'partial_columns', 'true'
set 'column_separator', ','
set 'columns', 'id,c1,c2'
file 'partial_update_autoinc4.csv'
time 10000
}
order_qt_select_6 "select * from test_primary_key_partial_update_auto_inc2"
sql """ DROP TABLE IF EXISTS test_primary_key_partial_update_auto_inc2 """
}
}
}

0 comments on commit 805a33f

Please sign in to comment.