From f1bd190e9f50df210719ba3794bc60c8d72af3c5 Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Mon, 17 Aug 2015 21:30:48 -0400 Subject: [PATCH 1/4] PLAT-937: added TimerEventSource, enabling use of a single fd for multiple timers --- service/message_loop.cc | 4 +- service/message_loop.h | 24 ++- service/service.mk | 1 + service/testing/service_testing.mk | 1 + service/testing/timer_event_source_test.cc | 45 ++++++ service/timer_event_source.cc | 162 +++++++++++++++++++++ service/timer_event_source.h | 63 ++++++++ 7 files changed, 293 insertions(+), 7 deletions(-) create mode 100644 service/testing/timer_event_source_test.cc create mode 100644 service/timer_event_source.cc create mode 100644 service/timer_event_source.h diff --git a/service/message_loop.cc b/service/message_loop.cc index 8856cce4..449eac35 100644 --- a/service/message_loop.cc +++ b/service/message_loop.cc @@ -42,9 +42,11 @@ MessageLoop(int numThreads, double maxAddedLatency, int epollTimeout) : sourceActions_([&] () { handleSourceActions(); }), numThreadsCreated(0), shutdown_(true), - totalSleepTime_(0.0) + totalSleepTime_(0.0), + timerSource_(new TimerEventSource()) { init(numThreads, maxAddedLatency, epollTimeout); + addSource("timer_", timerSource_); } MessageLoop:: diff --git a/service/message_loop.h b/service/message_loop.h index a16a45ae..a05ffbd0 100644 --- a/service/message_loop.h +++ b/service/message_loop.h @@ -7,16 +7,19 @@ #pragma once -#include #include +#include +#include +#include #include "jml/arch/wakeup_fd.h" #include "jml/arch/spinlock.h" #include "epoller.h" #include "async_event_source.h" -#include "typed_message_channel.h" #include "logs.h" +#include "timer_event_source.h" +#include "typed_message_channel.h" namespace Datacratic { @@ -32,9 +35,10 @@ struct MessageLoopLogs static Logging::Category trace; }; -/*****************************************************************************/ -/* MESSAGE LOOP */ -/*****************************************************************************/ + +/****************************************************************************/ +/* MESSAGE LOOP */ +/****************************************************************************/ struct MessageLoop : public Epoller { typedef std::function OnStop; @@ -90,7 +94,13 @@ struct MessageLoop : public Epoller { double timePeriodSeconds, std::function toRun, int priority = 0); - + + /* Adds a timer */ + void addTimer(double delay, const TimerEventSource::OnTick & onTick) + { + timerSource_->addTimer(delay, onTick); + } + typedef std::function SubordinateThreadFn; @@ -214,6 +224,8 @@ struct MessageLoop : public Epoller { void processAddSource(const SourceEntry & entry); void processRemoveSource(const SourceEntry & entry); void processRunAction(const SourceEntry & entry); + + std::shared_ptr timerSource_; }; } // namespace Datacratic diff --git a/service/service.mk b/service/service.mk index e40a682c..10c993a6 100644 --- a/service/service.mk +++ b/service/service.mk @@ -53,6 +53,7 @@ LIBSERVICES_SOURCES := \ async_event_source.cc \ async_writer_source.cc \ tcp_client.cc \ + timer_event_source.cc \ rest_service_endpoint.cc \ http_named_endpoint.cc \ rest_proxy.cc \ diff --git a/service/testing/service_testing.mk b/service/testing/service_testing.mk index 05fbe6b1..d968c233 100644 --- a/service/testing/service_testing.mk +++ b/service/testing/service_testing.mk @@ -35,6 +35,7 @@ $(eval $(call test,http_rest_proxy_stress_test,services,boost manual)) $(eval $(call test,service_proxies_test,endpoint,boost manual)) $(eval $(call test,message_loop_test,services,boost)) +$(eval $(call test,timer_event_source_test,services,boost)) $(eval $(call program,runner_test_helper,utils)) $(eval $(call test,runner_test,services,boost)) diff --git a/service/testing/timer_event_source_test.cc b/service/testing/timer_event_source_test.cc new file mode 100644 index 00000000..dd1822a1 --- /dev/null +++ b/service/testing/timer_event_source_test.cc @@ -0,0 +1,45 @@ +#define BOOST_TEST_MAIN +#define BOOST_TEST_DYN_LINK + +#include +#include + +#include + +#include "jml/arch/timers.h" +#include "jml/utils/testing/watchdog.h" +#include "soa/types/date.h" +#include "soa/service/message_loop.h" +#include "soa/service/timer_event_source.h" + +using namespace std; +using namespace Datacratic; + + +BOOST_AUTO_TEST_CASE( test_addSource_with_needsPoll ) +{ + ML::Watchdog wd(10); + std::atomic ticks(0); + MessageLoop loop(1, 0, -1); + loop.start(); + auto timer = make_shared(); + loop.addSource("timer", timer); + timer->waitConnectionState(AsyncEventSource::CONNECTED); + + auto onTick = [&] (uint64_t) { + Date now = Date::now(); + ticks++; + return ticks < 3; + }; + timer->addTimer(0.2, onTick); + + while (true) { + if (ticks == 3) { + Date now = Date::now(); + break; + } + ML::sleep(1); + } + + loop.shutdown(); +} diff --git a/service/timer_event_source.cc b/service/timer_event_source.cc new file mode 100644 index 00000000..be784f54 --- /dev/null +++ b/service/timer_event_source.cc @@ -0,0 +1,162 @@ +/* timer_event_source.cc + Wolfgang Sourdeau, August 2015 + Copyright (c) 2015 Datacratic. All rights reserved. +*/ + +#include +#include + +#include "jml/utils/exc_assert.h" +#include "timer_event_source.h" + +using namespace std; +using namespace Datacratic; + + +/****************************************************************************/ +/* TIMER EVENT SOURCE */ +/****************************************************************************/ + +TimerEventSource:: +TimerEventSource() + : timerFd_(::timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC)), + nextTick_(Date::negativeInfinity()) +{ + if (timerFd_ == -1) { + throw ML::Exception(errno, "timerfd_create"); + } +} + +TimerEventSource:: +~TimerEventSource() +{ + int res = ::close(timerFd_); + if (res == -1) { + cerr << "warning: close on timerfd: " << strerror(errno) << endl; + } +} + +int +TimerEventSource:: +selectFd() + const +{ + return timerFd_; +} + +bool +TimerEventSource:: +processOne() +{ +start: + uint64_t numWakeups = 0; + int res = ::read(timerFd_, &numWakeups, 8); + if (res == -1) { + if (errno == EINTR) { + goto start; + } + else if (errno == EAGAIN || errno == EWOULDBLOCK) { + return false; + } + else { + throw ML::Exception(errno, "timerfd read"); + } + } + ExcAssertEqual(res, 8); + onTimerTick(); + + return false; +} + +void +TimerEventSource:: +onTimerTick() +{ + Date now = Date::now(); + vector > triggered = collectTriggeredTimers(now); + + for (auto & timer: triggered) { + double interval = timer->lastTick.secondsUntil(now); + uint64_t numTicks = (uint64_t) ::floor(interval / timer->interval); + if (timer->onTick(numTicks)) { + timer->lastTick = now; + timer->nextTick = now.plusSeconds(timer->interval); + insertTimer(move(timer)); + } + } + + adjustNextTick(now); +} + +vector > +TimerEventSource:: +collectTriggeredTimers(Date refDate) +{ + vector > triggered; + vector > newQueue; + + TimersGuard guard(timersLock_); + for (auto & timer: timerQueue_) { + auto & target = (timer->nextTick < refDate) ? triggered : newQueue; + target.emplace_back(move(timer)); + } + timerQueue_ = move(newQueue); + + return triggered; +} + +void +TimerEventSource:: +adjustNextTick(Date now) +{ + TimersGuard guard(timersLock_); + + Date negInfinity = Date::negativeInfinity(); + Date nextTick = ((timerQueue_.size() > 0) + ? timerQueue_[0]->nextTick + : negInfinity); + + if (nextTick != nextTick_) { + struct itimerspec spec{{0, 0}, {0, 0}}; + + if (nextTick != negInfinity) { + double delta = nextTick - now; + auto & value = spec.it_value; + value.tv_sec = (time_t) delta; + value.tv_nsec = (delta - spec.it_value.tv_sec) * 1000000000; + } + int res = ::timerfd_settime(timerFd_, 0, &spec, nullptr); + if (res == -1) { + throw ML::Exception(errno, "timerfd_settime"); + } + nextTick_ = nextTick; + } +} + + +void +TimerEventSource:: +addTimer(double delay, const OnTick & onTick) +{ + Date now = Date::now(); + auto newTimer = make_shared(Timer{delay, onTick}); + newTimer->nextTick = now.plusSeconds(delay); + newTimer->lastTick = Date::negativeInfinity(); + insertTimer(move(newTimer)); + adjustNextTick(now); +} + +void +TimerEventSource:: +insertTimer(shared_ptr && timer) +{ + TimersGuard guard(timersLock_); + + auto timerCompare = [&] (const shared_ptr & left, + const shared_ptr & right) { + return left->nextTick < right->nextTick; + }; + auto loc = lower_bound(timerQueue_.begin(), timerQueue_.end(), + timer, timerCompare); + timerQueue_.insert(loc, move(timer)); +} diff --git a/service/timer_event_source.h b/service/timer_event_source.h new file mode 100644 index 00000000..28dc83ea --- /dev/null +++ b/service/timer_event_source.h @@ -0,0 +1,63 @@ +/* timer_event_source.h -*- C++ -*- + Wolfgang Sourdeau, August 2015 + Copyright (c) 2015 Datacratic. All rights reserved. + + A class used internally by MessageLoop to enable multiple timers using the + same timer fd: thereby reducing the number of file descriptors and context + switches required to handle such timers. +*/ + +#pragma once + +#include +#include "soa/types/date.h" +#include "async_event_source.h" + + +namespace Datacratic { + +/****************************************************************************/ +/* TIMER EVENT SOURCE */ +/****************************************************************************/ + +struct TimerEventSource : public AsyncEventSource { + /* Type of callback invoked when a timer tick occurs. The callback should + * return "true" to indicate that the timer must be rescheduled or "false" + * otherwise. */ + typedef std::function OnTick; + + TimerEventSource(); + ~TimerEventSource(); + + virtual int selectFd() const; + virtual bool processOne(); + + /* Adds a timer */ + void addTimer(double delay, const OnTick & onTick); + +private: + typedef std::mutex TimersLock; + typedef std::unique_lock TimersGuard; + + /* timers */ + struct Timer { + double interval; + OnTick onTick; + Date nextTick; + Date lastTick; + }; + + void onTimerTick(); + void insertTimer(std::shared_ptr && timer); + std::vector > collectTriggeredTimers(Date refDate); + void adjustNextTick(Date now); + + int timerFd_; + + TimersLock timersLock_; + std::vector > timerQueue_; + Date nextTick_; +}; + +} // namespace Datacratic + From 4229c82c53b9bec8197e51dc6f5f20a28371405d Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 18 Aug 2015 17:25:00 -0400 Subject: [PATCH 2/4] timers: added cancelTimer --- service/message_loop.h | 14 +++- service/testing/timer_event_source_test.cc | 32 ++++++++- service/timer_event_source.cc | 79 +++++++++++++++------- service/timer_event_source.h | 15 ++-- 4 files changed, 107 insertions(+), 33 deletions(-) diff --git a/service/message_loop.h b/service/message_loop.h index a05ffbd0..91bc2a54 100644 --- a/service/message_loop.h +++ b/service/message_loop.h @@ -95,10 +95,18 @@ struct MessageLoop : public Epoller { std::function toRun, int priority = 0); - /* Adds a timer */ - void addTimer(double delay, const TimerEventSource::OnTick & onTick) + /* Add a timer. Returns an id that is guaranteed to be > 0, which enables + * "0" to be used as a test value. */ + uint64_t addTimer(double delay, const TimerEventSource::OnTick & onTick) { - timerSource_->addTimer(delay, onTick); + return timerSource_->addTimer(delay, onTick); + } + + /* Cancel a timer. Return "true" when the corresponding timer could be + found and deleted and "false" otherwise. */ + bool cancelTimer(uint64_t timerId) + { + return timerSource_->cancelTimer(timerId); } typedef std::function ticks(0); @@ -43,3 +43,33 @@ BOOST_AUTO_TEST_CASE( test_addSource_with_needsPoll ) loop.shutdown(); } + + +BOOST_AUTO_TEST_CASE( test_cancelTimer ) +{ + ML::Watchdog wd(10); + std::atomic ticks(0); + MessageLoop loop(1, 0, -1); + loop.start(); + auto timer = make_shared(); + loop.addSource("timer", timer); + timer->waitConnectionState(AsyncEventSource::CONNECTED); + + auto onTick = [&] (uint64_t) { + Date now = Date::now(); + ticks++; + return true; + }; + auto timerId = timer->addTimer(0.2, onTick); + BOOST_CHECK(timerId > 0); + + ML::sleep(1); + BOOST_CHECK_EQUAL(timer->cancelTimer(timerId), true); + BOOST_CHECK_NE(ticks, 0); + int oldTicks = ticks; + ML::sleep(1); + BOOST_CHECK_EQUAL(ticks, oldTicks); + BOOST_CHECK_EQUAL(timer->cancelTimer(timerId), false); + + loop.shutdown(); +} diff --git a/service/timer_event_source.cc b/service/timer_event_source.cc index be784f54..06890a60 100644 --- a/service/timer_event_source.cc +++ b/service/timer_event_source.cc @@ -20,6 +20,7 @@ using namespace Datacratic; TimerEventSource:: TimerEventSource() : timerFd_(::timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC)), + counter_(1), nextTick_(Date::negativeInfinity()) { if (timerFd_ == -1) { @@ -73,14 +74,14 @@ TimerEventSource:: onTimerTick() { Date now = Date::now(); - vector > triggered = collectTriggeredTimers(now); + vector triggered = collectTriggeredTimers(now); for (auto & timer: triggered) { - double interval = timer->lastTick.secondsUntil(now); - uint64_t numTicks = (uint64_t) ::floor(interval / timer->interval); - if (timer->onTick(numTicks)) { - timer->lastTick = now; - timer->nextTick = now.plusSeconds(timer->interval); + double interval = timer.lastTick.secondsUntil(now); + uint64_t numTicks = (uint64_t) ::floor(interval / timer.interval); + if (timer.onTick(numTicks)) { + timer.lastTick = now; + timer.nextTick = now.plusSeconds(timer.interval); insertTimer(move(timer)); } } @@ -88,19 +89,27 @@ onTimerTick() adjustNextTick(now); } -vector > +vector TimerEventSource:: collectTriggeredTimers(Date refDate) { - vector > triggered; - vector > newQueue; - + vector triggered; TimersGuard guard(timersLock_); - for (auto & timer: timerQueue_) { - auto & target = (timer->nextTick < refDate) ? triggered : newQueue; - target.emplace_back(move(timer)); + + size_t nbrTriggered, nbrTimers(timerQueue_.size()); + for (nbrTriggered = 0; nbrTriggered < nbrTimers; nbrTriggered++) { + if (timerQueue_[nbrTriggered].nextTick > refDate) { + break; + } + } + + if (nbrTriggered > 0) { + triggered.reserve(nbrTriggered); + for (size_t i = 0; i < nbrTriggered; i++) { + triggered.emplace_back(move(timerQueue_[i])); + } + timerQueue_.erase(timerQueue_.begin(), timerQueue_.begin() + nbrTriggered); } - timerQueue_ = move(newQueue); return triggered; } @@ -113,7 +122,7 @@ adjustNextTick(Date now) Date negInfinity = Date::negativeInfinity(); Date nextTick = ((timerQueue_.size() > 0) - ? timerQueue_[0]->nextTick + ? timerQueue_[0].nextTick : negInfinity); if (nextTick != nextTick_) { @@ -133,30 +142,52 @@ adjustNextTick(Date now) } } - -void +uint64_t TimerEventSource:: addTimer(double delay, const OnTick & onTick) { Date now = Date::now(); - auto newTimer = make_shared(Timer{delay, onTick}); - newTimer->nextTick = now.plusSeconds(delay); - newTimer->lastTick = Date::negativeInfinity(); + uint64_t timerId = counter_.fetch_add(1); + ExcAssert(timerId != 0); // ensure we never reach the upper limit during a + // program's lifetime + + Timer newTimer{delay, onTick}; + newTimer.nextTick = now.plusSeconds(delay); + newTimer.lastTick = Date::negativeInfinity(); + newTimer.timerId = timerId; insertTimer(move(newTimer)); adjustNextTick(now); + + return timerId; } void TimerEventSource:: -insertTimer(shared_ptr && timer) +insertTimer(Timer && timer) { TimersGuard guard(timersLock_); - auto timerCompare = [&] (const shared_ptr & left, - const shared_ptr & right) { - return left->nextTick < right->nextTick; + auto timerCompare = [&] (const Timer & left, + const Timer & right) { + return left.nextTick < right.nextTick; }; auto loc = lower_bound(timerQueue_.begin(), timerQueue_.end(), timer, timerCompare); timerQueue_.insert(loc, move(timer)); } + +bool +TimerEventSource:: +cancelTimer(uint64_t timerId) +{ + TimersGuard guard(timersLock_); + + for (auto it = timerQueue_.begin(); it != timerQueue_.end(); it++) { + if (it->timerId == timerId) { + timerQueue_.erase(it); + return true; + } + } + + return false; +} diff --git a/service/timer_event_source.h b/service/timer_event_source.h index 28dc83ea..88e38f1c 100644 --- a/service/timer_event_source.h +++ b/service/timer_event_source.h @@ -9,6 +9,7 @@ #pragma once +#include #include #include "soa/types/date.h" #include "async_event_source.h" @@ -33,7 +34,10 @@ struct TimerEventSource : public AsyncEventSource { virtual bool processOne(); /* Adds a timer */ - void addTimer(double delay, const OnTick & onTick); + uint64_t addTimer(double delay, const OnTick & onTick); + + /* Cancel the given timer, returning true when the timer is found */ + bool cancelTimer(uint64_t timerId); private: typedef std::mutex TimersLock; @@ -45,19 +49,20 @@ struct TimerEventSource : public AsyncEventSource { OnTick onTick; Date nextTick; Date lastTick; + uint64_t timerId; }; void onTimerTick(); - void insertTimer(std::shared_ptr && timer); - std::vector > collectTriggeredTimers(Date refDate); + void insertTimer(Timer && timer); + std::vector collectTriggeredTimers(Date refDate); void adjustNextTick(Date now); int timerFd_; + std::atomic counter_; TimersLock timersLock_; - std::vector > timerQueue_; + std::vector timerQueue_; Date nextTick_; }; } // namespace Datacratic - From 9fe9dede8dca4e094f43bee8c759e6a38e9ceedb Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 25 Aug 2015 15:04:15 -0400 Subject: [PATCH 3/4] timer_event_source: make use of a CLOCK_MONOTONIC --- service/timer_event_source.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/service/timer_event_source.cc b/service/timer_event_source.cc index 06890a60..25b5b71d 100644 --- a/service/timer_event_source.cc +++ b/service/timer_event_source.cc @@ -19,7 +19,7 @@ using namespace Datacratic; TimerEventSource:: TimerEventSource() - : timerFd_(::timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC)), + : timerFd_(::timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC)), counter_(1), nextTick_(Date::negativeInfinity()) { From 8699eb67330374dc687c24786070cc989d16a7dd Mon Sep 17 00:00:00 2001 From: Wolfgang Sourdeau Date: Tue, 25 Aug 2015 16:42:56 -0400 Subject: [PATCH 4/4] added timer_bench --- service/testing/service_testing.mk | 1 + service/testing/timer_bench.cc | 4 ++++ 2 files changed, 5 insertions(+) create mode 100644 service/testing/timer_bench.cc diff --git a/service/testing/service_testing.mk b/service/testing/service_testing.mk index d968c233..df2cb7c1 100644 --- a/service/testing/service_testing.mk +++ b/service/testing/service_testing.mk @@ -36,6 +36,7 @@ $(eval $(call test,service_proxies_test,endpoint,boost manual)) $(eval $(call test,message_loop_test,services,boost)) $(eval $(call test,timer_event_source_test,services,boost)) +$(eval $(call test,timer_bench,services,boost manual)) $(eval $(call program,runner_test_helper,utils)) $(eval $(call test,runner_test,services,boost)) diff --git a/service/testing/timer_bench.cc b/service/testing/timer_bench.cc new file mode 100644 index 00000000..905869df --- /dev/null +++ b/service/testing/timer_bench.cc @@ -0,0 +1,4 @@ +int main() +{ + return 0; +}