Skip to content

Commit

Permalink
[fix](group commit) Fix some group commit problems (apache#27769)
Browse files Browse the repository at this point in the history
  • Loading branch information
mymeiyi authored Nov 29, 2023
1 parent d96e2df commit 1f9aa8a
Show file tree
Hide file tree
Showing 13 changed files with 370 additions and 253 deletions.
7 changes: 7 additions & 0 deletions be/src/vec/sink/group_commit_block_sink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,13 @@ Status GroupCommitBlockSink::_add_block(RuntimeState* state,
if (block->rows() == 0) {
return Status::OK();
}
for (int i = 0; i < block->columns(); ++i) {
if (block->get_by_position(i).type->is_nullable()) {
continue;
}
block->get_by_position(i).column = make_nullable(block->get_by_position(i).column);
block->get_by_position(i).type = make_nullable(block->get_by_position(i).type);
}
// add block to queue
auto _cur_mutable_block = vectorized::MutableBlock::create_unique(block->clone_empty());
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public void getTables(Analyzer analyzer, Map<Long, TableIf> tableMap, Set<String
OlapTable olapTable = (OlapTable) table;
tblName.setDb(olapTable.getDatabase().getFullName());
tblName.setTbl(olapTable.getName());
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS) {
if (olapTable.getKeysType() == KeysType.UNIQUE_KEYS || olapTable.getTableProperty().storeRowColumn()) {
List<Column> columns = Lists.newArrayList(olapTable.getBaseSchema(true));
targetColumnNames = columns.stream().map(c -> c.getName()).collect(Collectors.toList());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,12 +178,11 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception {
}

OlapTableSink sink = ((OlapTableSink) planner.getFragments().get(0).getSink());
if (ctx.getSessionVariable().isEnableInsertGroupCommit()) {
// group commit
if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
handleGroupCommit(ctx, sink, physicalOlapTableSink);
return;
}
// group commit
if (analyzeGroupCommit(sink, physicalOlapTableSink)) {
/*handleGroupCommit(ctx, sink, physicalOlapTableSink);
return;*/
throw new AnalysisException("group commit is not supported in nereids now");
}
Preconditions.checkArgument(!isTxnBegin, "an insert command cannot create more than one txn");
Transaction txn = new Transaction(ctx,
Expand Down
90 changes: 90 additions & 0 deletions regression-test/data/insert_p0/insert_group_commit_into_unique.out
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
14 d 24 0
15 c 23 0
16 d 24 1
27 e 25 0

-- !sql --
2 b -1 0
Expand All @@ -23,6 +24,7 @@
13 c 23 0
14 d 24 0
15 c 23 0
27 e 25 0

-- !sql --
1 a 10 10 1
Expand All @@ -37,6 +39,7 @@
14 d 24 24 0
15 c 23 23 0
16 d 24 24 1
27 e 25 25 0

-- !sql --
2 b 30 30 0
Expand All @@ -48,6 +51,7 @@
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
27 e 25 25 0

-- !sql --
1 a 200 200 1
Expand All @@ -59,6 +63,9 @@
11 a 11 10 1
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
21 b 21 7 0
22 b 22 6 0

-- !sql --
2 b 30 200 0
Expand All @@ -68,4 +75,87 @@
10 a 10 11 0
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
21 b 21 7 0
22 b 22 6 0

-- !sql --
1 a 10 1
2 b -1 0
3 c -1 0
4 \N -1 0
5 q 50 0
6 \N -1 0
11 a 211 0
12 b 22 1
13 c 23 0
14 d 24 0
15 c 23 0
16 d 24 1
27 e 25 0

-- !sql --
2 b -1 0
3 c -1 0
4 \N -1 0
5 q 50 0
6 \N -1 0
11 a 211 0
13 c 23 0
14 d 24 0
15 c 23 0
27 e 25 0

-- !sql --
1 a 10 10 1
2 b 30 30 0
3 c 30 30 0
4 \N 70 70 0
5 q 50 50 0
6 \N 60 60 0
11 a 211 211 0
12 b 22 22 1
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
16 d 24 24 1
27 e 25 25 0

-- !sql --
2 b 30 30 0
3 c 30 30 0
4 \N 70 70 0
5 q 50 50 0
6 \N 60 60 0
11 a 211 211 0
13 c 23 23 0
14 d 24 24 0
15 c 23 23 0
27 e 25 25 0

-- !sql --
1 a 200 200 1
2 b 30 200 0
3 c 30 300 0
5 q 50 500 0
6 \N 60 600 0
10 a 10 11 0
11 a 11 10 1
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
21 b 21 7 0
22 b 22 6 0

-- !sql --
2 b 30 200 0
3 c 30 300 0
5 q 50 500 0
6 \N 60 600 0
10 a 10 11 0
12 a 12 10 0
13 a 13 10 0
20 b 20 8 0
21 b 21 7 0
22 b 22 6 0

3 changes: 1 addition & 2 deletions regression-test/data/insert_p0/insert_with_null.out
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,10 @@
4 null []
5 NULL ["k5, k6"]
6 \N ["k7", "k8"]
7 abc \N
7 abc []

-- !sql --
6 \N ["k7", "k8"]

-- !sql --
7 abc \N

3 changes: 2 additions & 1 deletion regression-test/data/insert_p0/test_group_commit_2.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
11,a,211,0
12,b,22,1
15,c,23,0
16,d,24,1
16,d,24,1
27,e,25,0
5 changes: 4 additions & 1 deletion regression-test/data/insert_p0/test_group_commit_4.csv
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
10,a,10,11,0
11,a,11,10,1
12,a,12,9,0
13,a,13,9,1
13,a,13,9,1
20,b,20,8,0
21,b,21,7,0
22,b,22,6,0
11 changes: 6 additions & 5 deletions regression-test/suites/insert_p0/insert_group_commit_into.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ suite("insert_group_commit_into") {
assertTrue(serverInfo.contains("'status':'VISIBLE'"))
assertTrue(!serverInfo.contains("'label':'group_commit_"))
}

for (item in ["legacy", "nereids"]) {
try {
// create table
Expand Down Expand Up @@ -106,7 +107,7 @@ suite("insert_group_commit_into") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
sql """ set enable_fallback_to_original_planner=false; """
//sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
Expand Down Expand Up @@ -194,16 +195,16 @@ suite("insert_group_commit_into") {
qt_sql """ select name, score from ${table} order by name asc; """


if (item == "nereids") {
/*if (item == "nereids") {
group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1
group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1
group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1
getRowCount(23)
} else {
} else {*/
none_group_commit_insert """ insert into ${table}(id, name, score) values(10 + 1, 'h', 100); """, 1
none_group_commit_insert """ insert into ${table}(id, name, score) select 10 + 2, 'h', 100; """, 1
none_group_commit_insert """ insert into ${table} with label test_gc_""" + System.currentTimeMillis() + """ (id, name, score) values(13, 'h', 100); """, 1
}
//}

def rowCount = sql "select count(*) from ${table}"
logger.info("row count: " + rowCount)
Expand Down Expand Up @@ -297,7 +298,7 @@ suite("insert_group_commit_into") {
if (item == "nereids") {
sql """ set enable_nereids_dml = true; """
sql """ set enable_nereids_planner=true; """
sql """ set enable_fallback_to_original_planner=false; """
//sql """ set enable_fallback_to_original_planner=false; """
} else {
sql """ set enable_nereids_dml = false; """
}
Expand Down
Loading

0 comments on commit 1f9aa8a

Please sign in to comment.