Skip to content

Commit

Permalink
Merge pull request #148 from redpanda-data/stephan/downstream-234234
Browse files Browse the repository at this point in the history
Downstream steal and TCP buf fixes
  • Loading branch information
StephanDollberg authored Oct 23, 2024
2 parents 4a3406c + 4663e75 commit 7820b86
Show file tree
Hide file tree
Showing 5 changed files with 178 additions and 19 deletions.
16 changes: 13 additions & 3 deletions include/seastar/core/reactor.hh
Original file line number Diff line number Diff line change
Expand Up @@ -330,8 +330,17 @@ private:
const bool _reuseport;
circular_buffer<double> _loads;
double _load = 0;
// Next two fields are required to enforce the monotonicity of total_steal_time()
// see that method for details.

// Last measured accumulated steal time, i.e., the simple difference of accumulated
// awake time and consumed thread CPU time.
sched_clock::duration _last_true_steal{0};
// Accumulated steal time forced to be monotinic by rejecting any updates that would
// decrease it. See total_steal_time() for details.
sched_clock::duration _last_mono_steal{0};
sched_clock::duration _total_idle{0};
sched_clock::duration _total_sleep;
sched_clock::duration _total_sleep{0};
sched_clock::time_point _start_time = now();
std::chrono::nanoseconds _max_poll_time = calculate_poll_time();
output_stream<char>::batch_flush_list_t _flush_batching;
Expand Down Expand Up @@ -411,7 +420,6 @@ private:
task_queue* pop_active_task_queue(sched_clock::time_point now);
void insert_activating_task_queues();
void account_runtime(task_queue& tq, sched_clock::duration runtime);
void account_idle(sched_clock::duration idletime);
void allocate_scheduling_group_specific_data(scheduling_group sg, scheduling_group_key key);
future<> rename_scheduling_group_specific_data(scheduling_group sg);
future<> init_scheduling_group(scheduling_group sg, sstring name, sstring shortname, float shares);
Expand Down Expand Up @@ -588,10 +596,12 @@ public:
[[deprecated("Use this_shard_id")]]
shard_id cpu_id() const;

void sleep();
void try_sleep();

steady_clock_type::duration total_idle_time();
steady_clock_type::duration total_busy_time();
steady_clock_type::duration total_awake_time() const;
std::chrono::nanoseconds total_cpu_time() const;
std::chrono::nanoseconds total_steal_time();

const io_stats& get_io_stats() const { return _io_stats; }
Expand Down
22 changes: 20 additions & 2 deletions include/seastar/net/api.hh
Original file line number Diff line number Diff line change
Expand Up @@ -396,13 +396,31 @@ public:

/// @}

