refactor: reorganize query plan transformation and execution
de-sh committed Jan 12, 2025
1 parent 315e997 commit 1ac8977
Showing 3 changed files with 76 additions and 85 deletions.
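
Before the per-file diffs: the core of this refactor is that Query::execute() no longer takes a stream name; instead, the time partition for every stream referenced by the query is resolved internally and threaded through plan construction. A condensed before/after of the entry point, taken from src/query/mod.rs below (bodies elided):

    // Before: the caller had to pick one stream and pass it in.
    pub async fn execute(
        &self,
        stream_name: String,
    ) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> { /* ... */ }

    // After: time partitions are looked up per stream inside execute().
    pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> { /* ... */ }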
7 changes: 3 additions & 4 deletions src/handlers/airplane.rs
@@ -144,9 +144,8 @@ impl FlightService for AirServiceImpl {
Status::internal(format!("Failed to process query: {e}"))
})?;

let stream_names = query.stream_names();
let stream_name = stream_names
.first()
let stream_name = query
.first_stream_name()
.ok_or_else(|| Status::internal("Failed to get stream name from query"))?;

let event = if send_to_ingester(
@@ -179,7 +178,7 @@ impl FlightService for AirServiceImpl {

let time = Instant::now();
let (records, _) = query
.execute(stream_name.clone())
.execute()
.await
.map_err(|err| Status::internal(err.to_string()))?;

8 changes: 3 additions & 5 deletions src/handlers/http/query.rs
@@ -131,14 +131,12 @@ pub async fn query(
) -> Result<impl Responder, QueryError> {
let key = extract_session_key_from_req(&req)?;
let query = query_request.into_query(key).await?;

let stream_names = query.stream_names();
let first_stream_name = stream_names
.first()
let first_stream_name = query
.first_stream_name()
.ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?;

let time = Instant::now();
let (records, fields) = query.execute(first_stream_name.clone()).await?;
let (records, fields) = query.execute().await?;

let response = QueryResponse {
records,
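
Both handler changes above converge on the same caller pattern. A minimal sketch of the new call side (the surrounding handler plumbing and error handling are simplified assumptions, not code from this commit):

    // Sketch only: handler context and error mapping are simplified.
    async fn run_query(query: Query) -> Result<(Vec<RecordBatch>, Vec<String>), QueryError> {
        // The first stream name is still needed for authorization / routing decisions,
        // but no longer has to be passed into execute().
        let _first_stream = query
            .first_stream_name()
            .ok_or(QueryError::MalformedQuery("No table name found in query"))?;

        // execute() now looks up each stream's time partition internally.
        let (records, fields) = query.execute().await?;
        Ok((records, fields))
    }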
146 changes: 70 additions & 76 deletions src/query/mod.rs
@@ -122,15 +122,10 @@ pub struct Query {
}

impl Query {
pub async fn execute(
&self,
stream_name: String,
) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
let time_partition = STREAM_INFO.get_time_partition(&stream_name)?;

let df = QUERY_SESSION
.execute_logical_plan(self.final_logical_plan(&time_partition))
.await?;
pub async fn execute(&self) -> Result<(Vec<RecordBatch>, Vec<String>), ExecuteError> {
let time_partitions = self.get_time_partitions()?;
let logical_plan = self.final_logical_plan(time_partitions);
let df = QUERY_SESSION.execute_logical_plan(logical_plan).await?;

let fields = df
.schema()
@@ -148,8 +143,21 @@ impl Query {
Ok((results, fields))
}

/// Get the time partitions for the streams mentioned in the query
fn get_time_partitions(&self) -> Result<HashMap<String, String>, ExecuteError> {
let mut time_partitions = HashMap::default();
for stream_name in self.stream_names.iter() {
let Some(time_partition) = STREAM_INFO.get_time_partition(stream_name)? else {
continue;
};
time_partitions.insert(stream_name.clone(), time_partition);
}

Ok(time_partitions)
}

/// return logical plan with all time filters applied through
fn final_logical_plan(&self, time_partition: &Option<String>) -> LogicalPlan {
fn final_logical_plan(&self, time_partitions: HashMap<String, String>) -> LogicalPlan {
// see https://github.com/apache/arrow-datafusion/pull/8400
// this can be eliminated in later version of datafusion but with slight caveat
// transform cannot modify stringified plans by itself
@@ -161,7 +169,7 @@
plan.plan.as_ref().clone(),
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
time_partition,
&time_partitions,
);
LogicalPlan::Explain(Explain {
verbose: plan.verbose,
@@ -178,15 +186,16 @@ impl Query {
x,
self.time_range.start.naive_utc(),
self.time_range.end.naive_utc(),
time_partition,
&time_partitions,
)
.data
}
}
}

pub fn stream_names(&self) -> &[String] {
&self.stream_names
// name of the main/first stream in the query
pub fn first_stream_name(&self) -> Option<&String> {
self.stream_names.first()
}
}

@@ -223,80 +232,65 @@ fn transform(
plan: LogicalPlan,
start_time: NaiveDateTime,
end_time: NaiveDateTime,
time_partition: &Option<String>,
time_partitions: &HashMap<String, String>,
) -> Transformed<LogicalPlan> {
plan.transform(&|plan| match plan {
LogicalPlan::TableScan(table) => {
let mut new_filters = vec![];
if !table_contains_any_time_filters(&table, time_partition) {
let mut _start_time_filter: Expr;
let mut _end_time_filter: Expr;
match time_partition {
Some(time_partition) => {
_start_time_filter =
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned()),
time_partition.clone(),
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned()),
time_partition,
)));
}
None => {
_start_time_filter =
PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
_end_time_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time))
.binary_expr(Expr::Column(Column::new(
Some(table.table_name.to_owned()),
event::DEFAULT_TIMESTAMP_KEY,
)));
}
}

new_filters.push(_start_time_filter);
new_filters.push(_end_time_filter);
}
let new_filter = new_filters.into_iter().reduce(and);
if let Some(new_filter) = new_filter {
let filter =
Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap();
Ok(Transformed::yes(LogicalPlan::Filter(filter)))
} else {
Ok(Transformed::no(LogicalPlan::TableScan(table)))
}
plan.transform(|plan| {
let LogicalPlan::TableScan(table) = plan else {
return Ok(Transformed::no(plan));
};

// Early return if filters already exist
if query_can_be_filtered_on_stream_time_partition(&table, time_partitions) {
return Ok(Transformed::no(LogicalPlan::TableScan(table)));
}
x => Ok(Transformed::no(x)),

let stream = table.table_name.clone();
let time_partition = time_partitions
.get(stream.table())
.map(|x| x.as_str())
.unwrap_or(event::DEFAULT_TIMESTAMP_KEY);

// Create column expression once
let column_expr = Expr::Column(Column::new(Some(stream.clone()), time_partition));

// Build filters
let low_filter = PartialTimeFilter::Low(std::ops::Bound::Included(start_time))
.binary_expr(column_expr.clone());
let high_filter =
PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)).binary_expr(column_expr);

// Combine filters
let new_filter = and(low_filter, high_filter);
let filter = Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap();

Ok(Transformed::yes(LogicalPlan::Filter(filter)))
})
.expect("transform only transforms the tablescan")
}

fn table_contains_any_time_filters(
// check if the query contains the stream's time partition as filter
fn query_can_be_filtered_on_stream_time_partition(
table: &datafusion::logical_expr::TableScan,
time_partition: &Option<String>,
time_partitions: &HashMap<String, String>,
) -> bool {
table
.filters
.iter()
.filter_map(|x| {
if let Expr::BinaryExpr(binexpr) = x {
Some(binexpr)
} else {
None
}
.filter_map(|x| match x {
Expr::BinaryExpr(binexpr) => Some(binexpr),
_ => None,
})
.any(|expr| {
matches!(&*expr.left, Expr::Column(Column { name, .. })
if ((time_partition.is_some() && name == time_partition.as_ref().unwrap()) ||
(!time_partition.is_some() && name == event::DEFAULT_TIMESTAMP_KEY)))
.any(|expr| match &*expr.left {
Expr::Column(Column {
name: column_name, ..
}) => {
time_partitions
.get(table.table_name.table())
.map(|x| x.as_str())
.unwrap_or(event::DEFAULT_TIMESTAMP_KEY)
== column_name
}
_ => false,
})
}

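
The column choice in transform() and the check in query_can_be_filtered_on_stream_time_partition() both reduce to one rule: use the stream's configured time partition column if there is one, otherwise fall back to the default timestamp column. A self-contained sketch of that rule (the constant value and stream names here are illustrative assumptions):

    use std::collections::HashMap;

    // Assumed stand-in for event::DEFAULT_TIMESTAMP_KEY in the real codebase.
    const DEFAULT_TIMESTAMP_KEY: &str = "p_timestamp";

    /// Column the injected time-range filter should target for a given stream.
    fn time_column<'a>(time_partitions: &'a HashMap<String, String>, stream: &str) -> &'a str {
        time_partitions
            .get(stream)
            .map(|p| p.as_str())
            .unwrap_or(DEFAULT_TIMESTAMP_KEY)
    }

    fn main() {
        let mut time_partitions = HashMap::new();
        time_partitions.insert("orders".to_owned(), "order_time".to_owned());

        // A stream with a custom time partition is filtered on that column...
        assert_eq!(time_column(&time_partitions, "orders"), "order_time");
        // ...while any other stream falls back to the default timestamp key.
        assert_eq!(time_column(&time_partitions, "logs"), DEFAULT_TIMESTAMP_KEY);
    }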
