diff --git a/be/src/vec/sink/group_commit_block_sink.cpp b/be/src/vec/sink/group_commit_block_sink.cpp index 266fdf72f03930..665e31ddb3169c 100644 --- a/be/src/vec/sink/group_commit_block_sink.cpp +++ b/be/src/vec/sink/group_commit_block_sink.cpp @@ -140,6 +140,13 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state, if (block->rows() == 0) { return Status::OK(); } + for (int i = 0; i < block->columns(); ++i) { + if (block->get_by_position(i).type->is_nullable()) { + continue; + } + block->get_by_position(i).column = make_nullable(block->get_by_position(i).column); + block->get_by_position(i).type = make_nullable(block->get_by_position(i).type); + } // add block to queue auto _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty()); { diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index ef7acb484a1e86..37402bda95a7eb 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -253,7 +253,7 @@ public void getTables(Analyzer analyzer, Map tableMap, Set columns = Lists.newArrayList(olapTable.getBaseSchema(true)); targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList()); } diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java index 5f29dcfc6d82f8..838d91e319e103 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/InsertIntoTableCommand.java @@ -178,12 +178,11 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink()); - if (ctx.getSessionVariable().isEnableInsertGroupCommit()) { - // group commit - if (analyzeGroupCommit(sink, physicalOlapTableSink)) { - handleGroupCommit(ctx, sink, physicalOlapTableSink); - return; - } + // group commit + if (analyzeGroupCommit(sink, physicalOlapTableSink)) { + /*handleGroupCommit(ctx, sink, physicalOlapTableSink); + return;*/ + throw new AnalysisException("group commit is not supported in nereids now"); } Preconditions.checkArgument(!isTxnBegin, "an insert command cannot create more than one txn"); Transaction txn = new Transaction(ctx, diff --git a/regression-test/data/insert_p0/insert_group_commit_into_unique.out b/regression-test/data/insert_p0/insert_group_commit_into_unique.out index ad774645069c18..2946a07897f91b 100644 --- a/regression-test/data/insert_p0/insert_group_commit_into_unique.out +++ b/regression-test/data/insert_p0/insert_group_commit_into_unique.out @@ -12,6 +12,7 @@ 14 d 24 0 15 c 23 0 16 d 24 1 +27 e 25 0 -- !sql -- 2 b -1 0 @@ -23,6 +24,7 @@ 13 c 23 0 14 d 24 0 15 c 23 0 +27 e 25 0 -- !sql -- 1 a 10 10 1 @@ -37,6 +39,7 @@ 14 d 24 24 0 15 c 23 23 0 16 d 24 24 1 +27 e 25 25 0 -- !sql -- 2 b 30 30 0 @@ -48,6 +51,7 @@ 13 c 23 23 0 14 d 24 24 0 15 c 23 23 0 +27 e 25 25 0 -- !sql -- 1 a 200 200 1 @@ -59,6 +63,9 @@ 11 a 11 10 1 12 a 12 10 0 13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 -- !sql -- 2 b 30 200 0 @@ -68,4 +75,87 @@ 10 a 10 11 0 12 a 12 10 0 13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 + +-- !sql -- +1 a 10 1 +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +12 b 22 1 +13 c 23 0 +14 d 24 0 +15 c 23 0 +16 d 24 1 +27 e 25 0 + +-- !sql -- +2 b -1 0 +3 c -1 0 +4 \N -1 0 +5 q 50 0 +6 \N -1 0 +11 a 211 0 +13 c 23 0 +14 d 24 0 +15 c 23 0 +27 e 25 0 + +-- !sql -- +1 a 10 10 1 +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +12 b 22 22 1 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +16 d 24 24 1 +27 e 25 25 0 + +-- !sql -- +2 b 30 30 0 +3 c 30 30 0 +4 \N 70 70 0 +5 q 50 50 0 +6 \N 60 60 0 +11 a 211 211 0 +13 c 23 23 0 +14 d 24 24 0 +15 c 23 23 0 +27 e 25 25 0 + +-- !sql -- +1 a 200 200 1 +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +11 a 11 10 1 +12 a 12 10 0 +13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 + +-- !sql -- +2 b 30 200 0 +3 c 30 300 0 +5 q 50 500 0 +6 \N 60 600 0 +10 a 10 11 0 +12 a 12 10 0 +13 a 13 10 0 +20 b 20 8 0 +21 b 21 7 0 +22 b 22 6 0 diff --git a/regression-test/data/insert_p0/insert_with_null.out b/regression-test/data/insert_p0/insert_with_null.out index 58a7e423197739..fa56f23bc3680b 100644 --- a/regression-test/data/insert_p0/insert_with_null.out +++ b/regression-test/data/insert_p0/insert_with_null.out @@ -43,11 +43,10 @@ 4 null [] 5 NULL ["k5, k6"] 6 \N ["k7", "k8"] -7 abc \N +7 abc [] -- !sql -- 6 \N ["k7", "k8"] -- !sql -- -7 abc \N diff --git a/regression-test/data/insert_p0/test_group_commit_2.csv b/regression-test/data/insert_p0/test_group_commit_2.csv index afd03ffd2d6c92..0f81665b1e3efe 100644 --- a/regression-test/data/insert_p0/test_group_commit_2.csv +++ b/regression-test/data/insert_p0/test_group_commit_2.csv @@ -1,4 +1,5 @@ 11,a,211,0 12,b,22,1 15,c,23,0 -16,d,24,1 \ No newline at end of file +16,d,24,1 +27,e,25,0 \ No newline at end of file diff --git a/regression-test/data/insert_p0/test_group_commit_4.csv b/regression-test/data/insert_p0/test_group_commit_4.csv index 0724cfec78d51a..0b2678c40f34de 100644 --- a/regression-test/data/insert_p0/test_group_commit_4.csv +++ b/regression-test/data/insert_p0/test_group_commit_4.csv @@ -1,4 +1,7 @@ 10,a,10,11,0 11,a,11,10,1 12,a,12,9,0 -13,a,13,9,1 \ No newline at end of file +13,a,13,9,1 +20,b,20,8,0 +21,b,21,7,0 +22,b,22,6,0 \ No newline at end of file diff --git a/regression-test/suites/insert_p0/insert_group_commit_into.groovy b/regression-test/suites/insert_p0/insert_group_commit_into.groovy index 736e774fbb1d4c..8f44a5b2f66f90 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into.groovy @@ -79,6 +79,7 @@ suite("insert_group_commit_into") { assertTrue(serverInfo.contains("'status':'VISIBLE'")) assertTrue(!serverInfo.contains("'label':'group_commit_")) } + for (item in ["legacy", "nereids"]) { try { // create table @@ -106,7 +107,7 @@ suite("insert_group_commit_into") { if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ + //sql """ set enable_fallback_to_original_planner=false; """ } else { sql """ set enable_nereids_dml = false; """ } @@ -194,16 +195,16 @@ suite("insert_group_commit_into") { qt_sql """ select name, score from ${table} order by name asc; """ - if (item == "nereids") { + /*if (item == "nereids") { group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 getRowCount(23) - } else { + } else {*/ none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1 none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1 none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1 - } + //} def rowCount = sql "select count(*) from ${table}" logger.info("row count: " + rowCount) @@ -297,7 +298,7 @@ suite("insert_group_commit_into") { if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ + //sql """ set enable_fallback_to_original_planner=false; """ } else { sql """ set enable_nereids_dml = false; """ } diff --git a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy index 1caafe16b7add3..9fae43ede1bbc1 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_into_unique.groovy @@ -69,250 +69,267 @@ suite("insert_group_commit_into_unique") { } } - // 1. table without sequence column - try { - tableName = "insert_group_commit_into_unique" + "1" - dbTableName = dbName + "." + tableName - // create table - sql """ drop table if exists ${dbTableName}; """ - - sql """ - CREATE TABLE ${dbTableName} ( - `id` int(11) NOT NULL, - `name` varchar(50) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - UNIQUE KEY(`id`, `name`) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1" - ); - """ - - // 1. insert into - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - - group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${dbTableName}(id) select 6; """, 1 - group_commit_insert """ insert into ${dbTableName}(id) values(4); """, 1 - group_commit_insert """ insert into ${dbTableName}(name, id) values('c', 3); """, 1 - group_commit_insert """ insert into ${dbTableName}(id, name) values(2, 'b'); """, 1 - group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 - - /*getRowCount(5) - qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ - } - - // 2. stream load - streamLoad { - table "${tableName}" - - set 'column_separator', ',' - set 'group_commit', 'true' - set 'columns', 'id, name, score' - file "test_group_commit_1.csv" - unset 'label' - - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 4, 4, 0, 0) + for (item in ["legacy", "nereids"]) { + // 1. table without sequence column + try { + tableName = "insert_group_commit_into_unique" + "1_" + item + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ); + """ + + // 1. insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + // sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${dbTableName}(id) select 6; """, 1 + group_commit_insert """ insert into ${dbTableName}(id) values(4); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id) values('c', 3); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name) values(2, 'b'); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 + + /*getRowCount(5) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ } - } - /*getRowCount(9) - qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ - streamLoad { - table "${tableName}" + // 2. stream load + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'group_commit', 'true' - set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' - file "test_group_commit_2.csv" - unset 'label' + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score' + file "test_group_commit_1.csv" + unset 'label' - time 10000 // limit inflight 10s + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 4, 4, 0, 0) + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } } - } - getRowCount(9) - sql """ set show_hidden_columns = true """ - qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ - sql """ set show_hidden_columns = false """ - qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ - } finally { - // try_sql("DROP TABLE ${dbTableName}") - } - - // 2. table with "function_column.sequence_col" - try { - tableName = "insert_group_commit_into_unique" + "2" - dbTableName = dbName + "." + tableName - // create table - sql """ drop table if exists ${dbTableName}; """ - - sql """ - CREATE TABLE ${dbTableName} ( - `id` int(11) NOT NULL, - `name` varchar(50) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - UNIQUE KEY(`id`, `name`) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1", - "function_column.sequence_col" = "score" - ); - """ - - // 1. insert into - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - - group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 - group_commit_insert """ insert into ${dbTableName}(id, score) select 6, 60; """, 1 - group_commit_insert """ insert into ${dbTableName}(id, score) values(4, 70); """, 1 - group_commit_insert """ insert into ${dbTableName}(name, id, score) values('c', 3, 30); """, 1 - group_commit_insert """ insert into ${dbTableName}(score, id, name) values(30, 2, 'b'); """, 1 - group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 - - /*getRowCount(5) + /*getRowCount(9) qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ - }; - // 2. stream load - streamLoad { - table "${tableName}" + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'group_commit', 'true' - set 'columns', 'id, name, score' - file "test_group_commit_1.csv" - unset 'label' + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' + file "test_group_commit_2.csv" + unset 'label' - time 10000 // limit inflight 10s + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 4, 4, 0, 0) + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 5, 5, 0, 0) + } } + getRowCount(10) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") } - /*getRowCount(9) - qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ - streamLoad { - table "${tableName}" - - set 'column_separator', ',' - set 'group_commit', 'true' - set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' - file "test_group_commit_2.csv" - unset 'label' - - time 10000 // limit inflight 10s - - check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 4, 4, 0, 0) + // 2. table with "function_column.sequence_col" + try { + tableName = "insert_group_commit_into_unique" + "2_" + item + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "function_column.sequence_col" = "score" + ); + """ + + // 1. insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + // sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + group_commit_insert """ insert into ${dbTableName} values (1, 'a', 10),(5, 'q', 50); """, 2 + group_commit_insert """ insert into ${dbTableName}(id, score) select 6, 60; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score) values(4, 70); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id, score) values('c', 3, 30); """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name) values(30, 2, 'b'); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__) values(1, 'a', 10, 1) """, 1 + + /*getRowCount(5) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + }; + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score' + file "test_group_commit_1.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } } - } - getRowCount(9) - sql """ set show_hidden_columns = true """ - qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ - sql """ set show_hidden_columns = false """ - qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ - } finally { - // try_sql("DROP TABLE ${dbTableName}") - sql """ set show_hidden_columns = false """ - } - - // 3. table with "function_column.sequence_type" - try { - tableName = "insert_group_commit_into_unique" + "3" - dbTableName = dbName + "." + tableName - // create table - sql """ drop table if exists ${dbTableName}; """ - - sql """ - CREATE TABLE ${dbTableName} ( - `id` int(11) NOT NULL, - `name` varchar(50) NULL, - `score` int(11) NULL default "-1" - ) ENGINE=OLAP - UNIQUE KEY(`id`, `name`) - DISTRIBUTED BY HASH(`id`) BUCKETS 1 - PROPERTIES ( - "replication_num" = "1", - "function_column.sequence_type" = "int" - ); - """ - - // 1. insert into - connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { - sql """ set enable_insert_group_commit = true; """ - // TODO - sql """ set enable_nereids_dml = false; """ - - group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """, 2 - group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1 - group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1 - group_commit_insert """ insert into ${dbTableName}(name, id, score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1 - group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1 - group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__, __DORIS_SEQUENCE_COL__) values(1, 'a', 200, 1, 200) """, 1 - group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__) values(30, 2, 'b', 100, 1); """, 1 - - /*getRowCount(4) + /*getRowCount(9) qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ - }; - // 2. stream load - streamLoad { - table "${tableName}" + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'group_commit', 'true' - set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__' - set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' - file "test_group_commit_3.csv" - unset 'label' + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_DELETE_SIGN__' + file "test_group_commit_2.csv" + unset 'label' - time 10000 // limit inflight 10s + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 4, 4, 0, 0) + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 5, 5, 0, 0) + } } + getRowCount(10) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + sql """ set show_hidden_columns = false """ } - /*getRowCount(9) - qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ - streamLoad { - table "${tableName}" + // 3. table with "function_column.sequence_type" + try { + tableName = "insert_group_commit_into_unique" + "3_" + item + dbTableName = dbName + "." + tableName + // create table + sql """ drop table if exists ${dbTableName}; """ + + sql """ + CREATE TABLE ${dbTableName} ( + `id` int(11) NOT NULL, + `name` varchar(50) NULL, + `score` int(11) NULL default "-1" + ) ENGINE=OLAP + UNIQUE KEY(`id`, `name`) + DISTRIBUTED BY HASH(`id`) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "function_column.sequence_type" = "int" + ); + """ + + // 1. insert into + connect(user = context.config.jdbcUser, password = context.config.jdbcPassword, url = context.config.jdbcUrl) { + sql """ set enable_insert_group_commit = true; """ + if (item == "nereids") { + sql """ set enable_nereids_dml = true; """ + sql """ set enable_nereids_planner=true; """ + // sql """ set enable_fallback_to_original_planner=false; """ + } else { + sql """ set enable_nereids_dml = false; """ + } + + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_SEQUENCE_COL__) values (1, 'a', 10, 100),(5, 'q', 50, 500); """, 2 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) select 6, 60, 600; """, 1 + group_commit_insert """ insert into ${dbTableName}(id, score, __DORIS_SEQUENCE_COL__) values(6, 50, 500); """, 1 + group_commit_insert """ insert into ${dbTableName}(name, id, score, __DORIS_SEQUENCE_COL__) values('c', 3, 30, 300); """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__) values(30, 2, 'b', 200); """, 1 + group_commit_insert """ insert into ${dbTableName}(id, name, score, __DORIS_DELETE_SIGN__, __DORIS_SEQUENCE_COL__) values(1, 'a', 200, 1, 200) """, 1 + group_commit_insert """ insert into ${dbTableName}(score, id, name, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__) values(30, 2, 'b', 100, 1); """, 1 + + /*getRowCount(4) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + }; + + // 2. stream load + streamLoad { + table "${tableName}" + + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__' + set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' + file "test_group_commit_3.csv" + unset 'label' + + time 10000 // limit inflight 10s + + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 4, 4, 0, 0) + } + } + /*getRowCount(9) + qt_sql """ select * from ${dbTableName} order by id, name, score asc; """*/ + + streamLoad { + table "${tableName}" - set 'column_separator', ',' - set 'group_commit', 'true' - set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__' - set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' - file "test_group_commit_4.csv" - unset 'label' + set 'column_separator', ',' + set 'group_commit', 'true' + set 'columns', 'id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__' + set 'function_column.sequence_col', '__DORIS_SEQUENCE_COL__' + file "test_group_commit_4.csv" + unset 'label' - time 10000 // limit inflight 10s + time 10000 // limit inflight 10s - check { result, exception, startTime, endTime -> - checkStreamLoadResult(exception, result, 4, 4, 0, 0) + check { result, exception, startTime, endTime -> + checkStreamLoadResult(exception, result, 7, 7, 0, 0) + } } + getRowCount(10) + sql """ set show_hidden_columns = true """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + sql """ set show_hidden_columns = false """ + qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ + } finally { + // try_sql("DROP TABLE ${dbTableName}") + sql """ set show_hidden_columns = false """ } - getRowCount(7) - sql """ set show_hidden_columns = true """ - qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ - sql """ set show_hidden_columns = false """ - qt_sql """ select id, name, score, __DORIS_SEQUENCE_COL__, __DORIS_DELETE_SIGN__ from ${dbTableName} order by id, name, score asc; """ - } finally { - // try_sql("DROP TABLE ${dbTableName}") - sql """ set show_hidden_columns = false """ } } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy index b2c2edb204d96d..96910c0e1b8fa2 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_exception.groovy @@ -74,7 +74,7 @@ suite("insert_group_commit_with_exception") { if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ + //sql """ set enable_fallback_to_original_planner=false; """ } else { sql """ set enable_nereids_dml = false; """ } @@ -84,22 +84,22 @@ suite("insert_group_commit_with_exception") { def result = sql """ insert into ${table} values(1, 'a', 10, 100) """ assertTrue(false) } catch (Exception e) { - if (item == "nereids") { + /*if (item == "nereids") { assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) - } else { + } else {*/ assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } + //} } try { def result = sql """ insert into ${table} values(2, 'b') """ assertTrue(false) } catch (Exception e) { - if (item == "nereids") { + /*if (item == "nereids") { assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) - } else { + } else {*/ assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } + //} } result = sql """ insert into ${table} values(3, 'c', 30) """ @@ -115,33 +115,33 @@ suite("insert_group_commit_with_exception") { result = sql """ insert into ${table}(id, name) values(5, 'd', 50) """ assertTrue(false) } catch (Exception e) { - if (item == "nereids") { + /*if (item == "nereids") { assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) - } else { + } else {*/ assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } + //} } try { result = sql """ insert into ${table}(id, name) values(6) """ assertTrue(false) } catch (Exception e) { - if (item == "nereids") { + /*if (item == "nereids") { assertTrue(e.getMessage().contains("insert into cols should be corresponding to the query output")) - } else { + } else {*/ assertTrue(e.getMessage().contains("Column count doesn't match value count")) - } + //} } try { result = sql """ insert into ${table}(id, names) values(7, 'd') """ assertTrue(false) } catch (Exception e) { - if (item == "nereids") { + /*if (item == "nereids") { assertTrue(e.getMessage().contains("column names is not found in table")) - } else { + } else {*/ assertTrue(e.getMessage().contains("Unknown column 'names'")) - } + //} } @@ -161,7 +161,7 @@ suite("insert_group_commit_with_exception") { if (item == "nereids") { statement.execute("set enable_nereids_dml = true;"); statement.execute("set enable_nereids_planner=true;"); - statement.execute("set enable_fallback_to_original_planner=false;"); + //statement.execute("set enable_fallback_to_original_planner=false;"); } else { statement.execute("set enable_nereids_dml = false;"); } diff --git a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy index c28b18d3797a62..888b294d19fdc0 100644 --- a/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy +++ b/regression-test/suites/insert_p0/insert_group_commit_with_large_data.groovy @@ -71,7 +71,7 @@ suite("insert_group_commit_with_large_data") { if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ + //sql """ set enable_fallback_to_original_planner=false; """ } else { sql """ set enable_nereids_dml = false; """ } diff --git a/regression-test/suites/insert_p0/insert_with_null.groovy b/regression-test/suites/insert_p0/insert_with_null.groovy index 1ac8f46b8c6ee6..66096ed22ad6a2 100644 --- a/regression-test/suites/insert_p0/insert_with_null.groovy +++ b/regression-test/suites/insert_p0/insert_with_null.groovy @@ -73,7 +73,7 @@ suite("insert_with_null") { sql """ set enable_insert_group_commit = true; """ sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ + //sql """ set enable_fallback_to_original_planner=false; """ } sql """ insert into ${table} values(1, '"b"', ["k1=v1, k2=v2"]); """ diff --git a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy index fd5dafcdc7dd95..fc8bababad08cd 100644 --- a/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy +++ b/regression-test/suites/insert_p0/test_group_commit_interval_ms_property.groovy @@ -65,7 +65,7 @@ suite("test_group_commit_interval_ms_property") { if (item == "nereids") { sql """ set enable_nereids_dml = true; """ sql """ set enable_nereids_planner=true; """ - sql """ set enable_fallback_to_original_planner=false; """ + //sql """ set enable_fallback_to_original_planner=false; """ } else { sql """ set enable_nereids_dml = false; """ }