diff --git a/met_connectors/src/frost/fetch.rs b/met_connectors/src/frost/fetch.rs
index 56fe1ce..81fe452 100644
--- a/met_connectors/src/frost/fetch.rs
+++ b/met_connectors/src/frost/fetch.rs
@@ -3,11 +3,12 @@
use chrono::{prelude::*, Duration};
use chronoutil::RelativeDuration;
use rove::data_switch::{self, DataCache, Polygon, SpaceSpec, TimeSpec, Timestamp};

+#[allow(clippy::type_complexity)]
fn extract_data(
    mut resp: serde_json::Value,
    time: DateTime<Utc>,
    request_time_resolution: RelativeDuration,
-) -> Result<Vec<(Vec<Option<f32>>, FrostLatLonElev)>, Error> {
+) -> Result<Vec<((String, Vec<Option<f32>>), FrostLatLonElev)>, Error> {
    let ts_portion = resp
        .get_mut("data")
        .ok_or(Error::FindObs(
            "couldn't find data field on root object".to_string(),
@@ -27,6 +28,8 @@ fn extract_data(
            "couldn't find header field on tseries".to_string(),
        ))?;

+        let station_id = util::extract_station_id(header)?;
+
        // TODO: Should there be a location for each observation?
        let location = util::extract_location(header, time)?;

@@ -46,11 +49,11 @@ fn extract_data(
                .take(),
            )?;

-            Ok(Some((obs, location)))
+            Ok(Some(((station_id, obs), location)))
        })
        // Is there some smart way to avoid a double collect without making the error handling
        // messy?
-        .collect::<Result<Vec<Option<(Vec<Option<f32>>, FrostLatLonElev)>>, Error>>()?
+        .collect::<Result<Vec<Option<((String, Vec<Option<f32>>), FrostLatLonElev)>>, Error>>()?
        .into_iter()
        .flatten()
        .collect();
@@ -87,7 +90,7 @@ fn json_to_data_cache(
    let processed_ts_vec = ts_vec
        .into_iter()
-        .map(|(obses, location)| {
+        .map(|((station_id, obses), location)| {
            // TODO: preallocate?
            // let ts_length = (end_time - first_obs_time) / period;
            let mut data = Vec::new();
@@ -149,9 +152,9 @@ fn json_to_data_cache(
            curr_obs_time = curr_obs_time + period;
        }

-        Ok((data, location))
+        Ok(((station_id, data), location))
    })
-    .collect::<Result<Vec<(Vec<Option<f32>>, FrostLatLonElev)>, Error>>()?;
+    .collect::<Result<Vec<((String, Vec<Option<f32>>), FrostLatLonElev)>, Error>>()?;

    Ok(DataCache::new(
        processed_ts_vec.iter().map(|ts| ts.1.latitude).collect(),
@@ -166,8 +169,8 @@
}

pub async fn fetch_data_inner(
-    space_spec: SpaceSpec<'_>,
-    time_spec: TimeSpec,
+    space_spec: &SpaceSpec,
+    time_spec: &TimeSpec,
    num_leading_points: u8,
    num_trailing_points: u8,
    extra_spec: Option<&str>,
@@ -362,7 +365,7 @@ mod tests {
            Utc.with_ymd_and_hms(2023, 6, 26, 14, 0, 0).unwrap(),
        );
        assert_eq!(
-            series_cache.data[0],
+            series_cache.data[0].1,
            vec![Some(27.3999996), Some(25.7999992), Some(26.)]
        );
    }
diff --git a/met_connectors/src/frost/mod.rs b/met_connectors/src/frost/mod.rs
index c703a01..78406dd 100644
--- a/met_connectors/src/frost/mod.rs
+++ b/met_connectors/src/frost/mod.rs
@@ -102,8 +102,8 @@ where
impl DataConnector for Frost {
    async fn fetch_data(
        &self,
-        space_spec: SpaceSpec<'_>,
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
        num_leading_points: u8,
        num_trailing_points: u8,
        extra_spec: Option<&str>,
diff --git a/met_connectors/src/frost/util.rs b/met_connectors/src/frost/util.rs
index 1f22ead..0e31475 100644
--- a/met_connectors/src/frost/util.rs
+++ b/met_connectors/src/frost/util.rs
@@ -58,3 +58,18 @@ pub fn extract_location(

    Ok(lat_lon_elev)
}
+
+pub fn extract_station_id(header: &mut serde_json::Value) -> Result<String, Error> {
+    let station_id = header
+        .get_mut("id")
+        .ok_or(Error::FindMetadata(
+            "couldn't find id in header".to_string(),
+        ))?
+        .get_mut("stationid")
+        .ok_or(Error::FindMetadata(
+            "couldn't find stationid field in id".to_string(),
+        ))?
+        .take();
+
+    Ok(serde_json::from_value(station_id)?)
+}
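For context, the new `extract_station_id` mirrors `extract_location`'s traversal of the tseries header. A minimal sketch of the lookup it performs, assuming a header where `id.stationid` is string-valued (the real Frost response shape may differ):

    use serde_json::json;

    fn main() {
        // hypothetical header fragment in the shape extract_station_id expects:
        // an "id" object carrying a "stationid" field
        let mut header = json!({ "id": { "stationid": "18700" } });

        // the same traversal extract_station_id performs, minus the error mapping
        let station_id: String =
            serde_json::from_value(header["id"]["stationid"].take()).unwrap();
        assert_eq!(station_id, "18700");
    }
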
diff --git a/met_connectors/src/lustre_netatmo/mod.rs b/met_connectors/src/lustre_netatmo/mod.rs
index 9adc368..1bc9b7c 100644
--- a/met_connectors/src/lustre_netatmo/mod.rs
+++ b/met_connectors/src/lustre_netatmo/mod.rs
@@ -60,7 +60,11 @@ fn read_netatmo(timestamp: Timestamp) -> Result<DataCache, data_switch::Error> {
            lats.push(record.lat);
            lons.push(record.lon);
            elevs.push(record.elev);
-            values.push(vec![Some(record.value)]);
+            values.push((
+                // would be nice if we could come up with better identifiers for this
+                format!("({},{})", record.lat, record.lon),
+                vec![Some(record.value)],
+            ));
        }
    }

@@ -73,8 +77,8 @@
impl DataConnector for LustreNetatmo {
    async fn fetch_data(
        &self,
-        space_spec: SpaceSpec<'_>,
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
        num_leading_points: u8,
        num_trailing_points: u8,
        _extra_spec: Option<&str>,
@@ -90,7 +94,8 @@
        match space_spec {
            SpaceSpec::All => {
-                tokio::task::spawn_blocking(move || read_netatmo(time_spec.timerange.start)).await?
+                let start_time = time_spec.timerange.start;
+                tokio::task::spawn_blocking(move || read_netatmo(start_time)).await?
            }
            SpaceSpec::One(_) => Err(data_switch::Error::UnimplementedSeries(
                "netatmo files are only in timeslice format".to_string(),
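Since the netatmo files carry no stable station ids, the connector above synthesizes an identifier from the coordinates. A sketch of what those identifiers look like (values made up):

    fn main() {
        // same construction as read_netatmo above: a netatmo timeseries is
        // keyed by its coordinates
        let (lat, lon): (f32, f32) = (59.9423, 10.72);
        let identifier = format!("({},{})", lat, lon);
        assert_eq!(identifier, "(59.9423,10.72)");
        // two devices reporting identical coordinates would collide, hence
        // the "better identifiers" TODO in the diff
    }
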
diff --git a/proto/rove.proto b/proto/rove.proto
index 2ebc5ee..95d02eb 100644
--- a/proto/rove.proto
+++ b/proto/rove.proto
@@ -1,12 +1,14 @@
syntax = "proto3";

import "google/protobuf/timestamp.proto";
+import "google/protobuf/empty.proto";

package rove;

service Rove {
-  rpc ValidateSeries (ValidateSeriesRequest) returns (stream ValidateSeriesResponse) {}
-  rpc ValidateSpatial (ValidateSpatialRequest) returns (stream ValidateSpatialResponse) {}
+  // TODO: should we reconsider allowing results to stream, in favour of a more
+  // space efficient response format?
+  rpc Validate (ValidateRequest) returns (stream ValidateResponse) {}
}

message GeoPoint {
  float lat = 1;
  float lon = 2;
}

+message Polygon {
+  repeated GeoPoint polygon = 1;
+}
+
enum Flag { // WIP
  PASS = 0;
  FAIL = 1;
@@ -24,59 +30,50 @@ enum Flag { // WIP
  ISOLATED = 6;
}

-// tests a time series between start_time and end_time
-message ValidateSeriesRequest {
-  // resource locator of the form <data source>:<data id>
-  // e.g. oda:123456 where oda is a data source known to the system, and 123456
-  // is a timeseries ID in oda
-  string series_id = 1;
-  // if not provided, the start of the time series will be used
-  optional google.protobuf.Timestamp start_time = 2;
-  // if not provided, the end of the time series will be used
-  optional google.protobuf.Timestamp end_time = 3;
-  // list of the names of tests to be run on the data
-  repeated string tests = 4;
-}
-
-// tests a time slice with optional list of data sources to use, and a list of
-// coordinates that define a polygon representing the area to be tested
-message ValidateSpatialRequest {
-  // resource locator of the form <data source>:<data id>
-  // e.g. frost:air_temperature where frost is a data source known to the system,
-  // and air_temperature is the element for the call to get data
-  string spatial_id = 1;
-  // extra data sources providing data to help qc the first source, but the data
-  // from these sources will not be qced themselves
+message ValidateRequest {
+  // name of the data source you want to QC data from
+  string data_source = 1;
+  // extra data sources providing data to help QC the first source, but the data
+  // from these sources will not be QCed themselves
  repeated string backing_sources = 2;
-  google.protobuf.Timestamp time = 3;
-  repeated string tests = 4;
-  // if not provided, the whole globe will be used
-  repeated GeoPoint polygon = 5;
+  // timestamps defining an inclusive range of time to QC data from
+  google.protobuf.Timestamp start_time = 3;
+  google.protobuf.Timestamp end_time = 4;
+  // an ISO 8601 duration string defining the time resolution of data to be QCed
+  // (e.g. "PT1H" for hourly data)
+  string time_resolution = 5;
+  // one of 3 specifiers can be used to narrow down the data to be QCed
+  // spatially
+  oneof SpaceSpec {
+    // one series of data (i.e. one data point per time step) with a string that
+    // will be passed to the data connector to identify it. This will likely
+    // represent something like a timeseries id, station id and param id pair,
+    // or similar, but is left up to the data connector/source to define
+    string one = 6;
+    // a series of lat-lon points defining a polygon in space
+    Polygon polygon = 7;
+    // no spatial restriction at all
+    google.protobuf.Empty all = 8;
+  }
+  // list of the names of tests to be run on the data
+  repeated string tests = 9;
+  // optional string containing extra information to be passed to the data
+  // connector, to further specify the data to be QCed
+  optional string extra_spec = 10;
}

-message SeriesTestResult {
+message TestResult {
  google.protobuf.Timestamp time = 1;
-  Flag flag = 2;
-}
-
-message ValidateSeriesResponse {
-  // name of the test this flag is from
-  string test = 1;
-  // results for each data point in the series, paired with timestamps to
-  // identify the point
-  repeated SeriesTestResult results = 2;
-}
-
-message SpatialTestResult {
-  // TODO: maybe this should contain the series ID?
-  GeoPoint location = 1;
-  Flag flag = 2;
+  // data-source-defined identifier; it's recommended to use this to identify
+  // a timeseries/station/location as appropriate
+  string identifier = 2;
+  Flag flag = 3;
}

-message ValidateSpatialResponse {
+message ValidateResponse {
  // name of the test this flag is from
  string test = 1;
-  // results for each data point in the series, paired with geopoints to
+  // results for each data point, paired with timestamp and an identifier to
  // identify the point
-  repeated SpatialTestResult results = 2;
+  repeated TestResult results = 2;
}
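To make the new request shape concrete, here is a sketch of building it from Rust, assuming prost/tonic-generated bindings in a `pb` module (type and field names follow prost's usual conventions for the proto above; all values are made up):

    fn example_request() -> pb::ValidateRequest {
        pb::ValidateRequest {
            data_source: "frost".to_string(),
            backing_sources: vec![],
            start_time: Some(prost_types::Timestamp { seconds: 1687774800, nanos: 0 }),
            end_time: Some(prost_types::Timestamp { seconds: 1687789200, nanos: 0 }),
            time_resolution: "PT1H".to_string(), // hourly data
            // exactly one SpaceSpec variant: One, Polygon, or All
            space_spec: Some(pb::validate_request::SpaceSpec::One("18700".to_string())),
            tests: vec!["dip_check".to_string(), "step_check".to_string()],
            extra_spec: Some("air_temperature".to_string()),
        }
    }
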
diff --git a/src/data_switch.rs b/src/data_switch.rs
index 0465437..11989dd 100644
--- a/src/data_switch.rs
+++ b/src/data_switch.rs
@@ -93,14 +93,14 @@ pub struct GeoPoint {
/// A geospatial polygon
///
/// represented by its vertices as a sequence of lat-lon points
-pub type Polygon = [GeoPoint];
+pub type Polygon = Vec<GeoPoint>;

/// Specifier of which data to fetch from a source by location
-pub enum SpaceSpec<'a> {
+pub enum SpaceSpec {
    /// One single timeseries, specified with a data_id
-    One(&'a str),
+    One(String),
    /// A Polygon in lat-lon space defining the area from which to fetch data
-    Polygon(&'a Polygon),
+    Polygon(Polygon),
    /// The whole data set
    All,
}
@@ -113,10 +113,11 @@
pub struct DataCache {
    /// Vector of timeseries.
    ///
-    /// Each inner vector represents a timeseries, with its data points in chronological order.
+    /// Each inner vector represents a timeseries, tagged with a string
+    /// identifier, with its data points in chronological order.
    /// All these timeseries are aligned on start_time and period.
    /// `None`s represent gaps in the series.
-    pub data: Vec<Vec<Option<f32>>>,
+    pub data: Vec<(String, Vec<Option<f32>>)>,
    /// Time of the first observation in data
    pub start_time: Timestamp,
    /// Period of the timeseries, i.e. the time gap between successive elements
@@ -147,7 +148,7 @@ impl DataCache {
        period: RelativeDuration,
        num_leading_points: u8,
        num_trailing_points: u8,
-        data: Vec<Vec<Option<f32>>>,
+        data: Vec<(String, Vec<Option<f32>>)>,
    ) -> Self {
        // TODO: ensure vecs have same size
        Self {
@@ -224,9 +225,8 @@ pub trait DataConnector: Sync + std::fmt::Debug {
    /// fetch specified data from the data source
    async fn fetch_data(
        &self,
-        space_spec: SpaceSpec<'_>,
-        // TODO: should this include a time resolution?
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
        num_leading_points: u8,
        num_trailing_points: u8,
        extra_spec: Option<&str>,
@@ -275,8 +275,8 @@ impl<'ds> DataSwitch<'ds> {
    pub(crate) async fn fetch_data(
        &self,
        data_source_id: &str,
-        space_spec: SpaceSpec<'_>,
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
        num_leading_points: u8,
        num_trailing_points: u8,
        extra_spec: Option<&str>,
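The `DataCache` shape change is easiest to see in miniature. A sketch using the constructor above, with made-up values (argument order as in the `DataCache::new` usage in dev_utils below; note each series now carries its identifier):

    use chronoutil::RelativeDuration;
    use rove::data_switch::{DataCache, Timestamp};

    fn main() {
        let cache = DataCache::new(
            vec![59.94],                // lats
            vec![10.72],                // lons
            vec![94.],                  // elevs
            Timestamp(1687785600),      // start_time
            RelativeDuration::hours(1), // period
            0,                          // num_leading_points
            0,                          // num_trailing_points
            // one (identifier, data) pair per timeseries; None marks a gap
            vec![("18700".to_string(), vec![Some(27.4), None, Some(26.)])],
        );
        assert_eq!(cache.data[0].0, "18700");
    }
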
"dip_check" => { const LEADING_PER_RUN: u8 = 2; @@ -35,11 +30,13 @@ pub async fn run_test_series( let mut result_vec = Vec::with_capacity(cache.data.len()); // NOTE: Does data in each series have the same len? - let series_len = cache.data[0].len(); + let series_len = cache.data[0].1.len(); for i in 0..cache.data.len() { - result_vec.push( - cache.data[i][(cache.num_leading_points - LEADING_PER_RUN).into()..series_len] + result_vec.push(( + cache.data[i].0.clone(), + cache.data[i].1 + [(cache.num_leading_points - LEADING_PER_RUN).into()..series_len] .windows((LEADING_PER_RUN + 1).into()) .map(|window| { olympian::dip_check(window, 2., 3.)? @@ -47,9 +44,9 @@ pub async fn run_test_series( .map_err(Error::UnknownFlag) }) .collect::, Error>>()?, - ) + )) } - result_vec[0].clone() + result_vec } "step_check" => { const LEADING_PER_RUN: u8 = 1; @@ -57,11 +54,13 @@ pub async fn run_test_series( let mut result_vec = Vec::with_capacity(cache.data.len()); // NOTE: Does data in each series have the same len? - let series_len = cache.data[0].len(); + let series_len = cache.data[0].1.len(); for i in 0..cache.data.len() { - result_vec.push( - cache.data[i][(cache.num_leading_points - LEADING_PER_RUN).into()..series_len] + result_vec.push(( + cache.data[i].0.clone(), + cache.data[i].1 + [(cache.num_leading_points - LEADING_PER_RUN).into()..series_len] .windows((LEADING_PER_RUN + 1).into()) .map(|window| { olympian::step_check(window, 2., 3.)? @@ -69,138 +68,161 @@ pub async fn run_test_series( .map_err(Error::UnknownFlag) }) .collect::, Error>>()?, - ) + )) } - result_vec[0].clone() + result_vec } - _ => { - if test.starts_with("test") { - vec![Flag::Inconclusive] - } else { - return Err(Error::InvalidTestName(test.to_string())); - } - } - }; - - let results = DateRule::new( - // TODO: make sure this start time is actually correct - Utc.timestamp_opt(cache.start_time.0, 0).unwrap(), - cache.period, - ) - .skip(cache.num_leading_points.into()) - .zip(flags.into_iter()) - .map(|(time, flag)| SeriesTestResult { - time: Some(prost_types::Timestamp { - seconds: time.timestamp(), - nanos: 0, - }), - flag: flag.into(), - }) - .collect(); - - Ok(ValidateSeriesResponse { - test: test.to_string(), - results, - }) -} - -#[allow(clippy::match_single_binding)] -pub async fn run_test_spatial( - test: &str, - cache: &DataCache, -) -> Result { - let flags: Vec = match test { "buddy_check" => { let n = cache.data.len(); - let series_len = cache.data[0].len(); - let mut result_vec = Vec::with_capacity(series_len); + let series_len = cache.data[0].1.len(); + + let mut result_vec: Vec<(String, Vec)> = cache + .data + .iter() + .map(|ts| (ts.0.clone(), Vec::with_capacity(series_len))) + .collect(); + for i in 0..series_len { // TODO: change `buddy_check` to accept Option? - let inner: Vec = cache.data.iter().map(|v| v[i].unwrap()).collect(); - result_vec.push( - olympian::buddy_check( - &cache.rtree, - &inner, - &vec![5000.; n], - &vec![2; n], - 2., - 200., - 0., - 1., - 2, - &vec![true; n], - )? 
+                let inner: Vec<f32> = cache.data.iter().map(|v| v.1[i].unwrap()).collect();
+
+                let spatial_result = olympian::buddy_check(
+                    &cache.rtree,
+                    &inner,
+                    &vec![5000.; n],
+                    &vec![2; n],
+                    2.,
+                    200.,
+                    0.,
+                    1.,
+                    2,
+                    &vec![true; n],
+                )?;
+
+                if spatial_result
+                    .iter()
+                    .any(|flag| Flag::try_from(*flag).is_err())
+                {
+                    return Err(Error::UnknownFlag(
+                        // this is messy, but at least it's not on the critical path
+                        // and it lets the critical path code be more efficient
+                        Flag::try_from(
+                            *spatial_result
+                                .iter()
+                                .find(|flag| Flag::try_from(**flag).is_err())
+                                .unwrap(),
+                        )
+                        .err()
+                        .unwrap(),
+                    ));
+                }
+
+                for (i, flag) in spatial_result
                    .into_iter()
-                    .map(|flag| flag.try_into().map_err(Error::UnknownFlag))
-                    .collect::<Result<Vec<Flag>, Error>>()?,
-                )
+                    .map(|flag| flag.try_into().unwrap())
+                    .enumerate()
+                {
+                    result_vec[i].1.push(flag);
+                }
            }
-            result_vec[0].clone()
+            result_vec
        }
        "sct" => {
            let n = cache.data.len();
-            let series_len = cache.data[0].len();
-            let mut result_vec = Vec::with_capacity(series_len);
+            let series_len = cache.data[0].1.len();
+
+            let mut result_vec: Vec<(String, Vec<Flag>)> = cache
+                .data
+                .iter()
+                .map(|ts| (ts.0.clone(), Vec::with_capacity(series_len)))
+                .collect();
+
            for i in 0..series_len {
                // TODO: change `sct` to accept Option<f32>?
-                let inner: Vec<f32> = cache.data.iter().map(|v| v[i].unwrap()).collect();
-                result_vec.push(
-                    olympian::sct(
-                        &cache.rtree,
-                        &inner,
-                        5,
-                        100,
-                        50000.,
-                        150000.,
-                        5,
-                        20,
-                        200.,
-                        10000.,
-                        200.,
-                        &vec![4.; n],
-                        &vec![8.; n],
-                        &vec![0.5; n],
-                        None,
-                    )?
+                let inner: Vec<f32> = cache.data.iter().map(|v| v.1[i].unwrap()).collect();
+                let spatial_result = olympian::sct(
+                    &cache.rtree,
+                    &inner,
+                    5,
+                    100,
+                    50000.,
+                    150000.,
+                    5,
+                    20,
+                    200.,
+                    10000.,
+                    200.,
+                    &vec![4.; n],
+                    &vec![8.; n],
+                    &vec![0.5; n],
+                    None,
+                )?;
+
+                if spatial_result
+                    .iter()
+                    .any(|flag| Flag::try_from(*flag).is_err())
+                {
+                    return Err(Error::UnknownFlag(
+                        // this is messy, but at least it's not on the critical path
+                        // and it lets the critical path code be more efficient
+                        Flag::try_from(
+                            *spatial_result
+                                .iter()
+                                .find(|flag| Flag::try_from(**flag).is_err())
+                                .unwrap(),
+                        )
+                        .err()
+                        .unwrap(),
+                    ));
+                }
+
+                for (i, flag) in spatial_result
                    .into_iter()
-                    .map(|flag| flag.try_into().map_err(Error::UnknownFlag))
-                    .collect::<Result<Vec<Flag>, Error>>()?,
-                );
+                    .map(|flag| flag.try_into().unwrap())
+                    .enumerate()
+                {
+                    result_vec[i].1.push(flag);
+                }
            }
-            result_vec[0].clone()
+            result_vec
        }
        _ => {
+            // used for integration testing
            if test.starts_with("test") {
-                vec![Flag::Inconclusive]
+                vec![("test".to_string(), vec![Flag::Inconclusive])]
            } else {
-                // TODO: have more specific error for spatial vs series here?
                return Err(Error::InvalidTestName(test.to_string()));
            }
        }
    };

-    let results = cache
-        .rtree
-        .lats
-        // TODO: if lats and lons in points were in 1 vec, we could do into_iter,
-        // and remove one of the zips
-        .iter()
-        .zip(cache.rtree.lons.iter())
-        .map(|(lat, lon)| GeoPoint {
-            lat: *lat,
-            lon: *lon,
+    let date_rule = DateRule::new(
+        // TODO: make sure this start time is actually correct
+        Utc.timestamp_opt(cache.start_time.0, 0).unwrap(),
+        cache.period,
+    );
+    let results = flags
+        .into_iter()
+        .flat_map(|flag_series| {
+            flag_series
+                .1
+                .into_iter()
+                .zip(date_rule)
+                .zip(std::iter::repeat(flag_series.0))
        })
-        .zip(flags.into_iter())
-        .map(|(location, flag)| SpatialTestResult {
-            // TODO: get to the bottom of exactly why the Some is needed
-            location: Some(location),
+        .map(|((flag, time), identifier)| TestResult {
+            time: Some(prost_types::Timestamp {
+                seconds: time.timestamp(),
+                nanos: 0,
+            }),
+            identifier,
            flag: flag.into(),
        })
        .collect();

-    Ok(ValidateSpatialResponse {
+    Ok(ValidateResponse {
        test: test.to_string(),
        results,
    })
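The per-series results are flattened into a single stream of `(time, identifier, flag)` triples at the end of `run_test` (the `flat_map` above). A standalone sketch of that flattening, with `u32` standing in for `Flag` and plain `i64` seconds standing in for the `DateRule` timestamps:

    // flags: one (identifier, flag-series) pair per timeseries, as produced
    // by the test arms of run_test
    fn flatten(flags: Vec<(String, Vec<u32>)>, times: &[i64]) -> Vec<(i64, String, u32)> {
        flags
            .into_iter()
            .flat_map(|(id, series)| {
                series
                    .into_iter()
                    .zip(times.iter().copied())
                    .map(move |(flag, time)| (time, id.clone(), flag))
            })
            .collect()
    }

    fn main() {
        let flags = vec![("a".to_string(), vec![0, 1]), ("b".to_string(), vec![0, 0])];
        let out = flatten(flags, &[100, 200]);
        assert_eq!(out[1], (200, "a".to_string(), 1));
    }
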
diff --git a/src/lib.rs b/src/lib.rs
index 5303cd6..b69ed0c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -147,14 +147,14 @@ pub mod dev_utils {
    impl DataConnector for TestDataSource {
        async fn fetch_data(
            &self,
-            space_spec: SpaceSpec<'_>,
-            _time_spec: TimeSpec,
+            space_spec: &SpaceSpec,
+            _time_spec: &TimeSpec,
            num_leading_points: u8,
            num_trailing_points: u8,
            _extra_spec: Option<&str>,
        ) -> Result<DataCache, data_switch::Error> {
            match space_spec {
-                SpaceSpec::One(data_id) => match data_id {
+                SpaceSpec::One(data_id) => match data_id.as_str() {
                    // TODO: should we maybe be using time_spec for these instead of data_id?
                    // maybe something to come back to when we finalize the format of time_spec
                    "single" => black_box(Ok(DataCache::new(
@@ -165,7 +165,7 @@
                        RelativeDuration::minutes(5),
                        num_leading_points,
                        num_trailing_points,
-                        vec![vec![Some(1.); self.data_len_single]; 1],
+                        vec![("test".to_string(), vec![Some(1.); self.data_len_single]); 1],
                    ))),
                    "series" => black_box(Ok(DataCache::new(
                        vec![0.; 1],
@@ -175,7 +175,7 @@
                        RelativeDuration::minutes(5),
                        num_leading_points,
                        num_trailing_points,
-                        vec![vec![Some(1.); self.data_len_spatial]; 1],
+                        vec![("test".to_string(), vec![Some(1.); self.data_len_spatial]); 1],
                    ))),
                    _ => panic!("unknown data_id"),
                },
@@ -192,7 +192,7 @@
                    // TODO: update this to use num_leading/trailing?
                    0,
                    0,
-                    vec![vec![Some(1.); 1]; self.data_len_spatial],
+                    vec![("test".to_string(), vec![Some(1.); 1]); self.data_len_spatial],
                ))),
                SpaceSpec::Polygon(_) => unimplemented!(),
            }
diff --git a/src/scheduler.rs b/src/scheduler.rs
index 598b447..58f2b93 100644
--- a/src/scheduler.rs
+++ b/src/scheduler.rs
@@ -1,13 +1,10 @@
use crate::{
    dag::{Dag, NodeId},
-    data_switch::{
-        self, DataCache, DataSwitch, Polygon, SpaceSpec, TimeSpec, Timerange, Timestamp,
-    },
+    data_switch::{self, DataCache, DataSwitch, SpaceSpec, TimeSpec},
    harness,
    // TODO: rethink this dependency?
-    pb::{ValidateSeriesResponse, ValidateSpatialResponse},
+    pb::ValidateResponse,
};
-use chronoutil::RelativeDuration;
use futures::stream::FuturesUnordered;
use std::collections::HashMap;
use thiserror::Error;
@@ -93,10 +90,10 @@ impl<'a> Scheduler<'a> {
        Ok(subdag)
    }

-    fn schedule_tests_series(
+    fn schedule_tests(
        subdag: Dag<&'static str>,
        data: DataCache,
-    ) -> Receiver<Result<ValidateSeriesResponse, Error>> {
+    ) -> Receiver<Result<ValidateResponse, Error>> {
        // spawn and channel are required if you want handle "disconnect" functionality
        // the `out_stream` will not be polled after client disconnect
        let (tx, rx) = channel(subdag.nodes.len());
@@ -105,7 +102,7 @@
            let mut test_futures = FuturesUnordered::new();

            for leaf_index in subdag.leaves.clone().into_iter() {
-                test_futures.push(harness::run_test_series(
+                test_futures.push(harness::run_test(
                    subdag.nodes.get(leaf_index).unwrap().elem,
                    &data,
                ));
@@ -139,7 +136,7 @@
                        if children_completed
                            >= subdag.nodes.get(*parent_index).unwrap().children.len()
                        {
-                            test_futures.push(harness::run_test_series(
+                            test_futures.push(harness::run_test(
                                subdag.nodes.get(*parent_index).unwrap().elem,
                                &data,
                            ))
@@ -154,144 +151,18 @@
        rx
    }

-    // sad about the amount of repetition here... perhaps we can do better once async
-    // closures drop?
-    fn schedule_tests_spatial(
-        subdag: Dag<&'static str>,
-        data: DataCache,
-    ) -> Receiver<Result<ValidateSpatialResponse, Error>> {
-        // spawn and channel are required if you want handle "disconnect" functionality
-        // the `out_stream` will not be polled after client disconnect
-        let (tx, rx) = channel(subdag.nodes.len());
-        tokio::spawn(async move {
-            let mut children_completed_map: HashMap<NodeId, usize> = HashMap::new();
-            let mut test_futures = FuturesUnordered::new();
-
-            for leaf_index in subdag.leaves.clone().into_iter() {
-                test_futures.push(harness::run_test_spatial(
-                    subdag.nodes.get(leaf_index).unwrap().elem,
-                    &data,
-                ));
-            }
-
-            while let Some(res) = test_futures.next().await {
-                match tx.send(res.clone().map_err(Error::Runner)).await {
-                    Ok(_) => {
-                        // item (server response) was queued to be send to client
-                    }
-                    Err(_item) => {
-                        // output_stream was build from rx and both are dropped
-                        break;
-                    }
-                }
-
-                match res {
-                    Ok(inner) => {
-                        let completed_index = subdag.index_lookup.get(inner.test.as_str()).unwrap();
-
-                        for parent_index in
-                            subdag.nodes.get(*completed_index).unwrap().parents.iter()
-                        {
-                            let children_completed = children_completed_map
-                                .get(parent_index)
-                                .map(|x| x + 1)
-                                .unwrap_or(1);
-
-                            children_completed_map.insert(*parent_index, children_completed);
-
-                            if children_completed
-                                >= subdag.nodes.get(*parent_index).unwrap().children.len()
-                            {
-                                test_futures.push(harness::run_test_spatial(
-                                    subdag.nodes.get(*parent_index).unwrap().elem,
-                                    &data,
-                                ))
-                            }
-                        }
-                    }
-                    Err(_) => break,
-                }
-            }
-        });
-
-        rx
-    }
-
-    /// Run a set of timeseries QC tests on some data
-    ///
-    /// `series_id` is a string identifier of the data to be QCed in the form
-    /// "data_source_id:data_id", where `data_source_id` is the key identifying
-    /// a connector in the [`DataSwitch`](data_switch::DataSwitch), and `data_id`
-    /// is an extra identifier that gets passed to the relevant DataConnector.
-    /// The format of data_id is connector-specific. `timerange` represents
-    /// the range of the time in the time series whose data is to be QCed.
-    ///
-    /// `tests` represents the QC tests to be run. Any tests these depend on
-    /// will be found via the [`DAG`](Dag), and run as well.
-    ///
-    /// # Errors
-    ///
-    /// Returned from the function if:
-    /// - The provided test array is empty
-    /// - A test in the provided array did not have a matching entry in the DAG
-    /// - The data_source_id component of the provided series_id did not have a
-    ///   matching entry in the Scheduler's DataSwitch
-    ///
-    /// In the the returned channel if:
-    /// - The test harness encounters an error on during one of the QC tests.
-    ///   This will also result in the channel being closed
-    pub async fn validate_series_direct(
-        &self,
-        series_id: impl AsRef<str>,
-        tests: &[impl AsRef<str>],
-        time_spec: TimeSpec,
-    ) -> Result<Receiver<Result<ValidateSeriesResponse, Error>>, Error> {
-        if tests.is_empty() {
-            return Err(Error::InvalidArg("must specify at least 1 test to be run"));
-        }
-
-        let (data_source_id, data_id) = series_id
-            .as_ref()
-            .split_once(':')
-            // TODO: remove this unwrap by splitting these in the proto
-            .unwrap();
-
-        let data = match self
-            .data_switch
-            // TODO: num_leading and num_trailing here should be determined from the test list
-            .fetch_data(
-                data_source_id,
-                SpaceSpec::One(data_id),
-                time_spec,
-                2,
-                2,
-                // TODO: this should probably be able to be Some, needs fixing in proto
-                None,
-            )
-            .await
-        {
-            Ok(data) => data,
-            Err(e) => {
-                tracing::error!(%e);
-                return Err(Error::DataSwitch(e));
-            }
-        };
-
-        let subdag = self.construct_subdag(tests)?;
-
-        Ok(Scheduler::schedule_tests_series(subdag, data))
-    }
-
    /// Run a set of spatial QC tests on some data
    ///
-    /// `spatial_id` is a string identifier of the data to be QCed in the form
-    /// "data_source_id:data_id", where `data_source_id` is the key identifying
-    /// a connector in the [`DataSwitch`](data_switch::DataSwitch), and `data_id`
-    /// is an extra identifier that gets passed to the relevant DataConnector.
-    /// The format of data_id is connector-specific. `time` represents
-    /// the timestamp of the spatial slice to be QCed, while `polygon` is a vec
-    /// of lat-lon pairs that encode the vertices of a polygon defining the
-    /// region of the spatial slice in which data should be QCed.
+    /// `data_source` is the key identifying a connector in the
+    /// [`DataSwitch`](data_switch::DataSwitch).
+    /// `backing_sources` is a list of keys similar to `data_source`, but data
+    /// from these will only be used to QC data from `data_source` and will not
+    /// themselves be QCed.
+    /// `time_spec` and `space_spec` narrow down what data to QC; more info
+    /// on what these mean and how to construct them can be found on their
+    /// own doc pages.
+    /// `extra_spec` is an extra identifier that gets passed to the relevant
+    /// DataConnector. The format of `extra_spec` is connector-specific.
    ///
    /// `tests` represents the QC tests to be run. Any tests these depend on
    /// will be found via the [`DAG`](Dag), and run as well.
@@ -301,46 +172,38 @@
    /// Returned from the function if:
    /// - The provided test array is empty
    /// - A test in the provided array did not have a matching entry in the DAG
-    /// - The data_source_id component of the provided spatial_id did not have a
-    ///   matching entry in the Scheduler's DataSwitch
+    /// - The data_source string did not have a matching entry in the
+    ///   Scheduler's DataSwitch
    ///
    /// In the the returned channel if:
    /// - The test harness encounters an error on during one of the QC tests.
    ///   This will also result in the channel being closed
-    pub async fn validate_spatial_direct(
+    pub async fn validate_direct(
        &self,
-        spatial_id: impl AsRef<str>,
+        data_source: impl AsRef<str>,
+        // TODO: we should actually use these
+        _backing_sources: &[impl AsRef<str>],
+        // TODO: should we allow a way to call this without a dependency on chronoutil?
+        // adding a constructor for timespec that can take a string would achieve this
+        time_spec: &TimeSpec,
+        space_spec: &SpaceSpec,
        tests: &[impl AsRef<str>],
-        polygon: &Polygon,
-        time: Timestamp,
-    ) -> Result<Receiver<Result<ValidateSpatialResponse, Error>>, Error> {
+        extra_spec: Option<&str>,
+    ) -> Result<Receiver<Result<ValidateResponse, Error>>, Error> {
        if tests.is_empty() {
            return Err(Error::InvalidArg("must specify at least 1 test to be run"));
        }

-        let (data_source_id, extra_spec) = spatial_id
-            .as_ref()
-            .split_once(':')
-            // TODO: remove this unwrap by splitting these in the proto
-            .unwrap();
-
        let data = match self
            .data_switch
            .fetch_data(
-                data_source_id,
-                SpaceSpec::Polygon(polygon),
-                TimeSpec {
-                    timerange: Timerange {
-                        start: time,
-                        end: time,
-                    },
-                    // TODO: should be a real value
-                    time_resolution: RelativeDuration::minutes(5),
-                },
+                data_source.as_ref(),
+                space_spec,
+                time_spec,
+                // TODO: derive num_leading and num_trailing from test list
                0,
                0,
-                // TODO: This should probably be able to be None, needs fixing in proto
-                Some(extra_spec),
+                extra_spec,
            )
            .await
        {
@@ -353,7 +216,7 @@
        let subdag = self.construct_subdag(tests)?;

-        Ok(Scheduler::schedule_tests_spatial(subdag, data))
+        Ok(Scheduler::schedule_tests(subdag, data))
    }
}
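A hedged sketch of driving the new unified entry point from Rust (the `rove::Scheduler` path and the error handling are assumptions; the types, argument order, and channel-based result stream are from the diff):

    use chronoutil::RelativeDuration;
    use rove::data_switch::{SpaceSpec, TimeSpec, Timerange, Timestamp};

    async fn run_qc(scheduler: &rove::Scheduler<'static>) -> Result<(), Box<dyn std::error::Error>> {
        let time_spec = TimeSpec {
            timerange: Timerange {
                start: Timestamp(1687774800),
                end: Timestamp(1687789200),
            },
            time_resolution: RelativeDuration::hours(1), // i.e. "PT1H"
        };
        let mut rx = scheduler
            .validate_direct(
                "frost",
                &Vec::<String>::new(), // backing_sources, unused for now
                &time_spec,
                &SpaceSpec::All,
                &["dip_check", "step_check"],
                Some("air_temperature"),
            )
            .await?;
        // one ValidateResponse per completed test arrives on the channel
        while let Some(response) = rx.recv().await {
            let response = response?;
            println!("{}: {} results", response.test, response.results.len());
        }
        Ok(())
    }
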
diff --git a/src/server.rs b/src/server.rs
index 7bfa1c9..82a702c 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,10 +1,10 @@
use crate::{
    dag::Dag,
-    data_switch::{DataSwitch, GeoPoint, Polygon, TimeSpec, Timerange, Timestamp},
+    data_switch::{DataSwitch, GeoPoint, SpaceSpec, TimeSpec, Timerange, Timestamp},
    pb::{
+        self,
        rove_server::{Rove, RoveServer},
-        ValidateSeriesRequest, ValidateSeriesResponse, ValidateSpatialRequest,
-        ValidateSpatialResponse,
+        ValidateRequest, ValidateResponse,
    },
    scheduler::{self, Scheduler},
};
@@ -15,10 +15,7 @@
use tokio::sync::mpsc::channel;
use tokio_stream::wrappers::{ReceiverStream, UnixListenerStream};
use tonic::{transport::Server, Request, Response, Status};

-type SeriesResponseStream =
-    Pin<Box<dyn Stream<Item = Result<ValidateSeriesResponse, Status>> + Send>>;
-type SpatialResponseStream =
-    Pin<Box<dyn Stream<Item = Result<ValidateSpatialResponse, Status>> + Send>>;
+type ResponseStream = Pin<Box<dyn Stream<Item = Result<ValidateResponse, Status>> + Send>>;

#[derive(Debug)]
enum ListenerType {
@@ -45,101 +42,62 @@ impl From<scheduler::Error> for Status {
#[tonic::async_trait]
impl Rove for Scheduler<'static> {
-    type ValidateSeriesStream = SeriesResponseStream;
-    type ValidateSpatialStream = SpatialResponseStream;
+    type ValidateStream = ResponseStream;

    #[tracing::instrument]
-    async fn validate_series(
+    async fn validate(
        &self,
-        request: Request<ValidateSeriesRequest>,
-    ) -> Result<Response<Self::ValidateSeriesStream>, Status> {
+        request: Request<ValidateRequest>,
+    ) -> Result<Response<Self::ValidateStream>, Status> {
        tracing::debug!("Got a request: {:?}", request);

        let req = request.into_inner();
        let req_len = req.tests.len();

-        let mut rx = self
-            .validate_series_direct(
-                req.series_id,
-                &req.tests,
-                TimeSpec {
-                    timerange: Timerange {
-                        start: Timestamp(
-                            req.start_time
-                                .as_ref()
-                                .ok_or(Status::invalid_argument(
-                                    "invalid timestamp for start_time",
-                                ))?
-                                .seconds,
-                        ),
-                        end: Timestamp(
-                            req.end_time
-                                .as_ref()
-                                .ok_or(Status::invalid_argument(
-                                    "invalid timestamp for start_time",
-                                ))?
-                                .seconds,
-                        ),
-                    },
-                    // TODO: should be a real value
-                    time_resolution: RelativeDuration::minutes(5),
-                },
-            )
-            .await
-            .map_err(Into::<Status>::into)?;
-
-        // TODO: remove this channel chaining once async iterators drop
-        let (tx_final, rx_final) = channel(req_len);
-        tokio::spawn(async move {
-            while let Some(i) = rx.recv().await {
-                match tx_final.send(i.map_err(|e| e.into())).await {
-                    Ok(_) => {
-                        // item (server response) was queued to be send to client
-                    }
-                    Err(_item) => {
-                        // output_stream was build from rx and both are dropped
-                        break;
-                    }
-                };
-            }
-        });
-
-        let output_stream = ReceiverStream::new(rx_final);
-        Ok(Response::new(
-            Box::pin(output_stream) as Self::ValidateSeriesStream
-        ))
-    }
-
-    #[tracing::instrument]
-    async fn validate_spatial(
-        &self,
-        request: Request<ValidateSpatialRequest>,
-    ) -> Result<Response<Self::ValidateSpatialStream>, Status> {
-        tracing::debug!("Got a request: {:?}", request);
-
-        let req = request.into_inner();
-        let req_len = req.tests.len();
-
-        let polygon: &Polygon = &req
-            .polygon
-            .into_iter()
-            .map(|point| GeoPoint {
-                lat: point.lat,
-                lon: point.lon,
-            })
-            .collect::<Vec<GeoPoint>>();
-
-        let mut rx = self
-            .validate_spatial_direct(
-                req.spatial_id,
-                &req.tests,
-                polygon,
-                Timestamp(
-                    req.time
                        .as_ref()
                        .ok_or(Status::invalid_argument("invalid timestamp for start_time"))?
                        .seconds,
                ),
+        let time_spec = TimeSpec {
+            timerange: Timerange {
+                start: Timestamp(
+                    req.start_time
+                        .as_ref()
+                        .ok_or(Status::invalid_argument("invalid timestamp for start_time"))?
+                        .seconds,
+                ),
+                end: Timestamp(
+                    req.end_time
+                        .as_ref()
+                        .ok_or(Status::invalid_argument("invalid timestamp for end_time"))?
+                        .seconds,
+                ),
+            },
+            time_resolution: RelativeDuration::parse_from_iso8601(&req.time_resolution)
+                .map_err(|e| Status::invalid_argument(format!("invalid time_resolution: {}", e)))?,
+        };
+
+        // TODO: implementing From<pb::validate_request::SpaceSpec> for SpaceSpec
+        // would make this much neater
+        let space_spec = match req.space_spec.unwrap() {
+            pb::validate_request::SpaceSpec::One(station_id) => SpaceSpec::One(station_id),
+            pb::validate_request::SpaceSpec::Polygon(pb_polygon) => SpaceSpec::Polygon(
+                pb_polygon
+                    .polygon
+                    .into_iter()
+                    .map(|point| GeoPoint {
+                        lat: point.lat,
+                        lon: point.lon,
+                    })
+                    .collect::<Vec<GeoPoint>>(),
+            ),
+            pb::validate_request::SpaceSpec::All(_) => SpaceSpec::All,
+        };
+
+        let mut rx = self
+            .validate_direct(
+                req.data_source,
+                &req.backing_sources,
+                &time_spec,
+                &space_spec,
+                &req.tests,
+                req.extra_spec.as_deref(),
            )
            .await
            .map_err(Into::<Status>::into)?;
@@ -162,7 +120,7 @@
        let output_stream = ReceiverStream::new(rx_final);
        Ok(Response::new(
-            Box::pin(output_stream) as Self::ValidateSpatialStream
+            Box::pin(output_stream) as Self::ValidateStream
        ))
    }
}
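Finally, the client side of the renamed RPC, assuming tonic codegen produced a `RoveClient` from the proto above (endpoint and values are made up; `example_request` is the sketch from the proto section):

    use pb::rove_client::RoveClient;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let mut client = RoveClient::connect("http://[::1]:1337").await?;
        // Validate streams one ValidateResponse per test
        let mut stream = client.validate(example_request()).await?.into_inner();
        while let Some(resp) = stream.message().await? {
            for result in resp.results {
                // each result pairs a timestamp with the data-source-defined
                // identifier and a flag
                println!("{}: {} -> {}", resp.test, result.identifier, result.flag);
            }
        }
        Ok(())
    }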