Skip to content

Commit

Permalink
PERF-3635 Share a sample across different SamplingLoader threads (#798)
Browse files Browse the repository at this point in the history
  • Loading branch information
cswanson310 authored Dec 27, 2022
1 parent f449258 commit b4d742b
Show file tree
Hide file tree
Showing 7 changed files with 237 additions and 120 deletions.
60 changes: 58 additions & 2 deletions src/cast_core/include/cast_core/actors/SamplingLoader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,56 @@

#include <metrics/metrics.hpp>

#include <value_generators/DocumentGenerator.hpp>
#include <mongocxx/collection.hpp>
#include <mutex>
#include <value_generators/PipelineGenerator.hpp>

namespace genny::actor {

/**
* This class represents a sample of documents from a collection which is lazily loaded on the first
* request. It is designed to be shared across threads - it is thread safe.
*
* The lazy loading will allow this sample to be taken once the threads are actively running the
* workload - after previous stages have taken their effect on the collection.
*/
class DeferredSample {
public:
    /**
     * @param actorName prefix used in the error messages raised while gathering the sample.
     * @param client pool entry that must stay alive for as long as 'collection' is used.
     * @param collection the collection the sample is drawn from.
     * @param sampleSize number of documents requested from the sample stage.
     * @param pipelineSuffixGenerator generates the stages appended after the sampling stages.
     */
    DeferredSample(std::string actorName,
                   mongocxx::pool::entry client,
                   mongocxx::collection collection,
                   unsigned int sampleSize,
                   PipelineGenerator pipelineSuffixGenerator)
        : _actorName{std::move(actorName)},
          _client(std::move(client)),
          _collection(std::move(collection)),
          _sampleSize(sampleSize),
          _pipelineSuffixGenerator(std::move(pipelineSuffixGenerator)) {}

    /**
     * If this is the first caller, will run an aggregation to gather the sample and return it.
     * Subsequent callers will block until that is finished and then receive a copy of those
     * results.
     *
     * @return a copy of the gathered sample documents.
     */
    std::vector<bsoncxx::document::value> getSample();

private:
    /**
     * Runs the sampling aggregation and returns the documents it produced.
     *
     * @param lock the caller's guard; taking it as a parameter documents that the caller is
     * expected to be holding a lock while the sample is gathered.
     */
    std::vector<bsoncxx::document::value> gatherSample(const std::lock_guard<std::mutex>& lock);

    // Serializes getSample() so that the sample is only gathered by one thread at a time.
    std::mutex _mutex;

    // This vector is lazily loaded by getSample(). Once populated it is owned here, and each
    // caller of getSample() receives its own copy of these documents.
    std::vector<bsoncxx::document::value> _sampleDocs = {};

    // Prefix for the error messages raised when the sample cannot be gathered.
    std::string _actorName;
    mongocxx::pool::entry _client;
    // '_collection' needs '_client' to be in scope.
    mongocxx::collection _collection;
    // 'unsigned int' rather than the non-standard 'uint' alias, which is the same type where
    // that alias exists.
    unsigned int _sampleSize;
    PipelineGenerator _pipelineSuffixGenerator;
};

/**
* Given a collection that's already populated, will pull a sample of documents from that
* collection and then re insert them in order to grow the collection. This is not guaranteed
Expand All @@ -36,7 +82,10 @@ namespace genny::actor {
class SamplingLoader : public Actor {

public:
explicit SamplingLoader(ActorContext& context);
SamplingLoader(ActorContext& context,
std::string dbName,
std::string collectionName,
std::shared_ptr<DeferredSample> deferredSamplingLoader);
~SamplingLoader() override = default;

static std::string_view defaultName() {
Expand All @@ -51,6 +100,13 @@ class SamplingLoader : public Actor {
metrics::Operation _totalBulkLoad;
metrics::Operation _individualBulkLoad;
mongocxx::pool::entry _client;
mongocxx::collection _collection;

// This does not use WorkloadContext::ShareableState, even though it is conceptually similar,
// because that system would force us to share the sample across all phases of the workload.
// We want one sample per phase. We still have to share it across the different actors that
// represent multiple threads in the same phase, so we do it this way.
std::shared_ptr<DeferredSample> _deferredSample;
PhaseLoop<PhaseConfig> _loop;
};

Expand Down
168 changes: 114 additions & 54 deletions src/cast_core/src/SamplingLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -48,67 +48,84 @@ using bsoncxx::builder::basic::make_document;
/** @private */
struct SamplingLoader::PhaseConfig {
PhaseConfig(PhaseContext& context, mongocxx::pool::entry& client, ActorId id)
: database{(*client)[context["Database"].to<std::string>()]},
collection{database[context["Collection"].to<std::string>()]},
insertBatchSize{context["InsertBatchSize"].to<IntegerSpec>()},
numBatches{context["Batches"].to<IntegerSpec>()},
// A sample size is optional. If unspecified, we'll make a sample size such that each
// sample document is inserted one time.
sampleSize{
context["SampleSize"].maybe<IntegerSpec>().value_or(numBatches * insertBatchSize)},
pipelineSuffixGenerator{context["Pipeline"]
.maybe<PipelineGenerator>(context, id)
.value_or(PipelineGenerator{})} {}

mongocxx::database database;
mongocxx::collection collection;
: insertBatchSize{context["InsertBatchSize"].to<IntegerSpec>()},
numBatches{context["Batches"].to<IntegerSpec>()} {}

// See src/workloads/docs/SamplingLoader.yml for a description of the arguments and some
// examples.
int64_t insertBatchSize;
int64_t numBatches;
int64_t sampleSize;
PipelineGenerator pipelineSuffixGenerator;
};

// Returns a copy of the shared sample, gathering it first if no caller has done so yet.
// The whole call runs under '_mutex', so concurrent callers wait for the first gather to finish.
std::vector<bsoncxx::document::value> DeferredSample::getSample() {
    std::lock_guard<std::mutex> guard(_mutex);
    const bool alreadyGathered = !_sampleDocs.empty();
    if (alreadyGathered) {
        return _sampleDocs;
    }
    // First caller (or a previous attempt threw): gather and cache the sample.
    _sampleDocs = gatherSample(guard);
    return _sampleDocs;
}

std::vector<bsoncxx::document::value> DeferredSample::gatherSample(
const std::lock_guard<std::mutex>& lock) {
mongocxx::pipeline samplePipeline;
samplePipeline.sample(_sampleSize);
samplePipeline.project(make_document(kvp("_id", 0)));
const auto suffixPipe = pipeline_helpers::makePipeline(_pipelineSuffixGenerator);
samplePipeline.append_stages(suffixPipe.view_array());

std::vector<bsoncxx::document::value> sampleDocs;
const int maxRetries = 3;
for (int nRetries = 0; nRetries < maxRetries; ++nRetries) {
try {
auto cursor = _collection.aggregate(samplePipeline, mongocxx::options::aggregate{});
sampleDocs = std::vector<bsoncxx::document::value>(cursor.begin(), cursor.end());
break;
} catch (const mongocxx::operation_exception& ex) {
if (nRetries >= maxRetries) {
BOOST_LOG_TRIVIAL(warning)
<< "Exceeded maximum number of retries: " << maxRetries << ". Giving up";
BOOST_THROW_EXCEPTION(ex);
} else if (ex.code().value() == 28799) {
// $sample couldn't find a non-duplicate document.
// See SERVER-29446, this can happen sporadically and is safe to retry.
BOOST_LOG_TRIVIAL(info)
<< "Got a retryable error when gathering the sample. Retrying...";
} else {
BOOST_LOG_TRIVIAL(warning) << "Unexpected error when gathering sample: ";
BOOST_THROW_EXCEPTION(ex);
}
}
}

if (sampleDocs.empty()) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
_actorName + ": Sample was unable to find any documents from collection '" +
to_string(_collection.name()) +
"'. Could the collection be empty or could the pipeline be filtering out "
"documents? Attempting to sample " +
boost::to_string(_sampleSize) +
" documents. Pipeline suffix = " + to_json(suffixPipe.view_array())));
}
if (sampleDocs.size() < _sampleSize) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
_actorName + ": Could not get a sample of the expected size. Either the collection '" +
to_string(_collection.name()) + "' is smaller than the requested sample size of " +
boost::to_string(_sampleSize) +
" documents, or the specified pipeline suffix is filtering documents. Found "
"only " +
boost::to_string(sampleDocs.size()) +
" documents. Pipeline suffix = " + to_json(suffixPipe.view_array())));
}
return sampleDocs;
}

