diff --git a/relay-server/src/services/buffer/mod.rs b/relay-server/src/services/buffer/mod.rs index cc57126c20..f316de0492 100644 --- a/relay-server/src/services/buffer/mod.rs +++ b/relay-server/src/services/buffer/mod.rs @@ -155,13 +155,14 @@ impl EnvelopeBufferService { async fn ready_to_pop( &mut self, buffer: &PolymorphicEnvelopeBuffer, + dequeue: bool, ) -> Option> { relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, status = "checking" ); - self.system_ready(buffer).await; + self.system_ready(buffer, dequeue).await; relay_statsd::metric!( counter(RelayCounters::BufferReadyToPop) += 1, @@ -192,7 +193,7 @@ impl EnvelopeBufferService { /// - We should not pop from disk into memory when relay's overall memory capacity /// has been reached. /// - We need a valid global config to unspool. - async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer) { + async fn system_ready(&self, buffer: &PolymorphicEnvelopeBuffer, dequeue: bool) { loop { // We should not unspool from external storage if memory capacity has been reached. // But if buffer storage is in memory, unspooling can reduce memory usage. @@ -200,7 +201,7 @@ impl EnvelopeBufferService { !buffer.is_external() || self.memory_checker.check_memory().has_capacity(); let global_config_ready = self.global_config_rx.borrow().is_ready(); - if memory_ready && global_config_ready { + if memory_ready && global_config_ready && dequeue { return; } tokio::time::sleep(DEFAULT_SLEEP).await; @@ -376,6 +377,9 @@ impl Service for EnvelopeBufferService { let mut global_config_rx = self.global_config_rx.clone(); let services = self.services.clone(); + let dequeue = Arc::::new(true.into()); + let dequeue1 = dequeue.clone(); + tokio::spawn(async move { let buffer = PolymorphicEnvelopeBuffer::from_config(&config, memory_checker).await; @@ -412,7 +416,7 @@ impl Service for EnvelopeBufferService { // On the one hand, we might want to prioritize dequeuing over enqueuing // so we do not exceed the buffer capacity by starving the dequeue. // on the other hand, prioritizing old messages violates the LIFO design. - Some(permit) = self.ready_to_pop(&buffer) => { + Some(permit) = self.ready_to_pop(&buffer, dequeue.load(Ordering::Relaxed)) => { match Self::try_pop(&config, &mut buffer, &services, permit).await { Ok(new_sleep) => { sleep = new_sleep; @@ -449,6 +453,19 @@ impl Service for EnvelopeBufferService { relay_log::info!("EnvelopeBufferService: stopping"); }); + + #[cfg(unix)] + tokio::spawn(async move { + use tokio::signal::unix::{signal, SignalKind}; + let Ok(mut signal) = signal(SignalKind::user_defined1()) else { + return; + }; + while let Some(()) = signal.recv().await { + let deq = !dequeue1.load(Ordering::Relaxed); + dequeue1.store(deq, Ordering::Relaxed); + relay_log::info!("SIGUSR1 receive, dequeue={}", deq); + } + }); } }