Skip to content

Commit

Permalink
Merge pull request ceph#457 from ceph/wip-paxos
Browse files Browse the repository at this point in the history
paxos fixes

Reviewed-by: Greg Farnum <[email protected]>
Reviewed-by: Joao Eduardo Luis <[email protected]>
  • Loading branch information
Sage Weil committed Jul 23, 2013
2 parents 093182b + cfe1395 commit 9626f77
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 15 deletions.
1 change: 1 addition & 0 deletions src/common/config_opts.h
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ OPTION(paxos_trim_min, OPT_INT, 250) // number of extra proposals tolerated bef
OPTION(paxos_trim_max, OPT_INT, 500) // max number of extra proposals to trim at a time
OPTION(paxos_service_trim_min, OPT_INT, 250) // minimum amount of versions to trigger a trim (0 disables it)
OPTION(paxos_service_trim_max, OPT_INT, 500) // maximum amount of versions to trim during a single proposal (0 disables it)
OPTION(paxos_kill_at, OPT_INT, 0) // debug/testing: Paxos.cc asserts paxos_kill_at != N at numbered checkpoints (1-10), so setting N aborts the monitor there; the default 0 matches no checkpoint
OPTION(clock_offset, OPT_DOUBLE, 0) // how much to offset the system clock in Clock.cc
OPTION(auth_cluster_required, OPT_STR, "cephx") // required of mon, mds, osd daemons
OPTION(auth_service_required, OPT_STR, "cephx") // required by daemons of clients
Expand Down
84 changes: 72 additions & 12 deletions src/mon/Paxos.cc
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,21 @@ void Paxos::collect(version_t oldpn)

// look for uncommitted value
if (get_store()->exists(get_name(), last_committed+1)) {
version_t v = get_store()->get(get_name(), "pending_v");
version_t pn = get_store()->get(get_name(), "pending_pn");
if (v && pn && v == last_committed + 1) {
uncommitted_pn = pn;
} else {
dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << accepted_pn
<< " and crossing our fingers" << dendl;
uncommitted_pn = accepted_pn;
}
uncommitted_v = last_committed+1;
uncommitted_pn = accepted_pn;

get_store()->get(get_name(), last_committed+1, uncommitted_value);
assert(uncommitted_value.length());
dout(10) << "learned uncommitted " << (last_committed+1)
<< " pn " << uncommitted_pn
<< " (" << uncommitted_value.length() << " bytes) from myself"
<< dendl;
}
Expand Down Expand Up @@ -164,6 +174,8 @@ void Paxos::handle_collect(MMonPaxos *collect)
last->last_committed = last_committed;
last->first_committed = first_committed;

version_t previous_pn = accepted_pn;

