get_add_actions does not return any records #2507

Open
antonsteenvoorden opened this issue May 13, 2024 · 10 comments
Labels
question Further information is requested

Comments

@antonsteenvoorden

antonsteenvoorden commented May 13, 2024

Environment

Delta-rs version: python-0.15.2 up until python-0.17.3

Binding: python


Bug

deltalake versions higher than v0.15.1 return an empty dataframe for get_add_actions after a CREATE OR REPLACE TABLE AS SELECT commit on our delta table.

What happened:

> delta_table = DeltaTable(input_path)
> partition_columns = delta_table.metadata().partition_columns
> partition_columns

This correctly shows the columns we partitioned on

> add_actions = delta_table.get_add_actions(flatten=True).to_pandas()
> add_actions.empty
True

The columns of the dataframe are present and correct.

What you expected to happen:

> delta_table = DeltaTable(input_path)
> add_actions = delta_table.get_add_actions(flatten=True).to_pandas()
> add_actions.empty
False

(and to have records of course ;)

More details:
We are interested in the values for the partitioned columns. Since deltalake does not add the partition columns when reading a DeltaTable, we were using the get_add_actions method to join the partition values onto the table using the _file_uri.
(side note: if anyone knows of a better way to do this, I would be happy to hear).
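For readers unfamiliar with the approach, here is a minimal, stdlib-only sketch of that join. It assumes (hypothetically) that each data row carries the absolute `_file_uri` of the file it was read from, and that each add action, as returned by `get_add_actions(flatten=True)`, has a table-relative `path` plus one `partition.<col>` key per partition column. The function name, rows and file names are made up for illustration.

```python
def attach_partition_values(rows, add_actions, partition_columns, table_root):
    """Copy partition values from add actions onto data rows, matched by file.

    Assumes each row has a "_file_uri" (absolute URI of its source file) and
    each add action has a table-relative "path" plus "partition.<col>" keys.
    """
    root = table_root.rstrip("/") + "/"
    # Index add actions by their table-relative path for constant-time lookups.
    by_path = {action["path"]: action for action in add_actions}
    enriched = []
    for row in rows:
        uri = row["_file_uri"]
        if not uri.startswith(root):
            raise ValueError(f"{uri} is outside table root {root}")
        action = by_path.get(uri[len(root):])
        if action is None:
            # Fail loudly instead of silently dropping the partition values.
            raise KeyError(f"no add action for {uri}")
        values = {col: action[f"partition.{col}"] for col in partition_columns}
        enriched.append({**row, **values})
    return enriched
```

Raising on a missing match is a deliberate choice in this sketch: with an empty add-actions result, every row fails immediately instead of the partition columns quietly going missing.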

Initially, we found no issues switching to v0.17.3. However, the values for the partition columns were suddenly missing. We looked at the table history (see below) and found that it broke after an ETL job caused a CREATE OR REPLACE TABLE AS SELECT.

We found that with v0.17.3 it still worked for our table version 217 but is broken from version 218 onwards.
We tried all versions in between to pinpoint which release broke it, and it appears to be broken from v0.15.2 onwards (>= v0.15.2).

v0.15.1 continues to work for our delta table after this commit and does not display this behavior.

version timestamp operation operationParameters
221 2024-05-07T23:22:57.000+00:00 WRITE {"mode":"Append","statsOnLoad":"false","partitionBy":"[]"}
220 2024-05-07T23:21:29.000+00:00 DELETE {"predicate":"["(sales_date#298771 >= 2024-04-24)"]"}
219 2024-05-07T18:00:06.000+00:00 CREATE OR REPLACE TABLE AS SELECT {"partitionBy":"["item_in_promo","year","item"]","description":null,"isManaged":"true","properties":"{"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}","statsOnLoad":"false"}
218 2024-05-07T08:41:21.000+00:00 CREATE OR REPLACE TABLE AS SELECT {"partitionBy":"["item_in_promo","year","item"]","description":null,"isManaged":"true","properties":"{"delta.checkpoint.writeStatsAsStruct":"true","delta.checkpoint.writeStatsAsJson":"false"}","statsOnLoad":"false"}
217 2024-05-06T23:39:44.000+00:00 WRITE {"mode":"Append","statsOnLoad":"false","partitionBy":"[]"}

We are not familiar enough with the codebase and Rust to determine whether this is by design or a bug.

How to reproduce it:

We were unable to. We tried the following on a small example:

CREATE OR REPLACE TABLE tmp.delta_table
USING DELTA
PARTITIONED BY (partition_col)
TBLPROPERTIES (
  delta.checkpoint.writeStatsAsJson=false,
  delta.checkpoint.writeStatsAsStruct=true,
  delta.minReaderVersion=1,
  delta.minWriterVersion=2
  )
AS
SELECT 1 AS partition_col, 'value' AS something;

INSERT INTO tmp.delta_table
VALUES
(1, 'value1'),
(2, 'value2'),
(3, 'value3');

INSERT INTO tmp.delta_table
VALUES
(4, 'value4'),
(5, 'value5'),
(6, 'value6');

Reading works fine. Then we run the potentially troublesome operation:

CREATE OR REPLACE TABLE tmp.delta_table
USING DELTA
PARTITIONED BY (partition_col)
TBLPROPERTIES (delta.checkpoint.writeStatsAsJson=false,delta.checkpoint.writeStatsAsStruct=true)
AS SELECT 42 AS partition_col, 'value42' AS something;

But afterwards, we are still able to read the table using v0.17.3.

@antonsteenvoorden antonsteenvoorden added the bug Something isn't working label May 13, 2024
@antonsteenvoorden antonsteenvoorden changed the title from "get_add_actions does not return any records after a CREATE OR REPLACE TABLE AS SELECT statement" to "get_add_actions does not return any records" May 13, 2024
@rtyler rtyler added the binding/python Issues for the Python package label May 15, 2024
@ion-elgreco
Collaborator

@antonsteenvoorden the previous behavior was incorrect as it would not remove the added file actions when doing a create or replace.

Now it does what it's supposed to: creating a table with mode overwrite replaces the table and removes its existing content.

@ion-elgreco ion-elgreco added question Further information is requested and removed bug Something isn't working binding/python Issues for the Python package labels May 29, 2024
@diederikperdok

diederikperdok commented May 30, 2024

@ion-elgreco thanks for your reply.

Are you referring to #2437? If I understand correctly, that PR fixes a bug that occurs when writing a table with delta-rs. The problem described above by @antonsteenvoorden, however, occurs when reading, with delta-rs, a table that was written using Spark.

I've delved into this problem a bit further so I can provide more details of what (I think) is going on:

  1. We create a managed table whose location corresponds to a location where an external table already exists. The table is created using CREATE OR REPLACE TABLE {managed_table} AS (SELECT * FROM {external_table}). This leads to a situation where we have an add, a remove and then again an add action for the exact same file path in the delta log. So this file should be considered active.
  2. Since version 0.15.2 there is this bit of code that uses a hashmap seen to keep track of which file paths were already processed. An add action for a file that was already seen is then ignored. So the second add for the same path (the "re-add") is not processed.
  3. Therefore there is no add action for this file path in the output of get_add_actions since 0.15.2, while there should have been one.
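To make step 2 concrete, here is a simplified, stdlib-only sketch of newest-first log replay with a `seen` set. It is not delta-rs's actual code: commits are plain lists of hypothetical `("add" | "remove", path)` tuples, and it assumes each path appears at most once per commit. A forward (oldest-first) replay is included as a reference to check the result against.

```python
def active_files_newest_first(commits):
    """Return active file paths by replaying commits newest first.

    `commits` is ordered oldest -> newest. The first (i.e. newest) action seen
    for a path decides whether it is active; older actions for it are skipped.
    """
    seen = set()
    active = set()
    for commit in reversed(commits):
        for kind, path in commit:
            if path in seen:
                continue  # a newer action for this path was already applied
            seen.add(path)
            if kind == "add":
                active.add(path)
    return active


def active_files_forward(commits):
    """Reference implementation: apply every action oldest -> newest."""
    active = set()
    for commit in commits:
        for kind, path in commit:
            if kind == "add":
                active.add(path)
            else:
                active.discard(path)
    return active


log = [
    [("add", "a.parquet"), ("add", "b.parquet")],  # version 0
    [("remove", "a.parquet")],                     # version 1
    [("add", "a.parquet")],                        # version 2: re-add
]
```

When add, remove and re-add are spread over separate commits, both replays agree that `a.parquet` is active. The trade-off is the one described in step 2: once a path is in `seen`, every other action for it is ignored, which is only safe if a path does not appear more than once per commit.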

Now to be honest I don't think this should be fixed on the delta-rs side. This approach of recreating a table in a location where there is already another table is of course hacky and dangerous and we didn't like it in the first place; we'll look for a different approach. It is also a clear violation of the delta protocol so we cannot expect you to support this.

Thanks for your help and you may close the issue.

@ion-elgreco
Collaborator

@diederikperdok thanks for the additional info, it wasn't entirely clear before.

So there is an add, a remove and then again an add action for the exact same file path in the delta log: does this all happen in a single transaction?

As it is an odd thing to see, it could be considered a bug or missing robustness of the log replay. I'll keep it open for now.

@roeap do you have any input here? I think that, in theory, we should be able to replay an add, remove, add sequence for the same path, as long as the net count for that path (adds minus removes) does not exceed 1.
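A sketch of the check suggested here, assuming "sum" means the net count of a path within one commit (+1 per add, -1 per remove). The function name and the `("add" | "remove", path)` action format are hypothetical. Note that the net count is only a sanity check; it does not by itself say whether the file ends up active.

```python
from collections import Counter


def paths_with_excess_adds(commit):
    """Return the paths whose net action count in one commit exceeds 1.

    Each add counts +1 and each remove counts -1 for its path; a net count
    above 1 means the commit adds the same file more often than it removes it.
    """
    net = Counter()
    for kind, path in commit:
        net[path] += 1 if kind == "add" else -1
    return sorted(path for path, count in net.items() if count > 1)
```

Under this rule an add, remove, add sequence nets to 1 and passes, while two adds of the same path net to 2 and are flagged.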

@diederikperdok

diederikperdok commented May 30, 2024

OK, on a second look I might have misdiagnosed the issue. From the code, it still looks like the LogReplayScanner would indeed not handle a remove and an add of the same path in one transaction, but that is not actually what is happening in our case.

It looks like that in our case certain actions that I can see in the deltalog .json for that version are simply not returned by get_add_actions for some reason. It's hard to diagnose as we're still not sure yet how to reproduce this in a small example.

@diederikperdok

diederikperdok commented Jun 10, 2024

I dove into it again and it seems to have something to do with how checkpoints are handled since >= 0.15.2. Every time there is a checkpoint in the delta log, get_add_actions for that table returns an empty dataframe for 0.15.2 (or 0.17.3) while it does not for 0.15.1.

When I look into this checkpoint parquet file, I can see that the add actions are there (and there aren't remove actions for the same path) so I would expect to get output. This is also true for the oldest checkpoint that is still in our delta log.
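For reference, a checkpoint at version N stores the table state at N, including the add actions of all active files, so a reader can start from it and replay only the JSON commits after N. The stdlib-only sketch below models that with hypothetical in-memory data (the checkpointed add paths and a dict of later commits); it is a simplification, not delta-rs's reader. Under this model, a checkpoint whose add actions are not removed later yields a non-empty snapshot at its own version, which is why an empty result looks wrong.

```python
def snapshot_from_checkpoint(checkpoint_version, checkpoint_adds, commits, version):
    """Compute the active paths at `version`, starting from a checkpoint.

    checkpoint_adds: paths of the add actions stored in the checkpoint.
    commits: {version: [("add" | "remove", path), ...]} for later JSON commits.
    """
    if version < checkpoint_version:
        raise ValueError("target version precedes the checkpoint")
    active = set(checkpoint_adds)
    # Replay only the commits written after the checkpoint, in order.
    for v in range(checkpoint_version + 1, version + 1):
        for kind, path in commits[v]:  # KeyError if a commit is missing
            if kind == "add":
                active.add(path)
            else:
                active.discard(path)
    return active
```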

However, I still cannot reproduce this behavior :( To reproduce it I tried the following:

  1. Create a table (SQL):
     CREATE DATABASE IF NOT EXISTS mytmp;
     DROP TABLE IF EXISTS mytmp.mytbl;

     CREATE TABLE mytmp.mytbl
     USING DELTA
     PARTITIONED BY (part)
     AS
     SELECT 1 AS part, "foo";
  2. Force creation of a checkpoint by performing 100 small transactions (Python):
     for i in range(100):
         spark.sql(f"INSERT INTO mytmp.mytbl SELECT {i} AS part, 'foo'")
  3. Check the _delta_log directory for the first checkpoint (a *.checkpoint.parquet file). Mine is at version 36, but I'm not sure this is deterministic.
  4. Create a DeltaTable object for versions 35, 36 and 37 and call get_add_actions (Python):
     from deltalake import DeltaTable

     delta_table = DeltaTable("{your spark.sql.warehouse.dir}/mytmp.db/mytbl/", version=36)
     delta_table.get_add_actions(flatten=True).to_pandas()
    • Behavior with 0.15.1: get_add_actions for table version 35 returns 36 rows, for version 36 returns 101 rows, and for version 37 returns 38 rows
    • Behavior with 0.15.2: get_add_actions for table version 35 returns 36 rows, for version 36 returns 37 rows, and for version 37 returns 38 rows
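The checkpoint-finding step above (looking for a *.checkpoint.parquet file in _delta_log) can also be scripted. A minimal sketch, assuming the standard Delta log naming in which a single-part checkpoint is named after its 20-digit zero-padded version, e.g. `00000000000000000036.checkpoint.parquet`. Multi-part checkpoints and the `_last_checkpoint` pointer file are deliberately ignored, and the function name is hypothetical.

```python
import os
import re

# Single-part checkpoint files: <20-digit version>.checkpoint.parquet
CHECKPOINT_RE = re.compile(r"^(\d{20})\.checkpoint\.parquet$")


def checkpoint_versions(delta_log_dir):
    """Return the sorted table versions that have a single-part checkpoint."""
    versions = []
    for name in os.listdir(delta_log_dir):
        match = CHECKPOINT_RE.match(name)
        if match:
            versions.append(int(match.group(1)))
    return sorted(versions)
```

Calling it on the table's `_delta_log` directory gives the candidate versions to pass as `version=` when creating the DeltaTable objects.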

I believe the behavior of 0.15.2 is correct here, while 0.15.1 is wrong. So there still is no explanation for why we get empty results for checkpoints on our production table on >=0.15.2 as @antonsteenvoorden described.

Will let you know if I find out more.

@antonsteenvoorden
Author

Hi, this is still an issue that blocks us from bumping our delta-rs version, which is getting many great improvements. Any chance you could take a look at this, @ion-elgreco?

@sherlockbeard
Contributor

sherlockbeard commented Jul 22, 2024

Maybe this is a possible reproduction:

from pyspark.sql import SparkSession
from delta import configure_spark_with_delta_pip

builder = SparkSession.builder.appName("MyApp") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()


spark.sql("""CREATE OR REPLACE TABLE delta_table
USING DELTA
PARTITIONED BY (partition_col)
TBLPROPERTIES (
  delta.checkpoint.writeStatsAsJson=false,
  delta.checkpoint.writeStatsAsStruct=true,
  delta.minReaderVersion=1,
  delta.minWriterVersion=2
  )
AS
SELECT 1 AS partition_col, 'value' AS something""")

spark.sql("""INSERT INTO delta_table
VALUES
(1, 'value1'),
(2, 'value2'),
(3, 'value3')""")

spark.sql("""CREATE OR REPLACE TABLE delta_table2
USING DELTA
PARTITIONED BY (partition_col)
TBLPROPERTIES (
  delta.checkpoint.writeStatsAsJson=false,
  delta.checkpoint.writeStatsAsStruct=true,
  delta.minReaderVersion=1,
  delta.minWriterVersion=2
  )
AS
SELECT 1 AS partition_col, 'value' AS something""")

spark.sql("""CREATE OR REPLACE TABLE delta_table 
USING DELTA
PARTITIONED BY (partition_col)
TBLPROPERTIES (delta.checkpoint.writeStatsAsJson=false,delta.checkpoint.writeStatsAsStruct=true)
AS SELECT * from delta_table2 where partition_col = 2 """)

In the final CREATE OR REPLACE TABLE, the SELECT statement returns no rows (delta_table2 has no rows with partition_col = 2), so get_add_actions returns an empty result for it.

(CREATE OR REPLACE TABLE writes remove actions for all the older files and then add actions for the data returned by the SELECT statement.)

This get_add_actions result is also correct, because the table currently contains no data (in its latest state).
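The mechanism described here can be sketched as a replay over simplified, hypothetical commits: a replace removes every currently active file, then adds one file per output file of the SELECT. This sketch assumes an empty SELECT writes no data files, and it omits the metadata and protocol actions that real commits also carry.

```python
def replace_table(active, new_files):
    """Return the actions a CREATE OR REPLACE commit would write (simplified).

    Every currently active file gets a remove action, then each newly
    written file gets an add action.
    """
    removes = [("remove", path) for path in sorted(active)]
    adds = [("add", path) for path in new_files]
    return removes + adds


def apply_commit(active, commit):
    """Apply one commit's add/remove actions to a set of active paths."""
    active = set(active)
    for kind, path in commit:
        if kind == "add":
            active.add(path)
        else:
            active.discard(path)
    return active
```

Applied to a table with existing files, only the newly written files survive the replace, and with an empty SELECT nothing survives at all, which is why the older files no longer appear in get_add_actions.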

@antonsteenvoorden
Author

@sherlockbeard in our actual use case, the SELECT is not (and should not be) empty. We overwrite the last two weeks' worth of data. It could be that those last two weeks are temporarily missing, but we still have a few years' worth of data in this table, so those files should theoretically still show up in the output of get_add_actions, right?

@sherlockbeard
Contributor

Nope, @antonsteenvoorden.
CREATE OR REPLACE TABLE checks whether the table exists. If it does, it overwrites all the old data with the new data (from the SELECT statement).

@ion-elgreco
Collaborator

> Hi, this is still an issue that blocks us from bumping our delta-rs version, which is getting many great improvements.. Any chance you can take look at this @ion-elgreco?

Sorry, I'm too busy with other things nowadays.


5 participants