diff --git a/src/cast_core/include/cast_core/actors/SamplingLoader.hpp b/src/cast_core/include/cast_core/actors/SamplingLoader.hpp index 0b6daa583c..423e60ba80 100644 --- a/src/cast_core/include/cast_core/actors/SamplingLoader.hpp +++ b/src/cast_core/include/cast_core/actors/SamplingLoader.hpp @@ -22,10 +22,56 @@ #include -#include +#include +#include +#include namespace genny::actor { +/** + * This class represents a sample of documents from a collection which is lazily loaded on the first + * request. It is designed to be shared across threads - it is thread safe. + * + * The lazy loading will allow this sample to be taken once the threads are actively running the + * workload - after previous stages have taken their effect on the collection. + */ +class DeferredSample { +public: + DeferredSample(std::string actorName, + mongocxx::pool::entry client, + mongocxx::collection collection, + uint sampleSize, + PipelineGenerator pipelineSuffixGenerator) + : _actorName{std::move(actorName)}, + _client(std::move(client)), + _collection(std::move(collection)), + _sampleSize(sampleSize), + _pipelineSuffixGenerator(std::move(pipelineSuffixGenerator)) {} + + /** + * If this is the first caller, will run an aggregation to gather the sample and return it. + * Subsequent callers will block until that is finished and then receive a copy of those + * results. + */ + std::vector getSample(); + +private: + std::vector gatherSample(const std::lock_guard& lock); + + std::mutex _mutex; + + // This vector is lazily loaded, but once populated it is owned here and other threads will + // receive a view of these documents via a bsoncxx::document::view. + std::vector _sampleDocs = {}; + + std::string _actorName; + mongocxx::pool::entry _client; + // '_collection' needs '_client' to be in scope. 
+ mongocxx::collection _collection; + uint _sampleSize; + PipelineGenerator _pipelineSuffixGenerator; +}; + /** * Given a collection that's already populated, will pull a sample of documents from that * collection and then re insert them in order to grow the collection. This is not guaranteed @@ -36,7 +82,10 @@ namespace genny::actor { class SamplingLoader : public Actor { public: - explicit SamplingLoader(ActorContext& context); + SamplingLoader(ActorContext& context, + std::string dbName, + std::string collectionName, + std::shared_ptr deferredSamplingLoader); ~SamplingLoader() override = default; static std::string_view defaultName() { @@ -51,6 +100,13 @@ class SamplingLoader : public Actor { metrics::Operation _totalBulkLoad; metrics::Operation _individualBulkLoad; mongocxx::pool::entry _client; + mongocxx::collection _collection; + + // This is not using WorkloadContext::ShareableState for something that is conceptually similar + // because we do not want to share the sample across all phases of the workload, which would be + // a constraint of that system. We want one per phase. We still have to share across different + // actors for multiple threads in the same phase, so we do it this way. + std::shared_ptr _deferredSample; PhaseLoop _loop; }; diff --git a/src/cast_core/src/SamplingLoader.cpp b/src/cast_core/src/SamplingLoader.cpp index fc0b1e103c..309e1ae010 100644 --- a/src/cast_core/src/SamplingLoader.cpp +++ b/src/cast_core/src/SamplingLoader.cpp @@ -48,67 +48,84 @@ using bsoncxx::builder::basic::make_document; /** @private */ struct SamplingLoader::PhaseConfig { PhaseConfig(PhaseContext& context, mongocxx::pool::entry& client, ActorId id) - : database{(*client)[context["Database"].to()]}, - collection{database[context["Collection"].to()]}, - insertBatchSize{context["InsertBatchSize"].to()}, - numBatches{context["Batches"].to()}, - // A sample size is optional. 
If unspecified, we'll make a sample size such that each - // sample document is inserted one time. - sampleSize{ - context["SampleSize"].maybe().value_or(numBatches * insertBatchSize)}, - pipelineSuffixGenerator{context["Pipeline"] - .maybe(context, id) - .value_or(PipelineGenerator{})} {} - - mongocxx::database database; - mongocxx::collection collection; + : insertBatchSize{context["InsertBatchSize"].to()}, + numBatches{context["Batches"].to()} {} + // See src/workloads/docs/SamplingLoader.yml for a description of the arguments and some // examples. int64_t insertBatchSize; int64_t numBatches; - int64_t sampleSize; - PipelineGenerator pipelineSuffixGenerator; }; +std::vector DeferredSample::getSample() { + std::lock_guard lock(_mutex); + if (_sampleDocs.empty()) { + _sampleDocs = gatherSample(lock); + } + return _sampleDocs; +} + +std::vector DeferredSample::gatherSample( + const std::lock_guard& lock) { + mongocxx::pipeline samplePipeline; + samplePipeline.sample(_sampleSize); + samplePipeline.project(make_document(kvp("_id", 0))); + const auto suffixPipe = pipeline_helpers::makePipeline(_pipelineSuffixGenerator); + samplePipeline.append_stages(suffixPipe.view_array()); + + std::vector sampleDocs; + const int maxRetries = 3; + for (int nRetries = 0; nRetries < maxRetries; ++nRetries) { + try { + auto cursor = _collection.aggregate(samplePipeline, mongocxx::options::aggregate{}); + sampleDocs = std::vector(cursor.begin(), cursor.end()); + break; + } catch (const mongocxx::operation_exception& ex) { + if (nRetries >= maxRetries) { + BOOST_LOG_TRIVIAL(warning) + << "Exceeded maximum number of retries: " << maxRetries << ". Giving up"; + BOOST_THROW_EXCEPTION(ex); + } else if (ex.code().value() == 28799) { + // $sample couldn't find a non-duplicate document. + // See SERVER-29446, this can happen sporadically and is safe to retry. + BOOST_LOG_TRIVIAL(info) + << "Got a retryable error when gathering the sample. 
Retrying..."; + } else { + BOOST_LOG_TRIVIAL(warning) << "Unexpected error when gathering sample: "; + BOOST_THROW_EXCEPTION(ex); + } + } + } + + if (sampleDocs.empty()) { + BOOST_THROW_EXCEPTION(InvalidConfigurationException( + _actorName + ": Sample was unable to find any documents from collection '" + + to_string(_collection.name()) + + "'. Could the collection be empty or could the pipeline be filtering out " + "documents? Attempting to sample " + + boost::to_string(_sampleSize) + + " documents. Pipeline suffix = " + to_json(suffixPipe.view_array()))); + } + if (sampleDocs.size() < _sampleSize) { + BOOST_THROW_EXCEPTION(InvalidConfigurationException( + _actorName + ": Could not get a sample of the expected size. Either the collection '" + + to_string(_collection.name()) + "' is smaller than the requested sample size of " + + boost::to_string(_sampleSize) + + " documents, or the specified pipeline suffix is filtering documents. Found " + "only " + + boost::to_string(sampleDocs.size()) + + " documents. Pipeline suffix = " + to_json(suffixPipe.view_array()))); + } + return sampleDocs; +} + void genny::actor::SamplingLoader::run() { for (auto&& config : _loop) { for (auto&& _ : config) { + // Now that we are running, we know our designated phase has started. Let's collect the + // deferred sample now - it can now observe the results of previous phases. + auto sampleDocs = _deferredSample->getSample(); BOOST_LOG_TRIVIAL(debug) << "Beginning to run SamplingLoader"; - // Read the sample size documents into a vector. 
- mongocxx::pipeline samplePipeline; - samplePipeline.sample(config->sampleSize); - samplePipeline.project(make_document(kvp("_id", 0))); - const auto suffixStages = - pipeline_helpers::makePipeline(config->pipelineSuffixGenerator).view_array(); - samplePipeline.append_stages(suffixStages); - auto cursor = - config->collection.aggregate(samplePipeline, mongocxx::options::aggregate{}); - const auto sampleDocs = - std::vector(cursor.begin(), cursor.end()); - - if (sampleDocs.empty()) { - BOOST_THROW_EXCEPTION(InvalidConfigurationException( - "Sample was unable to find any documents from collection '" + - to_string(config->collection.name()) + - "'. Could the collection be empty or could the pipeline be filtering out " - "documents? Attempting to sample " + - boost::to_string(config->sampleSize) + - " documents. Pipeline suffix = " + to_json(suffixStages))); - } - if (sampleDocs.size() < config->sampleSize) { - BOOST_THROW_EXCEPTION(InvalidConfigurationException( - "Could not get a sample of the expected size. Either the collection '" + - to_string(config->collection.name()) + - "' is smaller than the requested sample size of " + - boost::to_string(config->sampleSize) + - " documents, or the specified pipeline suffix is filtering documents. Found " - "only " + - boost::to_string(sampleDocs.size()) + - " documents. Pipeline suffix = " + to_json(suffixStages))); - } - - // Maintain an index into the sample that's used for the insert batches so we - // insert roughly the same number of copies for each sample. size_t sampleIdx = 0; // We will re-use the vector of the same size for each batch. @@ -124,7 +141,8 @@ void genny::actor::SamplingLoader::run() { // Now do the insert. 
auto individualOpCtx = _individualBulkLoad.start(); try { - config->collection.insert_many(batchOfDocs); + _collection.insert_many(batchOfDocs, + mongocxx::options::insert{}.ordered(false)); individualOpCtx.success(); totalOpCtx.success(); } catch (const mongocxx::operation_exception& x) { @@ -138,15 +156,57 @@ void genny::actor::SamplingLoader::run() { } } -SamplingLoader::SamplingLoader(genny::ActorContext& context) +SamplingLoader::SamplingLoader(genny::ActorContext& context, + std::string dbName, + std::string collectionName, + std::shared_ptr deferredSample) : Actor(context), _totalBulkLoad{context.operation("TotalBulkInsert", SamplingLoader::id())}, _individualBulkLoad{context.operation("IndividualBulkInsert", SamplingLoader::id())}, _client{std::move( context.client(context.get("ClientName").maybe().value_or("Default")))}, + _collection{(*_client)[dbName][collectionName]}, + _deferredSample{std::move(deferredSample)}, _loop{context, _client, SamplingLoader::id()} {} +class SamplingLoaderProducer : public genny::ActorProducer { +public: + SamplingLoaderProducer(const std::string_view& name) : ActorProducer(name) {} + genny::ActorVector produce(genny::ActorContext& context) { + if (context["Type"].to() != "SamplingLoader") { + return {}; + } + genny::ActorVector out; + uint totalThreads = context["Threads"].to(); + + uint sampleSize = context["SampleSize"].to(); + auto pipelineSuffixGenerator = + context["Pipeline"].maybe(context).value_or(PipelineGenerator{}); + auto clientName = context.get("ClientName").maybe().value_or("Default"); + auto client = context.client(clientName); + auto database = client->database(context["Database"].to()); + auto collName = context["Collection"].to(); + auto collection = database[collName]; + auto deferredSample = + std::make_shared(context["Name"].maybe().value_or( + std::string{SamplingLoader::defaultName()}), + std::move(client), + std::move(collection), + sampleSize, + std::move(pipelineSuffixGenerator)); + + for (uint i = 
0; i < totalThreads; ++i) { + out.emplace_back(std::make_unique( + context, database.name().to_string(), collName, deferredSample)); + } + return out; + } +}; + namespace { -auto registration = genny::Cast::registerDefault(); +std::shared_ptr loaderProducer = + std::make_shared("SamplingLoader"); +auto registration = genny::Cast::registerCustom(loaderProducer); + } // namespace } // namespace genny::actor diff --git a/src/cast_core/test/SamplingLoader_test.cpp b/src/cast_core/test/SamplingLoader_test.cpp index e9b22216cb..a699054e54 100644 --- a/src/cast_core/test/SamplingLoader_test.cpp +++ b/src/cast_core/test/SamplingLoader_test.cpp @@ -56,14 +56,14 @@ TEST_CASE_METHOD(MongoTestFixture, # Insert 40 new documents (batches=2 * batchSize=10 * threads=2), each with a 'y' field. - Name: SamplingLoader Type: SamplingLoader + Database: test + Collection: sampling_loader_test + SampleSize: 5 + Pipeline: [{$set: {y: "SamplingLoader wuz here"}}] Threads: 2 Phases: - Repeat: 1 - Database: test - Collection: sampling_loader_test - SampleSize: 5 InsertBatchSize: 10 - Pipeline: [{$set: {y: "SamplingLoader wuz here"}}] Batches: 2 Metrics: @@ -81,11 +81,6 @@ TEST_CASE_METHOD(MongoTestFixture, genny::ActorHelper ah(nodes.root(), 2 /* 2 threads for samplers */); ah.run(); - // We can't make many reliable assertions on the output data, since each thread is - // acting independently, and (as mentioned in src/workloads/docs/SamplingLoader.yml) one - // thread may read another's inserted documents in its sample. So, we'll just assert - // the following: - // There should still be only 5 distinct values of 'x'. 
mongocxx::pipeline pipe; pipe.group(from_json(R"({"_id": "$x"})")); diff --git a/src/value_generators/include/value_generators/PipelineGenerator.hpp b/src/value_generators/include/value_generators/PipelineGenerator.hpp index d59a8a0f6e..58f4bff1d1 100644 --- a/src/value_generators/include/value_generators/PipelineGenerator.hpp +++ b/src/value_generators/include/value_generators/PipelineGenerator.hpp @@ -29,16 +29,30 @@ namespace genny { struct PipelineGenerator { PipelineGenerator() = default; - PipelineGenerator(const Node& node, PhaseContext& context, ActorId id) { - if (!node.isSequence()) { - BOOST_THROW_EXCEPTION(InvalidConfigurationException("'Pipeline' must be an array")); + PipelineGenerator(const Node& node, ActorContext& context) { + assertIsArray(node); + for (auto&& [_, stageNode] : node) { + // The '1' here is a lie, but we don't necessarily have an actor ID yet in this + // scenario. + stageGenerators.push_back(stageNode.to(context, 1)); } + } + + PipelineGenerator(const Node& node, PhaseContext& context, ActorId id) { + assertIsArray(node); for (auto&& [_, stageNode] : node) { stageGenerators.push_back(stageNode.to(context, id)); } } std::vector stageGenerators; + +private: + void assertIsArray(const Node& node) { + if (!node.isSequence()) { + BOOST_THROW_EXCEPTION(InvalidConfigurationException("'Pipeline' must be an array")); + } + } }; } // namespace genny diff --git a/src/workloads/docs/SamplingLoader.yml b/src/workloads/docs/SamplingLoader.yml index 88ed3df33f..ad5e8bc9b4 100644 --- a/src/workloads/docs/SamplingLoader.yml +++ b/src/workloads/docs/SamplingLoader.yml @@ -29,42 +29,38 @@ Actors: # Our first demonstration of the sampling loader. - Name: BasicSamplingDemo Type: SamplingLoader + Database: *db + Collection: *Collection Threads: 1 + # Number of documents in the sample - required. The loader will cycle through the sampled docs, + # using them as templates for the ones to be inserted, until it inserts the requested number of + # batches. 
For example, if the sample size is set to 1, exactly one document is sampled from the + # original dataset and reused as a template for _all_ of the inserted docs. And if the sample size + # is equal to the insert batch size, each sampled document will be "re-inserted" as many times as + # the number of batches. + # + # Using a small sample size may be useful if you want to measure performance of inserting many + # documents that don't need a variety of input data, as the small sample size allows for the + # random cursor optimization during sampling. + # + # Note that the sample size does not need to be correlated or a multiple of the insert batch size + # or number of batches. + SampleSize: 7 Phases: - {Nop: true} + # "Repeat" is pretty redundant with "InsertBatchSize", but "InsertBatchSize" is preferred. + # Repetitions will still share the same sample. - Repeat: 1 - Database: *db - Collection: *Collection # Number of documents in a single insert operation. InsertBatchSize: 5 # Number of insert operations. The total number of documents to be inserted is # Batches * InsertBatchSize. Batches: 2 - # Number of documents in the sample. The loader will cycle through the sampled docs, using them - # as templates for the ones to be incerted, until it inserts the requested number of batches. - # For example, if the sample size is set to 1, exactly one document is sampled from the original - # dataset and reused as a template for _all_ of the inserted docs. And if the sample size is - # equal to the insert batch size, each sampled document will be "re-inserted" as many times as - # the number of batches. - # - # Using a small sample size may be useful if you want to measure performance of inserting many - # documents that don't need a variety of input data, as the small sample size allows for the - # random cursor optimization during sampling. - # - # Note that the sample size does not need to be correlated or a multiple of the insert batch size - # or number of batches. 
- # - # If the sample size is omitted, the loader will set it to the number of the requested documents - # (Batches * InsertBatchSize) so that each sampled document is re-inserted exactly once. - SampleSize: 7 - {Nop: true} - {Nop: true} # In this case we use multiple threads. -# With multiple threads, each thread will operate independently, including in gathering the sample. -# A word of caution: because there is no synchronization between threads, this means it is possible -# for one thread to observe a document that was inserted by another SamplingLoader in the same -# phase. Plan accordingly. +# With multiple threads, each thread will share a sample which is gathered before the load starts. # # The total number of documents inserted then will be Threads * Batches * InsertBatchSize, here # 4 * 5 * 20 = 400. @@ -73,14 +69,14 @@ Actors: # accept an overall document count target which gets divided amongst the threads. - Name: MultiThreadedExample Type: SamplingLoader + Database: *db + Collection: *Collection + SampleSize: 5 Threads: 4 Phases: - {Nop: true} - {Nop: true} - Repeat: 1 - Database: *db - Collection: *Collection - SampleSize: 5 Batches: 5 InsertBatchSize: 20 - {Nop: true} @@ -92,23 +88,21 @@ Actors: - Name: PipelineOptionExample Type: SamplingLoader Threads: 4 + Database: *db + Collection: *Collection + SampleSize: 5 + # Note again: each thread shares a sample. This means: + # + # - the pipeline is run only once to gather the sample, and so any document generators you use in + # the pipeline specification will be instantiated once and only once. + # + # - this pipeline is run before the individual actors are generated (one per thread), so the actor + # ID will not be available. You should not use the {^ActorId: {}} generator in this pipeline. 
+ Pipeline: [{$set: {y: "SamplingLoader wuz here"}}] Phases: - {Nop: true} - {Nop: true} - {Nop: true} - Repeat: 1 - Database: *db - Collection: *Collection - SampleSize: 5 Batches: 5 InsertBatchSize: 20 - # Note again: each thread is independent. As mentioned earlier, it is possible that in a race - # condition one thread will observe another thread's inserted documents in the sample. Here that - # would mean we might see a document that already has the 'y' and 'actorId' fields present. In - # this case they will be overridden with new values though, so it would be harmless. The end - # result should be that we see Batches * InsertBatchSize = 100 new documents per thread, and - # each actorId repeated 100 times. - # - # Note that the pipeline is run only once to gather the sample, and so any document generators - # you use in the pipeline specification will be instantiated once and only once. - Pipeline: [{$set: {y: "SamplingLoader wuz here", actorId: {^ActorId: {}}}}] diff --git a/src/workloads/query/CsiFragmentedInsertsFlat.yml b/src/workloads/query/CsiFragmentedInsertsFlat.yml index 18c618fb11..952e4e006c 100644 --- a/src/workloads/query/CsiFragmentedInsertsFlat.yml +++ b/src/workloads/query/CsiFragmentedInsertsFlat.yml @@ -84,28 +84,26 @@ ActorTemplates: Config: Name: {^Parameter: {Name: "Name", Default: "Insert"}} Type: SamplingLoader + Database: *db + Collection: *coll + + # No more than sampleSize*objectWidth paths can be "touched" by a sample. + SampleSize: *sampleSize + Threads: {^Parameter: {Name: "Threads", Default: 1}} Phases: OnlyActiveInPhases: Active: [{^Parameter: {Name: "OnlyActiveInPhase", Default: 1024}}] NopInPhasesUpTo: *maxPhases PhaseConfig: - Database: *db - Collection: *coll - - # No more than sampleSize*objectWidth paths can be "touched" by a sample. 
- SampleSize: *sampleSize - # When the batch size is the same as sample size, each sampled document will be used in # the batch exactly once, and this will be repeated for the number of batches. We expect, # that the first batch might be affected by "cold" caches but the subsequent batches # would be fully warmed up. InsertBatchSize: *sampleSize - # The 'SamplingLoader' actor re-samples on repeat, meaning that it would get a new set of - # documents likely with different paths and values. This makes each repeat hit different - # parts of the indexes, causing a long warm up tail. To avoid this we test with a single - # repeat but multiple batches. + # "Repeat" is pretty redundant with "InsertBatchSize", but "InsertBatchSize" is preferred. + # Repetitions will still share the same sample. Repeat: {^Parameter: {Name: "Repeats", Default: 1}} # The instances of the template must specify the number of batches to make it clear what diff --git a/src/workloads/query/CsiFragmentedInsertsNested.yml b/src/workloads/query/CsiFragmentedInsertsNested.yml index e75a8e4009..4f5d1efa53 100644 --- a/src/workloads/query/CsiFragmentedInsertsNested.yml +++ b/src/workloads/query/CsiFragmentedInsertsNested.yml @@ -92,15 +92,15 @@ ActorTemplates: Config: Name: {^Parameter: {Name: "Name", Default: "Insert"}} Type: SamplingLoader + Database: *db + Collection: *coll + SampleSize: *sampleSize Threads: {^Parameter: {Name: "Threads", Default: 1}} Phases: OnlyActiveInPhases: Active: [{^Parameter: {Name: "OnlyActiveInPhase", Default: 1024}}] NopInPhasesUpTo: *maxPhases PhaseConfig: - Database: *db - Collection: *coll - SampleSize: *sampleSize InsertBatchSize: *sampleSize Repeat: {^Parameter: {Name: "Repeats", Default: 1}} Batches: {^Parameter: {Name: "Batches", Default: 500}}