Skip to content

Commit

Permalink
Add task abortion test (#11881)
Browse files Browse the repository at this point in the history
Summary:

A new thread to randomly tasks. It uses flag seconds_to_abort to control the frequency.

Reviewed By: xiaoxmeng

Differential Revision: D67267972
  • Loading branch information
zation99 authored and facebook-github-bot committed Dec 17, 2024
1 parent e937db3 commit f5e6175
Showing 1 changed file with 30 additions and 18 deletions.
48 changes: 30 additions & 18 deletions velox/exec/fuzzer/MemoryArbitrationFuzzer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,6 @@ DEFINE_int32(

DEFINE_int64(arbitrator_capacity, 256L << 20, "Arbitrator capacity in bytes.");

DEFINE_int32(
abort_injection_pct,
5,
"The percentage chance of triggering task abort");

DEFINE_int32(
global_arbitration_pct,
5,
Expand All @@ -94,6 +89,12 @@ DEFINE_int32(
"filesystem. This is only applicable when 'spill_faulty_fs_ratio' is "
"larger than 0");

DEFINE_int32(
task_abort_interval_ms,
1000,
"After each specified number of milliseconds, abort a random task."
"If given 0, no task will be aborted.");

using namespace facebook::velox::tests::utils;

namespace facebook::velox::exec::test {
Expand Down Expand Up @@ -697,12 +698,12 @@ MemoryArbitrationFuzzer::orderByPlans(const std::string& tableDir) {
}

struct ThreadLocalStats {
uint64_t taskAbortCount{0};
uint64_t spillFsFaultCount{0};
};

// Stats that keeps track of per thread execution status in verify()
thread_local ThreadLocalStats threadLocalStats;
std::atomic_uint32_t taskAbortRequestCount{0};

std::shared_ptr<TempDirectoryPath>
MemoryArbitrationFuzzer::maybeGenerateFaultySpillDirectory() {
Expand Down Expand Up @@ -745,12 +746,6 @@ void MemoryArbitrationFuzzer::verify() {
auto spillDirectory = maybeGenerateFaultySpillDirectory();
const auto tableScanDir = exec::test::TempDirectoryPath::create(false);

// Set a percentage chance for the task to be externally aborted.
TestScopedAbortInjection scopedAbortInjection(
FLAGS_abort_injection_pct,
std::numeric_limits<int32_t>::max(),
[](Task* /* unused */) { ++threadLocalStats.taskAbortCount; });

std::vector<PlanWithSplits> plans;
for (const auto& plan : hashJoinPlans(tableScanDir->getPath())) {
plans.push_back(plan);
Expand Down Expand Up @@ -782,8 +777,8 @@ void MemoryArbitrationFuzzer::verify() {
queryThreads.emplace_back([&, spillDirectory, i, seed]() {
FuzzerGenerator rng(seed);
while (!stop) {
const auto prevAbortCount = threadLocalStats.taskAbortCount;
const auto prevSpillFsFaultCount = threadLocalStats.spillFsFaultCount;
const auto prevTaskAbortRequestCount = taskAbortRequestCount.load();
try {
const auto queryCtx = newQueryCtx(
memory::memoryManager(),
Expand Down Expand Up @@ -816,16 +811,16 @@ void MemoryArbitrationFuzzer::verify() {
} else if (e.errorCode() == error_code::kMemAborted.c_str()) {
++lockedStats->abortCount;
} else if (e.errorCode() == error_code::kInvalidState.c_str()) {
const auto injectedAbort =
threadLocalStats.taskAbortCount > prevAbortCount;
const auto injectedSpillFsFault =
threadLocalStats.spillFsFaultCount > prevSpillFsFaultCount;
VELOX_CHECK(injectedAbort || injectedSpillFsFault);
if (injectedAbort && !injectedSpillFsFault) {
const auto injectedTaskAbortRequest =
taskAbortRequestCount > prevTaskAbortRequestCount;
VELOX_CHECK(injectedSpillFsFault || injectedTaskAbortRequest);
if (injectedTaskAbortRequest && !injectedSpillFsFault) {
VELOX_CHECK(
e.message().find("Aborted for external error") !=
std::string::npos);
} else if (!injectedAbort && injectedSpillFsFault) {
} else if (!injectedTaskAbortRequest && injectedSpillFsFault) {
VELOX_CHECK(
e.message().find("Fault file injection on") !=
std::string::npos);
Expand Down Expand Up @@ -855,6 +850,22 @@ void MemoryArbitrationFuzzer::verify() {
}
});

// Create a thread that randomly abort one worker thread
// every seconds_to_abort seconds.
std::thread abortControlThread([&]() {
if (FLAGS_task_abort_interval_ms == 0) {
return;
}
while (!stop) {
std::this_thread::sleep_for(
std::chrono::milliseconds(FLAGS_task_abort_interval_ms));
auto tasksList = Task::getRunningTasks();
auto index = getRandomIndex(rng_, tasksList.size() - 1);
++taskAbortRequestCount;
tasksList[index]->requestAbort();
}
});

std::this_thread::sleep_for(
std::chrono::seconds(FLAGS_iteration_duration_sec));
stop = true;
Expand All @@ -863,6 +874,7 @@ void MemoryArbitrationFuzzer::verify() {
queryThread.join();
}
globalShrinkThread.join();
abortControlThread.join();
}

void MemoryArbitrationFuzzer::go() {
Expand Down

0 comments on commit f5e6175

Please sign in to comment.