
WIP: Record Batch support for delta log stats_parsed (#435) #454

Closed

mgill25

@mgill25 mgill25 commented Oct 9, 2021

  • Initial attempt at creating record batches out of stats_parsed from the delta log for Add actions.
  • A new representation for the stats via StatRecordBatch
  • Currently only supports creation of batches for columns that have integer types. Hard-coded i64s for now. Of course, this needs to be extended to provide support for more types.
  • Added an associated test case.

The basic idea is to simply provide the Arrow schema for the columns for which we want to create the batches. I've left the selection of columns as an argument - I can see cases where it might not make sense to keep statistics for all columns.
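For illustration, here is a minimal sketch (not the code in this PR) of what building such a batch with arrow-rs could look like, assuming the hard-coded i64 columns described above and a hypothetical min_values_batch helper; the caller passes the subset of columns to keep stats for, and the batch holds one row per file:

use std::sync::Arc;

use arrow::array::{ArrayRef, Int64Array};
use arrow::datatypes::{DataType, Field, Schema};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Hypothetical helper: one row per file, one Int64 column per selected table
// column, holding that column's minValues stat.
fn min_values_batch(
    columns: &[&str],            // columns we want to keep stats for
    per_file_mins: &[Vec<i64>],  // per file, min values in `columns` order
) -> Result<RecordBatch, ArrowError> {
    let fields: Vec<Field> = columns
        .iter()
        .map(|c| Field::new(*c, DataType::Int64, false))
        .collect();
    let arrays: Vec<ArrayRef> = (0..columns.len())
        .map(|i| {
            let col: Vec<i64> = per_file_mins.iter().map(|row| row[i]).collect();
            Arc::new(Int64Array::from(col)) as ArrayRef
        })
        .collect();
    RecordBatch::try_new(Arc::new(Schema::new(fields)), arrays)
}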

I only parse minValues, maxValues and numRecords for now. nullCounts need to be added.

Additionally, StatRecordBatch is not yet embedded into the primary Add struct. I ran into some issues there and plan to figure that out in the coming days/weeks.

First PR. I'm sure there are plenty of things that need to be improved here. Feedback is very much appreciated! :)

@mgill25 mgill25 changed the title Record Batch support for delta log stats_parsed (#435) WIP: Record Batch support for delta log stats_parsed (#435) Oct 9, 2021
@houqp
Member

houqp commented Oct 10, 2021

Thanks @mgill25 for taking on this initiative! I think that to fully leverage the columnar design, we can't store the stats in the action struct itself. We will need to store them at a level of abstraction higher than Action. I am going to take another look at the design and write up some of my thoughts in a follow-up comment.

@houqp
Member

houqp commented Oct 10, 2021

Apologies @mgill25, I think I misled you in my previous comment in #435 by suggesting the use of a record batch. After taking a closer look at this, I think using arrow arrays to store these values would be better. But because Array in arrow is immutable, we need to use dyn ArrayBuilder instead (MutableArray in arrow2 when we migrate to it).

On top of this, to really leverage the columnar format, we can't store the array inside the add action because each add action represents a single row. We need to store them outside of the action rows.

To better demonstrate the problem, here is a visualization of what the data looks like conceptually:

        num records | max values (c1, c2.foo, ...) | min values (c1, c2.foo, ...) | null count (c1, c2.foo, ...)
file1   20          | 11, 2, 100                   | ..., ..., ...                | 0, 0, 0
file2   10          | -10, 0, 200                  | ..., ..., ...                | 1, 0, 10
...
fileN   100         | 100, 0, 0                    | ..., ..., ...                | 0, 0, 2

Each row in the table represents a file recorded by an action and its corresponding file stats. Notice that for per-column stats (min/max values & null count), we need to create one array per column per stats type. For example, the array for column c1's max values would be [11, -10, ..., 100].

So perhaps the DeltaTableState struct would be a better place to store these column stats. We could model them as:

pub enum ValuesStatsArray {
  // mutable array builder that could contain nulls
  Array(Box<dyn arrow::array::ArrayBuilder>),
  Nested(HashMap<String, ValuesStatsArray>),
}

pub enum NullCountStatsArray {
  // we can store null counts as a native rust vec here because
  // we know there is no null value and we know the type is DeltaDataTypeLong
  Array(Vec<DeltaDataTypeLong>),
  Nested(HashMap<String, NullCountStatsArray>),
}

pub struct DeltaTableStats {
  num_records: Vec<DeltaDataTypeLong>,  // not a per-column stat, so we can just model it as Vec<T>
  max_values: HashMap<String, ValuesStatsArray>,
  min_values: HashMap<String, ValuesStatsArray>,
  null_count: HashMap<String, NullCountStatsArray>,
}

pub struct DeltaTableState {
  ...
  files: Vec<action::Add>,
  stats: DeltaTableStats,
  ...
}

Then we can append to these column arrays inside the process_action function as we go through each add action:

fn process_action(
    state: &mut DeltaTableState,
    action: Action,
    handle_tombstones: bool,
) -> Result<(), ApplyLogError> {
    match action {
        Action::add(v) => {
            match v.stats_parsed {
                Some(stats_parsed) => {
                    // append fields from stats_parsed to state.stats's column arrays
                    state.stats.num_records.push(stats_parsed.num_records);
                    ...  // for each column append to max_values, min_values, null_count.
                }
                None => {
                    let stats = v.get_stats()?.unwrap();
                    // append fields from stats to state.stats's column arrays
                    state.stats.num_records.push(stats.num_records);
                    ...  // for each column append to max_values, min_values, null_count.
                }
            }
            state.files.push(v.path_decoded()?);  // TODO: append to files with stats field removed
        }
        ...
    }
}
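As a rough sketch of what the elided append steps could look like (assuming a recent arrow-rs where the builder append methods are infallible, and a hypothetical append_long_stat helper), the dyn ArrayBuilder stored in ValuesStatsArray::Array would have to be downcast to its concrete builder before appending, and frozen into an immutable array once log replay is done:

use arrow::array::{ArrayBuilder, ArrayRef, Int64Builder};

// Hypothetical helper: append one (possibly missing) i64 stat to a column array.
fn append_long_stat(builder: &mut Box<dyn ArrayBuilder>, value: Option<i64>) {
    let b = builder
        .as_any_mut()
        .downcast_mut::<Int64Builder>()
        .expect("stats column is not backed by an Int64Builder");
    match value {
        Some(v) => b.append_value(v),
        None => b.append_null(),
    }
}

// After replay, the mutable builder can be frozen into an immutable Arrow array.
fn freeze_stat(builder: &mut Box<dyn ArrayBuilder>) -> ArrayRef {
    builder.finish()
}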

This would be the most space-efficient way to store the stats. However, it introduces one overhead during table state merge in DeltaTableState::merge:

delta-rs/rust/src/delta.rs, lines 440 to 441 in 9000bd4:

self.files
    .retain(|a| !new_removals.contains(a.path.as_str()));

Performing a retain operation on Arrow arrays is not supported today; the closest thing we have is the arrow::compute::kernels::take function, which requires copying the whole array data even if we are only deleting a single value. On top of that, even if we implemented retain for the Arrow array builder to perform an in-place update, it would still be an expensive operation because we need to retain both the value buffer and the validity buffer.
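For reference, a minimal sketch of the "retain via take" approach described above (assuming a recent arrow-rs; the helper and its name are illustrative only). Note that every surviving value is copied into a brand-new array even if only a single value is dropped:

use arrow::array::{ArrayRef, Int64Array, UInt32Array};
use arrow::compute::take;
use arrow::error::ArrowError;

// Hypothetical helper: keep only the rows where `keep` is true by building an
// index array and copying the survivors with the take kernel.
fn retain_with_take(values: &Int64Array, keep: &[bool]) -> Result<ArrayRef, ArrowError> {
    let indices: UInt32Array = keep
        .iter()
        .enumerate()
        .filter(|(_, k)| **k)
        .map(|(i, _)| i as u32)
        .collect();
    take(values, &indices, None)
}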

I can think of two possible workarounds to address this issue.

  1. Replace Arrow array with native Rust Vec<Option<T>>:
// nullable array for different delta primitive types
pub enum ValuesArray {
  Long(Vec<Option<DeltaDataTypeLong>>),
  String(Vec<Option<String>>),
  ...
}

pub enum ValuesStatsArray {
    Array(ValuesArray),
    Nested(HashMap<String, ValuesStatsArray>),
}

This approach makes the retain operation cheaper and even avoids having the delta core depend on arrow, but it will take up more space because, for some scalar types, wrapping them in Option doubles their size in Rust (see the small sketch after the second workaround below).

  2. Make arrays and vectors in DeltaTableState append-only, then perform the filter on read in the DeltaTable::files method:
impl DeltaTable {
    pub fn files(&self) -> impl Iterator<Item = &action::Add> {
        // self.tombstones is stored as a HashSet instead of a Vec
        self.files.iter().filter(|f| !self.tombstones.contains(f.path.as_str()))
    }
}

This approach makes table state update very fast but has the following drawbacks:

  • More overhead on each file list method call, but this can be worked around if the caller caches the file list for subsequent calls.
  • DeltaTableState will consume more memory over time, as the table applies updates and deleted file actions are not removed from the state. We would need to perform periodic table state compaction once the set of tombstones grows beyond a certain size.
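To make workaround 1's space trade-off concrete, here is a tiny self-contained illustration (not delta-rs code): Option<i64> has no niche, so each slot doubles from 8 to 16 bytes, while retain on a plain Vec stays cheap and in place:

use std::mem::size_of;

fn main() {
    // Space cost of workaround 1: Option<i64> doubles the per-value footprint.
    assert_eq!(size_of::<i64>(), 8);
    assert_eq!(size_of::<Option<i64>>(), 16);

    // Cheap in-place retain: drop the stats of files removed during a merge.
    let mut c1_max: Vec<Option<i64>> = vec![Some(11), Some(-10), None, Some(100)];
    let keep = [true, false, true, true];
    let mut i = 0;
    c1_max.retain(|_| {
        let k = keep[i];
        i += 1;
        k
    });
    assert_eq!(c1_max, vec![Some(11), None, Some(100)]);
}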

I am leaning more towards Vec<Option<T>>, but to be honest, I am not very happy with either design. @dispanser @Dandandan @mgill25 @xianwill @mosyp @nevi-me @viirya @rtyler I am curious what your thoughts are on the best way to reduce memory usage for stats and partition values.

PS: For context, these file stats are used in two places in downstream compute. One for setting per-file stats in the table scan:

let partitions = filenames

The other for aggregating over all table stats:

pub fn datafusion_table_statistics(&self) -> Statistics {

@houqp
Member

houqp commented Oct 10, 2021

Alternatively, we could keep storing stats and partition values in a row-based format, i.e. inside the Add struct, but in flat vectors. We would then come up with a convention to map vector indices to columns in the table schema:

pub struct Stats { 
    pub num_records: DeltaDataTypeLong,
    pub min_values: Vec<Value>,
    pub max_values: Vec<Value>,
    pub null_count: Vec<DeltaDataTypeLong>,
}

pub struct Add { 
    pub path: String,
    ...
    pub stats: Stats,
}
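A hypothetical sketch of the index convention this implies (assuming Value here is serde_json::Value, and that the table schema's leaf columns are flattened in a fixed, e.g. depth-first, order so that position i in min_values / max_values / null_count refers to leaf column i):

use std::collections::HashMap;

use serde_json::Value;

// Hypothetical mapping from leaf column name to its position in the flat stats vectors.
struct StatsIndex {
    by_name: HashMap<String, usize>,
}

impl StatsIndex {
    // `leaf_columns` is the flattened table schema, e.g. ["c1", "c2.foo"].
    fn new(leaf_columns: &[&str]) -> Self {
        let by_name = leaf_columns
            .iter()
            .enumerate()
            .map(|(i, c)| (c.to_string(), i))
            .collect();
        Self { by_name }
    }

    // Look up the min value recorded for a given column in one file's stats.
    fn min_value<'a>(&self, min_values: &'a [Value], column: &str) -> Option<&'a Value> {
        self.by_name.get(column).and_then(|i| min_values.get(*i))
    }
}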

@mosyp
Contributor

mosyp commented Nov 8, 2021

@mgill25 Hi, great PR. I wonder whether you're planning on finishing it. Alternatively @houqp, we could merge it into a new feature branch instead and work on it collectively.

The memory issue has become a concern for us, so we are weighing whether we can benefit from this feature or whether we should not store struct actions in memory at all (for kafka-delta-ingest we don't need them).

@manishgill-tomtom

Re: the design ideas, @houqp, I'm not well versed enough in Arrow to make the best design decision, so perhaps Vec<Option<T>> is the short-term solution we take for now? I will try to make an update to the PR this week.

@dispanser
Contributor

The memory issue has become a concern for us, so we are weighing whether we can benefit from this feature or whether we should not store struct actions in memory at all (for kafka-delta-ingest we don't need them).

If you don't need the stats for your use case, not storing them at all may be the best (and cheapest) option. An alternative "workaround" for memory consumption problems that I can see would be to have a configurable whitelist of columns for which the stats are useful, and discard the rest. For our use case, most of these column stats are useless (e.g., UUID min/max stats tend not to have any filtering capability).

There's a possibility in delta itself to control this to some extent already at write time, but it involves reordering columns (and assumes that the table consumer can convince the table producer to perform that change).
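For what it's worth, a minimal sketch of what such a configurable whitelist could look like (the names here are purely illustrative, not an existing delta-rs API):

use std::collections::HashSet;

// Hypothetical config: only keep parsed stats for the columns the reader cares about.
struct StatsConfig {
    // None means "keep stats for every column".
    keep_columns: Option<HashSet<String>>,
}

impl StatsConfig {
    fn keep(&self, column: &str) -> bool {
        self.keep_columns
            .as_ref()
            .map(|cols| cols.contains(column))
            .unwrap_or(true)
    }
}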

@houqp
Member

houqp commented Nov 15, 2021

Sorry for the late response, the last 2 weeks have been crazy for me, so I am still catching up on all the great discussions :)

Re: the design ideas, @houqp, I'm not well versed enough in Arrow to make the best design decision, so perhaps Vec<Option<T>> is the short-term solution we take for now? I will try to make an update to the PR this week.

This sounds good to me 👍

If you don't need the stats for your use case, not storing them at all may be the best (and cheapest) option. An alternative "workaround" for memory consumption problems that I can see would be to have a configurable whitelist of columns for which the stats are useful, and discard the rest. For our use case, most of these column stats are useless (e.g., UUID min/max stats tend not to have any filtering capability).

I think supporting projection based stats loading could be an interesting and useful feature to add 👍

These optimizations probably won't help with kafka-delta-ingest though, since it doesn't know whether downstream consumers of the table will need these stats or not.

@mgill25
Author

mgill25 commented Jan 3, 2022

Apologies for zero activity on this for the last 2 months.

I think for now I don't have the bandwidth to work on this, so if anyone wants to pick this up, please feel free to.

@rtyler rtyler marked this pull request as draft May 9, 2023 16:55
rtyler added a commit that referenced this pull request Jan 23, 2024
# Description

This is still very much a work in progress, opening it up for visibility
and discussion.

Finally, I do hope that we can make the switch to arrow-based log
handling. Aside from the hoped-for advantages in memory footprint, I
also believe it opens us up to many future optimizations.

To make the transition we introduce two new structs:

- `Snapshot` - a half-lazy version of the snapshot, which only tries to
get `Protocol` & `Metadata` actions ASAP. Of course these drive all our
planning activities and without them there is not much we can do.
- `EagerSnapshot` - an intermediary structure, which eagerly loads file
actions and does log replay to serve as a compatibility layer for the
current `DeltaTable` APIs.

One conceptually larger change is related to how we view the
availability of information. Up until now `DeltaTableState` could be
initialized empty, containing no useful information for any code to work
with. State (snapshots) now always needs to be created valid. The thing
that may not yet be initialized is the `DeltaTable`, which now only
carries the table configuration and the `LogStore`; the state / snapshot
is now optional. Consequently, all code that works against a snapshot no
longer needs to handle the case that metadata / schema etc. may not be available.

This also has implications for the datafusion integration. We already
work against snapshots mostly, but should abolish most traits
implemented for `DeltaTable`, as it does not provide the information
(and never has) that is at least required to execute a query.

Some larger notable changes include:

* remove `DeltaTableMetadata` and always use `Metadata` action.
* arrow and parquet are now required; as such, the corresponding feature flags got removed.
Personally I would also argue that if you cannot read checkpoints, you
cannot read delta tables :) - so hopefully users weren't using
arrow-free versions.

### Major follow-ups:

* (pre-0.17) review integration with `log_store` and `object_store`.
Currently we make use mostly of `ObjectStore` inside the state handling.
What we really use is `head` / `list_from` / `get` - my hope would be
that we end up with a single abstraction...
* test cleanup - we are currently dealing with test flakiness and have
several approaches to scaffolding tests. Since we have the
`deltalake-test` crate now, this can be reconciled.
* ...
* do more processing on borrowed data ...
* perform file-heavy operations on arrow data
* update checkpoint writing to leverage new state handling and arrow ...
* switch to exposing URL in public APIs

## Questions

* should paths be percent-encoded when written to checkpoint?

# Related Issue(s)

supersedes: #454
supersedes: #1837
closes: #1776
closes: #425 (should also be addressed in the current implementation)
closes: #288 (multi-part checkpoints are deprecated)
related: #435


---------

Co-authored-by: R. Tyler Croy <[email protected]>
@roeap
Collaborator

roeap commented Jan 26, 2024

superseded by #2037

@roeap roeap closed this Jan 26, 2024
RobinLin666 pushed a commit to RobinLin666/delta-rs that referenced this pull request Feb 2, 2024