diff --git a/crates/corro-agent/src/agent/handlers.rs b/crates/corro-agent/src/agent/handlers.rs index 4c946568..6f2ee94c 100644 --- a/crates/corro-agent/src/agent/handlers.rs +++ b/crates/corro-agent/src/agent/handlers.rs @@ -108,7 +108,7 @@ pub fn spawn_incoming_connection_handlers( // Spawn handler tasks for this connection spawn_foca_handler(&agent, &tripwire, &conn); - uni::spawn_unipayload_handler(&tripwire, &conn, agent.clone()); + uni::spawn_unipayload_handler(agent.clone(), bookie.clone(), &tripwire, &conn); bi::spawn_bipayload_handler(&agent, &bookie, &tripwire, &conn); }); } diff --git a/crates/corro-agent/src/agent/uni.rs b/crates/corro-agent/src/agent/uni.rs index 6e9ec874..76886952 100644 --- a/crates/corro-agent/src/agent/uni.rs +++ b/crates/corro-agent/src/agent/uni.rs @@ -1,5 +1,6 @@ +use crate::agent::util::process_multiple_changes; use corro_types::{ - agent::Agent, + agent::{Agent, Bookie}, broadcast::{BroadcastV1, ChangeSource, UniPayload, UniPayloadV1}, }; use metrics::counter; @@ -11,7 +12,12 @@ use tripwire::Tripwire; /// Spawn a task that accepts unidirectional broadcast streams, then /// spawns another task for each incoming stream to handle. -pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, agent: Agent) { +pub fn spawn_unipayload_handler( + agent: Agent, + bookie: Bookie, + tripwire: &Tripwire, + conn: &quinn::Connection, +) { tokio::spawn({ let conn = conn.clone(); let mut tripwire = tripwire.clone(); @@ -40,6 +46,7 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a tokio::spawn({ let agent = agent.clone(); + let bookie = bookie.clone(); async move { let mut framed = FramedRead::new(rx, LengthDelimitedCodec::new()); @@ -59,19 +66,43 @@ pub fn spawn_unipayload_handler(tripwire: &Tripwire, conn: &quinn::Connection, a change, )), cluster_id, + priority, } => { if cluster_id != agent.cluster_id() { continue; } - if let Err(e) = agent - .tx_changes() - .send((change, ChangeSource::Broadcast)) - .await - { - error!( - "could not send change for processing: {e}" - ); - return; + + if priority { + let agent = agent.clone(); + let bookie = bookie.clone(); + + tokio::spawn(async move { + if let Err(e) = + process_multiple_changes( + agent, + bookie, + vec![( + change, + ChangeSource::Broadcast, + std::time::Instant::now(), + )], + ) + .await + { + error!("Process priority change failed: {:?}", e); + } + }); + } else { + if let Err(e) = agent + .tx_changes() + .send((change, ChangeSource::Broadcast)) + .await + { + error!( + "could not send change for processing: {e}" + ); + return; + } } } } diff --git a/crates/corro-agent/src/agent/util.rs b/crates/corro-agent/src/agent/util.rs index a1960819..21b979e5 100644 --- a/crates/corro-agent/src/agent/util.rs +++ b/crates/corro-agent/src/agent/util.rs @@ -131,6 +131,10 @@ pub async fn initialise_foca(agent: &Agent) { } /// Prune the database +// FIXME: we remove this function from running while debugging a +// performance incident. We still need to clean up deleted versions +// from the database, so this is only a temporary fix. +#[allow(unused)] pub async fn clear_overwritten_versions(agent: Agent, bookie: Bookie) { let pool = agent.pool(); diff --git a/crates/corro-agent/src/broadcast/mod.rs b/crates/corro-agent/src/broadcast/mod.rs index ba82cc2a..424fe0c0 100644 --- a/crates/corro-agent/src/broadcast/mod.rs +++ b/crates/corro-agent/src/broadcast/mod.rs @@ -463,6 +463,7 @@ pub fn runtime_loop( if let Err(e) = (UniPayload::V1 { data: UniPayloadV1::Broadcast(bcast.clone()), cluster_id: agent.cluster_id(), + priority: is_local, }) .write_to_stream((&mut ser_buf).writer()) { diff --git a/crates/corro-types/src/broadcast.rs b/crates/corro-types/src/broadcast.rs index d1eac889..6895cc6f 100644 --- a/crates/corro-types/src/broadcast.rs +++ b/crates/corro-types/src/broadcast.rs @@ -33,6 +33,8 @@ pub enum UniPayload { data: UniPayloadV1, #[speedy(default_on_eof)] cluster_id: ClusterId, + #[speedy(default_on_eof)] + priority: bool, }, }