Skip to content

Commit

Permalink
Fixes missing/duplicate decree scenario
Browse files Browse the repository at this point in the history
Issues involved overwriting highest_proposed_decree at the incorrect times and
appending to requested_values at the incorrect times.

Fixes #12
  • Loading branch information
dgkimura committed Jul 26, 2018
1 parent 41a89f8 commit 0e857a5
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 148 deletions.
6 changes: 3 additions & 3 deletions include/paxos/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ struct ProposerContext : public Context
std::map<Decree, std::shared_ptr<ReplicaSet>, compare_map_decree> promise_map;
std::set<Decree, compare_decree> ntie_map;
std::map<Decree, std::tuple<std::shared_ptr<ReplicaSet>, bool>, compare_map_decree> nprepare_map;
std::set<Decree, compare_root_decree> resume_map;
paxos::lru_set<Decree, compare_root_decree> resume_map;
std::map<Decree, std::shared_ptr<ReplicaSet>, compare_map_decree> naccept_map;
std::deque<std::tuple<std::string, DecreeType>> requested_values;

Expand All @@ -65,7 +65,7 @@ struct ProposerContext : public Context
ntie_map(),
nprepare_map(),
naccept_map(),
resume_map(),
resume_map(256),
requested_values(),
mutex(),
highest_nacked_decree(Replica(""), -1, "first", DecreeType::UserDecree),
Expand Down Expand Up @@ -95,7 +95,7 @@ struct AcceptorContext : public Context
)
: promised_decree(promised_decree_),
accepted_decree(accepted_decree_),
accepted_set(128),
accepted_set(256),
accepted_time(std::chrono::high_resolution_clock::now()),
interval(interval_),
mutex()
Expand Down
121 changes: 73 additions & 48 deletions src/roles.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,44 @@ HandlePromise(

std::lock_guard<std::mutex> lock(context->mutex);

if (IsDecreeHigher(message.decree, context->highest_proposed_decree.Value()) &&
context->replicaset->Contains(message.from))
auto highest_proposed_decree = context->highest_proposed_decree.Value();

if (IsDecreeHigherOrEqual(message.decree, highest_proposed_decree) &&
IsRootDecreeHigher(message.decree, context->ledger->Tail()) &&
IsReplicaEqual(message.to, message.decree.author))
{
//
// If the messaged decree is higher than any previoiusly promised
// decree and the sender is a known sender in our replica set then
// remember the sent decree as the highest promised decree.
// If (1) the decree is higher or equal to most recent proposal and (2)
// the decree is not yet written to the ledger and (3) we are the
// author then consider forwarding accept or updating highest proposed
// decree.
//
context->highest_proposed_decree = message.decree;

if (!message.decree.content.empty() &&
IsRootDecreeEqual(message.decree, highest_proposed_decree) &&
highest_proposed_decree.content != message.decree.content &&
!highest_proposed_decree.content.empty())
{
//
// Promise with non-empty decree contents suggests that decree was once
// in accept state. Therefore we should send accept on this decree
// again and let propogate through to accepted.
//
sender->ReplyAll(Response(message, MessageType::AcceptMessage));
return;
}

if (IsDecreeHigher(message.decree, highest_proposed_decree) &&
highest_proposed_decree.content.empty())
{
//
// If the messaged decree is higher than any previoiusly promised
// decree and the sender is a known sender in our replica set then
// remember the sent decree as the highest promised decree.
//
context->highest_proposed_decree = message.decree;
highest_proposed_decree = context->highest_proposed_decree.Value();
}
}

if (context->promise_map.find(message.decree) == context->promise_map.end())
Expand All @@ -171,16 +200,8 @@ HandlePromise(
context->promise_map[message.decree] = std::make_shared<ReplicaSet>();
}

if (!message.decree.content.empty())
{
//
// Promise with non-empty decree contents suggests that decree was once
// in accept state. Therefore we should send accept on this decree
// again and let propogate through to accepted.
//
sender->ReplyAll(Response(message, MessageType::AcceptMessage));
}
else if (IsDecreeIdentical(message.decree, context->highest_proposed_decree.Value()))
if (IsDecreeIdentical(message.decree, highest_proposed_decree) &&
IsRootDecreeOrdered(context->ledger->Tail(), message.decree))
{
bool duplicate = context->promise_map[message.decree]
->Contains(message.from);
Expand All @@ -203,7 +224,7 @@ HandlePromise(
if (received_promises == minimum_quorum ||
(received_promises >= minimum_quorum && duplicate))
{
if (context->highest_proposed_decree.Value().content.empty() &&
if (highest_proposed_decree.content.empty() &&
!context->requested_values.empty())
{
//
Expand All @@ -216,14 +237,12 @@ HandlePromise(
// ... then we should update the highest proposed decree with
// the requested values.
//
Decree updated_highest_proposed_decree =
context->highest_proposed_decree.Value();
updated_highest_proposed_decree.content = std::get<0>(
highest_proposed_decree.content = std::get<0>(
context->requested_values[0]);
updated_highest_proposed_decree.type = std::get<1>(
highest_proposed_decree.type = std::get<1>(
context->requested_values[0]);

context->highest_proposed_decree = updated_highest_proposed_decree;
context->highest_proposed_decree = highest_proposed_decree;
context->requested_values.erase(
context->requested_values.begin());
}
Expand Down Expand Up @@ -276,6 +295,7 @@ HandleNackTie(
context->pause->Start([&nack_response, &sender, &context]()
{
auto next = nack_response.decree;
next.content = context->highest_proposed_decree.Value().content;

//
// Check again before sending because during the time that we
Expand Down Expand Up @@ -312,18 +332,6 @@ HandleNack(
if (std::get<1>(context->nprepare_map[message.decree]) == false)
{
std::get<0>(context->nprepare_map[message.decree])->Add(message.from);;

int minimum_quorum = context->replicaset->GetSize() / 2 + 1;
int received_nprepare = std::get<0>(context->nprepare_map[message.decree])
->Intersection(context->replicaset)
->GetSize();

if (received_nprepare >= minimum_quorum)
{
std::get<1>(context->nprepare_map[message.decree]) = true;
context->requested_values.push_front(
std::make_tuple(message.decree.content, message.decree.type));
}
}
}

Expand Down Expand Up @@ -352,21 +360,39 @@ HandleResume(
context->promise_map.erase(message.decree);
}

if (IsDecreeIdentical(context->ledger->Tail(), message.decree))
auto highest_proposed_decree = context->highest_proposed_decree.Value();
if (IsRootDecreeEqual(message.decree, highest_proposed_decree) &&
!context->resume_map.contains(message.decree) &&
highest_proposed_decree.content != message.decree.content &&
!highest_proposed_decree.content.empty() &&
std::get<1>(context->nprepare_map[message.decree]) == false)
{
auto decree = context->highest_proposed_decree.Value();
if (IsRootDecreeEqual(message.decree, decree) &&
context->resume_map.find(message.decree) == context->resume_map.end() &&
decree.content != message.decree.content &&
!decree.content.empty() &&
std::get<1>(context->nprepare_map[decree]) == false)
{
context->requested_values.push_front(
std::make_tuple(decree.content, decree.type));
context->resume_map.insert(message.decree);
std::get<1>(context->nprepare_map[decree]) = true;
}
//
// If the root decrees of messaged decree and highest_proposed_decree
// are equal but the contents are not equal, then we know that there
// was a conflict between decrees and our proposal lost.
//
context->requested_values.push_front(
std::make_tuple(highest_proposed_decree.content,
highest_proposed_decree.type));
context->resume_map.insert(message.decree);
std::get<1>(context->nprepare_map[message.decree]) = true;
}

if (IsRootDecreeHigherOrEqual(message.decree, highest_proposed_decree) &&
!highest_proposed_decree.content.empty())
{
//
// At this point we know that the highest_proposed_decree was either
// (1) written to the ledger or (2) re-inserted into requested_values
// list. Cleanup should erase any previously contents.
//
highest_proposed_decree.content = "";
context->highest_proposed_decree = highest_proposed_decree;
}

if (IsRootDecreeEqual(context->ledger->Tail(), message.decree))
{
//
// Setup next round for pending proposals.
//
Expand Down Expand Up @@ -452,7 +478,6 @@ HandleAccept(
std::lock_guard<std::mutex> lock(context->mutex);

if (IsRootDecreeHigher(message.decree, context->promised_decree.Value()) ||
IsDecreeIdentical(message.decree, context->accepted_decree.Value()) ||
IsRootDecreeHigher(message.decree, context->accepted_decree.Value()) ||
IsDecreeIdentical(message.decree, context->accepted_decree.Value()))
{
Expand Down
Loading

0 comments on commit 0e857a5

Please sign in to comment.