Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Versioning]: Explore Kedro + Iceberg for versioning #4241

Open
ElenaKhaustova opened this issue Oct 17, 2024 · 20 comments
Open

[Versioning]: Explore Kedro + Iceberg for versioning #4241

ElenaKhaustova opened this issue Oct 17, 2024 · 20 comments
Assignees

Comments

@ElenaKhaustova
Copy link
Contributor

Description

At the current stage by versioning we assume mapping a single version number to the corresponding versions of parameters, I/O data, and code. So one is able to retrieve a full project state including data at any point in time.

The goal is to check if we can use Iceberg to map a single version number to code, parameters, and I/O data within Kedro and how it aligns with Kedro’s workflow.

As a result, we expect a working example of kedro project used with Iceberg for versioning and some assumptions on:

  • whether it solves the main task and what are the constraints;
  • how easy is to set up;
  • how the workflow looks like;
  • whether any changes are required on the kedro side;
  • what data formats are supported;
  • how easy is to work with local/remote storage;
  • how demanding is it in terms of dependencies.

Context

#4199

Market research

@ElenaKhaustova ElenaKhaustova added Issue: Feature Request New feature or improvement to existing feature and removed Issue: Feature Request New feature or improvement to existing feature labels Oct 17, 2024
@datajoely
Copy link
Contributor

This is slightly unscientific, but I trust the vibes in the industry enough to say Iceberg will clearly be the winner in long term.

image

Plus people saying things like this:
https://www.linkedin.com/posts/michaelrosam_the-five-phases-of-a-successful-ai-data-strategy-activity-7252579389664587776-TgMo?utm_source=share&utm_medium=member_desktop

In my opinion this is a situation where we should really go all in on the technology rather than be super agnostic / on-size-fits-all. I'd love a future for Kedro where without much configuration persisted data defaults to this model.

@merelcht merelcht added this to the Dataset Versioning milestone Oct 18, 2024
@noklam
Copy link
Contributor

noklam commented Oct 18, 2024

@datajoely I actually took a stab on this a while ago. My experience with it is Delta has a more mature support than Iceberg at the moment in the Python ecosystem. for example the integration of ibis with iceberg is suboptimal. So from there I think Delta is gonna have a better performance with anything database related, AFAIK with iceberg it always load things in memory first.

One thing to note that these "versioning" are not as effective as we want. For example, an incremental change of adding 1 row will result in a complete rewrite in current Kedro dataset with Delta as well. For high-level versioning, they works very well with dataframe/table format.

The main challenge here I see is how to unify the "versioning" in Kedro, Kedro use a customisable timestamp, while Delta use a incremental version number (0, 1, xxx) or timestamp. Iceberg probably user something similar but I haven't checked.

@datajoely
Copy link
Contributor

Delta is 100% more mature, Iceberg is the horse to back.

This is the thread I was trying to find earlier:

https://x.com/sean_lynch/status/1845500735842390276

I also don't think we should be wedded to that timestamp decision. It was made a long time ago and also has a non-trivial risk of collision. If we were doing that again we'd be better off using a ULID...

@noklam
Copy link
Contributor

noklam commented Oct 29, 2024

^ To be more specific, I was referring mainly to the python binding, i.e. PyIceberg and rust-delta(python). Iceberg itself is fairly mature, especially with the catalog etc, but the python binding seems to be lacking behind a little bit.

@noklam
Copy link
Contributor

noklam commented Oct 29, 2024

Any chance I can take this ticket or work together on this? I have explored this a little bit a while ago and would be a great opportunities to continue on it.

@merelcht @ankatiyar

@deepyaman
Copy link
Member

I agree with @datajoely is the horse to back, at least from an API perspective. PyIceberg is maturing (it has moved significantly in the past couple years).

Realistically, I don't think Kedro should dictate whether you use Iceberg or Delta (or Hudi); that is a user choice, just like whether to use Spark or Polars. This is where unified APIs will ideally make implementation easier.

