Skip to content

Commit

Permalink
Merge branch 'apache:master' into master
Browse files Browse the repository at this point in the history
  • Loading branch information
liujiwen-up authored Jun 20, 2024
2 parents 6347cc0 + 24b1d3b commit 111d697
Show file tree
Hide file tree
Showing 765 changed files with 28,439 additions and 13,684 deletions.
8 changes: 7 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,13 @@ lru_cache_test
/fe/fe-core/src/test/resources/real-help-resource.zip
/ui/dist

# docker
docker/thirdparties/docker-compose/*/data
docker/thirdparties/docker-compose/*/logs
docker/thirdparties/docker-compose/*/*.yaml
docker/runtime/be/resource/apache-doris/

# other
compile_commands.json
.github
docker/runtime/be/resource/apache-doris/

8 changes: 6 additions & 2 deletions be/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -382,8 +382,10 @@ endif()
# -O3: Enable all compiler optimizations
# -DNDEBUG: Turn off dchecks/asserts/debug only code.
set(CXX_FLAGS_RELEASE "${CXX_GCC_FLAGS} -O3 -DNDEBUG")
set(CXX_FLAGS_ASAN "${CXX_GCC_FLAGS} -O0 -fsanitize=address -DADDRESS_SANITIZER")
set(CXX_FLAGS_ASAN "${CXX_GCC_FLAGS} -O0 -fsanitize=address -fsanitize=undefined -fno-strict-aliasing -fno-sanitize=alignment,signed-integer-overflow,float-cast-overflow -DUNDEFINED_BEHAVIOR_SANITIZER -DADDRESS_SANITIZER")
set(CXX_FLAGS_LSAN "${CXX_GCC_FLAGS} -O0 -fsanitize=leak -DLEAK_SANITIZER")
## Use for BE-UT
set(CXX_FLAGS_ASAN_UT "${CXX_GCC_FLAGS} -O0 -fsanitize=address -DADDRESS_SANITIZER")

# Set the flags to the undefined behavior sanitizer, also known as "ubsan"
# Turn on sanitizer and debug symbols to get stack traces:
Expand All @@ -408,6 +410,8 @@ elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "UBSAN")
set(CMAKE_CXX_FLAGS "${CXX_FLAGS_UBSAN}")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "TSAN")
set(CMAKE_CXX_FLAGS "${CXX_FLAGS_TSAN}")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN_UT")
set(CMAKE_CXX_FLAGS "${CXX_FLAGS_ASAN_UT}")
else()
message(FATAL_ERROR "Unknown build type: ${CMAKE_BUILD_TYPE}")
endif()
Expand Down Expand Up @@ -629,7 +633,7 @@ endif ()
# Add sanitize static link flags
if ("${CMAKE_BUILD_TYPE}" STREQUAL "DEBUG" OR "${CMAKE_BUILD_TYPE}" STREQUAL "RELEASE")
set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} ${MALLOCLIB})
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN")
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "ASAN" OR "${CMAKE_BUILD_TYPE}" STREQUAL "ASAN_UT")
set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} ${ASAN_LIBS})
elseif ("${CMAKE_BUILD_TYPE}" STREQUAL "LSAN")
set(DORIS_LINK_LIBS ${DORIS_LINK_LIBS} ${LSAN_LIBS})
Expand Down
2 changes: 1 addition & 1 deletion be/src/apache-orc
15 changes: 12 additions & 3 deletions be/src/cloud/cloud_engine_calc_delete_bitmap_task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -154,10 +154,11 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
DeleteBitmapPtr delete_bitmap;
RowsetIdUnorderedSet rowset_ids;
std::shared_ptr<PartialUpdateInfo> partial_update_info;
std::shared_ptr<PublishStatus> publish_status;
int64_t txn_expiration;
Status status = _engine.txn_delete_bitmap_cache().get_tablet_txn_info(
_transaction_id, _tablet_id, &rowset, &delete_bitmap, &rowset_ids, &txn_expiration,
&partial_update_info);
&partial_update_info, &publish_status);
if (status != Status::OK()) {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << _tablet_id
<< ", txn_id=" << _transaction_id << ", status=" << status;
Expand All @@ -172,8 +173,16 @@ Status CloudTabletCalcDeleteBitmapTask::handle() const {
txn_info.delete_bitmap = delete_bitmap;
txn_info.rowset_ids = rowset_ids;
txn_info.partial_update_info = partial_update_info;
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id, txn_expiration);
auto update_delete_bitmap_time_us = MonotonicMicros() - t3;
txn_info.publish_status = publish_status;
auto update_delete_bitmap_time_us = 0;
if (txn_info.publish_status && (*(txn_info.publish_status) == PublishStatus::SUCCEED)) {
LOG(INFO) << "tablet=" << _tablet_id << ",txn=" << _transaction_id
<< ",publish_status=SUCCEED,not need to recalculate and update delete_bitmap.";
} else {
status = CloudTablet::update_delete_bitmap(tablet, &txn_info, _transaction_id,
txn_expiration);
update_delete_bitmap_time_us = MonotonicMicros() - t3;
}
if (status != Status::OK()) {
LOG(WARNING) << "failed to calculate delete bitmap. rowset_id=" << rowset->rowset_id()
<< ", tablet_id=" << _tablet_id << ", txn_id=" << _transaction_id
Expand Down
41 changes: 41 additions & 0 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
#include <type_traits>
#include <vector>

#include "cloud/cloud_storage_engine.h"
#include "cloud/cloud_tablet.h"
#include "cloud/config.h"
#include "cloud/pb_convert.h"
Expand All @@ -50,6 +51,7 @@
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
#include "olap/storage_engine.h"
#include "olap/tablet_meta.h"
#include "runtime/client_cache.h"
#include "runtime/exec_env.h"
Expand Down Expand Up @@ -546,6 +548,36 @@ Status CloudMetaMgr::sync_tablet_rowsets(CloudTablet* tablet, bool warmup_delta_
}
}

bool CloudMetaMgr::sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap) {
std::set<int64_t> txn_processed;
for (auto& rs_meta : rs_metas) {
auto txn_id = rs_meta.txn_id();
if (txn_processed.find(txn_id) != txn_processed.end()) {
continue;
}
txn_processed.insert(txn_id);
DeleteBitmapPtr tmp_delete_bitmap;
RowsetIdUnorderedSet tmp_rowset_ids;
std::shared_ptr<PublishStatus> publish_status =
std::make_shared<PublishStatus>(PublishStatus::INIT);
CloudStorageEngine& engine = ExecEnv::GetInstance()->storage_engine().to_cloud();
Status status = engine.txn_delete_bitmap_cache().get_delete_bitmap(
txn_id, tablet->tablet_id(), &tmp_delete_bitmap, &tmp_rowset_ids, &publish_status);
if (status.ok() && *(publish_status.get()) == PublishStatus::SUCCEED) {
delete_bitmap->merge(*tmp_delete_bitmap);
engine.txn_delete_bitmap_cache().remove_unused_tablet_txn_info(txn_id,
tablet->tablet_id());
} else {
LOG(WARNING) << "failed to get tablet txn info. tablet_id=" << tablet->tablet_id()
<< ", txn_id=" << txn_id << ", status=" << status;
return false;
}
}
return true;
}

Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
const TabletStatsPB& stats, const TabletIndexPB& idx,
Expand All @@ -554,6 +586,15 @@ Status CloudMetaMgr::sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_
return Status::OK();
}

if (sync_tablet_delete_bitmap_by_cache(tablet, old_max_version, rs_metas, delete_bitmap)) {
return Status::OK();
} else {
LOG(WARNING) << "failed to sync delete bitmap by txn info. tablet_id="
<< tablet->tablet_id();
DeleteBitmapPtr new_delete_bitmap = std::make_shared<DeleteBitmap>(tablet->tablet_id());
*delete_bitmap = *new_delete_bitmap;
}

std::shared_ptr<MetaService_Stub> stub;
RETURN_IF_ERROR(MetaServiceProxy::get_client(&stub));

Expand Down
4 changes: 4 additions & 0 deletions be/src/cloud/cloud_meta_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,10 @@ class CloudMetaMgr {
int64_t initiator);

private:
bool sync_tablet_delete_bitmap_by_cache(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas,
DeleteBitmap* delete_bitmap);

Status sync_tablet_delete_bitmap(CloudTablet* tablet, int64_t old_max_version,
std::ranges::range auto&& rs_metas, const TabletStatsPB& stats,
const TabletIndexPB& idx, DeleteBitmap* delete_bitmap);
Expand Down
71 changes: 56 additions & 15 deletions be/src/cloud/cloud_stream_load_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

#include "cloud/cloud_stream_load_executor.h"

#include "cloud/cloud_meta_mgr.h"
#include "cloud/cloud_storage_engine.h"
#include "cloud/config.h"
#include "common/logging.h"
#include "common/status.h"
Expand All @@ -29,23 +31,62 @@ CloudStreamLoadExecutor::CloudStreamLoadExecutor(ExecEnv* exec_env)

CloudStreamLoadExecutor::~CloudStreamLoadExecutor() = default;

Status CloudStreamLoadExecutor::pre_commit_txn(StreamLoadContext* ctx) {
auto st = _exec_env->storage_engine().to_cloud().meta_mgr().precommit_txn(*ctx);
if (!st.ok()) {
LOG(WARNING) << "Failed to precommit txn: " << st << ", " << ctx->brief();
return st;
}
ctx->need_rollback = false;
return st;
}

Status CloudStreamLoadExecutor::operate_txn_2pc(StreamLoadContext* ctx) {
VLOG_DEBUG << "operate_txn_2pc, op: " << ctx->txn_operation;
if (ctx->txn_operation.compare("commit") == 0) {
return _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, true);
} else {
// 2pc abort
return _exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx);
}
}

Status CloudStreamLoadExecutor::commit_txn(StreamLoadContext* ctx) {
// forward to fe to execute commit transaction for MoW table
Status st;
int retry_times = 0;
// mow table will retry when DELETE_BITMAP_LOCK_ERROR occurs
do {
st = StreamLoadExecutor::commit_txn(ctx);
if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
break;
if (ctx->load_type == TLoadType::ROUTINE_LOAD) {
return StreamLoadExecutor::commit_txn(ctx);
}

// forward to fe to excute commit transaction for MoW table
if (ctx->is_mow_table()) {
Status st;
int retry_times = 0;
while (retry_times < config::mow_stream_load_commit_retry_times) {
st = StreamLoadExecutor::commit_txn(ctx);
// DELETE_BITMAP_LOCK_ERROR will be retried
if (st.ok() || !st.is<ErrorCode::DELETE_BITMAP_LOCK_ERROR>()) {
break;
}
LOG_WARNING("Failed to commit txn")
.tag("txn_id", ctx->txn_id)
.tag("retry_times", retry_times)
.error(st);
retry_times++;
}
LOG_WARNING("Failed to commit txn")
.tag("txn_id", ctx->txn_id)
.tag("retry_times", retry_times)
.error(st);
retry_times++;
} while (retry_times < config::mow_stream_load_commit_retry_times);
return st;
}

auto st = _exec_env->storage_engine().to_cloud().meta_mgr().commit_txn(*ctx, false);
if (!st.ok()) {
LOG(WARNING) << "Failed to commit txn: " << st << ", " << ctx->brief();
return st;
}
ctx->need_rollback = false;
return st;
}

} // namespace doris
void CloudStreamLoadExecutor::rollback_txn(StreamLoadContext* ctx) {
WARN_IF_ERROR(_exec_env->storage_engine().to_cloud().meta_mgr().abort_txn(*ctx),
"Failed to rollback txn");
}

} // namespace doris
10 changes: 9 additions & 1 deletion be/src/cloud/cloud_stream_load_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,20 @@
#include "runtime/stream_load/stream_load_executor.h"

namespace doris {

class CloudStreamLoadExecutor final : public StreamLoadExecutor {
public:
CloudStreamLoadExecutor(ExecEnv* exec_env);

~CloudStreamLoadExecutor() override;

Status pre_commit_txn(StreamLoadContext* ctx) override;

Status operate_txn_2pc(StreamLoadContext* ctx) override;

Status commit_txn(StreamLoadContext* ctx) override;

void rollback_txn(StreamLoadContext* ctx) override;
};
} // namespace doris

} // namespace doris
12 changes: 9 additions & 3 deletions be/src/cloud/cloud_tablet.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -604,8 +604,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx
RowsetSharedPtr rowset = txn_info->rowset;
int64_t cur_version = rowset->start_version();
// update delete bitmap info, in order to avoid recalculation when trying again
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(txn_id, tablet_id(), delete_bitmap,
cur_rowset_ids);
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), delete_bitmap, cur_rowset_ids, PublishStatus::PREPARE);

if (txn_info->partial_update_info && txn_info->partial_update_info->is_partial_update &&
rowset_writer->num_rows() > 0) {
Expand All @@ -626,6 +626,8 @@ Status CloudTablet::save_delete_bitmap(const TabletTxnInfo* txn_info, int64_t tx

RETURN_IF_ERROR(_engine.meta_mgr().update_delete_bitmap(
*this, txn_id, COMPACTION_DELETE_BITMAP_LOCK_ID, new_delete_bitmap.get()));
_engine.txn_delete_bitmap_cache().update_tablet_txn_info(
txn_id, tablet_id(), new_delete_bitmap, cur_rowset_ids, PublishStatus::SUCCEED);

return Status::OK();
}
Expand Down Expand Up @@ -684,7 +686,11 @@ Status CloudTablet::calc_delete_bitmap_for_compaction(
"cumulative compaction: the merged rows({}) is not equal to missed "
"rows({}) in rowid conversion, tablet_id: {}, table_id:{}",
merged_rows, missed_rows_size, tablet_id(), table_id());
DCHECK(false) << err_msg;
if (config::enable_mow_compaction_correctness_check_core) {
CHECK(false) << err_msg;
} else {
DCHECK(false) << err_msg;
}
LOG(WARNING) << err_msg;
}
}
Expand Down
Loading

0 comments on commit 111d697

Please sign in to comment.