Skip to content

Commit

Permalink
fix(cluster): Join on specified attempt id (#3305)
Browse files Browse the repository at this point in the history
**The Bug**

Before this fix, source nodes would send `FIN` entries to target nodes
(in all thread flows), and would then send a `DFLYMIGRATE ACK` command
to verify that all flows received the `FIN` in time.

If they didn't, the source node would retry this logic in a loop, until
successful.

The problem is that, in some rear cases, one or more of the flows would
indeed be in a `FIN` state, _but of a previous `FIN` that is already
outdated_. If that's indeed the case, all data between that `FIN` and
the next `FIN`(s) will be lost.

**The Fix**

We already have an attempt id that we send in the `DFLYMIGRATE ACK`
command, and return it in the response. This fix utilizes the same
attempt id to be sent to all flows, and then when joined, we make sure
we join on the correct (latest) attempt id.

Unfortunately, we can't use `FIN` opcode now, because the protocol does
not send any additional metadata for this opcode. I chose to use LSN
because it has exactly the fields that we need, and one could possibly
think of Log Sequence Number as an attempt id, but I could change that
if it's unclear or too hacky.

**Testing**

To reproduce this, one needs to lower
`--slot_migration_connection_timeout_ms` significantly, say to 500ms.
This would fail, on my laptop, every ~2 runs.

With this fix, it runs hundreds of times and never reproduces.
  • Loading branch information
chakaz authored Jul 14, 2024
1 parent 8355569 commit 4e3bd94
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 24 deletions.
2 changes: 1 addition & 1 deletion src/server/cluster/cluster_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
if (!migration)
return cntx->SendError(kIdNotFound);

if (!migration->Join()) {
if (!migration->Join(attempt)) {
return cntx->SendError("Join timeout happened");
}

Expand Down
48 changes: 35 additions & 13 deletions src/server/cluster/incoming_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,16 @@ class ClusterShardMigration {
break;
}

while (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Attempt to finalize flow " << source_shard_id_;
while (tx_data->opcode == journal::Op::LSN) {
VLOG(2) << "Attempt to finalize flow " << source_shard_id_ << " attempt " << tx_data->lsn;
last_attempt_.store(tx_data->lsn);
bc->Dec(); // we can Join the flow now
// if we get new data, attempt is failed
if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) {
VLOG(1) << "Finalized flow " << source_shard_id_;
return;
}
VLOG(2) << "Attempt failed to finalize flow " << source_shard_id_;
bc->Add(); // the flow isn't finished so we lock it again
}
if (tx_data->opcode == journal::Op::PING) {
Expand All @@ -70,6 +72,7 @@ class ClusterShardMigration {
}
}

VLOG(2) << "Flow " << source_shard_id_ << " canceled";
bc->Dec(); // we should provide ability to join the flow
}

Expand All @@ -86,14 +89,19 @@ class ClusterShardMigration {
return {};
}

long GetLastAttempt() const {
return last_attempt_.load();
}

private:
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution
if (!tx_data.IsGlobalCmd()) {
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
VLOG(3) << "Execute cmd without sync between shards. cmd: "
<< CmdArgList(tx_data.command.cmd_args);
executor_.Execute(tx_data.dbid, tx_data.command);
} else {
// TODO check which global commands should be supported
Expand All @@ -112,6 +120,7 @@ class ClusterShardMigration {
util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_);
JournalExecutor executor_;
IncomingSlotMigration* in_migration_;
atomic_long last_attempt_{-1};
};

IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
Expand All @@ -130,16 +139,29 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
IncomingSlotMigration::~IncomingSlotMigration() {
}

bool IncomingSlotMigration::Join() {
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (bc_->WaitFor(timeout)) {
state_.store(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(slots_);
return true;
bool IncomingSlotMigration::Join(long attempt) {
const absl::Time start = absl::Now();
const absl::Duration timeout =
absl::Milliseconds(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms));

while (true) {
const absl::Time now = absl::Now();
const absl::Duration passed = now - start;
VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
if (passed >= timeout) {
LOG(WARNING) << "Can't join migration in time";
ReportError(GenericError("Can't join migration in time"));
return false;
}

if ((bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) &&
(std::all_of(shard_flows_.begin(), shard_flows_.end(),
[&](const auto& flow) { return flow->GetLastAttempt() == attempt; }))) {
state_.store(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(slots_);
return true;
}
}
LOG(WARNING) << "Can't join migration in time";
ReportError(GenericError("Can't join migration in time"));
return false;
}

void IncomingSlotMigration::Stop() {
Expand All @@ -159,7 +181,7 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
state_.store(MigrationState::C_SYNC);

shard_flows_[shard]->Start(&cntx_, source, bc_);
VLOG(1) << "Incoming flow: " << shard << " finished for " << source_id_;
VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
}

size_t IncomingSlotMigration::GetKeyCount() const {
Expand Down
2 changes: 1 addition & 1 deletion src/server/cluster/incoming_slot_migration.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class IncomingSlotMigration {
// Waits until all flows got FIN opcode.
// returns true if we joined false if timeout is readed
// After Join we still can get data due to error situation
[[nodiscard]] bool Join();
[[nodiscard]] bool Join(long attempt);

// Stop migrations, can be called even after migration is finished
void Stop();
Expand Down
11 changes: 6 additions & 5 deletions src/server/cluster/outgoing_slot_migration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
streamer_.Cancel();
}

void Finalize() {
streamer_.SendFinalize();
void Finalize(long attempt) {
streamer_.SendFinalize(attempt);
}

const dfly::GenericError GetError() const {
Expand Down Expand Up @@ -270,9 +270,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
pause_fb_opt->JoinIfNeeded();
});

auto cb = [this](util::ProactorBase* pb) {
auto cb = [this, attempt](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
slot_migrations_[shard->shard_id()]->Finalize();
slot_migrations_[shard->shard_id()]->Finalize(attempt);
}
};

Expand Down Expand Up @@ -302,7 +302,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}

const auto attempt_res = get<int64_t>(LastResponseArgs().front().u);
if (attempt_res == kInvalidAttempt) {
if (attempt_res != attempt) {
LOG(WARNING) << "Incorrect attempt payload, sent " << attempt << " received " << attempt_res;
return false;
}

Expand Down
6 changes: 3 additions & 3 deletions src/server/journal/streamer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,9 +231,9 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
} while (cursor);
}

void RestoreStreamer::SendFinalize() {
VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id();
journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
void RestoreStreamer::SendFinalize(long attempt) {
VLOG(1) << "RestoreStreamer LSN opcode for : " << db_slice_->shard_id() << " attempt " << attempt;
journal::Entry entry(journal::Op::LSN, attempt);

io::StringSink sink;
JournalWriter writer{&sink};
Expand Down
2 changes: 1 addition & 1 deletion src/server/journal/streamer.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ class RestoreStreamer : public JournalStreamer {
// Cancel() must be called if Start() is called
void Cancel() override;

void SendFinalize();
void SendFinalize(long attempt);

bool IsSnapshotFinished() const {
return snapshot_finished_;
Expand Down

0 comments on commit 4e3bd94

Please sign in to comment.