Skip to content

Commit

Permalink
[Opt](Variant) merge schema in sync_rowsets to prevent CPU over…
Browse files Browse the repository at this point in the history
…head each time describe table

Avoid merging the rowset schemas on every call to `merged_tablet_schema`; this PR moves the merge logic into the `sync_rowsets` stage.
  • Loading branch information
eldenmoon committed Oct 29, 2024
1 parent 56fc08e commit d6ca48a
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 11 deletions.
45 changes: 35 additions & 10 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,36 @@ Status CloudTablet::capture_rs_readers(const Version& spec_version,
return capture_rs_readers_unlocked(version_path, rs_splits);
}

// Merge the schemas of all rowsets in `_rs_version_map` and cache the result in
// `_merged_tablet_schema`.
//
// The schema with the highest schema_version among the rowsets is inspected first:
// only when it contains variant columns is a merge performed (through
// schema_util::get_least_common_schema) and the cached schema replaced. Otherwise
// `_merged_tablet_schema` is left untouched.
//
// Returns Status::OK() when there is nothing to merge (no rowsets, or no rowset
// carries a schema) and on success; otherwise the error reported by
// get_least_common_schema.
//
// NOTE(review): this writes `_merged_tablet_schema` without taking a lock here;
// confirm the caller provides synchronization against concurrent readers.
Status CloudTablet::merge_rowsets_schema() {
    // std::max_element returns end() for an empty range, which must not be dereferenced.
    if (_rs_version_map.empty()) {
        return Status::OK();
    }

    // Find the rowset whose schema has the max schema version. Rowsets without a
    // schema order lowest. The comparator has to be a strict weak ordering, so two
    // schema-less rowsets compare as equivalent (neither is "less" than the other).
    const auto max_version_it = std::max_element(
            _rs_version_map.begin(), _rs_version_map.end(), [](const auto& a, const auto& b) {
                const auto& lhs = a.second->tablet_schema();
                const auto& rhs = b.second->tablet_schema();
                if (!rhs) {
                    return false;
                }
                if (!lhs) {
                    return true;
                }
                return lhs->schema_version() < rhs->schema_version();
            });

    TabletSchemaSPtr max_version_schema = max_version_it->second->tablet_schema();
    // The max element is schema-less only if every rowset is; nothing to merge then.
    if (!max_version_schema) {
        return Status::OK();
    }

    // If the schema has variant columns, perform a merge to create a wide tablet schema
    if (max_version_schema->num_variant_columns() > 0) {
        std::vector<TabletSchemaSPtr> schemas;
        schemas.reserve(_rs_version_map.size());
        for (const auto& [_, rowset] : _rs_version_map) {
            const auto& schema = rowset->tablet_schema();
            // Skip rowsets without a schema so the merge never sees a null pointer.
            if (schema) {
                schemas.push_back(schema);
            }
        }
        // Merge the collected schemas to obtain the least common schema; the result is
        // written back into `max_version_schema` (third argument).
        RETURN_IF_ERROR(vectorized::schema_util::get_least_common_schema(schemas, nullptr,
                                                                         max_version_schema));
        VLOG_DEBUG << "dump schema: " << max_version_schema->dump_full_schema();
        _merged_tablet_schema = max_version_schema;
    }
    return Status::OK();
}

// There are only two tablet_states RUNNING and NOT_READY in cloud mode
// This function will erase the tablet from `CloudTabletMgr` when it can't find this tablet in MS.
Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data) {
Expand All @@ -133,6 +163,10 @@ Status CloudTablet::sync_rowsets(int64_t query_version, bool warmup_delta_data)
if (st.is<ErrorCode::NOT_FOUND>()) {
clear_cache();
}

// Merge all rowset schemas within a CloudTablet
RETURN_IF_ERROR(merge_rowsets_schema());

return st;
}

Expand Down Expand Up @@ -188,16 +222,7 @@ Status CloudTablet::sync_if_not_running() {
}

// Return the merged tablet schema cached in `_merged_tablet_schema`.
//
// No merge is performed here: returning the cached schema keeps this accessor
// cheap, instead of re-merging every rowset schema on each call. The result may
// be nullptr when no merged schema has been stored, so callers must check before
// using it.
TabletSchemaSPtr CloudTablet::merged_tablet_schema() const {
    // Hold the meta lock in shared mode while copying the cached pointer.
    std::shared_lock rdlock(_meta_lock);
    return _merged_tablet_schema;
}

void CloudTablet::add_rowsets(std::vector<RowsetSharedPtr> to_add, bool version_overlap,
Expand Down
6 changes: 6 additions & 0 deletions be/src/cloud/cloud_tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,9 @@ class CloudTablet final : public BaseTablet {

Status sync_if_not_running();

// Merge all rowset schemas within a CloudTablet
Status merge_rowsets_schema();

CloudStorageEngine& _engine;

// this mutex MUST ONLY be used when sync meta
Expand Down Expand Up @@ -246,6 +249,9 @@ class CloudTablet final : public BaseTablet {
std::mutex _base_compaction_lock;
std::mutex _cumulative_compaction_lock;
mutable std::mutex _rowset_update_lock;

// Schema will be merged from all rowsets when sync_rowsets
TabletSchemaSPtr _merged_tablet_schema;
};

using CloudTabletSPtr = std::shared_ptr<CloudTablet>;
Expand Down
5 changes: 4 additions & 1 deletion be/src/service/internal_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1159,7 +1159,10 @@ void PInternalService::fetch_remote_tablet_schema(google::protobuf::RpcControlle
LOG(WARNING) << "tablet does not exist, tablet id is " << tablet_id;
continue;
}
tablet_schemas.push_back(res.value()->merged_tablet_schema());
auto schema = res.value()->merged_tablet_schema();
if (schema != nullptr) {
tablet_schemas.push_back(schema);
}
}
if (!tablet_schemas.empty()) {
// merge all
Expand Down

0 comments on commit d6ca48a

Please sign in to comment.