[Feature] In a Paimon primary key table, using ORC offers significantly higher efficiency for point lookups based on the primary key compared to Parquet. #4586

Open
Aiden-Dong opened this issue Nov 25, 2024 · 13 comments
Labels
enhancement New feature or request

Comments


Aiden-Dong commented Nov 25, 2024

Search before asking

  • I searched in the issues and found nothing similar.

Motivation

Basic Information

Table schema

{
  "version" : 2,
  "id" : 0,
  "fields" : [ {
    "id" : 0,
    "name" : "f0",
    "type" : "BIGINT NOT NULL"
  }, {
    "id" : 1,
    "name" : "f1",
    "type" : "STRING"
  }, {
    "id" : 2,
    "name" : "f2",
    "type" : "STRING"
  }, {
    "id" : 3,
    "name" : "f4",
    "type" : "FLOAT"
  }, {
    "id" : 4,
    "name" : "f5",
    "type" : "DOUBLE"
  }, {
    "id" : 5,
    "name" : "f6",
    "type" : "BOOLEAN"
  }, {
    "id" : 6,
    "name" : "f7",
    "type" : {
      "type" : "ARRAY",
      "element" : "BIGINT"
    }
  } ],
  "highestFieldId" : 6,
  "partitionKeys" : [ ],
  "primaryKeys" : [ "f0" ],
  "options" : {
    "bucket" : "1",
    "file.format" : "parquet/orc"
  },
  "timeMillis" : 1732796139692
}
  • RowCount : 4,000,000
  • bucket : 1
  • FileNumber : 7
  • Ordered Write

The data sample:

1 : [1, h2,7c17184a-46d3-4565-aa87-b31316af2144, 1.000000, 1.000000, false, ([1,2,3]))
2 : [2, h1,919fc338-5ef6-474c-a1fa-1b2407512731, 2.000000, 2.000000, true, ([2,3,4]))
3 : [3, h3,6126ee02-8d74-4c3a-a00e-328af5871e4a, 3.000000, 3.000000, false, ([3,4,5]))
4 : [4, h3,c2b0e039-a468-4853-aaa1-57bcde5a49b6, 4.000000, 4.000000, true, ([4,5,6]))

The Write Example

Table table = TableUtil.getTable(); // PrimaryKeyFileStoreTable
BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();

String[] items = new String[] {"h1", "h2", "h3"};

BatchTableWrite write = writeBuilder.newWrite(); // TableWriteImpl

long startTime = System.currentTimeMillis();

for (int i = 0; i < 4000000; i++) {

    GenericRow genericRow =
            GenericRow.of(
                    (long) i,
                    BinaryString.fromString(items[i % 3]),
                    BinaryString.fromString(UUID.randomUUID().toString()),
                    (float) i,
                    (double) i,
                    (i % 2) == 0,
                    BinaryArray.fromLongArray(
                            new Long[] {(long) i, (long) i + 1, (long) i + 2}));

    write.write(genericRow);

    if ((i % 10000) == 0) {
        System.out.println("write rows : " + i);
    }
}

List<CommitMessage> messages = write.prepareCommit();
BatchTableCommit commit = writeBuilder.newCommit();

commit.commit(messages);

long stopTime = System.currentTimeMillis();
System.out.println("time: " + (stopTime - startTime));

The Read Example

Randomly pick 30 primary keys and read each one:

Table table = TableUtil.getTable(); // PrimaryKeyFileStoreTable

PredicateBuilder builder =
        new PredicateBuilder(
                RowType.of(
                        DataTypes.BIGINT(),
                        DataTypes.STRING(),
                        DataTypes.STRING(),
                        DataTypes.FLOAT(),
                        DataTypes.DOUBLE(),
                        DataTypes.BOOLEAN(),
                        DataTypes.ARRAY(DataTypes.BIGINT())));

  int[] projection = new int[] {0, 1, 2, 3, 4, 5, 6};
  ReadBuilder readBuilder = table.newReadBuilder().withProjection(projection);
  List<Split> splits = readBuilder.newScan().plan().splits();

  long startTime = System.currentTimeMillis();

  Random random = new Random();

  for (int i = 0; i < 30; i++) {

      int value = random.nextInt(4000000);
      Predicate keyFilter = builder.equal(0, (long) value);

      InnerTableRead read = (InnerTableRead) readBuilder.newRead();

      read.withFilter(keyFilter).executeFilter();
      RecordReader<InternalRow> reader = read.createReader(splits);

      reader.forEachRemaining(
              internalRow -> {
                  long f0 = internalRow.getLong(0);
                  String f1 = internalRow.getString(1).toString();
                  String f2 = internalRow.getString(2).toString();
                  float f3 = internalRow.getFloat(3);
                  double f4 = internalRow.getDouble(4);
                  boolean f5 = internalRow.getBoolean(5);

                  long[] f6 = internalRow.getArray(6).toLongArray();

                  System.out.println(
                          String.format(
                                  "%d : [%d, %s,%s, %f, %f, %b, (%s))",
                                  value, f0, f1, f2, f3, f4, f5, toString(f6)));
              });
  }
  long stopTime = System.currentTimeMillis();
  System.out.println("time : " + (stopTime - startTime));

Time Consumption in ORC/Parquet

  • PARQUET reader : 17982ms
  • ORC reader : 1096ms

Root Cause Analysis

With the current predicate pushdown, the filter can be pushed down to the column index level in ORC, whereas in Parquet it is only pushed down to the row-group level.
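
For reference, the Parquet reader itself can already skip data at the page level through column indexes; the gap is in wiring Paimon's pushed-down predicate through to that level. A minimal sketch with plain parquet-mr (not Paimon's reader path; the class name and input path are hypothetical) showing the knobs that control row-group-, page-, and record-level filtering for a point lookup on f0:

import org.apache.hadoop.fs.Path;
import org.apache.parquet.example.data.Group;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.predicate.FilterApi;
import org.apache.parquet.filter2.predicate.FilterPredicate;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.example.GroupReadSupport;

public class PageLevelFilterSketch {
    public static void main(String[] args) throws Exception {
        // Equality predicate on the primary key column "f0".
        FilterPredicate keyEquals = FilterApi.eq(FilterApi.longColumn("f0"), 12345L);

        try (ParquetReader<Group> reader =
                ParquetReader.builder(new GroupReadSupport(), new Path(args[0]))
                        .withFilter(FilterCompat.get(keyEquals))
                        .useStatsFilter(true)       // row-group level: min/max statistics
                        .useDictionaryFilter(true)  // row-group level: dictionary pages
                        .useColumnIndexFilter(true) // page level: column indexes
                        .useRecordFilter(true)      // record level: drop non-matching rows
                        .build()) {
            Group row;
            while ((row = reader.read()) != null) {
                System.out.println(row);
            }
        }
    }
}

With an ordered write, the column index on f0 lets the reader skip every page whose min/max range cannot contain the key, which is what the ORC path effectively achieves today.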

Solution

No response

Anything else?

No response

Are you willing to submit a PR?

  • I'm willing to submit a PR!
@ranxianglei
Contributor

Can you turn on these two parameters and try again? By default, Paimon does not push the filter down into the ORC reader. @Aiden-Dong

'orc.reader.filter.use.selected'='true',
'orc.reader.sarg.to.filter'='true'

see #4231
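
A minimal way to try these without recreating the table, assuming the TableUtil helper from the issue: pass the two options as dynamic options through Table.copy before building the read (a sketch of one possible setup, not the only way to set them):

import java.util.HashMap;
import java.util.Map;

import org.apache.paimon.table.Table;
import org.apache.paimon.table.source.ReadBuilder;

Map<String, String> orcPushdown = new HashMap<>();
// Convert the pushed-down filter into an ORC SearchArgument.
orcPushdown.put("orc.reader.sarg.to.filter", "true");
// Let the ORC reader return only the selected rows.
orcPushdown.put("orc.reader.filter.use.selected", "true");

Table table = TableUtil.getTable().copy(orcPushdown); // PrimaryKeyFileStoreTable
ReadBuilder readBuilder = table.newReadBuilder();

Alternatively, the options can simply be added to the table's options, as in the schema JSON shown later in this thread.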

@ranxianglei
Contributor

Also, you can turn off executeFilter, because enabling it slows the read down; in most cases the ORC pushdown already ensures that only the required rows are returned.

@Aiden-Dong
Author

Also, you can turn off executeFilter, because enabling it slows the read down; in most cases the ORC pushdown already ensures that only the required rows are returned.

Is there an issue with my description? In my actual usage I've found that ORC's predicate pushdown is stronger: Parquet's pushdown does not reach the column page level. I've been looking into this for the past couple of days and plan to push the Parquet read filter predicates down to the page level to reduce unnecessary data scans.

@ranxianglei
Contributor

I haven't done much research on the Parquet implementation; all of our optimizations have been around ORC. I have tested that the default Paimon table configuration does not actually push the filter conditions down into ORC, so I opened PR #4231 to fix that. I'm not sure whether the Parquet pushdown works correctly; if not, you could try to fix it.
I see that your test takes about 1 second to read ORC 30 times in a loop. Is your disk an SSD? With the two parameters above turned on it should theoretically be faster, perhaps under 100ms. I haven't had time to test this yet, but I can when I do.

@Aiden-Dong
Author

I haven't done much research on the Parquet implementation; all of our optimizations have been around ORC. I have tested that the default Paimon table configuration does not actually push the filter conditions down into ORC, so I opened PR #4231 to fix that. I'm not sure whether the Parquet pushdown works correctly; if not, you could try to fix it. I see that your test takes about 1 second to read ORC 30 times in a loop. Is your disk an SSD? With the two parameters above turned on it should theoretically be faster, perhaps under 100ms. I haven't had time to test this yet, but I can when I do.

Thank you for the suggestion. We will try it.

@ranxianglei
Contributor

Written this way, the data filtered by ORC is guaranteed to be equivalent to the given filter condition. But it also includes the plan time, so the comparison with the numbers above is not exact.

        Table table = TableUtil.getTable();   // PrimaryKeyFileStoreTable
        PredicateBuilder builder = new PredicateBuilder(
                RowType.of(DataTypes.INT(),
                        DataTypes.STRING(),
                        DataTypes.STRING()));

        int[] projection = new int[] {0, 1, 2};

        ReadBuilder readBuilder = table.newReadBuilder()
                .withProjection(projection);
        
        Random random = new Random();

        long startTime = System.currentTimeMillis();

        for(int i = 0 ; i < 30 ; i ++){
            InnerTableRead read = (InnerTableRead)readBuilder.newRead();
            int key = random.nextInt(4000000);

            Predicate keyFilter = builder.equal(0, key);

            InnerTableScan tableScan = (InnerTableScan) readBuilder
                    .withFilter(keyFilter)
                    .newScan();
            InnerTableScan innerTableScan = tableScan.withFilter(keyFilter);
            TableScan.Plan plan = innerTableScan.plan();
            List<Split> splits = plan.splits();

            read.withFilter(keyFilter);//.executeFilter();
            RecordReader<InternalRow> reader = read.createReader(splits);

            reader.forEachRemaining(internalRow -> {

                int f0 = internalRow.getInt(0);
                String f1 = internalRow.getString(1).toString();
                String f2 = internalRow.getString(2).toString();
                System.out.println(String.format("%d - {%d, %s, %s}",key, f0, f1, f2));
            });
        }
        long stopTime = System.currentTimeMillis();
        System.out.println("time : " + (stopTime - startTime));

@Aiden-Dong
Author

Could I get your contact information? I have a question I'd like to ask you.

@ranxianglei
Contributor

Testing with the new code, and including the plan time, the ORC format took 1.77s and the Parquet format took 12.95s.

ORC table schema:

{
  "version" : 2,
  "id" : 0,
  "fields" : [ {
    "id" : 0,
    "name" : "f0",
    "type" : "INT NOT NULL"
  }, {
    "id" : 1,
    "name" : "f1",
    "type" : "STRING"
  }, {
    "id" : 2,
    "name" : "f2",
    "type" : "STRING"
  } ],
  "highestFieldId" : 2,
  "partitionKeys" : [ ],
  "primaryKeys" : [ "f0" ],
  "options" : {
    "bucket" : "1",
    "file.format" : "orc",
    "manifest.compression" : "null",
    "orc.reader.filter.use.selected":"true",
    "orc.reader.sarg.to.filter":"true"
  },
  "timeMillis" : 1731654078602
}

Parquet table schema:

{
  "version" : 2,
  "id" : 0,
  "fields" : [ {
    "id" : 0,
    "name" : "f0",
    "type" : "INT NOT NULL"
  }, {
    "id" : 1,
    "name" : "f1",
    "type" : "STRING"
  }, {
    "id" : 2,
    "name" : "f2",
    "type" : "STRING"
  } ],
  "highestFieldId" : 2,
  "partitionKeys" : [ ],
  "primaryKeys" : [ "f0" ],
  "options" : {
    "bucket" : "1",
    "file.format" : "parquet",
    "manifest.compression" : "null"
  },
  "timeMillis" : 1731654078602
}

After checking: once the filter is passed down to ORC, each query returns only 1 or 0 rows, while in the Parquet format basically all the data is returned.
This should be the root cause of the slow Parquet reads.

Reader code:

        Table table = TableUtil.getTable();   // PrimaryKeyFileStoreTable
        PredicateBuilder builder = new PredicateBuilder(
                RowType.of(DataTypes.INT(),
                        DataTypes.STRING(),
                        DataTypes.STRING()));

        int[] projection = new int[] {0, 1, 2};

        ReadBuilder readBuilder = table.newReadBuilder()
                .withProjection(projection);
        
        Random random = new Random();

        long startTime = System.currentTimeMillis();

        for(int i = 0 ; i < 30 ; i ++){
            InnerTableRead read = (InnerTableRead)readBuilder.newRead();
            int key = random.nextInt(4000000);

            Predicate keyFilter = builder.equal(0, key);

            InnerTableScan tableScan = (InnerTableScan) readBuilder
                    .withFilter(keyFilter)
                    .newScan();
            InnerTableScan innerTableScan = tableScan.withFilter(keyFilter);
            TableScan.Plan plan = innerTableScan.plan();
            List<Split> splits = plan.splits();

            read.withFilter(keyFilter);//.executeFilter();
            RecordReader<InternalRow> reader = read.createReader(splits);

            reader.forEachRemaining(internalRow -> {

                int f0 = internalRow.getInt(0);
                String f1 = internalRow.getString(1).toString();
                String f2 = internalRow.getString(2).toString();
                System.out.println(String.format("%d - {%d, %s, %s}",key, f0, f1, f2));
            });
        }
        long stopTime = System.currentTimeMillis();
        System.out.println("time : " + (stopTime - startTime));

@Aiden-Dong
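
One simple way to confirm this, assuming the same reader setup as above: count how many rows each point lookup actually returns instead of printing them. A hypothetical helper, not part of the test code above:

import java.util.concurrent.atomic.AtomicLong;

import org.apache.paimon.data.InternalRow;
import org.apache.paimon.reader.RecordReader;

public class LookupRowCounter {

    // Count how many rows a single point lookup returns, so the pruning effect
    // of the pushdown can be compared between the ORC and Parquet readers.
    public static long countReturnedRows(RecordReader<InternalRow> reader) throws Exception {
        AtomicLong rows = new AtomicLong();
        reader.forEachRemaining(row -> rows.incrementAndGet());
        return rows.get();
    }
}

If ORC reports 0 or 1 rows per lookup while Parquet reports most of the table, the extra Parquet time is being spent materializing rows that the format-level filter failed to skip.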

@Aiden-Dong
Author

@ranxianglei
The current Parquet reader only pushes the filter predicates down to the row-group level; I've been working on optimizing this:
#4608

@ranxianglei
Contributor

[email protected] is my email. @Aiden-Dong

@ranxianglei
Contributor

@Aiden-Dong Are there any new test results? I'm wondering how effective it is.

@Aiden-Dong
Author

@Aiden-Dong Are there any new test results? I'm wondering how effective it is.

I just ran a local test and added the results to the PR.
