From 818b4f9ebfb1564f3322198dc01c5db0c28e8efa Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 7 Dec 2024 01:09:47 +0530
Subject: [PATCH 1/6] refactor: improve readability of `ListingTableBuilder`

---
 src/query/listing_table_builder.rs | 123 ++++++++++++-----------------
 1 file changed, 52 insertions(+), 71 deletions(-)

diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs
index 685a34a4b..0be3d23be 100644
--- a/src/query/listing_table_builder.rs
+++ b/src/query/listing_table_builder.rs
@@ -25,11 +25,11 @@ use datafusion::{
         listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
     },
     error::DataFusionError,
-    logical_expr::{col, SortExpr},
+    logical_expr::col,
 };
-use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
+use futures_util::{stream::FuturesUnordered, Future, TryStreamExt};
 use itertools::Itertools;
-use object_store::{ObjectMeta, ObjectStore};
+use object_store::{path::Path, ObjectMeta, ObjectStore};
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
@@ -60,25 +60,25 @@ impl ListingTableBuilder {
         client: Arc<dyn ObjectStore>,
         time_filters: &[PartialTimeFilter],
     ) -> Result<Self, DataFusionError> {
+        // Extract the minimum start time from the time filters.
         let start_time = time_filters
             .iter()
-            .filter_map(|x| match x {
-                PartialTimeFilter::Low(Bound::Excluded(x)) => Some(x),
-                PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
+            .filter_map(|filter| match filter {
+                PartialTimeFilter::Low(Bound::Excluded(x))
+                | PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
                 _ => None,
             })
-            .min()
-            .cloned();
+            .min();
 
+        // Extract the maximum end time from the time filters.
         let end_time = time_filters
             .iter()
-            .filter_map(|x| match x {
-                PartialTimeFilter::High(Bound::Excluded(x)) => Some(x),
-                PartialTimeFilter::High(Bound::Included(x)) => Some(x),
+            .filter_map(|filter| match filter {
+                PartialTimeFilter::High(Bound::Excluded(x))
+                | PartialTimeFilter::High(Bound::Included(x)) => Some(x),
                 _ => None,
             })
-            .max()
-            .cloned();
+            .max();
 
         let Some((start_time, end_time)) = start_time.zip(end_time) else {
             return Err(DataFusionError::NotImplemented(
             ));
         };
 
@@ -87,6 +87,7 @@ impl ListingTableBuilder {
+        // Generate prefixes for the given time range
         let prefixes = TimePeriod::new(
             start_time.and_utc(),
             end_time.and_utc(),
         )
         .generate_prefixes();
 
@@ -94,55 +95,41 @@ impl ListingTableBuilder {
-        let prefixes = prefixes
-            .into_iter()
-            .map(|entry| {
-                let path =
-                    relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, entry));
-                storage.absolute_url(path.as_relative_path()).to_string()
-            })
-            .collect_vec();
-
-        let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
+        // Categorizes prefixes into "minute" and general resolve lists.
+        let mut minute_resolve = HashMap::<String, Vec<String>>::new();
         let mut all_resolve = Vec::new();
-        for prefix in prefixes {
-            let components = prefix.split_terminator('/');
-            if components.last().is_some_and(|x| x.starts_with("minute")) {
-                let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
+            let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
+            storage.absolute_url(path.as_relative_path()).to_string();
+            if let Some(pos) = prefix.rfind("minute") {
+                let hour_prefix = &prefix[..pos];
                 minute_resolve
                     .entry(hour_prefix.to_owned())
-                    .and_modify(|list| list.push(prefix))
-                    .or_default();
+                    .or_default()
+                    .push(prefix);
             } else {
-                all_resolve.push(prefix)
+                all_resolve.push(prefix);
             }
         }
 
-        type ResolveFuture = Pin<
-            Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send + 'static>,
-        >;
-        // Pin>>> + Send + 'async_trait>>
-        // BoxStream<'_, Result>
+        /// Resolve all prefixes asynchronously and collect the object metadata.
+        type ResolveFuture =
+            Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
 
         let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();
-
-        for (listing_prefix, prefix) in minute_resolve {
+        for (listing_prefix, prefixes) in minute_resolve {
             let client = Arc::clone(&client);
             tasks.push(Box::pin(async move {
-                let mut list = client
-                    .list(Some(&object_store::path::Path::from(listing_prefix)))
-                    .try_collect::<Vec<_>>()
-                    .await?;
+                let path = Path::from(listing_prefix);
+                let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;
 
-                list.retain(|object| {
-                    prefix.iter().any(|prefix| {
-                        object
-                            .location
+                objects.retain(|obj| {
+                    prefixes.iter().any(|prefix| {
+                        obj.location
                             .prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
                     })
                 });
 
-                Ok(list)
+                Ok(objects)
             }));
         }
 
@@ -157,25 +144,23 @@ impl ListingTableBuilder {
             }));
         }
 
-        let res: Vec<Vec<String>> = tasks
-            .and_then(|res| {
-                future::ok(
-                    res.into_iter()
-                        .map(|res| res.location.to_string())
-                        .collect_vec(),
-                )
-            })
-            .try_collect()
+        let listing = tasks
+            .try_collect::<Vec<Vec<ObjectMeta>>>()
             .await
-            .map_err(|err| DataFusionError::External(Box::new(err)))?;
-
-        let mut res = res.into_iter().flatten().collect_vec();
-        res.sort();
-        res.reverse();
+            .map_err(|err| DataFusionError::External(Box::new(err)))?
+            .into_iter()
+            .flat_map(|res| {
+                res.into_iter()
+                    .map(|obj| obj.location.to_string())
+                    .collect::<Vec<_>>()
+            })
+            .sorted()
+            .rev()
+            .collect_vec();
 
         Ok(Self {
             stream: self.stream,
-            listing: res,
+            listing,
         })
     }
 
@@ -188,25 +173,21 @@ impl ListingTableBuilder {
         if self.listing.is_empty() {
             return Ok(None);
         }
-        let file_sort_order: Vec<Vec<SortExpr>>;
-        let file_format = ParquetFormat::default().with_enable_pruning(true);
-        if let Some(time_partition) = time_partition {
-            file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
-        } else {
-            file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
-        }
+        let file_sort_order = time_partition
+            .map(|time_partition| vec![vec![col(time_partition).sort(true, false)]])
+            .unwrap_or_else(|| vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]]);
+        let file_format = ParquetFormat::default().with_enable_pruning(true);
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
             .with_file_sort_order(file_sort_order)
             .with_collect_stat(true)
             .with_target_partitions(1);
-
         let config = ListingTableConfig::new_with_multi_paths(map(self.listing))
             .with_listing_options(listing_options)
             .with_schema(schema);
+        let listing_table = ListingTable::try_new(config)?;
 
-        let listing_table = Arc::new(ListingTable::try_new(config)?);
-        Ok(Some(listing_table))
+        Ok(Some(Arc::new(listing_table)))
     }
 }

From ab00ca1d82479b9c37a31e24c183bdd77066f662 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 7 Dec 2024 02:32:06 +0530
Subject: [PATCH 2/6] refactor: `extract_timestamp_bound`

---
 src/query/stream_schema_provider.rs | 58 +++++++++++++----------------
 1 file changed, 26 insertions(+), 32 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index f27cb6998..f3b857ef4 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -687,7 +687,7 @@ impl PartialTimeFilter {
         let Expr::BinaryExpr(binexpr) = expr else {
             return None;
         };
-        let (op, time) = extract_timestamp_bound(binexpr.clone(), time_partition)?;
+        let (op, time) = extract_timestamp_bound(binexpr, time_partition)?;
         let value = match op {
             Operator::Gt => PartialTimeFilter::Low(Bound::Excluded(time)),
             Operator::GtEq => PartialTimeFilter::Low(Bound::Included(time)),
@@ -846,7 +846,7 @@ fn expr_in_boundary(filter: &Expr) -> bool {
     let Expr::BinaryExpr(binexpr) = filter else {
         return false;
     };
-    let Some((op, time)) = extract_timestamp_bound(binexpr.clone(), None) else {
+    let Some((op, time)) = extract_timestamp_bound(binexpr, None) else {
         return false;
     };
 
@@ -860,39 +860,33 @@ fn expr_in_boundary(filter: &Expr) -> bool {
     )
 }
 
-fn extract_from_lit(expr: BinaryExpr, time_partition: Option<String>) -> Option<NaiveDateTime> {
-    let mut column_name: String = String::default();
-    if let Expr::Column(column) = *expr.left {
-        column_name = column.name;
-    }
-    if let Expr::Literal(value) = *expr.right {
-        match value {
-            ScalarValue::TimestampMillisecond(Some(value), _) => {
-                Some(DateTime::from_timestamp_millis(value).unwrap().naive_utc())
-            }
-            ScalarValue::TimestampNanosecond(Some(value), _) => {
-                Some(DateTime::from_timestamp_nanos(value).naive_utc())
-            }
-            ScalarValue::Utf8(Some(str_value)) => {
-                if time_partition.is_some() && column_name == time_partition.unwrap() {
-                    Some(str_value.parse::<NaiveDateTime>().unwrap())
-                } else {
-                    None
-                }
-            }
-            _ => None,
-        }
-    } else {
-        None
-    }
-}
-
-/* `BinaryExp` doesn't implement `Copy` */
 fn extract_timestamp_bound(
-    binexpr: BinaryExpr,
+    binexpr: &BinaryExpr,
     time_partition: Option<String>,
 ) -> Option<(Operator, NaiveDateTime)> {
-    Some((binexpr.op, extract_from_lit(binexpr, time_partition)?))
+    let Expr::Literal(value) = binexpr.right.as_ref() else {
+        return None;
+    };
+
+    let is_time_partition = match (binexpr.left.as_ref(), time_partition) {
+        (Expr::Column(column), Some(time_partition)) => column.name == time_partition,
+        _ => false,
+    };
+
+    match value {
+        ScalarValue::TimestampMillisecond(Some(value), _) => Some((
+            binexpr.op,
+            DateTime::from_timestamp_millis(*value).unwrap().naive_utc(),
+        )),
+        ScalarValue::TimestampNanosecond(Some(value), _) => Some((
+            binexpr.op,
+            DateTime::from_timestamp_nanos(*value).naive_utc(),
+        )),
+        ScalarValue::Utf8(Some(str_value)) if is_time_partition => {
+            Some((binexpr.op, str_value.parse::<NaiveDateTime>().unwrap()))
+        }
+        _ => None,
+    }
 }
 
 async fn collect_manifest_files(

From 15cb2a9ccf502132c0932c126ea9cfaf6fc6f30b Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 7 Dec 2024 03:02:18 +0530
Subject: [PATCH 3/6] refactor: don't clone

---
 src/query/stream_schema_provider.rs | 22 +++++++++++-----------
 src/utils/arrow/flight.rs           |  2 +-
 2 files changed, 12 insertions(+), 12 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index f3b857ef4..73c4db657 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -24,7 +24,7 @@ use crate::{
     storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
 };
 use arrow_array::RecordBatch;
-use arrow_schema::{Schema, SchemaRef, SortOptions};
+use arrow_schema::{SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
 use datafusion::catalog::Session;
@@ -333,12 +333,12 @@ impl TableProvider for StandardTableProvider {
             .await
             .map_err(|err| DataFusionError::Plan(err.to_string()))?;
         let time_partition = object_store_format.time_partition;
-        let mut time_filters = extract_primary_filter(filters, time_partition.clone());
+        let mut time_filters = extract_primary_filter(filters, &time_partition);
         if time_filters.is_empty() {
             return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string()));
         }
-        if include_now(filters, time_partition.clone()) {
+        if include_now(filters, &time_partition) {
             if let Some(records) =
                 event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
             {
@@ -683,7 +683,7 @@ pub enum PartialTimeFilter {
 }
 
 impl PartialTimeFilter {
-    fn try_from_expr(expr: &Expr, time_partition: Option<String>) -> Option<Self> {
+    fn try_from_expr(expr: &Expr, time_partition: &Option<String>) -> Option<Self> {
         let Expr::BinaryExpr(binexpr) = expr else {
             return None;
         };
@@ -814,7 +814,7 @@ fn return_listing_time_filters(
     }
 }
 
-pub fn include_now(filters: &[Expr], time_partition: Option<String>) -> bool {
+pub fn include_now(filters: &[Expr], time_partition: &Option<String>) -> bool {
     let current_minute = Utc::now()
         .with_second(0)
         .and_then(|x| x.with_nanosecond(0))
@@ -846,7 +846,7 @@ fn expr_in_boundary(filter: &Expr) -> bool {
     let Expr::BinaryExpr(binexpr) = filter else {
         return false;
     };
-    let Some((op, time)) = extract_timestamp_bound(binexpr, None) else {
+    let Some((op, time)) = extract_timestamp_bound(binexpr, &None) else {
         return false;
     };
 
@@ -862,14 +862,14 @@ fn expr_in_boundary(filter: &Expr) -> bool {
 
 fn extract_timestamp_bound(
     binexpr: &BinaryExpr,
-    time_partition: Option<String>,
+    time_partition: &Option<String>,
 ) -> Option<(Operator, NaiveDateTime)> {
     let Expr::Literal(value) = binexpr.right.as_ref() else {
         return None;
     };
 
     let is_time_partition = match (binexpr.left.as_ref(), time_partition) {
-        (Expr::Column(column), Some(time_partition)) => column.name == time_partition,
+        (Expr::Column(column), Some(time_partition)) => &column.name == time_partition,
         _ => false,
     };
 
@@ -911,15 +911,15 @@ async fn collect_manifest_files(
         .collect())
 }
 
-// extract start time and end time from filter preficate
+// Extract start time and end time from filter predicate
 fn extract_primary_filter(
     filters: &[Expr],
-    time_partition: Option<String>,
+    time_partition: &Option<String>,
 ) -> Vec<PartialTimeFilter> {
     let mut time_filters = Vec::new();
     filters.iter().for_each(|expr| {
         let _ = expr.apply(&mut |expr| {
-            let time = PartialTimeFilter::try_from_expr(expr, time_partition.clone());
+            let time = PartialTimeFilter::try_from_expr(expr, time_partition);
             if let Some(time) = time {
                 time_filters.push(time);
                 Ok(TreeNodeRecursion::Stop)
             } else {
                 Ok(TreeNodeRecursion::Jump)
             }
         });
     });
     time_filters
 }
diff --git a/src/utils/arrow/flight.rs b/src/utils/arrow/flight.rs
index 2fb6bd849..6781f2d6e 100644
--- a/src/utils/arrow/flight.rs
+++ b/src/utils/arrow/flight.rs
@@ -135,7 +135,7 @@ pub fn send_to_ingester(start: i64, end: i64) -> bool {
     );
 
     let ex = [Expr::BinaryExpr(ex1), Expr::BinaryExpr(ex2)];
-    CONFIG.parseable.mode == Mode::Query && include_now(&ex, None)
+    CONFIG.parseable.mode == Mode::Query && include_now(&ex, &None)
 }
 
 fn lit_timestamp_milli(time: i64) -> Expr {

From 7e00e5bd7293e823f3409d2d324026715e574d3f Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 7 Dec 2024 03:03:42 +0530
Subject: [PATCH 4/6] refactor: iter collect time filter extraction

---
 src/query/stream_schema_provider.rs | 28 +++++++++++++++-------------
 1 file changed, 15 insertions(+), 13 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 73c4db657..17d8aae98 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -916,19 +916,21 @@ fn extract_primary_filter(
     filters: &[Expr],
     time_partition: &Option<String>,
 ) -> Vec<PartialTimeFilter> {
-    let mut time_filters = Vec::new();
-    filters.iter().for_each(|expr| {
-        let _ = expr.apply(&mut |expr| {
-            let time = PartialTimeFilter::try_from_expr(expr, time_partition);
-            if let Some(time) = time {
-                time_filters.push(time);
-                Ok(TreeNodeRecursion::Stop)
-            } else {
-                Ok(TreeNodeRecursion::Jump)
-            }
-        });
-    });
-    time_filters
+    filters
+        .iter()
+        .filter_map(|expr| {
+            let mut time_filter = None;
+            let _ = expr.apply(&mut |expr| {
+                if let Some(time) = PartialTimeFilter::try_from_expr(expr, &time_partition) {
+                    time_filter = Some(time);
+                    Ok(TreeNodeRecursion::Stop) // Stop further traversal
+                } else {
+                    Ok(TreeNodeRecursion::Jump) // Skip this node
+                }
+            });
+            time_filter
+        })
+        .collect()
 }
 
 trait ManifestExt: ManifestFile {

From 435db2f0fe82ceddc52e97d64bc4e3a9456ee5e4 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Sat, 7 Dec 2024 03:45:44 +0530
Subject: [PATCH 5/6] ci: fix suggestions

---
 src/query/listing_table_builder.rs  | 6 +++---
 src/query/stream_schema_provider.rs | 4 ++--
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/src/query/listing_table_builder.rs b/src/query/listing_table_builder.rs
index 0be3d23be..0a3edf2bf 100644
--- a/src/query/listing_table_builder.rs
+++ b/src/query/listing_table_builder.rs
@@ -174,9 +174,9 @@ impl ListingTableBuilder {
             return Ok(None);
         }
 
-        let file_sort_order = time_partition
-            .map(|time_partition| vec![vec![col(time_partition).sort(true, false)]])
-            .unwrap_or_else(|| vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]]);
+        let file_sort_order = vec![vec![time_partition
+            .map_or_else(|| col(DEFAULT_TIMESTAMP_KEY), col)
+            .sort(true, false)]];
         let file_format = ParquetFormat::default().with_enable_pruning(true);
         let listing_options = ListingOptions::new(Arc::new(file_format))
             .with_file_extension(".parquet")
diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index 17d8aae98..cc612d4f8 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -24,7 +24,7 @@ use crate::{
     storage::{ObjectStoreFormat, STREAM_ROOT_DIRECTORY},
 };
 use arrow_array::RecordBatch;
-use arrow_schema::{SchemaRef, SortOptions};
+use arrow_schema::{Schema, SchemaRef, SortOptions};
 use bytes::Bytes;
 use chrono::{DateTime, NaiveDateTime, Timelike, Utc};
 use datafusion::catalog::Session;
@@ -921,7 +921,7 @@ fn extract_primary_filter(
         .filter_map(|expr| {
             let mut time_filter = None;
             let _ = expr.apply(&mut |expr| {
-                if let Some(time) = PartialTimeFilter::try_from_expr(expr, &time_partition) {
+                if let Some(time) = PartialTimeFilter::try_from_expr(expr, time_partition) {
                     time_filter = Some(time);
                     Ok(TreeNodeRecursion::Stop) // Stop further traversal
                 } else {

From 9452eb669a7cb58443c7dae14bf6e4daaeae92de Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi
Date: Mon, 9 Dec 2024 13:26:03 +0530
Subject: [PATCH 6/6] test: `extract_timestamp_bound`

---
 src/query/stream_schema_provider.rs | 149 +++++++++++++++++++++++++++-
 1 file changed, 147 insertions(+), 2 deletions(-)

diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs
index cc612d4f8..0766aa2e5 100644
--- a/src/query/stream_schema_provider.rs
+++ b/src/query/stream_schema_provider.rs
@@ -1044,11 +1044,16 @@ mod tests {
     use std::ops::Add;
 
-    use chrono::{DateTime, Duration, NaiveDate, NaiveTime, Utc};
+    use chrono::{DateTime, Duration, NaiveDate, NaiveDateTime, NaiveTime, Utc};
+    use datafusion::{
+        logical_expr::{BinaryExpr, Operator},
+        prelude::Expr,
+        scalar::ScalarValue,
+    };
 
     use crate::catalog::snapshot::ManifestItem;
 
-    use super::{is_overlapping_query, PartialTimeFilter};
+    use super::{extract_timestamp_bound, is_overlapping_query, PartialTimeFilter};
 
     fn datetime_min(year: i32, month: u32, day: u32) -> DateTime<Utc> {
         NaiveDate::from_ymd_opt(year, month, day)
            .unwrap()
            .and_time(NaiveTime::MIN)
            .and_utc()
     }
 
@@ -1131,4 +1136,144 @@ mod tests {
 
         assert!(!res)
     }
+
+    #[test]
+    fn timestamp_in_milliseconds() {
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(ScalarValue::TimestampMillisecond(
+                Some(1672531200000),
+                None,
+            ))),
+        };
+
+        let time_partition = Some("timestamp_column".to_string());
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        let expected = Some((
+            Operator::Eq,
+            NaiveDateTime::parse_from_str("2023-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(),
+        ));
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn timestamp_in_nanoseconds() {
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Gt,
+            right: Box::new(Expr::Literal(ScalarValue::TimestampNanosecond(
+                Some(1672531200000000000),
+                None,
+            ))),
+        };
+
+        let time_partition = Some("timestamp_column".to_string());
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        let expected = Some((
+            Operator::Gt,
+            NaiveDateTime::parse_from_str("2023-01-01 00:00:00", "%Y-%m-%d %H:%M:%S").unwrap(),
+        ));
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn string_timestamp() {
+        let timestamp = "2023-01-01T00:00:00";
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Lt,
+            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some(timestamp.to_owned())))),
+        };
+
+        let time_partition = Some("timestamp_column".to_string());
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        let expected = Some((
+            Operator::Lt,
+            NaiveDateTime::parse_from_str(timestamp, "%Y-%m-%dT%H:%M:%S").unwrap(),
+        ));
+
+        assert_eq!(result, expected);
+    }
+
+    #[test]
+    fn unexpected_utf8_column() {
+        let timestamp = "2023-01-01T00:00:00";
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("other_column".into())),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(ScalarValue::Utf8(Some(timestamp.to_owned())))),
+        };
+
+        let time_partition = Some("timestamp_column".to_string());
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn unsupported_literal_type() {
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(ScalarValue::Int32(Some(42)))),
+        };
+
+        let time_partition = Some("timestamp_column".to_string());
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn no_literal_on_right() {
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Eq,
+            right: Box::new(Expr::Column("other_column".into())),
+        };
+
+        let time_partition = Some("timestamp_column".to_string());
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        assert!(result.is_none());
+    }
+
+    #[test]
+    fn non_time_partition_timestamps() {
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(ScalarValue::TimestampMillisecond(
+                Some(1672531200000),
+                None,
+            ))),
+        };
+
+        let time_partition = None;
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+        let expected = Some((
+            Operator::Eq,
+            NaiveDateTime::parse_from_str("2023-01-01T00:00:00", "%Y-%m-%dT%H:%M:%S").unwrap(),
+        ));
+
+        assert_eq!(result, expected);
+
+        let binexpr = BinaryExpr {
+            left: Box::new(Expr::Column("timestamp_column".into())),
+            op: Operator::Eq,
+            right: Box::new(Expr::Literal(ScalarValue::TimestampNanosecond(
+                Some(1672531200000000000),
+                None,
+            ))),
+        };
+        let result = extract_timestamp_bound(&binexpr, &time_partition);
+
+        assert_eq!(result, expected);
+    }
 }