Skip to content

Commit

Permalink
chore: simplify MetadataFetcher to function (#1569)
Browse files Browse the repository at this point in the history
  • Loading branch information
AdamGS authored Dec 5, 2024
1 parent 8ae9137 commit 0942d55
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 44 deletions.
5 changes: 2 additions & 3 deletions vortex-datafusion/src/persistent/format.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use object_store::{ObjectMeta, ObjectStore};
use vortex_array::array::StructArray;
use vortex_array::arrow::infer_schema;
use vortex_array::Context;
use vortex_file::metadata::MetadataFetcher;
use vortex_file::metadata::fetch_metadata;
use vortex_file::{
read_initial_bytes, read_layout_from_initial, LayoutContext, LayoutDeserializer,
LayoutMessageCache, RelativeLayoutCache, Scan, VORTEX_FILE_EXTENSION,
Expand Down Expand Up @@ -118,8 +118,7 @@ impl FileFormat for VortexFormat {
stats.num_rows = Precision::Exact(row_count as usize);

let metadata_table =
MetadataFetcher::fetch(os_read_at, io.into(), root_layout, layout_message_cache)
.await?;
fetch_metadata(os_read_at, io.into(), root_layout, layout_message_cache).await?;

if let Some(metadata) = metadata_table {
let mut column_statistics = Vec::with_capacity(table_schema.fields().len());
Expand Down
58 changes: 17 additions & 41 deletions vortex-file/src/read/metadata.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
use std::future::Future;
use std::iter;
use std::iter::Once;
use std::pin::Pin;
use std::sync::{Arc, RwLock};
use std::task::{ready, Context, Poll};

use futures_util::{stream, StreamExt};
use vortex_array::ArrayData;
Expand All @@ -14,17 +10,6 @@ use super::{LayoutMessageCache, LayoutReader};
use crate::read::buffered::{BufferedLayoutReader, ReadMasked};
use crate::{MessageRead, RowMask};

type MetadataBufferedReader<R> = BufferedLayoutReader<
R,
stream::Iter<Once<VortexResult<RowMask>>>,
Vec<Option<ArrayData>>,
MetadataMaskReader,
>;

pub struct MetadataFetcher<R: VortexReadAt> {
metadata_reader: MetadataBufferedReader<R>,
}

struct MetadataMaskReader {
layout: Box<dyn LayoutReader>,
}
Expand All @@ -46,30 +31,21 @@ impl ReadMasked for MetadataMaskReader {
}
}

impl<R: VortexReadAt + Unpin> MetadataFetcher<R> {
pub fn fetch(
input: R,
dispatcher: Arc<IoDispatcher>,
root_layout: Box<dyn LayoutReader>,
layout_cache: Arc<RwLock<LayoutMessageCache>>,
) -> Self {
let metadata_reader = BufferedLayoutReader::new(
input,
dispatcher,
stream::iter(iter::once(Ok(RowMask::new_valid_between(0, 1)))),
MetadataMaskReader::new(root_layout),
layout_cache,
);
Self { metadata_reader }
}
}

impl<R: VortexReadAt + Unpin> Future for MetadataFetcher<R> {
type Output = VortexResult<Option<Vec<Option<ArrayData>>>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Ready(ready!(self.metadata_reader.poll_next_unpin(cx)).transpose())
}
pub async fn fetch_metadata<R: VortexReadAt + Unpin>(
input: R,
dispatcher: Arc<IoDispatcher>,
root_layout: Box<dyn LayoutReader>,
layout_cache: Arc<RwLock<LayoutMessageCache>>,
) -> VortexResult<Option<Vec<Option<ArrayData>>>> {
let mut metadata_reader = BufferedLayoutReader::new(
input,
dispatcher,
stream::iter(iter::once(Ok(RowMask::new_valid_between(0, 1)))),
MetadataMaskReader::new(root_layout),
layout_cache,
);

metadata_reader.next().await.transpose()
}

#[cfg(test)]
Expand All @@ -82,7 +58,7 @@ mod test {
use vortex_buffer::{Buffer, BufferString};
use vortex_io::IoDispatcher;

use crate::metadata::MetadataFetcher;
use crate::metadata::fetch_metadata;
use crate::{
read_initial_bytes, read_layout_from_initial, LayoutDeserializer, LayoutMessageCache,
RelativeLayoutCache, Scan, VortexFileWriter,
Expand Down Expand Up @@ -142,7 +118,7 @@ mod test {
)
.unwrap();
let io = IoDispatcher::default();
let metadata_table = MetadataFetcher::fetch(
let metadata_table = fetch_metadata(
written_bytes,
io.into(),
layout_reader,
Expand Down

0 comments on commit 0942d55

Please sign in to comment.