Skip to content

Commit

Permalink
Refactor: convert all subscriptions to be oneshot
Browse files Browse the repository at this point in the history
  • Loading branch information
ColdenCullen committed Sep 29, 2023
1 parent 558b399 commit 9135a08
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 89 deletions.
10 changes: 4 additions & 6 deletions include/tlb/private/event_loop.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@
#include "tlb/event_loop.h"

enum tlb_sub_mode {
TLB_SUB_ONESHOT = TLB_BIT(1),
TLB_SUB_EDGE = TLB_BIT(2),
TLB_SUB_EDGE = TLB_BIT(1),
};

enum tlb_sub_state {
Expand All @@ -17,7 +16,6 @@ enum tlb_sub_state {
struct tlb_event_loop {
struct tlb_allocator *alloc;
int fd;
bool super_loop;
};

struct tlb_subscription {
Expand All @@ -29,9 +27,9 @@ struct tlb_subscription {
tlb_on_event *on_event;
void *userdata;

uint8_t events; /* enum tlb_events */
uint8_t sub_mode; /* enum tlb_sub_flags */
volatile uint8_t oneshot_state; /* enum tlb_sub_state */
uint8_t events; /* enum tlb_events */
uint8_t sub_mode; /* enum tlb_sub_flags */
volatile uint8_t state; /* enum tlb_sub_state */

/* Reserved for each platform to use */
union {
Expand Down
50 changes: 22 additions & 28 deletions source/bsd/event_loop_kqueue.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,11 @@ int s_kqueue_change(struct tlb_event_loop *loop, struct tlb_subscription *sub, u

/* Calculate flags */
if (flags == EV_ADD) {
/* All subscriptions are "oneshot" subscriptions */
flags |= EV_DISPATCH;
if (sub->sub_mode & TLB_SUB_EDGE) {
flags |= EV_CLEAR;
}

if (sub->sub_mode & TLB_SUB_ONESHOT) {
flags |= EV_DISPATCH;
}
}

struct tlb_evl_kqueue *kq = &sub->platform.kqueue;
Expand Down Expand Up @@ -110,7 +108,6 @@ void tlb_evl_impl_fd_init(struct tlb_subscription *sub) {

void tlb_evl_impl_timer_init(struct tlb_subscription *sub, int timeout) {
sub->ident.ident = (uintptr_t)sub;
sub->sub_mode |= TLB_SUB_ONESHOT;
sub->platform.kqueue.filters[0] = EVFILT_TIMER;
sub->platform.kqueue.data = timeout;
}
Expand Down Expand Up @@ -152,31 +149,28 @@ int tlb_evl_handle_events(struct tlb_event_loop *loop, size_t budget, int timeou
TLB_LOG_EVENT(sub, "Handling");

/* Cache this off because sub can become invalid during on_event */
const bool is_oneshot = sub->sub_mode & TLB_SUB_ONESHOT;
sub->oneshot_state = TLB_STATE_RUNNING;
sub->state = TLB_STATE_RUNNING;
sub->on_event(sub, s_events_from_kevent(ev), sub->userdata);

if (is_oneshot) {
switch (sub->oneshot_state) {
case TLB_STATE_SUBBED:
/* Not possible */
TLB_LOG_EVENT(sub, "In bad state!");
TLB_ASSERT(false);
break;

case TLB_STATE_RUNNING:
/* Resubscribe the event */
sub->oneshot_state = TLB_STATE_SUBBED;
s_kqueue_change(loop, sub, EV_ENABLE);
TLB_LOG_EVENT(sub, "Set to SUBBED");
break;

case TLB_STATE_UNSUBBED:
/* Force-remove the subscription */
sub->oneshot_state = TLB_STATE_SUBBED;
tlb_evl_remove(loop, sub);
break;
}
switch (sub->state) {
case TLB_STATE_SUBBED:
/* Not possible */
TLB_LOG_EVENT(sub, "In bad state!");
TLB_ASSERT(false);
break;

case TLB_STATE_RUNNING:
/* Resubscribe the event */
sub->state = TLB_STATE_SUBBED;
s_kqueue_change(loop, sub, EV_ENABLE);
TLB_LOG_EVENT(sub, "Set to SUBBED");
break;

case TLB_STATE_UNSUBBED:
/* Force-remove the subscription */
sub->state = TLB_STATE_SUBBED;
tlb_evl_remove(loop, sub);
break;
}
}

Expand Down
41 changes: 16 additions & 25 deletions source/event_loop.c
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,6 @@ static struct tlb_subscription *s_sub_new(struct tlb_event_loop *loop, tlb_on_ev
.name = name,
};

if (loop->super_loop) {
sub->sub_mode |= TLB_SUB_ONESHOT;
}

return sub;
}

Expand Down Expand Up @@ -116,27 +112,22 @@ int tlb_evl_remove(struct tlb_event_loop *loop, tlb_handle subscription) {
struct tlb_subscription *sub = subscription;
int result = 0;

if (sub->sub_mode & TLB_SUB_ONESHOT) {
switch ((enum tlb_sub_state)sub->oneshot_state) {
case TLB_STATE_SUBBED:
TLB_LOG_EVENT(sub, "ONESHOT & SUBBED, unsubbing and freeing");
result = tlb_evl_impl_unsubscribe(loop, sub);
tlb_free(loop->alloc, sub);
break;

case TLB_STATE_RUNNING:
TLB_LOG_EVENT(sub, "ONESHOT & RUNNING, Setting oneshot_state");
sub->oneshot_state = TLB_STATE_UNSUBBED;
break;

case TLB_STATE_UNSUBBED:
/* no-op */
break;
}
} else {
TLB_LOG_EVENT(sub, "!ONESHOT, unsubbing and freeing");
result = tlb_evl_impl_unsubscribe(loop, sub);
tlb_free(loop->alloc, sub);
TLB_LOG_EVENT(sub, "Unsubbing:");
switch ((enum tlb_sub_state)sub->state) {
case TLB_STATE_SUBBED:
TLB_LOG_EVENT(sub, " SUBBED, unsubbing and freeing");
result = tlb_evl_impl_unsubscribe(loop, sub);
tlb_free(loop->alloc, sub);
break;

case TLB_STATE_RUNNING:
TLB_LOG_EVENT(sub, " RUNNING, Setting state");
sub->state = TLB_STATE_UNSUBBED;
break;

case TLB_STATE_UNSUBBED:
/* no-op */
break;
}

return result;
Expand Down
52 changes: 24 additions & 28 deletions source/linux/event_loop_epoll.c
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,11 @@ uint32_t s_events_to_epoll(struct tlb_subscription *sub) {
epoll_events |= EPOLLOUT;
}

/* All subscriptions are "oneshot" subscriptions */
epoll_events |= EPOLLONESHOT;
if (sub->sub_mode & TLB_SUB_EDGE) {
epoll_events |= EPOLLET;
}
if (sub->sub_mode & TLB_SUB_ONESHOT) {
epoll_events |= EPOLLONESHOT;
}

return epoll_events;
}
Expand Down Expand Up @@ -111,7 +110,7 @@ void tlb_evl_impl_timer_init(struct tlb_subscription *sub, int timeout) {

sub->ident.fd = timerfd;
sub->events = TLB_EV_READ;
sub->sub_mode |= TLB_SUB_EDGE | TLB_SUB_ONESHOT;
sub->sub_mode |= TLB_SUB_EDGE;

sub->platform.epoll.close = true;
sub->platform.epoll.on_event = sub->on_event;
Expand All @@ -124,7 +123,7 @@ static void s_timer_on_event(tlb_handle subscription, int events, void *userdata
sub->platform.epoll.on_event(subscription, events, userdata);

/* Clean up after the timer */
sub->oneshot_state = TLB_STATE_UNSUBBED;
sub->state = TLB_STATE_UNSUBBED;
}

/**********************************************************************************************************************
Expand Down Expand Up @@ -167,31 +166,28 @@ int tlb_evl_handle_events(struct tlb_event_loop *loop, size_t budget, int timeou
TLB_LOG_EVENT(sub, "Handling");

/* Cache this off because sub can become invalid during on_event */
const bool is_oneshot = sub->sub_mode & TLB_SUB_ONESHOT;
sub->oneshot_state = TLB_STATE_RUNNING;
sub->state = TLB_STATE_RUNNING;
sub->on_event(sub, s_events_from_epoll(event), sub->userdata);

if (is_oneshot) {
switch ((enum tlb_sub_state)sub->oneshot_state) {
case TLB_STATE_SUBBED:
/* Not possible */
TLB_LOG_EVENT(sub, "In bad state!");
TLB_ASSERT(false);
break;

case TLB_STATE_RUNNING:
/* Resubscribe the event */
s_epoll_change(loop, sub, EPOLL_CTL_MOD);
sub->oneshot_state = TLB_STATE_SUBBED;
TLB_LOG_EVENT(sub, "Set to SUBBED");
break;

case TLB_STATE_UNSUBBED:
/* Force-remove the subscription */
sub->oneshot_state = TLB_STATE_SUBBED;
tlb_evl_remove(loop, sub);
break;
}
switch ((enum tlb_sub_state)sub->state) {
case TLB_STATE_SUBBED:
/* Not possible */
TLB_LOG_EVENT(sub, "In bad state!");
TLB_ASSERT(false);
break;

case TLB_STATE_RUNNING:
/* Resubscribe the event */
s_epoll_change(loop, sub, EPOLL_CTL_MOD);
sub->state = TLB_STATE_SUBBED;
TLB_LOG_EVENT(sub, "Set to SUBBED");
break;

case TLB_STATE_UNSUBBED:
/* Force-remove the subscription */
sub->state = TLB_STATE_SUBBED;
tlb_evl_remove(loop, sub);
break;
}
}

Expand Down
3 changes: 1 addition & 2 deletions source/tlb.c
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ struct tlb *tlb_new(struct tlb_allocator *alloc, struct tlb_options options) {
tlb->options = options;

TLB_CHECK_GOTO(0 ==, tlb_evl_init(&tlb->super_loop, alloc), evl_init_failed);
tlb->super_loop.super_loop = true;

/* Setup the pipe used to stop threads. */
TLB_CHECK_GOTO(0 ==, tlb_pipe_open(&tlb->thread_stop_pipe), pipe_open_failed);
Expand All @@ -52,7 +51,7 @@ struct tlb *tlb_new(struct tlb_allocator *alloc, struct tlb_options options) {
.on_event = s_thread_stop,
.userdata = tlb,
.events = TLB_EV_READ,
.sub_mode = TLB_SUB_EDGE, /* Don't use oneshot here, or rapidly stopping threads will be racey */
.sub_mode = TLB_SUB_EDGE,
.name = "tlb_thread_stop_pipe",
};
tlb_evl_impl_fd_init(&tlb->thread_stop_sub);
Expand Down

0 comments on commit 9135a08

Please sign in to comment.