Skip to content

Commit

Permalink
Fixes race conditions in HubDeviceSelect.
Browse files Browse the repository at this point in the history
These were problematic when the shutdown/close happened on a different thread
than the executor. Caused flakey tests.
  • Loading branch information
balazsracz committed Aug 16, 2024
1 parent c387390 commit 616e307
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 22 deletions.
67 changes: 47 additions & 20 deletions src/utils/HubDeviceSelect.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -173,8 +173,8 @@ public:
, dst_(dst)
, skipMember_(skip_member)
{
this->start_flow(STATE(allocate_buffer));
set_limit_input(shouldThrottle_);
this->start_flow(STATE(allocate_buffer));
}

void set_limit_input(bool should_throttle)
Expand Down Expand Up @@ -243,9 +243,9 @@ public:
{
/// Error reading the socket.
b_->unref();
notify_barrier();
set_terminated();
device()->report_read_error();
notify_barrier();
return exit();
}
SelectBufferInfo<buffer_type>::check_target_size(
Expand Down Expand Up @@ -313,6 +313,7 @@ public:
on_error ? on_error : EmptyNotifiable::DefaultInstance());
barrier_.new_child();
hub_->register_port(write_port());
isRegistered_ = true;
}
#endif

Expand Down Expand Up @@ -340,29 +341,30 @@ public:
::fcntl(fd, F_SETFL, O_RDWR | O_NONBLOCK);
#endif
hub_->register_port(write_port());
isRegistered_ = true;
}

/// If the barrier has not been called yet, will notify it inline.
virtual ~HubDeviceSelect()
{
if (fd_ >= 0) {
unregister_write_port();
int fd = -1;
executor()->sync_run([this, &fd]()
close_fd();
executor()->sync_run([this]()
{
fd = fd_;
fd_ = -1;
readFlow_.shutdown();
writeFlow_.shutdown();
});
::close(fd);
bool completed = false;
while (!completed) {
executor()->sync_run([this, &completed]()
{
if (barrier_.is_done()) completed = true;
});
}
}
bool completed = false;
while (!completed)
{
executor()->sync_run([this, &completed]() {
if (barrier_.is_done())
{
completed = true;
}
});
}
}

Expand All @@ -383,6 +385,14 @@ public:
{
LOG(VERBOSE, "HubDeviceSelect::unregister write port %p %p",
write_port(), &writeFlow_);
{
AtomicHolder h(this);
if (!isRegistered_)
{
return;
}
isRegistered_ = false;
}
hub_->unregister_port(&writeFlow_);
/* We put an empty message at the end of the queue. This will cause
* wait until all pending messages are dealt with, and then ping the
Expand Down Expand Up @@ -473,10 +483,7 @@ protected:
{
readFlow_.shutdown();
unregister_write_port();
if (fd_ >= 0) {
::close(fd_);
fd_ = -1;
}
close_fd();
}

/** Callback from the ReadFlow when the read call has seen an error. The
Expand All @@ -485,10 +492,27 @@ protected:
void report_read_error() override
{
unregister_write_port();
if (fd_ >= 0) {
::close(fd_);
close_fd();
}

void close_fd() {
int fd = -1;
{
AtomicHolder h(this);
fd = fd_;
if (fd < 0) {
return;
}
fd_ = -1;
}
// This is a workaround that sometimes my linux kernel gets stuck in
// ::read when I closed the fd like this, even though the fd is
// O_NONBLOCK.
executor()->add(new CallbackExecutable([this, fd]() {
::close(fd);
readFlow_.shutdown();
writeFlow_.shutdown();
}));
}

/// Hub whose data we are trying to send.
Expand All @@ -498,6 +522,9 @@ protected:
/// StateFlow for writing data to the fd. Woken by data to send or the fd
/// being writeable.
WriteFlow writeFlow_;
/// True when the write flow is registered in the hub. Used to synchronize
/// different and concurrent shutdown paths. Protected by Atomic this.
bool isRegistered_;
};

#endif // FEATURE_EXECUTOR_SELECT
Expand Down
4 changes: 2 additions & 2 deletions src/utils/test_main.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ Service g_service(&g_executor);
* the last command in a TEST_F. */
void wait_for_main_executor()
{
ExecutorGuard guard(&g_executor);
guard.wait_for_notification();
std::unique_ptr<ExecutorGuard> guard(new ExecutorGuard(&g_executor));
guard->wait_for_notification();
}


Expand Down

0 comments on commit 616e307

Please sign in to comment.