Skip to content

Commit

Permalink
Merge pull request #13 from jnioche/concurrently
Browse files Browse the repository at this point in the history
Make requests concurrently
  • Loading branch information
jnioche authored Oct 5, 2024
2 parents 031fc5e + 9787590 commit 9d67439
Showing 1 changed file with 80 additions and 82 deletions.
162 changes: 80 additions & 82 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@
//! <https://api.carbonintensity.org.uk/>

use chrono::{Datelike, Duration, Local, NaiveDate, NaiveDateTime, NaiveTime};
use futures::future;
use reqwest::{Client, StatusCode};
use serde::{Deserialize, Serialize};
use serde::{de::DeserializeOwned, Deserialize, Serialize};
use thiserror::Error;
use url::ParseError;

mod region;
mod target;
Expand All @@ -24,11 +24,19 @@ pub enum ApiError {
RestError { status: StatusCode, body: String },
/// There was an error parsing a URL from a string.
#[error("Error parsing URL: {0}")]
UrlParseError(#[from] ParseError),
UrlParseError(#[from] url::ParseError),
#[error("Error parsing date: {0}")]
DateParseError(#[from] chrono::ParseError),
#[error("Error executing concurrent task: {0}")]
ConcurrentTaskFailedError(#[from] tokio::task::JoinError),
#[error("Error: {0}")]
Error(String),
}

pub type Result<T> = std::result::Result<T, ApiError>;

pub type IntensityForDate = (NaiveDateTime, i32);

#[derive(Debug, Serialize, Deserialize)]
pub struct GenerationMix {
fuel: String,
Expand Down Expand Up @@ -76,7 +84,7 @@ static BASE_URL: &str = "https://api.carbonintensity.org.uk";
/// Uses either
/// - <https://api.carbonintensity.org.uk/regional/postcode/>
/// - <https://api.carbonintensity.org.uk/regional/regionid/>
pub async fn get_intensity(target: &Target) -> Result<i32, ApiError> {
pub async fn get_intensity(target: &Target) -> Result<i32> {
let path = match target {
Target::Postcode(postcode) => {
if postcode.len() < 2 || postcode.len() > 4 {
Expand All @@ -94,7 +102,7 @@ pub async fn get_intensity(target: &Target) -> Result<i32, ApiError> {
get_intensity_for_url(&url).await
}

fn parse(date: &str) -> Result<NaiveDateTime, chrono::ParseError> {
fn parse_date(date: &str) -> std::result::Result<NaiveDateTime, chrono::ParseError> {
if let Ok(date) = NaiveDate::parse_from_str(date, "%Y-%m-%d") {
return Ok(date.and_hms_opt(0, 0, 0).unwrap());
}
Expand All @@ -105,37 +113,24 @@ fn parse(date: &str) -> Result<NaiveDateTime, chrono::ParseError> {
/// Normalises the start and end dates
/// returns ranges that are acceptable by the API
/// both in their duration and string representation
fn normalise_dates(
start: &str,
end: &Option<&str>,
) -> Result<Vec<(NaiveDateTime, NaiveDateTime)>, ApiError> {
let start_date: NaiveDateTime = match parse(start) {
Ok(res) => res,
Err(_err) => return Err(ApiError::Error("Invalid start date".to_string() + start)),
};
fn normalise_dates(start: &str, end: &Option<&str>) -> Result<Vec<(NaiveDateTime, NaiveDateTime)>> {
let start_date = parse_date(start)?;

let now = Local::now().naive_local();

// if the end is not set - use now
let mut end_date: NaiveDateTime;
if end.is_none() {
end_date = now;
} else {
// a date exists
let sd = parse(end.unwrap());
if sd.is_err() {
return Err(ApiError::Error(
"Invalid end date ".to_string() + end.unwrap(),
));
} else {
end_date = sd.unwrap();
let end_date = match end {
None => now,
Some(end_date) => {
let end_date = parse_date(end_date)?;
// check that the date is not in the future - otherwise set it to now
if now.and_utc().timestamp() < end_date.and_utc().timestamp() {
now
} else {
end_date
}
}
}

// check that the date is not in the future - otherwise set it to now
if now.and_utc().timestamp() < end_date.and_utc().timestamp() {
end_date = now;
}
};

// split into ranges
let mut ranges = Vec::new();
Expand Down Expand Up @@ -174,7 +169,7 @@ pub async fn get_intensities(
target: &Target,
start: &str,
end: &Option<&str>,
) -> Result<Vec<(NaiveDateTime, i32)>, ApiError> {
) -> Result<Vec<IntensityForDate>> {
let path = match target {
Target::Postcode(postcode) => {
if postcode.len() < 2 || postcode.len() > 4 {
Expand All @@ -191,58 +186,52 @@ pub async fn get_intensities(

let ranges = normalise_dates(start, end)?;

let mut output = Vec::new();

// TODO query in parallel
for (start_date, end_date) in ranges {
// shift dates by one minute
let start_date = start_date + Duration::minutes(1);
let end_date = end_date + Duration::minutes(1);
// format dates
let start_date = start_date.format("%Y-%m-%dT%H:%MZ");
let end_date = end_date.format("%Y-%m-%dT%H:%MZ");

let url = format!("{BASE_URL}/regional/intensity/{start_date}/{end_date}/{path}");
let region_data = get_intensities_for_url(&url).await?;
let mut tuples = to_tuple(region_data)?;
output.append(&mut tuples);
}
Ok(output)
// Spawns concurrent tasks...
let tasks: Vec<_> = ranges
.into_iter()
.map(|(start_date, end_date)| {
// shift dates by one minute
let start_date = start_date + Duration::minutes(1);
let end_date = end_date + Duration::minutes(1);
// format dates
let start_date = start_date.format("%Y-%m-%dT%H:%MZ");
let end_date = end_date.format("%Y-%m-%dT%H:%MZ");

let url = format!("{BASE_URL}/regional/intensity/{start_date}/{end_date}/{path}");

tokio::spawn(async move {
let region_data = get_intensities_for_url(&url).await?;
to_tuples(region_data)
})
})
.collect();

let tasks_results = future::try_join_all(tasks).await?;
tasks_results
.into_iter()
.collect::<Result<Vec<_>>>() // convert to single Result
.map(|nested_tuples| nested_tuples.into_iter().flatten().collect())
}

/// converts the values from JSON into a simpler
/// representation Vec<DateTime, float>
fn to_tuple(data: RegionData) -> Result<Vec<(NaiveDateTime, i32)>, ApiError> {
let mut values: Vec<(NaiveDateTime, i32)> = Vec::new();
fn to_tuples(data: RegionData) -> Result<Vec<IntensityForDate>> {
let mut values: Vec<IntensityForDate> = Vec::new();
for d in data.data {
let start_date = parse(&d.from).expect("Unparsable date");
let start_date = parse_date(&d.from)?;
let intensity = d.intensity.forecast;
values.push((start_date, intensity));
}
Ok(values)
}

async fn get_intensities_for_url(url: &str) -> Result<RegionData, ApiError> {
let client = Client::new();
let response = client.get(url).send().await?;

let status = response.status();

if status.is_success() {
let json_str = response.text().await?;
if let Ok(PowerData { data }) = serde_json::from_str::<PowerData>(&json_str) {
Ok(data)
} else {
Err(ApiError::Error(format!("Invalid JSON returned {json_str}")))
}
} else {
let body = response.text().await?;
Err(ApiError::RestError { status, body })
}
async fn get_intensities_for_url(url: &str) -> Result<RegionData> {
let PowerData { data } = get_response(url).await?;
Ok(data)
}

/// Retrieves the intensity value from a structure
async fn get_intensity_for_url(url: &str) -> Result<i32, ApiError> {
async fn get_intensity_for_url(url: &str) -> Result<i32> {
let result = get_instant_data(url).await?;

let intensity = result
Expand All @@ -259,22 +248,30 @@ async fn get_intensity_for_url(url: &str) -> Result<i32, ApiError> {
}

// Internal method to handle the querying and parsing
async fn get_instant_data(url: &str) -> Result<Root, ApiError> {
async fn get_instant_data(url: &str) -> Result<Root> {
get_response::<Root>(url).await
}

/// Makes a GET request to the given URL
///
/// Deserialize the JSON response as `T` and returns Ok<T> if all is well.
/// Returns an `ApiError` when the HTTP request failed or the response body
/// couldn't be deserialized as a `T` value.
async fn get_response<T>(url: &str) -> Result<T>
where
T: DeserializeOwned,
{
let client = Client::new();
let response = client.get(url).send().await?;

let status = response.status();

if status.is_success() {
if let Ok(root) = response.json::<Root>().await {
return Ok(root);
} else {
return Err(ApiError::Error("Invalid JSON returned".to_string()));
}
if !status.is_success() {
let body = response.text().await?;
return Err(ApiError::RestError { status, body });
}
// failure
let body = response.text().await?;
Err(ApiError::RestError { status, body })

let target = response.json::<T>().await?;
Ok(target)
}

#[cfg(test)]
Expand All @@ -288,7 +285,8 @@ mod tests {
{"data":{"regionid":11,"shortname":"South West England","postcode":"BS7","data":[{"from":"2022-12-31T23:30Z","to":"2023-01-01T00:00Z","intensity":{"forecast":152,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.4},{"fuel":"coal","perc":3.3},{"fuel":"imports","perc":14.3},{"fuel":"gas","perc":28.5},{"fuel":"nuclear","perc":7},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.5},{"fuel":"solar","perc":0},{"fuel":"wind","perc":45.1}]},{"from":"2023-01-01T00:00Z","to":"2023-01-01T00:30Z","intensity":{"forecast":181,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.4},{"fuel":"coal","perc":3.4},{"fuel":"imports","perc":9.1},{"fuel":"gas","perc":36.1},{"fuel":"nuclear","perc":6.8},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":42.8}]},{"from":"2023-01-01T00:30Z","to":"2023-01-01T01:00Z","intensity":{"forecast":189,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.3},{"fuel":"coal","perc":3.4},{"fuel":"imports","perc":12.1},{"fuel":"gas","perc":37.6},{"fuel":"nuclear","perc":6.4},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":38.8}]},{"from":"2023-01-01T01:00Z","to":"2023-01-01T01:30Z","intensity":{"forecast":183,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.7},{"fuel":"coal","perc":3.2},{"fuel":"imports","perc":6.1},{"fuel":"gas","perc":37.3},{"fuel":"nuclear","perc":7.3},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":44}]},{"from":"2023-01-01T01:30Z","to":"2023-01-01T02:00Z","intensity":{"forecast":175,"index":"moderate"},"generationmix":[{"fuel":"biomass","perc":1.5},{"fuel":"coal","perc":2.9},{"fuel":"imports","perc":6.6},{"fuel":"gas","perc":36},{"fuel":"nuclear","perc":7.2},{"fuel":"other","perc":0},{"fuel":"hydro","perc":0.4},{"fuel":"solar","perc":0},{"fuel":"wind","perc":45.5}]}]}}
"#;

let result: Result<PowerData, serde_json::Error> = serde_json::from_str(json_str);
let result: std::result::Result<PowerData, serde_json::Error> =
serde_json::from_str(json_str);
println!("{:?}", result);
}

Expand Down

0 comments on commit 9d67439

Please sign in to comment.