Skip to content

Commit

Permalink
Scheduler: process messages immediately after sleeping
Browse files Browse the repository at this point in the history
This resets the counter that controls after how many process...
invocations messages are processed, as after sleeping the probability of
messages being present is higher and if there is still no samples, the
scheduler might go back to sleep immediately so the worst case latency of
message processing can be at best `processStreamToMessageRatio`
(defaults to 16) times 10ms, so 160ms, which this change should bring
back to 10ms again.

Also change types from gr::Size_t to std::size_t, as they are only
internally used and will not end up in any serialised output.

Signed-off-by: Alexander Krimm <[email protected]>
  • Loading branch information
wirew0rm authored and RalphSteinhagen committed Oct 31, 2024
1 parent 5c51acd commit e3d349b
Showing 1 changed file with 7 additions and 4 deletions.
11 changes: 7 additions & 4 deletions core/include/gnuradio-4.0/Scheduler.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,8 @@ class SchedulerBase : public Block<Derived> {
}

[[maybe_unused]] auto currentProgress = this->_graph.progress().value();
gr::Size_t inactiveCycleCount = 0U;
gr::Size_t msgToCount = 0U;
std::size_t inactiveCycleCount = 0UZ;
std::size_t msgToCount = 0UZ;
auto activeState = this->state();
do {
[[maybe_unused]] auto pe = profiler_handler.startCompleteEvent("scheduler_base.work");
Expand All @@ -281,7 +281,7 @@ class SchedulerBase : public Block<Derived> {
currentProgress = this->_graph.progress().value();
}

bool processMessages = msgToCount == 0U;
bool processMessages = msgToCount == 0UZ;
if (processMessages) {
if (runnerID == 0UZ || _nRunningJobs.load(std::memory_order_acquire) == 0UZ) {
this->processScheduledMessages(); // execute the scheduler- and Graph-specific message handler only once globally
Expand Down Expand Up @@ -311,8 +311,10 @@ class SchedulerBase : public Block<Derived> {
_graph.ackTopologyChange();
}
std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms));
msgToCount = 0UZ;
} else { // other states
std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms));
msgToCount = 0UZ;
}

// optionally tracking progress and block if there is none
Expand All @@ -321,13 +323,14 @@ class SchedulerBase : public Block<Derived> {
if (currentProgress == progressAfter) {
inactiveCycleCount++;
} else {
inactiveCycleCount = 0U;
inactiveCycleCount = 0UZ;
}

currentProgress = progressAfter;
if (inactiveCycleCount > timeout_inactivity_count) {
// allow scheduler process to sleep before retrying (N.B. intended to save CPU/battery power)
std::this_thread::sleep_for(std::chrono::milliseconds(timeout_ms));
msgToCount = 0UZ;
}
}
} while (lifecycle::isActive(activeState));
Expand Down

0 comments on commit e3d349b

Please sign in to comment.