Skip to content

Commit

Permalink
defercall: use timer instead of QMetaObject::invokeMethod
Browse files Browse the repository at this point in the history
  • Loading branch information
jkarneges committed Feb 20, 2025
1 parent e42b72b commit 9a83991
Show file tree
Hide file tree
Showing 17 changed files with 223 additions and 44 deletions.
1 change: 1 addition & 0 deletions src/core/coretests.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,5 +4,6 @@
int httpheaders_test(int argc, char **argv);
int jwt_test(int argc, char **argv);
int eventloop_test(int argc, char **argv);
int defercall_test(int argc, char **argv);

#endif
90 changes: 71 additions & 19 deletions src/core/defercall.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,45 +21,97 @@
*/

#include "defercall.h"
#include <assert.h>

static thread_local DeferCall *g_instance = nullptr;
#include "timer.h"

class DeferCall::Manager
{
public:
Manager()
{
timer_.setSingleShot(true);
timer_.timeout.connect(boost::bind(&Manager::timer_timeout, this));
}

void add(const std::weak_ptr<Call> &c)
{
calls_.push_back(c);

if(!timer_.isActive())
timer_.start(0);
}

void flush()
{
while(!calls_.empty())
process();
}

private:
Timer timer_;
std::list<std::weak_ptr<Call>> calls_;

void process()
{
// process all calls queued so far, but not any that may get queued
// during processing
std::list<std::weak_ptr<Call>> ready;
ready.swap(calls_);

for(auto c : ready)
{
if(auto p = c.lock())
p->handler();
}
}

void timer_timeout()
{
process();

// no need to re-arm the timer. if new calls were queued during
// processing, add() will have taken care of that
}
};

DeferCall::DeferCall() = default;

DeferCall::~DeferCall() = default;

void DeferCall::defer(std::function<void ()> handler)
{
Call c;
c.handler = handler;
std::shared_ptr<Call> c = std::make_shared<Call>();
c->handler = handler;

deferredCalls_.push_back(c);

QMetaObject::invokeMethod(this, "callNext", Qt::QueuedConnection);
if(!manager)
manager = new Manager;

// manager keeps a weak pointer, so we can invalidate pending calls by
// simply deleting them
manager->add(c);
}

DeferCall *DeferCall::global()
{
if(!g_instance)
g_instance = new DeferCall;
if(!instance)
instance = new DeferCall;

return g_instance;
return instance;
}

void DeferCall::cleanup()
{
delete g_instance;
g_instance = nullptr;
}
if(manager)
manager->flush();

void DeferCall::callNext()
{
// there can't be more invokeMethod resolutions than queued calls
assert(!deferredCalls_.empty());

Call c = deferredCalls_.front();
deferredCalls_.pop_front();
delete instance;
instance = nullptr;

c.handler();
delete manager;
manager = nullptr;
}

thread_local DeferCall::Manager *DeferCall::manager = nullptr;
thread_local DeferCall *DeferCall::instance = nullptr;
19 changes: 11 additions & 8 deletions src/core/defercall.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@
#ifndef DEFERCALL_H
#define DEFERCALL_H

#include <QObject>
#include <functional>
#include <memory>
#include <list>

// queues calls to be run after returning to the event loop
class DeferCall : public QObject
class DeferCall
{
Q_OBJECT

public:
DeferCall();
~DeferCall();
Expand All @@ -51,17 +51,20 @@ class DeferCall : public QObject
global()->defer([=] { delete p; });
}

private slots:
void callNext();

private:
class Call
{
public:
std::function<void ()> handler;
};

std::list<Call> deferredCalls_;
class Manager;
friend class Manager;

std::list<std::shared_ptr<Call>> deferredCalls_;

static thread_local Manager *manager;
static thread_local DeferCall *instance;
};

#endif
102 changes: 102 additions & 0 deletions src/core/defercalltest.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright (C) 2025 Fastly, Inc.
*
* This file is part of Pushpin.
*
* $FANOUT_BEGIN_LICENSE:APACHE2$
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* $FANOUT_END_LICENSE$
*/

#include <QtTest/QtTest>
#include "timer.h"
#include "defercall.h"

class DeferCallTest : public QObject
{
Q_OBJECT

private slots:
void initTestCase()
{
Timer::init(100);
}

void cleanupTestCase()
{
DeferCall::cleanup();
Timer::deinit();
}

void deferCall()
{
bool first = false;
bool second = false;

DeferCall::global()->defer([&] {
first = true;

DeferCall::global()->defer([&] {
second = true;
});
});

// the second deferred call should cause the underlying timer to
// re-arm. since we aren't contending with other timers within this
// test, both timeouts should get processed during a single timer
// processing pass. therefore, both calls should get processed within
// a single eventloop pass
QTest::qWait(10);
QVERIFY(first);
QVERIFY(second);
}

void managerCleanup()
{
bool first = false;
bool second = false;

DeferCall::global()->defer([&] {
first = true;

DeferCall::global()->defer([&] {
second = true;
});
});

// cleanup should process deferred calls queued so far as well as
// those queued during processing
DeferCall::cleanup();
QVERIFY(first);
QVERIFY(second);
}
};

