Skip to content

Commit

Permalink
[Feature](insert-overwrite) Support create partition for auto partiti…
Browse files Browse the repository at this point in the history
…on table when insert overwrite (apache#38628)

Introduces the session variable `enable_auto_create_when_overwrite`.

When it is true:
1. `insert overwrite table auto_partition_table [values xxx | select
xxx]` overwrites the old data and creates partition(s) for the new data
if needed.
2. `insert overwrite table auto_partition_table partition(*) [values
xxx | select xxx]` overwrites the old data in the partitions that the
inserted values map to (as it did before) and creates partition(s) for the
new data if needed.

doc pr: apache/doris-website#936
  • Loading branch information
zclllyybb authored Oct 28, 2024
1 parent 45d0e01 commit f3c129d
Show file tree
Hide file tree
Showing 16 changed files with 355 additions and 104 deletions.
1 change: 1 addition & 0 deletions be/src/exec/tablet_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -788,6 +788,7 @@ Status VOlapTablePartitionParam::replace_partitions(

// add new partitions with new id.
_partitions.emplace_back(part);
VLOG_NOTICE << "params add new partition " << part->id;

// replace items in _partition_maps
if (_is_in_partition) {
Expand Down
2 changes: 1 addition & 1 deletion be/src/pipeline/exec/exchange_sink_operator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ Status ExchangeSinkLocalState::_send_new_partition_batch() {
vectorized::Block tmp_block =
_row_distribution._batching_block->to_block(); // Borrow out, for lval ref
auto& p = _parent->cast<ExchangeSinkOperatorX>();
// these order is only.
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
Expand Down
177 changes: 117 additions & 60 deletions be/src/vec/sink/vrow_distribution.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

#include <cstdint>
#include <memory>
#include <sstream>
#include <string>

#include "common/logging.h"
#include "common/status.h"
Expand Down Expand Up @@ -116,6 +116,10 @@ Status VRowDistribution::automatic_create_partition() {
if (result.status.status_code == TStatusCode::OK) {
// add new created partitions
RETURN_IF_ERROR(_vpartition->add_partitions(result.partitions));
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
VLOG_TRACE << "record new id: " << part.id;
}
RETURN_IF_ERROR(_create_partition_callback(_caller, &result));
}

Expand All @@ -134,7 +138,7 @@ static TCreatePartitionResult cast_as_create_result(TReplacePartitionResult& arg

// use _partitions and replace them
Status VRowDistribution::_replace_overwriting_partition() {
SCOPED_TIMER(_add_partition_request_timer);
SCOPED_TIMER(_add_partition_request_timer); // also for replace_partition
TReplacePartitionRequest request;
TReplacePartitionResult result;
request.__set_overwrite_group_id(_vpartition->get_overwrite_group_id());
Expand All @@ -144,16 +148,20 @@ Status VRowDistribution::_replace_overwriting_partition() {
// only request for partitions not recorded for replacement
std::set<int64_t> id_deduper;
for (const auto* part : _partitions) {
if (part == nullptr) [[unlikely]] {
return Status::InternalError(
"Cannot found origin partitions in auto detect overwriting, stop processing");
}
if (_new_partition_ids.contains(part->id)) {
// this is a new partition. dont replace again.
} else {
// request for replacement
id_deduper.insert(part->id);
}
if (part != nullptr) {
if (_new_partition_ids.contains(part->id)) {
// this is a new partition. dont replace again.
VLOG_TRACE << "skip new partition: " << part->id;
} else {
// request for replacement
id_deduper.insert(part->id);
}
} else if (_missing_map.empty()) {
// no origin partition. and not allow to create.
return Status::InvalidArgument(
"Cannot found origin partitions in auto detect overwriting, stop "
"processing");
} // else: part is null and _missing_map is not empty. dealed outside using auto-partition way. nothing to do here.
}
if (id_deduper.empty()) {
return Status::OK(); // no need to request
Expand Down Expand Up @@ -182,6 +190,7 @@ Status VRowDistribution::_replace_overwriting_partition() {
// record new partitions
for (const auto& part : result.partitions) {
_new_partition_ids.insert(part.id);
VLOG_TRACE << "record new id: " << part.id;
}
// replace data in _partitions
RETURN_IF_ERROR(_vpartition->replace_partitions(request_part_ids, result.partitions));
Expand Down Expand Up @@ -304,6 +313,52 @@ Status VRowDistribution::_generate_rows_distribution_for_non_auto_partition(
return Status::OK();
}

// Handles the rows listed in _missing_map: renders their partition-column values as
// strings, passes them to _save_missing_values, and then adjusts the load statistics
// by the change in size of _batching_block.
//
// block: the block currently being distributed.
// partition_cols_idx: positions inside `block` of the partition columns; entry i is used
//                     together with the i-th partition expression.
// rows_stat_val: in/out counter, decreased by the growth in rows of _batching_block.
// Returns the error of _save_missing_values if it fails, otherwise Status::OK().
Status VRowDistribution::_deal_missing_map(vectorized::Block* block,
const std::vector<uint16_t>& partition_cols_idx,
int64_t& rows_stat_val) {
// for missing partition keys, calc the missing partition and save in _partitions_need_create
// NOTE(review): part_ctxs is not used in this function; only part_exprs is read.
auto [part_ctxs, part_exprs] = _get_partition_function();
auto part_col_num = part_exprs.size();
// the two vectors are in column-first-order
// col_strs[i] holds one string per entry of _missing_map for partition column i;
// col_null_maps[i] points at that column's null map, or is nullptr if it is not nullable.
std::vector<std::vector<std::string>> col_strs;
std::vector<const NullMap*> col_null_maps;
col_strs.resize(part_col_num);
col_null_maps.reserve(part_col_num);

for (int i = 0; i < part_col_num; ++i) {
auto return_type = part_exprs[i]->data_type();
// expose the data column. the return type would be nullable
// presumably unpack_if_const yields the underlying column plus a flag telling whether it
// was a const column - confirm against its definition.
const auto& [range_left_col, col_const] =
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
if (range_left_col->is_nullable()) {
col_null_maps.push_back(&(
assert_cast<const ColumnNullable*>(range_left_col.get())->get_null_map_data()));
} else {
col_null_maps.push_back(nullptr);
}
// stringify the value of this partition column for every row index in _missing_map
for (auto row : _missing_map) {
col_strs[i].push_back(
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
}
}

// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
RETURN_IF_ERROR(
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));

// Re-read the size of _batching_block and compare it with the previously cached
// _batching_rows/_batching_bytes. Presumably _save_missing_values moved the missing rows
// into _batching_block, so the deltas below take those rows out of the "loaded" counters
// for now - confirm against _save_missing_values.
size_t new_bt_rows = _batching_block->rows();
size_t new_bt_bytes = _batching_block->bytes();
// NOTE(review): new_bt_rows/new_bt_bytes are size_t, so the differences below are likely
// computed unsigned; a negative delta then relies on conversion to a signed parameter
// type in the callee - confirm the types of _batching_rows/_batching_bytes and callees.
rows_stat_val -= new_bt_rows - _batching_rows;
_state->update_num_rows_load_total(_batching_rows - new_bt_rows);
_state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
// remember the new batching size as the baseline for the next call
_batching_rows = new_bt_rows;
_batching_bytes = new_bt_bytes;

return Status::OK();
}

Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
Expand All @@ -329,63 +384,64 @@ Status VRowDistribution::_generate_rows_distribution_for_auto_partition(
RETURN_IF_ERROR(_filter_block(block, row_part_tablet_ids));

if (!_missing_map.empty()) {
// for missing partition keys, calc the missing partition and save in _partitions_need_create
auto [part_ctxs, part_exprs] = _get_partition_function();
auto part_col_num = part_exprs.size();
// the two vectors are in column-first-order
std::vector<std::vector<std::string>> col_strs;
std::vector<const NullMap*> col_null_maps;
col_strs.resize(part_col_num);
col_null_maps.reserve(part_col_num);

for (int i = 0; i < part_col_num; ++i) {
auto return_type = part_exprs[i]->data_type();
// expose the data column. the return type would be nullable
const auto& [range_left_col, col_const] =
unpack_if_const(block->get_by_position(partition_cols_idx[i]).column);
if (range_left_col->is_nullable()) {
col_null_maps.push_back(&(assert_cast<const ColumnNullable*>(range_left_col.get())
->get_null_map_data()));
} else {
col_null_maps.push_back(nullptr);
}
for (auto row : _missing_map) {
col_strs[i].push_back(
return_type->to_string(*range_left_col, index_check_const(row, col_const)));
}
}

// calc the end value and save them. in the end of sending, we will create partitions for them and deal them.
RETURN_IF_ERROR(
_save_missing_values(col_strs, part_col_num, block, _missing_map, col_null_maps));

size_t new_bt_rows = _batching_block->rows();
size_t new_bt_bytes = _batching_block->bytes();
rows_stat_val -= new_bt_rows - _batching_rows;
_state->update_num_rows_load_total(_batching_rows - new_bt_rows);
_state->update_num_bytes_load_total(_batching_bytes - new_bt_bytes);
DorisMetrics::instance()->load_rows->increment(_batching_rows - new_bt_rows);
DorisMetrics::instance()->load_bytes->increment(_batching_bytes - new_bt_bytes);
_batching_rows = new_bt_rows;
_batching_bytes = new_bt_bytes;
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
}
return Status::OK();
}

Status VRowDistribution::_generate_rows_distribution_for_auto_overwrite(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids) {
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val) {
auto num_rows = block->rows();

// for non-auto-partition situation, goes into two 'else' branch. just find the origin partitions, replace them by rpc,
// and find the new partitions to use.
// for auto-partition's, find and save origins in _partitions and replace them. at meanwhile save the missing values for auto
// partition. then we find partition again to get replaced partitions in _partitions. this time _missing_map is ignored cuz
// we already saved missing values.
bool stop_processing = false;
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
if (_vpartition->is_auto_partition() &&
_state->query_options().enable_auto_create_when_overwrite) {
// allow auto create partition for missing rows.
std::vector<uint16_t> partition_keys = _vpartition->get_partition_keys();
auto partition_col = block->get_by_position(partition_keys[0]);
_missing_map.clear();
_missing_map.reserve(partition_col.column->size());

RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip,
&_missing_map));

// allow and really need to create during auto-detect-overwriting.
if (!_missing_map.empty()) {
RETURN_IF_ERROR(_deal_missing_map(block, partition_cols_idx, rows_stat_val));
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
}
RETURN_IF_ERROR(_replace_overwriting_partition());

// regenerate locations for new partitions & tablets
_reset_find_tablets(num_rows);
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
if (_vpartition->is_auto_partition() &&
_state->query_options().enable_auto_create_when_overwrite) {
// here _missing_map is just a placeholder
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip,
&_missing_map));
if (VLOG_TRACE_IS_ON) {
std::string tmp;
for (auto v : _missing_map) {
tmp += std::to_string(v).append(", ");
}
VLOG_TRACE << "Trace missing map of " << this << ':' << tmp;
}
} else {
RETURN_IF_ERROR(_tablet_finder->find_tablets(_state, block, num_rows, _partitions,
_tablet_indexes, stop_processing, _skip));
}
if (has_filtered_rows) {
for (int i = 0; i < num_rows; i++) {
_skip[i] = _skip[i] || _block_convertor->filter_map()[i];
Expand Down Expand Up @@ -456,10 +512,11 @@ Status VRowDistribution::generate_rows_distribution(
}

Status st = Status::OK();
if (_vpartition->is_auto_detect_overwrite()) {
if (_vpartition->is_auto_detect_overwrite() && !_deal_batched) {
// when overwrite, no auto create partition allowed.
st = _generate_rows_distribution_for_auto_overwrite(block.get(), has_filtered_rows,
row_part_tablet_ids);
st = _generate_rows_distribution_for_auto_overwrite(block.get(), partition_cols_idx,
has_filtered_rows, row_part_tablet_ids,
rows_stat_val);
} else if (_vpartition->is_auto_partition() && !_deal_batched) {
st = _generate_rows_distribution_for_auto_partition(block.get(), partition_cols_idx,
has_filtered_rows, row_part_tablet_ids,
Expand Down
9 changes: 7 additions & 2 deletions be/src/vec/sink/vrow_distribution.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,14 +162,19 @@ class VRowDistribution {
vectorized::Block* block, const std::vector<uint16_t>& partition_col_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
// the whole process to deal missing rows. will call _save_missing_values
Status _deal_missing_map(vectorized::Block* block,
const std::vector<uint16_t>& partition_cols_idx,
int64_t& rows_stat_val);

Status _generate_rows_distribution_for_non_auto_partition(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids);

Status _generate_rows_distribution_for_auto_overwrite(
vectorized::Block* block, bool has_filtered_rows,
std::vector<RowPartTabletIds>& row_part_tablet_ids);
vectorized::Block* block, const std::vector<uint16_t>& partition_cols_idx,
bool has_filtered_rows, std::vector<RowPartTabletIds>& row_part_tablet_ids,
int64_t& rows_stat_val);
Status _replace_overwriting_partition();

void _reset_row_part_tablet_ids(std::vector<RowPartTabletIds>& row_part_tablet_ids,
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1422,7 +1422,7 @@ Status VTabletWriter::_send_new_partition_batch() {

Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref

// these order is only.
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
Expand Down
2 changes: 1 addition & 1 deletion be/src/vec/sink/writer/vtablet_writer_v2.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -531,7 +531,7 @@ Status VTabletWriterV2::_send_new_partition_batch() {

Block tmp_block = _row_distribution._batching_block->to_block(); // Borrow out, for lval ref

// these order is only.
// these order is unique.
// 1. clear batching stats(and flag goes true) so that we won't make a new batching process in dealing batched block.
// 2. deal batched block
// 3. now reuse the column of lval block. cuz write doesn't real adjust it. it generate a new block from that.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,6 @@ public class NativeInsertStmt extends InsertStmt {

boolean hasEmptyTargetColumns = false;
private boolean allowAutoPartition = true;
private boolean withAutoDetectOverwrite = false;

enum InsertType {
NATIVE_INSERT("insert_"),
Expand Down Expand Up @@ -333,11 +332,6 @@ public boolean isTransactionBegin() {
return isTransactionBegin;
}

public NativeInsertStmt withAutoDetectOverwrite() {
this.withAutoDetectOverwrite = true;
return this;
}

protected void preCheckAnalyze(Analyzer analyzer) throws UserException {
super.analyze(analyzer);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -615,6 +615,10 @@ public LogicalPlan visitInsertTable(InsertTableContext ctx) {
LogicalPlan plan = visitQuery(ctx.query());
// partitionSpec may be NULL. means auto detect partition. only available when IOT
Pair<Boolean, List<String>> partitionSpec = visitPartitionSpec(ctx.partitionSpec());
// partitionSpec.second :
// null - auto detect
// zero - whole table
// others - specific partitions
boolean isAutoDetect = partitionSpec.second == null;
LogicalSink<?> sink = UnboundTableSinkCreator.createUnboundTableSinkMaybeOverwrite(
tableName.build(),
Expand Down
Loading

0 comments on commit f3c129d

Please sign in to comment.