Skip to content

Commit

Permalink
* Added threading library:
Browse files Browse the repository at this point in the history
  * Added `PeriodicThread`: Runs a function periodically in its own thread.
  • Loading branch information
helly25 committed Mar 24, 2024
1 parent e4f7477 commit 13214d4
Show file tree
Hide file tree
Showing 7 changed files with 270 additions and 0 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@
# Ignore the directory in which `clangd` stores its local index.
/.cache/
/.vscode/

/MODULE.bazel
/MODULE.bazel.lock
5 changes: 5 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 0.3.0

* Added threading library:
* Added `PeriodicThread`: Runs a function periodically in its own thread.

# 0.2.22

* Change the way `mbo::types::Extend` types are constructed to support more complex and deeper nested types.
Expand Down
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ The C++ library is organized in functional groups each residing in their own dir
* gMock-matcher `IsOkAndHolds`: Tests an absl::StatusOr for absl::OkStatus and contents.
* gMock-Matcher `StatusIs`: Tests an absl::Status or absl::StatusOr against a specific status code and message.
* macro `MBO_ASSERT_OK_AND_ASSIGN`: Simplifies testing with functions that return `absl::StatusOr<T>`.
* Threading
* `namespace mbo::thread`
* mbo/thread:periodic_thread_cc, mbo/thread/periodic_thread.h
* class `PeriodicThread`: Runs a function periodically in its own thread.
* Types
* `namespace mbo::types`
* mbo/types:cases_cc, mbo/types/cases.h
Expand Down
23 changes: 23 additions & 0 deletions mbo/thread/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package(default_visibility = ["//visibility:private"])

cc_library(
name = "periodic_thread_cc",
srcs = ["periodic_thread.cc"],
hdrs = ["periodic_thread.h"],
deps = [
"@com_google_absl//absl/base:core_headers",
"@com_google_absl//absl/synchronization",
"@com_google_absl//absl/time",
],
)


