Skip to content

Commit

Permalink
Add new event source type async and use it in event dispatcher (fcitx…
Browse files Browse the repository at this point in the history
…#1257)

On Windows, the pipe API is very different from unix that uv_poll
will not work easily without some big change. Instead, we introduce an
Async event that can be easily mapped to uv_async, while keep self
pipe trick on linux for other event loop implementation.

We will try to avoid pipe usage in the actual code for simplicity.
  • Loading branch information
wengxt authored Feb 3, 2025
1 parent 4ac93a0 commit 1a6ca37
Show file tree
Hide file tree
Showing 11 changed files with 245 additions and 53 deletions.
9 changes: 9 additions & 0 deletions src/lib/fcitx-utils/event.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,13 @@ std::unique_ptr<EventSource> EventLoop::addPostEvent(EventCallback callback) {
return d->impl_->addPostEvent(std::move(callback));
}

std::unique_ptr<EventSourceAsync>
EventLoop::addAsyncEvent(EventCallback callback) {
FCITX_D();
if (auto *v2 = dynamic_cast<EventLoopInterfaceV2 *>(d->impl_.get())) {
return v2->addAsyncEvent(std::move(callback));
}
return nullptr;
}

} // namespace fcitx
23 changes: 23 additions & 0 deletions src/lib/fcitx-utils/event.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,29 @@ class FCITXUTILS_EXPORT EventLoop {
FCITX_NODISCARD std::unique_ptr<EventSource>
addPostEvent(EventCallback callback);

/**
* Add an async event that is safe to be triggered from another thread.
*
* To ensure safe usage of this event:
* 1. Do not change event to disable, if there may be pending call to send()
* 2. Due to (1), if it is oneshot, ensure you only call send() once.
* 3. Join all the possible pending thread that may call send(), before
* destructing the event.
* 4. Like other event, the event should be only created/destructed on the
* event loop thread.
*
* EventDispatcher uses this event internally and provides an easier and
* safer interface to use.
*
* @see EventDispatcher
*
* @param callback callback function
* @return async event source
* @since 5.1.13
*/
FCITX_NODISCARD std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback);

