Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PLAT-937: added TimerEventSource, enabling use of a single fd for mul… #197

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion service/message_loop.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ MessageLoop(int numThreads, double maxAddedLatency, int epollTimeout)
: sourceActions_([&] () { handleSourceActions(); }),
numThreadsCreated(0),
shutdown_(true),
totalSleepTime_(0.0)
totalSleepTime_(0.0),
timerSource_(new TimerEventSource())
{
init(numThreads, maxAddedLatency, epollTimeout);
addSource("timer_", timerSource_);
}

MessageLoop::
Expand Down
32 changes: 26 additions & 6 deletions service/message_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,19 @@

#pragma once

#include <thread>
#include <functional>
#include <memory>
#include <thread>
#include <vector>

#include "jml/arch/wakeup_fd.h"
#include "jml/arch/spinlock.h"

#include "epoller.h"
#include "async_event_source.h"
#include "typed_message_channel.h"
#include "logs.h"
#include "timer_event_source.h"
#include "typed_message_channel.h"

namespace Datacratic {

Expand All @@ -32,9 +35,10 @@ struct MessageLoopLogs
static Logging::Category trace;
};

/*****************************************************************************/
/* MESSAGE LOOP */
/*****************************************************************************/

/****************************************************************************/
/* MESSAGE LOOP */
/****************************************************************************/

struct MessageLoop : public Epoller {
typedef std::function<void ()> OnStop;
Expand Down Expand Up @@ -90,7 +94,21 @@ struct MessageLoop : public Epoller {
double timePeriodSeconds,
std::function<void (uint64_t)> toRun,
int priority = 0);


/* Add a timer. Returns an id that is guaranteed to be > 0, which enables
* "0" to be used as a test value. */
uint64_t addTimer(double delay, const TimerEventSource::OnTick & onTick)
{
return timerSource_->addTimer(delay, onTick);
}

/* Cancel a timer. Return "true" when the corresponding timer could be
found and deleted and "false" otherwise. */
bool cancelTimer(uint64_t timerId)
{
return timerSource_->cancelTimer(timerId);
}

typedef std::function<void (volatile int & shutdown_,
int64_t threadId)> SubordinateThreadFn;

Expand Down Expand Up @@ -214,6 +232,8 @@ struct MessageLoop : public Epoller {
void processAddSource(const SourceEntry & entry);
void processRemoveSource(const SourceEntry & entry);
void processRunAction(const SourceEntry & entry);

std::shared_ptr<TimerEventSource> timerSource_;
};

} // namespace Datacratic
1 change: 1 addition & 0 deletions service/service.mk
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ LIBSERVICES_SOURCES := \
async_event_source.cc \
async_writer_source.cc \
tcp_client.cc \
timer_event_source.cc \
rest_service_endpoint.cc \
http_named_endpoint.cc \
rest_proxy.cc \
Expand Down
1 change: 1 addition & 0 deletions service/testing/service_testing.mk
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ $(eval $(call test,http_rest_proxy_stress_test,services,boost manual))
$(eval $(call test,service_proxies_test,endpoint,boost manual))

$(eval $(call test,message_loop_test,services,boost))
$(eval $(call test,timer_event_source_test,services,boost))

$(eval $(call program,runner_test_helper,utils))
$(eval $(call test,runner_test,services,boost))
Expand Down
75 changes: 75 additions & 0 deletions service/testing/timer_event_source_test.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#define BOOST_TEST_MAIN
#define BOOST_TEST_DYN_LINK

#include <atomic>
#include <iostream>

#include <boost/test/unit_test.hpp>

#include "jml/arch/timers.h"
#include "jml/utils/testing/watchdog.h"
#include "soa/types/date.h"
#include "soa/service/message_loop.h"
#include "soa/service/timer_event_source.h"

using namespace std;
using namespace Datacratic;


BOOST_AUTO_TEST_CASE( test_addTimer )
{
ML::Watchdog wd(10);
std::atomic<int> ticks(0);
MessageLoop loop(1, 0, -1);
loop.start();
auto timer = make_shared<TimerEventSource>();
loop.addSource("timer", timer);
timer->waitConnectionState(AsyncEventSource::CONNECTED);

auto onTick = [&] (uint64_t) {
Date now = Date::now();
ticks++;
return ticks < 3;
};
timer->addTimer(0.2, onTick);

while (true) {
if (ticks == 3) {
Date now = Date::now();
break;
}
ML::sleep(1);
}

loop.shutdown();
}


BOOST_AUTO_TEST_CASE( test_cancelTimer )
{
ML::Watchdog wd(10);
std::atomic<int> ticks(0);
MessageLoop loop(1, 0, -1);
loop.start();
auto timer = make_shared<TimerEventSource>();
loop.addSource("timer", timer);
timer->waitConnectionState(AsyncEventSource::CONNECTED);

auto onTick = [&] (uint64_t) {
Date now = Date::now();
ticks++;
return true;
};
auto timerId = timer->addTimer(0.2, onTick);
BOOST_CHECK(timerId > 0);

ML::sleep(1);
BOOST_CHECK_EQUAL(timer->cancelTimer(timerId), true);
BOOST_CHECK_NE(ticks, 0);
int oldTicks = ticks;
ML::sleep(1);
BOOST_CHECK_EQUAL(ticks, oldTicks);
BOOST_CHECK_EQUAL(timer->cancelTimer(timerId), false);

loop.shutdown();
}
193 changes: 193 additions & 0 deletions service/timer_event_source.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/* timer_event_source.cc
Wolfgang Sourdeau, August 2015
Copyright (c) 2015 Datacratic. All rights reserved.
*/

