diff --git a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml index a45fb01c316695..cdd5abcf8b9e81 100644 --- a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml +++ b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/stress_test_insert_into.yaml @@ -1,21 +1,21 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. tables: - stress_source: - create_date: - range: {min: "2023-07-01", max: "2024-01-10"} \ No newline at end of file + stress_source: + create_date: + range: {min: "2023-07-01", max: "2024-01-10"} diff --git a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml index cd9782a3b33192..77928036e21787 100644 --- a/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml +++ b/regression-test/suites/partition_p1/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml @@ -1,19 +1,19 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. 
See the License for the -// specific language governing permissions and limitations -// under the License. +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. tables: small_data_high_concurrent_load_range: diff --git a/regression-test/suites/partition_p2/auto_partition/ddl/create_list_part_data_table.sql b/regression-test/suites/partition_p2/auto_partition/ddl/create_list_part_data_table.sql new file mode 100644 index 00000000000000..221129306030d1 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/create_list_part_data_table.sql @@ -0,0 +1,46 @@ +CREATE TABLE `test2`( + `col1` bigint not null, + `col2` boolean, + `col3` tinyint, + `col4` date, + `col5` float, + `col6` double, + `col7` string, + `col8` varchar(128), + `col9` decimal(9, 3), + `col10` char(128), + `col11` bigint, + `col12` boolean, + `col13` tinyint, + `col14` date, + `col15` float, + `col16` double, + `col17` string, + `col18` varchar(128), + `col19` decimal(9, 3), + `col20` char(128), + `col21` bigint, + `col22` boolean, + `col23` tinyint, + `col24` date, + `col25` float, + `col26` double, + `col27` string, + `col28` varchar(128), + `col29` decimal(9, 3), + `col30` char(128), + `col31` bigint, + `col32` boolean, + `col33` tinyint, + `col34` date, + `col35` float, + `col36` double, + `col37` string, + `col38` varchar(128), + `col39` decimal(9, 3), + `col40` char(128) +) UNIQUE KEY(`col1`) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/partition_p2/auto_partition/ddl/create_range_part_data_table.sql b/regression-test/suites/partition_p2/auto_partition/ddl/create_range_part_data_table.sql new file mode 100644 index 00000000000000..556e5e3fe87643 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/create_range_part_data_table.sql @@ -0,0 +1,46 @@ +CREATE TABLE `test1`( + `col1` date not null, + `col2` boolean, + `col3` tinyint, + `col4` date, + `col5` float, + `col6` double, + `col7` string, + `col8` varchar(128), + `col9` decimal(9, 3), + `col10` char(128), + `col11` bigint, + `col12` boolean, + `col13` tinyint, + `col14` date, + `col15` float, + `col16` double, + `col17` string, + `col18` varchar(128), + `col19` decimal(9, 3), + `col20` char(128), + `col21` bigint, + `col22` boolean, + `col23` tinyint, + `col24` date, + `col25` float, + `col26` double, + `col27` string, + `col28` varchar(128), + `col29` decimal(9, 3), + `col30` char(128), + `col31` bigint, + `col32` boolean, + `col33` tinyint, + `col34` date, + `col35` float, + `col36` double, + `col37` string, + `col38` varchar(128), + `col39` decimal(9, 3), + `col40` char(128) +) UNIQUE KEY(`col1`) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); \ No newline at 
end of file diff --git a/regression-test/suites/partition_p2/auto_partition/ddl/small_data_high_concurrrent_load.sql b/regression-test/suites/partition_p2/auto_partition/ddl/small_data_high_concurrrent_load.sql new file mode 100644 index 00000000000000..a85d7ce6c68f7b --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/small_data_high_concurrrent_load.sql @@ -0,0 +1,13 @@ +CREATE TABLE `small_data_high_concurrent_load_range`( + `col1` datetimev2 not null, + `col2` varchar(128), + `col3` decimal(9, 3), + `col4` date +) duplicate KEY(`col1`) +AUTO PARTITION BY range date_trunc(`col1`, 'day') +( +) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/partition_p2/auto_partition/ddl/stream_load_list_test_table.sql b/regression-test/suites/partition_p2/auto_partition/ddl/stream_load_list_test_table.sql new file mode 100644 index 00000000000000..3866d2a81f108d --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/stream_load_list_test_table.sql @@ -0,0 +1,49 @@ +CREATE TABLE `stream_load_list_test_table`( + `col1` bigint not null, + `col2` boolean, + `col3` tinyint, + `col4` date, + `col5` float, + `col6` double, + `col7` string, + `col8` varchar(128), + `col9` decimal(9, 3), + `col10` char(128), + `col11` bigint, + `col12` boolean, + `col13` tinyint, + `col14` date, + `col15` float, + `col16` double, + `col17` string, + `col18` varchar(128), + `col19` decimal(9, 3), + `col20` char(128), + `col21` bigint, + `col22` boolean, + `col23` tinyint, + `col24` date, + `col25` float, + `col26` double, + `col27` string, + `col28` varchar(128), + `col29` decimal(9, 3), + `col30` char(128), + `col31` bigint, + `col32` boolean, + `col33` tinyint, + `col34` date, + `col35` float, + `col36` double, + `col37` string, + `col38` varchar(128), + `col39` decimal(9, 3), + `col40` char(128) +) UNIQUE KEY(`col1`) +AUTO PARTITION BY list(`col1`) +( +) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/partition_p2/auto_partition/ddl/stream_load_range_test_table.sql b/regression-test/suites/partition_p2/auto_partition/ddl/stream_load_range_test_table.sql new file mode 100644 index 00000000000000..f64b1a65a50324 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/stream_load_range_test_table.sql @@ -0,0 +1,49 @@ +CREATE TABLE `stream_load_range_test_table`( + `col1` datetimev2 not null, + `col2` boolean, + `col3` tinyint, + `col4` date, + `col5` float, + `col6` double, + `col7` string, + `col8` varchar(128), + `col9` decimal(9, 3), + `col10` char(128), + `col11` bigint, + `col12` boolean, + `col13` tinyint, + `col14` date, + `col15` float, + `col16` double, + `col17` string, + `col18` varchar(128), + `col19` decimal(9, 3), + `col20` char(128), + `col21` bigint, + `col22` boolean, + `col23` tinyint, + `col24` date, + `col25` float, + `col26` double, + `col27` string, + `col28` varchar(128), + `col29` decimal(9, 3), + `col30` char(128), + `col31` bigint, + `col32` boolean, + `col33` tinyint, + `col34` date, + `col35` float, + `col36` double, + `col37` string, + `col38` varchar(128), + `col39` decimal(9, 3), + `col40` char(128) +) UNIQUE KEY(`col1`) +AUTO PARTITION BY range date_trunc(`col1`, 'day') +( +) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( + "replication_num" = "1" +); \ No newline at end of file diff --git 
a/regression-test/suites/partition_p2/auto_partition/ddl/two_streamload_table1.sql b/regression-test/suites/partition_p2/auto_partition/ddl/two_streamload_table1.sql new file mode 100644 index 00000000000000..486c6df5117b14 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/two_streamload_table1.sql @@ -0,0 +1,14 @@ +CREATE TABLE `two_streamload_list1`( + `col1` bigint not null, + `col5` bigint, + `col2` boolean, + `col3` tinyint, + `col4` date +) DUPLICATE KEY(`col1`) +AUTO PARTITION BY list(`col1`) +( +) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( +"replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/partition_p2/auto_partition/ddl/two_streamload_table2.sql b/regression-test/suites/partition_p2/auto_partition/ddl/two_streamload_table2.sql new file mode 100644 index 00000000000000..ed36d3cf58b508 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/ddl/two_streamload_table2.sql @@ -0,0 +1,14 @@ +CREATE TABLE `two_streamload_list2`( + `col1` bigint not null, + `col5` bigint, + `col2` boolean, + `col3` tinyint, + `col4` date +) DUPLICATE KEY(`col1`) +AUTO PARTITION BY list(`col1`) +( +) +DISTRIBUTED BY HASH(`col1`) BUCKETS 10 +PROPERTIES ( +"replication_num" = "1" +); \ No newline at end of file diff --git a/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy b/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy new file mode 100644 index 00000000000000..fa31ed66747813 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/diff_data/stress_test_diff_date_list.groovy @@ -0,0 +1,226 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
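+// This suite stress-tests list auto-partitioning under concurrent stream loads: it generates two data files with doris-dbgen, appends one extra row, then streams both into the same list-partitioned table from two backends and checks the resulting row and partition counts.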
+ +import groovy.io.FileType +import java.nio.file.Files +import java.nio.file.Paths +import java.net.URL +import java.io.File + +suite("stress_test_diff_date_list") { + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '5000')""" + + // get doris-dbgen from s3 + def dirPath = context.file.parent + def fileName = "doris-dbgen" + def fileUrl = "http://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen" + def filePath = Paths.get(dirPath, fileName) + if (!Files.exists(filePath)) { + new URL(fileUrl).withInputStream { inputStream -> + Files.copy(inputStream, filePath) + } + def file = new File(dirPath + "/" + fileName) + file.setExecutable(true) + } + + def data_count = 2 + def cur_rows = 1000000 + + // generate data files with doris-dbgen + def doris_dbgen_create_data = { db_name, tb_name, part_type, i -> + def rows = cur_rows // total rows to load + def bulkSize = rows + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://localhost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://localhost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + String feHttpAddress = context.config.feHttpAddress + def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + + def cm + if (password) { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --pass ${password} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${context.file.parent}/${part_type}_${i}/""" + } else { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${context.file.parent}/${part_type}_${i}/""" + } + logger.info("command is: " + cm) + def proc = cm.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + + } + + def write_to_file = { cur_path, content -> + File file = new File(cur_path) + file.write(content) + } + + def cm1 + def cm2 + def sql_port_res = sql """show backends;""" + println(sql_port_res) + if (sql_port_res.size < 2) { + assert(false) + } + def be_http_1 = sql_port_res[0][1] + def be_http_2 = sql_port_res[1][1] + def be_port_1 = sql_port_res[0][4] + def be_port_2 = sql_port_res[1][4] + + def doris_dbgen_load_data = { db_name, tb_name, part_type, i -> + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + def 
list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + + if (password) { + if (i == 1) { + cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" + } else if (i == 2) { + cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + } + } else { + if (i == 1) { + cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" + } else if (i == 2) { + cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + } + } + logger.info("command is: " + cm1) + logger.info("command is: " + cm2) + + def load_path_1 = """${context.file.parent}/thread_load_1.sh""" + if (i == 1) { + write_to_file(load_path_1, cm1) + cm1 = "bash " + load_path_1 + } + + def load_path_2 = """${context.file.parent}/thread_load_2.sh""" + if (i == 2) { + write_to_file(load_path_2, cm2) + cm2 = "bash " + load_path_2 + } + + } + + def data_delete = { part_type -> + + for (int i = 1; i <= data_count; i++) { + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + if (dir.exists()) { + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + logger.info("rm -rf " + dir) + ("rm -rf " + dir).execute().text + } + } + } + + def append_to_file = { content -> + + def list = [] + def dir = new File("""${context.file.parent}""" + "/list_1") + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + println(list[0]) + + File file = list[0] + file.append(content) + } + + String db = context.config.getDbNameByFile(context.file) + def database_name = db + def tb_name1 = "two_streamload_list1" + def tb_name2 = "two_streamload_list2" + + sql """create database if not exists ${database_name};""" + sql """use ${database_name};""" + sql """drop table if exists ${tb_name1};""" + sql """drop table if exists ${tb_name2};""" + sql new File("""${context.file.parent}/../ddl/two_streamload_table1.sql""").text + sql new File("""${context.file.parent}/../ddl/two_streamload_table2.sql""").text + + data_delete("list") + doris_dbgen_create_data(database_name, tb_name1, "list", 1) + def add_one_row = "2|1|TRUE|109|2022-12-10" + append_to_file(add_one_row) + doris_dbgen_create_data(database_name, tb_name2, "list", 2) + + doris_dbgen_load_data(database_name, tb_name1, "list", 1) + doris_dbgen_load_data(database_name, tb_name1, "list", 2) + + def thread3 = Thread.start { + logger.info("load1 start") + def proc = cm1.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + def thread4 = Thread.start { + sleep(1 * 1000) + logger.info("load2 start") + def proc = cm2.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + thread3.join() + thread4.join() + + // TEST-BODY + def origin_count = sql """select count(*) from ${database_name}.${tb_name1}""" + def origin_part = sql """show partitions from 
${database_name}.${tb_name1}""" + def count_rows = origin_count[0][0] + // check data count + assertTrue(count_rows == cur_rows * data_count + 1) + assertTrue(origin_part.size() == 2) + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '2000')""" +} diff --git a/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh b/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh new file mode 100644 index 00000000000000..6edd950cc7c2ca --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_1.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh b/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh new file mode 100644 index 00000000000000..6edd950cc7c2ca --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/diff_data/thread_load_2.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/doris_dbgen_conf/10w_part_doris_dbgen.yaml b/regression-test/suites/partition_p2/auto_partition/doris_dbgen_conf/10w_part_doris_dbgen.yaml new file mode 100644 index 00000000000000..c7e99b29c4275f --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/doris_dbgen_conf/10w_part_doris_dbgen.yaml @@ -0,0 +1,38 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +tables: + test1: + col1: + range: {min: "1990-01-01", max: "2020-12-31"} + force_not_null: true + test2: + col1: + range: { min: 1, max: 100000000 } + force_not_null: true + stream_load_list_test_table: + col1: + range: { min: 1, max: 100000000 } + force_not_null: true + stream_load_list_test_table_normal: + d4: # column name + range: { min: 1, max: 100000000 } + force_not_null: true + stream_load_range_test_table: + col1: + range: {min: "1990-01-01", max: "2020-12-31"} + force_not_null: true diff --git a/regression-test/suites/partition_p2/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml b/regression-test/suites/partition_p2/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml new file mode 100644 index 00000000000000..b18db8db37065d --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/doris_dbgen_conf/two_stream_load_conflict.yaml @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +tables: + small_data_high_concurrent_load_range: + col1: + range: {min: "2020-01-01 00:00:00", max: "2023-12-31 23:59:59"} + force_not_null: true + two_streamload_list1: + col1: + enumvals: [ 1 ] + two_streamload_list2: + col1: + enumvals: [ 2 ] + stream_load_range_test_table: + col1: + range: { min: "2020-01-01", max: "2023-12-31" } + force_not_null: true + test1: + col1: + range: { min: "2020-01-01", max: "2023-12-31" } + force_not_null: true diff --git a/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy b/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy new file mode 100644 index 00000000000000..46c568b50c5902 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/high_concur_load/stress_test_high_concurrency_load.groovy @@ -0,0 +1,193 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.io.FileType +import java.nio.file.Files +import java.nio.file.Paths +import java.net.URL +import java.io.File + +suite("stress_test_high_concurrency_load") { + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '5000')""" + + // get doris-dbgen from s3 + def dirPath = context.file.parent + def fatherPath = context.file.parentFile.parentFile.getPath() + def fileName = "doris-dbgen" + def fileUrl = "${getS3Url()}/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen" + def filePath = Paths.get(dirPath, fileName) + if (!Files.exists(filePath)) { + new URL(fileUrl).withInputStream { inputStream -> + Files.copy(inputStream, filePath) + } + def file = new File(dirPath + "/" + fileName) + file.setExecutable(true) + } + + def data_count = 10 + def cur_rows = 100000 + + // generate data files with doris-dbgen + def doris_dbgen_create_data = { db_name, tb_name, part_type -> + def rows = cur_rows // total rows to load + def bulkSize = rows + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://localhost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://localhost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + String feHttpAddress = context.config.feHttpAddress + def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + for (int i = 0; i < data_count; i++) { + def cm + if (password) { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --pass ${password} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${context.file.parent}/${part_type}/${part_type}_${i}/""" + } else { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${context.file.parent}/${part_type}/${part_type}_${i}/""" + } + logger.info("command is: " + cm) + def proc = cm.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + } + + def write_to_file = { cur_path, content -> + File file = new File(cur_path) + file.write(content) + } + + def cm_list = [] + def doris_dbgen_load_data = { db_name, tb_name, part_type -> + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = 
jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://localhost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://localhost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + String feHttpAddress = context.config.feHttpAddress + def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + + for (int i = 0; i < data_count; i++) { + def cm = "" + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + + if (password) { + cm = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load""" + } else { + cm = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${sql_ip}:${http_port}/api/${realDb}/${tableName}/_stream_load""" + } + logger.info("command is: " + cm) + + def load_path = """${context.file.parent}/range/thread_load_${i}.sh""" + write_to_file(load_path, cm) + cm = """bash ${context.file.parent}/range/thread_load_${i}.sh""" + def cm_copy = cm + cm_list = cm_list.toList() + [cm_copy] + } + } + + def data_delete = { part_type -> + + def sql_cm = """rm -rf ${context.file.parent}/${part_type}""" + sql_cm.execute() + } + + String db = context.config.getDbNameByFile(context.file) + def database_name = db + def tb_name2 = "small_data_high_concurrent_load_range" + + sql """create database if not exists ${database_name};""" + sql """use ${database_name};""" + sql """drop table if exists ${tb_name2};""" + sql new File("""${context.file.parent}/../ddl/small_data_high_concurrrent_load.sql""").text + + data_delete("range") + doris_dbgen_create_data(database_name, tb_name2, "range") + doris_dbgen_load_data(database_name, tb_name2, "range") + + def thread_thread_1000 = [] + def concurrent_load = { str -> + logger.info("load start: " + str) + logger.info("cm: " + str) + def proc = str.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[0])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[1])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[2])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[3])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[4])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[5])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[6])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[7])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[8])}) + thread_thread_1000.add(Thread.startDaemon {concurrent_load(cm_list[9])}) + + for (Thread th in thread_thread_1000) { + th.join() + } + + def row_count_range = sql """select count(*) from ${tb_name2};""" + assertTrue(cur_rows * data_count == row_count_range[0][0]) + def partition_res_range = sql """show 
partitions from ${tb_name2} order by PartitionName;""" + for (int i = 0; i < partition_res_range.size(); i++) { + for (int j = i+1; j < partition_res_range.size(); j++) { + if (partition_res_range[i][6] == partition_res_range[j][6]) { + assertTrue(false) + } + } + } + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '2000')""" +} diff --git a/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy b/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy new file mode 100644 index 00000000000000..f25052a5e17209 --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/same_data/stress_test_same_date_range.groovy @@ -0,0 +1,201 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.  See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.  The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.  You may obtain a copy of the License at +// +//   http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.  See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.io.FileType +import java.nio.file.Files +import java.nio.file.Paths +import java.net.URL +import java.io.File + +suite("stress_test_same_date_range") { + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '5000')""" + + // get doris-dbgen from s3 + def dirPath = context.file.parent + def fileName = "doris-dbgen" + def fileUrl = "http://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen" + def filePath = Paths.get(dirPath, fileName) + if (!Files.exists(filePath)) { + new URL(fileUrl).withInputStream { inputStream -> + Files.copy(inputStream, filePath) + } + def file = new File(dirPath + "/" + fileName) + file.setExecutable(true) + } + + def data_count = 1 + def cur_rows = 100000 + + // generate data files with doris-dbgen + def doris_dbgen_create_data = { db_name, tb_name, part_type -> + def rows = cur_rows // total rows to load + def bulkSize = rows + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://localhost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://localhost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + String feHttpAddress = context.config.feHttpAddress + def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + for (int i = 1; i <= data_count; i++) { + def cm + if (password) { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --pass ${password} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size 
${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${context.file.parent}/${part_type}_${i}/""" + } else { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/two_stream_load_conflict.yaml --save-to-dir ${context.file.parent}/${part_type}_${i}/""" + } + logger.info("command is: " + cm) + def proc = cm.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + } + + def write_to_file = { cur_path, content -> + File file = new File(cur_path) + file.write(content) + } + + def cm1 + def cm2 + def doris_dbgen_load_data = { db_name, tb_name, part_type -> + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port_res = sql """show backends;""" + println(sql_port_res) + if (sql_port_res.size < 2) { + assert(false) + } + def be_http_1 = sql_port_res[0][1] + def be_http_2 = sql_port_res[1][1] + def be_port_1 = sql_port_res[0][4] + def be_port_2 = sql_port_res[1][4] + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + for (int i = 1; i <= data_count; i++) { + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + + if (password) { + cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" + cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + } else { + cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" + cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + } + logger.info("command is: " + cm1) + logger.info("command is: " + cm2) + + def load_path_1 = """${context.file.parent}/thread_load_1.sh""" + write_to_file(load_path_1, cm1) + cm1 = "bash " + load_path_1 + + def load_path_2 = """${context.file.parent}/thread_load_2.sh""" + write_to_file(load_path_2, cm2) + cm2 = "bash " + load_path_2 + + } + } + + def data_delete = { part_type -> + + for (int i = 1; i <= data_count; i++) { + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + if (dir.exists()) { + dir.eachFileRecurse(FileType.FILES) { file -> + list << file + } + logger.info("rm -rf " + dir) + ("rm -rf " + dir).execute().text + } + } + } + + String db = context.config.getDbNameByFile(context.file) + def database_name = db + def tb_name1 = "test1" + def tb_name2 = "stream_load_range_test_table" + + sql """create database if not exists ${database_name};""" + sql """use ${database_name};""" + sql """drop table if exists ${tb_name1};""" + sql """drop table if exists ${tb_name2};""" + sql new 
File("""${context.file.parent}/../ddl/create_range_part_data_table.sql""").text + sql new File("""${context.file.parent}/../ddl/stream_load_range_test_table.sql""").text + + data_delete("range") + doris_dbgen_create_data(database_name, tb_name1, "range") + doris_dbgen_load_data(database_name, tb_name2, "range") + + def thread1 = Thread.start { + logger.info("load1 start") + def proc = cm1.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + def thread2 = Thread.start { + logger.info("load2 start") + def proc = cm2.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + thread1.join() + thread2.join() + + def row_count_range = sql """select count(*) from ${tb_name2};""" + def partition_res_range = sql """show partitions from ${tb_name2};""" + assertTrue(row_count_range[0][0] == partition_res_range.size) + def part_context = [] + + for (int i = 0; i < partition_res_range.size; i++) { + part_context.add(partition_res_range[i][6]) + } + def part_context_unique = part_context.clone().unique() + assertTrue(part_context.size == part_context_unique.size) + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '2000')""" +} diff --git a/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh b/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh new file mode 100644 index 00000000000000..6edd950cc7c2ca --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_1.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh b/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh new file mode 100644 index 00000000000000..6edd950cc7c2ca --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/same_data/thread_load_2.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy b/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy new file mode 100644 index 00000000000000..ae795e3e85422d --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/two_stream_load/stress_test_two_stream_load.groovy @@ -0,0 +1,231 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements.  See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership.  The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License.  You may obtain a copy of the License at +// +//   http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied.  See the License for the +// specific language governing permissions and limitations +// under the License. + +import groovy.io.FileType +import java.nio.file.Files +import java.nio.file.Paths +import java.net.URL +import java.io.File + +suite("stress_test_two_stream_load") { + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '100000')""" + + // get doris-dbgen from s3 + def dirPath = context.file.parent + def fileName = "doris-dbgen" + def fileUrl = "http://doris-build-1308700295.cos.ap-beijing.myqcloud.com/regression/doris-dbgen-23-10-18/doris-dbgen-23-10-20/doris-dbgen" + def filePath = Paths.get(dirPath, fileName) + if (!Files.exists(filePath)) { + new URL(fileUrl).withInputStream { inputStream -> + Files.copy(inputStream, filePath) + } + def file = new File(dirPath + "/" + fileName) + file.setExecutable(true) + } + + def data_count = 1 + def cur_rows = 10000 + + // generate data files with doris-dbgen + def doris_dbgen_create_data = { db_name, tb_name, part_type -> + def rows = cur_rows // total rows to load + def bulkSize = rows + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port + if (urlWithoutSchema.indexOf("/") >= 0) { + // e.g: jdbc:mysql://localhost:8080/?a=b + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1, urlWithoutSchema.indexOf("/")) + } else { + // e.g: jdbc:mysql://localhost:8080 + sql_port = urlWithoutSchema.substring(urlWithoutSchema.indexOf(":") + 1) + } + String feHttpAddress = context.config.feHttpAddress + def http_port = feHttpAddress.substring(feHttpAddress.indexOf(":") + 1) + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + for (int i = 1; i <= data_count; i++) { + def cm + if (password) { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --pass 
${password} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/10w_part_doris_dbgen.yaml --save-to-dir ${context.file.parent}/${part_type}_${i}/""" + } else { + cm = """${context.file.parent}/doris-dbgen gen --host ${sql_ip} --sql-port ${sql_port} --user ${user} --database ${realDb} --table ${tableName} --rows ${rows} --bulk-size ${bulkSize} --http-port ${http_port} --config ${context.file.parent}/../doris_dbgen_conf/10w_part_doris_dbgen.yaml --save-to-dir ${context.file.parent}/${part_type}_${i}/""" + } + logger.info("command is: " + cm) + def proc = cm.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + } + + def write_to_file = { cur_path, content -> + File file = new File(cur_path) + file.write(content) + } + + def cm1 + def cm2 + def doris_dbgen_load_data = { db_name, tb_name, part_type -> + def tableName = tb_name + + def jdbcUrl = context.config.jdbcUrl + def urlWithoutSchema = jdbcUrl.substring(jdbcUrl.indexOf("://") + 3) + def sql_ip = urlWithoutSchema.substring(0, urlWithoutSchema.indexOf(":")) + def sql_port_res = sql """show backends;""" + println(sql_port_res) + if (sql_port_res.size < 2) { + assert(false) + } + def be_http_1 = sql_port_res[0][1] + def be_http_2 = sql_port_res[1][1] + def be_port_1 = sql_port_res[0][4] + def be_port_2 = sql_port_res[1][4] + + String realDb = db_name + String user = context.config.jdbcUser + String password = context.config.jdbcPassword + + for (int i = 1; i <= data_count; i++) { + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + + if (password) { + cm1 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" + cm2 = """curl --location-trusted -u ${user}:${password} -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + } else { + cm1 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_1}:${be_port_1}/api/${realDb}/${tableName}/_stream_load""" + cm2 = """curl --location-trusted -u root: -H "column_separator:|" -T ${list[0]} http://${be_http_2}:${be_port_2}/api/${realDb}/${tableName}/_stream_load""" + } + logger.info("command is: " + cm1) + logger.info("command is: " + cm2) + + def load_path_1 = """${context.file.parent}/thread_load_1.sh""" + write_to_file(load_path_1, cm1) + cm1 = "bash " + load_path_1 + + def load_path_2 = """${context.file.parent}/thread_load_2.sh""" + write_to_file(load_path_2, cm2) + cm2 = "bash " + load_path_2 + + } + } + + def data_delete = { part_type -> + + for (int i = 1; i <= data_count; i++) { + def list = [] + def dir = new File("""${context.file.parent}""" + "/" + part_type + "_" + i) + if (dir.exists()) { + dir.eachFileRecurse (FileType.FILES) { file -> + list << file + } + logger.info("rm -rf " + dir) + ("rm -rf " + dir).execute().text + } + } + } + + String db = context.config.getDbNameByFile(context.file) + def database_name = db + def tb_name1 = "test1" + def tb_name4 = "test2" + def tb_name2 = "stream_load_range_test_table" + def tb_name3 = "stream_load_list_test_table" + + sql """create database if not exists ${database_name};""" + sql 
"""use ${database_name};""" + sql """drop table if exists ${tb_name1};""" + sql """drop table if exists ${tb_name2};""" + sql """drop table if exists ${tb_name3};""" + sql """drop table if exists ${tb_name4};""" + sql new File("""${context.file.parent}/../ddl/create_range_part_data_table.sql""").text + sql new File("""${context.file.parent}/../ddl/create_list_part_data_table.sql""").text + sql new File("""${context.file.parent}/../ddl/stream_load_range_test_table.sql""").text + sql new File("""${context.file.parent}/../ddl/stream_load_list_test_table.sql""").text + + data_delete("range") + data_delete("list") + + doris_dbgen_create_data(database_name, tb_name1, "range") + doris_dbgen_load_data(database_name, tb_name2, "range") + + def thread1 = Thread.start { + logger.info("load1 start") + def proc = cm1.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + def thread2 = Thread.start { + logger.info("load2 start") + def proc = cm2.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + thread1.join() + thread2.join() + + def row_count_range = sql """select count(*) from ${tb_name2};""" + def partition_res_range = sql """show partitions from ${tb_name2};""" + assertTrue(row_count_range[0][0] == partition_res_range.size) + + doris_dbgen_create_data(database_name, tb_name4, "list") + doris_dbgen_load_data(database_name, tb_name3, "list") + data_delete("range") + + def thread3 = Thread.start { + logger.info("load1 start") + def proc = cm1.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + def thread4 = Thread.start { + logger.info("load2 start") + def proc = cm2.execute() + def sout = new StringBuilder(), serr = new StringBuilder() + proc.consumeProcessOutput(sout, serr) + proc.waitForOrKill(7200000) + logger.info("std out: " + sout + "std err: " + serr) + } + thread3.join() + thread4.join() + + def row_count_list = sql """select count(*) from ${tb_name3};""" + def partition_res_list = sql """show partitions from ${tb_name3};""" + assertTrue(row_count_list[0][0] == partition_res_list.size) + + data_delete("list") + + sql """ADMIN SET FRONTEND CONFIG ('max_auto_partition_num' = '2000')""" +} diff --git a/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh b/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh new file mode 100644 index 00000000000000..6edd950cc7c2ca --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_1.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. diff --git a/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh b/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh new file mode 100644 index 00000000000000..6edd950cc7c2ca --- /dev/null +++ b/regression-test/suites/partition_p2/auto_partition/two_stream_load/thread_load_2.sh @@ -0,0 +1,17 @@ +#!/usr/bin/env bash +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.