@datajoely
Copy link
Contributor

So I'm actually being bullish and saying we should pick one of these when it comes to our idea of versioned data. We simply don't have capacity to integrate everywhere properly.

@datajoely
Copy link
Contributor

@merelcht merelcht assigned noklam and unassigned ElenaKhaustova Nov 11, 2024
@astrojuanlu
Copy link
Member

Realistically, I don't think Kedro should dictate whether you use Iceberg or Delta (or Hudi); that is a user choice, just like whether to use Spark or Polars.

I'm with @deepyaman on this one. There should be a layer in Kedro that is format-agnostic. We can be more opinionated in a higher layer.

@astrojuanlu
Copy link
Member

What's clear though is that the Apache Iceberg’s REST catalog API has won for sure kedro-org/kedro-devrel#141 (comment)

@datajoely
Copy link
Contributor

I'm with @deepyaman on this one. There should be a layer in Kedro that is format-agnostic. We can be more opinionated in a higher layer.

I just want to warn against the noble pursuit of generalisation when there are times to pick a winner, I'd much rather pick a horse and do it well.

@noklam
Copy link
Contributor

noklam commented Nov 18, 2024

@ElenaKhaustova I have left some questions at the end since it's not a PR yet.

https://noklam.github.io/blog/posts/pyiceberg/2024-11-18-PyIcebergDataset.html

# Questions
- What does it means when we said " if we can use Iceberg to map a single version number to code, parameters, and I/O data within Kedro and how it aligns with Kedro’s workflow." Versioning code & parameters sounds more like versioning artifacts.
- How to efficiently version data? `overwrite` is a completely re-write. For SQL engine this is implemented by the engine that utilise API like `append`, `replace`. With pandas/polars it is unclear if it's possible. (Maybe be possible if it's using something like `ibis`)
- Incremental pipeline (and incremental data)
- Version non-table type, i.e. parameters, code(?), Iceberg support only these three formats out of the box: Apache Parquet, Apache ORC, and Apache Avro. Parquet is the first-class citizen and the only format that people use in practice.

@ElenaKhaustova
Copy link
Contributor Author

@ElenaKhaustova I have left some questions at the end since it's not a PR yet.

https://noklam.github.io/blog/posts/pyiceberg/2024-11-18-PyIcebergDataset.html

# Questions
- What does it means when we said " if we can use Iceberg to map a single version number to code, parameters, and I/O data within Kedro and how it aligns with Kedro’s workflow." Versioning code & parameters sounds more like versioning artifacts.
- How to efficiently version data? `overwrite` is a completely re-write. For SQL engine this is implemented by the engine that utilise API like `append`, `replace`. With pandas/polars it is unclear if it's possible. (Maybe be possible if it's using something like `ibis`)
- Incremental pipeline (and incremental data)
- Version non-table type, i.e. parameters, code(?), Iceberg support only these three formats out of the box: Apache Parquet, Apache ORC, and Apache Avro. Parquet is the first-class citizen and the only format that people use in practice.