/**
* Set an external event loop implementation.
*
Expand Down
48 changes: 48 additions & 0 deletions src/lib/fcitx-utils/event_libuv.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,32 @@ void IOEventCallback(uv_poll_t *handle, int status, int events);
void TimeEventCallback(uv_timer_t *handle);
void PostEventCallback(uv_prepare_t *handle);

void AsyncEventCallback(uv_async_t *handle) {
auto *source = static_cast<LibUVSourceAsync *>(
static_cast<LibUVSourceBase *>(handle->data));

if (!source->isEnabled()) {
return;
}

try {
auto sourceRef = source->watch();
if (source->isOneShot()) {
source->setEnabled(false);
}
auto callback = source->callback_;
auto ret = (*callback)(source);
if (sourceRef.isValid()) {
if (!ret) {
source->setEnabled(false);
}
}
} catch (const std::exception &e) {
// some abnormal things threw{
FCITX_FATAL() << e.what();
}
}

UVLoop::~UVLoop() {
// Close and detach all handle.
uv_walk(
Expand Down Expand Up @@ -98,6 +124,7 @@ bool LibUVSourceTime::setup(uv_loop_t *loop, uv_timer_t *timer) {
}
return true;
}

bool LibUVSourcePost::setup(uv_loop_t *loop, uv_prepare_t *prepare) {
if (int err = uv_prepare_init(loop, prepare); err < 0) {
FCITX_LIBUV_DEBUG() << "Failed to init prepare with error: " << err;
Expand All @@ -109,6 +136,19 @@ bool LibUVSourcePost::setup(uv_loop_t *loop, uv_prepare_t *prepare) {
}
return true;
}

bool LibUVSourceAsync::setup(uv_loop_t *loop, uv_async_t *async) {
if (int err = uv_async_init(loop, async, &AsyncEventCallback); err < 0) {
FCITX_LIBUV_DEBUG() << "Failed to init async with error: " << err;
return false;
}
return true;
}

void LibUVSourceAsync::send() {
uv_async_send(reinterpret_cast<uv_async_t *>(handle_));
}

bool LibUVSourceIO::setup(uv_loop_t *loop, uv_poll_t *poll) {
if (int err = uv_poll_init(loop, poll, fd_); err < 0) {
FCITX_LIBUV_DEBUG()
Expand Down Expand Up @@ -266,4 +306,12 @@ EventLoopLibUV::addPostEvent(EventCallback callback) {
auto source = std::make_unique<LibUVSourcePost>(std::move(callback), loop_);
return source;
}

std::unique_ptr<EventSourceAsync>
EventLoopLibUV::addAsyncEvent(EventCallback callback) {
auto source =
std::make_unique<LibUVSourceAsync>(std::move(callback), loop_);
return source;
}

} // namespace fcitx
20 changes: 19 additions & 1 deletion src/lib/fcitx-utils/event_libuv.h
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,23 @@ struct LibUVSourceExit final : public EventSource,
EventCallback callback_;
};

class EventLoopLibUV : public EventLoopInterface {
struct LibUVSourceAsync final
: public LibUVSource<EventSourceAsync, uv_async_t>,
public TrackableObject<LibUVSourceAsync> {
LibUVSourceAsync(EventCallback callback, std::shared_ptr<UVLoop> loop)
: LibUVSource(std::move(loop)),
callback_(std::make_shared<EventCallback>(std::move(callback))) {
setEnabled(true);
}

bool setup(uv_loop_t *loop, uv_async_t *async) override;

void send() override;

std::shared_ptr<EventCallback> callback_;
};

class EventLoopLibUV : public EventLoopInterfaceV2 {
public:
EventLoopLibUV();
bool exec() override;
Expand All @@ -238,6 +254,8 @@ class EventLoopLibUV : public EventLoopInterface {
addDeferEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSource>
addPostEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback) override;

private:
std::shared_ptr<UVLoop> loop_;
Expand Down
48 changes: 47 additions & 1 deletion src/lib/fcitx-utils/event_sdevent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
#include "eventloopinterface.h"
#include "log.h"
#include "macros.h"
#include "misc_p.h"
#include "stringutils.h"
#include "unixfd.h"

#if defined(__COVERITY__) && !defined(__INCLUDE_LEVEL__)
#define __INCLUDE_LEVEL__ 2
Expand Down Expand Up @@ -57,7 +59,7 @@ IOEventFlags EpollFlagsToIOEventFlags(uint32_t flags) {

} // namespace

class EventLoopSDEvent : public EventLoopInterface {
class EventLoopSDEvent : public EventLoopInterfaceV2 {
public:
EventLoopSDEvent();
~EventLoopSDEvent();
Expand All @@ -77,6 +79,8 @@ class EventLoopSDEvent : public EventLoopInterface {
addDeferEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSource>
addPostEvent(EventCallback callback) override;
FCITX_NODISCARD std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback) override;

private:
std::mutex mutex_;
Expand Down Expand Up @@ -374,4 +378,46 @@ EventLoopSDEvent::addPostEvent(EventCallback callback) {
source->setEventSource(sdEventSource);
return source;
}

struct SDEventSourceAsync : public EventSourceAsync {
public:
SDEventSourceAsync(EventLoopInterfaceV2 *event, EventCallback callback) {
int selfpipe[2];
if (safePipe(selfpipe)) {
throw EventLoopException(-EPIPE);
}
fd_[0].give(selfpipe[0]);
fd_[1].give(selfpipe[1]);
ioEvent_ = event->addIOEvent(
fd_[0].fd(), IOEventFlag::In,
[this, callback = std::move(callback)](EventSource *, int fd,
IOEventFlags) {
uint8_t dummy;
while (fs::safeRead(fd, &dummy, sizeof(dummy)) > 0) {
}
callback(this);
return true;
});
}

bool isEnabled() const override { return ioEvent_->isEnabled(); }
void setEnabled(bool enabled) override { ioEvent_->setEnabled(enabled); }
bool isOneShot() const override { return ioEvent_->isOneShot(); }
void setOneShot() override { ioEvent_->setOneShot(); }

void send() override {
uint8_t dummy = 0;
fs::safeWrite(fd_[1].fd(), &dummy, 1);
}

protected:
UnixFD fd_[2];
std::unique_ptr<EventSourceIO> ioEvent_;
};

std::unique_ptr<EventSourceAsync>
EventLoopSDEvent::addAsyncEvent(EventCallback callback) {
return std::make_unique<SDEventSourceAsync>(this, std::move(callback));
}

} // namespace fcitx
47 changes: 16 additions & 31 deletions src/lib/fcitx-utils/eventdispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,17 @@
#include <memory>
#include <mutex>
#include <queue>
#include <stdexcept>
#include <utility>
#include "event.h"
#include "eventloopinterface.h"
#include "fs.h"
#include "macros.h"
#include "misc_p.h"
#include "unixfd.h"

namespace fcitx {
class EventDispatcherPrivate {
public:
void dispatchEvent() {
uint8_t dummy;
while (fs::safeRead(fd_[0].fd(), &dummy, sizeof(dummy)) > 0) {
}
std::queue<std::function<void()>> eventList;
{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -39,56 +34,46 @@ class EventDispatcherPrivate {
}
}

// Mutex to be used to protect eventList_.
// Mutex to be used to protect fields below.
mutable std::mutex mutex_;
std::queue<std::function<void()>> eventList_;
std::unique_ptr<EventSourceIO> ioEvent_;
std::unique_ptr<EventSourceAsync> asyncEvent_;
EventLoop *loop_ = nullptr;
UnixFD fd_[2];
};

EventDispatcher::EventDispatcher()
: d_ptr(std::make_unique<EventDispatcherPrivate>()) {
FCITX_D();
int selfpipe[2];
if (safePipe(selfpipe)) {
throw std::runtime_error("Failed to create pipe");
}
d->fd_[0].give(selfpipe[0]);
d->fd_[1].give(selfpipe[1]);
}
: d_ptr(std::make_unique<EventDispatcherPrivate>()) {}

EventDispatcher::~EventDispatcher() = default;

void EventDispatcher::attach(EventLoop *event) {
FCITX_D();
std::lock_guard<std::mutex> lock(d->mutex_);
d->ioEvent_ = event->addIOEvent(d->fd_[0].fd(), IOEventFlag::In,
[d](EventSource *, int, IOEventFlags) {
d->dispatchEvent();
return true;
});
d->asyncEvent_ = event->addAsyncEvent([d](EventSource *) {
d->dispatchEvent();
return true;
});
d->loop_ = event;
}

void EventDispatcher::detach() {
FCITX_D();
std::lock_guard<std::mutex> lock(d->mutex_);
d->ioEvent_.reset();
d->asyncEvent_.reset();
d->loop_ = nullptr;
}

void EventDispatcher::schedule(std::function<void()> functor) {
FCITX_D();
if (functor) {
std::lock_guard<std::mutex> lock(d->mutex_);
if (!d->ioEvent_) {
return;
}
d->eventList_.push(std::move(functor));
if (!functor) {
return;
}
std::lock_guard<std::mutex> lock(d->mutex_);
if (!d->asyncEvent_) {
return;
}
uint8_t dummy = 0;
fs::safeWrite(d->fd_[1].fd(), &dummy, 1);
d->eventList_.push(std::move(functor));
d->asyncEvent_->send();
}

EventLoop *EventDispatcher::eventLoop() const {
Expand Down
22 changes: 22 additions & 0 deletions src/lib/fcitx-utils/eventloopinterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,21 @@ struct FCITXUTILS_EXPORT EventSourceTime : public EventSource {
FCITX_NODISCARD virtual clockid_t clock() const = 0;
};

/**
* A thread-safe event source can be triggered from other threads.
*
* @since 5.1.13
*/
struct FCITXUTILS_EXPORT EventSourceAsync : public EventSource {
/**
* Trigger the event from other thread.
*
* The callback is guranteed to be called send() if it is enabled.
* Multiple call to send() may only trigger the callback once.
*/
virtual void send() = 0;
};

