Skip to content

Commit

Permalink
[Feature](Compaction)Support full compaction (apache#21177)
Browse files Browse the repository at this point in the history
  • Loading branch information
Yukang-Lian authored Jul 16, 2023
1 parent a7eb186 commit c409fa0
Show file tree
Hide file tree
Showing 16 changed files with 568 additions and 4 deletions.
3 changes: 3 additions & 0 deletions be/src/common/status.h
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,8 @@ E(CUMULATIVE_INVALID_NEED_MERGED_VERSIONS, -2004);
E(CUMULATIVE_ERROR_DELETE_ACTION, -2005);
E(CUMULATIVE_MISS_VERSION, -2006);
E(CUMULATIVE_CLONE_OCCURRED, -2007);
E(FULL_NO_SUITABLE_VERSION, -2008);
E(FULL_MISS_VERSION, -2009);
E(META_INVALID_ARGUMENT, -3000);
E(META_OPEN_DB_ERROR, -3001);
E(META_KEY_NOT_FOUND, -3002);
Expand Down Expand Up @@ -285,6 +287,7 @@ inline bool capture_stacktrace(int code) {
&& code != ErrorCode::PUSH_TRANSACTION_ALREADY_EXIST
&& code != ErrorCode::BE_NO_SUITABLE_VERSION
&& code != ErrorCode::CUMULATIVE_NO_SUITABLE_VERSION
&& code != ErrorCode::FULL_NO_SUITABLE_VERSION
&& code != ErrorCode::PUBLISH_VERSION_NOT_CONTINUOUS
&& code != ErrorCode::ROWSET_RENAME_FILE_FAILED
&& code != ErrorCode::SEGCOMPACTION_INIT_READER
Expand Down
17 changes: 16 additions & 1 deletion be/src/http/action/compaction_action.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
#include "olap/base_compaction.h"
#include "olap/cumulative_compaction.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/full_compaction.h"
#include "olap/olap_define.h"
#include "olap/storage_engine.h"
#include "olap/tablet_manager.h"
Expand Down Expand Up @@ -89,7 +90,8 @@ Status CompactionAction::_handle_run_compaction(HttpRequest* req, std::string* j
// check compaction_type equals 'base', 'cumulative' or 'full'
std::string compaction_type = req->param(PARAM_COMPACTION_TYPE);
if (compaction_type != PARAM_COMPACTION_BASE &&
compaction_type != PARAM_COMPACTION_CUMULATIVE) {
compaction_type != PARAM_COMPACTION_CUMULATIVE &&
compaction_type != PARAM_COMPACTION_FULL) {
return Status::NotSupported("The compaction type '{}' is not supported", compaction_type);
}

Expand Down Expand Up @@ -229,6 +231,19 @@ Status CompactionAction::_execute_compaction_callback(TabletSharedPtr tablet,
<< ", table=" << tablet->full_name();
}
}
} else if (compaction_type == PARAM_COMPACTION_FULL) {
FullCompaction full_compaction(tablet);
res = full_compaction.compact();
if (!res) {
if (res.is<FULL_NO_SUITABLE_VERSION>()) {
// Ignore this error code.
VLOG_NOTICE << "failed to init full compaction due to no suitable version,"
<< "tablet=" << tablet->full_name();
} else {
LOG(WARNING) << "failed to do full compaction. res=" << res
<< ", table=" << tablet->full_name();
}
}
}

timer.stop();
Expand Down
1 change: 1 addition & 0 deletions be/src/http/action/compaction_action.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ enum class CompactionActionType {
const std::string PARAM_COMPACTION_TYPE = "compact_type";
const std::string PARAM_COMPACTION_BASE = "base";
const std::string PARAM_COMPACTION_CUMULATIVE = "cumulative";
const std::string PARAM_COMPACTION_FULL = "full";

/// This action is used for viewing the compaction status.
/// See compaction-action.md for details.
Expand Down
3 changes: 2 additions & 1 deletion be/src/io/io_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ enum class ReaderType {
READER_CHECKSUM = 4,
READER_COLD_DATA_COMPACTION = 5,
READER_SEGMENT_COMPACTION = 6,
UNKNOWN = 7
READER_FULL_COMPACTION = 7,
UNKNOWN = 8
};

namespace io {
Expand Down
4 changes: 4 additions & 0 deletions be/src/olap/compaction.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -301,6 +301,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_tablet->set_last_cumu_compaction_success_time(now);
} else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
_tablet->set_last_base_compaction_success_time(now);
} else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) {
_tablet->set_last_full_compaction_success_time(now);
}
auto cumu_policy = _tablet->cumulative_compaction_policy();
LOG(INFO) << "succeed to do ordered data " << compaction_name()
Expand Down Expand Up @@ -451,6 +453,8 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_tablet->set_last_cumu_compaction_success_time(now);
} else if (compaction_type() == ReaderType::READER_BASE_COMPACTION) {
_tablet->set_last_base_compaction_success_time(now);
} else if (compaction_type() == ReaderType::READER_FULL_COMPACTION) {
_tablet->set_last_full_compaction_success_time(now);
}

