Skip to content

Commit

Permalink
Merge pull request #3 from MooreThreads/dynolog
Browse files Browse the repository at this point in the history
Raise on-demand profiling priority
  • Loading branch information
tangliang-mt authored Sep 19, 2024
2 parents 390f42a + adead90 commit 4bb26ab
Show file tree
Hide file tree
Showing 11 changed files with 158 additions and 26 deletions.
4 changes: 4 additions & 0 deletions libkineto/include/ActivityProfilerInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ class ActivityProfilerInterface {
// and duration and / or iteration stop criterion.
// Tracing terminates when either condition is met.
virtual void scheduleTrace(const std::string& configStr) {}
virtual bool isSyncProfilingRunning() {
return false;
}
virtual void setSyncProfilingRunning(bool b) {}

// *** Synchronous API ***
// These must be called in order:
Expand Down
1 change: 1 addition & 0 deletions libkineto/include/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ class Config : public AbstractConfig {
}

void updateActivityProfilerRequestReceivedTime();
void updateActivityProfilerStartTime();

void printActivityProfilerConfig(std::ostream& s) const override;

Expand Down
92 changes: 76 additions & 16 deletions libkineto/src/ActivityProfilerController.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,12 @@ void ActivityProfilerController::setInvariantViolationsLoggerFactory(
}

bool ActivityProfilerController::canAcceptConfig() {
return !profiler_->isActive();
// If has ongoing or pending on-demand profiling, do not receive new one.
return !profiler_->isOnDemandProfilingRunning() && !profiler_->isOnDemandProfilingPending();
}

void ActivityProfilerController::acceptConfig(const Config& config) {
LOG(INFO) << "acceptConfig";
VLOG(1) << "acceptConfig";
if (config.activityProfilerEnabled()) {
scheduleTrace(config);
Expand Down Expand Up @@ -190,6 +192,7 @@ bool ActivityProfilerController::shouldActivateIterationConfig(
void ActivityProfilerController::profilerLoop() {
setThreadName("Kineto Activity Profiler");
VLOG(0) << "Entering activity profiler loop";
LOG(INFO) << "Entering activity profiler loop";

auto now = system_clock::now();
auto next_wakeup_time = now + Config::kControllerIntervalMsecs;
Expand All @@ -216,7 +219,10 @@ void ActivityProfilerController::profilerLoop() {
next_wakeup_time += Config::kControllerIntervalMsecs;
}

if (profiler_->isActive()) {
// Only run performRunLoopStep on on-demand profiling and status is ongoing,
// as sync profiling which is called by python api,
// has other control logic (controlled by python code directly.)
if (profiler_->isActive() && profiler_->isOnDemandProfilingRunning()) {
next_wakeup_time = profiler_->performRunLoopStep(now, next_wakeup_time);
VLOG(1) << "Profiler loop: "
<< duration_cast<milliseconds>(system_clock::now() - now).count()
Expand All @@ -225,6 +231,7 @@ void ActivityProfilerController::profilerLoop() {
}

VLOG(0) << "Exited activity profiling loop";
LOG(INFO) << "Exited activity profiling loop";
}

void ActivityProfilerController::step() {
Expand All @@ -242,7 +249,10 @@ void ActivityProfilerController::step() {
}
}

if (profiler_->isActive()) {
// Only run performRunLoopStep on on-demand profiling and status is ongoing,
// as sync profiling which is called by python api,
// has other control logic (controlled by python code directly.)
if (profiler_->isActive() && profiler_->isOnDemandProfilingRunning()) {
auto now = system_clock::now();
auto next_wakeup_time = now + Config::kControllerIntervalMsecs;
profiler_->performRunLoopStep(now, next_wakeup_time, currentIter);
Expand All @@ -264,12 +274,46 @@ int ActivityProfilerController::getCurrentRunloopState() {
return profiler_->getCurrentRunloopState();
}

bool ActivityProfilerController::isSyncProfilingRunning() {
LOG(INFO) << "call isSyncProfilingRunning";
return profiler_->isSyncProfilingRunning();
}

void ActivityProfilerController::setSyncProfilingRunning(bool b) {
LOG(INFO) << "call setSyncProfilingRunning";
profiler_->setSyncProfilingRunning(b);
}

void ActivityProfilerController::scheduleTrace(const Config& config) {
LOG(INFO) << "scheduleTrace";
VLOG(1) << "scheduleTrace";
if (profiler_->isActive()) {
LOG(WARNING) << "Ignored request - profiler busy";

// If has another pending on-demand profiling, just return.
if (profiler_->isOnDemandProfilingPending()) {
LOG(WARNING) << "Ignored on-demand profiling request, as another on-demand profiler is pending.";
return;
}

// If has another ongoing on-demand profiling, just return.
if (profiler_->isOnDemandProfilingRunning()) {
LOG(WARNING) << "Ignored on-demand profiling request, as another on-demand profiler is running.";
return;
}
profiler_->setOnDemandProfilingPending(true);
configLoader_.notifyCurrentRunloopState(4); // 4 - on-demand profiling pending
LOG(INFO) << "On-demand profiling enter [pending] status.";
// If has another ongoing sync profiling, wait until it is finished normally.
while (profiler_->isSyncProfilingRunning()) {
// Block here, until sync profiling is finished.
LOG(INFO) << "wait until sync profiling finished.";
// sleep in main thread, readOnDemandConfigFromDaemon will be blocked.
usleep(500000); // 500ms
}
profiler_->setOnDemandProfilingRunning(true);
profiler_->setOnDemandProfilingPending(false);
configLoader_.notifyCurrentRunloopState(1); // 1 - on-demand profiling running
LOG(INFO) << "On-demand profiling enter [running] status.";

int64_t currentIter = iterationCount_;
if (config.hasProfileStartIteration() && currentIter < 0) {
LOG(WARNING) << "Ignored profile iteration count based request as "
Expand All @@ -290,6 +334,10 @@ void ActivityProfilerController::scheduleTrace(const Config& config) {
return;
}

// Modify config request time, because may delay by sync profiling tasks.
asyncRequestConfig_->updateActivityProfilerRequestReceivedTime();
asyncRequestConfig_->updateActivityProfilerStartTime();

// start a profilerLoop() thread to handle request
if (!profilerThread_) {
profilerThread_ =
Expand All @@ -298,32 +346,44 @@ void ActivityProfilerController::scheduleTrace(const Config& config) {
}

void ActivityProfilerController::prepareTrace(const Config& config) {
// Requests from ActivityProfilerApi have higher priority than
// Requests from ActivityProfilerApi have lower priority than
// requests from other sources (signal, daemon).
// Cancel any ongoing request and refuse new ones.
// Refuse new ones if has any ongoing profiling.
auto now = system_clock::now();
if (profiler_->isActive()) {
LOG(WARNING) << "Cancelling current trace request in order to start "
<< "higher priority synchronous request";
if (libkineto::api().client()) {
libkineto::api().client()->stop();
}
profiler_->stopTrace(now);
profiler_->reset();
if (profiler_->isActive() || profiler_->isOnDemandProfilingRunning()) {
LOG(WARNING) << "Ignored prepareTrace request - profiler busy";
return;
}
if (profiler_->isOnDemandProfilingPending()) {
LOG(WARNING) << "Ignored prepareTrace request - as on-demand profiling pending.";
return;
}

profiler_->configure(config, now);
profiler_->setSyncProfilingRunning(true);
}

void ActivityProfilerController::startTrace() {
if (profiler_->isOnDemandProfilingRunning()) {
LOG(WARNING) << "Ignored startTrace request - on-demand profiler busy";
return;
}
UST_LOGGER_MARK_COMPLETED(kWarmUpStage);
profiler_->startTrace(std::chrono::system_clock::now());
}

std::unique_ptr<ActivityTraceInterface> ActivityProfilerController::stopTrace() {
auto logger = std::make_unique<MemoryTraceLogger>(profiler_->config());
if (profiler_->isOnDemandProfilingRunning()) {
LOG(WARNING) << "Ignored stopTrace request - on-demand profiler busy";
return std::make_unique<ActivityTrace>(std::move(logger), loggerFactory());
}
if (!profiler_->isActive()) {
LOG(WARNING) << "Ignored stopTrace request - as profiler is NOT active";
return std::make_unique<ActivityTrace>(std::move(logger), loggerFactory());
}
profiler_->stopTrace(std::chrono::system_clock::now());
UST_LOGGER_MARK_COMPLETED(kCollectionStage);
auto logger = std::make_unique<MemoryTraceLogger>(profiler_->config());
profiler_->processTrace(*logger);
// Will follow up with another patch for logging URLs when ActivityTrace is moved.
UST_LOGGER_MARK_COMPLETED(kPostProcessingStage);
Expand Down
3 changes: 3 additions & 0 deletions libkineto/src/ActivityProfilerController.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include <memory>
#include <mutex>
#include <thread>
#include <unistd.h> //usleep()

// TODO(T90238193)
// @lint-ignore-every CLANGTIDY facebook-hte-RelativeInclude
Expand Down Expand Up @@ -55,6 +56,8 @@ class ActivityProfilerController : public ConfigLoader::ConfigHandler {
bool canAcceptConfig() override;
void acceptConfig(const Config& config) override;
void scheduleTrace(const Config& config);
bool isSyncProfilingRunning();
void setSyncProfilingRunning(bool b);
int getCurrentRunloopState() override;

// These API are used for Synchronous Tracing.
Expand Down
8 changes: 8 additions & 0 deletions libkineto/src/ActivityProfilerProxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ void ActivityProfilerProxy::scheduleTrace(const Config& config) {
controller_->scheduleTrace(config);
}

bool ActivityProfilerProxy::isSyncProfilingRunning() {
return controller_->isSyncProfilingRunning();
}

void ActivityProfilerProxy::setSyncProfilingRunning(bool b) {
return controller_->setSyncProfilingRunning(b);
}

void ActivityProfilerProxy::prepareTrace(
const std::set<ActivityType>& activityTypes,
const std::string& configStr) {
Expand Down
2 changes: 2 additions & 0 deletions libkineto/src/ActivityProfilerProxy.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ class ActivityProfilerProxy : public ActivityProfilerInterface {

void scheduleTrace(const std::string& configStr) override;
void scheduleTrace(const Config& config);
bool isSyncProfilingRunning() override;
void setSyncProfilingRunning(bool b) override;

void prepareTrace(
const std::set<ActivityType>& activityTypes,
Expand Down
7 changes: 6 additions & 1 deletion libkineto/src/Config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ constexpr int kDefaultSamplesPerReport(1);
constexpr int kDefaultMaxEventProfilersPerGpu(1);
constexpr int kDefaultEventProfilerHearbeatMonitorPeriod(0);
constexpr seconds kMaxRequestAge(10);
constexpr seconds kDefaultOnDemandConfigUpdateIntervalSecs(5);
constexpr seconds kDefaultOnDemandConfigUpdateIntervalSecs(1);
// 3200000 is the default value set by MUPTI
constexpr size_t kDefaultMuptiDeviceBufferSize(3200000);
// Default value set by MUPTI is 250
Expand Down Expand Up @@ -439,6 +439,11 @@ void Config::setClientDefaults() {
activitiesLogToMemory_ = true;
}

void Config::updateActivityProfilerStartTime() {
profileStartTime_ = system_clock::now() +
activitiesWarmupDuration() + 2 * Config::kControllerIntervalMsecs;
}

void Config::validate(
const time_point<system_clock>& fallbackProfileStartTime) {
if (samplePeriod_.count() == 0) {
Expand Down
7 changes: 7 additions & 0 deletions libkineto/src/ConfigLoader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,13 @@ std::string ConfigLoader::readOnDemandConfigFromDaemon(
return daemonConfigLoader_->readOnDemandConfig(events, activities, currentRunloopState);
}

void ConfigLoader::notifyCurrentRunloopState(int state) {
if (!daemonConfigLoader_) {
return;
}
daemonConfigLoader_->readOnDemandConfig(false, true, state);
}

int ConfigLoader::contextCountForGpu(uint32_t device) {
if (!daemonConfigLoader_) {
// FIXME: Throw error?
Expand Down
2 changes: 2 additions & 0 deletions libkineto/src/ConfigLoader.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ class ConfigLoader {
}
}

void notifyCurrentRunloopState(int state);

bool canHandlerAcceptConfig(ConfigKind kind) {
std::lock_guard<std::mutex> lock(updateThreadMutex_);
for (ConfigHandler* handler : handlers_[kind]) {
Expand Down
2 changes: 2 additions & 0 deletions libkineto/src/MuptiActivityProfiler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,7 @@ void MuptiActivityProfiler::configure(

// Check if now is a valid time to start.
if (!derivedConfig_->canStart(now)) {
// Added by qiaoning @20240912 should update on-demand running status? MAYBE.
return;
}

Expand Down Expand Up @@ -1084,6 +1085,7 @@ const time_point<system_clock> MuptiActivityProfiler::performRunLoopStep(
processTraceInternal(*logger_);
UST_LOGGER_MARK_COMPLETED(kPostProcessingStage);
resetInternal();
onDemandProfilingRunning_ = false;
VLOG(0) << "ProcessTrace -> WaitForRequest";
break;
}
Expand Down
56 changes: 47 additions & 9 deletions libkineto/src/MuptiActivityProfiler.h
Original file line number Diff line number Diff line change
Expand Up @@ -116,17 +116,49 @@ class MuptiActivityProfiler {
return currentRunloopState_ != RunloopState::WaitForRequest;
}

bool isOnDemandProfilingPending() const {
return onDemandProfilingPending_;
}

void setOnDemandProfilingPending(bool b) {
onDemandProfilingPending_ = b;
}

bool isOnDemandProfilingRunning() const {
return onDemandProfilingRunning_;
}

void setOnDemandProfilingRunning(bool b) {
onDemandProfilingRunning_ = b;
}

bool isSyncProfilingRunning() const {
return syncProfilingRunning_;
}

void setSyncProfilingRunning(bool b) {
syncProfilingRunning_ = b;
}

int getCurrentRunloopState() const {
switch (currentRunloopState_) {
case RunloopState::WaitForRequest:
return 0;
case RunloopState::Warmup:
return 1;
case RunloopState::CollectTrace:
return 2;
case RunloopState::ProcessTrace:
return 3;
if (onDemandProfilingPending_) {
return 4; // Pending
}
if (onDemandProfilingRunning_) {
switch (currentRunloopState_) {
case RunloopState::WaitForRequest:
return 1; // Consider as warmup because onDemandProfilingRunning_ is true
case RunloopState::Warmup:
return 1;
case RunloopState::CollectTrace:
return 2;
case RunloopState::ProcessTrace:
return 3;
}
}
// onDemandProfilingPending_ and onDemandProfilingRunning_ both false,
// so, new on-demand profiling can be accepted now, ignore syncProfilingRunning_ status.
return 0; // WaitForRequest
}

// Invoke at a regular interval to perform profiling activities.
Expand Down Expand Up @@ -430,6 +462,12 @@ class MuptiActivityProfiler {

// Runloop phase
std::atomic<RunloopState> currentRunloopState_{RunloopState::WaitForRequest};
// On-Demand profiling pending status
std::atomic_bool onDemandProfilingPending_{false};
// On-Demand profiling running status
std::atomic_bool onDemandProfilingRunning_{false};
// In the process of sync api profiling (already executed prepareTrace and NOT finished yet).
std::atomic_bool syncProfilingRunning_{false};

// Keep track of the start time and end time for the trace collected.
// External threads using startTrace need to manually stopTrace. Part of the mock tests.
Expand Down

0 comments on commit 4bb26ab

Please sign in to comment.