cc_binary(
name = "periodic",
srcs = ["periodic_main.cc"],
deps = [
":periodic_thread_cc",
"@com_google_absl//absl/strings:str_format",
"@com_google_absl//absl/time",
],
)
63 changes: 63 additions & 0 deletions mbo/thread/periodic_main.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Copyright 2024 M. Boerger (helly25.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <cstdint>

#include "absl/strings/str_format.h"
#include "absl/time/time.h"
#include "mbo/thread/periodic_thread.h"

void Test() {
static constexpr uint64_t kMaxCycle = 9'999;
static constexpr absl::Duration kInterval = absl::Milliseconds(100);
absl::Time start;
absl::Duration total_correction;
std::atomic_size_t cycle = 0;
mbo::thread::PeriodicThread periodic_thread({
.interval = kInterval,
.initial_wait = kInterval,
.func =
[&] {
const absl::Time now = absl::Now();
if (cycle == 0) {
start = now;
const std::string time = absl::FormatTime("%Y-%m-%d at %H:%M:%E6S", start, absl::LocalTimeZone());
absl::PrintF("[%04d]: %s %13s %12s %13s %12s\n", cycle, time, "duration", "average", "correction", "avg-corr");
} else {
const std::size_t this_cycle = cycle;
const absl::Duration dur = now - start;
const absl::Duration avg = dur / this_cycle;
const double dur_sec = absl::ToDoubleSeconds(dur);
const double avg_sec = absl::ToDoubleSeconds(avg);
const absl::Duration correction = avg - kInterval;
total_correction += correction;
const std::string time_str = absl::FormatTime("%Y-%m-%d at %H:%M:%E6S", now, absl::LocalTimeZone());
const std::string corr_str = absl::FormatDuration(correction);
const std::string avg_corr_str = absl::FormatDuration(total_correction / this_cycle);
absl::PrintF(
"[%04d]: %s %+13.6f ~%11.9f %13s %12s\n", this_cycle, time_str, dur_sec, avg_sec, corr_str,
avg_corr_str);
}
return ++cycle <= kMaxCycle;
},
});
periodic_thread.Join();
}

int main() {
Test();
absl::PrintF("All done!\n");
return 0;
}
90 changes: 90 additions & 0 deletions mbo/thread/periodic_thread.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright 2024 M. Boerger (helly25.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "mbo/thread/periodic_thread.h"

#include <atomic>
#include <thread>

#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"

namespace mbo::thread {

PeriodicThread::~PeriodicThread() noexcept {
stop_ = true;
Join();
}

PeriodicThread::PeriodicThread(Options options) noexcept : options_(std::move(options)), thread_([this] { Run(); }) {}

bool PeriodicThread::IsDone() const {
absl::MutexLock lock(&mx_);
return done_;
}

void PeriodicThread::Join() const {
{
absl::MutexLock lock(&mx_);
mx_.Await(absl::Condition(&done_));
}
if (thread_.joinable()) {
thread_.join();
}
}

void PeriodicThread::Run() {
running_ = true;
if (options_.initial_wait > absl::ZeroDuration()) {
absl::SleepFor(options_.initial_wait);
}
std::size_t cycle = 0;
absl::Time begin = absl::Now();
absl::Time start = begin;
absl::Duration adjust = absl::ZeroDuration();
while (!stop_) {
if (!options_.func() || stop_) {
break;
}
++cycle;
const absl::Time end = absl::Now();
const absl::Duration took = end - start + adjust;
absl::Duration sleep = options_.interval >= took ? options_.interval - took : options_.min_interval;
absl::SleepFor(sleep);
start = absl::Now();
// We adjust based on average divergence as well as the most recent runtime.
// This allows for small overcorrections over time as well as handling inconsistent runtime of actual function.
// Adjusting the average too little may result in an inability to keep to the intended interval time. Values in the
// low percentages (1.01..1.1) appear to be working.
// Adjusting the recent run strongly (e.g. >1) only works if callback runtimes are steady. Smaller values are
// better at dealing with inconsisten runtimes, even for very small inconsistencies. Setting the value to 1 appears
// to be a good compromise.
// This is not configurable in the `Options` for now as that prevent using better algorithms later.
static constexpr double kAdjustAverage = 1.05;
static constexpr double kAdjustRecent = 1;
adjust = kAdjustAverage * ((start - begin) / cycle - options_.interval) + kAdjustRecent * ((start - end) - sleep);
static constexpr std::size_t kMaxCycleAdjustWindow = 1000;
if (cycle % kMaxCycleAdjustWindow == 0) {
// We actually reset the cycle to 0 as we otherwise would need to handle cycle overrun in the next window
// `max(size_t) - cycle < kMaxCycleAdjustWindow`. Further we would need to use a modulo operation for cycle when
// computing the adjustment (`cycle % (kMaxCycleAdjustWindow + 1)`).
cycle = 0;
begin = start;
}
}
absl::MutexLock lock(&mx_);
done_ = true;
}

} // namespace mbo::thread
82 changes: 82 additions & 0 deletions mbo/thread/periodic_thread.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright 2024 M. Boerger (helly25.com)
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <thread>

#include "absl/base/thread_annotations.h"
#include "absl/synchronization/mutex.h"
#include "absl/time/time.h"

namespace mbo::thread {

// The `PeriodicThread` runs a function periodically in its own thread.
//
// The implementation uses simple heuristics to adjust to the intended interval over time using a
// sliding window. This allows for inconsistent function run times to eventually lead to closely
// following the interval on average. The implementation may still not be able to respect the
// interval correctly, even over longer periods of time. However, the divergence from the intended
// interval should be very small (high accuracy). Use binary `//mbo/thread:periodic` to determine
// the accuracy for actual machines.
//
// The advantage of 'allowing' the imperfect interval handling is that the management has a low cost
// and thus the most real time can be spent sleeping.
//
// The `PeriodicThread`:
// * Automatically stops if the function returns `false`.
// * Automatically starts on creation, but an `initial_wait` time can be configured.
// * Cannot be restarted.
// * Can be stopped, but may have to sleep for a full interval time before actually stopping.
// * The destructor will wait for the thread to stop if it is running.
// * The behavior is undefined if the function runtime exceeds the interval (or does not allow time
// for interval management).
class PeriodicThread {
public:
struct Options {
absl::Duration interval;
absl::Duration min_interval = absl::Milliseconds(1);
absl::Duration initial_wait = absl::ZeroDuration();
std::function<bool()> func; // Return `true` for continue, `false` for stop.
};

~PeriodicThread() noexcept;
explicit PeriodicThread(Options options) noexcept;

PeriodicThread(const PeriodicThread&) = delete;
PeriodicThread& operator=(const PeriodicThread&) = delete;
PeriodicThread(PeriodicThread&& other) = delete;
PeriodicThread& operator=(PeriodicThread&&) = delete;

void Stop() { stop_ = true; }

bool IsStopping() const { return stop_; }

bool IsRunning() const { return running_; }

bool IsDone() const;

void Join() const;

private:
void Run();

const Options options_;
mutable absl::Mutex mx_;
std::atomic_bool stop_ = false;
std::atomic_bool running_ = false;
bool done_ ABSL_GUARDED_BY(mx_) = false;
mutable std::thread thread_;
};

} // namespace mbo::thread

0 comments on commit 13214d4

Please sign in to comment.