Skip to content

Commit

Permalink
refactor: utils/time parsing (#1024)
Browse files Browse the repository at this point in the history
TimeRange is a new type to de-serialize strings into start 
and end Datetime and store it for representing a time range 
elsewhere in code. Currently usage is mainly in query.

---------

Co-authored-by: Nitish Tiwari <[email protected]>
  • Loading branch information
de-sh and nitisht authored Dec 17, 2024
1 parent 386a662 commit f1af947
Show file tree
Hide file tree
Showing 5 changed files with 207 additions and 75 deletions.
61 changes: 33 additions & 28 deletions src/handlers/airplane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ use crate::utils::arrow::flight::{
append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc,
send_to_ingester,
};
use crate::utils::time::TimeRange;
use arrow_flight::{
flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData,
FlightDescriptor, FlightInfo, HandshakeRequest, HandshakeResponse, PutResult, SchemaAsIpc,
Expand Down Expand Up @@ -143,6 +144,8 @@ impl FlightService for AirServiceImpl {
Status::internal("Failed to create logical plan")
})?;

let time_range = TimeRange::parse_human_time(&ticket.start_time, &ticket.end_time)
.map_err(|e| Status::internal(e.to_string()))?;
// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);
Expand All @@ -159,38 +162,40 @@ impl FlightService for AirServiceImpl {
.map_err(|err| Status::internal(err.to_string()))?;

// map payload to query
let mut query = into_query(&ticket, &session_state)
let mut query = into_query(&ticket, &session_state, time_range)
.await
.map_err(|_| Status::internal("Failed to parse query"))?;

let event =
if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
let event = if send_to_ingester(
query.time_range.start.timestamp_millis(),
query.time_range.end.timestamp_millis(),
) {
let sql = format!("select * from {}", &stream_name);
let start_time = ticket.start_time.clone();
let end_time = ticket.end_time.clone();
let out_ticket = json!({
"query": sql,
"startTime": start_time,
"endTime": end_time
})
.to_string();

let ingester_metadatas = get_ingestor_info()
.await
.map_err(|err| Status::failed_precondition(err.to_string()))?;
let mut minute_result: Vec<RecordBatch> = vec![];

for im in ingester_metadatas {
if let Ok(mut batches) = run_do_get_rpc(im, out_ticket.clone()).await {
minute_result.append(&mut batches);
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};
}
let mr = minute_result.iter().collect::<Vec<_>>();
let event = append_temporary_events(&stream_name, mr).await?;
Some(event)
} else {
None
};

// try authorize
match Users.authorize(key.clone(), rbac::role::Action::Query, None, None) {
Expand Down
51 changes: 10 additions & 41 deletions src/handlers/http/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ use crate::response::QueryResponse;
use crate::storage::object_storage::commit_schema_to_storage;
use crate::storage::ObjectStorageError;
use crate::utils::actix::extract_session_key_from_req;
use crate::utils::time::{TimeParseError, TimeRange};

use super::modal::utils::logstream_utils::create_stream_and_schema_from_storage;

Expand Down Expand Up @@ -80,13 +81,17 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result<impl Respon
.await?
}
};

let time_range =
TimeRange::parse_human_time(&query_request.start_time, &query_request.end_time)?;

// create a visitor to extract the table name
let mut visitor = TableScanVisitor::default();
let _ = raw_logical_plan.visit(&mut visitor);

let tables = visitor.into_inner();
update_schema_when_distributed(tables).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state).await?;
let mut query: LogicalQuery = into_query(&query_request, &session_state, time_range).await?;

