diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index d09b5b165e77..9645e10baef0 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -3131,6 +3131,14 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
     CHECK(replica_);
     SetMasterFlagOnAllThreads(true);  // Flip flag before clearing replica
+    // We stay responsive after setting the flag, as long as the GLOBAL STATE is not LOADING.
+    // ThisFiber::SleepFor(std::chrono::seconds(10))
+
+    // This is fine: we can hold the lock for as long as we like. If we are not in the FULL SYNC
+    // stage (during which the global state is LOADING), Dragonfly can execute incoming
+    // requests/commands without issue after the call to SetMasterFlagOnAllThreads() above.
+    // Moreover, as long as we keep thread locals on each proactor, we can stay highly
+    // responsive for flows like INFO REPLICATION.
     last_master_data_ = replica_->Stop();
     replica_.reset();
@@ -3138,6 +3146,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
   }
   // May not switch to ACTIVE if the process is, for example, shutting down at the same time.
+  // Why do we need this? If we are in the LOADING state because the replica was in FULL SYNC,
+  // this cleans up that state.
   service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
   return builder->SendOk();
@@ -3156,6 +3166,12 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
     builder->SendError("Invalid state");
     return;
   }
+  // We are *NOT* responsive after we set the flag here, and we stay unresponsive until FULL SYNC
+  // finishes. However, we might get cancelled by the time we lock the mutex again below,
+  // or because `new_replica->Start()` failed.
+  // See also Service::VerifyCommandState():
+  //   allowed_by_state = dfly_cntx.journal_emulated || (cid->opt_mask() & CO::LOADING);
+  // ThisFiber::SleepFor(std::chrono::seconds(10))
   // Create a new replica and assign it
   new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
@@ -3173,6 +3189,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
   GenericError ec{};
   switch (on_err) {
     case ActionOnConnectionFail::kReturnOnError:
+      // Why? At this point we only Greet the master and set the ConnectionContext member
+      // variables via REPLCONF.
       ec = new_replica->Start();
       break;
     case ActionOnConnectionFail::kContinueReplication:
@@ -3269,6 +3287,10 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
     return builder->SendError("Full sync not done");
   }
+  // This takes a *lot* of time, since it also requires work on the master side.
+  // We keep the lock here, so commands like INFO REPLICATION are blocked.
+  // This should not be the case once we have thread locals for the Replica: they would
+  // allow other flows to make progress while we are taking over and holding the mutex.
   std::error_code ec = replica_->TakeOver(ArgS(args, 0), save_flag);
   if (ec)
     return builder->SendError("Couldn't execute takeover");
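
The comments above keep circling the same idea: if each proactor thread kept its own copy of the replication state, read-only flows such as INFO REPLICATION would not have to contend for the replicaof mutex while a full sync or takeover is in progress. Below is a minimal, self-contained sketch of that pattern, using plain std::thread workers instead of helio proactors; `ReplSnapshot`, `BecomeReplicaOfLocally` and `InfoReplication` are hypothetical names for illustration, not Dragonfly APIs.

```cpp
// Sketch only (not Dragonfly code): every worker thread keeps its own replication
// snapshot, so a read-only flow like INFO REPLICATION never waits on a shared mutex.
#include <iostream>
#include <string>
#include <thread>
#include <vector>

struct ReplSnapshot {
  bool is_master = true;
  std::string master_host;  // empty while acting as a master
};

// One copy per worker thread; only the owning thread reads or writes it.
thread_local ReplSnapshot tl_repl;

// Stand-in for a SetMasterFlagOnAllThreads-style broadcast: each worker updates
// its own thread-local copy, no shared lock involved.
void BecomeReplicaOfLocally(const std::string& host) {
  tl_repl.is_master = false;
  tl_repl.master_host = host;
}

// Stand-in for serving INFO REPLICATION: reads only the thread-local snapshot,
// so it stays responsive even while another fiber holds the replicaof mutex
// for a long operation (full sync, takeover, ...).
std::string InfoReplication() {
  if (tl_repl.is_master)
    return "role:master";
  return "role:slave master_host:" + tl_repl.master_host;
}

int main() {
  std::vector<std::thread> workers;
  for (int i = 0; i < 4; ++i) {
    workers.emplace_back([] {
      BecomeReplicaOfLocally("10.0.0.1");      // per-thread part of the broadcast
      std::cout << InfoReplication() << '\n';  // lock-free read on this thread
    });
  }
  for (auto& t : workers) t.join();
  return 0;
}
```

In Dragonfly the broadcast would presumably go through the proactor pool rather than spawning threads, but the locking story is the same: the writer updates every thread's copy once, and readers never block on the shared mutex.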