int64_t current_max_version;
Expand Down
215 changes: 215 additions & 0 deletions be/src/olap/full_compaction.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "olap/full_compaction.h"

#include <glog/logging.h>
#include <time.h>

#include <memory>
#include <mutex>
#include <numeric>
#include <ostream>
#include <shared_mutex>

#include "common/config.h"
#include "common/status.h"
#include "olap/cumulative_compaction_policy.h"
#include "olap/olap_common.h"
#include "olap/olap_define.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/rowset.h"
#include "olap/schema_change.h"
#include "olap/tablet_meta.h"
#include "runtime/thread_context.h"
#include "util/thread.h"
#include "util/trace.h"

namespace doris {
using namespace ErrorCode;

FullCompaction::FullCompaction(const TabletSharedPtr& tablet)
: Compaction(tablet, "FullCompaction:" + std::to_string(tablet->tablet_id())) {}

FullCompaction::~FullCompaction() {}

Status FullCompaction::prepare_compact() {
if (!_tablet->init_succeeded()) {
return Status::Error<INVALID_ARGUMENT>("Full compaction init failed");
}

std::unique_lock full_lock(_tablet->get_full_compaction_lock());
std::unique_lock base_lock(_tablet->get_base_compaction_lock());
std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());

// 1. pick rowsets to compact
RETURN_IF_ERROR(pick_rowsets_to_compact());
_tablet->set_clone_occurred(false);

return Status::OK();
}

Status FullCompaction::execute_compact_impl() {
std::unique_lock full_lock(_tablet->get_full_compaction_lock());
std::unique_lock base_lock(_tablet->get_base_compaction_lock());
std::unique_lock cumu_lock(_tablet->get_cumulative_compaction_lock());

// Clone task may happen after compaction task is submitted to thread pool, and rowsets picked
// for compaction may change. In this case, current compaction task should not be executed.
if (_tablet->get_clone_occurred()) {
_tablet->set_clone_occurred(false);
return Status::Error<BE_CLONE_OCCURRED>("get_clone_occurred failed");
}

SCOPED_ATTACH_TASK(_mem_tracker);

// 2. do full compaction, merge rowsets
int64_t permits = get_compaction_permits();
RETURN_IF_ERROR(do_compaction(permits));

// 3. set state to success
_state = CompactionState::SUCCESS;

// 4. set cumulative point
Version last_version = _input_rowsets.back()->version();
_tablet->cumulative_compaction_policy()->update_cumulative_point(_tablet.get(), _input_rowsets,
_output_rowset, last_version);
VLOG_CRITICAL << "after cumulative compaction, current cumulative point is "
<< _tablet->cumulative_layer_point() << ", tablet=" << _tablet->full_name();

return Status::OK();
}