namespace {
namespace Main {
QTEST_MAIN(DeferCallTest)
}
}

extern "C" {

int defercall_test(int argc, char **argv)
{
return Main::main(argc, argv);
}

}

#include "defercalltest.moc"
5 changes: 0 additions & 5 deletions src/core/eventlooptest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,6 @@ class EventLoopTest : public QObject
Q_OBJECT

private slots:
void cleanupTestCase()
{
DeferCall::cleanup();
}

void socketNotifier()
{
EventLoop loop(1);
Expand Down
10 changes: 10 additions & 0 deletions src/core/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ mod tests {
unsafe { call_c_main(ffi::eventloop_test, args) as u8 }
}

fn defercall_test(args: &[&OsStr]) -> u8 {
// SAFETY: safe to call
unsafe { call_c_main(ffi::defercall_test, args) as u8 }
}

#[test]
fn httpheaders() {
assert!(qtest::run(httpheaders_test));
Expand All @@ -134,4 +139,9 @@ mod tests {
fn eventloop() {
assert!(qtest::run(eventloop_test));
}

#[test]
fn defercall() {
assert!(qtest::run(defercall_test));
}
}
3 changes: 2 additions & 1 deletion src/core/tests.pri
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,5 @@ INCLUDES += \
SOURCES += \
$$PWD/httpheaderstest.cpp \
$$PWD/jwttest.cpp \
$$PWD/eventlooptest.cpp
$$PWD/eventlooptest.cpp \
$$PWD/defercalltest.cpp
15 changes: 12 additions & 3 deletions src/core/timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,19 @@ int TimerManager::add(int msec, Timer *r)
{
qint64 currentTime = QDateTime::currentMSecsSinceEpoch();

// expireTime must be >= startTime_
qint64 expireTime = qMax(currentTime + msec, startTime_);
qint64 expiresTicks;
if(msec <= 0)
{
// for timeouts of zero, set immediate expiration with no rounding up
expiresTicks = currentTicks_;
}
else
{
// expireTime must be >= startTime_
qint64 expireTime = qMax(currentTime + msec, startTime_);

qint64 expiresTicks = durationToTicksRoundUp(expireTime - startTime_);
expiresTicks = durationToTicksRoundUp(expireTime - startTime_);
}

int id = wheel_.add(expiresTicks, (size_t)r);

Expand Down
3 changes: 2 additions & 1 deletion src/handler/filtertest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -208,14 +208,15 @@ private slots:

void cleanupTestCase()
{
limiter.reset();
zhttpOut.reset();
filterServer.reset();

// ensure deferred deletes are processed
QCoreApplication::instance()->sendPostedEvents();

Timer::deinit();
DeferCall::cleanup();
Timer::deinit();
}

void messageFilters()
Expand Down
4 changes: 2 additions & 2 deletions src/handler/handlerengine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -826,8 +826,8 @@ class AcceptWorker : public Deferred

QList<std::shared_ptr<HttpSession>> takeSessions()
{
QList<std::shared_ptr<HttpSession>> out = sessions;
sessions.clear();
QList<std::shared_ptr<HttpSession>> out;
out.swap(sessions);

foreach(const std::shared_ptr<HttpSession> &hs, out)
hs->setParent(0);
Expand Down
2 changes: 1 addition & 1 deletion src/handler/handlerenginetest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ private slots:
// ensure deferred deletes are processed
QCoreApplication::instance()->sendPostedEvents();

Timer::deinit();
DeferCall::cleanup();
Timer::deinit();
}

void acceptNoHold()
Expand Down
2 changes: 1 addition & 1 deletion src/handler/handlermain.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,8 @@ int handler_main(int argc, char **argv)
QCoreApplication::instance()->sendPostedEvents();

// deinit here, after all event loop activity has completed
Timer::deinit();
DeferCall::cleanup();
Timer::deinit();

return ret;
}
Expand Down
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ pub mod ffi {
pub fn httpheaders_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
pub fn jwt_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
pub fn eventloop_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
pub fn defercall_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
pub fn routesfile_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
pub fn proxyengine_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
pub fn filter_test(argc: libc::c_int, argv: *const *const libc::c_char) -> libc::c_int;
Expand Down
2 changes: 1 addition & 1 deletion src/proxy/app.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,8 @@ class EngineThread : public QThread
QCoreApplication::instance()->sendPostedEvents();

// deinit here, after all event loop activity has completed
Timer::deinit();
DeferCall::cleanup();
Timer::deinit();
}

private:
Expand Down
Loading

0 comments on commit 9a83991

Please sign in to comment.