void genny::actor::SamplingLoader::run() {
for (auto&& config : _loop) {
for (auto&& _ : config) {
// Now that we are running, we know our designated phase has started. Let's collect the
// deferred sample now - it can now observe the results of previous phases.
auto sampleDocs = _deferredSample->getSample();
BOOST_LOG_TRIVIAL(debug) << "Beginning to run SamplingLoader";
// Read the sample size documents into a vector.
mongocxx::pipeline samplePipeline;
samplePipeline.sample(config->sampleSize);
samplePipeline.project(make_document(kvp("_id", 0)));
const auto suffixStages =
pipeline_helpers::makePipeline(config->pipelineSuffixGenerator).view_array();
samplePipeline.append_stages(suffixStages);
auto cursor =
config->collection.aggregate(samplePipeline, mongocxx::options::aggregate{});
const auto sampleDocs =
std::vector<bsoncxx::document::value>(cursor.begin(), cursor.end());

if (sampleDocs.empty()) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
"Sample was unable to find any documents from collection '" +
to_string(config->collection.name()) +
"'. Could the collection be empty or could the pipeline be filtering out "
"documents? Attempting to sample " +
boost::to_string(config->sampleSize) +
" documents. Pipeline suffix = " + to_json(suffixStages)));
}
if (sampleDocs.size() < config->sampleSize) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
"Could not get a sample of the expected size. Either the collection '" +
to_string(config->collection.name()) +
"' is smaller than the requested sample size of " +
boost::to_string(config->sampleSize) +
" documents, or the specified pipeline suffix is filtering documents. Found "
"only " +
boost::to_string(sampleDocs.size()) +
" documents. Pipeline suffix = " + to_json(suffixStages)));
}