Status FullCompaction::pick_rowsets_to_compact() {
_input_rowsets = _tablet->pick_candidate_rowsets_to_full_compaction();
RETURN_IF_ERROR(check_version_continuity(_input_rowsets));
RETURN_IF_ERROR(_check_all_version(_input_rowsets));
if (_input_rowsets.size() <= 1) {
return Status::Error<FULL_NO_SUITABLE_VERSION>("There is no suitable version");
}

if (_input_rowsets.size() == 2 && _input_rowsets[0]->end_version() == 1) {
// the tablet is with rowset: [0-1], [2-y]
// and [0-1] has no data. in this situation, no need to do full compaction.
return Status::Error<FULL_NO_SUITABLE_VERSION>("There is no suitable version");
}

return Status::OK();
}

Status FullCompaction::modify_rowsets(const Merger::Statistics* stats) {
if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
_tablet->enable_unique_key_merge_on_write()) {
RETURN_IF_ERROR(
_full_compaction_update_delete_bitmap(_output_rowset, _output_rs_writer.get()));
}
std::vector<RowsetSharedPtr> output_rowsets(1, _output_rowset);
RETURN_IF_ERROR(_tablet->modify_rowsets(output_rowsets, _input_rowsets, true));
_tablet->save_meta();
return Status::OK();
}

Status FullCompaction::_check_all_version(const std::vector<RowsetSharedPtr>& rowsets) {
if (rowsets.empty()) {
return Status::Error<FULL_MISS_VERSION>("There is no input rowset when do full compaction");
}
const RowsetSharedPtr& last_rowset = rowsets.back();
const RowsetSharedPtr& first_rowset = rowsets.front();
if (last_rowset->version() != _tablet->max_version() || first_rowset->version().first != 0) {
return Status::Error<FULL_MISS_VERSION>(
"Full compaction rowsets' versions not equal to all exist rowsets' versions. "
"full compaction rowsets max version={}-{}"
", current rowsets max version={}-{}"
"full compaction rowsets min version={}-{}, current rowsets min version=0-1",
last_rowset->start_version(), last_rowset->end_version(),
_tablet->max_version().first, _tablet->max_version().second,
first_rowset->start_version(), first_rowset->end_version());
}
return Status::OK();
}

Status FullCompaction::_full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
RowsetWriter* rowset_writer) {
std::vector<RowsetSharedPtr> tmp_rowsets {};

// tablet is under alter process. The delete bitmap will be calculated after conversion.
if (_tablet->tablet_state() == TABLET_NOTREADY &&
SchemaChangeHandler::tablet_in_converting(_tablet->tablet_id())) {
LOG(INFO) << "tablet is under alter process, update delete bitmap later, tablet_id="
<< _tablet->tablet_id();
return Status::OK();
}

int64_t max_version = _tablet->max_version().second;
DCHECK(max_version >= rowset->version().second);
if (max_version > rowset->version().second) {
_tablet->capture_consistent_rowsets({rowset->version().second + 1, max_version},
&tmp_rowsets);
}

for (const auto& it : tmp_rowsets) {
const int64_t& cur_version = it->rowset_meta()->start_version();
RETURN_IF_ERROR(
_full_compaction_calc_delete_bitmap(it, rowset, cur_version, rowset_writer));
}

std::lock_guard rowset_update_lock(_tablet->get_rowset_update_lock());
std::lock_guard header_lock(_tablet->get_header_lock());
for (const auto& it : _tablet->rowset_map()) {
const int64_t& cur_version = it.first.first;
const RowsetSharedPtr& published_rowset = it.second;
if (cur_version > max_version) {
RETURN_IF_ERROR(_full_compaction_calc_delete_bitmap(published_rowset, rowset,
cur_version, rowset_writer));
}
}

return Status::OK();
}

