Skip to content

Commit

Permalink
feat: introduce CDC write-side support for the Update operations
Browse files Browse the repository at this point in the history
This change introduces a `CDCTracker` which helps collect changes during
merges and update. This is admittedly rather inefficient, but my hope is
that this provides a place to start iterating and improving upon the
writer code

There is still additional work which needs to be done to handle table
features properly for other code paths (see the middleware discussion we
have had in Slack) but this produces CDC files for Update operations

Fixes #604
Fixes #2095
  • Loading branch information
rtyler committed May 12, 2024
1 parent 353e08b commit 6a1addb
Show file tree
Hide file tree
Showing 9 changed files with 675 additions and 20 deletions.
3 changes: 2 additions & 1 deletion crates/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"

[features]
default = []
cdf = []
default = ["cdf"]
datafusion = [
"dep:datafusion",
"datafusion-expr",
Expand Down
6 changes: 6 additions & 0 deletions crates/core/src/delta_datafusion/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,12 @@ impl<'a> DeltaScanBuilder<'a> {
self
}

/// Use the provided [SchemaRef] for the [DeltaScan]
pub fn with_schema(mut self, schema: SchemaRef) -> Self {
self.schema = Some(schema);
self
}

pub async fn build(self) -> DeltaResult<DeltaScan> {
let config = self.config;
let schema = match self.schema {
Expand Down
316 changes: 316 additions & 0 deletions crates/core/src/operations/cdc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,316 @@
//!
//! The CDC module contains private tools for managing CDC files
//!
use crate::DeltaResult;

use arrow::array::{Array, StringArray};
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
use arrow::record_batch::RecordBatch;
use datafusion::error::Result as DataFusionResult;
use datafusion::physical_plan::{
metrics::MetricsSet, DisplayAs, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
};
use datafusion::prelude::*;
use futures::{Stream, StreamExt};
use std::sync::Arc;
use tokio::sync::mpsc::*;
use tracing::log::*;

/// Maximum in-memory channel size for the tracker to use
const MAX_CHANNEL_SIZE: usize = 1024;

/// The CDCTracker is useful for hooking reads/writes in a manner nececessary to create CDC files
/// associated with commits
pub(crate) struct CDCTracker {
schema: SchemaRef,
pre_sender: Sender<RecordBatch>,
pre_receiver: Receiver<RecordBatch>,
post_sender: Sender<RecordBatch>,
post_receiver: Receiver<RecordBatch>,
}

impl CDCTracker {
/// construct
pub(crate) fn new(schema: SchemaRef) -> Self {
let (pre_sender, pre_receiver) = channel(MAX_CHANNEL_SIZE);
let (post_sender, post_receiver) = channel(MAX_CHANNEL_SIZE);
Self {
schema,
pre_sender,
pre_receiver,
post_sender,
post_receiver,
}
}

/// Return an owned [Sender] for the caller to use when sending read but not altered batches
pub(crate) fn pre_sender(&self) -> Sender<RecordBatch> {
self.pre_sender.clone()
}

/// Return an owned [Sender][ for the caller to use when sending altered batches
pub(crate) fn post_sender(&self) -> Sender<RecordBatch> {
self.post_sender.clone()
}

pub(crate) async fn collect(mut self) -> DeltaResult<Vec<RecordBatch>> {
debug!("Collecting all the batches for diffing");
let ctx = SessionContext::new();
let mut pre = vec![];
let mut post = vec![];

while !self.pre_receiver.is_empty() {
if let Ok(batch) = self.pre_receiver.try_recv() {
pre.push(batch);
} else {
warn!("Error when receiving on the pre-receiver");
}
}

while !self.post_receiver.is_empty() {
if let Ok(batch) = self.post_receiver.try_recv() {
post.push(batch);
} else {
warn!("Error when receiving on the post-receiver");
}
}

// Collect _all_ the batches for consideration
let pre = ctx.read_batches(pre)?;
let post = ctx.read_batches(post)?;

// There is certainly a better way to do this other than stupidly cloning data for diffing
// purposes, but this is the quickest and easiest way to "diff" the two sets of batches
let preimage = pre.clone().except(post.clone())?;
let postimage = post.except(pre)?;

// Create a new schema which represents the input batch along with the CDC
// columns
let mut fields: Vec<Arc<Field>> = self.schema.fields().to_vec().clone();
fields.push(Arc::new(Field::new("_change_type", DataType::Utf8, true)));
let schema = Arc::new(Schema::new(fields));

let mut batches = vec![];

let mut pre_stream = preimage.execute_stream().await?;
let mut post_stream = postimage.execute_stream().await?;

// Fill up on pre image batches
while let Some(Ok(batch)) = pre_stream.next().await {
let batch = crate::operations::cast::cast_record_batch(
&batch,
self.schema.clone(),
true,
false,
)?;
let new_column = Arc::new(StringArray::from(vec![
Some("update_preimage");
batch.num_rows()
]));
let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();
columns.push(new_column);

let batch = RecordBatch::try_new(schema.clone(), columns)?;
batches.push(batch);
}

// Fill up on the post-image batches
while let Some(Ok(batch)) = post_stream.next().await {
let batch = crate::operations::cast::cast_record_batch(
&batch,
self.schema.clone(),
true,
false,
)?;
let new_column = Arc::new(StringArray::from(vec![
Some("update_postimage");
batch.num_rows()
]));
let mut columns: Vec<Arc<dyn Array>> = batch.columns().to_vec();
columns.push(new_column);

let batch = RecordBatch::try_new(schema.clone(), columns)?;
batches.push(batch);
}

debug!("Found {} batches to consider `CDC` data", batches.len());

// At this point the batches should just contain the changes
Ok(batches)
}
}

/// A DataFusion observer to help pick up on pre-image changes
pub(crate) struct CDCObserver {
parent: Arc<dyn ExecutionPlan>,
id: String,
sender: Sender<RecordBatch>,
}

impl CDCObserver {
pub(crate) fn new(
id: String,
sender: Sender<RecordBatch>,
parent: Arc<dyn ExecutionPlan>,
) -> Self {
Self { id, sender, parent }
}
}

impl std::fmt::Debug for CDCObserver {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("CDCObserver").field("id", &self.id).finish()
}
}

impl DisplayAs for CDCObserver {
fn fmt_as(
&self,
_: datafusion::physical_plan::DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
write!(f, "CDCObserver id={}", self.id)
}
}

impl ExecutionPlan for CDCObserver {
fn as_any(&self) -> &dyn std::any::Any {
self
}

fn schema(&self) -> SchemaRef {
self.parent.schema()
}

fn properties(&self) -> &datafusion::physical_plan::PlanProperties {
self.parent.properties()
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.parent.clone()]
}

fn execute(
&self,
partition: usize,
context: Arc<datafusion::execution::context::TaskContext>,
) -> datafusion_common::Result<datafusion::physical_plan::SendableRecordBatchStream> {
let res = self.parent.execute(partition, context)?;
Ok(Box::pin(CDCObserverStream {
schema: self.schema(),
input: res,
sender: self.sender.clone(),
}))
}

fn statistics(&self) -> DataFusionResult<datafusion_common::Statistics> {
self.parent.statistics()
}

fn with_new_children(
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> datafusion_common::Result<Arc<dyn ExecutionPlan>> {
if let Some(parent) = children.first() {
Ok(Arc::new(CDCObserver {
id: self.id.clone(),
sender: self.sender.clone(),
parent: parent.clone(),
}))
} else {
Err(datafusion_common::DataFusionError::Internal(
"Failed to handle CDCObserver".into(),
))
}
}

fn metrics(&self) -> Option<MetricsSet> {
self.parent.metrics()
}
}

/// The CDCObserverStream simply acts to help observe the stream of data being
/// read by DataFusion to capture the pre-image versions of data
pub(crate) struct CDCObserverStream {
schema: SchemaRef,
input: SendableRecordBatchStream,
sender: Sender<RecordBatch>,
}

impl Stream for CDCObserverStream {
type Item = DataFusionResult<RecordBatch>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
self.input.poll_next_unpin(cx).map(|x| match x {
Some(Ok(batch)) => {
let _ = self.sender.try_send(batch.clone());
Some(Ok(batch))
}
other => other,
})
}

fn size_hint(&self) -> (usize, Option<usize>) {
self.input.size_hint()
}
}

impl RecordBatchStream for CDCObserverStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

#[cfg(test)]
mod tests {
use super::*;
use arrow::array::Int32Array;
use datafusion::assert_batches_sorted_eq;

#[tokio::test]
async fn test_sanity_check() {
let schema = Arc::new(Schema::new(vec![Field::new(
"value",
DataType::Int32,
true,
)]));
let tracker = CDCTracker::new(schema.clone());

let batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int32Array::from(vec![Some(1), Some(2), Some(3)]))],
)
.unwrap();
let updated_batch = RecordBatch::try_new(
Arc::clone(&schema),
vec![Arc::new(Int32Array::from(vec![Some(1), Some(12), Some(3)]))],
)
.unwrap();

