Skip to content

Commit

Permalink
support_no_idempotent_dup
Browse files Browse the repository at this point in the history
  • Loading branch information
ninsmiracle committed Feb 18, 2024
1 parent 546b947 commit e60dc0a
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 2 deletions.
2 changes: 1 addition & 1 deletion src/replica/duplication/mutation_batch.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree
// ERR_OPERATION_DISABLED, but there could still be a mutation written
// before the duplication was added.
// To ignore means this write will be lost, which is acceptable under this rare case.
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) {
if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && !FLAGS_force_send_no_idempotent_when_duplication) {
continue;
}
blob bb;
Expand Down
3 changes: 3 additions & 0 deletions src/replica/mutation.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
#include "utils/autoref_ptr.h"
#include "utils/fmt_logging.h"
#include "utils/link.h"
#include "utils/flags.h"

namespace dsn {
class binary_reader;
Expand All @@ -54,6 +55,8 @@ class latency_tracer;

namespace replication {

// Runtime switch read by the duplication code paths: when set, writes whose task
// spec is not marked write-idempotent are no longer filtered out of duplication.
// The flag itself is defined in replica_2pc.cpp.
DSN_DECLARE_bool(force_send_no_idempotent_when_duplication);

class mutation;

typedef dsn::ref_ptr<mutation> mutation_ptr;
Expand Down
11 changes: 10 additions & 1 deletion src/replica/replica_2pc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,14 @@ DSN_DEFINE_bool(replication,
"reject client write requests if disk status is space insufficient");
DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE);

// Opt-in switch for duplicating non-idempotent writes.
// When true, a duplication master stops rejecting client writes whose task spec is
// not write-idempotent, and such mutations are no longer skipped when batching
// mutations for duplication. Duplication provides no guarantee that these writes
// produce the same result on multiple clusters, so the default stays false.
// The section is passed as a bare token, matching the other flag definitions in
// this file. The flag is tagged FT_MUTABLE.
DSN_DEFINE_bool(replication,
                force_send_no_idempotent_when_duplication,
                false,
                "receive client non-idempotent write requests and send them to backup cluster "
                "when doing duplication");
DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE);


DSN_DEFINE_int32(replication,
prepare_timeout_ms_for_secondaries,
1000,
Expand Down Expand Up @@ -154,7 +162,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling)
return;
}

if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) {
if (is_duplication_master() && !spec->rpc_request_is_write_idempotent &&
!FLAGS_force_send_no_idempotent_when_duplication) {
// Ignore non-idempotent write, because duplication provides no guarantee of atomicity to
// make this write produce the same result on multiple clusters.
METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests);
Expand Down
16 changes: 16 additions & 0 deletions src/server/pegasus_mutation_duplicator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,22 @@ using namespace dsn::literals::chrono_literals;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
// The three branches below cover INCR, CHECK_AND_SET and CHECK_AND_MUTATE.
// Each one decodes `data` into the matching thrift request and returns the hash
// computed from one of its key fields.

// INCR: hashes the request's `key` field.
// NOTE(review): this branch passes `key` to pegasus_hash_key_hash, whereas the
// sibling branches pass `hash_key` -- confirm that `key` holds the form this
// helper expects.
if (tc == dsn::apps::RPC_RRDB_RRDB_INCR) {
dsn::apps::incr_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.key);
}
// CHECK_AND_SET: hashes the request's `hash_key` field.
if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
dsn::apps::check_and_set_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}
// CHECK_AND_MUTATE: hashes the request's `hash_key` field.
if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
dsn::apps::check_and_mutate_request thrift_request;
dsn::from_blob_to_thrift(data, thrift_request);
return pegasus_hash_key_hash(thrift_request.hash_key);
}

LOG_FATAL("unexpected task code: {}", tc);
__builtin_unreachable();
}
Expand Down
41 changes: 41 additions & 0 deletions src/server/pegasus_write_service.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,11 @@ METRIC_DEFINE_counter(replica,
dsn::metric_unit::kRequests,
"The number of DUPLICATE requests");

// Replica-level counter that the duplicate() handler increments once for every
// INCR, CHECK_AND_SET or CHECK_AND_MUTATE request it receives.
METRIC_DEFINE_counter(replica,
no_idempotent_duplicate,
dsn::metric_unit::kRequests,
"The number of forced idempotent requests when doing duplication");

METRIC_DEFINE_percentile_int64(replica,
dup_time_lag_ms,
dsn::metric_unit::kMilliSeconds,
Expand Down Expand Up @@ -169,6 +174,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server)
METRIC_VAR_INIT_replica(check_and_set_latency_ns),
METRIC_VAR_INIT_replica(check_and_mutate_latency_ns),
METRIC_VAR_INIT_replica(dup_requests),
METRIC_VAR_INIT_replica(no_idempotent_duplicate),
METRIC_VAR_INIT_replica(dup_time_lag_ms),
METRIC_VAR_INIT_replica(dup_lagging_writes),
_put_batch_size(0),
Expand Down Expand Up @@ -416,6 +422,41 @@ int pegasus_write_service::duplicate(int64_t decree,
}
continue;
}

// Handle the non-idempotent write types: INCR, CHECK_AND_SET and CHECK_AND_MUTATE.
// Every inner branch ends in `return` or `continue`, so a request matched here
// never falls through to the code that follows this block.
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET ||
request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
// A non-idempotent request arrived through duplication; count it before applying.
METRIC_VAR_INCREMENT(no_idempotent_duplicate);

// INCR: wrap `write` in an incr_rpc and apply it through _impl. Only the status
// returned by the call is copied into `resp`; a non-kOk status is returned at once.
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) {
incr_rpc rpc(write);
resp.__set_error(_impl->incr(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
// CHECK_AND_SET: same pattern as INCR, using check_and_set_rpc.
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) {
check_and_set_rpc rpc(write);
resp.__set_error(_impl->check_and_set(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
// CHECK_AND_MUTATE: same pattern as INCR, using check_and_mutate_rpc.
if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) {
check_and_mutate_rpc rpc(write);
resp.__set_error(
_impl->check_and_mutate(ctx.decree, rpc.request(), rpc.response()));
if (resp.error != rocksdb::Status::kOk) {
return resp.error;
}
continue;
}
}


resp.__set_error(rocksdb::Status::kInvalidArgument);
resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code));
return empty_put(ctx.decree);
Expand Down
1 change: 1 addition & 0 deletions src/server/pegasus_write_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base
METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns);

METRIC_VAR_DECLARE_counter(dup_requests);
METRIC_VAR_DECLARE_counter(no_idempotent_duplicate);
METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms);
METRIC_VAR_DECLARE_counter(dup_lagging_writes);

Expand Down

0 comments on commit e60dc0a

Please sign in to comment.