
Iceberg read support in Velox #5977

Open
yingsu00 opened this issue Aug 3, 2023 · 25 comments
Labels
enhancement New feature or request iceberg

Comments

@yingsu00
Collaborator

yingsu00 commented Aug 3, 2023

Description

This document describes supporting Iceberg reads in Velox.

Iceberg is a widely adopted data lake table format. It uses a set of metadata and manifest files to manage the tables, and was carefully designed to provide the following functionalities and features:

  1. Row-level inserts and deletes
    Supports both Merge On Read and Copy On Write
    In MOR mode
    • Uses delete files to encode rows deleted in existing data files
    • New data files will be added for newly inserted rows, by default in the same format as the base data file
  2. Full schema evolution and partition spec evolution
  3. Fast scan planning:
    The table and its data files are tracked by the manifests, so no expensive directory listing is required when scheduling the splits
  4. Snapshot/Serializable Isolation
    • Reads are isolated from concurrent writes and always use a committed snapshot of a table’s data
    • Writer uses Optimistic Concurrency (OCC) to guarantee the ACID properties of the updates by performing Compare And Swap operation on the catalog link to the metadata file.

In this doc we will focus on the Velox support for item 1, reading data with Iceberg DeleteFiles.

Overview

In Prestissimo, the existing Iceberg connector achieves these functionalities during the planning and scheduling phases by calling into the Iceberg library on the coordinator. The planner and MetadataManager on the coordinator get the Iceberg table information, including the snapshot information, by issuing API calls to the Iceberg library. Since Velox only supports filter pushdown in the HiveConnector, a special planner rule is applied to push down the filters to the scan during the planning phase. The scheduler creates Prestissimo IcebergSplits, which are serialized and converted to HiveIcebergSplits for Velox.

HiveIcebergSplit and IcebergDeleteFile

The HiveIcebergSplit extends the HiveConnectorSplit and adds a special field called deleteFiles, which is a vector of iceberg::IcebergDeleteFile. The IcebergDeleteFile contains information about which rows in the base data file are deleted. The delete files can be in any Hive file format, e.g. Parquet, ORC, or Avro, but in Prestissimo they use the same file format as the base data files.

There are two types of Iceberg delete files:

  • Positional deletes
    mark a row deleted by data file path and the row position in the data file
  • Equality deletes
    mark a row deleted by one or more column values, like id = 5

The IcebergDeleteFile has a filePath field which points to an Iceberg delete file, which can be either a positional delete file or an equality delete file. equalityFieldIds is for equality delete files only; lowerBounds and upperBounds are for positional delete files only.
struct IcebergDeleteFile {
  FileContent content;
  const std::string filePath;
  dwio::common::FileFormat fileFormat;
  uint64_t recordCount;
  uint64_t fileSizeInBytes;
  // The field ids for the delete columns for equality delete files
  std::vector<int32_t> equalityFieldIds;
  // The lower bounds of the in-file positions for the deleted rows, identified
  // by each column's field id. E.g. The deleted rows for a column with field id
  // 1 is in range [10, 50], where 10 and 50 are the deleted row positions in
  // the data file, then lowerBounds would contain entry <1, "10">
  std::unordered_map<int32_t, std::string> lowerBounds;
  // The upper bounds of the in-file positions for the deleted rows, identified
  // by each column's field id. E.g. The deleted rows for a column with field id
  // 1 is in range [10, 50], then upperBounds will contain entry <1, "50">
  std::unordered_map<int32_t, std::string> upperBounds;
};
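
For illustration, a split carrying one positional delete file might populate these fields roughly as follows. This is a minimal, self-contained sketch: the enum values, the struct name, and the file path are simplified stand-ins for the real Velox types above, and the bounds are assumed to be keyed by the Iceberg "pos" column's field id (2147483545).

#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

enum class FileContent { kData, kPositionalDeletes, kEqualityDeletes };
enum class FileFormat { kParquet, kOrc, kAvro };

struct IcebergDeleteFileSketch {
  FileContent content;
  std::string filePath;
  FileFormat fileFormat;
  uint64_t recordCount;
  uint64_t fileSizeInBytes;
  std::vector<int32_t> equalityFieldIds;
  std::unordered_map<int32_t, std::string> lowerBounds;
  std::unordered_map<int32_t, std::string> upperBounds;
};

int main() {
  // A positional delete file whose deleted row positions fall in [10, 50];
  // 2147483545 is the field id of the Iceberg "pos" column.
  IcebergDeleteFileSketch deleteFile{
      FileContent::kPositionalDeletes,
      "s3://warehouse/db/table/data/delete-0001.parquet",
      FileFormat::kParquet,
      /*recordCount=*/41,
      /*fileSizeInBytes=*/4096,
      /*equalityFieldIds=*/{},
      /*lowerBounds=*/{{2147483545, "10"}},
      /*upperBounds=*/{{2147483545, "50"}}};
  (void)deleteFile;
  return 0;
}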

IcebergDataSource

IcebergDataSource is used to read the delete files; it interprets the positional deletes as row numbers and encapsulates them in the Mutation object, or constructs predicates from the equality delete files. These delete rows or predicates are then pushed down into the readers. Bearing in mind the common practice of reducing code duplication while maintaining necessary structures and encapsulations, we create this class and make it extend HiveDataSource. The operations in this class do NOT overlap with those in HiveDataSource, but are an Iceberg domain-specific layer on top of HiveDataSource.

The major Iceberg operations are:

  • openDeletes: create another HiveDataSource to read the delete files and return materialized delete file contents
  • readPositionalDeletes: use openDeletes to read a positional delete file
  • readEqualityDeletes: use openDeletes to read an equality delete file, construct the filters, and add them to the scanSpec.
  • readNext: convert the in-file delete rows into a per-batch delete bitmap, encapsulate it in a Mutation object, then pass it to the base next() function (a skeleton sketch follows this list).
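
A sketch only; the type names, content enum, and signatures below are simplified stand-ins, not the actual Velox classes:

#include <cstdint>
#include <string>
#include <vector>

enum class DeleteFileContent { kPositionalDeletes, kEqualityDeletes };

struct DeleteFileSketch {
  DeleteFileContent content;
  std::string filePath;
};

class HiveIcebergDataSourceSketch /* : public HiveDataSource in the real code */ {
 public:
  // Called from addSplit() once the HiveIcebergSplit's delete files are deserialized.
  void processDeleteFiles(const std::vector<DeleteFileSketch>& files) {
    for (const auto& file : files) {
      if (file.content == DeleteFileContent::kPositionalDeletes) {
        readPositionalDeletes(file);
      } else {
        readEqualityDeletes(file);
      }
    }
  }

 private:
  // Uses openDeletes() to materialize the "pos" column into deletePositions_.
  void readPositionalDeletes(const DeleteFileSketch& file) { openDeletes(file); }
  // Uses openDeletes() to read the delete rows and turn them into scanSpec filters.
  void readEqualityDeletes(const DeleteFileSketch& file) { openDeletes(file); }
  // Would create another data source over file.filePath and read its contents
  // (not shown in this sketch).
  void openDeletes(const DeleteFileSketch& file) { (void)file; }

  std::vector<int64_t> deletePositions_;  // in-file positions of deleted rows
};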

The IcebergDataSource is created when the HiveConnector receives a HiveIcebergSplit, identified by checking the customSplitInfo:

customSplitInfo["table_format"] = "hive_iceberg";

There are alternative ways to do this check, e.g. using a special connectorId or checking the TableHandle type. We discarded the connectorId implementation because Velox uses non-fixed connector Ids and the tests could use other names.
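
For illustration, the dispatch could look roughly like this (a minimal sketch with stand-in class names and a simplified factory; the real HiveConnector code differs):

#include <memory>
#include <string>
#include <unordered_map>

// Simplified stand-ins for the real data source classes.
struct HiveDataSourceSketch {
  virtual ~HiveDataSourceSketch() = default;
};
struct HiveIcebergDataSourceSketch : HiveDataSourceSketch {};

// Choose the data source implementation based on the split's customSplitInfo.
std::unique_ptr<HiveDataSourceSketch> createDataSource(
    const std::unordered_map<std::string, std::string>& customSplitInfo) {
  auto it = customSplitInfo.find("table_format");
  if (it != customSplitInfo.end() && it->second == "hive_iceberg") {
    return std::make_unique<HiveIcebergDataSourceSketch>();
  }
  return std::make_unique<HiveDataSourceSketch>();
}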

Positional Deletes

The positional delete file contains two special Iceberg columns, "file_path" and "pos", and an optional third column, "row".

Field id    Name       Type                      Description
2147483546  file_path  string                    Full URI of a data file with FS scheme. This must match the file_path of the target data file in a manifest entry
2147483545  pos        long                      Ordinal position of a deleted row in the target data file identified by file_path, starting at 0
2147483544  row        required struct<...> [1]  Deleted row values. Omit the column when not storing deleted rows.

    These columns have special Iceberg field IDs and are defined in IcebergMetadataColumn.h. "file_path" should match the base data file path, and "pos" holds the row numbers deleted in that file. "row" is not included in the first PR but will be added in the future.

    In Velox, the "pos" column values are the row numbers of the deleted rows in the data file, and will be converted into a delete bitmap within the batch, with the row numbers adjusted relative to the beginning of the batch. For this we introduced the field readOffset_, which is the read offset from the beginning of the split in number of rows for the current batch, and firstRowInSplit_, which is the file row position of the first row in the split. The conversion is done in HiveIcebergDataSource::readNext(uint64_t size). This is because Mutation does not support an offset yet, so we have to construct the bitmap for every batch instead of for the whole split. Improving the Mutation performance is something we will do later.

    In fact, the row numbers could be passed directly to the readers. The current implementation has to convert the row numbers into bitmaps, which are later read in the SelectiveStructColumnReader and re-populated as row numbers. This extra conversion can be avoided and we will do so in a future PR.
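
    A sketch of that conversion, under the field semantics described above (the field names mirror the text; everything else is simplified and not the actual Velox code):

    #include <cstdint>
    #include <vector>

    // Build a per-batch delete bitmap from delete positions expressed as row numbers
    // within the data file. Bit i is set when file row (firstRowInSplit + readOffset + i)
    // is deleted. The real code works on Velox vectors and a Mutation object.
    std::vector<uint64_t> makeDeleteBitmap(
        const std::vector<int64_t>& deletePositionsInFile,
        int64_t firstRowInSplit,
        int64_t readOffset,
        int64_t batchSize) {
      std::vector<uint64_t> bitmap((batchSize + 63) / 64, 0);
      const int64_t batchStart = firstRowInSplit + readOffset;
      const int64_t batchEnd = batchStart + batchSize;
      for (int64_t pos : deletePositionsInFile) {
        if (pos >= batchStart && pos < batchEnd) {
          const int64_t i = pos - batchStart;
          bitmap[i / 64] |= uint64_t(1) << (i % 64);
        }
      }
      return bitmap;
    }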

    Note that this process doesn't need to read and construct extra columns like Trino does. Trino adds an additional column "_pos" to the existing columns in the base data file, and then constructs the filter for the positional deletes on this extra column. This incurs extra CPU and memory costs, which Velox avoids.

    Delete row range pre-filter

    The IcebergDataSource will maintain two fields, startRowInSplit_ and endRowInSplit_, to identify the start and end row numbers of this split in its data file. It gets startRowInSplit_ and endRowInSplit_ when adding each split in addSplit(). The DeleteFile also has two fields, lowerBounds and upperBounds, which bound the positions of the deleted rows. When opening the delete file in openDeletes(), the lower and upper bounds will be compared against startRowInSplit_ and endRowInSplit_. If the delete file doesn't contain any rows in this split, we can skip opening it.
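
    A sketch of that skip check (assuming the bounds are stored as decimal strings keyed by the "pos" column's field id, as in the struct above; the function name is illustrative):

    #include <cstdint>
    #include <string>

    // Returns false when the delete file's "pos" bounds fall entirely outside the
    // split's [startRowInSplit, endRowInSplit] range, so the file can be skipped.
    bool deleteFileMayApplyToSplit(
        int64_t startRowInSplit,
        int64_t endRowInSplit,
        const std::string& posLowerBound,
        const std::string& posUpperBound) {
      const int64_t lower = std::stoll(posLowerBound);
      const int64_t upper = std::stoll(posUpperBound);
      return !(upper < startRowInSplit || lower > endRowInSplit);
    }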

    Equality Deletes

    Equality delete files identify deleted rows in a collection of data files by one or more column values. Equality delete files store any subset of a table’s columns and use the table’s field ids.
    An equality delete file can contain multiple columns and multiple rows. The columns are conjuncts (AND) and the rows are disjuncts (OR).

    example delete file 1:
    id = 4 AND category = ‘Bear’

    equality_ids=[1, 2]
    1: id  | 2: category | 3: name 
    -------|-------------|---------
    4      | Bear        | Polar
    

    example delete file 2
    Id IN {4, 7, 9, 100, 145}

    equality_ids=[1]
    1: id 
    -------
    4      
    7
    9
    100
    145 
    

    example delete file 3
    (id = 4 AND category IS NULL) OR (id = 7 AND category = ‘Bear’) OR (id = 9 AND category = ‘Bear’)

    equality_ids=[1, 2]
    1: id  | 2: category | 3: name 
    -------|-------------|---------
    4      | NULL        | Unknown
    7      | Bear        | Polar
    9      | Bear        | Brown 
    

    Note that before we decide which part of the filters can be pushed to the scan and readers, we need to convert the disjuncts into conjuncts. E.g.
    (partkey < 2 and suppkey < 2) or (partkey > 9 and suppkey > 9)
    will be converted to

    (partkey < 2 or partkey > 9) and (partkey < 2 or suppkey > 9)
    and
    (suppkey < 2 or partkey > 9) and (suppkey < 2 or suppkey > 9)
    

    In Presto, this is done by the LogicalExpressionRewriter in the SimplifyRowExpressions rule. It will be simplified and transformed a few more times in several planner rules. The HiveFilterPushdown rule will push the non-dynamic filters into the scan. In the above example, (partkey < 2 or partkey > 9) and (suppkey < 2 or suppkey > 9) will be domain filters pushed to the reader, while (partkey < 2 or suppkey > 9) and (suppkey < 2 or partkey > 9) will be the remaining filters, evaluated after the ColumnReaders have loaded the two columns. There are no other remaining filters that have to be evaluated in a FilterNode.
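
    For reference, the rewrite itself is just the distributive law applied to an OR of AND-clauses. A minimal sketch (terms kept as plain strings; a real rewriter like LogicalExpressionRewriter also simplifies and deduplicates):

    #include <string>
    #include <vector>

    // The input is a disjunction of conjunctions (each inner vector is one AND-clause);
    // the output is a conjunction of disjunctions (each inner vector is one OR-clause).
    using Clause = std::vector<std::string>;

    std::vector<Clause> dnfToCnf(const std::vector<Clause>& dnf) {
      std::vector<Clause> cnf{{}};  // one empty OR-clause to start the cross product
      for (const Clause& andClause : dnf) {
        std::vector<Clause> next;
        for (const Clause& partial : cnf) {
          for (const std::string& term : andClause) {
            Clause extended = partial;
            extended.push_back(term);
            next.push_back(std::move(extended));
          }
        }
        cnf = std::move(next);
      }
      return cnf;
    }

    // dnfToCnf({{"partkey < 2", "suppkey < 2"}, {"partkey > 9", "suppkey > 9"}}) returns
    // the four OR-clauses shown above, which are then ANDed together.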

    There’re two places we can do the above process:

    1. In coordinator scheduler.
    2. In Velox workers

    For 1 to happen, note that the delete files can only be acquired after the split is created. This is after planning is done, and requires the scheduler to open the delete files, create a HivePageSource, read the content, construct the expression, reuse LogicalExpressionRewriter to rewrite the expression, then merge the filters with the existing ones in the HiveTableLayoutHandle. The concern is that opening the delete files on the coordinator might become a bottleneck. On the other hand, we can build local caches to avoid opening the same files over and over, and the expressions only need to be rewritten once. This is expected to work since a whole partition or table may share the same delete files. Velox doesn't need changes since the filters would be handled in the coordinator and pushed down to the scan.

    If we want to choose 2 and open the delete files in the Velox scan, we will have to manually rewrite and simplify the expressions because there is no equivalent of LogicalExpressionRewriter in Velox. But luckily, since they are all equality predicates, all of them can be handled in TableScan because there are no dynamic filters in them. We can treat these situations separately (a sketch of the single-column case follows this list):

    • One row only (a = 1 AND b = 2)
    • One column only (a IN {1, 2, 3})
    • Multi-row and multi-column: just evaluate the whole disjunction as a remaining filter function.

    We can also have a local equality delete file cache on the Velox workers to improve performance.
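
    A minimal sketch of the single-column case (values simplified to int64_t; real code would build a Velox Filter and attach it to the scanSpec, and the multi-row/multi-column case would stay a remaining filter):

    #include <cstdint>
    #include <unordered_set>
    #include <vector>

    // The delete file pins down the deleted rows with values of one column
    // (e.g. id IN {4, 7, 9, 100, 145}), so the surviving rows can be selected with a
    // negated IN-list filter on that column.
    std::unordered_set<int64_t> buildDeletedValueSet(
        const std::vector<int64_t>& deleteColumnValues) {
      return {deleteColumnValues.begin(), deleteColumnValues.end()};
    }

    bool rowSurvives(int64_t value, const std::unordered_set<int64_t>& deletedValues) {
      // Keep the row only if its key is NOT one of the deleted values.
      return deletedValues.count(value) == 0;
    }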

    We will need to make POCs for both options and see if the performance is OK. If you have thoughts, please feel free to comment on this issue.

    Type Conversion

    Iceberg has its own data types, but for Prestissimo the type conversions for the base data files are done in the planning phase on the coordinator, so when Velox receives the split, the types in the column handles are already Velox types.

    The delete files need to be treated separately. But since the delete files have a pre-defined schema and the types are already known, we will directly use Velox types. E.g. the "pos" column will be read as a BIGINT column directly.

    #define ICEBERG_DELETE_FILE_POSITIONS_COLUMN() \
      std::make_shared<IcebergMetadataColumn>(     \
          2147483545,                              \
          kIcebergRowPositionColumn,               \
          BIGINT(),                                \
          "Ordinal position of a deleted row in the data file");
    

    Metadata Columns

    Iceberg defines metadata columns in the Spec https://iceberg.apache.org/spec/#table-metadata-fields

    Field id    Name        Type         Description
    2147483646  _file       string       Path of the file in which a row is stored
    2147483645  _pos        long         Ordinal position of a row in the source data file
    2147483644  _deleted    boolean      Whether the row has been deleted
    2147483643  _spec_id    int          Spec ID used to track the file containing a row
    2147483642  _partition  struct       Partition to which a row belongs
    2147483546  file_path   string       Path of a file, used in position-based delete files
    2147483545  pos         long         Ordinal position of a row, used in position-based delete files
    2147483544  row         struct<...>  Deleted row values, used in position-based delete files

    and in Iceberg library:
    package org.apache.iceberg;
    
    import java.util.Map;
    import java.util.Set;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
    import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
    import org.apache.iceberg.types.Types.BooleanType;
    import org.apache.iceberg.types.Types.IntegerType;
    import org.apache.iceberg.types.Types.LongType;
    import org.apache.iceberg.types.Types.NestedField;
    import org.apache.iceberg.types.Types.StringType;
    
    public class MetadataColumns {
        public static final int FILE_PATH_COLUMN_ID = 2147483646;
        public static final String FILE_PATH_COLUMN_DOC = "Path of the file in which a row is stored";
        public static final NestedField FILE_PATH = NestedField.required(2147483646, "_file", StringType.get(), "Path of the file in which a row is stored");
        public static final NestedField ROW_POSITION = NestedField.required(2147483645, "_pos", LongType.get(), "Ordinal position of a row in the source data file");
        public static final NestedField IS_DELETED = NestedField.required(2147483644, "_deleted", BooleanType.get(), "Whether the row has been deleted");
        public static final int SPEC_ID_COLUMN_ID = 2147483643;
        public static final String SPEC_ID_COLUMN_DOC = "Spec ID used to track the file containing a row";
        public static final NestedField SPEC_ID = NestedField.required(2147483643, "_spec_id", IntegerType.get(), "Spec ID used to track the file containing a row");
        public static final int PARTITION_COLUMN_ID = 2147483642;
        public static final String PARTITION_COLUMN_NAME = "_partition";
        public static final String PARTITION_COLUMN_DOC = "Partition to which a row belongs to";
        public static final NestedField DELETE_FILE_PATH = NestedField.required(2147483546, "file_path", StringType.get(), "Path of a file in which a deleted row is stored");
        public static final NestedField DELETE_FILE_POS = NestedField.required(2147483545, "pos", LongType.get(), "Ordinal position of a deleted row in the data file");
        public static final String DELETE_FILE_ROW_FIELD_NAME = "row";
        public static final int DELETE_FILE_ROW_FIELD_ID = 2147483544;
        public static final String DELETE_FILE_ROW_DOC = "Deleted row values";
        public static final NestedField CHANGE_TYPE = NestedField.required(2147483543, "_change_type", StringType.get(), "Record type in changelog");
        public static final NestedField CHANGE_ORDINAL = NestedField.optional(2147483542, "_change_ordinal", IntegerType.get(), "Change ordinal in changelog");
        public static final NestedField COMMIT_SNAPSHOT_ID = NestedField.optional(2147483541, "_commit_snapshot_id", LongType.get(), "Commit snapshot ID");
        private static final Map<String, NestedField> META_COLUMNS;
        private static final Set<Integer> META_IDS;
        // ... remainder of the class omitted ...
    }

    These metadata columns will be used internally by Prestissimo. For example, an update query may require the "row" and "pos" fields to be added to the TableScan so that new values can be merged into the right rows by later operators.

    We will also support directly querying these metadata columns in the future. Currently, Presto Hive and Iceberg support reading two metadata columns, "$path" and "$file_modified_time":

    SELECT *, "$path", "$file_modified_time"
    FROM nation;
    

    We can also query

    SELECT nationkey, "$_deleted"
    FROM nation;
    

    Note that these columns are not in the actual data files, but need to be constructed by the data source and readers according to Iceberg spec. These extra columns can be added through Column Projections in HiveIcebergDataSource.

    However, to support reads with just positional delete files, we found there is no need to include additional metadata fields like "_file" or "_pos" in the base data files. The information for these two columns is self-contained in the data files. This is different from Trino's implementation, which does need to add the "_pos" field to the base file read.

    Column Projections and Schema Evolution

    Columns in Iceberg data files are selected by field id. The table schema's column names and order may change after a data file is written, so projection must be done using field ids. This can be used to implement schema evolution. It can also be used to query metadata columns that do not exist in the data file.

    In Presto, the column mappings are done at the HivePageSource level, not just for Iceberg. We can do the same in Velox in HiveDataSource. Also, schema evolution would be better done for all Hive tables, not just Iceberg, if they achieve the same semantics. As far as I know, Meta already has a plan to support it in the Hive connector for all table formats and file formats, but the design is unclear at the moment. We will need to learn more about it before deciding how to support schema evolution.
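
    A sketch of what field-id based projection means in practice (the structs are stand-ins for whatever schema representation ends up being used; a missing mapping corresponds to a column added after the file was written, or to a metadata column):

    #include <cstdint>
    #include <optional>
    #include <string>
    #include <unordered_map>
    #include <vector>

    struct FieldSketch {
      int32_t fieldId;
      std::string name;
    };

    // Match the requested schema to the file schema on field ids, so renamed columns
    // still resolve and columns absent from the file come back as "missing".
    std::vector<std::optional<int32_t>> resolveProjection(
        const std::vector<FieldSketch>& requested,
        const std::vector<FieldSketch>& fileSchema) {
      std::unordered_map<int32_t, int32_t> fileColumnByFieldId;
      for (int32_t i = 0; i < static_cast<int32_t>(fileSchema.size()); ++i) {
        fileColumnByFieldId.emplace(fileSchema[i].fieldId, i);
      }
      std::vector<std::optional<int32_t>> channels;
      for (const auto& field : requested) {
        auto it = fileColumnByFieldId.find(field.fieldId);
        if (it == fileColumnByFieldId.end()) {
          channels.push_back(std::nullopt);  // not in the file: new or metadata column
        } else {
          channels.push_back(it->second);    // read this file column, whatever its name
        }
      }
      return channels;
    }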

@yingsu00 yingsu00 added enhancement New feature or request iceberg labels Aug 3, 2023
yingsu00 added a commit to yingsu00/velox that referenced this issue Aug 3, 2023
This is the first cut to support reading Iceberg tables with delete
files. The design doc can be found at
facebookincubator#5977
yingsu00 added a commit to yingsu00/velox that referenced this issue Aug 3, 2023
This is the first cut to support reading Iceberg tables with delete
files. The design doc can be found at
facebookincubator#5977
@majetideepak
Collaborator

@yingsu00 Thanks for the detailed design document. We need to separate Iceberg positional delete support and equality delete support into two separate issues as they are independent pieces. Equality deletes also have the design choice of using the coordinator. From a contribution standpoint as well, we need to add them independently.

Now you mentioned

IcebergeDataSource was used to ........ maintain necessary structures and encapsulations........ The operations in this class do NOT overlap with that in HiveDataSource, but are an Iceberg domain-specific layer on top of HiveDataSource.

I do not see a lot of Iceberg domain-specific information here except for the delete list materialization, and updating the mutation. As you mentioned, the mutation update will also go away with offset support.
The delete list materialization can be abstracted out as a separate method in the existing HiveConnector and specialized for Iceberg split type or Hudi split type.

Can we add a readPositionalDeletes() to HiveConnector that updates mutation? Inside readPositionalDeletes(), we can specialize based on the Iceberg split.

@yingsu00
Collaborator Author

yingsu00 commented Aug 4, 2023

@majetideepak Equality deletes will be a separate implementation for sure, as I already mentioned this in the doc:

There’re two places we can do the above process:
1. In coordinator scheduler.
2. In Velox workers

We need to evaluate if both are possible and do not incur too much of a performance hit. The positional delete PR does not have it, as its title already says, but I put the discussion here in the design doc so people can learn about the options and chime in with their thoughts.

For the HiveIcebergDataSource, I think everything inside it is Iceberg specific: how the delete files are defined, what types of columns there are in the delete files, how to interpret the row numbers (they are file positions instead of split positions), how to pre-filter the delete rows, and how to match the base file paths are all very Iceberg specific. Hudi and Delta tables will be very different in these respects. For example, Iceberg defines its metadata columns in different ways than other protocols. For another example, Hudi's log file is different from Iceberg's delete files and will be opened in a different way. It would be a mess to mix everything into the already huge HiveDataSource. Note that HiveIcebergDataSource doesn't have all features implemented yet and it's already several hundred lines of code. In the next few PRs we will introduce those Iceberg-specific operations like pre-filtering the delete rows, matching the base file paths, and maybe the column projections that are also defined by the Iceberg spec. To me, having this layer handle these domain-specific things and construct the Mutation object is very natural and easier to maintain than having a huge HiveDataSource file with many large if-else blocks. In the future, if we want to support Hudi and Delta, we can easily extend this structure without making the code too big and too hard to read. Also, Meta already has an internal extended HiveDataSource for Prism, so this is not new.

@majetideepak
Collaborator

The positional delete PR does not have it as its title already says it

I see some code related to equality deletes here https://github.com/facebookincubator/velox/pull/5897/files#diff-54e191b935da6cdc7f5542e8e6997d7a56e2ec6cf8f8094f570d38aadcc57dfbR76. Am I missing something?

  if (deleteFile.content == FileContent::kEqualityDeletes) {
      readEqualityDeletes(deleteFile);

everything inside it is Iceberg specific, like how the delete files are defined, what types of the columns there are in the delete files

Sure, we can abstract away the deleteList -> mutation into another class and add all Iceberg-specific details there.

Iceberg specific operations like pre-filter the delete rows, match the base file paths

This is again delete list -> mutation

the Iceberg defines its metadata columns in different ways than other protocols

How does this metadata impact reading files in the worker?

maybe column projections which is also defined by the Iceberg spec

You mentioned that this and schema evolution should be done in the HiveConnector itself.

Also Meta already has an internal extended HiveDataSource for Prism

IMO creating a new DataSource is almost like adding a new connector. @Yuhta can you share some pointers on what was done for Prism?

@yingsu00
Collaborator Author

yingsu00 commented Aug 5, 2023

readEqualityDeletes is VELOX_NYI. I can remove it for now.

Sure, we can abstract away the deleteList -> mutation into another class and add all Iceberg-specific details there.

Could you explain a bit how to abstract it into another class that is fundamentally different from the HiveIcebergDataSource? Also, this is actually 1) IcebergSplit -> 2) Iceberg DeleteFile -> 3) vectors for the data file path and delete list -> 4) Mutation. The Iceberg delete file and all metadata columns are defined by the Iceberg specification and are therefore Iceberg specific. Apart from these data structures, the HiveIcebergDataSource is also about its behavior, that is, it needs to deserialize the split's extraFileInfo to get the IcebergDeleteFiles, open the delete files, and construct the delete lists. This is additional work on top of the plain HiveDataSource, which just reads the base data file. This behavior sits on top of HiveDataSource and is different for other table formats like Hudi and Delta.

Iceberg specific operations like pre-filter the delete rows, match the base file paths

This is again delete list -> mutation

The pre-filter needs to compare startRowInSplit_ and endRowInSplit_ against the bounds in the Iceberg delete files. This is only present in Iceberg, not in other table formats, which define the "delete file" in different ways.

the Iceberg defines its metadata columns in different ways than other protocols

How does this metadata impact reading files in the worker?

The Iceberg delete file has two required and one optional metadata columns, and they are defined by the Iceberg specification. It's 2 or 3 columns only, instead of 1 or 3, and the types of the required columns are Iceberg StringType and LongType. Other table formats define them differently; for example, a Hudi log file may contain multiple log records for multiple transactions.

For positional deletes, we don't need to add metadata columns to the base data file. But in the future, we may need to add these metadata columns to the Iceberg scan. For example, Iceberg updates may require the scan to add the "row" and "_pos" columns to the existing columns so that the rows can be merged with new values in downstream operators. As another example, this query also needs additional metadata columns:

SELECT *, "$path", "$file_modified_time"
FROM nation;

I added more explanation in this section in the design doc.

maybe column projections which is also defined by the Iceberg spec

You mentioned that this and schema evolution should be done in the HiveConnector itself.

Yes, but the design of schema evolution in Hive is not clear now and this is only a thought. We will need to discuss this later. But no matter whether schema evolution is implemented in the base HiveDataSource or the derived HiveIcebergDataSource, Iceberg column projections need to be supported to query Iceberg-specific metadata columns.

IMO creating a new DataSource is almost like adding a new connector. @Yuhta can you share some pointers on what was done for Prism?

This is just a derived HiveDataSource that handles Iceberg specific things on top of Hive, and is far from being a full connector. They share the same configs, rely on the same storage adapters, use the same HiveTableHandle and HiveColumnHandles. They can additionally share some of the partition functions. The difference is it needs to open additional Iceberg files and construct mutation objects, or add Iceberg specific metadata columns. These are all specific to Iceberg.

I think we all agree that we want to abstract these into a separate Iceberg class that handles Iceberg-related operations. Now we have several choices:

  1. Inheritance, which is the way I chose
  2. Delegation, e.g. HiveIcebergDataSource contains a HiveDataSource
  3. Use if-else in major functions:
HiveDataSource(split) {
    if (split is an Iceberg split) {
        // if some columns are metadata columns
        // construct the data providers for them
        // update scanSpec if needed
        // ... etc
    } else  if (split is a Hudi split) {
       // Do Hudi stuff, may need Hudi specific member fields
    } else  if (split is a Delta split) {
       // Do Delta stuff, may need Delta specific member fields
    } else {
        // original code
    }
}

void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
    ...
    // 100 lines of code
    if (split is an Iceberg split) {
        // Reset helping variables
        // read the lowerbound and upperbound and store them as member fields
        // open IcebergDeletes and store it to a member field
        ...
    } else  if (split is a Hudi split) {
       // Do Hudi stuff, may need Hudi specific member fields
        // open Hudi log files and store them to some member field
    } else  if (split is a Delta split) {
       // Do Delta stuff, may need Delta specific member fields
    }
}

std::optional<RowVectorPtr> HiveDataSource::next(
    if (split is an Iceberg split) {
        // pre-filter the delete rows by lowerbound and upperbound
        // Convert in-file positions to in-batch positions
        // Construct mutation object
        ...
    } else  if (split is a Hudi split) {
       // Do Hudi stuff, may need Hudi specific member fields
    } else  if (split is a Delta split) {
       // Do Delta stuff, may need Delta specific member fields
    }
    // Original 80 lines of code
}

Do these type checks look awkward? As a common practice, when we see many such type checks using if-else, we would naturally restructure them using the OOP paradigm, either inheritance or delegation. It also helps make the code easier to read and maintain. The HiveDataSource is already almost a thousand lines long. Adding these if-else blocks will only make the code longer and harder to search and read, and adding more features would make it even longer over time. It's a non-trivial amount of code. Separating the code into multiple logical units or files is friendlier to us humans. Also, if we put everything in the base HiveDataSource, it would pollute the code, because we may have to add Iceberg- or Hudi-specific member fields.

In summary, I think the original hope to use a standard way to express deletes in the HiveConnectorSplit does NOT work, because

  1. The delete files will be opened on the workers, not the coordinator, for performance reasons, so it needs to be done in Velox;
  2. The delete files or log files are table-format specific and have different structures, therefore the code to open these table-format specific files differs from format to format. E.g. Iceberg stores delete rows in one or more delete files with 2 or 3 columns, while Hudi stores multiple log records in a single log file.

Now that we need to do the work in Velox anyway, it's just a matter of how we organize this table-format specific code. Having it mingled in the same class and file doesn't seem like a good choice to me. Having a derived HiveIcebergDataSource does NOT mean it's a new connector; it's just a better way to hold the code we have to write anyway.

@majetideepak
Collaborator

majetideepak commented Aug 7, 2023

The abstraction analogy I can think of is how we have the same HiveDataSource for both DWRF and Parquet file formats. We have the Reader and ScanSpec abstractions to handle them separately. Creating a HiveDataSource for each would have missed a lot of re-use opportunities.
So the approach (4) would be:

  1. Extend HiveSplit with a DeleteFile vector. We can start with DeleteFile to be similar to IcebergDeleteFile in your code.
  2. Add std::unique_ptr<dwio::common::DeletesReader> deletesReader_; to HiveDataSource similar to reader_. Move all the Iceberg-specific delete materialization here.
  3. Add an API getMutation() to deletesReader_ and invoke it inside HiveDataSource::next.

This should allow re-use of abstractions across various table formats that support deletes.
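
For illustration, such an abstraction could look roughly like this (names and signatures are placeholders, not a settled API):

#include <cstdint>
#include <vector>

// Illustrative only: a per-split reader that materializes table-format specific
// delete information and exposes it to HiveDataSource as a Mutation.
struct MutationSketch {
  std::vector<uint64_t> deletedRows;  // batch-relative positions of deleted rows
};

class DeletesReaderSketch {
 public:
  virtual ~DeletesReaderSketch() = default;
  // Called per split once the format-specific delete files have been deserialized.
  virtual void open() = 0;
  // Called from HiveDataSource::next for each batch.
  virtual MutationSketch getMutation(uint64_t batchOffset, uint64_t batchSize) = 0;
};

// An Iceberg specialization would read positional delete files in open() and slice the
// materialized positions per batch in getMutation(); other table formats would provide
// their own specializations.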

@yingsu00
Collaborator Author

yingsu00 commented Aug 9, 2023

@majetideepak This only works if these table format protocols have the same semantics and behaviors, so that you can extract the common behaviors (e.g. read deletes) and apply them in a pre-defined order. But unfortunately they are drastically different. For example, Hudi uses a record ID that is unique within the whole partition. For another example, Hudi does NOT use standalone delete files; instead the reader needs to open the log file and take different actions on different block types. It defines different merge policies and requires the reader to:

- Contents of the log files should be loaded into an effective point lookup data structure (in-memory or persisted)
- Duplicate record keys should be merged based on the ordering field specified. It is important to order the inserts and deletes in the right order to be consistent and idempotent.
- When the base file is scanned, for every record block, the reader has to lookup if there is a newer version of the data available for the record keys in the block and merge them into the record iterator.

This means the reader needs to look up keys and may need to do sorting and comparisons. The operations will be quite different from the Iceberg operations, both in the operation content and in the operation steps. Mixing them together would only make the base HiveDataSource logic too complex and hard to maintain.

Having separate HiveIcebergDataSource and HiveHudiDataSource classes would give us much more flexibility in supporting the different behaviors of these table formats, making the code files shorter and easier to maintain. Plus, it does NOT necessarily introduce duplicated code. We can easily extract common operations into separate util functions, e.g. open a file and read its content, or open a file and read one block, etc.

@majetideepak
Collaborator

@yingsu00 I believe you are referring to the Hudi real-time view. We cannot use the existing Parquet reader to support this. We might need to implement a new reader that supports real-time merging for this. The HiveHudiDataSource abstraction will not solve this.
But Hudi Read-optimized view can be implemented similarly to Iceberg?
We can already see how the IcebergSplit class and later HudiSplit will likely be duplicate definitions as they will have the same delete-list field.
DWRF, Parquet readers are examples of how the abstractions should ideally look even though they have different protocols to read footers, row groups/stripes, etc.
We need to have a common path with specializations. Creating a new HiveDataSource is as good as creating a new connector since that is the highest abstraction in a connector.
I believe Prism support at Meta is a new connector as well.

I think it will help if @Yuhta can share some insights into this. We should probably start a discussion in the Prestissimo working group as well to have formal community input. CC @aditi-pandit

@yingsu00
Collaborator Author

yingsu00 commented Aug 10, 2023

Yes, I was referring to the Hudi real-time view, which needs to read Hudi log files.

But Hudi Read-optimized view can be implemented similarly to Iceberg?

The Hudi read-optimized view does NOT need to read the log files, so there's nothing that needs to be done in Velox.

But to support real-time reads, which do require reading the log files, Velox needs to:

  1. Open HUDI log files. The format of a Hudi log file is defined in https://hudi.apache.org/tech-specs/#log-file-format. A log file can contain multiple log blocks, and each block can be a different type. Velox needs to apply the operations in all log blocks in the log file in one batch; a log file can contain multiple blocks, not just a single delete block.

  2. If there is a delete block, Velox needs to get the record keys from the block and merge with the base data to remove these rows from the result. Note that it's not a row number, but a unique key in the partition. The base table also contains this column in the data file. Merging has to be done AFTER both the base data and the log blocks are read, the record IDs in the delete blocks need to be matched to those in the base file, and the matched rows will then be removed. This process is very different from Iceberg, where we only need to pass the deleted rows to the reader BEFORE the read happens.

  3. Also note that for HUDI the inserted rows are in the log block, not in the base file as in Iceberg. This means we also need to do the merge operation with inserted rows, while in Iceberg the inserted rows are read as base files and no merge is needed.

Looking at these operations, they are all HUDI specific. It doesn't sound right to push them into the Parquet or DWRF reader, whose task is just to decode the data of a single file in batches. But these HUDI-specific operations, e.g. opening log files, merging with base file result vectors, etc., can be easily handled in a HiveHudiDataSource which relies on HiveDataSource to scan the base files.

Another fact is that Iceberg metadata columns are not persisted in the base file, but Hudi metadata columns are. Therefore, if we need to read the metadata columns, they need to be constructed somehow, e.g. _is_deleted or _row. This work can easily be done in HiveIcebergDataSource. But there is no such work for Hudi, therefore these are not common operations and you can't just abstract them out. I'm sure there are many other distinctions among these table formats that cannot easily be abstracted out as common operations in the same order.

We can already see how the IcebergSplit class and later HudiSplit will likely be duplicate definitions as they will have the same delete-list field.

This is not true. HiveHudiSplit does not have a delete list field, but just a set of log files. And opening a log file is very different from opening Iceberg delete files, as explained above. They are different things and should not be identified using the same variable. Besides, these splits will extend HiveConnectorSplit and only keep their special fields, e.g. logFiles_ for Hudi and deleteFiles_ for Iceberg. So there is actually NO duplication.
Actually, we don't even need to introduce these XXXSplit classes. I introduced them so that we can separate the delete file deserialization logic from the actual reading business. As long as the splits are identifiable as a special table format (e.g. using the hash map in the extraFileInfo), the HiveHudiDataSource or the HiveIcebergDataSource can be created, and deserializing these deleteFiles_ can be done in the addSplit() of these data sources. There won't be any duplicated code.

These table format specifications and implementations are quite complex, and I do think they deserve their own data sources. If we put everything in HiveDataSource, the code will become too complex for sure. I don't think the DataSource is a connector. A connector's task is to implement the Connector interface, and some of its major tasks are to create a DataSource and a DataSink. The DataSource's task is to provide data to the connector, and a connector doesn't need to have only one kind of DataSource. Besides, the new HiveIcebergDataSource extends HiveDataSource, so they are actually one base type. They share the same configs, the same base HiveConnectorSplit, and many other things.

Even if you still think HiveXXXDataSource is a new connector, it can only be considered a sub-Hive "connector"/DataSource. Given the complex logic in these table formats (check the Iceberg and Hudi repos; there are hundreds of thousands of lines of code), I don't see anything wrong with having these sub-Hive "connectors" or DataSources if we can make sure there is no code duplication, not to mention the benefits of shorter files, reasonably sized functions, and much easier-to-read code.

@Yuhta
Contributor

Yuhta commented Aug 10, 2023

As I suggested in #5897, mutation is a split-level operation, so we should do it at the split level instead of the data source level. I proposed a DeltaSplitProcessor so that different implementations can be decoupled into their own classes. Most of the code in #5897 can be moved this way without large changes, although some parts still need change (e.g. maintaining the list of readers for delta files). PrismConnector is a little different here because it's dispatched at the catalog level; the mutation logic can be extracted into such a DeltaSplitProcessor as well.

The delete file can be in different formats and it's the job of the processor implementation to read it. However, the result will always be a bitmask (row numbers) or a hash table. This part we will push down to the reader and it has nothing to do with the processor. Some post-processing will need to be done on the result vector when pushdown is not possible. For updates we will do the same, except we need some common format for the updated values.

@aditi-pandit
Collaborator

aditi-pandit commented Aug 10, 2023

@yingsu00 : Thanks for this detailed doc.

Please link to prestodb/presto#19811 to give context of how this is used in Prestissimo.

Regarding my take on some of the issues debated here:

  • Readers (like Parquet and DWRF) are about decoding the contents of a file format. Table formats like Hudi and Iceberg are very different from those. They include a lot more nuance about multiple file components (like delete lists or log records) and their interpretation to construct the data for a table at a particular version. So I do feel they warrant something other than a reader.
  • Connector is a combination of (metadata service + composing in a query + data (read and write) processing). Some of these components are at the coordinator and some at the worker. So DataSource is strictly a subset of the Connector APIs. I don't think that adding a new DataSource is at the level of adding a new Connector.
  • Should we add a new Split representation? My intuition is that Hudi would need a separate Split. Looking at the Java version of HudiSplit, it varies significantly from HiveSplit. Iceberg in comparison has more intersection than difference. But if we cannot overload HudiSplit into HiveSplit, then it's cleaner to have IcebergSplit as well, since the table-format split becomes an explicit thing.
  • Should we consolidate Iceberg reading in HiveDataSource? HiveDataSource is a wrapper over Reader with a bunch more things:
    -- The control flow to receive splits and return the data to the rest of the processing flow. This is the DataSource API.
    -- AsyncDataCache to create buffered input streams for the split file handles for the reader.
    -- ScanSpec creation using ExpressionEvaluator to compile filter expressions to hand to the reader.
    -- Adding DynamicFilters to the ScanSpec.
    While Iceberg needs 1) in common with other HiveDataSources, 2), 3), and 4) diverge.

@yingsu00, @majetideepak :
-- How would AsyncDataCache be used with the new table formats? I think we could use it with all of the base table files, delete files, and log files.
-- How much of the ScanSpec creation carries over for Iceberg and Hudi? Many of its fields are populated from the split.
I think @Yuhta is suggesting we make this a standalone abstraction.

We could add a TableFormatReader class to abstract some of these operations away, and make the original logic for Parquet/DWRF a NoopTableFormatReader. But I suspect the TableFormatReader APIs would then begin to look a lot like the HiveDataSource APIs, and the HiveDataSource would just forward calls to it.

@Yuhta
Contributor

Yuhta commented Aug 10, 2023

@aditi-pandit The idea of TableFormatReader is similar to my idea of DeltaSplitProcessor, except that some state needs to be kept in the latter on a per-split basis.

On the split representation, we can compose the base split with additional framework specific data. It does not need to be a subclass of any interface.

@aditi-pandit
Collaborator

@aditi-pandit The idea of TableFormatReader is similar to my idea of DeltaSplitProcessor, except that there is some state need to be keep in the later on a per split basis.

On the split representation, we can compose the base split with additional framework specific data. It does not need to be a subclass of any interface.

Thanks @Yuhta. I think we cross-posted. Yeah, there is lot of intersect.

I'm having a bit of trouble understanding how Hudi MOR would translate to a bitmask/hash table. Any ideas you can share here?

@majetideepak
Collaborator

@aditi-pandit HUDI MOR should translate to equality deletes since they are based on a RecordId. @yingsu00 is this accurate?

To summarize, the Iceberg and Hudi table formats provide deletes and updates (i.e. deletes + inserts).
These could be in a delete file (Iceberg) or a log file (HUDI).
We all agree that a state-based reader/processor abstraction and API are needed to process these files.

Now the deletes can be of two types: 1) positional (Iceberg), 2) equality (Iceberg, Hudi).
We need a reader API to return a selectivity vector (a.k.a. mutation) for positional deletes.
Another reader API to return filters for the equality deletes to be passed to the scanSpec.
Another reader API to return new inserts (Hudi) as a Vector.
Another reader API to return materialized metadata columns as a Vector.

The AsyncDataCache can be applied to delete files, though it would be less useful since they are ephemeral.

@Yuhta
Contributor

Yuhta commented Aug 15, 2023

@aditi-pandit Hudi MOR is based on equality, so for single-column keys we could get by with manipulating the ScanSpec, but I would not recommend it because we need to restore it after the current split, which can be tricky and expensive. A better way, and the only way to do it with multi-column keys, is to keep these keys in a hash table, pass the table down to the reader, and probe the table in the top-level struct column reader for the key columns.

@Yuhta
Contributor

Yuhta commented Aug 15, 2023

@majetideepak Yes, the new layer will sit between the data source and the file reader, keeping internal state of the deltas on a per-split basis.

To the file reader it will provide positional and value-based data (with possibly updated cell values). From the data source it will be invoked at add split, read next batch, add dynamic filter, and finish split; these can be the main interface it provides to the data source.
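
Roughly, the interface could look like this (illustrative only; names and signatures are placeholders):

#include <cstdint>

// The per-split "delta" layer described above, sitting between the data source and
// the file reader. The names and signatures are stand-ins for whatever the eventual
// interface looks like.
class DeltaSplitProcessorSketch {
 public:
  virtual ~DeltaSplitProcessorSketch() = default;
  // Invoked by the data source when a new split (with its delta/delete files) arrives.
  virtual void addSplit(/* split + delta file descriptors */) = 0;
  // Invoked per batch; provides positional and value-based data to the file reader.
  virtual void readNextBatch(uint64_t batchSize) = 0;
  // Invoked when a dynamic filter is pushed down mid-scan.
  virtual void addDynamicFilter(/* column, filter */) = 0;
  // Invoked when the split is exhausted so per-split state can be released.
  virtual void finishSplit() = 0;
};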

@aditi-pandit
Collaborator

@aditi-pandit HUDI MOR should translate to equality deletes since they are based on a RecordId. @yingsu00 is this accurate?

To summarize, Iceberg, and Hudi table formats provide deletes and updates (i.e. deletes + inserts). These could be in a delete file (Iceberg) or a log file (HUDI). We all agree that a state-based reader/processor abstraction and API are needed to process these files.

Now the deletes can be of two types: 1) Positional (Iceberg), 2) Equality (Iceberg, Hudi). We need a reader API to return a selectivity vector (a.ka. mutation) for Positional deletes. Another reader API to return filters for the Equality deletes to be passed to the scanSpec. Another reader API to return new inserts (Hudi) as a Vector. Another reader API to return materialized meta-data columns as a Vector.

The AsyncDataCache can be applied to delete files, though would be less useful since they are ephemeral.

@majetideepak: I'm in agreement with @Yuhta that we should create the new abstraction layer between Reader and DataSource with these operations (read/interpret delete files/positional deletes) that might be significantly different between Iceberg and Hudi.
As Jimmy suggests, if there are delta states that can be carried between splits for optimization, that design would provide a way to do so.

@yingsu00: Any thoughts? Your current PR would only need a little refactoring to achieve this new design.

@yingsu00
Collaborator Author

yingsu00 commented Aug 24, 2023

Hudi MOR is based on equality, so for single column keys, we could get by with manipulating the ScanSpec, but I would not recommend it because we need to restore it back after the current split and it can be tricky and expensive. A better way and the only way to do it with multiple column keys is keeping these keys in an hash table, and pass the table down to the reader, probe the table in the top level struct column reader for key columns.

I think this may lose the opportunity for performance gains, because if we push these down as an IN filter or equality filter on the recordIds, the reader can potentially skip a lot of rows without having to decompress and decode them. Making another hash table and probing it after all recordIds are read may be much slower. Besides, the IN filters already build hash tables inside, and there's no need to duplicate that part of the code.

Moreover, we may not need to copy the full scanSpec. Instead, we can just remember and restore the filters.

@Yuhta
Contributor

Yuhta commented Aug 24, 2023

I think this may lose the opportunity for performance gains, because if we push these as IN filter or equality filter on the recordIds, the reader can potentially skip a lot of rows without having to decompress and decode them. Making another hashtable and probe it after all RecordIds are read may be much slower. Besides, the IN filters already make hashtables inside and there's no need to duplicate that part of code.

Moreover, we may not need to copy the full scanSpec. Instead, we can just remember and restore the filters.

The "remember and restore" is the tricky part; the only way to do it correctly is to keep the filter aside (maybe in Mutation) from the original ScanSpec. Creating a hash table (that supports multi-column keys) and probing it won't be slower than creating a single-column filter (with a hash table inside).

@yingsu00
Collaborator Author

Creating an hash table (that support multi column keys) and probe it won't be slower than creating a single column filter (with an hash table inside).

Creating the hash table costs about the same as creating the filter, but the benefit comes from skipping rows in the readers.

@mbasmanova
Contributor

@yingsu00 Ying, curious what's the status of this project. Are you planning to add Iceberg-specific logic to Prestissimo or Velox?

Would Gluten also benefit from support for Iceberg?

CC: @FelixYBW @rui-mo

@FelixYBW

FelixYBW commented Feb 5, 2024

Yes, many customers use Iceberg as the meta store.

@mbasmanova
Contributor

@FelixYBW Binwei, does Gluten support Iceberg today? If not, are there any plans to add support? Would it make sense to collaborate with @yingsu00 and her team on that?

@FelixYBW

FelixYBW commented Feb 5, 2024

Yes, Gluten supports some features but not all yet. We used Ying's PR and will collaborate on the rest of the features.

@liujiayi771
Contributor

@FelixYBW I have tested Ying's PR for the Iceberg MOR table with Gluten, and it works with Gluten.

After these PRs are merged, we can add support for Iceberg MOR tables in Gluten.

@FelixYBW

FelixYBW commented Feb 5, 2024

Thank you for the update, @liujiayi771, and thanks also to @yingsu00 for her work.
