Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Make requests concurrently #13

Merged
merged 10 commits into from
Oct 5, 2024
162 changes: 80 additions & 82 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
//! <https://api.carbonintensity.org.uk/>

use chrono::{Datelike, Duration, Local, NaiveDate, NaiveDateTime, NaiveTime};
use futures::future;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use url::ParseError;

mod region;
mod target;
Expand All @@ -24,11 +24,19 @@ pub enum ApiError {
RestError { status: StatusCode, body: String },
/// There was an error parsing a URL from a string.
#[error("Error parsing URL: {0}")]
UrlParseError(#[from] ParseError),
UrlParseError(#[from] url::ParseError),
#[error("Error parsing date: {0}")]
DateParseError(#[from] chrono::ParseError),
#[error("Error executing concurrent task: {0}")]
ConcurrentTaskFailedError(#[from] tokio::task::JoinError),
#[error("Error: {0}")]
Error(String),
}

pub type Result<T> = std::result::Result<T, ApiError>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One reason why I introduced this was because writing down the types when massaging the return values from the tokio tasks was getting a bit out of control and error prone.


pub type IntensityForDate = (NaiveDateTime, i32);

#[derive(Debug, Serialize, Deserialize)]
pub struct GenerationMix {
fuel: String,
Expand Down Expand Up @@ -76,7 +84,7 @@ static BASE_URL: &str = "https://api.carbonintensity.org.uk";
/// Uses either
/// - <https://api.carbonintensity.org.uk/regional/postcode/>
/// - <https://api.carbonintensity.org.uk/regional/regionid/>
pub async fn get_intensity(target: &Target) -> Result<i32, ApiError> {
pub async fn get_intensity(target: &Target) -> Result<i32> {
let path = match target {
Target::Postcode(postcode) => {
if postcode.len() < 2 || postcode.len() > 4 {
Expand All @@ -94,7 +102,7 @@ pub async fn get_intensity(target: &Target) -> Result<i32, ApiError> {
get_intensity_for_url(&url).await
}

fn parse(date: &str) -> Result<NaiveDateTime, chrono::ParseError> {
fn parse_date(date: &str) -> std::result::Result<NaiveDateTime, chrono::ParseError> {
if let Ok(date) = NaiveDate::parse_from_str(date, "%Y-%m-%d") {
return Ok(date.and_hms_opt(0, 0, 0).unwrap());
}
Expand All @@ -105,37 +113,24 @@ fn parse(date: &str) -> Result<NaiveDateTime, chrono::ParseError> {
/// Normalises the start and end dates
/// returns ranges that are acceptable by the API
/// both in their duration and string representation
fn normalise_dates(
start: &str,
end: &Option<&str>,
) -> Result<Vec<(NaiveDateTime, NaiveDateTime)>, ApiError> {
let start_date: NaiveDateTime = match parse(start) {
Ok(res) => res,
Err(_err) => return Err(ApiError::Error("Invalid start date".to_string() + start)),
};
fn normalise_dates(start: &str, end: &Option<&str>) -> Result<Vec<(NaiveDateTime, NaiveDateTime)>> {
let start_date = parse_date(start)?;

let now = Local::now().naive_local();

// if the end is not set - use now
let mut end_date: NaiveDateTime;
if end.is_none() {
end_date = now;
} else {
// a date exists
let sd = parse(end.unwrap());
if sd.is_err() {
return Err(ApiError::Error(
"Invalid end date ".to_string() + end.unwrap(),
));
} else {
end_date = sd.unwrap();
let end_date = match end {
None => now,
Some(end_date) => {
let end_date = parse_date(end_date)?;
// check that the date is not in the future - otherwise set it to now
if now.and_utc().timestamp() < end_date.and_utc().timestamp() {
now
} else {
end_date
}
}
}

// check that the date is not in the future - otherwise set it to now
if now.and_utc().timestamp() < end_date.and_utc().timestamp() {
end_date = now;
}
};

// split into ranges
let mut ranges = Vec::new();
Expand Down Expand Up @@ -174,7 +169,7 @@ pub async fn get_intensities(
target: &Target,
start: &str,
end: &Option<&str>,
) -> Result<Vec<(NaiveDateTime, i32)>, ApiError> {
) -> Result<Vec<IntensityForDate>> {
let path = match target {
Target::Postcode(postcode) => {
if postcode.len() < 2 || postcode.len() > 4 {
Expand All @@ -191,58 +186,52 @@ pub async fn get_intensities(

let ranges = normalise_dates(start, end)?;

let mut output = Vec::new();

// TODO query in parallel
for (start_date, end_date) in ranges {
// shift dates by one minute
let start_date = start_date + Duration::minutes(1);
let end_date = end_date + Duration::minutes(1);
// format dates
let start_date = start_date.format("%Y-%m-%dT%H:%MZ");
let end_date = end_date.format("%Y-%m-%dT%H:%MZ");

let url = format!("{BASE_URL}/regional/intensity/{start_date}/{end_date}/{path}");
let region_data = get_intensities_for_url(&url).await?;
let mut tuples = to_tuple(region_data)?;
output.append(&mut tuples);
}
Ok(output)
// Spawns concurrent tasks...
let tasks: Vec<_> = ranges
.into_iter()
.map(|(start_date, end_date)| {
// shift dates by one minute
let start_date = start_date + Duration::minutes(1);
let end_date = end_date + Duration::minutes(1);
// format dates
let start_date = start_date.format("%Y-%m-%dT%H:%MZ");
let end_date = end_date.format("%Y-%m-%dT%H:%MZ");

let url = format!("{BASE_URL}/regional/intensity/{start_date}/{end_date}/{path}");

tokio::spawn(async move {
let region_data = get_intensities_for_url(&url).await?;
to_tuples(region_data)
})
})
.collect();

let tasks_results = future::try_join_all(tasks).await?;
tasks_results
.into_iter()
.collect::<Result<Vec<_>>>() // convert to single Result
.map(|nested_tuples| nested_tuples.into_iter().flatten().collect())
}

/// converts the values from JSON into a simpler
/// representation Vec<DateTime, float>
fn to_tuple(data: RegionData) -> Result<Vec<(NaiveDateTime, i32)>, ApiError> {
let mut values: Vec<(NaiveDateTime, i32)> = Vec::new();
fn to_tuples(data: RegionData) -> Result<Vec<IntensityForDate>> {
let mut values: Vec<IntensityForDate> = Vec::new();
for d in data.data {
let start_date = parse(&d.from).expect("Unparsable date");
let start_date = parse_date(&d.from)?;
let intensity = d.intensity.forecast;
values.push((start_date, intensity));
}
Ok(values)
}

async fn get_intensities_for_url(url: &str) -> Result<RegionData, ApiError> {
let client = Client::new();
let response = client.get(url).send().await?;

let status = response.status();

if status.is_success() {
let json_str = response.text().await?;
if let Ok(PowerData { data }) = serde_json::from_str::<PowerData>(&json_str) {
Ok(data)
} else {
Err(ApiError::Error(format!("Invalid JSON returned {json_str}")))
}
} else {
let body = response.text().await?;
Err(ApiError::RestError { status, body })
}
async fn get_intensities_for_url(url: &str) -> Result<RegionData> {
let PowerData { data } = get_response(url).await?;
Ok(data)
}

/// Retrieves the intensity value from a structure
async fn get_intensity_for_url(url: &str) -> Result<i32, ApiError> {
async fn get_intensity_for_url(url: &str) -> Result<i32> {
let result = get_instant_data(url).await?;

let intensity = result
Expand All @@ -259,22 +248,30 @@ async fn get_intensity_for_url(url: &str) -> Result<i32, ApiError> {
}

// Internal method to handle the querying and parsing
async fn get_instant_data(url: &str) -> Result<Root, ApiError> {
async fn get_instant_data(url: &str) -> Result<Root> {
get_response::<Root>(url).await
}

/// Makes a GET request to the given URL
///
/// Deserialize the JSON response as `T` and returns Ok<T> if all is well.
/// Returns an `ApiError` when the HTTP request failed or the response body
/// couldn't be deserialized as a `T` value.
async fn get_response<T>(url: &str) -> Result<T>
where
T: DeserializeOwned,
{
let client = Client::new();
let response = client.get(url).send().await?;

let status = response.status();

if status.is_success() {
if let Ok(root) = response.json::<Root>().await {
return Ok(root);
} else {
return Err(ApiError::Error("Invalid JSON returned".to_string()));
}
if !status.is_success() {
let body = response.text().await?;
return Err(ApiError::RestError { status, body });
}
// failure
let body = response.text().await?;
Err(ApiError::RestError { status, body })

let target = response.json::<T>().await?;
Ok(target)
}

#[cfg(test)]
Expand All @@ -288,7 +285,8 @@ mod tests {
{"data":{"regionid":11,"shortname":"South West England","postcode":"BS7","data":[{"from":"2022-12-31T23:30Z","to":"2023-01-01T00:00Z","intensity":{"forecast":152,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.4},{"fuel":"coal","perc":3.3},{"fuel":"imports","perc":14.3},{"fuel":"gas","perc":28.5},{"fuel":"nuclear","perc":7},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.5},{"fuel":"solar","perc":0},{"fuel":"wind","perc":45.1}]},{"from":"2023-01-01T00:00Z","to":"2023-01-01T00:30Z","intensity":{"forecast":181,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.4},{"fuel":"coal","perc":3.4},{"fuel":"imports","perc":9.1},{"fuel":"gas","perc":36.1},{"fuel":"nuclear","perc":6.8},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":42.8}]},{"from":"2023-01-01T00:30Z","to":"2023-01-01T01:00Z","intensity":{"forecast":189,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.3},{"fuel":"coal","perc":3.4},{"fuel":"imports","perc":12.1},{"fuel":"gas","perc":37.6},{"fuel":"nuclear","perc":6.4},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":38.8}]},{"from":"2023-01-01T01:00Z","to":"2023-01-01T01:30Z","intensity":{"forecast":183,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.7},{"fuel":"coal","perc":3.2},{"fuel":"imports","perc":6.1},{"fuel":"gas","perc":37.3},{"fuel":"nuclear","perc":7.3},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":44}]},{"from":"2023-01-01T01:30Z","to":"2023-01-01T02:00Z","intensity":{"forecast":175,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.5},{"fuel":"coal","perc":2.9},{"fuel":"imports","perc":6.6},{"fuel":"gas","perc":36},{"fuel":"nuclear","perc":7.2},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":45.5}]}]}}
"#;

let result: Result<PowerData, serde_json::Error> = serde_json::from_str(json_str);
let result: std::result::Result<PowerData, serde_json::Error> =
serde_json::from_str(json_str);
println!("{:?}", result);
}

Expand Down
Loading