Skip to content

Commit

Permalink
Merge branch 'bugfixes/tokumx-1.3' into releases/tokumx-1.3
Browse files Browse the repository at this point in the history
  • Loading branch information
leifwalsh committed Dec 13, 2013
2 parents ef94aa1 + c84f41c commit 4a66267
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 9 deletions.
9 changes: 9 additions & 0 deletions distsrc/NEWS
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ New features and improvements
- Added an optimization to the execution of aggregation pipelines that
greatly speeds up all range queries done for aggregations. (#802)

- The `mongo2toku` migration tool supports authentication with a
MongoDB replica set using new options `--ruser` and `--rpass`.
(#836)

- The `mongo2toku` migration tool now prints OpTimes in human-readable
form. (#839)


Bug fixes
~~~~~~~~~
Expand Down Expand Up @@ -52,6 +59,8 @@ Bug fixes
of continued activity, now continuing to use a cursor will preserve
it indefinitely. (#826)

- Fixed some miscellaneous bugs in `mongo2toku`. (#834, #837, #838)


tokumx 1.3.2
------------
Expand Down
46 changes: 37 additions & 9 deletions src/mongo/tools/2toku.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ namespace po = boost::program_options;

static string fmtOpTime(const OpTime &t) {
stringstream ss;
ss << t.getSecs() << ":" << t.getInc();
ss << t.getSecs() << ":" << t.getInc() << " (" << t.toStringPretty() << ")";
return ss.str();
}

Expand Down Expand Up @@ -124,10 +124,20 @@ class VanillaOplogPlayer : boost::noncopyable {

// nop
if (op == "n") {
if (!_insertBuf.empty()) {
flushInserts();
}
_maxOpTimeSynced = _thisTime;
_thisTime = OpTime();
return true;
}
// "presence of a database"
if (op == "db") {
if (!_insertBuf.empty()) {
flushInserts();
}
_maxOpTimeSynced = _thisTime;
_thisTime = OpTime();
return true;
}
if (op != "c" && op != "i" && op != "u" && op != "d") {
Expand Down Expand Up @@ -279,7 +289,8 @@ class OplogTool : public Tool {
log() << "Exiting while processing operation with OpTime " << _player->thisTimeStr() << endl;
}
report();
string tsString = _player->maxOpTimeSyncedStr();
OpTime t = _player->maxOpTimeSynced();
string tsString = mongoutils::str::stream() << t.getSecs() << ":" << t.getInc();
log() << "Use --ts=" << tsString << " to resume." << endl;
try {
std::ofstream tsFile;
Expand All @@ -305,6 +316,8 @@ class OplogTool : public Tool {
add_options()
("ts" , po::value<string>() , "max OpTime already applied (secs:inc)" )
("from", po::value<string>() , "host to pull from" )
("ruser", po::value<string>(), "user on source host if auth required (must be on admin db)")
("rpass", po::value<string>(), "password on source host")
("oplogns", po::value<string>()->default_value( "local.oplog.rs" ) , "ns to pull from" )
("reportingPeriod", po::value<int>()->default_value(10) , "seconds between progress reports" )
;
Expand Down Expand Up @@ -357,12 +370,27 @@ class OplogTool : public Tool {

_oplogns = getParam("oplogns");

Client::initThread( "mongo2toku" );
if (currentClient.get() == 0) {
Client::initThread( "mongo2toku" );
}

LOG(1) << "going to connect" << endl;

_rconn.reset(ScopedDbConnection::getScopedDbConnection(getParam("from")));

if (hasParam("ruser")) {
if (!hasParam("rpass")) {
log() << "if using auth on source, must specify both --ruser and --rpass" << endl;
return -1;
}
string authErr;
bool authOk = _rconn->conn().auth("admin", getParam("ruser"), getParam("rpass"), authErr);
if (!authOk) {
error() << "error authenticating to admin db on source: " << authErr << endl;
return -1;
}
}

LOG(1) << "connected" << endl;

{
Expand Down Expand Up @@ -404,14 +432,11 @@ class OplogTool : public Tool {
while (running) {
const int tailingQueryOptions = QueryOption_SlaveOk | QueryOption_CursorTailable | QueryOption_OplogReplay | QueryOption_AwaitData;

BSONObjBuilder queryBuilder;
BSONObjBuilder gteBuilder(queryBuilder.subobjStart("ts"));
gteBuilder.appendTimestamp("$gte", _player->maxOpTimeSynced().asDate());
gteBuilder.doneFast();
BSONObj query = queryBuilder.done();

BSONObj res;
auto_ptr<DBClientCursor> cursor(_rconn->conn().query(_oplogns, query, 0, 0, &res, tailingQueryOptions));
auto_ptr<DBClientCursor> cursor(_rconn->conn().query(
_oplogns, QUERY("ts" << GTE << _player->maxOpTimeSynced()),
0, 0, &res, tailingQueryOptions));

if (!cursor->more()) {
log() << "oplog query returned no results, sleeping 10 seconds..." << endl;
Expand All @@ -426,6 +451,7 @@ class OplogTool : public Tool {
if (!tsElt.ok()) {
log() << "oplog format error: " << firstObj << " missing 'ts' field." << endl;
logPosition();
cursor.reset();
_rconn->done();
_rconn.reset();
return -1;
Expand All @@ -436,6 +462,7 @@ class OplogTool : public Tool {
<< ", but didn't find anything before " << fmtOpTime(firstTime) << "!" << endl;
warning() << "This may mean your oplog has been truncated past the point you are trying to resume from." << endl;
warning() << "Either retry with a different value of --ts, or restart your migration procedure." << endl;
cursor.reset();
_rconn->done();
_rconn.reset();
return -1;
Expand All @@ -452,6 +479,7 @@ class OplogTool : public Tool {
bool ok = _player->processObj(obj);
if (!ok) {
logPosition();
cursor.reset();
_rconn->done();
_rconn.reset();
return -1;
Expand Down

0 comments on commit 4a66267

Please sign in to comment.