diff --git a/be/src/vec/exec/scan/scanner_context.h b/be/src/vec/exec/scan/scanner_context.h index d97fc731fe5067d..03e76d357ba6f7c 100644 --- a/be/src/vec/exec/scan/scanner_context.h +++ b/be/src/vec/exec/scan/scanner_context.h @@ -105,9 +105,8 @@ class ScannerContext : public std::enable_shared_from_this, const TupleDescriptor* output_tuple_desc, const RowDescriptor* output_row_descriptor, const std::list>& scanners, - int64_t limit_, int64_t max_bytes_in_blocks_queue, - std::shared_ptr dependency, - const int num_parallel_instances); + int64_t limit_, std::shared_ptr dependency, + bool ignore_data_distribution); ~ScannerContext() override { SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(_query_thread_context.query_mem_tracker); @@ -208,7 +207,7 @@ class ScannerContext : public std::enable_shared_from_this, int64_t limit; int32_t _max_thread_num = 0; - int64_t _max_bytes_in_queue; + int64_t _max_bytes_in_queue = 0; doris::vectorized::ScannerScheduler* _scanner_scheduler; SimplifiedScanScheduler* _simple_scan_scheduler = nullptr; SimplifiedScanScheduler* _remote_scan_task_scheduler = nullptr; @@ -218,7 +217,6 @@ class ScannerContext : public std::enable_shared_from_this, int32_t _num_running_scanners = 0; // weak pointer for _scanners, used in stop function std::vector> _all_scanners; - const int _num_parallel_instances; std::shared_ptr _scanner_profile; RuntimeProfile::Counter* _scanner_sched_counter = nullptr; RuntimeProfile::Counter* _newly_create_free_blocks_num = nullptr; @@ -228,6 +226,7 @@ class ScannerContext : public std::enable_shared_from_this, RuntimeProfile::Counter* _scale_up_scanners_counter = nullptr; QueryThreadContext _query_thread_context; std::shared_ptr _dependency = nullptr; + bool _ignore_data_distribution = false; // for scaling up the running scanners size_t _estimated_block_size = 0; diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index 4140835b6570046..8895e45f5635ab7 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -684,7 +684,10 @@ public class SessionVariable implements Serializable, Writable { @VariableMgr.VarAttr(name = EXEC_MEM_LIMIT) public long maxExecMemByte = 2147483648L; - @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT) + @VariableMgr.VarAttr(name = SCAN_QUEUE_MEM_LIMIT, + description = {"每个 Scan Instance 的 block queue 能够保存多少字节的 block", + "How many bytes of block can be saved in the block queue of each Scan Instance"}) + // 100MB public long maxScanQueueMemByte = 2147483648L / 20; @VariableMgr.VarAttr(name = NUM_SCANNER_THREADS, needForward = true, description = { diff --git a/regression-test/data/load_p0/ingestion_load/data1.parquet b/regression-test/data/load_p0/ingestion_load/data1.parquet new file mode 100644 index 000000000000000..623456ace7430de Binary files /dev/null and b/regression-test/data/load_p0/ingestion_load/data1.parquet differ diff --git a/regression-test/data/load_p0/ingestion_load/data2-0.parquet b/regression-test/data/load_p0/ingestion_load/data2-0.parquet new file mode 100644 index 000000000000000..ef0e63887c114c8 Binary files /dev/null and b/regression-test/data/load_p0/ingestion_load/data2-0.parquet differ diff --git a/regression-test/data/load_p0/ingestion_load/data2-1.parquet b/regression-test/data/load_p0/ingestion_load/data2-1.parquet new file mode 100644 index 000000000000000..a1c388a7bf65c4d Binary files /dev/null and 
b/regression-test/data/load_p0/ingestion_load/data2-1.parquet differ diff --git a/regression-test/data/load_p0/ingestion_load/data2-2.parquet b/regression-test/data/load_p0/ingestion_load/data2-2.parquet new file mode 100644 index 000000000000000..720ea77afd23d4f Binary files /dev/null and b/regression-test/data/load_p0/ingestion_load/data2-2.parquet differ diff --git a/regression-test/data/load_p0/ingestion_load/data2-3.parquet b/regression-test/data/load_p0/ingestion_load/data2-3.parquet new file mode 100644 index 000000000000000..cc2b37a7c03fea0 Binary files /dev/null and b/regression-test/data/load_p0/ingestion_load/data2-3.parquet differ diff --git a/regression-test/data/load_p0/ingestion_load/test_ingestion_load.out b/regression-test/data/load_p0/ingestion_load/test_ingestion_load.out index f39b6b66d291082..f8ce916c7de5fd2 100644 --- a/regression-test/data/load_p0/ingestion_load/test_ingestion_load.out +++ b/regression-test/data/load_p0/ingestion_load/test_ingestion_load.out @@ -11,3 +11,27 @@ 1145092610 xWJUDWAV8Nllo0F dnZ9RMVdoqxh4kGBvy55zQdChNTVYdlvRZP4aWIkXyErUbM1XmFGQ9vuCD113JKKCyx4crDoY false 115 -22832 -7242855305248390982 -4240353246453053617 -9074.909 -2.51212400295869E8 -502.410 618.820 2024-06-12 2024-04-18 2023-11-04T09:55:17 2023-11-13T16:30:23 1736373707 UU14wnLhPkBid41 pmuNqYfOc3JCscf9meT5dYB2i28Pt9iaeXK4QqjVZJdoKFOeZI5bG9RKm1zInTdDMW1N0PKI5Y true -105 -20276 360048259532857165 -4602633478165721463 -13230.296 -1.708246954394742E9 757.147 -533.800 2024-01-05 2023-09-08 2023-11-27T05:21:33 2024-02-11T21:35:03 +-- !select -- +-2067215761 41V4fgCKI3Hkioe DZGaq24Yqh7SwmbPT6IX23jfC5NKqG7gE9JT4GLwiaQtoO8l6EjhGWQP9X7NHmjdqMbIN5kNeDkffOrlS6roIwj2wXpJ true 123 -4524 1014223769452772206 -6589390612399245616 29103.387 -4.09385322835896E8 886.394 -699.960 2024-02-22 2024-03-04 2024-01-01T22:13:54 2023-10-02T19:35:10 +-1982022740 fcZ1o6ZXG8UOFh5 iw4Ziys42GRRTFNkVPeQEA9I5EQtBD04xfefDsPCWN0vr1 true -3 -25143 7122614823242399351 7391807634038366248 23160.604 -9.20283206353984E8 829.880 -387.403 2023-10-16 2024-05-13 2024-07-16T18:27:45 2023-11-03T05:30:21 +-1228926576 1TISaGtB01BiqVt kq4u false -123 15962 3590007830423934951 -1478759439092857810 -7813.757 -6.98785793100899E8 930.743 402.350 2024-07-23 2023-07-30 2023-11-27T17:48:50 2024-03-11T21:09:58 +-575060895 rRfatUJiOO5dq9Y ETjqrUNUnI5kSmkafjWfRTG8HIp98pLGXagNpXZHqOIZZDRkoGeahOwk9 false 16 -767 6623730208927396375 -3055706367894284822 12540.839 -1.047911096098831E9 -752.454 -241.620 2024-04-10 2024-05-16 2023-12-07T23:38:05 2023-12-11T05:48:36 +-76042627 PcVaKC43qmIzuxY U3aGxaZumFpqcUsLI true 44 31151 9085406701767055602 -5846138572199996843 -16845.29 2.44522690225531E8 -784.720 -467.133 2023-10-31 2023-08-29 2023-09-12T10:12:46 2023-10-19T17:02:51 +121048200 KPLWjhhbGXqflJi rzqOYQH9ySHPwCm5K4GdeuI28G8LLmnpqLmsLMLfyRIvcfrlubQI47wUa8QILhuS38MBkjL true 42 13182 -6601530758880565531 5619594098883737912 -2782.1506 3.86698722676211E8 478.420 -330.289 2024-06-17 2023-12-26 2024-04-28T03:29:04 2023-08-18T21:05:32 +262860291 m3XgmlbIHYNH1qS BTJRzVrpM78zJAsHMEGhkF5BiDoc3yJuoV0s209sFcqElZsheBgolBGlFl9X4EfauD64FcFF2Mi4V0dKZfpDgaLLRPfG1SALV7 false -42 5990 -7504815416577235660 1535659792778122944 1171.9619 1.28834143701229E8 626.721 682.828 2023-11-24 2023-11-18 2024-03-21T11:50:17 2024-03-31T12:59:27 +579428879 KsOC6WGrieGlo7B SzeA6tRbsiGWJTBDvBQdBjCqjSE6Y false -111 32758 4029182463831656671 -3546198025044093789 20338.55 -2.015222388533773E9 61.981 720.310 2023-11-13 2024-07-04 2024-07-19T12:42:28 2024-01-04T10:32:53 +1145092610 
xWJUDWAV8Nllo0F dnZ9RMVdoqxh4kGBvy55zQdChNTVYdlvRZP4aWIkXyErUbM1XmFGQ9vuCD113JKKCyx4crDoY false 115 -22832 -7242855305248390982 -4240353246453053617 -9074.909 -2.51212400295869E8 -502.410 618.820 2024-06-12 2024-04-18 2023-11-04T09:55:17 2023-11-13T16:30:23 +1736373707 UU14wnLhPkBid41 pmuNqYfOc3JCscf9meT5dYB2i28Pt9iaeXK4QqjVZJdoKFOeZI5bG9RKm1zInTdDMW1N0PKI5Y true -105 -20276 360048259532857165 -4602633478165721463 -13230.296 -1.708246954394742E9 757.147 -533.800 2024-01-05 2023-09-08 2023-11-27T05:21:33 2024-02-11T21:35:03 + +-- !select -- +-9022291871392468311 2023-11-02 mOWPGHmiZ 10784 -128 2023-11-12T18:06:21 4218616419351308798 1993977685 -1857678846 +-6045452612961149194 2024-06-23 G1j 28468 -55 2024-06-09T00:12:11 -6456263257174124469 -727277974 144696403 +-1537906159489906139 2024-04-04 MRMRE18bVh49RD 32763 98 2024-01-20T00:54:03 -1289145371043997006 128181215 -1295829474 +-1510882223779118241 2024-07-24 PCwFn7r21MZr 22960 -79 2024-02-07T18:15:07 -8437284610883885859 472729036 -39626304 +-1185467471318572316 2023-11-08 ieed5Msw8X6be4HGS 16555 -79 2024-07-28T23:08:29 3263664376405334754 -809360772 -1229995615 +-234810200663664160 2024-06-07 s7GIrN805aU3cs2EM -7555 -124 2023-12-28T18:59:15 -3600712745035417587 2035647886 126756427 +4461660295430359180 2024-04-23 K 25428 6 2023-11-15T18:38:20 -4503242152141666001 -1093190312 1511443278 +6742880469957921530 2024-05-02 cJJrvRJfpCuGh 27232 64 2024-08-18T09:46:50 -2607385663861429432 -1390108377 1758263623 +7252685688720766402 2024-03-13 891C2 -9774 -1 2023-10-12T19:45:28 -3210623791036109982 -915986651 -1794344594 +8278077411585505009 2023-11-17 gBesLQnYpjK7iDUUcIi -26656 -50 2023-12-11T14:29:52 -8301529943262026214 -1555756888 -1318983102 + diff --git a/regression-test/data/load_p0/ingestion_load/test_ingestion_load_multi_table.out b/regression-test/data/load_p0/ingestion_load/test_ingestion_load_multi_table.out new file mode 100644 index 000000000000000..7a3ec8e86ddd051 --- /dev/null +++ b/regression-test/data/load_p0/ingestion_load/test_ingestion_load_multi_table.out @@ -0,0 +1,25 @@ +-- This file is automatically generated. 
You should know what you did if you want to edit this +-- !select -- +-2067215761 41V4fgCKI3Hkioe DZGaq24Yqh7SwmbPT6IX23jfC5NKqG7gE9JT4GLwiaQtoO8l6EjhGWQP9X7NHmjdqMbIN5kNeDkffOrlS6roIwj2wXpJ true 123 -4524 1014223769452772206 -6589390612399245616 29103.387 -4.09385322835896E8 886.394 -699.960 2024-02-22 2024-03-04 2024-01-01T22:13:54 2023-10-02T19:35:10 +-1982022740 fcZ1o6ZXG8UOFh5 iw4Ziys42GRRTFNkVPeQEA9I5EQtBD04xfefDsPCWN0vr1 true -3 -25143 7122614823242399351 7391807634038366248 23160.604 -9.20283206353984E8 829.880 -387.403 2023-10-16 2024-05-13 2024-07-16T18:27:45 2023-11-03T05:30:21 +-1228926576 1TISaGtB01BiqVt kq4u false -123 15962 3590007830423934951 -1478759439092857810 -7813.757 -6.98785793100899E8 930.743 402.350 2024-07-23 2023-07-30 2023-11-27T17:48:50 2024-03-11T21:09:58 +-575060895 rRfatUJiOO5dq9Y ETjqrUNUnI5kSmkafjWfRTG8HIp98pLGXagNpXZHqOIZZDRkoGeahOwk9 false 16 -767 6623730208927396375 -3055706367894284822 12540.839 -1.047911096098831E9 -752.454 -241.620 2024-04-10 2024-05-16 2023-12-07T23:38:05 2023-12-11T05:48:36 +-76042627 PcVaKC43qmIzuxY U3aGxaZumFpqcUsLI true 44 31151 9085406701767055602 -5846138572199996843 -16845.29 2.44522690225531E8 -784.720 -467.133 2023-10-31 2023-08-29 2023-09-12T10:12:46 2023-10-19T17:02:51 +121048200 KPLWjhhbGXqflJi rzqOYQH9ySHPwCm5K4GdeuI28G8LLmnpqLmsLMLfyRIvcfrlubQI47wUa8QILhuS38MBkjL true 42 13182 -6601530758880565531 5619594098883737912 -2782.1506 3.86698722676211E8 478.420 -330.289 2024-06-17 2023-12-26 2024-04-28T03:29:04 2023-08-18T21:05:32 +262860291 m3XgmlbIHYNH1qS BTJRzVrpM78zJAsHMEGhkF5BiDoc3yJuoV0s209sFcqElZsheBgolBGlFl9X4EfauD64FcFF2Mi4V0dKZfpDgaLLRPfG1SALV7 false -42 5990 -7504815416577235660 1535659792778122944 1171.9619 1.28834143701229E8 626.721 682.828 2023-11-24 2023-11-18 2024-03-21T11:50:17 2024-03-31T12:59:27 +579428879 KsOC6WGrieGlo7B SzeA6tRbsiGWJTBDvBQdBjCqjSE6Y false -111 32758 4029182463831656671 -3546198025044093789 20338.55 -2.015222388533773E9 61.981 720.310 2023-11-13 2024-07-04 2024-07-19T12:42:28 2024-01-04T10:32:53 +1145092610 xWJUDWAV8Nllo0F dnZ9RMVdoqxh4kGBvy55zQdChNTVYdlvRZP4aWIkXyErUbM1XmFGQ9vuCD113JKKCyx4crDoY false 115 -22832 -7242855305248390982 -4240353246453053617 -9074.909 -2.51212400295869E8 -502.410 618.820 2024-06-12 2024-04-18 2023-11-04T09:55:17 2023-11-13T16:30:23 +1736373707 UU14wnLhPkBid41 pmuNqYfOc3JCscf9meT5dYB2i28Pt9iaeXK4QqjVZJdoKFOeZI5bG9RKm1zInTdDMW1N0PKI5Y true -105 -20276 360048259532857165 -4602633478165721463 -13230.296 -1.708246954394742E9 757.147 -533.800 2024-01-05 2023-09-08 2023-11-27T05:21:33 2024-02-11T21:35:03 + +-- !select -- +-2067215761 41V4fgCKI3Hkioe DZGaq24Yqh7SwmbPT6IX23jfC5NKqG7gE9JT4GLwiaQtoO8l6EjhGWQP9X7NHmjdqMbIN5kNeDkffOrlS6roIwj2wXpJ true 123 -4524 1014223769452772206 -6589390612399245616 29103.387 -4.09385322835896E8 886.394 -699.960 2024-02-22 2024-03-04 2024-01-01T22:13:54 2023-10-02T19:35:10 +-1982022740 fcZ1o6ZXG8UOFh5 iw4Ziys42GRRTFNkVPeQEA9I5EQtBD04xfefDsPCWN0vr1 true -3 -25143 7122614823242399351 7391807634038366248 23160.604 -9.20283206353984E8 829.880 -387.403 2023-10-16 2024-05-13 2024-07-16T18:27:45 2023-11-03T05:30:21 +-1228926576 1TISaGtB01BiqVt kq4u false -123 15962 3590007830423934951 -1478759439092857810 -7813.757 -6.98785793100899E8 930.743 402.350 2024-07-23 2023-07-30 2023-11-27T17:48:50 2024-03-11T21:09:58 +-575060895 rRfatUJiOO5dq9Y ETjqrUNUnI5kSmkafjWfRTG8HIp98pLGXagNpXZHqOIZZDRkoGeahOwk9 false 16 -767 6623730208927396375 -3055706367894284822 12540.839 -1.047911096098831E9 -752.454 -241.620 2024-04-10 2024-05-16 2023-12-07T23:38:05 
2023-12-11T05:48:36 +-76042627 PcVaKC43qmIzuxY U3aGxaZumFpqcUsLI true 44 31151 9085406701767055602 -5846138572199996843 -16845.29 2.44522690225531E8 -784.720 -467.133 2023-10-31 2023-08-29 2023-09-12T10:12:46 2023-10-19T17:02:51 +121048200 KPLWjhhbGXqflJi rzqOYQH9ySHPwCm5K4GdeuI28G8LLmnpqLmsLMLfyRIvcfrlubQI47wUa8QILhuS38MBkjL true 42 13182 -6601530758880565531 5619594098883737912 -2782.1506 3.86698722676211E8 478.420 -330.289 2024-06-17 2023-12-26 2024-04-28T03:29:04 2023-08-18T21:05:32 +262860291 m3XgmlbIHYNH1qS BTJRzVrpM78zJAsHMEGhkF5BiDoc3yJuoV0s209sFcqElZsheBgolBGlFl9X4EfauD64FcFF2Mi4V0dKZfpDgaLLRPfG1SALV7 false -42 5990 -7504815416577235660 1535659792778122944 1171.9619 1.28834143701229E8 626.721 682.828 2023-11-24 2023-11-18 2024-03-21T11:50:17 2024-03-31T12:59:27 +579428879 KsOC6WGrieGlo7B SzeA6tRbsiGWJTBDvBQdBjCqjSE6Y false -111 32758 4029182463831656671 -3546198025044093789 20338.55 -2.015222388533773E9 61.981 720.310 2023-11-13 2024-07-04 2024-07-19T12:42:28 2024-01-04T10:32:53 +1145092610 xWJUDWAV8Nllo0F dnZ9RMVdoqxh4kGBvy55zQdChNTVYdlvRZP4aWIkXyErUbM1XmFGQ9vuCD113JKKCyx4crDoY false 115 -22832 -7242855305248390982 -4240353246453053617 -9074.909 -2.51212400295869E8 -502.410 618.820 2024-06-12 2024-04-18 2023-11-04T09:55:17 2023-11-13T16:30:23 +1736373707 UU14wnLhPkBid41 pmuNqYfOc3JCscf9meT5dYB2i28Pt9iaeXK4QqjVZJdoKFOeZI5bG9RKm1zInTdDMW1N0PKI5Y true -105 -20276 360048259532857165 -4602633478165721463 -13230.296 -1.708246954394742E9 757.147 -533.800 2024-01-05 2023-09-08 2023-11-27T05:21:33 2024-02-11T21:35:03 + diff --git a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load.groovy b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load.groovy index ca41b608673023c..2ce4ed3202f7af1 100644 --- a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load.groovy +++ b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load.groovy @@ -45,7 +45,7 @@ suite('test_ingestion_load', 'p0') { httpTest { endpoint context.config.feHttpAddress - uri "/api/ingestion_load/${context.dbName}/_create" + uri "/api/ingestion_load/internal/${context.dbName}/_create" op "post" basicAuthorization context.config.feHttpUser, context.config.feHttpPassword body reqBody @@ -92,7 +92,7 @@ suite('test_ingestion_load', 'p0') { httpTest { endpoint context.config.feHttpAddress - uri "/api/ingestion_load/${context.dbName}/_update" + uri "/api/ingestion_load/internal/${context.dbName}/_update" op "post" basicAuthorization context.config.feHttpUser, context.config.feHttpPassword body updateStatusReqBody @@ -106,12 +106,12 @@ suite('test_ingestion_load', 'p0') { } } - max_try_milli_secs = 60000 + max_try_milli_secs = 120000 while (max_try_milli_secs) { result = sql "show load where label = '${loadLabel}'" if (result[0][2] == "FINISHED") { sql "sync" - qt_select "select * from ${testTable} order by c_int" + qt_select "select * from ${testTable} order by 1" break } else { sleep(5000) // wait 1 second every time @@ -147,6 +147,7 @@ suite('test_ingestion_load', 'p0') { c_datetime datetime NULL, c_datetimev2 datetime NULL ) + DUPLICATE KEY(c_int) DISTRIBUTED BY HASH(c_int) BUCKETS 1 PROPERTIES ( "replication_num" = "1" @@ -157,6 +158,65 @@ suite('test_ingestion_load', 'p0') { testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet') + tableName = 'tbl_test_spark_load_unique_mor' + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + 
c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + UNIQUE KEY(c_int) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1", + "enable_unique_key_merge_on_write" = "false" + ) + """ + + label = "test_ingestion_load_unique_mor" + + testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet') + + tableName = 'tbl_test_spark_load_agg' + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} + ( + `user_id` LARGEINT NOT NULL COMMENT "user id", + `date` DATE NOT NULL COMMENT "data import time", + `city` VARCHAR(20) COMMENT "city", + `age` SMALLINT COMMENT "age", + `sex` TINYINT COMMENT "gender", + `last_visit_date` DATETIME REPLACE DEFAULT "1970-01-01 00:00:00" COMMENT "last visit date time", + `cost` BIGINT SUM DEFAULT "0" COMMENT "user total cost", + `max_dwell_time` INT MAX DEFAULT "0" COMMENT "user max dwell time", + `min_dwell_time` INT MIN DEFAULT "99999" COMMENT "user min dwell time" + ) + AGGREGATE KEY(`user_id`, `date`, `city`, `age`, `sex`) + DISTRIBUTED BY HASH(`user_id`) BUCKETS 1 + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + + label = "test_ingestion_load_agg" + + testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data1.parquet') + } } diff --git a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_column.groovy b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_column.groovy new file mode 100644 index 000000000000000..2faf918db1bcaa1 --- /dev/null +++ b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_column.groovy @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
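The suites below all drive the ingestion load REST API in two phases: _create registers the label and returns per-table metadata, then _update reports the ETL result. The segment files the BE downloads must follow the V1.<label>.<tableId>.<partitionId>.<indexId>.<bucketId>.<schemaHash>.parquet naming that each suite assembles by hand from the _create response. A minimal Groovy sketch of that derivation; buildEtlFileNames is a hypothetical helper, but the field paths (tableMeta, indexes, partitionInfo) are exactly what the suites read:

    // Hypothetical helper: derive the ETL output file names from the _create response.
    def buildEtlFileNames = { Map data, String table, String label ->
        def meta = data.tableMeta[table]
        def index = meta.indexes[0]      // these suites use single-index tables
        def names = []
        for (partition in meta.partitionInfo.partitions) {
            // bucket id is 0 throughout, matching BUCKETS 1 in the test tables
            names << "V1.${label}.${meta.id}.${partition.partitionId}.${index.indexId}.0.${index.schemaHash}.parquet"
        }
        names
    }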
+ +import java.nio.file.Files +import java.nio.file.Paths +import java.nio.file.StandardCopyOption + +suite('test_ingestion_load_alter_column', 'p0') { + + def testIngestLoadJob = { testTable, loadLabel, dataFile, alterAction -> + + sql "TRUNCATE TABLE ${testTable}" + + sql "CLEAN LABEL FROM ${context.dbName}" + + Integer loadId = -1 + Integer tableId = -1 + Integer partitionId = -1 + Integer indexId = -1 + Integer bucketId = 0 + Integer schemaHash = -1 + + String reqBody = + """{ + "label": "${loadLabel}", + "tableToPartition": { + "${testTable}": [] + }, + "properties": {} + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_create" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body reqBody + check { code, resBody -> + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + def data = resBodyJson.data + loadId = data.loadId + def tableMeta = data.tableMeta + tableId = tableMeta["${testTable}"].id + def index = tableMeta["${testTable}"].indexes[0] + indexId = index.indexId + schemaHash = index.schemaHash + partitionId = tableMeta["${testTable}"].partitionInfo.partitions[0].partitionId + } + } + + String resultFileName = "V1.${loadLabel}.${tableId}.${partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet" + logger.info("resultFileName: " + resultFileName) + + Files.copy(Paths.get(dataFile), + Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileName}"), StandardCopyOption.REPLACE_EXISTING) + + String etlResultFilePath = uploadToHdfs "/load_p0/ingestion_load/${resultFileName}" + + String dppResult = '{\\"isSuccess\\":true,\\"failedReason\\":\\"\\",\\"scannedRows\\":10,\\"fileNumber\\":1,' + + '\\"fileSize\\":2441,\\"normalRows\\":10,\\"abnormalRows\\":0,\\"unselectRows\\":0,' + + '\\"partialAbnormalRows\\":\\"[]\\",\\"scannedBytes\\":0}' + + String updateStatusReqBody = + """{ + "loadId": ${loadId}, + "statusInfo": { + "status": "SUCCESS", + "msg": "", + "appId": "", + "dppResult": "${dppResult}", + "filePathToSize": "{\\"${etlResultFilePath}\\": 81758}", + "hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}" + } + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_update" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body updateStatusReqBody + check { code, resBody -> + { + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + } + } + } + + alterAction.call() + + max_try_milli_secs = 120000 + while (max_try_milli_secs) { + result = sql "show load where label = '${loadLabel}'" + if (result[0][2] == "CANCELLED") { + msg = result[0][7] + logger.info("err msg: " + msg) + assertTrue((result[0][7] =~ /schema of index \[\d+\] has changed/).find()) + break + } else { + sleep(5000) // wait 1 second every time + max_try_milli_secs -= 5000 + if (max_try_milli_secs < 0) { + assertEquals(1, 2) + } + } + } + + } + + if (enableHdfs()) { + + tableName1 = 'tbl_test_spark_load_alter_column_1' + tableName2 = 'tbl_test_spark_load_alter_column_2' + + try { + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName1} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint 
tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DUPLICATE KEY(c_int) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + label = "test_ingestion_load_alter_column_1" + + testIngestLoadJob.call(tableName1, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet', { + sql "alter table ${tableName1} drop column c_datetimev2" + }) + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName2} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DUPLICATE KEY(c_int) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + label = "test_ingestion_load_alter_column_2" + + testIngestLoadJob.call(tableName2, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet', { + sql "alter table ${tableName2} add column c_string string null" + }) + + } finally { + sql "DROP TABLE ${tableName1}" + sql "DROP TABLE ${tableName2}" + } + + } + +} \ No newline at end of file diff --git a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_partition.groovy b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_partition.groovy new file mode 100644 index 000000000000000..332e5d1b1577654 --- /dev/null +++ b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_alter_partition.groovy @@ -0,0 +1,224 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
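Every suite in this change polls "show load" until the job reaches a terminal state. Two details of the inline loops are worth noting: the comments say "wait 1 second every time" although the loops sleep five seconds, and the variants that check max_try_milli_secs < 0 can exit silently when the counter lands exactly on zero. A consolidated polling helper avoids both; this is a sketch only (sql and sleep are regression-framework bindings, the helper itself is not part of the framework):

    // Hypothetical polling helper for "show load" state.
    def waitForLoadState = { String label, long timeoutMs = 120000 ->
        long remaining = timeoutMs
        while (remaining > 0) {
            def result = sql "show load where label = '${label}'"
            def state = result.isEmpty() ? null : result[0][2]
            if (state == "FINISHED" || state == "CANCELLED") {
                return state
            }
            sleep(5000)          // retry every 5 seconds
            remaining -= 5000
        }
        throw new IllegalStateException("load ${label} did not reach a terminal state in ${timeoutMs} ms")
    }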
+ +import java.nio.file.Files +import java.nio.file.Paths +import java.nio.file.StandardCopyOption + +suite('test_ingestion_load_alter_partition', 'p0') { + + def testIngestLoadJob = { testTable, loadLabel, dataFiles, alterAction -> + + sql "TRUNCATE TABLE ${testTable}" + + sql "CLEAN LABEL ${loadLabel} FROM ${context.dbName}" + + Integer loadId = -1 + Integer tableId = -1 + Integer partitionId = -1 + Integer indexId = -1 + Integer bucketId = 0 + Integer schemaHash = -1 + + String reqBody = + """{ + "label": "${loadLabel}", + "tableToPartition": { + "${testTable}": [] + }, + "properties": {} + }""" + + resultFileNames = [] + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_create" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body reqBody + check { code, resBody -> + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + def data = resBodyJson.data + loadId = data.loadId + def tableMeta = data.tableMeta + tableId = tableMeta["${testTable}"].id + def index = tableMeta["${testTable}"].indexes[0] + indexId = index.indexId + schemaHash = index.schemaHash + partitions = tableMeta["${testTable}"].partitionInfo.partitions + for(partition in partitions) { + logger.info("partitionId: " + partition.partitionId) + resultFileNames.add("V1.${loadLabel}.${tableId}.${partition.partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet") + } + } + } + + etlResultFilePaths = [] + for(int i=0; i < dataFiles.size(); i++) { + Files.copy(Paths.get(dataFiles[i]), + Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileNames[i]}"), StandardCopyOption.REPLACE_EXISTING) + String etlResultFilePath = uploadToHdfs "/load_p0/ingestion_load/${resultFileNames[i]}" + logger.info("etlResultFilePath: " + etlResultFilePath) + etlResultFilePaths.add(etlResultFilePath) + } + + String dppResult = '{\\"isSuccess\\":true,\\"failedReason\\":\\"\\",\\"scannedRows\\":10,\\"fileNumber\\":1,' + + '\\"fileSize\\":2441,\\"normalRows\\":10,\\"abnormalRows\\":0,\\"unselectRows\\":0,' + + '\\"partialAbnormalRows\\":\\"[]\\",\\"scannedBytes\\":0}' + + String updateStatusReqBody = + """{ + "loadId": ${loadId}, + "statusInfo": { + "status": "SUCCESS", + "msg": "", + "appId": "", + "dppResult": "${dppResult}", + "filePathToSize": "{\\"${etlResultFilePaths.get(0)}\\":851,\\"${etlResultFilePaths.get(1)}\\":781,\\"${etlResultFilePaths.get(2)}\\":781,\\"${etlResultFilePaths.get(3)}\\":839}", + "hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}" + } + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_update" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body updateStatusReqBody + check { code, resBody -> + { + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + } + } + } + + alterAction.call() + + max_try_milli_secs = 120000 + while (max_try_milli_secs) { + result = sql "show load where label = '${loadLabel}'" + if (result[0][2] == "FINISHED") { + sql "sync" + qt_select "select c1, count(*) from ${testTable} group by c1 order by c1" + break + } else if (result[0][2] == "CANCELLED") { + msg = result[0][7] + logger.info("err msg: " + msg) + assertTrue((result[0][7] =~ 
/partition does not exist/).find()) + break + } else { + sleep(5000) // wait 1 second every time + max_try_milli_secs -= 5000 + if (max_try_milli_secs < 0) { + assertEquals(1, 2) + } + } + } + + } + + if (enableHdfs()) { + + tableName1 = 'tbl_test_spark_load_alter_partition_1' + tableName2 = 'tbl_test_spark_load_alter_partition_2' + tableName3 = 'tbl_test_spark_load_alter_partition_3' + + try { + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName1} ( + c0 int not null, + c1 date, + c2 varchar(64) + ) + DUPLICATE KEY(c0) + PARTITION BY RANGE(c1) ( + FROM ("2024-09-01") TO ("2024-09-05") INTERVAL 1 DAY + ) + DISTRIBUTED BY HASH(c0) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + label = "test_ingestion_load_alter_partition_1" + + testIngestLoadJob.call(tableName1, label, [context.config.dataPath + '/load_p0/ingestion_load/data2-0.parquet', context.config.dataPath + '/load_p0/ingestion_load/data2-1.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-2.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-3.parquet'], { + sql "alter table ${tableName1} drop partition p_20240901" + }) + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName2} ( + c0 int not null, + c1 date, + c2 varchar(64) + ) + DUPLICATE KEY(c0) + PARTITION BY RANGE(c1) ( + FROM ("2024-09-01") TO ("2024-09-05") INTERVAL 1 DAY + ) + DISTRIBUTED BY HASH(c0) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + label = "test_ingestion_load_alter_partition_2" + + testIngestLoadJob.call(tableName2, label, [context.config.dataPath + '/load_p0/ingestion_load/data2-0.parquet', context.config.dataPath + '/load_p0/ingestion_load/data2-1.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-2.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-3.parquet'], { + sql "alter table ${tableName2} add partition p_20240905 VALUES [('2024-09-05'), ('2024-09-06'))" + }) + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName3} ( + c0 int not null, + c1 date, + c2 varchar(64) + ) + DUPLICATE KEY(c0) + PARTITION BY RANGE(c1) ( + FROM ("2024-09-01") TO ("2024-09-05") INTERVAL 1 DAY + ) + DISTRIBUTED BY HASH(c0) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + label = "test_ingestion_load_alter_partition_3" + + testIngestLoadJob.call(tableName3, label, [context.config.dataPath + '/load_p0/ingestion_load/data2-0.parquet', context.config.dataPath + '/load_p0/ingestion_load/data2-1.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-2.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-3.parquet'], { + sql "alter table ${tableName3} add temporary partition tp_20240901 VALUES [('2024-09-01'), ('2024-09-02'))" + sql "alter table ${tableName3} replace partition(p_20240901) with temporary partition(tp_20240901)" + }) + + } finally { + sql "DROP TABLE ${tableName1}" + sql "DROP TABLE ${tableName2}" + sql "DROP TABLE ${tableName3}" + } + + } + +} \ No newline at end of file diff --git a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_drop_table.groovy b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_drop_table.groovy new file mode 100644 index 000000000000000..4f245c3d535b157 --- /dev/null +++ b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_drop_table.groovy @@ -0,0 +1,196 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +import java.nio.file.Files +import java.nio.file.Paths +import java.nio.file.StandardCopyOption + +suite('test_ingestion_load_drop_table', 'p0') { + + def testIngestLoadJob = { testTable, loadLabel, dataFile, alterAction -> + + sql "TRUNCATE TABLE ${testTable}" + + sql "CLEAN LABEL FROM ${context.dbName}" + + Integer loadId = -1 + Integer tableId = -1 + Integer partitionId = -1 + Integer indexId = -1 + Integer bucketId = 0 + Integer schemaHash = -1 + + String reqBody = + """{ + "label": "${loadLabel}", + "tableToPartition": { + "${testTable}": [] + }, + "properties": {} + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_create" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body reqBody + check { code, resBody -> + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + def data = resBodyJson.data + loadId = data.loadId + def tableMeta = data.tableMeta + tableId = tableMeta["${testTable}"].id + def index = tableMeta["${testTable}"].indexes[0] + indexId = index.indexId + schemaHash = index.schemaHash + partitionId = tableMeta["${testTable}"].partitionInfo.partitions[0].partitionId + } + } + + String resultFileName = "V1.${loadLabel}.${tableId}.${partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet" + logger.info("resultFileName: " + resultFileName) + + Files.copy(Paths.get(dataFile), + Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileName}"), StandardCopyOption.REPLACE_EXISTING) + + String etlResultFilePath = uploadToHdfs "/load_p0/ingestion_load/${resultFileName}" + + String dppResult = '{\\"isSuccess\\":true,\\"failedReason\\":\\"\\",\\"scannedRows\\":10,\\"fileNumber\\":1,' + + '\\"fileSize\\":2441,\\"normalRows\\":10,\\"abnormalRows\\":0,\\"unselectRows\\":0,' + + '\\"partialAbnormalRows\\":\\"[]\\",\\"scannedBytes\\":0}' + + String updateStatusReqBody = + """{ + "loadId": ${loadId}, + "statusInfo": { + "status": "SUCCESS", + "msg": "", + "appId": "", + "dppResult": "${dppResult}", + "filePathToSize": "{\\"${etlResultFilePath}\\": 81758}", + "hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}" + } + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_update" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body updateStatusReqBody + check { code, resBody -> + { + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + } + } + } + + alterAction.call() + + max_try_milli_secs = 120000 + while (max_try_milli_secs) { 
+ result = sql "show load where label = '${loadLabel}'" + if (result.size() == 0) { + break + } else { + sleep(5000) // wait 1 second every time + max_try_milli_secs -= 5000 + if (max_try_milli_secs <= 0) { + assertEquals(1, 2) + } + } + } + + } + + if (enableHdfs()) { + + tableName = 'tbl_test_spark_load_drop_table' + + try { + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DUPLICATE KEY(c_int) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + label = "test_ingestion_load_drop_table" + + testIngestLoadJob.call(tableName, label, context.config.dataPath + '/load_p0/ingestion_load/data.parquet', { + sql "DROP TABLE ${tableName}" + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DUPLICATE KEY(c_int) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + }) + + } finally { + sql "DROP TABLE ${tableName}" + } + + } + +} \ No newline at end of file diff --git a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_multi_table.groovy b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_multi_table.groovy new file mode 100644 index 000000000000000..953c5d620a906a1 --- /dev/null +++ b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_multi_table.groovy @@ -0,0 +1,208 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
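A single ingestion load label can span several tables: tableToPartition lists each table in the _create request, and filePathToSize in the _update body maps every ETL output file to its size in bytes. The suites build that JSON by hand-escaping quotes inside a Groovy string; groovy.json.JsonOutput produces the same payload with less room for escaping mistakes. A sketch, with placeholder paths and sizes (real paths come from uploadToHdfs):

    import groovy.json.JsonOutput

    // Sketch: build the filePathToSize payload for a multi-table load.
    def filePathToSize = [
        "hdfs://ns1/load_p0/ingestion_load/etl-result-table1.parquet": 81758,
        "hdfs://ns1/load_p0/ingestion_load/etl-result-table2.parquet": 81758
    ]
    String json = JsonOutput.toJson(filePathToSize)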
+ +import java.nio.file.Files +import java.nio.file.Paths +import java.nio.file.StandardCopyOption + +suite('test_ingestion_load_multi_table', 'p0') { + + def testIngestLoadJob = { loadLabel, testTable1, testTable2, dataFile1, dataFile2 -> + + sql "TRUNCATE TABLE ${testTable1}" + sql "TRUNCATE TABLE ${testTable2}" + + sql "CLEAN LABEL FROM ${context.dbName}" + + Integer loadId = -1 + Integer tableId = -1 + Integer partitionId = -1 + Integer indexId = -1 + Integer bucketId = 0 + Integer schemaHash = -1 + + String resultFileName1 = "" + String resultFileName2 = "" + + String reqBody = + """{ + "label": "${loadLabel}", + "tableToPartition": { + "${testTable1}": [], + "${testTable2}": [] + }, + "properties": {} + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_create" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body reqBody + check { code, resBody -> + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + def data = resBodyJson.data + loadId = data.loadId + def tableMeta = data.tableMeta + // table1 + tableId = tableMeta["${testTable1}"].id + def index1 = tableMeta["${testTable1}"].indexes[0] + indexId = index1.indexId + schemaHash = index1.schemaHash + partitionId = tableMeta["${testTable1}"].partitionInfo.partitions[0].partitionId + resultFileName1 = "V1.${loadLabel}.${tableId}.${partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet" + // table2 + tableId = tableMeta["${testTable2}"].id + def index2 = tableMeta["${testTable2}"].indexes[0] + indexId = index2.indexId + schemaHash = index2.schemaHash + partitionId = tableMeta["${testTable2}"].partitionInfo.partitions[0].partitionId + resultFileName2 = "V1.${loadLabel}.${tableId}.${partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet" + } + } + + logger.info("resultFileName1: " + resultFileName1) + logger.info("resultFileName2: " + resultFileName2) + + Files.copy(Paths.get(dataFile1), + Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileName1}"), StandardCopyOption.REPLACE_EXISTING) + Files.copy(Paths.get(dataFile2), + Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileName2}"), StandardCopyOption.REPLACE_EXISTING) + + String etlResultFilePath1 = uploadToHdfs "/load_p0/ingestion_load/${resultFileName1}" + String etlResultFilePath2 = uploadToHdfs "/load_p0/ingestion_load/${resultFileName2}" + + String dppResult = '{\\"isSuccess\\":true,\\"failedReason\\":\\"\\",\\"scannedRows\\":10,\\"fileNumber\\":2,' + + '\\"fileSize\\":163516,\\"normalRows\\":10,\\"abnormalRows\\":0,\\"unselectRows\\":0,' + + '\\"partialAbnormalRows\\":\\"[]\\",\\"scannedBytes\\":0}' + + + String updateStatusReqBody = + """{ + "loadId": ${loadId}, + "statusInfo": { + "status": "SUCCESS", + "msg": "", + "appId": "", + "dppResult": "${dppResult}", + "filePathToSize": "{\\"${etlResultFilePath1}\\": 81758, \\"${etlResultFilePath2}\\": 81758}", + "hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}" + } + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_update" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body updateStatusReqBody + check { code, resBody -> + { + assert code == 200 + def resBodyJson = parseJson(resBody) + assert 
resBodyJson instanceof Map + assert resBodyJson.code == 0 + } + } + } + + max_try_milli_secs = 60000 + while (max_try_milli_secs) { + result = sql "show load where label = '${loadLabel}'" + if (result[0][2] == "FINISHED") { + sql "sync" + qt_select "select * from ${testTable1} order by c_int" + qt_select "select * from ${testTable2} order by c_int" + break + } else { + sleep(5000) // wait 1 second every time + max_try_milli_secs -= 5000 + if (max_try_milli_secs < 0) { + assertEquals(1, 2) + } + } + } + + } + + if (enableHdfs()) { + + tableName1 = 'tbl_test_spark_load_multi_1' + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName1} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + tableName2 = 'tbl_test_spark_load_multi_2' + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName2} ( + c_int int(11) NULL, + c_char char(15) NULL, + c_varchar varchar(100) NULL, + c_bool boolean NULL, + c_tinyint tinyint(4) NULL, + c_smallint smallint(6) NULL, + c_bigint bigint(20) NULL, + c_largeint largeint(40) NULL, + c_float float NULL, + c_double double NULL, + c_decimal decimal(6, 3) NULL, + c_decimalv3 decimal(6, 3) NULL, + c_date date NULL, + c_datev2 date NULL, + c_datetime datetime NULL, + c_datetimev2 datetime NULL + ) + DISTRIBUTED BY HASH(c_int) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def label = "test_ingestion_load_multi_table" + + testIngestLoadJob.call(label, tableName1, tableName2, context.config.dataPath + '/load_p0/ingestion_load/data.parquet', context.config.dataPath + '/load_p0/ingestion_load/data.parquet') + + } + +} \ No newline at end of file diff --git a/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_partition.groovy b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_partition.groovy new file mode 100644 index 000000000000000..8ba00a678a97800 --- /dev/null +++ b/regression-test/suites/load_p0/ingestion_load/test_ingestion_load_with_partition.groovy @@ -0,0 +1,160 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
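For a range-partitioned target the ETL stage emits one file per partition, which is why the partition suites upload data2-0 through data2-3 under names carrying each partitionId and list four entries in filePathToSize. The hand-escaped dppResult string can be generated the same way; the field names below are copied from the suites, the serialization itself is our sketch:

    import groovy.json.JsonOutput

    // Sketch: the dppResult fields the suites hand-escape, serialized once.
    def dppResult = JsonOutput.toJson([
        isSuccess          : true,
        failedReason       : "",
        scannedRows        : 10,
        fileNumber         : 1,
        fileSize           : 2441,
        normalRows         : 10,
        abnormalRows       : 0,
        unselectRows       : 0,
        partialAbnormalRows: "[]",
        scannedBytes       : 0
    ])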
+ +import java.nio.file.Files +import java.nio.file.Paths +import java.nio.file.StandardCopyOption + +suite('test_ingestion_load_with_partition', 'p0') { + + def testIngestLoadJob = { testTable, loadLabel, dataFiles -> + + sql "TRUNCATE TABLE ${testTable}" + + sql "CLEAN LABEL FROM ${context.dbName}" + + Integer loadId = -1 + Integer tableId = -1 + Integer partitionId = -1 + Integer indexId = -1 + Integer bucketId = 0 + Integer schemaHash = -1 + + String reqBody = + """{ + "label": "${loadLabel}", + "tableToPartition": { + "${testTable}": [] + }, + "properties": {} + }""" + + resultFileNames = [] + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_create" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body reqBody + check { code, resBody -> + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + def data = resBodyJson.data + loadId = data.loadId + def tableMeta = data.tableMeta + tableId = tableMeta["${testTable}"].id + def index = tableMeta["${testTable}"].indexes[0] + indexId = index.indexId + schemaHash = index.schemaHash + partitions = tableMeta["${testTable}"].partitionInfo.partitions + for(partition in partitions) { + logger.info("partitionId: " + partition.partitionId) + resultFileNames.add("V1.${loadLabel}.${tableId}.${partition.partitionId}.${indexId}.${bucketId}.${schemaHash}.parquet") + } + } + } + + etlResultFilePaths = [] + for(int i=0; i < dataFiles.size(); i++) { + Files.copy(Paths.get(dataFiles[i]), + Paths.get(context.config.dataPath + "/load_p0/ingestion_load/${resultFileNames[i]}"), StandardCopyOption.REPLACE_EXISTING) + String etlResultFilePath = uploadToHdfs "/load_p0/ingestion_load/${resultFileNames[i]}" + logger.info("etlResultFilePath: " + etlResultFilePath) + etlResultFilePaths.add(etlResultFilePath) + } + + String dppResult = '{\\"isSuccess\\":true,\\"failedReason\\":\\"\\",\\"scannedRows\\":10,\\"fileNumber\\":1,' + + '\\"fileSize\\":2441,\\"normalRows\\":10,\\"abnormalRows\\":0,\\"unselectRows\\":0,' + + '\\"partialAbnormalRows\\":\\"[]\\",\\"scannedBytes\\":0}' + + String updateStatusReqBody = + """{ + "loadId": ${loadId}, + "statusInfo": { + "status": "SUCCESS", + "msg": "", + "appId": "", + "dppResult": "${dppResult}", + "filePathToSize": "{\\"${etlResultFilePaths.get(0)}\\":851,\\"${etlResultFilePaths.get(1)}\\":781,\\"${etlResultFilePaths.get(2)}\\":781,\\"${etlResultFilePaths.get(3)}\\":839}", + "hadoopProperties": "{\\"fs.defaultFS\\":\\"${getHdfsFs()}\\",\\"hadoop.username\\":\\"${getHdfsUser()}\\",\\"hadoop.password\\":\\"${getHdfsPasswd()}\\"}" + } + }""" + + httpTest { + endpoint context.config.feHttpAddress + uri "/api/ingestion_load/internal/${context.dbName}/_update" + op "post" + basicAuthorization context.config.feHttpUser, context.config.feHttpPassword + body updateStatusReqBody + check { code, resBody -> + { + assert code == 200 + def resBodyJson = parseJson(resBody) + assert resBodyJson instanceof Map + assert resBodyJson.code == 0 + } + } + } + + max_try_milli_secs = 120000 + while (max_try_milli_secs) { + result = sql "show load where label = '${loadLabel}'" + if (result[0][2] == "FINISHED") { + sql "sync" + qt_select "select c1, count(*) from ${testTable} group by c1 order by c1" + break + } else { + sleep(5000) // wait 1 second every time + max_try_milli_secs -= 5000 + if (max_try_milli_secs < 0) { + assertEquals(1, 2) + } + } + } + + } + + if (enableHdfs()) { + + 
tableName = 'tbl_test_spark_load_partition' + + sql """ + CREATE TABLE IF NOT EXISTS ${tableName} ( + c0 int not null, + c1 date, + c2 varchar(64) + ) + DUPLICATE KEY(c0) + PARTITION BY RANGE(c1) ( + FROM ("2024-09-01") TO ("2024-09-05") INTERVAL 1 DAY + ) + DISTRIBUTED BY HASH(c0) BUCKETS 1 + PROPERTIES ( + "replication_num" = "1" + ) + """ + + def label = "test_ingestion_load_partition" + + testIngestLoadJob.call(tableName, label, [context.config.dataPath + '/load_p0/ingestion_load/data2-0.parquet', context.config.dataPath + '/load_p0/ingestion_load/data2-1.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-2.parquet',context.config.dataPath + '/load_p0/ingestion_load/data2-3.parquet']) + + } + +} \ No newline at end of file diff --git a/regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy b/regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy new file mode 100644 index 000000000000000..624fc7e9f159ccb --- /dev/null +++ b/regression-test/suites/nereids_rules_p0/grouping_sets/valid_grouping.groovy @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
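The valid_grouping suite that follows pins down a legacy-planner check: grouping() and grouping_id() are scalar functions over the grouping state and may appear in the select list, but never inside the GROUP BY grouping sets themselves. The valid counterpart of the rejected queries, as a sketch against the same table:

    // Valid form (sketch): grouping_id() stays in the select list and
    // the grouping sets contain only plain expressions.
    sql """select
              b, grouping_id(b) as gid
          from valid_grouping
          group by grouping sets ((b), ());"""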
+ +suite("valid_grouping"){ + + // this suite test legacy planner + sql "set enable_nereids_planner=false" + + sql "drop table if exists valid_grouping" + sql """ + CREATE TABLE `valid_grouping` ( + `a` INT NULL, + `b` VARCHAR(10) NULL, + `c` INT NULL, + `d` INT NULL + ) ENGINE=OLAP + DUPLICATE KEY(`a`, `b`) + DISTRIBUTED BY RANDOM BUCKETS AUTO + PROPERTIES ( + "replication_allocation" = "tag.location.default: 1" + ); + """ + sql "insert into valid_grouping values(1,'d2',3,5);" + test { + sql """select + b, 'day' as DT_TYPE + from valid_grouping + group by grouping sets ( (grouping_id(b)),(b));""" + exception("GROUP BY expression must not contain grouping scalar functions: grouping_id(`b`)") + } + + test { + sql """select + b, 'day' as DT_TYPE + from valid_grouping + group by grouping sets ( (grouping(b)),(b));""" + exception("GROUP BY expression must not contain grouping scalar functions: grouping(`b`)") + } + +} \ No newline at end of file diff --git a/regression-test/suites/query_p0/aggregate/select_random_distributed_tbl.groovy b/regression-test/suites/query_p0/aggregate/select_random_distributed_tbl.groovy index c818454c261262b..6e896a1d098e39b 100644 --- a/regression-test/suites/query_p0/aggregate/select_random_distributed_tbl.groovy +++ b/regression-test/suites/query_p0/aggregate/select_random_distributed_tbl.groovy @@ -141,6 +141,11 @@ suite("select_random_distributed_tbl") { qt_sql_17 "select k1 from random_distributed_tbl_test_2;" qt_sql_18 "select distinct k1 from random_distributed_tbl_test_2;" + sql "set enable_nereids_planner = false;" + qt_sql_17 "select k1 from random_distributed_tbl_test_2 order by k1;" + qt_sql_18 "select distinct k1 from random_distributed_tbl_test_2 order by k1;" + qt_sql_19 "select k2 from random_distributed_tbl_test_2 order by k2;" + sql "set enable_nereids_planner = true;" qt_sql_19 "select k1 from random_distributed_tbl_test_2;" qt_sql_20 "select distinct k1 from random_distributed_tbl_test_2;"