Skip to content

Commit

Permalink
Merge branch 'main' into refactor
Browse files: browse the repository at this point in the history
  • Loading branch information
de-sh authored Dec 17, 2024
2 parents 63f0343 + 386a662 commit 8de3326
Show file tree
Hide file tree
Showing 3 changed files with 248 additions and 126 deletions.
123 changes: 52 additions & 71 deletions src/query/listing_table_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,11 @@ use datafusion::{
listing::{ListingOptions, ListingTable, ListingTableConfig, ListingTableUrl},
},
error::DataFusionError,
logical_expr::{col, SortExpr},
logical_expr::col,
};
use futures_util::{future, stream::FuturesUnordered, Future, TryStreamExt};
use futures_util::{stream::FuturesUnordered, Future, TryStreamExt};
use itertools::Itertools;
use object_store::{ObjectMeta, ObjectStore};
use object_store::{path::Path, ObjectMeta, ObjectStore};

use crate::{
event::DEFAULT_TIMESTAMP_KEY,
Expand Down Expand Up @@ -60,25 +60,25 @@ impl ListingTableBuilder {
client: Arc<dyn ObjectStore>,
time_filters: &[PartialTimeFilter],
) -> Result<Self, DataFusionError> {
// Extract the minimum start time from the time filters.
let start_time = time_filters
.iter()
.filter_map(|x| match x {
PartialTimeFilter::Low(Bound::Excluded(x)) => Some(x),
PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
.filter_map(|filter| match filter {
PartialTimeFilter::Low(Bound::Excluded(x))
| PartialTimeFilter::Low(Bound::Included(x)) => Some(x),
_ => None,
})
.min()
.cloned();
.min();

// Extract the maximum end time from the time filters.
let end_time = time_filters
.iter()
.filter_map(|x| match x {
PartialTimeFilter::High(Bound::Excluded(x)) => Some(x),
PartialTimeFilter::High(Bound::Included(x)) => Some(x),
.filter_map(|filter| match filter {
PartialTimeFilter::High(Bound::Excluded(x))
| PartialTimeFilter::High(Bound::Included(x)) => Some(x),
_ => None,
})
.max()
.cloned();
.max();

let Some((start_time, end_time)) = start_time.zip(end_time) else {
return Err(DataFusionError::NotImplemented(
Expand All @@ -87,62 +87,49 @@ impl ListingTableBuilder {
));
};

// Generate prefixes for the given time range
let prefixes = TimePeriod::new(
start_time.and_utc(),
end_time.and_utc(),
OBJECT_STORE_DATA_GRANULARITY,
)
.generate_prefixes();

let prefixes = prefixes
.into_iter()
.map(|entry| {
let path =
relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, entry));
storage.absolute_url(path.as_relative_path()).to_string()
})
.collect_vec();

let mut minute_resolve: HashMap<String, Vec<String>> = HashMap::new();
// Categorizes prefixes into "minute" and general resolve lists.
let mut minute_resolve = HashMap::<String, Vec<String>>::new();
let mut all_resolve = Vec::new();

for prefix in prefixes {
let components = prefix.split_terminator('/');
if components.last().is_some_and(|x| x.starts_with("minute")) {
let hour_prefix = &prefix[0..prefix.rfind("minute").expect("minute exists")];
let path = relative_path::RelativePathBuf::from(format!("{}/{}", &self.stream, prefix));
storage.absolute_url(path.as_relative_path()).to_string();
if let Some(pos) = prefix.rfind("minute") {
let hour_prefix = &prefix[..pos];
minute_resolve
.entry(hour_prefix.to_owned())
.and_modify(|list| list.push(prefix))
.or_default();
.or_default()
.push(prefix);
} else {
all_resolve.push(prefix)
all_resolve.push(prefix);
}
}

type ResolveFuture = Pin<
Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send + 'static>,
>;
// Pin<Box<dyn Future<Output = Result<BoxStream<'_, Result<ObjectMeta>>>> + Send + 'async_trait>>
// BoxStream<'_, Result<ObjectMeta>>
/// Resolve all prefixes asynchronously and collect the object metadata.
type ResolveFuture =
Pin<Box<dyn Future<Output = Result<Vec<ObjectMeta>, object_store::Error>> + Send>>;
let tasks: FuturesUnordered<ResolveFuture> = FuturesUnordered::new();

for (listing_prefix, prefix) in minute_resolve {
for (listing_prefix, prefixes) in minute_resolve {
let client = Arc::clone(&client);
tasks.push(Box::pin(async move {
let mut list = client
.list(Some(&object_store::path::Path::from(listing_prefix)))
.try_collect::<Vec<_>>()
.await?;
let path = Path::from(listing_prefix);
let mut objects = client.list(Some(&path)).try_collect::<Vec<_>>().await?;

list.retain(|object| {
prefix.iter().any(|prefix| {
object
.location
objects.retain(|obj| {
prefixes.iter().any(|prefix| {
obj.location
.prefix_matches(&object_store::path::Path::from(prefix.as_ref()))
})
});

Ok(list)
Ok(objects)
}));
}

Expand All @@ -157,25 +144,23 @@ impl ListingTableBuilder {
}));
}

let res: Vec<Vec<String>> = tasks
.and_then(|res| {
future::ok(
res.into_iter()
.map(|res| res.location.to_string())
.collect_vec(),
)
})
.try_collect()
let listing = tasks
.try_collect::<Vec<Vec<ObjectMeta>>>()
.await
.map_err(|err| DataFusionError::External(Box::new(err)))?;

let mut res = res.into_iter().flatten().collect_vec();
res.sort();
res.reverse();
.map_err(|err| DataFusionError::External(Box::new(err)))?
.into_iter()
.flat_map(|res| {
res.into_iter()
.map(|obj| obj.location.to_string())
.collect::<Vec<String>>()
})
.sorted()
.rev()
.collect_vec();

Ok(Self {
stream: self.stream,
listing: res,
listing,
})
}

Expand All @@ -188,25 +173,21 @@ impl ListingTableBuilder {
if self.listing.is_empty() {
return Ok(None);
}
let file_sort_order: Vec<Vec<SortExpr>>;
let file_format = ParquetFormat::default().with_enable_pruning(true);
if let Some(time_partition) = time_partition {
file_sort_order = vec![vec![col(time_partition).sort(true, false)]];
} else {
file_sort_order = vec![vec![col(DEFAULT_TIMESTAMP_KEY).sort(true, false)]];
}

let file_sort_order = vec![vec![time_partition
.map_or_else(|| col(DEFAULT_TIMESTAMP_KEY), col)
.sort(true, false)]];
let file_format = ParquetFormat::default().with_enable_pruning(true);
let listing_options = ListingOptions::new(Arc::new(file_format))
.with_file_extension(".parquet")
.with_file_sort_order(file_sort_order)
.with_collect_stat(true)
.with_target_partitions(1);

let config = ListingTableConfig::new_with_multi_paths(map(self.listing))
.with_listing_options(listing_options)
.with_schema(schema);
let listing_table = ListingTable::try_new(config)?;

let listing_table = Arc::new(ListingTable::try_new(config)?);
Ok(Some(listing_table))
Ok(Some(Arc::new(listing_table)))
}
}
Loading

0 comments on commit 8de3326

Please sign in to comment.