$currentOp load shed POC using ExternalScriptRunner #1074

Draft: wants to merge 2 commits into master
@@ -18,7 +18,7 @@
#include <string_view>
#include <string>

#include <mongocxx/pool.hpp>
#include <mongocxx/uri.hpp>

#include <gennylib/Actor.hpp>
#include <gennylib/PhaseLoop.hpp>
@@ -50,6 +50,7 @@ class ExternalScriptRunner : public Actor {

/** @private */
struct PhaseConfig;
mongocxx::pool::entry _client;
PhaseLoop<PhaseConfig> _loop;
std::string _command;
};
src/cast_core/src/ExternalScriptRunner.cpp (15 changes: 10 additions & 5 deletions)
@@ -16,6 +16,7 @@

#include <boost/assert.hpp>
#include <boost/filesystem.hpp>
#include <mongocxx/client.hpp>
#include <cast_core/actors/ExternalScriptRunner.hpp>

namespace genny::actor {
@@ -117,7 +118,7 @@ class ScriptRunner {

class GeneralRunner: public ScriptRunner {
public:
GeneralRunner(PhaseContext& phaseContext, ActorId id, const std::string& workloadPath)
GeneralRunner(PhaseContext& phaseContext, ActorId id, const std::string& workloadPath, const mongocxx::uri& uri)
: ScriptRunner(phaseContext, id, workloadPath),
_script{phaseContext["Script"].to<std::string>()} {
std::string command{phaseContext["Command"].to<std::string>()};
@@ -127,7 +128,10 @@ class GeneralRunner: public ScriptRunner {
// No --file argument is required here, the script is run like
// sh /path/to/file
_invocation = "sh";
} else {
} else if (command == "mongo") {
_invocation = "/data/workdir/bin/mongo --quiet --tls --tlsAllowInvalidCertificates \"" + uri.to_string() + "\"";
}
else {
throw std::runtime_error("Script type " + command + " is not supported.");
}
}
@@ -179,12 +183,12 @@ struct ExternalScriptRunner::PhaseConfig {
// ignored.
metrics::Operation operation;

PhaseConfig(PhaseContext& phaseContext, ActorId id, const std::string& workloadPath, const std::string& type)
PhaseConfig(PhaseContext& phaseContext, ActorId id, const std::string& workloadPath, const std::string& type, const mongocxx::uri& uri)
: operation{phaseContext.operation("DefaultMetricsName", id)} {
if(type == "Python") {
_scriptRunner = std::make_unique<PythonRunner>(phaseContext, id, workloadPath);
} else {
_scriptRunner = std::make_unique<GeneralRunner>(phaseContext, id, workloadPath);
_scriptRunner = std::make_unique<GeneralRunner>(phaseContext, id, workloadPath, uri);
}
}
std::string runScript() {
@@ -218,7 +222,8 @@ void ExternalScriptRunner::run() {
ExternalScriptRunner::ExternalScriptRunner(genny::ActorContext& context)
// These are the attributes for the actor.
: Actor{context},
_loop{context, ExternalScriptRunner::id(), context.workload().workloadPath(), context["Type"].to<std::string>()}{}
_client{context.client()},
_loop{context, ExternalScriptRunner::id(), context.workload().workloadPath(), context["Type"].to<std::string>(), _client->uri()}{}

namespace {
auto registerExternalScriptRunner = Cast::registerDefault<ExternalScriptRunner>();
src/workloads/scale/ReadMemoryStressUntilFailure.yml (173 changes: 170 additions & 3 deletions)
@@ -61,8 +61,8 @@ Clients:
Default:
QueryOptions:
socketTimeoutMS: -1
maxPoolSize: 500
maxPoolSize: 11000

Actors:
# Drop database to get rid of stale data. Useful when running locally multiple times.
- Name: Setup
@@ -79,6 +79,21 @@ Actors:
- OperationName: RunCommand
OperationCommand: {dropDatabase: 1}

# Profile everything so we can see $currentOp execution stats. The workload is expected to fail, so analysis won't work anyway. (A profiler-query sketch follows this actor.)
- Name: LogLevel
Type: RunCommand
Threads: 1
Phases:
OnlyActiveInPhases:
Active: [0]
NopInPhasesUpTo: *MaxPhases
PhaseConfig:
Repeat: 1
Database: admin
Operations:
- OperationName: RunCommand
OperationCommand: { profile: 2 }

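# With profiling level 2 on admin, every $currentOp aggregation the shedding script below
# runs (which must execute against the admin database) is recorded in admin.system.profile.
# A minimal mongo shell sketch, not part of this workload, for pulling the slowest profiled
# commands after a run; the filter shape is an assumption about how the collection-less
# aggregate is logged:
#   const adminDB = db.getSiblingDB("admin");
#   adminDB.system.profile
#     .find({op: "command", "command.aggregate": 1})  // collection-less aggregates, e.g. $currentOp
#     .sort({millis: -1})                             // slowest first
#     .limit(5)
#     .forEach(d => printjson({ts: d.ts, millis: d.millis, ns: d.ns}));
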
# Load 20,096 documents of around 520KB, as described by the structure in GlobalDefaults.
- Name: LoadDocuments
Type: Loader
@@ -96,6 +111,26 @@
DocumentCount: *NumDocs
BatchSize: *LoadBatchSize

# Create 50k idle cursors to simulate a situation with a few big memory consumers and many small ops. (A verification sketch follows this actor.)
- Name: CreateCursors
Type: CrudActor
Threads: 10000
Phases:
OnlyActiveInPhases:
Active: [1]
NopInPhasesUpTo: *MaxPhases
PhaseConfig:
Repeat: 1
Database: someDb
Collection: doesNotMatter
ThrowOnFailure: false
Operations:
- OperationName: find
OperationCommand:
Filter: {}
Options:
BatchSize: 0 # We just want a cursor established; the collection contains no data.

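# A quick way to confirm how many idle cursors the actor above actually left open, using the
# same $currentOp options the shedding script relies on. Minimal mongo shell sketch, not part
# of this workload; run it against the admin database:
#   const idle = db.getSiblingDB("admin").aggregate([
#     {$currentOp: {allUsers: true, idleCursors: true}},
#     {$match: {type: "idleCursor"}},
#     {$count: "idleCursors"}
#   ]).toArray();
#   printjson(idle);
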
# Spawn many threads to sort enough documents to test server's capacity to handle memory pressure.
- Name: SortMany
Type: RunCommand
@@ -105,8 +140,10 @@ Actors:
Active: [2]
NopInPhasesUpTo: *MaxPhases
PhaseConfig:
Repeat: *SortRepeat
Duration: 2 minutes
# Repeat: *SortRepeat
Database: *DBName
ThrowOnFailure: false
Operations:
- OperationMetricsName: SortMany
OperationName: RunCommand
@@ -128,6 +165,136 @@ Actors:
{$sort: {b: 1}}]
cursor: {batchSize: *SortBatchSize}

# - Name: currentOp
# Type: AdminCommand
# # Use a single thread: multiple concurrent $currentOp operations block on the service context mutex (LockedClientCursor), which makes the reported times meaningless. (A timing sketch follows this block.)
# Threads: 1
# Phases:
# OnlyActiveInPhases:
# Active: [2]
# NopInPhasesUpTo: *MaxPhases
# PhaseConfig:
# Duration: 2 minutes
# Operations:
# - OperationMetricsName: currentOp
# OperationName: RunCommand
# OperationCommand:
# aggregate: 1
# pipeline:
# [{$currentOp: {allUsers: true, idleConnections: true, idleCursors: true, idleSessions: true}}, {$match: {type: "op"}}]
# cursor: {}

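# If the actor above is ever re-enabled, its cost can be spot-checked by hand. Minimal mongo
# shell sketch, not part of this workload, timing a single aggregation built from the same
# pipeline:
#   const t0 = Date.now();
#   const ops = db.getSiblingDB("admin").aggregate([
#     {$currentOp: {allUsers: true, idleConnections: true, idleCursors: true, idleSessions: true}},
#     {$match: {type: "op"}}
#   ]).toArray();
#   print("ops: " + ops.length + ", elapsed ms: " + (Date.now() - t0));
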
# Force system.profile to be constantly flushed to disk
- Name: fsync
Type: AdminCommand
Threads: 1
Phases:
OnlyActiveInPhases:
Active: [2]
NopInPhasesUpTo: *MaxPhases
PhaseConfig:
Duration: 2 minutes
Operations:
- OperationMetricsName: fsync
OperationName: RunCommand
OperationCommand:
fsync: 1

# Connect to the MongoDB server and run a JS script.
- Name: MongoshScriptRunnerWithDB
Type: ExternalScriptRunner
Threads: 1
Phases:
OnlyActiveInPhases:
Active: [2]
NopInPhasesUpTo: *MaxPhases
PhaseConfig:
Duration: 2 minutes
Command: "mongo"
MetricsName: ScriptMetrics
Script: |
const minutes = 4;
const startTime = new Date().getTime();
const targetEndTime = startTime + (1000 * 60 * minutes);
function shouldStopTest() {
return (new Date().getTime()) > targetEndTime;
}

function shouldShedOps() {
let residentMemMb = db.serverStatus().mem.resident;
//jsTestLog("Resident " + residentMemMb);
return residentMemMb > 55000;
}

function compareOps(a, b) {
return Date.parse(a.currentOpTime) - Date.parse(b.currentOpTime);
}

const testDB = db.getSiblingDB("memorystress");
function killCursors(cursorArr) {
//jsTestLog("Kill cursors " + cursorArr.length);
testDB.runCommand( { killCursors: "Collection0", cursors: cursorArr } );
}

function loadShedding() {
// Target only Collection0.
//jsTestLog("Running currentOp");
let pipeline = [{$currentOp: {allUsers: true, idleCursors: true}}, {$match: {ns: "memorystress.Collection0"}}];
let currOpResult = db.aggregate(pipeline);

// Do some sorting.
//jsTestLog("sorting");
let opsArr = currOpResult.toArray().sort(compareOps);
//jsTestLog("shedding");

let n = 0;
let cursorArr = [];

for (const op of opsArr) {
//printjson(op);
if(op.opid) {
db.killOp(op.opid);
n++;
} else if(op?.cursor?.cursorId) {
cursorArr.push(op.cursor.cursorId);
n++;
}

if(cursorArr.length >= 20 || (cursorArr.length && op.opid)) {
killCursors(cursorArr);
cursorArr = [];
}

if((n%20==0) && !shouldShedOps()) {
sleep(1);
if(shouldShedOps()) {
n = 0;
continue;
}
jsTestLog("Break shedding");
cursorArr = [];
break;
}
}
if(cursorArr.length) {
killCursors(cursorArr);
}
}

while(!shouldStopTest()) {
if(shouldShedOps()) {
loadShedding();
}
}

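# The 55000 threshold in shouldShedOps() above is read from db.serverStatus().mem.resident,
# which is reported in MB (so roughly 55 GB). A minimal mongo shell sketch, not part of this
# workload, printing that signal next to the tcmalloc heap gauge, another value one could
# gate shedding on (assumes the server uses the tcmalloc allocator):
#   const ss = db.serverStatus();
#   print("resident MB: " + ss.mem.resident);
#   if (ss.tcmalloc) {
#     print("tcmalloc heap bytes: " + ss.tcmalloc.generic.heap_size);
#   }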

# Commented out because this should not be regularly scheduled, as the task is expected to fail.
# Uncomment the lines below (and possibly change the build variant) to run the workload.
# AutoRun: