Skip to content

Commit

Permalink
Revert "tokio_timerfd::Interval"
Browse files Browse the repository at this point in the history
This reverts commit 12124b2.
  • Loading branch information
problame committed Nov 20, 2024
1 parent 12124b2 commit f9bf038
Show file tree
Hide file tree
Showing 5 changed files with 43 additions and 48 deletions.
23 changes: 0 additions & 23 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,6 @@ tokio-postgres-rustls = "0.12.0"
tokio-rustls = { version = "0.26.0", default-features = false, features = ["tls12", "ring"]}
tokio-stream = "0.1"
tokio-tar = "0.3"
tokio-timerfd = "0.2.0"
tokio-util = { version = "0.7.10", features = ["io", "rt"] }
toml = "0.8"
toml_edit = "0.22"
Expand Down
1 change: 0 additions & 1 deletion pageserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ tokio-postgres.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
toml_edit = { workspace = true, features = [ "serde" ] }
tokio-timerfd.workspace = true
tracing.workspace = true
url.workspace = true
walkdir.workspace = true
Expand Down
62 changes: 42 additions & 20 deletions pageserver/src/page_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use anyhow::{bail, Context};
use async_compression::tokio::write::GzipEncoder;
use bytes::Buf;
use futures::{FutureExt, StreamExt};
use futures::FutureExt;
use itertools::Itertools;
use once_cell::sync::OnceCell;
use pageserver_api::models::{self, TenantState};
Expand Down Expand Up @@ -316,8 +316,6 @@ struct PageServerHandler {

/// See [`PageServerConf::server_side_batch_timeout`]
server_side_batch_timeout: Option<Duration>,

server_side_batch_interval: Option<tokio_timerfd::Interval>,
}

struct Carry {
Expand Down Expand Up @@ -587,11 +585,6 @@ impl PageServerHandler {
timeline_handles: TimelineHandles::new(tenant_manager),
cancel,
server_side_batch_timeout,
server_side_batch_interval: server_side_batch_timeout.map(|timeout| {
// The timerfd missed tick behavior is equivalent to
// tokio::time::MissedTickBehavior::Delay
tokio_timerfd::Interval::new_interval(timeout).expect("TODO")
}),
}
}

Expand Down Expand Up @@ -633,22 +626,51 @@ impl PageServerHandler {
{
debug_assert_current_span_has_tenant_and_timeline_id_no_shard_id();

let mut batching_deadline_storage = None; // TODO: can this just be an unsync once_cell?

loop {
// Create a future that will become ready when we need to stop batching.
use futures::future::Either;
let batching_deadline = match &*maybe_carry as &Option<Carry> {
None => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
Some(carry) => match &mut self.server_side_batch_interval {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
let batching_deadline = match (
&*maybe_carry as &Option<Carry>,
&mut batching_deadline_storage,
) {
(None, None) => Either::Left(futures::future::pending()), // there's no deadline before we have something batched
(None, Some(_)) => unreachable!(),
(Some(_), Some(fut)) => Either::Right(fut), // below arm already ran
(Some(carry), None) => {
match self.server_side_batch_timeout {
None => {
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]))
}
Some(batch_timeout) => {
// Take into consideration the time the carry spent waiting.
let batch_timeout =
batch_timeout.saturating_sub(carry.started_at.elapsed());
if batch_timeout.is_zero() {
// the timer doesn't support restarting with zero duration
return Ok(BatchOrEof::Batch(smallvec::smallvec![
maybe_carry
.take()
.expect("we already checked it's Some")
.msg
]));
} else {
batching_deadline_storage = Some(Box::pin(async move {
tokio::time::sleep(batch_timeout).await;
}));
Either::Right(
batching_deadline_storage.as_mut().expect("we just set it"),
)
}
}
}
Some(interval) => Either::Right(interval.next()),
},
}
};
let msg = tokio::select! {
biased;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from fixtures.neon_fixtures import NeonEnvBuilder
from fixtures.utils import humantime_to_ms

TARGET_RUNTIME = 5
TARGET_RUNTIME = 60


@pytest.mark.parametrize(
Expand All @@ -18,12 +18,10 @@
# the next 4 cases demonstrate how not-batchable workloads suffer from batching timeout
(50, None, TARGET_RUNTIME, 1, 128, "not batchable no batching"),
(50, "10us", TARGET_RUNTIME, 1, 128, "not batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 1, 128, "not batchable 20us timeout"),
(50, "1ms", TARGET_RUNTIME, 1, 128, "not batchable 1ms timeout"),
# the next 4 cases demonstrate how batchable workloads benefit from batching
(50, None, TARGET_RUNTIME, 100, 128, "batchable no batching"),
(50, "10us", TARGET_RUNTIME, 100, 128, "batchable 10us timeout"),
(50, "20us", TARGET_RUNTIME, 100, 128, "batchable 20us timeout"),
(50, "100us", TARGET_RUNTIME, 100, 128, "batchable 100us timeout"),
(50, "1ms", TARGET_RUNTIME, 100, 128, "batchable 1ms timeout"),
],
Expand Down

0 comments on commit f9bf038

Please sign in to comment.