Skip to content

Commit

Permalink
fix: update field schemas to match protocol specs
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Nov 10, 2023
1 parent 0778f9e commit 76471e1
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 232 deletions.
7 changes: 0 additions & 7 deletions crates/deltalake-core/src/kernel/actions/arrow/schemas.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ impl ActionType {
Self::Metadata => get_root("metaData", self.arrow_fields()),
Self::Protocol => get_root("protocol", self.arrow_fields()),
Self::Remove => get_root("remove", self.arrow_fields()),
Self::RowIdHighWaterMark => get_root("rowIdHighWaterMark", self.arrow_fields()),
Self::Txn => get_root("txn", self.arrow_fields()),
}
}
Expand All @@ -30,7 +29,6 @@ impl ActionType {
Self::Metadata => metadata_fields(),
Self::Protocol => protocol_fields(),
Self::Remove => remove_fields(),
Self::RowIdHighWaterMark => watermark_fields(),
Self::Txn => txn_fields(),
}
}
Expand All @@ -48,7 +46,6 @@ pub fn get_log_schema() -> Schema {
ActionType::Metadata.arrow_field(),
ActionType::Protocol.arrow_field(),
ActionType::Remove.arrow_field(),
ActionType::RowIdHighWaterMark.arrow_field(),
ActionType::Txn.arrow_field(),
]),
metadata: Default::default(),
Expand Down Expand Up @@ -207,10 +204,6 @@ fn txn_fields() -> Vec<Field> {
])
}

/// Builds the Arrow fields used for the `rowIdHighWaterMark` action.
///
/// The returned vector holds exactly one field, `highWaterMark`, typed as
/// `DataType::Int64` and created with the nullable flag set to `true`.
fn watermark_fields() -> Vec<Field> {
    vec![Field::new("highWaterMark", DataType::Int64, true)]
}

fn commit_info_fields() -> Vec<Field> {
Vec::from_iter([
Field::new("timestamp", DataType::Int64, true),
Expand Down
27 changes: 10 additions & 17 deletions crates/deltalake-core/src/kernel/actions/arrow/snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,19 +46,17 @@ impl TableStateArrow {
pub fn try_new(version: i64, actions: RecordBatch) -> DeltaResult<Self> {
let metadata = parse_action(&actions, &ActionType::Metadata)?
.next()
.map(|a| match a {
.and_then(|a| match a {
Action::Metadata(m) => Some(m),
_ => None,
})
.flatten()
.ok_or(Error::Generic("expected metadata".into()))?;
let protocol = parse_action(&actions, &ActionType::Protocol)?
.next()
.map(|a| match a {
.and_then(|a| match a {
Action::Protocol(p) => Some(p),
_ => None,
})
.flatten()
.ok_or(Error::Generic("expected protocol".into()))?;
Ok(Self {
version,
Expand Down Expand Up @@ -218,10 +216,10 @@ impl LogSegment {
let data = store
.get(&path)
.await
.map_err(|e| Error::from(e))?
.map_err(Error::from)?
.bytes()
.await
.map_err(|e| Error::from(e))?;
.map_err(Error::from)?;

Ok(data)
}
Expand All @@ -238,7 +236,7 @@ impl LogSegment {

// let checkpoint_stream = ParquetRecordBatchStreamBuilder::new(input);

return Ok(batch);
Ok(batch)
}

// Read a stream of log data from this log segment.
Expand Down Expand Up @@ -309,7 +307,7 @@ fn decode_commit_file_stream<S: Stream<Item = DeltaResult<Bytes>> + Unpin>(

#[derive(Debug, Deserialize, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct CheckpointMetadata {
pub struct LastCheckpoint {
/// The version of the table when the last checkpoint was made.
pub version: i64,
/// The number of actions that are stored in the checkpoint.
Expand All @@ -332,7 +330,7 @@ pub struct CheckpointMetadata {
async fn read_last_checkpoint(
object_store: &dyn ObjectStore,
log_root: &Path,
) -> DeltaResult<Option<CheckpointMetadata>> {
) -> DeltaResult<Option<LastCheckpoint>> {
let file_path = log_root.child(LAST_CHECKPOINT_FILE_NAME);
match object_store.get(&file_path).await {
Ok(data) => Ok(Some(serde_json::from_slice(&data.bytes().await?)?)),
Expand All @@ -343,7 +341,7 @@ async fn read_last_checkpoint(

/// List all log files after a given checkpoint.
async fn list_log_files_with_checkpoint(
cp: &CheckpointMetadata,
cp: &LastCheckpoint,
fs_client: &dyn ObjectStore,
log_root: &Path,
) -> DeltaResult<(Vec<FileMeta>, Vec<FileMeta>)> {
Expand All @@ -356,12 +354,7 @@ async fn list_log_files_with_checkpoint(
.try_collect::<Vec<_>>()
.await?
.into_iter()
.filter_map(
|m| match commit_version(m.location.filename().unwrap_or_default()) {
Some(_) => Some(m),
None => None,
},
)
.filter_map(|m| commit_version(m.location.filename().unwrap_or_default()).map(|_| m))
.collect::<Vec<_>>();

let mut commit_files = files
Expand Down Expand Up @@ -465,7 +458,7 @@ async fn list_log_files(

let commit_files = commit_files
.into_iter()
.filter(|f| f.location.commit_version().unwrap_or(0) as i64 > max_checkpoint_version)
.filter(|f| f.location.commit_version().unwrap_or(0) > max_checkpoint_version)
.collect::<Vec<_>>();

let checkpoint_files = checkpoint_files
Expand Down
2 changes: 0 additions & 2 deletions crates/deltalake-core/src/kernel/actions/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ pub enum ActionType {
Protocol,
/// modify the data in a table by removing individual logical files
Remove,
/// The Row ID high-water mark tracks the largest ID that has been assigned to a row in the table.
RowIdHighWaterMark,
/// Transactional information
Txn,
}
Expand Down
Loading

0 comments on commit 76471e1

Please sign in to comment.