Skip to content

Commit

Permalink
Merge pull request #48 from rdettai/refacto-manifest-list-reader
Browse files Browse the repository at this point in the history
Move manifest list reader from spec crate
  • Loading branch information
JanKaul authored Nov 9, 2024
2 parents 3285b96 + 4bf2750 commit ee4761b
Show file tree
Hide file tree
Showing 12 changed files with 117 additions and 134 deletions.
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ serde_derive = "^1.0"
serde_json = "^1.0"
futures = "0.3.30"
async-trait = "0.1"
chrono = { version = "0.4", default-features = false, features = ["serde"] }
chrono = { version = "0.4", default-features = false, features = ["serde", "clock"] }
arrow = "52.1.0"
arrow-schema = "52.1.0"
datafusion = "40.0.0"
Expand All @@ -39,4 +39,3 @@ itertools = "0.10.5"
derive-getters = "0.3.0"
tracing = "0.1"
tracing-futures = "0.2"

8 changes: 4 additions & 4 deletions datafusion_iceberg/src/pruning_statistics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,13 @@
*
* Pruning is done on two levels:
*
* 1. Prune ManifestFiles based on information in Manifest_list_file
* 2. Prune DataFiles based on information in Manifest_file
* 1. Prune manifests based on information in manifests lists
* 2. Prune data files based on information in manifests
*
* For the first level the triat PruningStatistics is implemented for the DataFusionTable. It returns the pruning information for the manifest files
* For the first level the trait [`PruningStatistics`] is implemented for the DataFusionTable. It returns the pruning information for the manifest files
* and not the final data files.
*
* For the second level the trait PruningStatistics is implemented for the ManifestFile
* For the second level the trait PruningStatistics is implemented for the Manifest
*/