let creds = extract_session_key_from_req(&req)?;
let permissions = Users.get_permissions(&creds);
Expand Down Expand Up @@ -218,6 +223,7 @@ impl FromRequest for Query {
pub async fn into_query(
query: &Query,
session_state: &SessionState,
time_range: TimeRange,
) -> Result<LogicalQuery, QueryError> {
if query.query.is_empty() {
return Err(QueryError::EmptyQuery);
Expand All @@ -231,42 +237,13 @@ pub async fn into_query(
return Err(QueryError::EmptyEndTime);
}

let (start, end) = parse_human_time(&query.start_time, &query.end_time)?;

if start.timestamp() > end.timestamp() {
return Err(QueryError::StartTimeAfterEndTime);
}

Ok(crate::query::Query {
raw_logical_plan: session_state.create_logical_plan(&query.query).await?,
start,
end,
time_range,
filter_tag: query.filter_tags.clone(),
})
}

fn parse_human_time(
start_time: &str,
end_time: &str,
) -> Result<(DateTime<Utc>, DateTime<Utc>), QueryError> {
let start: DateTime<Utc>;
let end: DateTime<Utc>;

if end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(start_time)
.map_err(|_| QueryError::StartTimeParse)?
.into();
end = DateTime::parse_from_rfc3339(end_time)
.map_err(|_| QueryError::EndTimeParse)?
.into();
};

Ok((start, end))
}

/// unused for now, might need it in the future
#[allow(unused)]
fn transform_query_for_ingestor(query: &Query) -> Option<Query> {
Expand Down Expand Up @@ -312,16 +289,8 @@ pub enum QueryError {
EmptyStartTime,
#[error("End time cannot be empty")]
EmptyEndTime,
#[error("Could not parse start time correctly")]
StartTimeParse,
#[error("Could not parse end time correctly")]
EndTimeParse,
#[error("While generating times for 'now' failed to parse duration")]
NotValidDuration(#[from] humantime::DurationError),
#[error("Parsed duration out of range")]
OutOfRange(#[from] chrono::OutOfRangeError),
#[error("Start time cannot be greater than the end time")]
StartTimeAfterEndTime,
#[error("Error while parsing provided time range: {0}")]
TimeParse(#[from] TimeParseError),
#[error("Unauthorized")]
Unauthorized,
#[error("Datafusion Error: {0}")]
Expand Down
12 changes: 6 additions & 6 deletions src/query/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ use crate::event;
use crate::metadata::STREAM_INFO;
use crate::option::CONFIG;
use crate::storage::{ObjectStorageProvider, StorageDir};
use crate::utils::time::TimeRange;

pub static QUERY_SESSION: Lazy<SessionContext> =
Lazy::new(|| Query::create_session_context(CONFIG.storage()));
Expand All @@ -54,8 +55,7 @@ pub static QUERY_SESSION: Lazy<SessionContext> =
#[derive(Debug)]
pub struct Query {
pub raw_logical_plan: LogicalPlan,
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
pub time_range: TimeRange,
pub filter_tag: Option<Vec<String>>,
}

Expand Down Expand Up @@ -164,8 +164,8 @@ impl Query {
LogicalPlan::Explain(plan) => {
let transformed = transform(
plan.plan.as_ref().clone(),
self.start.naive_utc(),
self.end.naive_utc(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
filters,
time_partition,
);
Expand All @@ -182,8 +182,8 @@ impl Query {
x => {
transform(
x,
self.start.naive_utc(),
self.end.naive_utc(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
filters,
time_partition,
)
Expand Down
1 change: 1 addition & 0 deletions src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ pub mod actix;
pub mod arrow;
pub mod header_parsing;
pub mod json;
pub mod time;
pub mod uid;
pub mod update;
use crate::handlers::http::rbac::RBACError;
Expand Down
157 changes: 157 additions & 0 deletions src/utils/time.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
/*
* Parseable Server (C) 2022 - 2024 Parseable, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
*/

use chrono::{DateTime, Utc};

#[derive(Debug, thiserror::Error)]
pub enum TimeParseError {
#[error("Parsing humantime")]
HumanTime(#[from] humantime::DurationError),
#[error("Out of Range")]
OutOfRange(#[from] chrono::OutOfRangeError),
#[error("Error parsing time: {0}")]
Chrono(#[from] chrono::ParseError),
#[error("Start time cannot be greater than the end time")]
StartTimeAfterEndTime,
}

/// Represents a range of time with a start and end point.
#[derive(Debug)]
pub struct TimeRange {
pub start: DateTime<Utc>,
pub end: DateTime<Utc>,
}

impl TimeRange {
/// Parses human-readable time strings into a `TimeRange` object.
///
/// # Arguments
/// - `start_time`: A string representing the start of the time range. This can either be
/// a human-readable duration (e.g., `"2 hours"`) or an RFC 3339 formatted timestamp.
/// - `end_time`: A string representing the end of the time range. This can either be
/// the keyword `"now"` (to represent the current time) or an RFC 3339 formatted timestamp.
///
/// # Errors
/// - `TimeParseError::StartTimeAfterEndTime`: Returned when the parsed start time is later than the end time.
/// - Any error that might occur during parsing of durations or RFC 3339 timestamps.
///
/// # Example
/// ```ignore
/// let range = TimeRange::parse_human_time("2 hours", "now");
/// let range = TimeRange::parse_human_time("2023-01-01T12:00:00Z", "2023-01-01T15:00:00Z");
/// ```
pub fn parse_human_time(start_time: &str, end_time: &str) -> Result<Self, TimeParseError> {
let start: DateTime<Utc>;
let end: DateTime<Utc>;

if end_time == "now" {
end = Utc::now();
start = end - chrono::Duration::from_std(humantime::parse_duration(start_time)?)?;
} else {
start = DateTime::parse_from_rfc3339(start_time)?.into();
end = DateTime::parse_from_rfc3339(end_time)?.into();
};

if start > end {
return Err(TimeParseError::StartTimeAfterEndTime);
}

Ok(Self { start, end })
}
}

#[cfg(test)]
mod tests {
use super::*;
use chrono::{Duration, SecondsFormat, Utc};

#[test]
fn valid_rfc3339_timestamps() {
let start_time = "2023-01-01T12:00:00Z";
let end_time = "2023-01-01T13:00:00Z";

let result = TimeRange::parse_human_time(start_time, end_time);
let parsed = result.unwrap();

assert_eq!(
parsed.start.to_rfc3339_opts(SecondsFormat::Secs, true),
start_time
);
assert_eq!(
parsed.end.to_rfc3339_opts(SecondsFormat::Secs, true),
end_time
);
}

#[test]
fn end_time_now_with_valid_duration() {
let start_time = "1h";
let end_time = "now";

let result = TimeRange::parse_human_time(start_time, end_time);
let parsed = result.unwrap();

assert!(parsed.end <= Utc::now());
assert_eq!(parsed.end - parsed.start, Duration::hours(1));

let start_time = "30 minutes";
let end_time = "now";

let result = TimeRange::parse_human_time(start_time, end_time);
let parsed = result.unwrap();

assert!(parsed.end <= Utc::now());
assert_eq!(parsed.end - parsed.start, Duration::minutes(30));
}

#[test]
fn start_time_after_end_time() {
let start_time = "2023-01-01T14:00:00Z";
let end_time = "2023-01-01T13:00:00Z";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::StartTimeAfterEndTime)));
}

#[test]
fn invalid_start_time_format() {
let start_time = "not-a-valid-time";
let end_time = "2023-01-01T13:00:00Z";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::Chrono(_))));
}

#[test]
fn invalid_end_time_format() {
let start_time = "2023-01-01T12:00:00Z";
let end_time = "not-a-valid-time";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::Chrono(_))));
}

#[test]
fn invalid_duration_with_end_time_now() {
let start_time = "not-a-duration";
let end_time = "now";

let result = TimeRange::parse_human_time(start_time, end_time);
assert!(matches!(result, Err(TimeParseError::HumanTime(_))));
}
}

0 comments on commit f1af947

Please sign in to comment.