Skip to content

Commit

Permalink
fix(duplication): fix the dangling pointer after blob object is mov…
Browse files Browse the repository at this point in the history
…ed (apache#2088)

apache#2089

Refactor `blob` object to avoid the dangling pointer leading to heap-use-after-free
error which happened in apache#2089 after `blob` object is moved. Also refactor and add
some tests for `blob` and `mutation_batch`.
  • Loading branch information
empiredan authored Aug 15, 2024
1 parent 9efe8d6 commit e68c2b9
Show file tree
Hide file tree
Showing 16 changed files with 397 additions and 162 deletions.
2 changes: 1 addition & 1 deletion .clang-tidy
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
# https://releases.llvm.org/14.0.0/tools/clang/tools/extra/docs/clang-tidy/index.html

CheckOptions: []
Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*'
Checks: 'abseil-*,boost-*,bugprone-*,cert-*,clang-analyzer-*,concurrency-*,cppcoreguidelines-*,darwin-*,fuchsia-*,google-*,hicpp-*,linuxkernel-*,llvm-*,misc-*,modernize-*,performance-*,portability-*,readability-*,-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-hicpp-no-array-decay,-hicpp-named-parameter,-readability-named-parameter,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-readability-function-cognitive-complexity,-cert-err58-cpp,-cppcoreguidelines-avoid-c-arrays,-hicpp-avoid-c-arrays,-modernize-avoid-c-arrays,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-type-const-cast,-readability-identifier-length,-fuchsia-default-arguments-calls,-google-readability-avoid-underscore-in-googletest-name,-cppcoreguidelines-avoid-magic-numbers,-readability-magic-numbers'
ExtraArgs:
ExtraArgsBefore: []
FormatStyle: none
Expand Down
2 changes: 1 addition & 1 deletion build_tools/clang_tidy.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def tidy_on_path(path):
"clang-tidy",
"-p0",
"-path", BUILD_PATH,
"-checks=-cppcoreguidelines-pro-type-union-access,-modernize-use-trailing-return-type",
"-checks=-cppcoreguidelines-pro-type-union-access,-llvm-include-order,-modernize-use-trailing-return-type,-cppcoreguidelines-avoid-non-const-global-variables,-fuchsia-statically-constructed-objects,-fuchsia-overloaded-operator,-bugprone-easily-swappable-parameters,-cppcoreguidelines-non-private-member-variables-in-classes,-misc-non-private-member-variables-in-classes,-cppcoreguidelines-pro-bounds-array-to-pointer-decay,-hicpp-no-array-decay,-hicpp-named-parameter,-readability-named-parameter,-cppcoreguidelines-pro-bounds-pointer-arithmetic,-readability-function-cognitive-complexity,-cert-err58-cpp,-cppcoreguidelines-avoid-c-arrays,-hicpp-avoid-c-arrays,-modernize-avoid-c-arrays,-cppcoreguidelines-owning-memory,-cppcoreguidelines-pro-type-const-cast,-readability-identifier-length,-fuchsia-default-arguments-calls,-google-readability-avoid-underscore-in-googletest-name,-cppcoreguidelines-avoid-magic-numbers,-readability-magic-numbers",
"-extra-arg=-language=c++",
"-extra-arg=-std=c++17",
"-extra-arg=-Ithirdparty/output/include"]
Expand Down
18 changes: 14 additions & 4 deletions src/replica/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,24 @@

#include <functional>
#include <string>
#include <string_view>
#include <tuple>
#include <utility>
#include <vector>

#include <string_view>
#include "common/replication.codes.h"
#include "consensus_types.h"
#include "metadata_types.h"
#include "mutation_batch.h"
#include "replica_duplicator.h"
#include "replica/replica.h"
#include "replica_duplicator.h"
#include "runtime/task/task_code.h"
#include "runtime/task/task_spec.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"
#include "utils/error_code.h"
#include "utils/fmt_logging.h"
#include "utils/ports.h"

METRIC_DEFINE_gauge_int64(replica,
dup_recent_lost_mutations,
Expand Down Expand Up @@ -196,10 +197,19 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree
}

