Skip to content

Commit

Permalink
fix: pop logs in log segment
Browse files Browse the repository at this point in the history
  • Loading branch information
ion-elgreco committed Dec 16, 2024
1 parent a5d0097 commit 580947a
Show file tree
Hide file tree
Showing 8 changed files with 204 additions and 63 deletions.
4 changes: 2 additions & 2 deletions crates/core/src/kernel/snapshot/log_segment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,9 +70,9 @@ impl PathExt for Path {
}

#[derive(Debug, Clone, PartialEq)]
pub(super) struct LogSegment {
pub struct LogSegment {
pub(super) version: i64,
pub(super) commit_files: VecDeque<ObjectMeta>,
pub commit_files: VecDeque<ObjectMeta>,
pub(super) checkpoint_files: Vec<ObjectMeta>,
}

Expand Down
44 changes: 42 additions & 2 deletions crates/core/src/kernel/snapshot/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
//!
//!
use std::collections::{HashMap, HashSet};
use std::collections::{HashMap, HashSet, VecDeque};
use std::io::{BufRead, BufReader, Cursor};
use std::sync::Arc;

Expand All @@ -34,10 +34,12 @@ use super::{
Action, Add, AddCDCFile, CommitInfo, DataType, Metadata, Protocol, Remove, StructField,
Transaction,
};
use crate::checkpoints::cleanup_expired_logs_for;
use crate::kernel::parse::read_cdf_adds;
use crate::kernel::{ActionType, StructType};
use crate::logstore::LogStore;
use crate::operations::transaction::CommitData;
use crate::protocol::ProtocolError;
use crate::table::config::TableConfig;
use crate::{DeltaResult, DeltaTableConfig, DeltaTableError};

Expand All @@ -53,7 +55,7 @@ mod visitors;
/// A snapshot of a Delta table
#[derive(Debug, Serialize, Deserialize, Clone, PartialEq)]
pub struct Snapshot {
log_segment: LogSegment,
pub log_segment: LogSegment,
config: DeltaTableConfig,
protocol: Protocol,
metadata: Metadata,
Expand Down Expand Up @@ -567,6 +569,44 @@ impl EagerSnapshot {
))
}

pub async fn clean_up_logs(
&mut self,
until_version: i64,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
) -> Result<usize, ProtocolError> {
let mut deleted =
cleanup_expired_logs_for(until_version, log_store, cutoff_timestamp).await?;
let mut survived_files = VecDeque::new();

while !deleted.is_empty() {
if deleted.is_empty() {
break
}
if self.snapshot.log_segment.commit_files.is_empty() {
break
}

match self.snapshot.log_segment.commit_files.pop_back() {
Some(end) => {
if let Ok(idx) = deleted.binary_search(&end.location) {
deleted.remove(idx);
} else {
survived_files.push_front(end.clone());
}
}
None => (),
}
}

self.snapshot
.log_segment
.commit_files
.append(&mut survived_files);

Ok(deleted.len())
}

/// Advance the snapshot based on the given commit actions
pub fn advance<'a>(
&mut self,
Expand Down
20 changes: 11 additions & 9 deletions crates/core/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ use serde_json::Value;
use tracing::warn;

use self::conflict_checker::{TransactionInfo, WinningCommitSummary};
use crate::checkpoints::{cleanup_expired_logs_for, create_checkpoint_for};
use crate::checkpoints::create_checkpoint_for;
use crate::errors::DeltaTableError;
use crate::kernel::{
Action, CommitInfo, EagerSnapshot, Metadata, Protocol, ReaderFeatures, Transaction,
Expand Down Expand Up @@ -664,7 +664,7 @@ impl PostCommit<'_> {
} else {
snapshot.advance(vec![&self.data])?;
}
let state = DeltaTableState { snapshot };
let mut state = DeltaTableState { snapshot };
// Execute each hook
if self.create_checkpoint {
self.create_checkpoint(&state, &self.log_store, self.version)
Expand All @@ -677,13 +677,15 @@ impl PostCommit<'_> {
};

