From f3c129d7d40311b428952528577df1d56e8a8120 Mon Sep 17 00:00:00 2001 From: zclllhhjj Date: Mon, 28 Oct 2024 18:51:24 +0800 Subject: [PATCH] [Feature](insert-overwrite) Support create partition for auto partition table when insert overwrite (#38628) introduced session variable `enable_auto_create_when_overwrite` when it's true: 1. `insert overwrite table auto_partition_table [values xxx| select xxx]` support overwrite old datas and create partition(s) for new datas if need. 2. `insert overwrite table auto_partition_table partition(*) [values xxx| select xxx]` support overwrite old datas for values-relative partitions(as it was before) and create partition(s) for new datas if need. doc pr: https://github.com/apache/doris-website/pull/936 --- be/src/exec/tablet_info.cpp | 1 + .../pipeline/exec/exchange_sink_operator.cpp | 2 +- be/src/vec/sink/vrow_distribution.cpp | 177 ++++++++++++------ be/src/vec/sink/vrow_distribution.h | 9 +- be/src/vec/sink/writer/vtablet_writer.cpp | 2 +- be/src/vec/sink/writer/vtablet_writer_v2.cpp | 2 +- .../doris/analysis/NativeInsertStmt.java | 6 - .../nereids/parser/LogicalPlanBuilder.java | 4 + .../insert/InsertOverwriteTableCommand.java | 33 ++-- .../org/apache/doris/qe/SessionVariable.java | 20 +- .../org/apache/doris/qe/StmtExecutor.java | 28 +-- gensrc/thrift/PaloInternalService.thrift | 1 + .../test_iot_overwrite_and_create.out | 24 +++ .../test_iot_overwrite_and_create_many.out | 15 ++ .../test_iot_overwrite_and_create.groovy | 71 +++++++ .../test_iot_overwrite_and_create_many.groovy | 64 +++++++ 16 files changed, 355 insertions(+), 104 deletions(-) create mode 100644 regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out create mode 100644 regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out create mode 100644 regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy create mode 100644 regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy diff --git a/be/src/exec/tablet_info.cpp b/be/src/exec/tablet_info.cpp index 0816a1ac698657..f1c0ad60e06455 100644 --- a/be/src/exec/tablet_info.cpp +++ b/be/src/exec/tablet_info.cpp @@ -788,6 +788,7 @@ Status VOlapTablePartitionParam::replace_partitions( // add new partitions with new id. _partitions.emplace_back(part); + VLOG_NOTICE << "params add new partition " << part->id; // replace items in _partition_maps if (_is_in_partition) { diff --git a/be/src/pipeline/exec/exchange_sink_operator.cpp b/be/src/pipeline/exec/exchange_sink_operator.cpp index a3b6f8da7e9447..55b0e43c936d5a 100644 --- a/be/src/pipeline/exec/exchange_sink_operator.cpp +++ b/be/src/pipeline/exec/exchange_sink_operator.cpp @@ -288,7 +288,7 @@ Status ExchangeSinkLocalState::_send_new_partition_batch() { vectorized::Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref auto& p = _parent->cast(); - // these order is only. + // these order is unique. // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. diff --git a/be/src/vec/sink/vrow_distribution.cpp b/be/src/vec/sink/vrow_distribution.cpp index 3a4c7e911f4c14..74a2830a191231 100644 --- a/be/src/vec/sink/vrow_distribution.cpp +++ b/be/src/vec/sink/vrow_distribution.cpp @@ -23,7 +23,7 @@ #include #include -#include +#include #include "common/logging.h" #include "common/status.h" @@ -116,6 +116,10 @@ Status VRowDistribution::automatic_create_partition() { if (result.status.status_code == TStatusCode::OK) { // add new created partitions RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions)); + for (const auto& part : result.partitions) { + _new_partition_ids.insert(part.id); + VLOG_TRACE << "record new id: " << part.id; + } RETURN_IF_ERROR(_create_partition_callback(_caller, &result)); } @@ -134,7 +138,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg // use _partitions and replace them Status VRowDistribution::_replace_overwriting_partition() { - SCOPED_TIMER(_add_partition_request_timer); + SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition TReplacePartitionRequest request; TReplacePartitionResult result; request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id()); @@ -144,16 +148,20 @@ Status VRowDistribution::_replace_overwriting_partition() { // only request for partitions not recorded for replacement std::set id_deduper; for (const auto* part : _partitions) { - if (part == nullptr) [[unlikely]] { - return Status::InternalError( - "Cannot found origin partitions in auto detect overwriting, stop processing"); - } - if (_new_partition_ids.contains(part->id)) { - // this is a new partition. dont replace again. - } else { - // request for replacement - id_deduper.insert(part->id); - } + if (part != nullptr) { + if (_new_partition_ids.contains(part->id)) { + // this is a new partition. dont replace again. + VLOG_TRACE << "skip new partition: " << part->id; + } else { + // request for replacement + id_deduper.insert(part->id); + } + } else if (_missing_map.empty()) { + // no origin partition. and not allow to create. + return Status::InvalidArgument( + "Cannot found origin partitions in auto detect overwriting, stop " + "processing"); + } // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here. } if (id_deduper.empty()) { return Status::OK(); // no need to request @@ -182,6 +190,7 @@ Status VRowDistribution::_replace_overwriting_partition() { // record new partitions for (const auto& part : result.partitions) { _new_partition_ids.insert(part.id); + VLOG_TRACE << "record new id: " << part.id; } // replace data in _partitions RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions)); @@ -304,6 +313,52 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition( return Status::OK(); } +Status VRowDistribution::_deal_missing_map(vectorized::Block* block, + const std::vector& partition_cols_idx, + int64_t& rows_stat_val) { + // for missing partition keys, calc the missing partition and save in _partitions_need_create + auto [part_ctxs, part_exprs] = _get_partition_function(); + auto part_col_num = part_exprs.size(); + // the two vectors are in column-first-order + std::vector> col_strs; + std::vector col_null_maps; + col_strs.resize(part_col_num); + col_null_maps.reserve(part_col_num); + + for (int i = 0; i < part_col_num; ++i) { + auto return_type = part_exprs[i]->data_type(); + // expose the data column. the return type would be nullable + const auto& [range_left_col, col_const] = + unpack_if_const(block->get_by_position(partition_cols_idx[i]).column); + if (range_left_col->is_nullable()) { + col_null_maps.push_back(&( + assert_cast(range_left_col.get())->get_null_map_data())); + } else { + col_null_maps.push_back(nullptr); + } + for (auto row : _missing_map) { + col_strs[i].push_back( + return_type->to_string(*range_left_col, index_check_const(row, col_const))); + } + } + + // calc the end value and save them. in the end of sending, we will create partitions for them and deal them. + RETURN_IF_ERROR( + _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps)); + + size_t new_bt_rows = _batching_block->rows(); + size_t new_bt_bytes = _batching_block->bytes(); + rows_stat_val -= new_bt_rows - _batching_rows; + _state->update_num_rows_load_total(_batching_rows - new_bt_rows); + _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes); + DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows); + DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes); + _batching_rows = new_bt_rows; + _batching_bytes = new_bt_bytes; + + return Status::OK(); +} + Status VRowDistribution::_generate_rows_distribution_for_auto_partition( vectorized::Block* block, const std::vector& partition_cols_idx, bool has_filtered_rows, std::vector& row_part_tablet_ids, @@ -329,63 +384,64 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition( RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids)); if (!_missing_map.empty()) { - // for missing partition keys, calc the missing partition and save in _partitions_need_create - auto [part_ctxs, part_exprs] = _get_partition_function(); - auto part_col_num = part_exprs.size(); - // the two vectors are in column-first-order - std::vector> col_strs; - std::vector col_null_maps; - col_strs.resize(part_col_num); - col_null_maps.reserve(part_col_num); - - for (int i = 0; i < part_col_num; ++i) { - auto return_type = part_exprs[i]->data_type(); - // expose the data column. the return type would be nullable - const auto& [range_left_col, col_const] = - unpack_if_const(block->get_by_position(partition_cols_idx[i]).column); - if (range_left_col->is_nullable()) { - col_null_maps.push_back(&(assert_cast(range_left_col.get()) - ->get_null_map_data())); - } else { - col_null_maps.push_back(nullptr); - } - for (auto row : _missing_map) { - col_strs[i].push_back( - return_type->to_string(*range_left_col, index_check_const(row, col_const))); - } - } - - // calc the end value and save them. in the end of sending, we will create partitions for them and deal them. - RETURN_IF_ERROR( - _save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps)); - - size_t new_bt_rows = _batching_block->rows(); - size_t new_bt_bytes = _batching_block->bytes(); - rows_stat_val -= new_bt_rows - _batching_rows; - _state->update_num_rows_load_total(_batching_rows - new_bt_rows); - _state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes); - DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows); - DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes); - _batching_rows = new_bt_rows; - _batching_bytes = new_bt_bytes; + RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val)); } return Status::OK(); } Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite( - vectorized::Block* block, bool has_filtered_rows, - std::vector& row_part_tablet_ids) { + vectorized::Block* block, const std::vector& partition_cols_idx, + bool has_filtered_rows, std::vector& row_part_tablet_ids, + int64_t& rows_stat_val) { auto num_rows = block->rows(); + // for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc, + // and find the new partitions to use. + // for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto + // partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz + // we already saved missing values. bool stop_processing = false; - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); + if (_vpartition->is_auto_partition() && + _state->query_options().enable_auto_create_when_overwrite) { + // allow auto create partition for missing rows. + std::vector partition_keys = _vpartition->get_partition_keys(); + auto partition_col = block->get_by_position(partition_keys[0]); + _missing_map.clear(); + _missing_map.reserve(partition_col.column->size()); + + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip, + &_missing_map)); + + // allow and really need to create during auto-detect-overwriting. + if (!_missing_map.empty()) { + RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val)); + } + } else { + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + } RETURN_IF_ERROR(_replace_overwriting_partition()); // regenerate locations for new partitions & tablets _reset_find_tablets(num_rows); - RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, - _tablet_indexes, stop_processing, _skip)); + if (_vpartition->is_auto_partition() && + _state->query_options().enable_auto_create_when_overwrite) { + // here _missing_map is just a placeholder + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip, + &_missing_map)); + if (VLOG_TRACE_IS_ON) { + std::string tmp; + for (auto v : _missing_map) { + tmp += std::to_string(v).append(", "); + } + VLOG_TRACE << "Trace missing map of " << this << ':' << tmp; + } + } else { + RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions, + _tablet_indexes, stop_processing, _skip)); + } if (has_filtered_rows) { for (int i = 0; i < num_rows; i++) { _skip[i] = _skip[i] || _block_convertor->filter_map()[i]; @@ -456,10 +512,11 @@ Status VRowDistribution::generate_rows_distribution( } Status st = Status::OK(); - if (_vpartition->is_auto_detect_overwrite()) { + if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) { // when overwrite, no auto create partition allowed. - st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows, - row_part_tablet_ids); + st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx, + has_filtered_rows, row_part_tablet_ids, + rows_stat_val); } else if (_vpartition->is_auto_partition() && !_deal_batched) { st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx, has_filtered_rows, row_part_tablet_ids, diff --git a/be/src/vec/sink/vrow_distribution.h b/be/src/vec/sink/vrow_distribution.h index fffe0e3f7f1887..248982c02026dc 100644 --- a/be/src/vec/sink/vrow_distribution.h +++ b/be/src/vec/sink/vrow_distribution.h @@ -162,14 +162,19 @@ class VRowDistribution { vectorized::Block* block, const std::vector& partition_col_idx, bool has_filtered_rows, std::vector& row_part_tablet_ids, int64_t& rows_stat_val); + // the whole process to deal missing rows. will call _save_missing_values + Status _deal_missing_map(vectorized::Block* block, + const std::vector& partition_cols_idx, + int64_t& rows_stat_val); Status _generate_rows_distribution_for_non_auto_partition( vectorized::Block* block, bool has_filtered_rows, std::vector& row_part_tablet_ids); Status _generate_rows_distribution_for_auto_overwrite( - vectorized::Block* block, bool has_filtered_rows, - std::vector& row_part_tablet_ids); + vectorized::Block* block, const std::vector& partition_cols_idx, + bool has_filtered_rows, std::vector& row_part_tablet_ids, + int64_t& rows_stat_val); Status _replace_overwriting_partition(); void _reset_row_part_tablet_ids(std::vector& row_part_tablet_ids, diff --git a/be/src/vec/sink/writer/vtablet_writer.cpp b/be/src/vec/sink/writer/vtablet_writer.cpp index ebd6f67e2af5d2..504ffb9cb749bf 100644 --- a/be/src/vec/sink/writer/vtablet_writer.cpp +++ b/be/src/vec/sink/writer/vtablet_writer.cpp @@ -1422,7 +1422,7 @@ Status VTabletWriter::_send_new_partition_batch() { Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref - // these order is only. + // these order is unique. // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. diff --git a/be/src/vec/sink/writer/vtablet_writer_v2.cpp b/be/src/vec/sink/writer/vtablet_writer_v2.cpp index 6b1423b125767a..96dfd85d297208 100644 --- a/be/src/vec/sink/writer/vtablet_writer_v2.cpp +++ b/be/src/vec/sink/writer/vtablet_writer_v2.cpp @@ -531,7 +531,7 @@ Status VTabletWriterV2::_send_new_partition_batch() { Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref - // these order is only. + // these order is unique. // 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block. // 2. deal batched block // 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that. diff --git a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java index e70fbd71117cde..14680f54b1dac6 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java +++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/NativeInsertStmt.java @@ -167,7 +167,6 @@ public class NativeInsertStmt extends InsertStmt { boolean hasEmptyTargetColumns = false; private boolean allowAutoPartition = true; - private boolean withAutoDetectOverwrite = false; enum InsertType { NATIVE_INSERT("insert_"), @@ -333,11 +332,6 @@ public boolean isTransactionBegin() { return isTransactionBegin; } - public NativeInsertStmt withAutoDetectOverwrite() { - this.withAutoDetectOverwrite = true; - return this; - } - protected void preCheckAnalyze(Analyzer analyzer) throws UserException { super.analyze(analyzer); diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java index 86f6999475ca44..068cf124aa0eff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java @@ -615,6 +615,10 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) { LogicalPlan plan = visitQuery(ctx.query()); // partitionSpec may be NULL. means auto detect partition. only available when IOT Pair> partitionSpec = visitPartitionSpec(ctx.partitionSpec()); + // partitionSpec.second : + // null - auto detect + // zero - whole table + // others - specific partitions boolean isAutoDetect = partitionSpec.second == null; LogicalSink sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite( tableName.build(), diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java index 064fccaf521029..c89a4fc7be96ee 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertOverwriteTableCommand.java @@ -142,6 +142,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { PhysicalTableSink physicalTableSink = ((PhysicalTableSink) plan.get()); TableIf targetTable = physicalTableSink.getTargetTable(); List partitionNames; + boolean wholeTable = false; if (physicalTableSink instanceof PhysicalOlapTableSink) { InternalDatabaseUtil .checkDatabase(((OlapTable) targetTable).getQualifiedDbName(), ConnectContext.get()); @@ -156,7 +157,10 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { } ConnectContext.get().setSkipAuth(true); partitionNames = ((UnboundTableSink) logicalQuery).getPartitions(); + // If not specific partition to overwrite, means it's a command to overwrite the table. + // not we execute as overwrite every partitions. if (CollectionUtils.isEmpty(partitionNames)) { + wholeTable = true; partitionNames = Lists.newArrayList(targetTable.getPartitionNames()); } } else { @@ -174,9 +178,10 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { // When inserting, BE will call to replace partition by FrontendService. FE will register new temp // partitions and return. for transactional, the replacement will really occur when insert successed, // i.e. `insertInto` finished. then we call taskGroupSuccess to make replacement. - insertInto(ctx, executor, taskId); + insertIntoAutoDetect(ctx, executor, taskId); insertOverwriteManager.taskGroupSuccess(taskId, (OlapTable) targetTable); } else { + // it's overwrite table(as all partitions) or specific partition(s) List tempPartitionNames = InsertOverwriteUtil.generateTempPartitionNames(partitionNames); if (isCancelled.get()) { LOG.info("insert overwrite is cancelled before registerTask, queryId: {}", @@ -198,7 +203,7 @@ public void run(ConnectContext ctx, StmtExecutor executor) throws Exception { insertOverwriteManager.taskFail(taskId); return; } - insertInto(ctx, executor, tempPartitionNames); + insertIntoPartitions(ctx, executor, tempPartitionNames, wholeTable); if (isCancelled.get()) { LOG.info("insert overwrite is cancelled before replacePartition, queryId: {}", ctx.getQueryIdentifier()); @@ -269,13 +274,15 @@ private void runInsertCommand(LogicalPlan logicalQuery, InsertCommandContext ins } /** - * insert into select. for sepecified temp partitions + * insert into select. for sepecified temp partitions or all partitions(table). * - * @param ctx ctx - * @param executor executor + * @param ctx ctx + * @param executor executor * @param tempPartitionNames tempPartitionNames + * @param wholeTable overwrite target is the whole table. not one by one by partitions(...) */ - private void insertInto(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames) + private void insertIntoPartitions(ConnectContext ctx, StmtExecutor executor, List tempPartitionNames, + boolean wholeTable) throws Exception { // copy sink tot replace by tempPartitions UnboundLogicalSink copySink; @@ -291,9 +298,10 @@ private void insertInto(ConnectContext ctx, StmtExecutor executor, List sink.isPartialUpdate(), sink.getDMLCommandType(), (LogicalPlan) (sink.child(0))); - // 1. for overwrite situation, we disable auto create partition. + // 1. when overwrite table, allow auto partition or not is controlled by session variable. // 2. we save and pass overwrite auto detect by insertCtx - insertCtx = new OlapInsertCommandContext(false, true); + boolean allowAutoPartition = wholeTable && ctx.getSessionVariable().isEnableAutoCreateWhenOverwrite(); + insertCtx = new OlapInsertCommandContext(allowAutoPartition, true); } else if (logicalQuery instanceof UnboundHiveTableSink) { UnboundHiveTableSink sink = (UnboundHiveTableSink) logicalQuery; copySink = (UnboundLogicalSink) UnboundTableSinkCreator.createUnboundTableSink( @@ -332,12 +340,13 @@ private void insertInto(ConnectContext ctx, StmtExecutor executor, List * @param ctx ctx * @param executor executor */ - private void insertInto(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception { - // 1. for overwrite situation, we disable auto create partition. - // 2. we save and pass overwrite auto-detected by insertCtx + private void insertIntoAutoDetect(ConnectContext ctx, StmtExecutor executor, long groupId) throws Exception { InsertCommandContext insertCtx; if (logicalQuery instanceof UnboundTableSink) { - insertCtx = new OlapInsertCommandContext(false, + // 1. when overwrite auto-detect, allow auto partition or not is controlled by session variable. + // 2. we save and pass overwrite auto detect by insertCtx + boolean allowAutoPartition = ctx.getSessionVariable().isEnableAutoCreateWhenOverwrite(); + insertCtx = new OlapInsertCommandContext(allowAutoPartition, ((UnboundTableSink) logicalQuery).isAutoDetectPartition(), groupId, true); } else if (logicalQuery instanceof UnboundHiveTableSink) { insertCtx = new HiveInsertCommandContext(); diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index cc1f29b76c2b49..0c755b9aae901b 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -675,6 +675,12 @@ public class SessionVariable implements Serializable, Writable { "enable_cooldown_replica_affinity"; public static final String SKIP_CHECKING_ACID_VERSION_FILE = "skip_checking_acid_version_file"; + /** + * Inserting overwrite for auto partition table allows creating partition for + * datas which cannot find partition to overwrite. + */ + public static final String ENABLE_AUTO_CREATE_WHEN_OVERWRITE = "enable_auto_create_when_overwrite"; + /** * If set false, user couldn't submit analyze SQL and FE won't allocate any related resources. */ @@ -2170,7 +2176,6 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { }) public boolean enableFallbackOnMissingInvertedIndex = true; - @VariableMgr.VarAttr(name = IN_LIST_VALUE_COUNT_THRESHOLD, description = { "in条件value数量大于这个threshold后将不会走fast_execute", "When the number of values in the IN condition exceeds this threshold," @@ -2210,6 +2215,14 @@ public void setIgnoreShapePlanNodes(String ignoreShapePlanNodes) { @VariableMgr.VarAttr(name = ENABLE_COOLDOWN_REPLICA_AFFINITY, needForward = true) public boolean enableCooldownReplicaAffinity = true; + @VariableMgr.VarAttr(name = ENABLE_AUTO_CREATE_WHEN_OVERWRITE, description = { + "开启后对自动分区表的 insert overwrite 操作会对没有找到分区的插入数据按自动分区规则创建分区,默认关闭", + "The insert overwrite operation on an auto-partitioned table will create partitions for inserted data" + + " for which no partition is found according to the auto-partitioning rules, which is turned off" + + " by default." + }) + public boolean enableAutoCreateWhenOverwrite = false; + @VariableMgr.VarAttr(name = SKIP_CHECKING_ACID_VERSION_FILE, needForward = true, description = { "跳过检查 transactional hive 版本文件 '_orc_acid_version.'", "Skip checking transactional hive version file '_orc_acid_version.'" @@ -3826,6 +3839,7 @@ public TQueryOptions toThrift() { tResult.setAdaptivePipelineTaskSerialReadOnLimit(adaptivePipelineTaskSerialReadOnLimit); tResult.setInListValueCountThreshold(inListValueCountThreshold); tResult.setEnablePhraseQuerySequentialOpt(enablePhraseQuerySequentialOpt); + tResult.setEnableAutoCreateWhenOverwrite(enableAutoCreateWhenOverwrite); return tResult; } @@ -4390,6 +4404,10 @@ public int getMaxMsgSizeOfResultReceiver() { return this.maxMsgSizeOfResultReceiver; } + public boolean isEnableAutoCreateWhenOverwrite() { + return this.enableAutoCreateWhenOverwrite; + } + public TSerdeDialect getSerdeDialect() { switch (serdeDialect) { case "doris": diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java index 08ad231168b9f2..f4ae8f588737c1 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java +++ b/fe/fe-core/src/main/java/org/apache/doris/qe/StmtExecutor.java @@ -3120,7 +3120,7 @@ private void handleCtasRollback(TableName table) { } } - private void handleIotStmt() { + private void handleIotStmt() throws AnalysisException { ConnectContext.get().setSkipAuth(true); try { InsertOverwriteTableStmt iotStmt = (InsertOverwriteTableStmt) this.parsedStmt; @@ -3162,9 +3162,11 @@ private void handleOverwriteTable(InsertOverwriteTableStmt iotStmt) { return; } // after success create table insert data + // when overwrite table, allow auto partition or not is controlled by session variable. + boolean allowAutoPartition = context.getSessionVariable().isEnableAutoCreateWhenOverwrite(); try { parsedStmt = new NativeInsertStmt(tmpTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()), - iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true); + iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), allowAutoPartition); parsedStmt.setUserInfo(context.getCurrentUserIdentity()); execute(); if (MysqlStateType.ERR.equals(context.getState().getStateType())) { @@ -3234,6 +3236,7 @@ private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) { return; } // after success add tmp partitions + // when overwrite partition, auto creating is always disallowed. try { parsedStmt = new NativeInsertStmt(targetTableName, new PartitionNames(true, tempPartitionName), new LabelName(iotStmt.getDb(), iotStmt.getLabel()), iotStmt.getQueryStmt(), @@ -3276,24 +3279,9 @@ private void handleOverwritePartition(InsertOverwriteTableStmt iotStmt) { } } - /* - * TODO: support insert overwrite auto detect partition in legacy planner - */ - private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) { - // TODO: - TableName targetTableName = new TableName(null, iotStmt.getDb(), iotStmt.getTbl()); - try { - parsedStmt = new NativeInsertStmt(targetTableName, null, new LabelName(iotStmt.getDb(), iotStmt.getLabel()), - iotStmt.getQueryStmt(), iotStmt.getHints(), iotStmt.getCols(), true).withAutoDetectOverwrite(); - parsedStmt.setUserInfo(context.getCurrentUserIdentity()); - execute(); - } catch (Exception e) { - LOG.warn("IOT insert data error, stmt={}", parsedStmt.toSql(), e); - context.getState().setError(ErrorCode.ERR_UNKNOWN_ERROR, "Unexpected exception: " + e.getMessage()); - handleIotRollback(targetTableName); - return; - } - + private void handleAutoOverwritePartition(InsertOverwriteTableStmt iotStmt) throws AnalysisException { + throw new AnalysisException( + "insert overwrite auto detect is not support in legacy planner. use nereids instead"); } private void handleIotRollback(TableName table) { diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 62a45260f80c9c..f531db3028224a 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -351,6 +351,7 @@ struct TQueryOptions { 136: optional bool enable_phrase_query_sequential_opt = true; + 137: optional bool enable_auto_create_when_overwrite = false; // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. diff --git a/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out new file mode 100644 index 00000000000000..594c0cfabde723 --- /dev/null +++ b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create.out @@ -0,0 +1,24 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !origin -- +1234567 +Beijing +Shanghai +list +xxx + +-- !0 -- +SHANGHAI +zzz + +-- !1 -- +zzz2 + +-- !2 -- +1234567 +BEIJING +Shanghai +abcd +list +xxx +zzz2 + diff --git a/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out new file mode 100644 index 00000000000000..b52a4ecbc1ae9e --- /dev/null +++ b/regression-test/data/insert_overwrite_p0/test_iot_overwrite_and_create_many.out @@ -0,0 +1,15 @@ +-- This file is automatically generated. You should know what you did if you want to edit this +-- !sql1 -- +1234567 1 +Beijing 20000 +Shanghai 20000 +list 1 +xxx 1 +zzz 20000 + +-- !sql2 -- +Beijing 20000 +Shanghai 20000 +yyy 20000 +zzz 20000 + diff --git a/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy new file mode 100644 index 00000000000000..4d0b667dd44b84 --- /dev/null +++ b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create.groovy @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iot_overwrite_and_create") { + sql "set enable_auto_create_when_overwrite = true;" + + sql " drop table if exists auto_list; " + sql """ + create table auto_list( + k0 varchar null + ) + auto partition by list (k0) + ( + PARTITION p1 values in (("Beijing"), ("BEIJING")), + PARTITION p2 values in (("Shanghai"), ("SHANGHAI")), + PARTITION p3 values in (("xxx"), ("XXX")), + PARTITION p4 values in (("list"), ("LIST")), + PARTITION p5 values in (("1234567"), ("7654321")) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 1 + properties("replication_num" = "1"); + """ + sql """ insert into auto_list values ("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """ + qt_origin "select * from auto_list order by k0;" + + sql """insert overwrite table auto_list values ("SHANGHAI"), ("zzz");""" + qt_0 "select * from auto_list order by k0;" + sql """insert overwrite table auto_list values ("zzz2");""" + qt_1 "select * from auto_list order by k0;" + + test{ + sql """insert overwrite table auto_list partition(p1, p2) values ("zzz");""" + exception "Insert has filtered data in strict mode." + } + test{ + sql """insert overwrite table auto_list partition(p3) values ("zzz3");""" + exception "Insert has filtered data in strict mode." + } + + sql """ insert into auto_list values ("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """ + sql """insert overwrite table auto_list partition(*) values ("abcd"), ("BEIJING");""" + qt_2 "select * from auto_list order by k0;" + + sql "set enable_auto_create_when_overwrite = false;" + test{ + sql """insert overwrite table auto_list values ("zzz3");""" + exception "Insert has filtered data in strict mode." + } + test{ + sql """insert overwrite table auto_list partition(p1, p2) values ("zzz");""" + exception "Insert has filtered data in strict mode." + } + test{ + sql """insert overwrite table auto_list partition(*) values ("zzz3");""" + exception "Cannot found origin partitions in auto detect overwriting" + } +} diff --git a/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy new file mode 100644 index 00000000000000..dcade3ce211453 --- /dev/null +++ b/regression-test/suites/insert_overwrite_p0/test_iot_overwrite_and_create_many.groovy @@ -0,0 +1,64 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +suite("test_iot_overwrite_and_create_many") { + sql "set enable_auto_create_when_overwrite = true;" + + sql " drop table if exists target; " + sql """ + create table target( + k0 varchar null + ) + auto partition by list (k0) + ( + PARTITION p1 values in (("Beijing"), ("BEIJING")), + PARTITION p2 values in (("Shanghai"), ("SHANGHAI")), + PARTITION p3 values in (("xxx"), ("XXX")), + PARTITION p4 values in (("list"), ("LIST")), + PARTITION p5 values in (("1234567"), ("7654321")) + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 2 + properties("replication_num" = "1"); + """ + sql """ insert into target values ("Beijing"),("Shanghai"),("xxx"),("list"),("1234567"); """ + + sql " drop table if exists source; " + sql """ + create table source( + k0 varchar null + ) + DISTRIBUTED BY HASH(`k0`) BUCKETS 10 + properties("replication_num" = "1"); + """ + + sql """ insert into source select "Beijing" from numbers("number" = "20000"); """ + sql """ insert into source select "Shanghai" from numbers("number" = "20000"); """ + sql """ insert into source select "zzz" from numbers("number"= "20000"); """ + def result + result = sql " show partitions from target; " + logger.info("origin: ${result}") + + sql " insert overwrite table target partition(*) select * from source; " + result = sql " show partitions from target; " + logger.info("changed: ${result}") + + qt_sql1 " select k0, count(k0) from target group by k0 order by k0; " + + sql """ insert into source select "yyy" from numbers("number" = "20000"); """ + sql " insert overwrite table target select * from source; " + qt_sql2 " select k0, count(k0) from target group by k0 order by k0; " +}