blob bb;
if (update.data.buffer() != nullptr) {
if (update.data.buffer()) {
// ATTENTION: instead of copy, move could optimize the performance. However, this
// would nullify the elements of mu->data.updates.
bb = std::move(update.data);
} else {
bb = blob::create_from_bytes(update.data.data(), update.data.length());
// TODO(wangdan): if update.data.buffer() is nullptr, the blob object must have
// been used as `string_view`.
//
// Once `string_view` function is removed from blob, consider dropping following
// statements.
if (dsn_likely(update.data.data() != nullptr && !update.data.empty())) {
bb = blob::create_from_bytes(update.data.data(), update.data.length());
}
}

_total_bytes += bb.length();
Expand Down
7 changes: 3 additions & 4 deletions src/replica/duplication/test/duplication_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,16 +75,15 @@ class duplication_test_base : public replica_test_base
return log_file_map;
}

mutation_ptr create_test_mutation(int64_t decree,
int64_t last_committed_decree,
const std::string &data) override
mutation_ptr
create_test_mutation(int64_t decree, int64_t last_committed_decree, const char *data) override
{
auto mut = replica_test_base::create_test_mutation(decree, last_committed_decree, data);
mut->data.updates[0].code = RPC_DUPLICATION_IDEMPOTENT_WRITE; // must be idempotent write
return mut;
}