if cleanup_logs {
cleanup_expired_logs_for(
self.version,
self.log_store.as_ref(),
Utc::now().timestamp_millis()
- state.table_config().log_retention_duration().as_millis() as i64,
)
.await?;
state
.snapshot
.clean_up_logs(
self.version,
self.log_store.as_ref(),
Utc::now().timestamp_millis()
- state.table_config().log_retention_duration().as_millis() as i64,
)
.await?;
}
Ok(state)
} else {
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/vacuum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ impl VacuumBuilder {
self.log_store.object_store().clone(),
)
.await?;

let valid_files = self.snapshot.file_paths_iter().collect::<HashSet<Path>>();

let mut files_to_delete = vec![];
Expand Down
38 changes: 22 additions & 16 deletions crates/core/src/protocol/checkpoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use chrono::{Datelike, NaiveDate, NaiveDateTime, Utc};
use futures::{StreamExt, TryStreamExt};
use itertools::Itertools;
use lazy_static::lazy_static;
use object_store::path::Path;
use object_store::{Error, ObjectStore};
use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
Expand All @@ -27,7 +28,7 @@ use crate::kernel::{
use crate::logstore::LogStore;
use crate::table::state::DeltaTableState;
use crate::table::{get_partition_col_data_types, CheckPoint, CheckPointBuilder};
use crate::{open_table_with_version, DeltaTable};
use crate::{open_table_with_version, DeltaTable, DeltaTableError};

type SchemaPath = Vec<String>;

Expand Down Expand Up @@ -96,22 +97,25 @@ pub async fn create_checkpoint(table: &DeltaTable) -> Result<(), ProtocolError>
Ok(())
}

/// Delete expires log files before given version from table. The table log retention is based on
/// the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<usize, ProtocolError> {
// / Delete expires log files before given version from table. The table log retention is based on
// / the `logRetentionDuration` property of the Delta Table, 30 days by default.
pub async fn cleanup_metadata(table: &DeltaTable) -> Result<(Option<DeltaTableState>, usize), ProtocolError> {
let log_retention_timestamp = Utc::now().timestamp_millis()
- table
.snapshot()
.map_err(|_| ProtocolError::NoMetaData)?
.table_config()
.log_retention_duration()
.as_millis() as i64;
cleanup_expired_logs_for(
table.version(),
table.log_store.as_ref(),
log_retention_timestamp,
)
.await
let version = table.version();

let mut state: Option<DeltaTableState> = table.state.clone();
let size = state.as_mut()
.ok_or(ProtocolError::NoMetaData)?
.snapshot
.clean_up_logs(version, table.log_store.as_ref(), log_retention_timestamp)
.await?;
Ok((state, size))
}

/// Loads table from given `table_uri` at given `version` and creates checkpoint for it.
Expand All @@ -132,7 +136,7 @@ pub async fn create_checkpoint_from_table_uri_and_cleanup(
cleanup.unwrap_or_else(|| snapshot.table_config().enable_expired_log_cleanup());

if table.version() >= 0 && enable_expired_log_cleanup {
let deleted_log_num = cleanup_metadata(&table).await?;
let (_,deleted_log_num) = cleanup_metadata(&table).await?;
debug!("Deleted {:?} log files.", deleted_log_num);
}

Expand Down Expand Up @@ -198,7 +202,7 @@ pub async fn cleanup_expired_logs_for(
until_version: i64,
log_store: &dyn LogStore,
cutoff_timestamp: i64,
) -> Result<usize, ProtocolError> {
) -> Result<Vec<Path>, ProtocolError> {
lazy_static! {
static ref DELTA_LOG_REGEX: Regex =
Regex::new(r"_delta_log/(\d{20})\.(json|checkpoint|json.tmp).*$").unwrap();
Expand All @@ -210,7 +214,7 @@ pub async fn cleanup_expired_logs_for(
.await;

if let Err(Error::NotFound { path: _, source: _ }) = maybe_last_checkpoint {
return Ok(0);
return Ok(vec![]);
}

let last_checkpoint = maybe_last_checkpoint?.bytes().await?;
Expand Down Expand Up @@ -255,7 +259,7 @@ pub async fn cleanup_expired_logs_for(
.await?;

debug!("Deleted {} expired logs", deleted.len());
Ok(deleted.len())
Ok(deleted)
}

fn parquet_bytes_from_state(
Expand Down Expand Up @@ -889,7 +893,8 @@ mod tests {
log_retention_timestamp,
)
.await
.unwrap();
.unwrap()
.len();
assert_eq!(count, 0);
println!("{:?}", count);

Expand Down Expand Up @@ -917,7 +922,8 @@ mod tests {
log_retention_timestamp,
)
.await
.unwrap();
.unwrap()
.len();
assert_eq!(count, 1);

let log_store = table.log_store();
Expand Down
16 changes: 15 additions & 1 deletion python/deltalake/_internal.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,21 @@ class RawDeltaTable:
ending_timestamp: Optional[str] = None,
allow_out_of_range: bool = False,
) -> pyarrow.RecordBatchReader: ...
def write(
self,
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
name: Optional[str],
description: Optional[str],
configuration: Optional[Mapping[str, Optional[str]]],
writer_properties: Optional[WriterProperties],
commit_properties: Optional[CommitProperties],
post_commithook_properties: Optional[PostCommitHookProperties],
) -> None: ...
def transaction_versions(self) -> Dict[str, Transaction]: ...
def __datafusion_table_provider__(self) -> Any: ...

Expand All @@ -243,7 +258,6 @@ def write_to_deltalake(
data: pyarrow.RecordBatchReader,
partition_by: Optional[List[str]],
mode: str,
table: Optional[RawDeltaTable],
schema_mode: Optional[str],
predicate: Optional[str],
target_file_size: Optional[int],
Expand Down
49 changes: 31 additions & 18 deletions python/deltalake/writer.py
Original file line number Diff line number Diff line change
Expand Up @@ -320,25 +320,38 @@ def write_deltalake(
conversion_mode=ArrowSchemaConversionMode.PASSTHROUGH,
)
data = RecordBatchReader.from_batches(schema, (batch for batch in data))
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
table=table._table if table is not None else None,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
if table:
table.update_incremental()
table._table.write(
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
else:
write_deltalake_rust(
table_uri=table_uri,
data=data,
partition_by=partition_by,
mode=mode,
schema_mode=schema_mode,
predicate=predicate,
target_file_size=target_file_size,
name=name,
description=description,
configuration=configuration,
storage_options=storage_options,
writer_properties=writer_properties,
commit_properties=commit_properties,
post_commithook_properties=post_commithook_properties,
)
elif engine == "pyarrow":
warnings.warn(
"pyarrow engine is deprecated and will be removed in v1.0",
Expand Down
Loading

0 comments on commit 580947a

Please sign in to comment.