using IOCallback =
std::function<bool(EventSourceIO *, int fd, IOEventFlags flags)>;
using TimeCallback = std::function<bool(EventSourceTime *, uint64_t usec)>;
Expand Down Expand Up @@ -131,6 +146,13 @@ class FCITXUTILS_EXPORT EventLoopInterface {
FCITX_NODISCARD virtual std::unique_ptr<EventSource>
addPostEvent(EventCallback callback) = 0;
};

class FCITXUTILS_EXPORT EventLoopInterfaceV2 : public EventLoopInterface {
public:
FCITX_NODISCARD virtual std::unique_ptr<EventSourceAsync>
addAsyncEvent(EventCallback callback) = 0;
};

} // namespace fcitx

#endif // _FCITX_UTILS_EVENTLOOPINTERFACE_H_
3 changes: 3 additions & 0 deletions test/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ set(FCITX_UTILS_DBUS_TEST
set(testdbus_LIBS Pthread::Pthread)
set(testeventdispatcher_LIBS Pthread::Pthread)
set(testevent_LIBS Pthread::Pthread eventlooptests)

if (NOT WIN32)
set(testcustomeventloop_LIBS Pthread::Pthread eventlooptests)
endif()

find_program(XVFB_BIN Xvfb)

Expand Down
Loading

0 comments on commit 1a6ca37

Please sign in to comment.