From e60dc0a942bcd13cf23d8abe386894e95e1d156f Mon Sep 17 00:00:00 2001 From: ninsmiracle Date: Sun, 18 Feb 2024 19:27:00 +0800 Subject: [PATCH] support_no_idempotent_dup --- src/replica/duplication/mutation_batch.cpp | 2 +- src/replica/mutation.h | 3 ++ src/replica/replica_2pc.cpp | 11 +++++- src/server/pegasus_mutation_duplicator.cpp | 16 +++++++++ src/server/pegasus_write_service.cpp | 41 ++++++++++++++++++++++ src/server/pegasus_write_service.h | 1 + 6 files changed, 72 insertions(+), 2 deletions(-) diff --git a/src/replica/duplication/mutation_batch.cpp b/src/replica/duplication/mutation_batch.cpp index 8b7a815fca..e306a936de 100644 --- a/src/replica/duplication/mutation_batch.cpp +++ b/src/replica/duplication/mutation_batch.cpp @@ -173,7 +173,7 @@ void mutation_batch::add_mutation_if_valid(mutation_ptr &mu, decree start_decree // ERR_OPERATION_DISABLED, but there could still be a mutation written // before the duplication was added. // To ignore means this write will be lost, which is acceptable under this rare case. - if (!task_spec::get(update.code)->rpc_request_is_write_idempotent) { + if (!task_spec::get(update.code)->rpc_request_is_write_idempotent && !FLAGS_force_send_no_idempotent_when_duplication) { continue; } blob bb; diff --git a/src/replica/mutation.h b/src/replica/mutation.h index d5b7f238ad..3e20d04531 100644 --- a/src/replica/mutation.h +++ b/src/replica/mutation.h @@ -42,6 +42,7 @@ #include "utils/autoref_ptr.h" #include "utils/fmt_logging.h" #include "utils/link.h" +#include "utils/flags.h" namespace dsn { class binary_reader; @@ -54,6 +55,8 @@ class latency_tracer; namespace replication { +DSN_DECLARE_bool(force_send_no_idempotent_when_duplication); + class mutation; typedef dsn::ref_ptr mutation_ptr; diff --git a/src/replica/replica_2pc.cpp b/src/replica/replica_2pc.cpp index 0e335d49b1..a43c556707 100644 --- a/src/replica/replica_2pc.cpp +++ b/src/replica/replica_2pc.cpp @@ -86,6 +86,14 @@ DSN_DEFINE_bool(replication, "reject client write requests if disk status is space insufficient"); DSN_TAG_VARIABLE(reject_write_when_disk_insufficient, FT_MUTABLE); +DSN_DEFINE_bool("replication", + force_send_no_idempotent_when_duplication, + false, + "receive client idempotent write requests and send them to backup cluster when " + "doing duplication"); +DSN_TAG_VARIABLE(force_send_no_idempotent_when_duplication, FT_MUTABLE); + + DSN_DEFINE_int32(replication, prepare_timeout_ms_for_secondaries, 1000, @@ -154,7 +162,8 @@ void replica::on_client_write(dsn::message_ex *request, bool ignore_throttling) return; } - if (is_duplication_master() && !spec->rpc_request_is_write_idempotent) { + if (is_duplication_master() && !spec->rpc_request_is_write_idempotent && + !FLAGS_force_send_no_idempotent_when_duplication) { // Ignore non-idempotent write, because duplication provides no guarantee of atomicity to // make this write produce the same result on multiple clusters. METRIC_VAR_INCREMENT(dup_rejected_non_idempotent_write_requests); diff --git a/src/server/pegasus_mutation_duplicator.cpp b/src/server/pegasus_mutation_duplicator.cpp index 3553bcfc79..43be6be612 100644 --- a/src/server/pegasus_mutation_duplicator.cpp +++ b/src/server/pegasus_mutation_duplicator.cpp @@ -101,6 +101,22 @@ using namespace dsn::literals::chrono_literals; dsn::from_blob_to_thrift(data, thrift_request); return pegasus_hash_key_hash(thrift_request.hash_key); } + if (tc == dsn::apps::RPC_RRDB_RRDB_INCR) { + dsn::apps::incr_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + dsn::apps::check_and_set_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + if (tc == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + dsn::apps::check_and_mutate_request thrift_request; + dsn::from_blob_to_thrift(data, thrift_request); + return pegasus_hash_key_hash(thrift_request.hash_key); + } + LOG_FATAL("unexpected task code: {}", tc); __builtin_unreachable(); } diff --git a/src/server/pegasus_write_service.cpp b/src/server/pegasus_write_service.cpp index 09f05ae577..98bd0320d1 100644 --- a/src/server/pegasus_write_service.cpp +++ b/src/server/pegasus_write_service.cpp @@ -126,6 +126,11 @@ METRIC_DEFINE_counter(replica, dsn::metric_unit::kRequests, "The number of DUPLICATE requests"); +METRIC_DEFINE_counter(replica, + no_idempotent_duplicate, + dsn::metric_unit::kRequests, + "The number of forced idempotent requests when doing duplication"); + METRIC_DEFINE_percentile_int64(replica, dup_time_lag_ms, dsn::metric_unit::kMilliSeconds, @@ -169,6 +174,7 @@ pegasus_write_service::pegasus_write_service(pegasus_server_impl *server) METRIC_VAR_INIT_replica(check_and_set_latency_ns), METRIC_VAR_INIT_replica(check_and_mutate_latency_ns), METRIC_VAR_INIT_replica(dup_requests), + METRIC_VAR_INIT_replica(no_idempotent_duplicate), METRIC_VAR_INIT_replica(dup_time_lag_ms), METRIC_VAR_INIT_replica(dup_lagging_writes), _put_batch_size(0), @@ -416,6 +422,41 @@ int pegasus_write_service::duplicate(int64_t decree, } continue; } + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR || + request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET || + request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + // receive no idempotent request from master cluster via duplication + METRIC_VAR_INCREMENT(no_idempotent_duplicate); + + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_INCR) { + incr_rpc rpc(write); + resp.__set_error(_impl->incr(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_SET) { + check_and_set_rpc rpc(write); + resp.__set_error(_impl->check_and_set(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + if (request.task_code == dsn::apps::RPC_RRDB_RRDB_CHECK_AND_MUTATE) { + check_and_mutate_rpc rpc(write); + resp.__set_error( + _impl->check_and_mutate(ctx.decree, rpc.request(), rpc.response())); + if (resp.error != rocksdb::Status::kOk) { + return resp.error; + } + continue; + } + } + + resp.__set_error(rocksdb::Status::kInvalidArgument); resp.__set_error_hint(fmt::format("unrecognized task code {}", request.task_code)); return empty_put(ctx.decree); diff --git a/src/server/pegasus_write_service.h b/src/server/pegasus_write_service.h index 18db638176..967a548cce 100644 --- a/src/server/pegasus_write_service.h +++ b/src/server/pegasus_write_service.h @@ -232,6 +232,7 @@ class pegasus_write_service : dsn::replication::replica_base METRIC_VAR_DECLARE_percentile_int64(check_and_mutate_latency_ns); METRIC_VAR_DECLARE_counter(dup_requests); + METRIC_VAR_DECLARE_counter(no_idempotent_duplicate); METRIC_VAR_DECLARE_percentile_int64(dup_time_lag_ms); METRIC_VAR_DECLARE_counter(dup_lagging_writes);