Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/master' into EVG-18409
Browse files Browse the repository at this point in the history
  • Loading branch information
johndaniels committed Jan 4, 2023
2 parents 870c614 + b4d742b commit 2780ef5
Show file tree
Hide file tree
Showing 91 changed files with 1,821 additions and 497 deletions.
73 changes: 50 additions & 23 deletions src/cast_core/include/cast_core/actors/OptionsConversion.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,14 @@
#ifndef HEADER_1AFC7FF3_F491_452B_9805_18CAEDE4663D_INCLUDED
#define HEADER_1AFC7FF3_F491_452B_9805_18CAEDE4663D_INCLUDED

#include <string_view>

#include <boost/throw_exception.hpp>
#include <bsoncxx/json.hpp>
#include <mongocxx/database.hpp>
#include <mongocxx/pool.hpp>
#include <string_view>

#include <gennylib/Actor.hpp>
#include <gennylib/PhaseLoop.hpp>
#include <gennylib/context.hpp>

#include <value_generators/DocumentGenerator.hpp>

namespace genny {
Expand Down Expand Up @@ -125,30 +124,58 @@ struct NodeConvert<mongocxx::options::find> {
static type convert(const Node& node) {
type rhs{};

if (node["Hint"]) {
auto h = node["Hint"].to<std::string>();
auto hint = mongocxx::hint(std::move(h));
rhs.hint(mongocxx::hint(hint));
if (const auto& allowDiskUse = node["AllowDiskUse"]) {
rhs.allow_disk_use(allowDiskUse.to<bool>());
}
if (node["Comment"]) {
auto c = node["Comment"].to<std::string>();
rhs.comment(std::move(c));
if (const auto& sort = node["Sort"]) {
rhs.sort(bsoncxx::from_json(sort.to<std::string>()));
}
if (node["Limit"]) {
auto limit = node["Limit"].to<int>();
rhs.limit(limit);
if (const auto& collation = node["Collation"]) {
rhs.collation(bsoncxx::from_json(collation.to<std::string>()));
}
if (node["BatchSize"]) {
auto batchSize = node["BatchSize"].to<int>();
rhs.batch_size(batchSize);
// Note that the conversion of hints (here and elsewhere in this file) could be extended
// to support the hint specified as a document. Right now it only supports hints specified
// as a string giving the index name.
if (const auto& hint = node["Hint"]) {
rhs.hint(mongocxx::hint(hint.to<std::string>()));
}
if (node["MaxTime"]) {
auto maxTime = node["MaxTime"].to<genny::TimeSpec>();
rhs.max_time(std::chrono::milliseconds{maxTime});
if (const auto& comment = node["Comment"]) {
rhs.comment(comment.to<std::string>());
}
if (node["ReadPreference"]) {
auto readPref = node["ReadPreference"].to<mongocxx::read_preference>();
rhs.read_preference(readPref);
if (const auto& limit = node["Limit"]) {
rhs.limit(limit.to<int64_t>());
}
if (const auto& skip = node["Skip"]) {
rhs.skip(skip.to<int64_t>());
}
if (const auto& batchSize = node["BatchSize"]) {
rhs.batch_size(batchSize.to<int32_t>());
}
if (const auto& maxTime = node["MaxTime"]) {
auto max = maxTime.to<genny::TimeSpec>();
rhs.max_time(std::chrono::milliseconds{max});
}
if (const auto& readPref = node["ReadPreference"]) {
rhs.read_preference(readPref.to<mongocxx::read_preference>());
}

auto getBoolValue = [&](const std::string& paramName) {
const auto& val = node[paramName];
return val && val.to<bool>();
};

// Figure out the cursor type.
const bool tailable = getBoolValue("Tailable");
const bool awaitData = getBoolValue("AwaitData");
if (tailable && awaitData) {
rhs.cursor_type(mongocxx::cursor::type::k_tailable_await);
} else if (tailable) {
rhs.cursor_type(mongocxx::cursor::type::k_tailable);
} else if (awaitData) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
"Cannot set 'awaitData' to true without also setting 'tailable' to true"));
} else {
rhs.cursor_type(mongocxx::cursor::type::k_non_tailable);
}
return rhs;
}
Expand Down
60 changes: 58 additions & 2 deletions src/cast_core/include/cast_core/actors/SamplingLoader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,56 @@

#include <metrics/metrics.hpp>

#include <value_generators/DocumentGenerator.hpp>
#include <mongocxx/collection.hpp>
#include <mutex>
#include <value_generators/PipelineGenerator.hpp>

namespace genny::actor {

/**
* This class represents a sample of documents from a collection which is lazily loaded on the first
* request. It is designed to be shared across threads - it is thread safe.
*
* The lazy loading will allow this sample to be taken once the threads are actively running the
* workload - after previous stages have taken their effect on the collection.
*
* Instances hold a std::mutex member, so they are neither copyable nor movable.
*/
class DeferredSample {
public:
/**
* Stores the given arguments; no sampling work is done in the constructor itself.
*
* @param actorName name of the owning actor (stored as-is).
* @param client pool entry kept for the lifetime of this object so that 'collection' stays usable.
* @param collection the collection handle stored for later use by the sampling code.
* @param sampleSize presumably the number of documents to sample - TODO confirm against
* gatherSample().
* @param pipelineSuffixGenerator presumably generates stages appended to the sampling
* aggregation - TODO confirm against gatherSample().
*/
DeferredSample(std::string actorName,
mongocxx::pool::entry client,
mongocxx::collection collection,
uint sampleSize,
PipelineGenerator pipelineSuffixGenerator)
: _actorName{std::move(actorName)},
_client(std::move(client)),
_collection(std::move(collection)),
_sampleSize(sampleSize),
_pipelineSuffixGenerator(std::move(pipelineSuffixGenerator)) {}

/**
* If this is the first caller, will run an aggregation to gather the sample and return it.
* Subsequent callers will block until that is finished and then receive a copy of those
* results.
*/
std::vector<bsoncxx::document::value> getSample();

private:
// Helper that gathers the sample. The lock_guard parameter is presumably a token proving the
// caller already holds '_mutex' - TODO confirm against the definition.
std::vector<bsoncxx::document::value> gatherSample(const std::lock_guard<std::mutex>& lock);

// Presumably guards '_sampleDocs' (see the lock_guard parameter of gatherSample()).
std::mutex _mutex;

// This vector is lazily loaded, but once populated it is owned here and other threads will
// receive a view of these documents via a bsoncxx::document::view.
// NOTE(review): getSample() is declared to return std::vector<bsoncxx::document::value> by value
// and its comment describes a copy, not views - confirm which behavior is intended.
std::vector<bsoncxx::document::value> _sampleDocs = {};

std::string _actorName;
mongocxx::pool::entry _client;
// '_collection' needs '_client' to be in scope. Keep '_client' declared before '_collection':
// members are destroyed in reverse declaration order, so the client outlives the collection.
mongocxx::collection _collection;
uint _sampleSize;
PipelineGenerator _pipelineSuffixGenerator;
};

/**
* Given a collection that's already populated, will pull a sample of documents from that
* collection and then re-insert them in order to grow the collection. This is not guaranteed
Expand All @@ -36,7 +82,10 @@ namespace genny::actor {
class SamplingLoader : public Actor {

public:
explicit SamplingLoader(ActorContext& context);
SamplingLoader(ActorContext& context,
std::string dbName,
std::string collectionName,
std::shared_ptr<DeferredSample> deferredSamplingLoader);
~SamplingLoader() override = default;

static std::string_view defaultName() {
Expand All @@ -51,6 +100,13 @@ class SamplingLoader : public Actor {
metrics::Operation _totalBulkLoad;
metrics::Operation _individualBulkLoad;
mongocxx::pool::entry _client;
mongocxx::collection _collection;

// This is not using WorkloadContext::ShareableState for something that is conceptually similar
// because we do not want to share the sample across all phases of the workload, which would be
// a constraint of that system. We want one per phase. We still have to share across different
// actors for multiple threads in the same phase, so we do it this way.
std::shared_ptr<DeferredSample> _deferredSample;
PhaseLoop<PhaseConfig> _loop;
};

Expand Down
36 changes: 31 additions & 5 deletions src/cast_core/src/CrudActor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -636,13 +636,19 @@ struct FindOperation : public BaseOperation {
_collection{std::move(collection)},
_operation{operation},
_filter{opNode["Filter"].to<DocumentGenerator>(context, id)} {
if (opNode["Options"]) {
_options = opNode["Options"].to<mongocxx::options::find>();
if (const auto& options = opNode["Options"]; options) {
_options = options.to<mongocxx::options::find>();
if (options["Projection"]) {
_projection.emplace(options["Projection"].to<DocumentGenerator>(context, id));
}
}
}

void run(mongocxx::client_session& session) override {
auto filter = _filter();
if (_projection) {
_options.projection(_projection.value()());
}
this->doBlock(_operation, [&](metrics::OperationContext& ctx) {
auto cursor = (_onSession) ? _collection.find(session, filter.view(), _options)
: _collection.find(filter.view(), _options);
Expand All @@ -660,6 +666,7 @@ struct FindOperation : public BaseOperation {
mongocxx::collection _collection;
mongocxx::options::find _options;
DocumentGenerator _filter;
std::optional<DocumentGenerator> _projection;
metrics::Operation _operation;
};

Expand All @@ -675,13 +682,27 @@ struct FindOneOperation : public BaseOperation {
_collection{std::move(collection)},
_operation{operation},
_filter{opNode["Filter"].to<DocumentGenerator>(context, id)} {
if (opNode["Options"]) {
_options = opNode["Options"].to<mongocxx::options::find>();
if (const auto& options = opNode["Options"]; options) {
_options = options.to<mongocxx::options::find>();
if (options["Projection"]) {
_projection.emplace(options["Projection"].to<DocumentGenerator>(context, id));
}
if (options["Limit"]) {
BOOST_THROW_EXCEPTION(
InvalidConfigurationException("Cannot specify 'limit' to 'findOne' operation"));
}
if (options["BatchSize"]) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
"Cannot specify 'batchSize' to 'findOne' operation"));
}
}
}

void run(mongocxx::client_session& session) override {
auto filter = _filter();
if (_projection) {
_options.projection(_projection.value()());
}
this->doBlock(_operation, [&](metrics::OperationContext& ctx) {
auto result = (_onSession) ? _collection.find_one(session, filter.view(), _options)
: _collection.find_one(filter.view(), _options);
Expand All @@ -698,6 +719,7 @@ struct FindOneOperation : public BaseOperation {
mongocxx::collection _collection;
mongocxx::options::find _options;
DocumentGenerator _filter;
std::optional<DocumentGenerator> _projection;
metrics::Operation _operation;
};

Expand Down Expand Up @@ -1193,7 +1215,11 @@ class Delay {
public:
Delay(const Node& node, GeneratorArgs args)
: numberGenerator{makeDoubleGenerator(node["^TimeSpec"]["value"], args)} {
auto unitString = node["^TimeSpec"]["units"].maybe<std::string>().value_or("seconds");
if (!node["^TimeSpec"]["unit"]) {
BOOST_THROW_EXCEPTION(InvalidConfigurationException(
"Each TimeSpec needs a unit declaration."));
}
auto unitString = node["^TimeSpec"]["unit"].to<std::string>();

// Use string::find here so plurals get parsed correctly.
if (unitString.find("nanosecond") == 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/cast_core/src/RunCommand.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void runThenAwaitStepdown(mongocxx::database& database, bsoncxx::document::view&
struct RunCommandOperationConfig {
explicit RunCommandOperationConfig(const genny::Node& node)
: metricsName{node["OperationMetricsName"].maybe<std::string>().value_or("")},
isQuiet{node["OperationIsQuiet"].maybe<bool>().value_or(false)},
isQuiet{node["OperationIsQuiet"].maybe<bool>().value_or(true)},
logResult{node["OperationLogsResult"].maybe<bool>().value_or(false)},
awaitStepdown{node["OperationAwaitStepdown"].maybe<bool>().value_or(false)} {
if (auto opName = node["OperationName"].maybe<std::string>();
Expand All @@ -102,7 +102,7 @@ struct RunCommandOperationConfig {
explicit RunCommandOperationConfig() {}

const std::string metricsName = "";
const bool isQuiet = false;
const bool isQuiet = true;
const bool logResult = false;
const bool awaitStepdown = false;
};
Expand Down
Loading

0 comments on commit 2780ef5

Please sign in to comment.