let _ = tracker.pre_sender().send(batch).await;
let _ = tracker.post_sender().send(updated_batch).await;

match tracker.collect().await {
Ok(batches) => {
let _ = arrow::util::pretty::print_batches(&batches);
assert_eq!(batches.len(), 2);
assert_batches_sorted_eq! {[
"+-------+------------------+",
"| value | _change_type |",
"+-------+------------------+",
"| 2 | update_preimage |",
"| 12 | update_postimage |",
"+-------+------------------+",
], &batches }
}
Err(err) => {
println!("err: {err:#?}");
panic!("Should have never reached this assertion");
}
}
}
}
1 change: 1 addition & 0 deletions crates/core/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ async fn excute_non_empty_expr(
false,
None,
writer_stats_config,
None,
)
.await?
.into_iter()
Expand Down
1 change: 1 addition & 0 deletions crates/core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1389,6 +1389,7 @@ async fn execute(
safe_cast,
None,
writer_stats_config,
None,
)
.await?;

Expand Down
2 changes: 2 additions & 0 deletions crates/core/src/operations/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ use arrow::record_batch::RecordBatch;
use optimize::OptimizeBuilder;
use restore::RestoreBuilder;

#[cfg(all(feature = "cdf", feature = "datafusion"))]
mod cdc;
#[cfg(feature = "datafusion")]
pub mod constraints;
#[cfg(feature = "datafusion")]
Expand Down
5 changes: 5 additions & 0 deletions crates/core/src/operations/transaction/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,11 @@ pub static INSTANCE: Lazy<ProtocolChecker> = Lazy::new(|| {
let mut writer_features = HashSet::new();
writer_features.insert(WriterFeatures::AppendOnly);
writer_features.insert(WriterFeatures::TimestampWithoutTimezone);
#[cfg(feature = "cdf")]
{
writer_features.insert(WriterFeatures::ChangeDataFeed);
writer_features.insert(WriterFeatures::GeneratedColumns);
}
#[cfg(feature = "datafusion")]
{
writer_features.insert(WriterFeatures::Invariants);
Expand Down
Loading

0 comments on commit 6a1addb

Please sign in to comment.