From 4275c6b70731e181e9f1c92fac737584a17f6c81 Mon Sep 17 00:00:00 2001 From: WenjunMin Date: Mon, 9 Sep 2024 19:55:47 +0800 Subject: [PATCH] feat(spec): Impl the ManifestFile read functionality (#69) --- crates/paimon/Cargo.toml | 1 + crates/paimon/src/spec/data_file.rs | 44 +++-- crates/paimon/src/spec/manifest_entry.rs | 130 ++++++++++++++ crates/paimon/src/spec/manifest_file_meta.rs | 59 +------ crates/paimon/src/spec/manifest_list.rs | 111 ------------ crates/paimon/src/spec/mod.rs | 7 +- crates/paimon/src/spec/objects_file.rs | 162 ++++++++++++++++++ crates/paimon/src/spec/stats.rs | 77 +++++++++ ...est-8ded1f09-fcda-489e-9167-582ac0f9f846-0 | Bin 0 -> 1787 bytes 9 files changed, 403 insertions(+), 188 deletions(-) create mode 100644 crates/paimon/src/spec/manifest_entry.rs delete mode 100644 crates/paimon/src/spec/manifest_list.rs create mode 100644 crates/paimon/src/spec/objects_file.rs create mode 100644 crates/paimon/src/spec/stats.rs create mode 100644 crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 diff --git a/crates/paimon/Cargo.toml b/crates/paimon/Cargo.toml index 959475f..ceb6100 100644 --- a/crates/paimon/Cargo.toml +++ b/crates/paimon/Cargo.toml @@ -44,6 +44,7 @@ serde = { version = "1", features = ["derive"] } serde_bytes = "0.11.15" serde_json = "1.0.120" serde_with = "3.9.0" +serde_repr = "0.1" snafu = "0.8.3" typed-builder = "^0.19" opendal = { version = "0.49", features = ["services-fs"] } diff --git a/crates/paimon/src/spec/data_file.rs b/crates/paimon/src/spec/data_file.rs index 37165e6..17fab24 100644 --- a/crates/paimon/src/spec/data_file.rs +++ b/crates/paimon/src/spec/data_file.rs @@ -15,7 +15,9 @@ // specific language governing permissions and limitations // under the License. -use crate::spec::RowType; +use crate::spec::stats::BinaryTableStats; +use chrono::serde::ts_milliseconds::deserialize as from_millis; +use chrono::serde::ts_milliseconds::serialize as to_millis; use chrono::{DateTime, Utc}; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -48,12 +50,6 @@ impl BinaryRow { } } -/// TODO: implement me. -/// The statistics for columns, supports the following stats. -/// -/// Impl References: -type SimpleStats = (); - /// The Source of a file. /// TODO: move me to the manifest module. /// @@ -72,25 +68,43 @@ pub enum FileSource { #[derive(Debug, Eq, PartialEq, Serialize, Deserialize)] #[serde(rename_all = "camelCase")] pub struct DataFileMeta { + #[serde(rename = "_FILE_NAME")] pub file_name: String, + #[serde(rename = "_FILE_SIZE")] pub file_size: i64, // row_count tells the total number of rows (including add & delete) in this file. + #[serde(rename = "_ROW_COUNT")] pub row_count: i64, - pub min_key: BinaryRow, - pub max_key: BinaryRow, - pub key_stats: SimpleStats, - pub value_stats: SimpleStats, + #[serde(rename = "_MIN_KEY", with = "serde_bytes")] + pub min_key: Vec, + #[serde(rename = "_MAX_KEY", with = "serde_bytes")] + pub max_key: Vec, + #[serde(rename = "_KEY_STATS")] + pub key_stats: BinaryTableStats, + #[serde(rename = "_VALUE_STATS")] + pub value_stats: BinaryTableStats, + #[serde(rename = "_MIN_SEQUENCE_NUMBER")] pub min_sequence_number: i64, + #[serde(rename = "_MAX_SEQUENCE_NUMBER")] pub max_sequence_number: i64, + #[serde(rename = "_SCHEMA_ID")] pub schema_id: i64, + #[serde(rename = "_LEVEL")] pub level: i32, + #[serde(rename = "_EXTRA_FILES")] pub extra_files: Vec, + #[serde( + rename = "_CREATION_TIME", + serialize_with = "to_millis", + deserialize_with = "from_millis" + )] pub creation_time: DateTime, + #[serde(rename = "_DELETE_ROW_COUNT")] // rowCount = add_row_count + delete_row_count. pub delete_row_count: Option, // file index filter bytes, if it is small, store in data file meta + #[serde(rename = "_EMBEDDED_FILE_INDEX", with = "serde_bytes")] pub embedded_index: Option>, - pub file_source: Option, } impl Display for DataFileMeta { @@ -99,7 +113,5 @@ impl Display for DataFileMeta { } } -impl DataFileMeta { - // TODO: implement me - pub const SCHEMA: RowType = RowType::new(vec![]); -} +#[allow(dead_code)] +impl DataFileMeta {} diff --git a/crates/paimon/src/spec/manifest_entry.rs b/crates/paimon/src/spec/manifest_entry.rs new file mode 100644 index 0000000..848d19b --- /dev/null +++ b/crates/paimon/src/spec/manifest_entry.rs @@ -0,0 +1,130 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::spec::DataFileMeta; +use serde::Deserialize; +use serde_repr::{Deserialize_repr, Serialize_repr}; +use serde_with::serde_derive::Serialize; + +/// The same {@link Identifier} indicates that the {@link ManifestEntry} refers to the same data file. +/// +/// Impl Reference: +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)] +pub struct Identifier { + pub partition: Vec, + pub bucket: i32, + pub level: i32, + pub file_name: String, +} + +/// Kind of a file. +/// Impl Reference: +#[derive(PartialEq, Eq, Debug, Clone, Serialize_repr, Deserialize_repr)] +#[repr(u8)] +pub enum FileKind { + Add = 0, + Delete = 1, +} + +/// Entry of a manifest file, representing an addition / deletion of a data file. +/// Impl Reference: +#[derive(Debug, PartialEq, Eq, Serialize, Deserialize)] +pub struct ManifestEntry { + #[serde(rename = "_KIND")] + kind: FileKind, + + #[serde(rename = "_PARTITION", with = "serde_bytes")] + partition: Vec, + + #[serde(rename = "_BUCKET")] + bucket: i32, + + #[serde(rename = "_TOTAL_BUCKETS")] + total_buckets: i32, + + #[serde(rename = "_FILE")] + file: DataFileMeta, + + #[serde(rename = "_VERSION")] + version: i32, +} + +#[allow(dead_code)] +impl ManifestEntry { + fn kind(&self) -> &FileKind { + &self.kind + } + + fn partition(&self) -> &Vec { + &self.partition + } + + fn bucket(&self) -> i32 { + self.bucket + } + + fn level(&self) -> i32 { + self.file.level + } + + fn file_name(&self) -> &str { + &self.file.file_name + } + + fn min_key(&self) -> &Vec { + &self.file.min_key + } + + fn max_key(&self) -> &Vec { + &self.file.max_key + } + + fn identifier(&self) -> Identifier { + Identifier { + partition: self.partition.clone(), + bucket: self.bucket, + level: self.file.level, + file_name: self.file.file_name.clone(), + } + } + + pub fn total_buckets(&self) -> i32 { + self.total_buckets + } + + pub fn file(&self) -> &DataFileMeta { + &self.file + } + + pub fn new( + kind: FileKind, + partition: Vec, + bucket: i32, + total_buckets: i32, + file: DataFileMeta, + version: i32, + ) -> Self { + ManifestEntry { + kind, + partition, + bucket, + total_buckets, + file, + version, + } + } +} diff --git a/crates/paimon/src/spec/manifest_file_meta.rs b/crates/paimon/src/spec/manifest_file_meta.rs index 382d579..36f92b9 100644 --- a/crates/paimon/src/spec/manifest_file_meta.rs +++ b/crates/paimon/src/spec/manifest_file_meta.rs @@ -15,6 +15,7 @@ // specific language governing permissions and limitations // under the License. +use crate::spec::stats::BinaryTableStats; use serde::{Deserialize, Serialize}; use std::fmt::{Display, Formatter}; @@ -128,61 +129,3 @@ impl Display for ManifestFileMeta { ) } } - -/// The statistics for columns, supports the following stats. -/// -/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting. -/// -/// Impl Reference: -#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] -pub struct BinaryTableStats { - /// the minimum values of the columns - #[serde(rename = "_MIN_VALUES", with = "serde_bytes")] - min_values: Vec, - - /// the maximum values of the columns - #[serde(rename = "_MAX_VALUES", with = "serde_bytes")] - max_values: Vec, - - /// the number of nulls of the columns - #[serde(rename = "_NULL_COUNTS")] - null_counts: Vec, -} - -impl BinaryTableStats { - /// Get the minimum values of the columns - #[inline] - pub fn min_values(&self) -> &[u8] { - &self.min_values - } - - /// Get the maximum values of the columns - #[inline] - pub fn max_values(&self) -> &[u8] { - &self.max_values - } - - /// Get the number of nulls of the columns - #[inline] - pub fn null_counts(&self) -> &Vec { - &self.null_counts - } - - pub fn new( - min_values: Vec, - max_values: Vec, - null_counts: Vec, - ) -> BinaryTableStats { - Self { - min_values, - max_values, - null_counts, - } - } -} - -impl Display for BinaryTableStats { - fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { - todo!() - } -} diff --git a/crates/paimon/src/spec/manifest_list.rs b/crates/paimon/src/spec/manifest_list.rs deleted file mode 100644 index 2cffd5c..0000000 --- a/crates/paimon/src/spec/manifest_list.rs +++ /dev/null @@ -1,111 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -use super::manifest_file_meta::ManifestFileMeta; -use crate::io::FileIO; -use crate::{Error, Result}; -use apache_avro::types::Value; -use apache_avro::{from_value, Reader}; -use serde::{Deserialize, Serialize}; - -#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)] -#[serde(transparent)] -/// This file includes several [`ManifestFileMeta`], representing all data of the whole table at the corresponding snapshot. -pub struct ManifestList { - entries: Vec, -} - -impl ManifestList { - pub fn entries(&self) -> &Vec { - &self.entries - } - - pub fn from_avro_bytes(bytes: &[u8]) -> Result { - let reader = Reader::new(bytes).map_err(Error::from)?; - let records = reader - .collect::, _>>() - .map_err(Error::from)?; - let values = Value::Array(records); - from_value::(&values).map_err(Error::from) - } -} - -pub struct ManifestListFactory { - file_io: FileIO, -} - -/// The factory to read and write [`ManifestList`] -impl ManifestListFactory { - pub fn new(file_io: FileIO) -> ManifestListFactory { - Self { file_io } - } - - /// Write several [`ManifestFileMeta`]s into a manifest list. - /// - /// NOTE: This method is atomic. - pub fn write(&mut self, _metas: Vec) -> &str { - todo!() - } - - /// Read [`ManifestList`] from the manifest file. - pub async fn read(&self, path: &str) -> Result { - let bs = self.file_io.new_input(path)?.read().await?; - // todo support other formats - ManifestList::from_avro_bytes(bs.as_ref()) - } -} - -#[cfg(test)] -mod tests { - use crate::spec::{BinaryTableStats, ManifestFileMeta, ManifestList}; - - #[tokio::test] - async fn test_read_manifest_list() { - let workdir = - std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); - let path = workdir - .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); - let v = std::fs::read(path.to_str().unwrap()).unwrap(); - let res = ManifestList::from_avro_bytes(&v).unwrap(); - let value_bytes = vec![ - 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, - ]; - assert_eq!( - res, - ManifestList { - entries: vec![ - ManifestFileMeta::new( - "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), - 10, - 10, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 1 - ), - ManifestFileMeta::new( - "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), - 11, - 0, - 10, - BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), - 2 - ) - ], - } - ); - } -} diff --git a/crates/paimon/src/spec/mod.rs b/crates/paimon/src/spec/mod.rs index 80fb47d..59c7d00 100644 --- a/crates/paimon/src/spec/mod.rs +++ b/crates/paimon/src/spec/mod.rs @@ -34,8 +34,9 @@ pub use snapshot::*; mod manifest_file_meta; pub use manifest_file_meta::*; -mod manifest_list; -pub use manifest_list::*; - +mod manifest_entry; +mod objects_file; +mod stats; mod types; + pub use types::*; diff --git a/crates/paimon/src/spec/objects_file.rs b/crates/paimon/src/spec/objects_file.rs new file mode 100644 index 0000000..5135c32 --- /dev/null +++ b/crates/paimon/src/spec/objects_file.rs @@ -0,0 +1,162 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use crate::Error; +use apache_avro::types::Value; +use apache_avro::{from_value, Reader}; +use serde::de::DeserializeOwned; + +#[allow(dead_code)] +pub fn from_avro_bytes(bytes: &[u8]) -> crate::Result> { + let reader = Reader::new(bytes).map_err(Error::from)?; + let records = reader + .collect::, _>>() + .map_err(Error::from)?; + let values = Value::Array(records); + from_value::>(&values).map_err(Error::from) +} + +#[cfg(test)] +mod tests { + use crate::spec::manifest_entry::{FileKind, ManifestEntry}; + use crate::spec::objects_file::from_avro_bytes; + use crate::spec::stats::BinaryTableStats; + use crate::spec::{DataFileMeta, ManifestFileMeta}; + use chrono::{DateTime, Utc}; + + #[tokio::test] + async fn test_read_manifest_list() { + let workdir = + std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); + let path = workdir + .join("tests/fixtures/manifest/manifest-list-5c7399a0-46ae-4a5e-9c13-3ab07212cdb6-0"); + let v = std::fs::read(path.to_str().unwrap()).unwrap(); + let res = from_avro_bytes::(&v).unwrap(); + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, + ]; + assert_eq!( + res, + vec![ + ManifestFileMeta::new( + "manifest-19d138df-233f-46f7-beb6-fadaf4741c0e".to_string(), + 10, + 10, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 1 + ), + ManifestFileMeta::new( + "manifest-a703ee48-c411-413e-b84e-c03bdb179631".to_string(), + 11, + 0, + 10, + BinaryTableStats::new(value_bytes.clone(), value_bytes.clone(), vec![1, 2]), + 2 + ) + ], + ); + } + + #[tokio::test] + async fn test_read_manifest_entry() { + let workdir = + std::env::current_dir().unwrap_or_else(|err| panic!("current_dir must exist: {err}")); + let path = + workdir.join("tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0"); + let v = std::fs::read(path.to_str().unwrap()).unwrap(); + let res = from_avro_bytes::(&v).unwrap(); + let value_bytes = vec![ + 0, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 49, 0, 0, 0, 0, 0, 0, 129, 1, 0, 0, 0, 0, 0, 0, 0, + ]; + let single_value = vec![0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0]; + assert_eq!( + res, + vec![ + ManifestEntry::new( + FileKind::Delete, + single_value.clone(), + 1, + 10, + DataFileMeta { + file_name: "f1.parquet".to_string(), + + file_size: 10, + row_count: 100, + min_key: single_value.clone(), + max_key: single_value.clone(), + key_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + value_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + min_sequence_number: 1, + max_sequence_number: 100, + schema_id: 0, + level: 1, + extra_files: vec![], + creation_time: "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + delete_row_count: Some(0), + embedded_index: None, + }, + 2 + ), + ManifestEntry::new( + FileKind::Add, + single_value.clone(), + 2, + 10, + DataFileMeta { + file_name: "f2.parquet".to_string(), + file_size: 10, + row_count: 100, + min_key: single_value.clone(), + max_key: single_value.clone(), + key_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + value_stats: BinaryTableStats::new( + value_bytes.clone(), + value_bytes.clone(), + vec![1, 2] + ), + min_sequence_number: 1, + max_sequence_number: 100, + schema_id: 0, + level: 1, + extra_files: vec![], + creation_time: "2024-09-06T07:45:55.039+00:00" + .parse::>() + .unwrap(), + delete_row_count: Some(1), + embedded_index: None, + }, + 2 + ), + ] + ) + } +} diff --git a/crates/paimon/src/spec/stats.rs b/crates/paimon/src/spec/stats.rs new file mode 100644 index 0000000..98923ce --- /dev/null +++ b/crates/paimon/src/spec/stats.rs @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use serde::{Deserialize, Serialize}; +use std::fmt::{Display, Formatter}; + +/// The statistics for columns, supports the following stats. +/// +/// All statistics are stored in the form of a Binary, which can significantly reduce its memory consumption, but the cost is that the column type needs to be known when getting. +/// +/// Impl Reference: +#[derive(PartialEq, Eq, Debug, Clone, Serialize, Deserialize)] +pub struct BinaryTableStats { + /// the minimum values of the columns + #[serde(rename = "_MIN_VALUES", with = "serde_bytes")] + min_values: Vec, + + /// the maximum values of the columns + #[serde(rename = "_MAX_VALUES", with = "serde_bytes")] + max_values: Vec, + + /// the number of nulls of the columns + #[serde(rename = "_NULL_COUNTS")] + null_counts: Vec, +} + +impl BinaryTableStats { + /// Get the minimum values of the columns + #[inline] + pub fn min_values(&self) -> &[u8] { + &self.min_values + } + + /// Get the maximum values of the columns + #[inline] + pub fn max_values(&self) -> &[u8] { + &self.max_values + } + + /// Get the number of nulls of the columns + #[inline] + pub fn null_counts(&self) -> &Vec { + &self.null_counts + } + + pub fn new( + min_values: Vec, + max_values: Vec, + null_counts: Vec, + ) -> BinaryTableStats { + Self { + min_values, + max_values, + null_counts, + } + } +} + +impl Display for BinaryTableStats { + fn fmt(&self, _: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } +} diff --git a/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 b/crates/paimon/tests/fixtures/manifest/manifest-8ded1f09-fcda-489e-9167-582ac0f9f846-0 new file mode 100644 index 0000000000000000000000000000000000000000..57a7ef44bdbd3f0091b686221da3a3496518e16c GIT binary patch literal 1787 zcmds2L5tHs6rRK!EV$r8mAwcu2NA4=dXjo++Kw2SrkbX;yUQ}nrnB8FNt#Vkp)5TJ z;?=8vLGb9oljl8o^WtSMLP7iiUR<0cZIU)EU9Un8&Ae~E_uiN9y|xZdIN>&VU3d_)?`q1se0!Z&_Rg<&@g2S;-y#_yZ5gD02EC)j|S#gvyYgkm=U*zaH z<%VgBoPwk7V~YPzqTma+(TfgJYRvteW)-Q4C$p|)mb^N;U3-R_B9>AMBf}`8E|Qas z%j*X9G*t|THZe{7<(AA6k865g)l?(~3khpdYO*sX2OG*|0z3R;hT$0olycX>>+N0l4#=~sJTF{U0avY@b1ck%`E&8(bDw8W bq5Ba4ft!Lyms1cuTT_7txwjAAEEn<@4XHLQ literal 0 HcmV?d00001