#include <math.h>
#include <sys/timerfd.h>

#include "jml/utils/exc_assert.h"
#include "timer_event_source.h"

using namespace std;
using namespace Datacratic;


/****************************************************************************/
/* TIMER EVENT SOURCE */
/****************************************************************************/

TimerEventSource::
TimerEventSource()
: timerFd_(::timerfd_create(CLOCK_REALTIME, TFD_NONBLOCK | TFD_CLOEXEC)),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We very very likely want CLOCK_MONOTONIC here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. I don't remmeber why I chose that one.

counter_(1),
nextTick_(Date::negativeInfinity())
{
if (timerFd_ == -1) {
throw ML::Exception(errno, "timerfd_create");
}
}

TimerEventSource::
~TimerEventSource()
{
int res = ::close(timerFd_);
if (res == -1) {
cerr << "warning: close on timerfd: " << strerror(errno) << endl;
}
}

int
TimerEventSource::
selectFd()
const
{
return timerFd_;
}

bool
TimerEventSource::
processOne()
{
start:
uint64_t numWakeups = 0;
int res = ::read(timerFd_, &numWakeups, 8);
if (res == -1) {
if (errno == EINTR) {
goto start;
}
else if (errno == EAGAIN || errno == EWOULDBLOCK) {
return false;
}
else {
throw ML::Exception(errno, "timerfd read");
}
}
ExcAssertEqual(res, 8);
onTimerTick();

return false;
}

void
TimerEventSource::
onTimerTick()
{
Date now = Date::now();
vector<Timer> triggered = collectTriggeredTimers(now);

for (auto & timer: triggered) {
double interval = timer.lastTick.secondsUntil(now);
uint64_t numTicks = (uint64_t) ::floor(interval / timer.interval);
if (timer.onTick(numTicks)) {
timer.lastTick = now;
timer.nextTick = now.plusSeconds(timer.interval);
insertTimer(move(timer));
}
}

adjustNextTick(now);
}

vector<TimerEventSource::Timer>
TimerEventSource::
collectTriggeredTimers(Date refDate)
{
vector<Timer> triggered;
TimersGuard guard(timersLock_);

size_t nbrTriggered, nbrTimers(timerQueue_.size());
for (nbrTriggered = 0; nbrTriggered < nbrTimers; nbrTriggered++) {
if (timerQueue_[nbrTriggered].nextTick > refDate) {
break;
}
}

if (nbrTriggered > 0) {
triggered.reserve(nbrTriggered);
for (size_t i = 0; i < nbrTriggered; i++) {
triggered.emplace_back(move(timerQueue_[i]));
}
timerQueue_.erase(timerQueue_.begin(), timerQueue_.begin() + nbrTriggered);
}

return triggered;
}

void
TimerEventSource::
adjustNextTick(Date now)
{
TimersGuard guard(timersLock_);

Date negInfinity = Date::negativeInfinity();
Date nextTick = ((timerQueue_.size() > 0)
? timerQueue_[0].nextTick
: negInfinity);

if (nextTick != nextTick_) {
struct itimerspec spec{{0, 0}, {0, 0}};

if (nextTick != negInfinity) {
double delta = nextTick - now;
auto & value = spec.it_value;
value.tv_sec = (time_t) delta;
value.tv_nsec = (delta - spec.it_value.tv_sec) * 1000000000;
}
int res = ::timerfd_settime(timerFd_, 0, &spec, nullptr);
if (res == -1) {
throw ML::Exception(errno, "timerfd_settime");
}
nextTick_ = nextTick;
}
}

uint64_t
TimerEventSource::
addTimer(double delay, const OnTick & onTick)
{
Date now = Date::now();
uint64_t timerId = counter_.fetch_add(1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wraparound?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@jeremybarnes indeed and this raises a design question that I worked-around by using an assertion... I do not see how addTimer can guarantee that the ids will be unique, other than by remembering them somehow. I considered the use of a guid instead, but even though it postpones it further in time, it does not solve the problem either.

ExcAssert(timerId != 0); // ensure we never reach the upper limit during a
// program's lifetime

Timer newTimer{delay, onTick};
newTimer.nextTick = now.plusSeconds(delay);
newTimer.lastTick = Date::negativeInfinity();
newTimer.timerId = timerId;
insertTimer(move(newTimer));
adjustNextTick(now);

return timerId;
}

void
TimerEventSource::
insertTimer(Timer && timer)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since you're going to make a copy anyway, it should be passed by value.

{
TimersGuard guard(timersLock_);

auto timerCompare = [&] (const Timer & left,
const Timer & right) {
return left.nextTick < right.nextTick;
};
auto loc = lower_bound(timerQueue_.begin(), timerQueue_.end(),
timer, timerCompare);
timerQueue_.insert(loc, move(timer));
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This kind of thing is normally implemented using a min-heap. Using a vector for this introduces a lot of degenerate cases especially since the goal of this is to support a lot of timers i.e. if the number of timers is small, why bother? if it's large, it's likely not very efficient unless all timers are inserted at the end.


bool
TimerEventSource::
cancelTimer(uint64_t timerId)
{
TimersGuard guard(timersLock_);

for (auto it = timerQueue_.begin(); it != timerQueue_.end(); it++) {
if (it->timerId == timerId) {
timerQueue_.erase(it);
return true;
}
}

return false;
}
Loading