Skip to content

Commit

Permalink
[fix](delete) fix potential delete fail after adding columns (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
TangSiyang2001 authored Oct 24, 2023
1 parent b82136b commit bab7581
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 38 deletions.
47 changes: 35 additions & 12 deletions be/src/olap/delete_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,27 @@ Status DeleteHandler::check_condition_valid(const TabletSchema& schema, const TC
}
}

if (!cond.__isset.column_unique_id) {
LOG(WARNING) << "column=" << cond.column_name
<< " in predicate does not have uid, table id=" << schema.table_id();
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
return Status::OK();
}
if (schema.field_index(cond.column_unique_id) == -1) {
const auto& err_msg =
fmt::format("column id does not exists in table={}, schema version={},",
schema.table_id(), schema.schema_version());
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
}
if (!iequal(schema.column_by_uid(cond.column_unique_id).name(), cond.column_name)) {
const auto& err_msg = fmt::format(
"colum name={} does not belongs to column uid={}, which column name={}, "
"delete_cond.column_name ={}",
cond.column_name, cond.column_unique_id,
schema.column_by_uid(cond.column_unique_id).name(), cond.column_name);
return Status::Error<DELETE_INVALID_CONDITION>(err_msg);
}

return Status::OK();
}

Expand Down Expand Up @@ -316,8 +337,12 @@ Status DeleteHandler::_parse_column_pred(TabletSchemaSPtr complete_schema,
for (const auto& sub_predicate : sub_pred_list) {
TCondition condition;
RETURN_IF_ERROR(parse_condition(sub_predicate, &condition));
int32_t col_unique_id =
delete_pred_related_schema->column(condition.column_name).unique_id();
int32_t col_unique_id;
if constexpr (std::is_same_v<SubPredType, DeletePredicatePB>) {
col_unique_id = sub_predicate.col_unique_id;
} else {
col_unique_id = delete_pred_related_schema->column(condition.column_name).unique_id();
}
condition.__set_column_unique_id(col_unique_id);
const auto& column = complete_schema->column_by_uid(col_unique_id);
uint32_t index = complete_schema->field_index(col_unique_id);
Expand Down Expand Up @@ -345,7 +370,7 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
bool with_sub_pred_v2) {
DCHECK(!_is_inited) << "reinitialize delete handler.";
DCHECK(version >= 0) << "invalid parameters. version=" << version;
_predicate_arena.reset(new vectorized::Arena());
_predicate_arena = std::make_unique<vectorized::Arena>();

for (const auto& delete_pred : delete_preds) {
// Skip the delete condition with large version
Expand All @@ -354,7 +379,7 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
}
// Need the tablet schema at the delete condition to parse the accurate column
const auto& delete_pred_related_schema = delete_pred->tablet_schema();
auto& delete_condition = delete_pred->delete_predicate();
const auto& delete_condition = delete_pred->delete_predicate();
DeleteConditions temp;
temp.filter_version = delete_pred->version().first;
if (with_sub_pred_v2) {
Expand All @@ -368,8 +393,8 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
for (const auto& in_predicate : delete_condition.in_predicates()) {
TCondition condition;
condition.__set_column_name(in_predicate.column_name());
condition.__set_column_unique_id(
delete_pred_related_schema->column(condition.column_name).unique_id());
auto col_unique_id = in_predicate.column_unique_id();
condition.__set_column_unique_id(col_unique_id);

if (in_predicate.is_not_in()) {
condition.__set_condition_op("!*=");
Expand All @@ -379,8 +404,6 @@ Status DeleteHandler::init(TabletSchemaSPtr tablet_schema,
for (const auto& value : in_predicate.values()) {
condition.condition_values.push_back(value);
}
int32_t col_unique_id =
delete_pred_related_schema->column(condition.column_name).unique_id();
const auto& column = tablet_schema->column_by_uid(col_unique_id);
uint32_t index = tablet_schema->field_index(col_unique_id);
temp.column_predicate_vec.push_back(
Expand All @@ -401,7 +424,7 @@ void DeleteHandler::finalize() {
}

for (auto& cond : _del_conds) {
for (auto pred : cond.column_predicate_vec) {
for (const auto* pred : cond.column_predicate_vec) {
delete pred;
}
}
Expand All @@ -414,12 +437,12 @@ void DeleteHandler::get_delete_conditions_after_version(
int64_t version, AndBlockColumnPredicate* and_block_column_predicate_ptr,
std::unordered_map<int32_t, std::vector<const ColumnPredicate*>>*
del_predicates_for_zone_map) const {
for (auto& del_cond : _del_conds) {
for (const auto& del_cond : _del_conds) {
if (del_cond.filter_version > version) {
// now, only query support delete column predicate operator
if (!del_cond.column_predicate_vec.empty()) {
if (del_cond.column_predicate_vec.size() == 1) {
auto single_column_block_predicate =
auto* single_column_block_predicate =
new SingleColumnBlockPredicate(del_cond.column_predicate_vec[0]);
and_block_column_predicate_ptr->add_column_predicate(
single_column_block_predicate);
Expand All @@ -432,7 +455,7 @@ void DeleteHandler::get_delete_conditions_after_version(
(*del_predicates_for_zone_map)[del_cond.column_predicate_vec[0]->column_id()]
.push_back(del_cond.column_predicate_vec[0]);
} else {
auto or_column_predicate = new OrBlockColumnPredicate();
auto* or_column_predicate = new OrBlockColumnPredicate();

// build or_column_predicate
// when delete from where a = 1 and b = 2, we can not use del_predicates_for_zone_map to filter zone page,
Expand Down
2 changes: 1 addition & 1 deletion be/src/olap/delete_handler.h
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class DeleteHandler {
// * Status::Error<DELETE_INVALID_PARAMETERS>(): input parameters are not valid
// * Status::Error<MEM_ALLOC_FAILED>(): alloc memory failed
Status init(TabletSchemaSPtr tablet_schema,
const std::vector<RowsetMetaSharedPtr>& delete_conditions, int64_t version,
const std::vector<RowsetMetaSharedPtr>& delete_preds, int64_t version,
bool with_sub_pred_v2 = false);

[[nodiscard]] bool empty() const { return _del_conds.empty(); }
Expand Down
25 changes: 0 additions & 25 deletions be/src/olap/push_handler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -141,31 +141,6 @@ Status PushHandler::_do_streaming_ingestion(TabletSharedPtr tablet, const TPushR
DeletePredicatePB del_pred;
TabletSchema tablet_schema;
tablet_schema.copy_from(*tablet->tablet_schema());
for (const auto& delete_cond : request.delete_conditions) {
if (!delete_cond.__isset.column_unique_id) {
LOG(WARNING) << "column=" << delete_cond.column_name
<< " in predicate does not have uid, table id="
<< tablet_schema.table_id();
// TODO(tsy): make it fail here after FE forbidding hard-link-schema-change
continue;
}
if (tablet_schema.field_index(delete_cond.column_unique_id) == -1) {
const auto& err_msg =
fmt::format("column id={} does not exists, table id={}",
delete_cond.column_unique_id, tablet_schema.table_id());
return Status::Aborted(err_msg);
}
if (!iequal(tablet_schema.column_by_uid(delete_cond.column_unique_id).name(),
delete_cond.column_name)) {
const auto& err_msg = fmt::format(
"colum name={} does not belongs to column uid={}, which column name={}, "
"delete_cond.column_name ={}",
delete_cond.column_name, delete_cond.column_unique_id,
tablet_schema.column_by_uid(delete_cond.column_unique_id).name(),
delete_cond.column_name);
return Status::Aborted(err_msg);
}
}
if (!request.columns_desc.empty() && request.columns_desc[0].col_unique_id >= 0) {
tablet_schema.clear_columns();
for (const auto& column_desc : request.columns_desc) {
Expand Down
1 change: 1 addition & 0 deletions be/src/olap/tablet_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -761,6 +761,7 @@ void TabletSchema::copy_from(const TabletSchema& tablet_schema) {
TabletSchemaPB tablet_schema_pb;
tablet_schema.to_schema_pb(&tablet_schema_pb);
init_from_pb(tablet_schema_pb);
_table_id = tablet_schema.table_id();
}

std::string TabletSchema::to_key() const {
Expand Down

0 comments on commit bab7581

Please sign in to comment.