Status FullCompaction::_full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
const RowsetSharedPtr& rowset,
const int64_t& cur_version,
RowsetWriter* rowset_writer) {
std::vector<segment_v2::SegmentSharedPtr> segments;
auto beta_rowset = reinterpret_cast<BetaRowset*>(published_rowset.get());
RETURN_IF_ERROR(beta_rowset->load_segments(&segments));
DeleteBitmapPtr delete_bitmap =
std::make_shared<DeleteBitmap>(_tablet->tablet_meta()->tablet_id());
std::vector<RowsetSharedPtr> specified_rowsets(1, rowset);

OlapStopWatch watch;
RETURN_IF_ERROR(_tablet->calc_delete_bitmap(published_rowset, segments, specified_rowsets,
delete_bitmap, cur_version, rowset_writer));
size_t total_rows = std::accumulate(
segments.begin(), segments.end(), 0,
[](size_t sum, const segment_v2::SegmentSharedPtr& s) { return sum += s->num_rows(); });
VLOG_DEBUG << "[Full compaction] construct delete bitmap tablet: " << _tablet->tablet_id()
<< ", published rowset version: [" << published_rowset->version().first << "-"
<< published_rowset->version().second << "]"
<< ", full compaction rowset version: [" << rowset->version().first << "-"
<< rowset->version().second << "]"
<< ", cost: " << watch.get_elapse_time_us() << "(us), total rows: " << total_rows;

for (const auto& [k, v] : delete_bitmap->delete_bitmap) {
_tablet->tablet_meta()->delete_bitmap().merge({std::get<0>(k), std::get<1>(k), cur_version},
v);
}

return Status::OK();
}

} // namespace doris
62 changes: 62 additions & 0 deletions be/src/olap/full_compaction.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include <butil/macros.h>

#include <string>
#include <vector>

#include "common/status.h"
#include "io/io_common.h"
#include "olap/compaction.h"
#include "olap/rowset/rowset.h"
#include "olap/tablet.h"

namespace doris {

class FullCompaction : public Compaction {
public:
FullCompaction(const TabletSharedPtr& tablet);
~FullCompaction() override;

Status prepare_compact() override;
Status execute_compact_impl() override;
Status modify_rowsets(const Merger::Statistics* stats = nullptr) override;

std::vector<RowsetSharedPtr> get_input_rowsets() { return _input_rowsets; }

protected:
Status pick_rowsets_to_compact() override;
std::string compaction_name() const override { return "full compaction"; }

ReaderType compaction_type() const override { return ReaderType::READER_FULL_COMPACTION; }

private:
Status _check_all_version(const std::vector<RowsetSharedPtr>& rowsets);
Status _full_compaction_update_delete_bitmap(const RowsetSharedPtr& rowset,
RowsetWriter* rowset_writer);
Status _full_compaction_calc_delete_bitmap(const RowsetSharedPtr& published_rowset,
const RowsetSharedPtr& rowset,
const int64_t& cur_version,
RowsetWriter* rowset_writer);

DISALLOW_COPY_AND_ASSIGN(FullCompaction);
};

} // namespace doris
2 changes: 1 addition & 1 deletion be/src/olap/olap_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ using uint128_t = unsigned __int128;

using TabletUid = UniqueId;

enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2 };
enum CompactionType { BASE_COMPACTION = 1, CUMULATIVE_COMPACTION = 2, FULL_COMPACTION = 3 };

struct DataDirInfo {
std::string path;
Expand Down
2 changes: 2 additions & 0 deletions be/src/olap/reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,7 @@ Status TabletReader::_init_return_columns(const ReaderParams& read_params) {
} else if ((read_params.reader_type == ReaderType::READER_CUMULATIVE_COMPACTION ||
read_params.reader_type == ReaderType::READER_SEGMENT_COMPACTION ||
read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
read_params.reader_type == ReaderType::READER_FULL_COMPACTION ||
read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
read_params.reader_type == ReaderType::READER_ALTER_TABLE) &&
!read_params.return_columns.empty()) {
Expand Down Expand Up @@ -613,6 +614,7 @@ Status TabletReader::_init_delete_condition(const ReaderParams& read_params) {
// QUERY will filter the row in query layer to keep right result use where clause.
// CUMULATIVE_COMPACTION will lose the filter_delete info of base rowset
if (read_params.reader_type == ReaderType::READER_BASE_COMPACTION ||
read_params.reader_type == ReaderType::READER_FULL_COMPACTION ||
read_params.reader_type == ReaderType::READER_COLD_DATA_COMPACTION ||
read_params.reader_type == ReaderType::READER_CHECKSUM) {
_filter_delete = true;
Expand Down
Loading

0 comments on commit c409fa0

Please sign in to comment.