Skip to content

Commit

Permalink
Fix
Browse files Browse the repository at this point in the history
  • Loading branch information
iambriccardo committed Dec 23, 2024
1 parent 04c0631 commit e81db18
Showing 1 changed file with 47 additions and 86 deletions.
133 changes: 47 additions & 86 deletions relay-server/src/services/buffer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,7 @@ impl Service for EnvelopeBufferService {

#[cfg(test)]
mod tests {
use crate::services::projects::project::ProjectState;
use crate::services::projects::project::{ProjectInfo, ProjectState};
use crate::testutils::new_envelope;
use crate::MemoryStat;
use chrono::Utc;
Expand Down Expand Up @@ -747,8 +747,10 @@ mod tests {

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
let project_info = Arc::new(ProjectInfo::default());
project_cache_handle
.test_set_project_state(project_key, ProjectState::Enabled(project_info));
addr.send(EnvelopeBuffer::Push(envelope.clone()));
project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled);

tokio::time::sleep(Duration::from_millis(1000)).await;

Expand All @@ -763,6 +765,40 @@ mod tests {
assert_eq!(envelope_processor_rx.len(), 1);
}

#[tokio::test(start_paused = true)]
async fn pop_with_pending_project() {
let EnvelopeBufferServiceResult {
service,
global_tx: _global_tx,
envelope_processor_rx,
project_cache_handle,
outcome_aggregator_rx: _outcome_aggregator_rx,
..
} = envelope_buffer_service(
None,
global_config::Status::Ready(Arc::new(GlobalConfig::default())),
);

let addr = service.start_detached();

let envelope = new_envelope(false, "foo");
let project_key = envelope.meta().public_key();
project_cache_handle.test_set_project_state(project_key, ProjectState::Pending);
addr.send(EnvelopeBuffer::Push(envelope.clone()));

tokio::time::sleep(Duration::from_millis(1000)).await;

assert_eq!(envelope_processor_rx.len(), 0);

let project_info = Arc::new(ProjectInfo::default());
project_cache_handle
.test_set_project_state(project_key, ProjectState::Enabled(project_info));

tokio::time::sleep(Duration::from_millis(1000)).await;

assert_eq!(envelope_processor_rx.len(), 1);
}

#[tokio::test(start_paused = true)]
async fn pop_requires_memory_capacity() {
let EnvelopeBufferServiceResult {
Expand Down Expand Up @@ -838,90 +874,6 @@ mod tests {
assert_eq!(outcome.quantity, 1);
}

// #[tokio::test(start_paused = true)]
// async fn test_update_project() {
// let EnvelopeBufferServiceResult {
// service,
// mut envelopes_rx,
// project_cache_handle,
// global_tx: _global_tx,
// outcome_aggregator_rx: _outcome_aggregator_rx,
// } = envelope_buffer_service(
// None,
// global_config::Status::Ready(Arc::new(GlobalConfig::default())),
// );
//
// let addr = service.start_detached();
//
// let envelope = new_envelope(false, "foo");
// let project_key = envelope.meta().public_key();
//
// tokio::time::sleep(Duration::from_secs(1)).await;
//
// addr.send(EnvelopeBuffer::Push(envelope.clone()));
// tokio::time::sleep(Duration::from_secs(3)).await;
//
// let legacy::DequeuedEnvelope(envelope) = envelopes_rx.recv().await.unwrap();
//
// addr.send(EnvelopeBuffer::NotReady(project_key, envelope));
//
// tokio::time::sleep(Duration::from_millis(200)).await;
// assert_eq!(project_cache_handle.test_num_fetches(), 2);
//
// tokio::time::sleep(Duration::from_millis(1300)).await;
// assert_eq!(project_cache_handle.test_num_fetches(), 3);
// }

// #[tokio::test(start_paused = true)]
// async fn output_is_throttled() {
// let EnvelopeBufferServiceResult {
// service,
// mut envelope_processor_rx,
// project_cache_handle,
// global_tx: _global_tx,
// outcome_aggregator_rx: _outcome_aggregator_rx,
// ..
// } = envelope_buffer_service(
// None,
// global_config::Status::Ready(Arc::new(GlobalConfig::default())),
// );
//
// let addr = service.start_detached();
//
// let envelope = new_envelope(false, "foo");
// let project_key = envelope.meta().public_key();
// for _ in 0..10 {
// addr.send(EnvelopeBuffer::Push(envelope.clone()));
// }
// project_cache_handle.test_set_project_state(project_key, ProjectState::Disabled);
//
// tokio::time::sleep(Duration::from_millis(100)).await;
//
// let mut messages = vec![];
// envelopes_rx.recv_many(&mut messages, 100).await;
//
// assert_eq!(
// messages
// .iter()
// .filter(|message| matches!(message, legacy::DequeuedEnvelope(..)))
// .count(),
// 5
// );
//
// tokio::time::sleep(Duration::from_millis(100)).await;
//
// let mut messages = vec![];
// envelopes_rx.recv_many(&mut messages, 100).await;
//
// assert_eq!(
// messages
// .iter()
// .filter(|message| matches!(message, legacy::DequeuedEnvelope(..)))
// .count(),
// 5
// );
// }

#[tokio::test(start_paused = true)]
async fn test_partitioned_buffer() {
let mut runner = ServiceRunner::new();
Expand Down Expand Up @@ -969,7 +921,16 @@ mod tests {

// Create two envelopes with different project keys
let envelope1 = new_envelope(false, "foo");
let project_key = envelope1.meta().public_key();
let project_info = Arc::new(ProjectInfo::default());
project_cache_handle
.test_set_project_state(project_key, ProjectState::Enabled(project_info));

let envelope2 = new_envelope(false, "bar");
let project_key = envelope2.meta().public_key();
let project_info = Arc::new(ProjectInfo::default());
project_cache_handle
.test_set_project_state(project_key, ProjectState::Enabled(project_info));

// Send envelopes to their respective buffers
let buffer1 = &partitioned.buffers[0];
Expand Down

0 comments on commit e81db18

Please sign in to comment.