diff --git a/doc/manual/source/manual-page.rst b/doc/manual/source/manual-page.rst index ecc6b7a..ab447da 100644 --- a/doc/manual/source/manual-page.rst +++ b/doc/manual/source/manual-page.rst @@ -239,6 +239,16 @@ random A boolean value specifying whether the unit should pick a source unit at random. If the value is ``false`` or not given, the source units are picked in the order given. + +Merge Unit +---------- + +A unit of type ``"merge"`` will merge the data from all data sets of its +source units. It has the following configuration options: + +sources + A list of strings each containing the name of a unit to use as a + source. SLURM Unit diff --git a/etc/rtrtr.conf b/etc/rtrtr.conf index ac008a4..7df4c91 100644 --- a/etc/rtrtr.conf +++ b/etc/rtrtr.conf @@ -94,6 +94,12 @@ sources = [ "local-3323", "local-3324", "cloudflare-json" ] random = false +# We can also merge the data from multiple sources +[units.merged-rtr] +type = "merge" +sources = [ "local-3323", "local-3324", "cloudflare-json" ] + + # Local exceptions can be applied, too. [units.slurm] type = "slurm" @@ -115,6 +121,14 @@ listen = [ "127.0.0.1:9001" ] unit = "any-rtr" +# Let’s have a target for the merged data, too. +[targets.local-9002] +type = "rtr" +listen = [ "127.0.0.1:9002" ] +unit = "merged-rtr" + + +# We can also provide the data as JSON over HTTP. [targets.http-json] type = "http" path = "/json" diff --git a/src/payload.rs b/src/payload.rs index 03cc5ef..685347b 100644 --- a/src/payload.rs +++ b/src/payload.rs @@ -47,7 +47,7 @@ //! base type yet returns references to the items. For now, these need to //! separate because the `Iterator` trait requires the returned items to have //! the same lifetime as the iterator type itself. -use std::slice; +use std::{mem, slice}; use std::borrow::Borrow; use std::cmp::Ordering; use std::collections::HashSet; @@ -254,6 +254,19 @@ impl Block { } } + /// Splits off the part of the block before the given pack index. 
+ /// + /// Moves the start of this block to the given index and returns a block + /// from the original start to the new start. + fn split_off_at(&mut self, pack_index: usize) -> Block { + assert!(pack_index >= self.range.start); + assert!(pack_index <= self.range.end); + let mut res = self.clone(); + res.range.end = pack_index; + self.range.start = res.range.end; + res + } + /// Returns an owned iterator-like for the block. pub fn owned_iter(&self) -> OwnedBlockIter { OwnedBlockIter::new(self.clone()) @@ -417,15 +430,6 @@ impl Set { OwnedSetIter::new(self) } - /// Returns a set which has this set and the other set merged. - /// - /// The two sets may overlap. - pub fn merge(&self, other: &Set) -> Set { - let mut res = self.to_builder(); - res.insert_set(other.clone()); - res.finalize() - } - /// Returns a set with the indicated elements removed. /// /// Each element in the current set is presented to the closure and only @@ -478,6 +482,108 @@ impl Set { } } + /// Returns a set merging the elements from this and another set. + pub fn merge(&self, other: &Set) -> Set { + let mut left_tail = self.blocks.iter().cloned(); + let mut right_tail = other.blocks.iter().cloned(); + let mut left_head = left_tail.next(); + let mut right_head = right_tail.next(); + let mut target = Vec::new(); + let mut target_len = 0; + + // Merge potentially overlapping blocks. + loop { + // Skip over empty blocks. If either side runs out of block, we + // are done with this difficult part. 
+ let left = loop { + match left_head.as_mut() { + Some(block) if block.is_empty() => { } + Some(block) => break Some(block), + None => break None, + } + left_head = left_tail.next(); + }; + let right = loop { + match right_head.as_mut() { + Some(block) if block.is_empty() => { } + Some(block) => break Some(block), + None => break None, + } + right_head = right_tail.next(); + }; + let (left, right) = match (left, right) { + (Some(left), Some(right)) => (left, right), + _ => break, + }; + + // Neither block is empty. Make left (head and tail) start first. + if right.first().unwrap() < left.first().unwrap() { + mem::swap(left, right); + mem::swap(&mut left_tail, &mut right_tail); + } + + // Find out how much of left we can add. + // + // First, find the part of left that is before right. + let first_right = right.first().unwrap(); + let mut left_idx = left.range.start; + while let Some(item) = left.get_from_pack(left_idx) { + if item >= first_right { + break; + } + left_idx += 1; + } + + // Now progress left_idx as long as elements are equal with right. + let mut right_idx = right.range.start; + while let (Some(left_item), Some(right_item)) = ( + left.get_from_pack(left_idx), right.get_from_pack(right_idx) + ) { + if left_item == right_item { + left_idx += 1; + right_idx += 1; + } + else { + break + } + } + + // left_idx now is the end of the range in left we need to add to + // the target. + let new = left.split_off_at(left_idx); + target_len += new.len(); + target.push(new); + + // Finally, move right to its new start. + right.range.start = right_idx; + } + + // At least one of the two iterators is now exhausted. So we can now + // just push whatever is left on either to the target. Don’t forget + // the heads, though, only one of which at most should not be empty. 
+ if let Some(block) = left_head { + if !block.is_empty() { + target_len += block.len(); + target.push(block); + } + } + if let Some(block) = right_head { + if !block.is_empty() { + target_len += block.len(); + target.push(block); + } + } + for block in left_tail.chain(right_tail) { + target_len += block.len(); + target.push(block) + } + + Set { + blocks: target.into(), + len: target_len + } + } + /// Returns the diff to get from `other` to `self`. pub fn diff_from(&self, other: &Set) -> Diff { let mut diff = DiffBuilder::empty(); @@ -1188,31 +1294,33 @@ pub(crate) mod testrig { ) } - /// Create a pack of payload from a slice of `u32`s. - pub fn pack(values: &[u32]) -> Pack { + /// Create a pack of payload from an array of `u32`s. + pub fn pack(values: [u32; N]) -> Pack { Pack { items: - values.iter().cloned().map(p).collect::>().into() + values.into_iter().map(p).collect::>().into() } } - /// Creates a set from a vec of blocks. - pub fn set(values: Vec) -> Set { - let len = values.iter().map(|item| item.len()).sum(); - Set { - blocks: Arc::from(values.into_boxed_slice()), - len - } - } - - /// Create a block of payload from a slice of `u32`s. - pub fn block(values: &[u32], range: Range) -> Block { + /// Create a block of payload from an array of `u32`s. + pub fn block( + values: [u32; N], range: Range + ) -> Block { Block { pack: pack(values), range } } + /// Create a set from an array of blocks. + pub fn set(blocks: [Block; N]) -> Set { + let len = blocks.iter().map(|item| item.len()).sum(); + Set { + blocks: Arc::from(blocks.as_slice()), + len + } + } + /// Checks that a pack fulfils all invariants. pub fn check_pack(pack: &Pack) { // Empty pack is allowed. @@ -1242,10 +1350,10 @@ pub(crate) mod testrig { } } - /// Creates an update from a bunch of integers - pub fn update(values: &[u32]) -> Update { + /// Creates an update from an array of `u32`s. 
+ pub fn update(values: [u32; N]) -> Update { Update::new( - set(vec![ + set([ block(values, 0..values.len()) ]) ) @@ -1272,13 +1380,52 @@ mod test { use super::*; use super::testrig::*; + #[test] + fn set_merge() { + assert!( + set([block([], 0..0)]).merge( + &set([block([], 0..0)]) + ).iter().eq(set([block([], 0..0)]).iter()) + ); + assert!( + set([block([1, 3, 4], 0..3)]).merge( + &set([block([1, 3, 4], 0..3)]) + ).iter().eq(set([block([1, 3, 4], 0..3)]).iter()) + ); + assert!( + set([block([1, 3, 4], 0..3)]).merge( + &set([block([], 0..0)]) + ).iter().eq(set([block([1, 3, 4], 0..3)]).iter()) + ); + assert!( + set([block([], 0..0)]).merge( + &set([block([1, 3, 4], 0..3)]) + ).iter().eq(set([block([1, 3, 4], 0..3)]).iter()) + ); + assert!( + set([block([1, 3, 4, 5], 0..4)]).merge( + &set([block([1, 3, 4], 0..3)]) + ).iter().eq(set([block([1, 3, 4, 5], 0..4)]).iter()) + ); + assert!( + set([block([1, 3, 5], 0..3)]).merge( + &set([block([1, 3, 4], 0..3)]) + ).iter().eq(set([block([1, 3, 4, 5], 0..4)]).iter()) + ); + assert!( + set([block([1, 3, 5], 0..3), block([10, 11], 0..2)]).merge( + &set([block([3, 4], 0..2)]) + ).iter().eq(set([block([1, 3, 4, 5, 10, 11], 0..6)]).iter()) + ); + } + #[test] fn set_iter() { assert_eq!( Set { blocks: vec![ - block(&[1, 2, 4], 0..3), - block(&[4, 5], 1..2) + block([1, 2, 4], 0..3), + block([4, 5], 1..2) ].into(), len: 4 }.iter().cloned().collect::>(), @@ -1289,11 +1436,11 @@ mod test { #[test] fn set_builder() { let mut builder = SetBuilder::empty(); - builder.insert_pack(pack(&[1, 2, 11, 12])); - builder.insert_pack(pack(&[5, 6, 7, 15, 18])); - builder.insert_pack(pack(&[6, 7])); - builder.insert_pack(pack(&[7])); - builder.insert_pack(pack(&[17])); + builder.insert_pack(pack([1, 2, 11, 12])); + builder.insert_pack(pack([5, 6, 7, 15, 18])); + builder.insert_pack(pack([6, 7])); + builder.insert_pack(pack([7])); + builder.insert_pack(pack([17])); let set = builder.finalize(); check_set(&set); assert_eq!( @@ -1308,8 +1455,8 @@ mod 
test { assert_eq!( Diff { - announced: pack(&[6, 7, 15, 18]), - withdrawn: pack(&[2, 8, 9]), + announced: pack([6, 7, 15, 18]), + withdrawn: pack([2, 8, 9]), }.iter().collect::>(), [ (&p(2), W), (&p(6), A), (&p(7), A), (&p(8), W), (&p(9), W), @@ -1420,7 +1567,7 @@ mod test { #[test] fn owned_block_iter() { - fn test_iter(payload: &[Payload], block: Block) { + fn test_iter(payload: [Payload; N], block: Block) { let piter = payload.iter(); let mut oiter = block.owned_iter(); @@ -1434,46 +1581,46 @@ mod test { // Empty set. test_iter( - &[], - block(&[], 0..0) + [], + block([], 0..0) ); // Empty range over a non-empty block. test_iter( - &[], - block(&[7, 8, 10, 12, 18, 19], 3..3) + [], + block([7, 8, 10, 12, 18, 19], 3..3) ); // Blocks with a range. test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - block(&[7, 8, 10, 12, 18, 19], 0..6) + [p(7), p(8), p(10), p(12), p(18), p(19)], + block([7, 8, 10, 12, 18, 19], 0..6) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - block(&[2, 3, 7, 8, 10, 12, 18, 19], 2..8) + [p(7), p(8), p(10), p(12), p(18), p(19)], + block([2, 3, 7, 8, 10, 12, 18, 19], 2..8) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - block(&[7, 8, 10, 12, 18, 19, 21, 22], 0..6) + [p(7), p(8), p(10), p(12), p(18), p(19)], + block([7, 8, 10, 12, 18, 19, 21, 22], 0..6) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8) + [p(7), p(8), p(10), p(12), p(18), p(19)], + block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8) ); test_iter( - &[p(7)], - block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3) + [p(7)], + block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - block(&[7, 8, 10, 12, 18, 19], 0..6) + [p(7), p(8), p(10), p(12), p(18), p(19)], + block([7, 8, 10, 12, 18, 19], 0..6) ); } #[test] fn set_iters() { - fn test_iter(payload: &[Payload], set: Set) { + fn test_iter(payload: [Payload; N], set: Set) { let piter = payload.iter(); let mut 
iter = set.iter(); let mut oiter = set.owned_iter(); @@ -1490,51 +1637,51 @@ mod test { // Empty set. test_iter( - &[], - Set::from(pack(&[])) + [], + Set::from(pack([])) ); // Complete single pack. test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - Set::from(pack(&[7, 8, 10, 12, 18, 19])) + [p(7), p(8), p(10), p(12), p(18), p(19)], + Set::from(pack([7, 8, 10, 12, 18, 19])) ); // Empty range over a non-empty block. test_iter( - &[], - Set::from(block(&[7, 8, 10, 12, 18, 19], 3..3)) + [], + Set::from(block([7, 8, 10, 12, 18, 19], 3..3)) ); // Blocks with a range. test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - Set::from(block(&[7, 8, 10, 12, 18, 19], 0..6)) + [p(7), p(8), p(10), p(12), p(18), p(19)], + Set::from(block([7, 8, 10, 12, 18, 19], 0..6)) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - Set::from(block(&[2, 3, 7, 8, 10, 12, 18, 19], 2..8)) + [p(7), p(8), p(10), p(12), p(18), p(19)], + Set::from(block([2, 3, 7, 8, 10, 12, 18, 19], 2..8)) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - Set::from(block(&[7, 8, 10, 12, 18, 19, 21, 22], 0..6)) + [p(7), p(8), p(10), p(12), p(18), p(19)], + Set::from(block([7, 8, 10, 12, 18, 19, 21, 22], 0..6)) ); test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - Set::from(block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8)) + [p(7), p(8), p(10), p(12), p(18), p(19)], + Set::from(block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..8)) ); test_iter( - &[p(7)], - Set::from(block(&[2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3)) + [p(7)], + Set::from(block([2, 3, 7, 8, 10, 12, 18, 19, 21], 2..3)) ); // Multiple blocks. 
test_iter( - &[p(7), p(8), p(10), p(12), p(18), p(19)], - set(vec![ - block(&[2, 7, 8, 10], 1..3), - block(&[10], 0..1), - block(&[2, 12, 18, 19], 1..4) + [p(7), p(8), p(10), p(12), p(18), p(19)], + set([ + block([2, 7, 8, 10], 1..3), + block([10], 0..1), + block([2, 12, 18, 19], 1..4) ]) ); } diff --git a/src/test.rs b/src/test.rs index 914b74b..df552f4 100644 --- a/src/test.rs +++ b/src/test.rs @@ -168,8 +168,8 @@ async fn simple_comms() { } ).unwrap(); - u.send_payload(testrig::update(&[2])).await; - assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[2])); + u.send_payload(testrig::update([2])).await; + assert_eq!(t.recv_payload().await.unwrap(), testrig::update([2])); } diff --git a/src/units/combine.rs b/src/units/combine.rs index 0cda49c..bb2a19c 100644 --- a/src/units/combine.rs +++ b/src/units/combine.rs @@ -6,7 +6,7 @@ use futures::future::{select, select_all, Either, FutureExt}; use log::debug; use rand::{thread_rng, Rng}; use serde::Deserialize; -use crate::metrics; +use crate::{metrics, payload}; use crate::metrics::{Metric, MetricType, MetricUnit}; use crate::comms::{ Gate, GateMetrics, Link, Terminated, UnitHealth, UnitUpdate @@ -195,6 +195,58 @@ impl metrics::Source for AnyMetrics { } +//------------ Merge --------------------------------------------------------- + +/// A unit merging the data sets of all upstream units. +#[derive(Debug, Deserialize)] +pub struct Merge { + /// The set of units whose data set should be merged. 
+ sources: Vec, +} + +impl Merge { + pub async fn run( + mut self, mut component: Component, mut gate: Gate + ) -> Result<(), Terminated> { + if self.sources.is_empty() { + gate.update(UnitUpdate::Gone).await; + return Err(Terminated) + } + let metrics = gate.metrics(); + component.register_metrics(metrics.clone()); + + loop { + { + let res = select( + select_all( + self.sources.iter_mut().map(|link| + link.query().boxed() + ) + ), + gate.process().boxed() + ).await; + + if let Either::Right(_) = res { + continue + } + } + + let mut output = payload::Set::default(); + for source in self.sources.iter() { + if matches!(source.health(), UnitHealth::Healthy) { + if let Some(update) = source.payload() { + output = output.merge(update.set()) + } + } + } + gate.update( + UnitUpdate::Payload(payload::Update::new(output)) + ).await; + } + } +} + + //============ Tests ========================================================= #[cfg(test)] @@ -245,25 +297,25 @@ mod test { // Set one unit to healthy by sending a data update. Check that // the target unstalls with an update. - u1.send_payload(testrig::update(&[1])).await; - assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[1])); + u1.send_payload(testrig::update([1])).await; + assert_eq!(t.recv_payload().await.unwrap(), testrig::update([1])); // Set another unit to healthy. This shouldn’t change anything. - u2.send_payload(testrig::update(&[2])).await; + u2.send_payload(testrig::update([2])).await; t.recv_nothing().unwrap(); // Now stall the first one and check that we get an update with the // second’s data. u1.send_stalled().await; - assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[2])); + assert_eq!(t.recv_payload().await.unwrap(), testrig::update([2])); // Now stall the second one, too, and watch us stall. u2.send_stalled().await; t.recv_stalled().await.unwrap(); // Now unstall the third one and receive its data. 
- u3.send_payload(testrig::update(&[3])).await; - assert_eq!(t.recv_payload().await.unwrap(), testrig::update(&[3])); + u3.send_payload(testrig::update([3])).await; + assert_eq!(t.recv_payload().await.unwrap(), testrig::update([3])); } } diff --git a/src/units/mod.rs b/src/units/mod.rs index 04ed8c1..8c9c340 100644 --- a/src/units/mod.rs +++ b/src/units/mod.rs @@ -45,6 +45,9 @@ pub enum Unit { #[serde(rename = "json")] Json(json::Json), + #[serde(rename = "merge")] + Merge(combine::Merge), + #[serde(rename = "slurm")] Slurm(slurm::LocalExceptions), @@ -62,6 +65,7 @@ impl Unit { Unit::RtrTcp(unit) => unit.run(component, gate).await, Unit::RtrTls(unit) => unit.run(component, gate).await, Unit::Json(unit) => unit.run(component, gate).await, + Unit::Merge(unit) => unit.run(component, gate).await, Unit::Slurm(unit) => unit.run(component, gate).await, #[cfg(test)]