From the versioning research (https://miro.com/app/board/uXjVK9U8mVo=/?share_link_id=24726044039) pain points and summary, we concluded that users mention two major problems—versioning and experiment tracking. At first, we decided to focus on versioning. With it, the main user pain point was not to version a specific artefact as current kedro versioning allows so (not in an optimal way though) but to be able to retrieve a whole experiment/run. Meaning being able to travel back in time with your code and data and checkout to a specific version for the whole kedro project not just for the individual artifact.

Please see Kedro + DVC example for better understanding: #4239 (comment)

It's clear we can easily version artifacts (tabular data), but what about versioning catalogs/projects—more high-level entities and non-tabular data?

So the main questions are:

  1. Does it make sense to use it for anything else but versioning tabular data?
  2. Can we map artifacts snapshots with the git commit hash for example to retrieve the full project state or is there any other mechanism for that?

@datajoely
Copy link
Contributor

My view:

  1. Does it make sense to use it for anything else but versioning tabular data?

I'm willing to bet >95% of use cases fall into this.

  1. Can we map artifacts snapshots with the git commit hash for example to retrieve the full project state or is there any other mechanism for that?

Now I've seen how elegant the dvc integration can be, maybe that's the right paradigm?

@noklam
Copy link
Contributor

noklam commented Nov 19, 2024

It's clear we can easily version artifacts (tabular data), but what about versioning catalogs/projects—more high-level entities and non-tabular data?

  • Iceberg (table) was designed for tabular data, so non-tabular is not supported.
  • Version object isn't something that Iceberg can do.
  • While version artifacts data is possible, Iceberg can do it in a much more efficient way (though not supported in my current implementation)

So the main questions are:

Does it make sense to use it for anything else but versioning tabular data?

My short answer is no.

Can we map artifacts snapshots with the git commit hash for example to retrieve the full project state or is there any other mechanism for that?

You can create "branch" and "tags" with Iceberg Table, but again that's for tabular data only.
https://py.iceberg.apache.org/api/#snapshot-management

Other comments

Many experiment tracking tools start with metric tracking + git hash, then slowly adding data versioning & lineage as well. In general, a full reproducible experiment (whether or not this is important is a different story):

  1. Python environment (freeze version + OS)
  2. Code, this often mean git hash + uncommited changes. This can be combined with generating a git patch file + some GitOps.
  3. Data versioning. This is the most tricky one. With artifacts, this is usually simpler, but it's extremely hard with tables or anything stored in database simply because state is not preserved (or anything that can be mutated externally). What I am seeing here is that DVC solves the artifact problem but cannot do much with tables.

With Iceberg, the metadata needed to be handled externally, i.e. a SQLite db that keep tracks of runs git hash + load_version of all the data (or their snapshot_id from the Iceberg table). So when user need to do time travel, it needs to specify the load version like this:

SELECT * FROM your_table_name TIMESTAMP AS OF 'YYYY-MM-DD HH:MM:SS'; # can be id as well, 

@noklam
Copy link
Contributor

noklam commented Nov 26, 2024

I put together an example here: https://github.com/noklam/kedro-example/tree/master/iceberg-incremental

TL;DR Summary

  • Iceberg support Table only, so won't be a full replacement of Kedro own filebased versioning.
  • Versioning is fairly flexible (tags + branch)
  • Iceberg need to work with something else, i.e. compacting metadata (same is true for Delta Lake) etc, it doesn't work just on its own.
  • Iceberg requires a proper Catalog to enable multi-users experience, SQLite comes with issue like concurrent write and locking.
  • Traditional, most Kedro workflow are "overwrite" by default, so Iceberg/Delta doesn't provided much benefit.
  • It may be more useful for DE workflow like incremental or partial update. It can be more efficient. i.e. If you have 1 GB of partitioned data today, and a new partitions (1mb) arrive tomorrow, Kedro will save all data so it resulted in ~2GB of storage. Iceberg can handle this nicer, provided that we map native API.
  • Unclear about the benefit of using Iceberg with Dataframe, since it's almost always a complete overwrite. It can be useful with SQL base workflow, but that is outside of Kedro scope and may best used with librarise like ibis. As of today, ibis do not have native Iceberg support (feat: Iceberg table support ibis-project/ibis#7712)
  • To achieve full time-travel, we need extra component to track parameters/git hash etc.

In my mind, it wouldn't be changes in Kedro, but more likely at dataset level.

@datajoely @ElenaKhaustova

@astrojuanlu
Copy link
Member

Iceberg requires a proper Catalog to enable multi-users experience, SQLite comes with issue like concurrent write and locking.

Indeed. From https://py.iceberg.apache.org/configuration/#catalogs, "PyIceberg currently has native catalog type support for REST, SQL, Hive, Glue and DynamoDB"

Traditional, most Kedro workflow are "overwrite" by default, so Iceberg/Delta doesn't provided much benefit.

Yes but many users (including ourselves! like @DimedS's ETL for PyPI data) are using Kedro for data pipelines that require UPSERT/merge operations, see long discussion and pointers to user evidence in #3578

To achieve full time-travel, we need extra component to track parameters/git hash etc.

Is that unique to Iceberg?

@datajoely
Copy link
Contributor

Thanks Nok this is super helpful, in general upserts don't canonically make sense in Kedro

@deepyaman
Copy link
Member

  • Traditional, most Kedro workflow are "overwrite" by default, so Iceberg/Delta doesn't provided much benefit.
  • It may be more useful for DE workflow like incremental or partial update. It can be more efficient. i.e. If you have 1 GB of partitioned data today, and a new partitions (1mb) arrive tomorrow, Kedro will save all data so it resulted in ~2GB of storage. Iceberg can handle this nicer, provided that we map native API.
  • Unclear about the benefit of using Iceberg with Dataframe, since it's almost always a complete overwrite. It can be useful with SQL base workflow, but that is outside of Kedro scope and may best used with librarise like ibis. As of today, ibis do not have native Iceberg support (feat: Iceberg table support ibis-project/ibis#7712)

Yes, Iceberg, Delta, etc. makes sense in a lake context, particularly with upsert operations. I don't think there's anything significant blocking supporting Iceberg in Ibis; PyIceberg may not support the most efficient implementation (haven't checked lately), but it should still be possible.

Spark with Iceberg is also very possible.

In my mind, it wouldn't be changes in Kedro, but more likely at dataset level.

💯 This is what I've tried to say from the beginning; it's about whether the engine supports working with Delta, Iceberg, etc., and those will also interface directly with the relevant Delta/Iceberg catalog.

Thanks Nok this is super helpful, in general upserts don't canonically make sense in Kedro

Again, for a subset of datasets, this should make sense. kedro-org/kedro-plugins#835 is technically feasible, and upserts are also possible for Spark. I think this is necessary to be a serious player in DE tooling; nobody is going to reprocess all their data every run in production.

@noklam
Copy link
Contributor

noklam commented Nov 27, 2024

Yes but many users (including ourselves! like @DimedS's ETL for PyPI data) are using Kedro for data pipelines that require UPSERT/merge operations, see long discussion and pointers to user evidence in #3578 @deepyaman @astrojuanlu

Yes. Upsert/Append/Merge is possible with Kedro, but it wasn't the main use case. It could be a separate argument that we want to bet more on this. I haven't implemented this fully but I my demo is based on that incremental pipeline, so we can discuss in details later.

To achieve full time-travel, we need extra component to track parameters/git hash etc.
Is that unique to Iceberg? @astrojuanlu

Not really, that would be the same for Iceberg/Delta

Again, for a subset of datasets, this should make sense. kedro-org/kedro-plugins#835 is technically feasible, and upserts are also possible for Spark. I think this is necessary to be a serious player in DE tooling; nobody is going to reprocess all their data every run in production.

Spark with Iceberg is also very possible.

In fact I think Spark is the most common path that Kedro user will interact with Delta/Iceberg. In theory you can write to a Delta Lake /Iceberg table through pyiceberg or delta-rs, but I suspect it's very rare (I bet on no Kedro users want to write a non-spark pipeline to write to a Delta table). Reading these table could be more common, and these libraries is relatively lightweight (compare to spark).

Upsert is slightly out of scope for this issue. There are things that we can do to add value:

  • Better integration with Spark, i.e. If user already have Iceberg table, can we can it easy for them to specify a version through the Data Catalog?
  • As an implementation of versioned PartitionedDataset (for tables only), it can be also used for improving incremental pipelines in Kedro. i.e. New data arrive, process a chunk, update a chunk instead of copying everything.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
Status: In Progress
Development

No branches or pull requests

7 participants