// can we accept this pn?
if (collect->pn > accepted_pn) {
// ok, accept it
Expand Down Expand Up @@ -198,13 +210,25 @@ void Paxos::handle_collect(MMonPaxos *collect)
// do we have an accepted but uncommitted value?
// (it'll be at last_committed+1)
bufferlist bl;
if (get_store()->exists(get_name(), last_committed+1)) {
if (collect->last_committed == last_committed &&
get_store()->exists(get_name(), last_committed+1)) {
get_store()->get(get_name(), last_committed+1, bl);
assert(bl.length() > 0);
dout(10) << " sharing our accepted but uncommitted value for "
<< last_committed+1 << " (" << bl.length() << " bytes)" << dendl;
last->values[last_committed+1] = bl;
last->uncommitted_pn = accepted_pn;

version_t v = get_store()->get(get_name(), "pending_v");
version_t pn = get_store()->get(get_name(), "pending_pn");
if (v && pn && v == last_committed + 1) {
last->uncommitted_pn = pn;
} else {
// previously we didn't record which pn a value was accepted
// under! use the pn value we just had... :(
dout(10) << "WARNING: no pending_pn on disk, using previous accepted_pn " << previous_pn
<< " and crossing our fingers" << dendl;
last->uncommitted_pn = previous_pn;
}
}

// send reply
Expand Down Expand Up @@ -370,9 +394,13 @@ void Paxos::handle_last(MMonPaxos *last)
return;
}

assert(g_conf->paxos_kill_at != 1);

// store any committed values if any are specified in the message
store_state(last);

assert(g_conf->paxos_kill_at != 2);

// do they accept your pn?
if (last->pn > accepted_pn) {
// no, try again.
Expand All @@ -390,15 +418,23 @@ void Paxos::handle_last(MMonPaxos *last)
<< num_last << " peons" << dendl;

// did this person send back an accepted but uncommitted value?
if (last->uncommitted_pn &&
last->uncommitted_pn > uncommitted_pn) {
uncommitted_v = last->last_committed+1;
uncommitted_pn = last->uncommitted_pn;
uncommitted_value = last->values[uncommitted_v];
dout(10) << "we learned an uncommitted value for " << uncommitted_v
<< " pn " << uncommitted_pn
<< " " << uncommitted_value.length() << " bytes"
<< dendl;
if (last->uncommitted_pn) {
if (last->uncommitted_pn > uncommitted_pn &&
last->last_committed >= last_committed &&
last->last_committed + 1 >= uncommitted_v) {
uncommitted_v = last->last_committed+1;
uncommitted_pn = last->uncommitted_pn;
uncommitted_value = last->values[uncommitted_v];
dout(10) << "we learned an uncommitted value for " << uncommitted_v
<< " pn " << uncommitted_pn
<< " " << uncommitted_value.length() << " bytes"
<< dendl;
} else {
dout(10) << "ignoring uncommitted value for " << (last->last_committed+1)
<< " pn " << last->uncommitted_pn
<< " " << last->values[last->last_committed+1].length() << " bytes"
<< dendl;
}
}

// is that everyone?
Expand Down Expand Up @@ -502,6 +538,10 @@ void Paxos::begin(bufferlist& v)
MonitorDBStore::Transaction t;
t.put(get_name(), last_committed+1, new_value);

// note which pn this pending value is for.
t.put(get_name(), "pending_v", last_committed + 1);
t.put(get_name(), "pending_pn", accepted_pn);

dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t.dump(&f);
Expand All @@ -516,6 +556,8 @@ void Paxos::begin(bufferlist& v)

get_store()->apply_transaction(t);

assert(g_conf->paxos_kill_at != 3);

if (mon->get_quorum().size() == 1) {
// we're alone, take it easy
commit();
Expand Down Expand Up @@ -566,6 +608,8 @@ void Paxos::handle_begin(MMonPaxos *begin)
assert(begin->pn == accepted_pn);
assert(begin->last_committed == last_committed);

assert(g_conf->paxos_kill_at != 4);

// set state.
state = STATE_UPDATING;
lease_expire = utime_t(); // cancel lease
Expand All @@ -578,6 +622,10 @@ void Paxos::handle_begin(MMonPaxos *begin)
MonitorDBStore::Transaction t;
t.put(get_name(), v, begin->values[v]);

// note which pn this pending value is for.
t.put(get_name(), "pending_v", v);
t.put(get_name(), "pending_pn", accepted_pn);

dout(30) << __func__ << " transaction dump:\n";
JSONFormatter f(true);
t.dump(&f);
Expand All @@ -586,6 +634,8 @@ void Paxos::handle_begin(MMonPaxos *begin)

get_store()->apply_transaction(t);

assert(g_conf->paxos_kill_at != 5);

// reply
MMonPaxos *accept = new MMonPaxos(mon->get_epoch(), MMonPaxos::OP_ACCEPT,
ceph_clock_now(g_ceph_context));
Expand Down Expand Up @@ -620,6 +670,8 @@ void Paxos::handle_accept(MMonPaxos *accept)
accepted.insert(from);
dout(10) << " now " << accepted << " have accepted" << dendl;

assert(g_conf->paxos_kill_at != 6);

// new majority?
if (accepted.size() == (unsigned)mon->monmap->size()/2+1) {
// yay, commit!
Expand All @@ -643,6 +695,8 @@ void Paxos::handle_accept(MMonPaxos *accept)
// yay!
extend_lease();

assert(g_conf->paxos_kill_at != 10);

finish_round();

// wake people up
Expand Down Expand Up @@ -673,6 +727,8 @@ void Paxos::commit()
// leader still got a majority and committed without us.)
lease_expire = utime_t(); // cancel lease

assert(g_conf->paxos_kill_at != 7);

MonitorDBStore::Transaction t;

// commit locally
Expand All @@ -692,6 +748,8 @@ void Paxos::commit()

get_store()->apply_transaction(t);

assert(g_conf->paxos_kill_at != 8);

// refresh first_committed; this txn may have trimmed.
first_committed = get_store()->get(get_name(), "first_committed");

Expand All @@ -713,6 +771,8 @@ void Paxos::commit()
mon->messenger->send_message(commit, mon->monmap->get_inst(*p));
}

assert(g_conf->paxos_kill_at != 9);

// get ready for a new round.
new_value.clear();

Expand Down
7 changes: 4 additions & 3 deletions src/mon/Paxos.h
Original file line number Diff line number Diff line change
Expand Up @@ -290,8 +290,9 @@ class Paxos {
*/
version_t accepted_pn;
/**
* @todo This has something to do with the last_committed version. Not sure
* about what it entails, tbh.
* The last_committed epoch of the leader at the time we accepted the last pn.
*
* This has NO SEMANTIC MEANING, and is there only for the debug output.
*/
version_t accepted_pn_from;
/**
Expand Down Expand Up @@ -1114,7 +1115,7 @@ class Paxos {
* @param t The transaction to which we will append the operations
* @param bl A bufferlist containing an encoded transaction
*/
void decode_append_transaction(MonitorDBStore::Transaction& t,
static void decode_append_transaction(MonitorDBStore::Transaction& t,
bufferlist& bl) {
MonitorDBStore::Transaction vt;
bufferlist::iterator it = bl.begin();
Expand Down
14 changes: 14 additions & 0 deletions src/tools/ceph-monstore-tool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "global/global_init.h"
#include "os/LevelDBStore.h"
#include "mon/MonitorDBStore.h"
#include "mon/Paxos.h"
#include "common/Formatter.h"

namespace po = boost::program_options;
Expand Down Expand Up @@ -246,6 +247,19 @@ int main(int argc, char **argv) {
goto done;
}
bl.write_fd(fd);
} else if (cmd == "dump-paxos") {
for (version_t v = dstart; v <= dstop; ++v) {
bufferlist bl;
st.get("paxos", v, bl);
if (bl.length() == 0)
break;
cout << "\n--- " << v << " ---" << std::endl;
MonitorDBStore::Transaction tx;
Paxos::decode_append_transaction(tx, bl);
JSONFormatter f(true);
tx.dump(&f);
f.flush(cout);
}
} else if (cmd == "dump-trace") {
if (tfile.empty()) {
std::cerr << "Need trace_file" << std::endl;
Expand Down

0 comments on commit 9626f77

Please sign in to comment.