// Maintain an index into the sample that's used for the insert batches so we
// insert roughly the same number of copies for each sample.
size_t sampleIdx = 0;

// We will re-use the vector of the same size for each batch.
Expand All @@ -124,7 +141,8 @@ void genny::actor::SamplingLoader::run() {
// Now do the insert.
auto individualOpCtx = _individualBulkLoad.start();
try {
config->collection.insert_many(batchOfDocs);
_collection.insert_many(batchOfDocs,
mongocxx::options::insert{}.ordered(false));
individualOpCtx.success();
totalOpCtx.success();
} catch (const mongocxx::operation_exception& x) {
Expand All @@ -138,15 +156,57 @@ void genny::actor::SamplingLoader::run() {
}
}

SamplingLoader::SamplingLoader(genny::ActorContext& context)
/**
 * @param context the actor's configuration; also supplies the metrics operations and the client.
 * @param dbName name of the database holding the collection to insert into.
 * @param collectionName name of the collection to insert into.
 * @param deferredSample the sample shared with the other SamplingLoader threads.
 */
SamplingLoader::SamplingLoader(genny::ActorContext& context,
                               std::string dbName,
                               std::string collectionName,
                               std::shared_ptr<DeferredSample> deferredSample)
    : Actor(context),
      _totalBulkLoad{context.operation("TotalBulkInsert", SamplingLoader::id())},
      _individualBulkLoad{context.operation("IndividualBulkInsert", SamplingLoader::id())},
      // 'context.client()' already yields a temporary, so no std::move is needed here.
      _client{context.client(context.get("ClientName").maybe<std::string>().value_or("Default"))},
      // '_collection' is derived from '_client', which is initialized just above.
      _collection{(*_client)[dbName][collectionName]},
      _deferredSample{std::move(deferredSample)},
      _loop{context, _client, SamplingLoader::id()} {}

/**
 * Produces the SamplingLoader actors for one actor configuration block. All of the produced
 * actors share a single DeferredSample, so the sample is gathered once rather than per thread.
 */
class SamplingLoaderProducer : public genny::ActorProducer {
public:
    explicit SamplingLoaderProducer(std::string_view name) : ActorProducer(name) {}

    /**
     * @param context the actor configuration block to produce actors for.
     * @return one SamplingLoader per configured thread, or an empty vector if the block's 'Type'
     * is not "SamplingLoader".
     * @throws InvalidConfigurationException if 'Threads' or 'SampleSize' is negative.
     */
    genny::ActorVector produce(genny::ActorContext& context) override {
        if (context["Type"].to<std::string>() != "SamplingLoader") {
            return {};
        }

        // Both values are read as signed integers. Reject negatives explicitly rather than
        // letting them wrap around to enormous unsigned counts.
        const int threadsSpec = context["Threads"].to<int>();
        if (threadsSpec < 0) {
            BOOST_THROW_EXCEPTION(InvalidConfigurationException(
                "SamplingLoader: 'Threads' must not be negative, but got " +
                boost::to_string(threadsSpec)));
        }
        const int sampleSizeSpec = context["SampleSize"].to<int>();
        if (sampleSizeSpec < 0) {
            BOOST_THROW_EXCEPTION(InvalidConfigurationException(
                "SamplingLoader: 'SampleSize' must not be negative, but got " +
                boost::to_string(sampleSizeSpec)));
        }
        const auto totalThreads = static_cast<unsigned int>(threadsSpec);
        const auto sampleSize = static_cast<unsigned int>(sampleSizeSpec);

        auto pipelineSuffixGenerator =
            context["Pipeline"].maybe<PipelineGenerator>(context).value_or(PipelineGenerator{});
        auto clientName = context.get("ClientName").maybe<std::string>().value_or("Default");
        auto client = context.client(clientName);
        const auto dbName = context["Database"].to<std::string>();
        auto database = client->database(dbName);
        const auto collName = context["Collection"].to<std::string>();
        auto collection = database[collName];

        // The client is moved into the shared sample, which keeps it alive for 'collection'.
        auto deferredSample =
            std::make_shared<DeferredSample>(context["Name"].maybe<std::string>().value_or(
                                                 std::string{SamplingLoader::defaultName()}),
                                             std::move(client),
                                             std::move(collection),
                                             sampleSize,
                                             std::move(pipelineSuffixGenerator));

        genny::ActorVector out;
        out.reserve(totalThreads);
        for (unsigned int i = 0; i < totalThreads; ++i) {
            out.emplace_back(std::make_unique<genny::actor::SamplingLoader>(
                context, dbName, collName, deferredSample));
        }
        return out;
    }
};

namespace {
auto registration = genny::Cast::registerDefault<SamplingLoader>();
std::shared_ptr<genny::ActorProducer> loaderProducer =
std::make_shared<SamplingLoaderProducer>("SamplingLoader");
auto registration = genny::Cast::registerCustom<genny::ActorProducer>(loaderProducer);

} // namespace
} // namespace genny::actor
13 changes: 4 additions & 9 deletions src/cast_core/test/SamplingLoader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ TEST_CASE_METHOD(MongoTestFixture,
# Insert 40 new documents (batches=2 * batchSize=10 * threads=2), each with a 'y' field.
- Name: SamplingLoader
Type: SamplingLoader
Database: test
Collection: sampling_loader_test
SampleSize: 5
Pipeline: [{$set: {y: "SamplingLoader wuz here"}}]
Threads: 2
Phases:
- Repeat: 1
Database: test
Collection: sampling_loader_test
SampleSize: 5
InsertBatchSize: 10
Pipeline: [{$set: {y: "SamplingLoader wuz here"}}]
Batches: 2
Metrics:
Expand All @@ -81,11 +81,6 @@ TEST_CASE_METHOD(MongoTestFixture,
genny::ActorHelper ah(nodes.root(), 2 /* 2 threads for samplers */);
ah.run();

// We can't make many reliable assertions on the output data, since each thread is
// acting independently, and (as mentioned in src/workloads/docs/SamplingLoader.yml) one
// thread may read another's inserted documents in its sample. So, we'll just assert
// the following:

// There should still be only 5 distinct values of 'x'.
mongocxx::pipeline pipe;
pipe.group(from_json(R"({"_id": "$x"})"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,16 +29,30 @@ namespace genny {
struct PipelineGenerator {
PipelineGenerator() = default;

PipelineGenerator(const Node& node, PhaseContext& context, ActorId id) {
if (!node.isSequence()) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException("'Pipeline' must be an array"));
// Builds one DocumentGenerator per stage of the 'Pipeline' array using an ActorContext.
// Throws InvalidConfigurationException if 'node' is not a sequence.
PipelineGenerator(const Node& node, ActorContext& context) {
    assertIsArray(node);
    for (auto&& [_, stage] : node) {
        // An actor ID is not necessarily available in this scenario, so the literal '1' is
        // passed as a stand-in; it is not a real actor ID.
        auto generator = stage.to<DocumentGenerator>(context, 1);
        stageGenerators.push_back(std::move(generator));
    }
}

// Builds one DocumentGenerator per stage of the 'Pipeline' array using a PhaseContext and the
// given actor ID. Throws InvalidConfigurationException if 'node' is not a sequence.
PipelineGenerator(const Node& node, PhaseContext& context, ActorId id) {
    assertIsArray(node);
    for (auto&& [_, stage] : node) {
        auto generator = stage.to<DocumentGenerator>(context, id);
        stageGenerators.push_back(std::move(generator));
    }
}

std::vector<DocumentGenerator> stageGenerators;

private:
// Throws InvalidConfigurationException unless 'node' is a sequence. Declared static because it
// reads no instance state; the constructors call it before any member is populated.
static void assertIsArray(const Node& node) {
    if (!node.isSequence()) {
        BOOST_THROW_EXCEPTION(InvalidConfigurationException("'Pipeline' must be an array"));
    }
}
};

} // namespace genny
Expand Down
Loading

0 comments on commit b4d742b

Please sign in to comment.