Skip to content

Commit

Permalink
refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
empiredan committed Jan 26, 2025
1 parent 9b3312f commit 378084c
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 34 deletions.
43 changes: 22 additions & 21 deletions src/server/pegasus_write_service_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
dsn::apps::incr_response &err_resp,
dsn::apps::update_request &update)
{
// Get old value from the RocksDB instance according to the provided key.
// Get current raw value for the provided key from the RocksDB instance.
db_get_context get_ctx;
const int err = _rocksdb_wrapper->get(req.key.to_string_view(), &get_ctx);
if (dsn_unlikely(err != rocksdb::Status::kOk)) {
Expand All @@ -192,34 +192,35 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
req.key, req.increment, calc_expire_on_non_existent(req), update);
}

dsn::blob old_value;
pegasus_extract_user_data(_pegasus_data_version, std::move(get_ctx.raw_value), old_value);
// Extract user data from raw value as base for increment.
dsn::blob base_value;
pegasus_extract_user_data(_pegasus_data_version, std::move(get_ctx.raw_value), base_value);

int64_t new_int = 0;
if (old_value.empty()) {
if (base_value.empty()) {
// Old value is also considered as 0 before incr as above once it's empty, thus
// set req.increment as the value for single put.
new_int = req.increment;
} else {
int64_t old_int = 0;
if (dsn_unlikely(!dsn::buf2int64(old_value.to_string_view(), old_int))) {
int64_t base_int = 0;
if (dsn_unlikely(!dsn::buf2int64(base_value.to_string_view(), base_int))) {
// Old value is not valid int64.
LOG_ERROR_PREFIX("incr failed: error = old value \"{}\" "
LOG_ERROR_PREFIX("incr failed: error = base value \"{}\" "
"is not an integer or out of range",
utils::c_escape_sensitive_string(old_value));
utils::c_escape_sensitive_string(base_value));
return make_error_response(rocksdb::Status::kInvalidArgument, err_resp);
}

new_int = old_int + req.increment;
if (dsn_unlikely((req.increment > 0 && new_int < old_int) ||
(req.increment < 0 && new_int > old_int))) {
// New value overflows, just respond with the old value.
new_int = base_int + req.increment;
if (dsn_unlikely((req.increment > 0 && new_int < base_int) ||
(req.increment < 0 && new_int > base_int))) {
// New value overflows, just respond with the base value.
LOG_ERROR_PREFIX("incr failed: error = new value is out of range, "
"old_value = {}, increment = {}, new_value = {}",
old_int,
"base_value = {}, increment = {}, new_value = {}",
base_int,
req.increment,
new_int);
return make_error_response(rocksdb::Status::kInvalidArgument, old_int, err_resp);
return make_error_response(rocksdb::Status::kInvalidArgument, base_int, err_resp);
}
}

Expand Down Expand Up @@ -662,22 +663,22 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return raw_key;
}

// Calculate expire timestamp in seconds according to `req`, for the keys not present
// in the storage.
// Calculate expire timestamp in seconds for the keys not contained in the storage
// according to `req`.
template <typename TRequest>
static inline int32_t calc_expire_on_non_existent(const TRequest &req)
{
return req.expire_ts_seconds > 0 ? req.expire_ts_seconds : 0;
}

// Calculate expire timestamp in seconds according to `req` and the old value in `get_ctx`,
// for the keys existing in the storage.
// Calculate new expire timestamp in seconds for the keys contained in the storage
// according to `req` and their current expire timestamp in `get_ctx`.
template <typename TRequest>
static inline int32_t calc_expire_on_existing(const TRequest &req,
const db_get_context &get_ctx)
{
if (req.expire_ts_seconds == 0) {
// Use the old value for the existing key in `get_ctx` as the expire timestamp.
// Still use current expire timestamp of the existing key as the new value.
return static_cast<int32_t>(get_ctx.expire_ts);
}

Expand All @@ -689,7 +690,7 @@ class pegasus_write_service::impl : public dsn::replication::replica_base
return req.expire_ts_seconds;
}

// Build a single-put request by provided int64-typed value.
// Build a single-put request by provided int64 value.
static inline void make_idempotent_request(const dsn::blob &key,
int64_t value,
int32_t expire_ts_seconds,
Expand Down
31 changes: 18 additions & 13 deletions src/server/test/pegasus_write_service_impl_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ class IncrTest : public PegasusWriteServiceImplTest
ASSERT_FALSE(get_ctx.found);
ASSERT_FALSE(get_ctx.expired);

// The base value should be 0 as the record is absent.
test_incr_and_check_db_record(0, increment);
}

Expand All @@ -210,6 +211,7 @@ class IncrTest : public PegasusWriteServiceImplTest
class NonIdempotentIncrTest : public IncrTest
{
public:
// Test `incr` with both returned error and error in response as expected.
void test_non_idempotent_incr(int64_t increment, int expected_ret_err, int expected_resp_err)
{
req.increment = increment;
Expand Down Expand Up @@ -267,9 +269,9 @@ TEST_P(NonIdempotentIncrTest, FailOnGet)
PUT_BASE_VALUE_INT64(100);

dsn::fail::setup();
// When db_get failed, incr should return an error.
dsn::fail::cfg("db_get", "100%1*return()");

// `incr` should return an error once failed to get current value from DB.
dsn::fail::cfg("db_get", "100%1*return()");
test_non_idempotent_incr(10, FAIL_DB_GET, FAIL_DB_GET);

dsn::fail::teardown();
Expand All @@ -280,20 +282,21 @@ TEST_P(NonIdempotentIncrTest, FailOnPut)
PUT_BASE_VALUE_INT64(100);

dsn::fail::setup();
// When rocksdb put failed, incr should return an error.
dsn::fail::cfg("db_write_batch_put", "100%1*return()");

// `incr` should return an error once failed to write into batch.
dsn::fail::cfg("db_write_batch_put", "100%1*return()");
test_non_idempotent_incr(10, FAIL_DB_WRITE_BATCH_PUT, FAIL_DB_WRITE_BATCH_PUT);

dsn::fail::teardown();
}

TEST_P(NonIdempotentIncrTest, IncrOnExpireRecord)
{
// Make the key expired.
// Make the record expired.
req.expire_ts_seconds = 1;
test_non_idempotent_incr(10, rocksdb::Status::kOk, rocksdb::Status::kOk);

// Now the record should be expired.
check_db_record_expired();

// Incr the expired key.
Expand All @@ -308,7 +311,7 @@ INSTANTIATE_TEST_SUITE_P(PegasusWriteServiceImplTest,
class IdempotentIncrTest : public IncrTest
{
public:
// Test make_idempotent for incr.
// Test make_idempotent for the incr request.
void test_make_idempotent(int64_t increment, int expected_err)
{
req.increment = increment;
Expand All @@ -321,8 +324,9 @@ class IdempotentIncrTest : public IncrTest
ASSERT_EQ(expected_err, err_resp.error);
}

// Test if make_idempotent for incr is successful; then, write the idempotent put
// request into db.
// Test idempotent write for the incr request:
// - make_idempotent for incr should be successful;
// - then, apply the idempotent put request into DB.
void test_idempotent_incr(int64_t increment, int expected_err)
{
test_make_idempotent(increment, rocksdb::Status::kOk);
Expand Down Expand Up @@ -383,9 +387,9 @@ TEST_P(IdempotentIncrTest, FailOnGet)
PUT_BASE_VALUE_INT64(100);

dsn::fail::setup();
// When db_get failed, make_idempotent should return an error.
dsn::fail::cfg("db_get", "100%1*return()");

// `make_idempotent` should return an error once failed to get current value from DB.
dsn::fail::cfg("db_get", "100%1*return()");
test_make_idempotent(10, FAIL_DB_GET);

dsn::fail::teardown();
Expand All @@ -396,20 +400,21 @@ TEST_P(IdempotentIncrTest, FailOnPut)
PUT_BASE_VALUE_INT64(100);

dsn::fail::setup();
// When rocksdb put failed, it should return an error while writing put request.
dsn::fail::cfg("db_write_batch_put", "100%1*return()");

// `put` should return an error once failed to write into batch.
dsn::fail::cfg("db_write_batch_put", "100%1*return()");
test_idempotent_incr(10, FAIL_DB_WRITE_BATCH_PUT);

dsn::fail::teardown();
}

TEST_P(IdempotentIncrTest, IncrOnExpireRecord)
{
// Make the key expired.
// Make the record expired.
req.expire_ts_seconds = 1;
test_idempotent_incr(10, rocksdb::Status::kOk);

// Now the record should be expired.
check_db_record_expired();

// Incr the expired key.
Expand Down

0 comments on commit 378084c

Please sign in to comment.