mutation_ptr create_test_mutation(int64_t decree, const std::string &data) override
mutation_ptr create_test_mutation(int64_t decree, const char *data) override
{
return duplication_test_base::create_test_mutation(decree, decree - 1, data);
}
Expand Down
13 changes: 5 additions & 8 deletions src/replica/duplication/test/load_from_private_log_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,8 +92,7 @@ class load_from_private_log_test : public duplication_test_base
// each round mlog will replay the former logs, and create new file
mutation_log_ptr mlog = create_private_log();
for (int i = 1; i <= 10; i++) {
std::string msg = "hello!";
mutation_ptr mu = create_test_mutation(10 * f + i, msg);
auto mu = create_test_mutation(10 * f + i, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
}
mlog->tracker()->wait_outstanding_tasks();
Expand Down Expand Up @@ -149,9 +148,8 @@ class load_from_private_log_test : public duplication_test_base
auto reserved_plog_force_flush = FLAGS_plog_force_flush;
FLAGS_plog_force_flush = true;
for (int i = decree_start; i <= num_entries + decree_start; i++) {
std::string msg = "hello!";
// decree - last_commit_decree = 1 by default
mutation_ptr mu = create_test_mutation(i, msg);
auto mu = create_test_mutation(i, "hello!");
// mock the last_commit_decree of first mu equal with `last_commit_decree_start`
if (i == decree_start) {
mu->data.header.last_committed_decree = last_commit_decree_start;
Expand All @@ -160,7 +158,7 @@ class load_from_private_log_test : public duplication_test_base
}

// commit the last entry
mutation_ptr mu = create_test_mutation(decree_start + num_entries + 1, "hello!");
auto mu = create_test_mutation(decree_start + num_entries + 1, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
FLAGS_plog_force_flush = reserved_plog_force_flush;

Expand Down Expand Up @@ -362,13 +360,12 @@ TEST_P(load_from_private_log_test, ignore_useless)

int num_entries = 100;
for (int i = 1; i <= num_entries; i++) {
std::string msg = "hello!";
mutation_ptr mu = create_test_mutation(i, msg);
auto mu = create_test_mutation(i, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
}

// commit the last entry
mutation_ptr mu = create_test_mutation(1 + num_entries, "hello!");
auto mu = create_test_mutation(1 + num_entries, "hello!");
mlog->append(mu, LPC_AIO_IMMEDIATE_CALLBACK, nullptr, nullptr, 0);
mlog->close();

Expand Down
160 changes: 100 additions & 60 deletions src/replica/duplication/test/mutation_batch_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,138 +31,178 @@
#include "gtest/gtest.h"
#include "replica/duplication/mutation_batch.h"
#include "replica/duplication/mutation_duplicator.h"
#include "replica/duplication/replica_duplicator.h"
#include "replica/mutation.h"
#include "replica/prepare_list.h"
#include "runtime/task/task_code.h"
#include "utils/autoref_ptr.h"
#include "utils/blob.h"

namespace dsn {
namespace replication {
namespace dsn::replication {

class mutation_batch_test : public duplication_test_base
{
public:
void reset_buffer(const decree last_commit,
const decree start,
const decree end,
mutation_batch &batcher)
mutation_batch_test() : _duplicator(create_test_duplicator(0)), _batcher(_duplicator.get()) {}

void reset_buffer(const decree last_commit, const decree start, const decree end)
{
batcher._mutation_buffer->reset(last_commit);
batcher._mutation_buffer->_start_decree = start;
batcher._mutation_buffer->_end_decree = end;
_batcher._mutation_buffer->reset(last_commit);
_batcher._mutation_buffer->_start_decree = start;
_batcher._mutation_buffer->_end_decree = end;
}

void commit_buffer(const decree current_decree, mutation_batch &batcher)
void commit_buffer(const decree current_decree)
{
batcher._mutation_buffer->commit(current_decree, COMMIT_TO_DECREE_HARD);
_batcher._mutation_buffer->commit(current_decree, COMMIT_TO_DECREE_HARD);
}

void check_mutation_contents(const std::set<std::string> &expected_mutations,
mutation_batch &batcher)
void check_mutation_contents(const std::vector<std::string> &expected_mutations)
{
const auto all_mutations = batcher.move_all_mutations();
const auto all_mutations = _batcher.move_all_mutations();

std::set<std::string> actual_mutations;
std::vector<std::string> actual_mutations;
std::transform(all_mutations.begin(),
all_mutations.end(),
std::inserter(actual_mutations, actual_mutations.end()),
std::back_inserter(actual_mutations),
[](const mutation_tuple &tuple) { return std::get<2>(tuple).to_string(); });

ASSERT_EQ(expected_mutations, actual_mutations);
}

std::unique_ptr<replica_duplicator> _duplicator;
mutation_batch _batcher;
};

INSTANTIATE_TEST_SUITE_P(, mutation_batch_test, ::testing::Values(false, true));

TEST_P(mutation_batch_test, prepare_mutation)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());

auto mu1 = create_test_mutation(1, 0, "first mutation");
set_last_applied_decree(1);
ASSERT_TRUE(batcher.add(mu1));
ASSERT_EQ(1, batcher.last_decree());
ASSERT_TRUE(_batcher.add(mu1));
ASSERT_EQ(1, _batcher.last_decree());

auto mu2 = create_test_mutation(2, 1, "abcde");
set_last_applied_decree(2);
ASSERT_TRUE(batcher.add(mu2));
ASSERT_EQ(2, batcher.last_decree());
ASSERT_TRUE(_batcher.add(mu2));
ASSERT_EQ(2, _batcher.last_decree());

auto mu3 = create_test_mutation(3, 2, "hello world");
ASSERT_TRUE(batcher.add(mu3));
ASSERT_TRUE(_batcher.add(mu3));

// The last decree has not been updated.
ASSERT_EQ(2, batcher.last_decree());
ASSERT_EQ(2, _batcher.last_decree());

auto mu4 = create_test_mutation(4, 2, "foo bar");
ASSERT_TRUE(batcher.add(mu4));
ASSERT_EQ(2, batcher.last_decree());
ASSERT_TRUE(_batcher.add(mu4));
ASSERT_EQ(2, _batcher.last_decree());

// The committed mutation would be ignored.
auto mu2_another = create_test_mutation(2, 1, "another second mutation");
ASSERT_TRUE(batcher.add(mu2_another));
ASSERT_EQ(2, batcher.last_decree());
ASSERT_TRUE(_batcher.add(mu2_another));
ASSERT_EQ(2, _batcher.last_decree());

// The mutation with duplicate decree would be ignored.
auto mu3_another = create_test_mutation(3, 2, "123 xyz");
ASSERT_TRUE(batcher.add(mu3_another));
ASSERT_EQ(2, batcher.last_decree());
ASSERT_TRUE(_batcher.add(mu3_another));
ASSERT_EQ(2, _batcher.last_decree());

auto mu5 = create_test_mutation(5, 2, "5th mutation");
set_last_applied_decree(5);
ASSERT_TRUE(batcher.add(mu5));
ASSERT_EQ(5, batcher.last_decree());
ASSERT_TRUE(_batcher.add(mu5));
ASSERT_EQ(5, _batcher.last_decree());

check_mutation_contents({"first mutation", "abcde", "hello world", "foo bar", "5th mutation"});
}

TEST_P(mutation_batch_test, add_null_mutation)
{
auto mu = create_test_mutation(1, nullptr);
_batcher.add_mutation_if_valid(mu, 0);

check_mutation_contents({""});
}

TEST_P(mutation_batch_test, add_empty_mutation)
{
auto mu = create_test_mutation(1, "");
_batcher.add_mutation_if_valid(mu, 0);

check_mutation_contents({""});
}

// TODO(wangdan): once `string_view` function is removed from blob, drop this test.
TEST_P(mutation_batch_test, add_string_view_mutation)
{
auto mu = create_test_mutation(1, nullptr);
const std::string data("hello");
mu->data.updates.back().data = blob(data.data(), 0, data.size());
_batcher.add_mutation_if_valid(mu, 0);

check_mutation_contents({"first mutation", "abcde", "hello world", "foo bar", "5th mutation"},
batcher);
check_mutation_contents({"hello"});
}

TEST_P(mutation_batch_test, add_mutation_if_valid)
TEST_P(mutation_batch_test, add_a_valid_mutation)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());
auto mu = create_test_mutation(1, "hello");
_batcher.add_mutation_if_valid(mu, 0);

check_mutation_contents({"hello"});
}

TEST_P(mutation_batch_test, add_multiple_valid_mutations)
{
// The mutation could not be reused, since in mutation_batch::add_mutation_if_valid
// the elements of mutation::data::updates would be moved and nullified.
auto mu1 = create_test_mutation(1, "hello");
batcher.add_mutation_if_valid(mu1, 0);
check_mutation_contents({"hello"}, batcher);
_batcher.add_mutation_if_valid(mu1, 0);

auto mu2 = create_test_mutation(2, "world");
batcher.add_mutation_if_valid(mu2, 0);
check_mutation_contents({"world"}, batcher);
_batcher.add_mutation_if_valid(mu2, 2);

auto mu3 = create_test_mutation(3, "hi");
_batcher.add_mutation_if_valid(mu3, 2);

check_mutation_contents({"hello", "world", "hi"});
}

TEST_P(mutation_batch_test, add_invalid_mutation)
{
auto mu2 = create_test_mutation(2, "world");
_batcher.add_mutation_if_valid(mu2, 2);

// mu1 would be ignored, since its decree is less than the start decree.
batcher.add_mutation_if_valid(mu1, 2);
batcher.add_mutation_if_valid(mu2, 2);
auto mu1 = create_test_mutation(1, "hello");
_batcher.add_mutation_if_valid(mu1, 2);

auto mu3 = create_test_mutation(3, "hi");
_batcher.add_mutation_if_valid(mu3, 2);

auto mu3 = create_test_mutation(1, "hi");
batcher.add_mutation_if_valid(mu3, 1);
check_mutation_contents({"hi", "world"}, batcher);
auto mu4 = create_test_mutation(1, "ok");
_batcher.add_mutation_if_valid(mu4, 1);

// "ok" would be the first, since its timestamp (i.e. decree in create_test_mutation)
// is the smallest.
check_mutation_contents({"ok", "world", "hi"});
}

TEST_P(mutation_batch_test, ignore_non_idempotent_write)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());

auto mu = create_test_mutation(1, "hello");
mu->data.updates[0].code = RPC_DUPLICATION_NON_IDEMPOTENT_WRITE;
batcher.add_mutation_if_valid(mu, 0);
check_mutation_contents({}, batcher);
_batcher.add_mutation_if_valid(mu, 0);
check_mutation_contents({});
}

TEST_P(mutation_batch_test, mutation_buffer_commit)
{
auto duplicator = create_test_duplicator(0);
mutation_batch batcher(duplicator.get());

// Mock mutation_buffer[last=10, start=15, end=20], last + 1(next commit decree) is out of
// [start~end], then last would become min_decree() - 1, see mutation_buffer::commit() for
// details.
reset_buffer(10, 15, 20, batcher);
commit_buffer(15, batcher);
ASSERT_EQ(14, batcher.last_decree());
reset_buffer(10, 15, 20);
commit_buffer(15);
ASSERT_EQ(14, _batcher.last_decree());
}

} // namespace replication
} // namespace dsn
} // namespace dsn::replication
Loading

0 comments on commit e68c2b9

Please sign in to comment.