use std::any::Any;
Expand Down
1 change: 0 additions & 1 deletion iceberg-rust-spec/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,3 @@ url = { workspace = true }
derive_builder = "0.12.0"
thiserror = { workspace = true }
derive-getters = { workspace = true }
object_store = { workspace = true }
3 changes: 0 additions & 3 deletions iceberg-rust-spec/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,6 @@ pub enum Error {
/// Io error
#[error(transparent)]
IO(#[from] std::io::Error),
/// Objectstore error
#[error(transparent)]
ObjectStore(#[from] object_store::Error),
/// Try from slice error
#[error(transparent)]
TryFromSlice(#[from] std::array::TryFromSliceError),
Expand Down
55 changes: 9 additions & 46 deletions iceberg-rust-spec/src/spec/manifest_list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,9 @@
* Manifest lists
*/

use std::{
io::Read,
iter::{repeat, Map, Repeat, Zip},
sync::OnceLock,
};
use std::sync::OnceLock;

use apache_avro::{types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema};
use apache_avro::{types::Value as AvroValue, Schema as AvroSchema};
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use serde_repr::{Deserialize_repr, Serialize_repr};
Expand All @@ -23,39 +19,6 @@ use super::{
values::Value,
};

type ReaderZip<'a, 'metadata, R> = Zip<AvroReader<'a, R>, Repeat<&'metadata TableMetadata>>;
type ReaderMap<'a, 'metadata, R> = Map<
ReaderZip<'a, 'metadata, R>,
fn((Result<AvroValue, apache_avro::Error>, &TableMetadata)) -> Result<ManifestListEntry, Error>,
>;

/// Iterator of ManifestFileEntries
pub struct ManifestListReader<'a, 'metadata, R: Read> {
reader: ReaderMap<'a, 'metadata, R>,
}

impl<'a, 'metadata, R: Read> Iterator for ManifestListReader<'a, 'metadata, R> {
type Item = Result<ManifestListEntry, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.reader.next()
}
}

impl<'a, 'metadata, R: Read> ManifestListReader<'a, 'metadata, R> {
/// Create a new ManifestFile reader
pub fn new(reader: R, table_metadata: &'metadata TableMetadata) -> Result<Self, Error> {
let schema: &AvroSchema = match table_metadata.format_version {
FormatVersion::V1 => manifest_list_schema_v1(),
FormatVersion::V2 => manifest_list_schema_v2(),
};
Ok(Self {
reader: AvroReader::with_schema(schema, reader)?
.zip(repeat(table_metadata))
.map(avro_value_to_manifest_file),
})
}
}

#[derive(Debug, Serialize, PartialEq, Eq, Clone)]
#[serde(into = "ManifestListEntryEnum")]
/// A manifest list includes summary metadata that can be used to avoid scanning all of the manifests in a snapshot when planning a table scan.
Expand Down Expand Up @@ -300,7 +263,7 @@ impl ManifestListEntry {
}
}

pub(crate) fn try_from_v2(
pub fn try_from_v2(
entry: _serde::ManifestListEntryV2,
table_metadata: &TableMetadata,
) -> Result<ManifestListEntry, Error> {
Expand Down Expand Up @@ -344,7 +307,7 @@ impl ManifestListEntry {
})
}

pub(crate) fn try_from_v1(
pub fn try_from_v1(
entry: _serde::ManifestListEntryV1,
table_metadata: &TableMetadata,
) -> Result<ManifestListEntry, Error> {
Expand Down Expand Up @@ -688,12 +651,12 @@ pub fn manifest_list_schema_v2() -> &'static AvroSchema {
})
}

/// Convert an avro value to a [ManifestFile] according to the provided format version
pub(crate) fn avro_value_to_manifest_file(
value: (Result<AvroValue, apache_avro::Error>, &TableMetadata),
/// Convert an avro value result to a manifest list version according to the provided format version
pub fn avro_value_to_manifest_list_entry(
value: Result<AvroValue, apache_avro::Error>,
table_metadata: &TableMetadata,
) -> Result<ManifestListEntry, Error> {
let entry = value.0?;
let table_metadata = value.1;
let entry = value?;
match table_metadata.format_version {
FormatVersion::V1 => ManifestListEntry::try_from_v1(
apache_avro::from_value::<_serde::ManifestListEntryV1>(&entry)?,
Expand Down
33 changes: 2 additions & 31 deletions iceberg-rust-spec/src/spec/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,15 @@
*/
use std::{
collections::HashMap,
fmt,
io::Cursor,
str,
sync::Arc,
fmt, str,
time::{SystemTime, UNIX_EPOCH},
};

use derive_builder::Builder;
use derive_getters::Getters;
use object_store::ObjectStore;
use serde::{Deserialize, Serialize};

use crate::{error::Error, util};

use super::{
manifest_list::{ManifestListEntry, ManifestListReader},
table_metadata::TableMetadata,
};
use crate::error::Error;

use _serde::SnapshotEnum;

Expand Down Expand Up @@ -56,26 +47,6 @@ pub struct Snapshot {
schema_id: Option<i32>,
}

impl Snapshot {
// Return all manifest files associated to the latest table snapshot. Reads the related manifest_list file and returns its entries.
// If the manifest list file is empty returns an empty vector.
pub async fn manifests<'metadata>(
&self,
table_metadata: &'metadata TableMetadata,
object_store: Arc<dyn ObjectStore>,
) -> Result<impl Iterator<Item = Result<ManifestListEntry, Error>> + 'metadata, Error> {
let bytes: Cursor<Vec<u8>> = Cursor::new(
object_store
.get(&util::strip_prefix(&self.manifest_list).into())
.await?
.bytes()
.await?
.into(),
);
ManifestListReader::new(bytes, table_metadata).map_err(Into::into)
}
}

pub fn generate_snapshot_id() -> i64 {
let mut bytes: [u8; 8] = [0u8; 8];
getrandom::getrandom(&mut bytes).unwrap();
Expand Down
4 changes: 2 additions & 2 deletions iceberg-rust/src/table/manifest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ type ReaderMap<'a, R> = Map<
) -> Result<ManifestEntry, Error>,
>;

/// Iterator of ManifestFileEntries
/// Iterator of manifest entries
pub struct ManifestReader<'a, R: Read> {
reader: ReaderMap<'a, R>,
}
Expand All @@ -48,7 +48,7 @@ impl<'a, R: Read> Iterator for ManifestReader<'a, R> {
}

impl<'a, R: Read> ManifestReader<'a, R> {
/// Create a new ManifestFile reader
/// Create a new manifest reader
pub fn new(reader: R) -> Result<Self, Error> {
let reader = AvroReader::new(reader)?;
let metadata = reader.user_metadata();
Expand Down
76 changes: 76 additions & 0 deletions iceberg-rust/src/table/manifest_list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*!
* Helpers to deal with manifest lists and files
*/

use std::{
io::{Cursor, Read},
iter::{repeat, Map, Repeat, Zip},
sync::Arc,
};

use apache_avro::{types::Value as AvroValue, Reader as AvroReader, Schema as AvroSchema};
use iceberg_rust_spec::{
manifest_list::{
avro_value_to_manifest_list_entry, manifest_list_schema_v1, manifest_list_schema_v2,
ManifestListEntry,
},
snapshot::Snapshot,
table_metadata::{FormatVersion, TableMetadata},
util::strip_prefix,
};
use object_store::ObjectStore;

use crate::error::Error;

type ReaderZip<'a, 'metadata, R> = Zip<AvroReader<'a, R>, Repeat<&'metadata TableMetadata>>;
type ReaderMap<'a, 'metadata, R> = Map<
ReaderZip<'a, 'metadata, R>,
fn((Result<AvroValue, apache_avro::Error>, &TableMetadata)) -> Result<ManifestListEntry, Error>,
>;

/// Iterator of manifest list entries
pub struct ManifestListReader<'a, 'metadata, R: Read> {
reader: ReaderMap<'a, 'metadata, R>,
}

impl<'a, 'metadata, R: Read> Iterator for ManifestListReader<'a, 'metadata, R> {
type Item = Result<ManifestListEntry, Error>;
fn next(&mut self) -> Option<Self::Item> {
self.reader.next()
}
}

impl<'a, 'metadata, R: Read> ManifestListReader<'a, 'metadata, R> {
/// Create a new manifest list reader
pub fn new(reader: R, table_metadata: &'metadata TableMetadata) -> Result<Self, Error> {
let schema: &AvroSchema = match table_metadata.format_version {
FormatVersion::V1 => manifest_list_schema_v1(),
FormatVersion::V2 => manifest_list_schema_v2(),
};
Ok(Self {
reader: AvroReader::with_schema(schema, reader)?
.zip(repeat(table_metadata))
.map(|(avro_value_res, meta)| {
avro_value_to_manifest_list_entry(avro_value_res, meta).map_err(Error::from)
}),
})
}
}

/// Return all manifest files associated to the latest table snapshot. Reads the related manifest_list file and returns its entries.
/// If the manifest list file is empty returns an empty vector.
pub(crate) async fn read_snapshot<'metadata>(
snapshot: &Snapshot,
table_metadata: &'metadata TableMetadata,
object_store: Arc<dyn ObjectStore>,
) -> Result<impl Iterator<Item = Result<ManifestListEntry, Error>> + 'metadata, Error> {
let bytes: Cursor<Vec<u8>> = Cursor::new(
object_store
.get(&strip_prefix(snapshot.manifest_list()).into())
.await?
.bytes()
.await?
.into(),
);
ManifestListReader::new(bytes, table_metadata).map_err(Into::into)
}
Loading

0 comments on commit ee4761b

Please sign in to comment.