unify validate_spatial and validate_series into validate
intarga committed Sep 24, 2024
1 parent 14bcf0f commit acd6fd6
Showing 10 changed files with 324 additions and 461 deletions.
21 changes: 12 additions & 9 deletions met_connectors/src/frost/fetch.rs
@@ -3,11 +3,12 @@ use chrono::{prelude::*, Duration};
 use chronoutil::RelativeDuration;
 use rove::data_switch::{self, DataCache, Polygon, SpaceSpec, TimeSpec, Timestamp};
 
+#[allow(clippy::type_complexity)]
 fn extract_data(
     mut resp: serde_json::Value,
     time: DateTime<Utc>,
     request_time_resolution: RelativeDuration,
-) -> Result<Vec<(Vec<FrostObs>, FrostLatLonElev)>, Error> {
+) -> Result<Vec<((String, Vec<FrostObs>), FrostLatLonElev)>, Error> {
     let ts_portion = resp
         .get_mut("data")
         .ok_or(Error::FindObs(
@@ -27,6 +28,8 @@ fn extract_data(
             "couldn't find header field on tseries".to_string(),
         ))?;
 
+        let station_id = util::extract_station_id(header)?;
+
         // TODO: Should there be a location for each observation?
         let location = util::extract_location(header, time)?;
 
@@ -46,11 +49,11 @@
                 .take(),
             )?;
 
-            Ok(Some((obs, location)))
+            Ok(Some(((station_id, obs), location)))
         })
         // Is there some smart way to avoid a double collect without making the error handling
         // messy?
-        .collect::<Result<Vec<Option<(Vec<FrostObs>, FrostLatLonElev)>>, Error>>()?
+        .collect::<Result<Vec<Option<((String, Vec<FrostObs>), FrostLatLonElev)>>, Error>>()?
         .into_iter()
         .flatten()
         .collect();
@@ -87,7 +90,7 @@ fn json_to_data_cache(
 
     let processed_ts_vec = ts_vec
         .into_iter()
-        .map(|(obses, location)| {
+        .map(|((station_id, obses), location)| {
             // TODO: preallocate?
             // let ts_length = (end_time - first_obs_time) / period;
             let mut data = Vec::new();
@@ -149,9 +152,9 @@
                 curr_obs_time = curr_obs_time + period;
             }
 
-            Ok((data, location))
+            Ok(((station_id, data), location))
         })
-        .collect::<Result<Vec<(Vec<Option<f32>>, FrostLatLonElev)>, Error>>()?;
+        .collect::<Result<Vec<((String, Vec<Option<f32>>), FrostLatLonElev)>, Error>>()?;
 
     Ok(DataCache::new(
         processed_ts_vec.iter().map(|ts| ts.1.latitude).collect(),
@@ -166,8 +169,8 @@
 }
 
 pub async fn fetch_data_inner(
-    space_spec: SpaceSpec<'_>,
-    time_spec: TimeSpec,
+    space_spec: &SpaceSpec,
+    time_spec: &TimeSpec,
     num_leading_points: u8,
     num_trailing_points: u8,
     extra_spec: Option<&str>,
@@ -362,7 +365,7 @@ mod tests {
         Utc.with_ymd_and_hms(2023, 6, 26, 14, 0, 0).unwrap(),
     );
     assert_eq!(
-        series_cache.data[0],
+        series_cache.data[0].1,
         vec![Some(27.3999996), Some(25.7999992), Some(26.)]
     );
 }
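The upshot of the fetch.rs changes is that every timeseries coming out of the Frost connector is now tagged with its station id. A minimal sketch (not part of the commit) of how a consumer reads the new shape; the function name and printout are made up for illustration:

fn print_series(data: &[(String, Vec<Option<f32>>)]) {
    for (station_id, values) in data {
        // station_id was extracted from the Frost response header;
        // None entries mark gaps in the series
        println!("station {station_id}: {} points", values.len());
    }
}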
4 changes: 2 additions & 2 deletions met_connectors/src/frost/mod.rs
@@ -102,8 +102,8 @@ where
 impl DataConnector for Frost {
     async fn fetch_data(
         &self,
-        space_spec: SpaceSpec<'_>,
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
         num_leading_points: u8,
         num_trailing_points: u8,
         extra_spec: Option<&str>,
15 changes: 15 additions & 0 deletions met_connectors/src/frost/util.rs
@@ -58,3 +58,18 @@ pub fn extract_location(
 
     Ok(lat_lon_elev)
 }
+
+pub fn extract_station_id(header: &mut serde_json::Value) -> Result<String, Error> {
+    let station_id = header
+        .get_mut("id")
+        .ok_or(Error::FindMetadata(
+            "couldn't find id in header".to_string(),
+        ))?
+        .get_mut("stationid")
+        .ok_or(Error::FindMetadata(
+            "couldn't find stationid field in id".to_string(),
+        ))?
+        .take();
+
+    Ok(serde_json::from_value(station_id)?)
+}
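A hypothetical usage sketch for the new helper, assuming Frost serializes stationid as a JSON string (the value "18700" is invented). Note that .take() replaces the extracted field with null in the passed-in header:

let mut header = serde_json::json!({ "id": { "stationid": "18700" } });
let station_id = extract_station_id(&mut header)?; // -> "18700"
assert!(header["id"]["stationid"].is_null()); // take() left null behind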
13 changes: 9 additions & 4 deletions met_connectors/src/lustre_netatmo/mod.rs
@@ -60,7 +60,11 @@ fn read_netatmo(timestamp: Timestamp) -> Result<DataCache, data_switch::Error> {
             lats.push(record.lat);
             lons.push(record.lon);
             elevs.push(record.elev);
-            values.push(vec![Some(record.value)]);
+            values.push((
+                // would be nice if we could come up with better identifiers for this
+                format!("({},{})", record.lat, record.lon),
+                vec![Some(record.value)],
+            ));
         }
     }
 
@@ -73,8 +77,8 @@
 impl DataConnector for LustreNetatmo {
     async fn fetch_data(
         &self,
-        space_spec: SpaceSpec<'_>,
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
         num_leading_points: u8,
         num_trailing_points: u8,
         _extra_spec: Option<&str>,
@@ -90,7 +94,8 @@ impl DataConnector for LustreNetatmo {
 
         match space_spec {
            SpaceSpec::All => {
-                tokio::task::spawn_blocking(move || read_netatmo(time_spec.timerange.start)).await?
+                let start_time = time_spec.timerange.start;
+                tokio::task::spawn_blocking(move || read_netatmo(start_time)).await?
            }
            SpaceSpec::One(_) => Err(data_switch::Error::UnimplementedSeries(
                "netatmo files are only in timeslice format".to_string(),
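The extra let binding above is forced by the signature change: time_spec is now a borrowed &TimeSpec, and tokio::task::spawn_blocking requires a 'static closure, so the reference cannot be moved into it. Copying the timestamp out first gives the closure an owned value. A self-contained sketch of the pattern, with a stand-in Timestamp type (rove's own types are not shown here):

#[derive(Clone, Copy)]
struct Timestamp(i64);

async fn run(start: &Timestamp) -> Timestamp {
    // copy the Copy field out of the borrow so the closure owns its input
    let start_time = *start;
    tokio::task::spawn_blocking(move || start_time).await.unwrap()
}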
93 changes: 45 additions & 48 deletions proto/rove.proto
@@ -1,19 +1,25 @@
 syntax = "proto3";
 
 import "google/protobuf/timestamp.proto";
+import "google/protobuf/empty.proto";
 
 package rove;
 
 service Rove {
-  rpc ValidateSeries (ValidateSeriesRequest) returns (stream ValidateSeriesResponse) {}
-  rpc ValidateSpatial (ValidateSpatialRequest) returns (stream ValidateSpatialResponse) {}
+  // TODO: should we reconsider allowing results to stream, in favour of a more
+  // space efficient response format?
+  rpc Validate (ValidateRequest) returns (stream ValidateResponse) {}
 }
 
 message GeoPoint {
   float lat = 1;
   float lon = 2;
 }
 
+message Polygon {
+  repeated GeoPoint polygon = 1;
+}
+
 enum Flag { // WIP
   PASS = 0;
   FAIL = 1;
@@ -24,59 +30,50 @@ enum Flag { // WIP
   ISOLATED = 6;
 }
 
-// tests a time series between start_time and end_time
-message ValidateSeriesRequest {
-  // resource locator of the form <data source>:<source-specific identifier>
-  // e.g. oda:123456 where oda is a data source known to the system, and 123456
-  // is a timeseries ID in oda
-  string series_id = 1;
-  // if not provided, the start of the time series will be used
-  optional google.protobuf.Timestamp start_time = 2;
-  // if not provided, the end of the time series will be used
-  optional google.protobuf.Timestamp end_time = 3;
-  // list of the names of tests to be run on the data
-  repeated string tests = 4;
-}
-
-// tests a time slice with optional list of data sources to use, and a list of
-// coordinates that define a polygon representing the area to be tested
-message ValidateSpatialRequest {
-  // resource locator of the form <data source>:<source-specific identifier>
-  // e.g. frost:air_temperature where frost is a data source known to the system,
-  // and air_temperature is the element for the call to get data
-  string spatial_id = 1;
-  // extra data sources providing data to help qc the first source, but he data
-  // from these sources will not be qced themselves
+message ValidateRequest {
+  // name of the data source you want to QC data from
+  string data_source = 1;
+  // extra data sources providing data to help QC the first source, but the data
+  // from these sources will not be QCed themselves
   repeated string backing_sources = 2;
-  google.protobuf.Timestamp time = 3;
-  repeated string tests = 4;
-  // if not provided, the whole globe will be used
-  repeated GeoPoint polygon = 5;
+  // timestamps defining an inclusive range of time to QC data from
+  google.protobuf.Timestamp start_time = 3;
+  google.protobuf.Timestamp end_time = 4;
+  // an ISO 8601 duration stamp defining the time resolution of data to be QCed
+  // (e.g. "PT1H" for hourly data)
+  string time_resolution = 5;
+  // one of 3 specifiers can be used to spatially narrow down the data to be
+  // QCed
+  oneof SpaceSpec {
+    // one series of data (i.e. one data point per time step) with a string that
+    // will be passed to the data connector to identify it. This will likely
+    // represent something like a timeseries id, station id and param id pair,
+    // or similar, but is left up to the data connector/source to define
+    string one = 6;
+    // a series of lat-lon points defining a polygon in space
+    Polygon polygon = 7;
+    // no spatial restriction at all
+    google.protobuf.Empty all = 8;
+  }
+  // list of the names of tests to be run on the data
+  repeated string tests = 9;
+  // optional string containing extra information to be passed to the data
+  // connector, to further specify the data to be QCed
+  optional string extra_spec = 10;
 }
 
-message SeriesTestResult {
+message TestResult {
   google.protobuf.Timestamp time = 1;
-  Flag flag = 2;
-}
-
-message ValidateSeriesResponse {
-  // name of the test this flag is from
-  string test = 1;
-  // results for each data point in the series, paired with timestamps to
-  // identify the point
-  repeated SeriesTestResult results = 2;
-}
-
-message SpatialTestResult {
-  // TODO: maybe this should contain the series ID?
-  GeoPoint location = 1;
-  Flag flag = 2;
+  // data source defined identifier; it's recommended to use this to identify
+  // a timeseries/station/location as appropriate
+  string identifier = 2;
+  Flag flag = 3;
 }
 
-message ValidateSpatialResponse {
+message ValidateResponse {
   // name of the test this flag is from
   string test = 1;
-  // results for each data point in the series, paired with geopoints to
+  // results for each data point, paired with timestamp and an identifier to
   // identify the point
-  repeated SpatialTestResult results = 2;
+  repeated TestResult results = 2;
 }
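For a sense of how the unified request reads in practice, here is a hedged sketch of building a ValidateRequest from Rust, assuming prost/tonic code generation (the module path rove_proto, the data source name, and all field values are invented; prost lowers the oneof to a nested enum and message-typed fields to Option):

use rove_proto::{validate_request::SpaceSpec, ValidateRequest};

fn example_request() -> ValidateRequest {
    ValidateRequest {
        data_source: "frost".to_string(),
        backing_sources: vec![],
        start_time: None, // prost_types::Timestamp values in a real call
        end_time: None,
        time_resolution: "PT1H".to_string(), // hourly data, per the comment above
        space_spec: Some(SpaceSpec::One("18700".to_string())), // QC a single series
        tests: vec!["step_check".to_string()],
        extra_spec: None,
    }
}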
24 changes: 12 additions & 12 deletions src/data_switch.rs
@@ -93,14 +93,14 @@ pub struct GeoPoint {
 /// A geospatial polygon
 ///
 /// represented by its vertices as a sequence of lat-lon points
-pub type Polygon = [GeoPoint];
+pub type Polygon = Vec<GeoPoint>;
 
 /// Specifier of which data to fetch from a source by location
-pub enum SpaceSpec<'a> {
+pub enum SpaceSpec {
     /// One single timeseries, specified with a data_id
-    One(&'a str),
+    One(String),
     /// A Polygon in lat-lon space defining the area from which to fetch data
-    Polygon(&'a Polygon),
+    Polygon(Polygon),
     /// The whole data set
     All,
 }
@@ -113,10 +113,11 @@ pub enum SpaceSpec<'a> {
 pub struct DataCache {
     /// Vector of timeseries.
     ///
-    /// Each inner vector represents a timeseries, with its data points in chronological order.
+    /// Each inner vector represents a timeseries, tagged with a string
+    /// identifier, with its data points in chronological order.
     /// All these timeseries are aligned on start_time and period.
     /// `None`s represent gaps in the series.
-    pub data: Vec<Vec<Option<f32>>>,
+    pub data: Vec<(String, Vec<Option<f32>>)>,
     /// Time of the first observation in data
     pub start_time: Timestamp,
     /// Period of the timeseries, i.e. the time gap between successive elements
@@ -147,7 +148,7 @@ impl DataCache {
         period: RelativeDuration,
         num_leading_points: u8,
         num_trailing_points: u8,
-        data: Vec<Vec<Option<f32>>>,
+        data: Vec<(String, Vec<Option<f32>>)>,
     ) -> Self {
         // TODO: ensure vecs have same size
         Self {
@@ -224,9 +225,8 @@ pub trait DataConnector: Sync + std::fmt::Debug {
     /// fetch specified data from the data source
     async fn fetch_data(
         &self,
-        space_spec: SpaceSpec<'_>,
-        // TODO: should this include a time resolution?
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
         num_leading_points: u8,
         num_trailing_points: u8,
         extra_spec: Option<&str>,
@@ -275,8 +275,8 @@ impl<'ds> DataSwitch<'ds> {
     pub(crate) async fn fetch_data(
         &self,
         data_source_id: &str,
-        space_spec: SpaceSpec<'_>,
-        time_spec: TimeSpec,
+        space_spec: &SpaceSpec,
+        time_spec: &TimeSpec,
         num_leading_points: u8,
         num_trailing_points: u8,
         extra_spec: Option<&str>,
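The data_switch change swaps the borrowing SpaceSpec<'a> for an owned enum and has callers pass it by reference. A short sketch of the effect (the id and the commented-out call are illustrative, not from the commit):

fn build_spec() -> SpaceSpec {
    // an owned spec can be built once, stored, and lent to several calls;
    // before this commit, One(&'a str) tied the enum to the request's lifetime
    SpaceSpec::One("18700".to_string())
}
// connectors now borrow rather than consume the specs:
// connector.fetch_data(&space_spec, &time_spec, 1, 1, None).await?;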