Skip to content

Commit

Permalink
feat(buffer): Disable dequeueing with user signal (#4070)
Browse files Browse the repository at this point in the history
Add a temporary signal handler for testing purposes that allows pausing
all dequeueing from the envelope buffer.
  • Loading branch information
jjbayer authored Sep 24, 2024
1 parent 0643a13 commit 50791c6
Showing 1 changed file with 21 additions and 4 deletions.
25 changes: 21 additions & 4 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,13 +155,14 @@ impl EnvelopeBufferService {
async fn ready_to_pop(
&mut self,
buffer: &PolymorphicEnvelopeBuffer,
dequeue: bool,
) -> Option<Permit<DequeuedEnvelope>> {
relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
status = "checking"
);

self.system_ready(buffer).await;
self.system_ready(buffer, dequeue).await;

relay_statsd::metric!(
counter(RelayCounters::BufferReadyToPop) += 1,
Expand Down Expand Up @@ -192,15 +193,15 @@ impl EnvelopeBufferService {
/// - We should not pop from disk into memory when relay's overall memory capacity
/// has been reached.
/// - We need a valid global config to unspool.
async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer) {
async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool) {
loop {
// We should not unspool from external storage if memory capacity has been reached.
// But if buffer storage is in memory, unspooling can reduce memory usage.
let memory_ready =
!buffer.is_external() || self.memory_checker.check_memory().has_capacity();
let global_config_ready = self.global_config_rx.borrow().is_ready();

if memory_ready && global_config_ready {
if memory_ready && global_config_ready && dequeue {
return;
}
tokio::time::sleep(DEFAULT_SLEEP).await;
Expand Down Expand Up @@ -376,6 +377,9 @@ impl Service for EnvelopeBufferService {
let mut global_config_rx = self.global_config_rx.clone();
let services = self.services.clone();

let dequeue = Arc::<AtomicBool>::new(true.into());
let dequeue1 = dequeue.clone();

tokio::spawn(async move {
let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await;

Expand Down Expand Up @@ -412,7 +416,7 @@ impl Service for EnvelopeBufferService {
// On the one hand, we might want to prioritize dequeuing over enqueuing
// so we do not exceed the buffer capacity by starving the dequeue.
// on the other hand, prioritizing old messages violates the LIFO design.
Some(permit) = self.ready_to_pop(&buffer) => {
Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => {
match Self::try_pop(&config, &mut buffer, &services, permit).await {
Ok(new_sleep) => {
sleep = new_sleep;
Expand Down Expand Up @@ -449,6 +453,19 @@ impl Service for EnvelopeBufferService {

relay_log::info!("EnvelopeBufferService: stopping");
});

#[cfg(unix)]
tokio::spawn(async move {
use tokio::signal::unix::{signal, SignalKind};
let Ok(mut signal) = signal(SignalKind::user_defined1()) else {
return;
};
while let Some(()) = signal.recv().await {
let deq = !dequeue1.load(Ordering::Relaxed);
dequeue1.store(deq, Ordering::Relaxed);
relay_log::info!("SIGUSR1 receive, dequeue={}", deq);
}
});
}
}

Expand Down

0 comments on commit 50791c6

Please sign in to comment.