/// Options for creating a listening socket.
///
/// WARNING: these options currently only have an effect when using
/// the POSIX stack: all options are ignored on the native stack as they
/// are not implemented there.
struct listen_options {
bool reuse_address = false;
server_socket::load_balancing_algorithm lba = server_socket::load_balancing_algorithm::default_;
transport proto = transport::TCP;
int listen_backlog = 100;
unsigned fixed_cpu = 0u;
std::optional<file_permissions> unix_domain_socket_permissions;

/// If set, the SO_SNDBUF size will be set to the given value on the listening socket
/// via setsockopt. This buffer size is inherited by the sockets returned by
/// accept and is the preferred way to set the buffer size for these sockets since
/// setting it directly on the already-accepted socket is ineffective (see TCP(7)).
std::optional<int> so_sndbuf;

/// If set, the SO_RCVBUF size will be set to the given value on the listening socket
/// via setsockopt. This buffer size is inherited by the sockets returned by
/// accept and is the preferred way to set the buffer size for these sockets since
/// setting it directly on the already-accepted socket is ineffective (see TCP(7)).
std::optional<int> so_rcvbuf;

void set_fixed_cpu(unsigned cpu) {
lba = server_socket::load_balancing_algorithm::fixed;
fixed_cpu = cpu;
Expand Down Expand Up @@ -457,8 +475,8 @@ public:
return false;
}

/**
* Returns available network interfaces. This represents a
/**
* Returns available network interfaces. This represents a
* snapshot of interfaces available at call time, hence the
* return by value.
*/
Expand Down
71 changes: 58 additions & 13 deletions src/core/reactor.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1028,11 +1028,6 @@ reactor::account_runtime(task_queue& tq, sched_clock::duration runtime) {
tq._runtime += runtime;
}

void
reactor::account_idle(sched_clock::duration runtime) {
// anything to do here?
}

struct reactor::task_queue::indirect_compare {
bool operator()(const task_queue* tq1, const task_queue* tq2) const {
return tq1->_vruntime < tq2->_vruntime;
Expand Down Expand Up @@ -1757,6 +1752,15 @@ reactor::posix_listen(socket_address sa, listen_options opts) {
if (opts.reuse_address) {
fd.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1);
}

if (opts.so_sndbuf) {
fd.setsockopt(SOL_SOCKET, SO_SNDBUF, *opts.so_sndbuf);
}

if (opts.so_rcvbuf) {
fd.setsockopt(SOL_SOCKET, SO_RCVBUF, *opts.so_rcvbuf);
}

if (_reuseport && !sa.is_af_unix())
fd.setsockopt(SOL_SOCKET, SO_REUSEPORT, 1);

Expand Down Expand Up @@ -2709,8 +2713,14 @@ void reactor::register_metrics() {
sm::make_gauge("utilization", [this] { return (1-_load) * 100; }, sm::description("CPU utilization")),
sm::make_counter("cpu_busy_ms", [this] () -> int64_t { return total_busy_time() / 1ms; },
sm::description("Total cpu busy time in milliseconds")),
sm::make_counter("sleep_time_ms_total", [this] () -> int64_t { return _total_sleep / 1ms; },
sm::description("Total reactor sleep time (wall clock)")),
sm::make_counter("awake_time_ms_total", [this] () -> int64_t { return total_awake_time() / 1ms; },
sm::description("Total reactor awake time (wall_clock)")),
sm::make_counter("cpu_used_time_ms", [this] () -> int64_t { return total_cpu_time() / 1ms; },
sm::description("Total reactor thread CPU time (from CLOCK_THREAD_CPUTIME)")),
sm::make_counter("cpu_steal_time_ms", [this] () -> int64_t { return total_steal_time() / 1ms; },
sm::description("Total steal time, the time in which some other process was running while Seastar was not trying to run (not sleeping)."
sm::description("Total steal time, the time in which something else was running while the reactor was runnable (not sleeping)."
"Because this is in userspace, some time that could be legitimally thought as steal time is not accounted as such. For example, if we are sleeping and can wake up but the kernel hasn't woken us up yet.")),
// total_operations value:DERIVE:0:U
sm::make_counter("aio_reads", _io_stats.aio_reads, sm::description("Total aio-reads operations")),
Expand Down Expand Up @@ -3458,7 +3468,6 @@ int reactor::do_run() {
if (check_for_work()) {
if (idle) {
_total_idle += idle_end - idle_start;
account_idle(idle_end - idle_start);
idle_start = idle_end;
idle = false;
}
Expand All @@ -3484,15 +3493,13 @@ int reactor::do_run() {
// Turn off the task quota timer to avoid spurious wakeups
struct itimerspec zero_itimerspec = {};
_task_quota_timer.timerfd_settime(0, zero_itimerspec);
auto start_sleep = now();
_cpu_stall_detector->start_sleep();
_cpu_profiler->stop();
sleep();
try_sleep();
_cpu_profiler->start();
_cpu_stall_detector->end_sleep();
// We may have slept for a while, so freshen idle_end
idle_end = now();
_total_sleep += idle_end - start_sleep;
_task_quota_timer.timerfd_settime(0, task_quote_itimerspec);
}
} else {
Expand All @@ -3511,8 +3518,9 @@ int reactor::do_run() {
return _return;
}


void
reactor::sleep() {
reactor::try_sleep() {
for (auto i = _pollers.begin(); i != _pollers.end(); ++i) {
auto ok = (*i)->try_enter_interrupt_mode();
if (!ok) {
Expand Down Expand Up @@ -4938,6 +4946,14 @@ steady_clock_type::duration reactor::total_busy_time() {
return now() - _start_time - _total_idle;
}

steady_clock_type::duration reactor::total_awake_time() const {
return now() - _start_time - _total_sleep;
}

std::chrono::nanoseconds reactor::total_cpu_time() const {
return thread_cputime_clock::now().time_since_epoch();
}

std::chrono::nanoseconds reactor::total_steal_time() {
// Steal time: this mimics the concept some Hypervisors have about Steal time.
// That is the time in which a VM has something to run, but is not running because some other
Expand All @@ -4951,9 +4967,38 @@ std::chrono::nanoseconds reactor::total_steal_time() {
// process is ready to run but the kernel hasn't scheduled us yet, that would be technically
// steal time but we have no ways to account it.
//
// Furthermore, not all steal is from other processes: time used by the syscall thread and any
// alien threads will show up as steal as well as any time spent in a system call that
// unexpectedly blocked (since CPU time won't tick up when that occurs).
//
// But what we have here should be good enough and at least has a well defined meaning.
return std::chrono::duration_cast<std::chrono::nanoseconds>(now() - _start_time - _total_sleep) -
std::chrono::duration_cast<std::chrono::nanoseconds>(thread_cputime_clock::now().time_since_epoch());
//
// Because we calculate sleep time with timestamps around polling methods that may sleep, like
// io_getevents, we systematically over-count sleep time, since there is CPU usage within the
// period timed as sleep, before and after an actual sleep occurs (and no sleep may occur at all,
// e.g., if there are events immediately available). Over-counting sleep means we under-count the
// wall-clock awake time, and so if there is no "true" steal, we will generally have a small
// *negative* steal time, because we under-count awake wall clock time while thread CPU time does
// not have a corresponding error.
//
// Becuase we claim "steal" is a counter, we must ensure that it never deceases, because PromQL
// functions which use counters will produce non-sensical results if they do. Therefore we clamp
// the output such that it never decreases.
//
// Finally, we don't just clamp difference of awake and CPU time since proces start at 0, but
// take the last value we returned from this function and then calculate the incremental steal
// time since that measurement, clamped to 0. This means that as soon as steal time becomes
// positive, it will be reflected in the measurement, rather than needing to "consume" all the
// accumulated negative steal time before positive steal times start showing up.


auto true_steal = total_awake_time() - total_cpu_time();
auto mono_steal = _last_mono_steal + std::max(true_steal - _last_true_steal, 0ns);

_last_true_steal = true_steal;
_last_mono_steal = mono_steal;

return mono_steal;
}

static std::atomic<unsigned long> s_used_scheduling_group_ids_bitmap{3}; // 0=main, 1=atexit
Expand Down
9 changes: 9 additions & 0 deletions src/core/reactor_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,12 @@ bool reactor_backend_aio::await_events(int timeout, const sigset_t* active_sigma
bool did_work = false;
int r;
do {
const bool may_sleep = !tsp || (tsp->tv_nsec + tsp->tv_sec > 0);
const auto before_getevents = may_sleep ? sched_clock::now() : sched_clock::time_point{};
r = io_pgetevents(_polling_io.io_context, 1, batch_size, batch, tsp, active_sigmask);
if (may_sleep) {
_r._total_sleep += sched_clock::now() - before_getevents;
}
if (r == -1 && errno == EINTR) {
return true;
}
Expand Down Expand Up @@ -855,7 +860,9 @@ reactor_backend_epoll::wait_and_process(int timeout, const sigset_t* active_sigm
}
});
std::array<epoll_event, 128> eevt;
const auto before_pwait = sched_clock::now();
int nr = ::epoll_pwait(_epollfd.get(), eevt.data(), eevt.size(), timeout, active_sigmask);
_r._total_sleep += sched_clock::now() - before_pwait;
if (nr == -1 && errno == EINTR) {
return false; // gdb can cause this
}
Expand Down Expand Up @@ -1585,7 +1592,9 @@ class reactor_backend_uring final : public reactor_backend {
}
struct ::io_uring_cqe* cqe = nullptr;
sigset_t sigs = *active_sigmask; // io_uring_wait_cqes() wants non-const
const auto before_wait_cqes = sched_clock::now();
auto r = ::io_uring_wait_cqes(&_uring, &cqe, 1, nullptr, &sigs);
_r._total_sleep += sched_clock::now() - before_wait_cqes;
if (__builtin_expect(r < 0, false)) {
switch (-r) {
case EINTR:
Expand Down
79 changes: 78 additions & 1 deletion tests/unit/socket_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,17 @@
#include <seastar/util/std-compat.hh>
#include <seastar/util/later.hh>
#include <seastar/testing/test_case.hh>
#include <seastar/testing/thread_test_case.hh>
#include <seastar/core/abort_source.hh>
#include <seastar/core/sleep.hh>
#include <seastar/core/thread.hh>
#include <seastar/core/when_all.hh>

#include <seastar/net/api.hh>
#include <seastar/net/posix-stack.hh>

#include <optional>
#include <tuple>

using namespace seastar;

future<> handle_connection(connected_socket s) {
Expand Down Expand Up @@ -224,3 +228,76 @@ SEASTAR_TEST_CASE(socket_on_close_local_shutdown_test) {
when_all(std::move(client), std::move(server)).discard_result().get();
});
}

SEASTAR_THREAD_TEST_CASE(socket_bufsize) {

// Test that setting the send and recv buffer sizes on the listening
// socket is propagated to the socket returned by accept().

auto buf_size = [](std::optional<int> snd_size, std::optional<int> rcv_size) {
listen_options lo{
.reuse_address = true,
.lba = server_socket::load_balancing_algorithm::fixed,
.so_sndbuf = snd_size,
.so_rcvbuf = rcv_size
};

ipv4_addr addr("127.0.0.1", 1234);
server_socket ss = seastar::listen(addr, lo);
connected_socket client = connect(addr).get();
connected_socket server = ss.accept().get().connection;

auto sockopt = [&](int option) {
int val{};
int ret = server.get_sockopt(SOL_SOCKET, option, &val, sizeof(val));
BOOST_REQUIRE_EQUAL(ret, 0);
return val;
};

int send = sockopt(SO_SNDBUF);
int recv = sockopt(SO_RCVBUF);

ss.abort_accept();
client.shutdown_output();
server.shutdown_output();


return std::make_tuple(send, recv);
};

constexpr int small_size = 8192, big_size = 128 * 1024;

// we pass different sizes for send and recv to catch any copy/paste
// style bugs
auto [send_small, recv_small] = buf_size(small_size, small_size * 2);
auto [send_big, recv_big] = buf_size(big_size, big_size * 2);

// Setting socket buffer sizes isn't an exact science: the kernel does
// some rounding, and also (currently) doubles the requested size and
// also applies so limits. So as a basic check, assert simply that the
// explicit small buffer ends up smaller than the explicit big buffer,
// and that both results are at least as large as the requested amount.
// The latter condition could plausibly fail if the OS clamped the size
// at a small amount, but this is unlikely for the chosen buffer sizes.

BOOST_CHECK_LT(send_small, send_big);
BOOST_CHECK_LT(recv_small, recv_big);

BOOST_CHECK_GE(send_small, small_size);
BOOST_CHECK_GE(send_big, big_size);

BOOST_CHECK_GE(recv_small, small_size * 2);
BOOST_CHECK_GE(recv_big, big_size * 2);

// not much to check here with "default" sizes, but let's at least call it
// and check that we get a reasonable answer
auto [send_default, recv_default] = buf_size({}, {});

BOOST_CHECK_GE(send_default, 4096);
BOOST_CHECK_GE(recv_default, 4096);

// we don't really know the default socket size and it can vary by kernel
// config, but 20 MB should be enough for everyone.
BOOST_CHECK_LT(send_default, 20'000'000);
BOOST_CHECK_LT(recv_default, 20'000'000);
}

0 comments on commit 7820b86

Please sign in to comment.