-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[core][format] Optimize manifest reading performance,add pushdown for manifest and orc. #4497
base: master
Are you sure you want to change the base?
Changes from all commits
9873457
dedcaa4
4794d5f
462edc6
2226fb9
ee915c8
2920fc9
8a89649
c745e55
7841f25
bbdd316
10ef09c
a2acbab
e1c90c7
4364ac1
8c9a75c
dfaeac3
f71d658
efac5b6
15b1910
e1b3406
d48fff6
a0efae2
016620c
133a491
282a2c9
884d12f
a1cc9f4
e401844
710af06
669dc30
2308475
db912e4
c7a3776
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
This file was deleted.
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,57 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.paimon.fs; | ||
|
||
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Cache; | ||
import org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Caffeine; | ||
|
||
import java.time.Duration; | ||
import java.util.function.Function; | ||
|
||
/** | ||
* Sample Object Cache Manager . | ||
* | ||
* @param <K> | ||
* @param <V> | ||
*/ | ||
public class ObjectCacheManager<K, V> { | ||
private final Cache<K, V> cache; | ||
|
||
private ObjectCacheManager(Duration timeout, int maxSize) { | ||
this.cache = Caffeine.newBuilder().maximumSize(maxSize).expireAfterWrite(timeout).build(); | ||
ranxianglei marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
|
||
public static <K, V> ObjectCacheManager<K, V> newObjectCacheManager( | ||
Duration timeout, int maxSize) { | ||
return new ObjectCacheManager<>(timeout, maxSize); | ||
} | ||
|
||
public ObjectCacheManager<K, V> put(K k, V v) { | ||
this.cache.put(k, v); | ||
return this; | ||
} | ||
|
||
public V get(K k, Function<? super K, ? extends V> creator) { | ||
return this.cache.get(k, creator); | ||
} | ||
|
||
public V getIfPresent(K k) { | ||
return this.cache.getIfPresent(k); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -27,6 +27,7 @@ | |
import org.apache.paimon.fs.Path; | ||
import org.apache.paimon.io.RollingFileWriter; | ||
import org.apache.paimon.io.SingleFileWriter; | ||
import org.apache.paimon.predicate.Predicate; | ||
import org.apache.paimon.schema.SchemaManager; | ||
import org.apache.paimon.stats.SimpleStatsConverter; | ||
import org.apache.paimon.types.RowType; | ||
|
@@ -207,14 +208,18 @@ public boolean isCacheEnabled() { | |
} | ||
|
||
public ManifestFile create() { | ||
return create(null); | ||
} | ||
|
||
public ManifestFile create(List<Predicate> filters) { | ||
RowType entryType = VersionedObjectSerializer.versionType(ManifestEntry.SCHEMA); | ||
return new ManifestFile( | ||
fileIO, | ||
schemaManager, | ||
partitionType, | ||
new ManifestEntrySerializer(), | ||
ManifestEntrySerializer.getInstance(), | ||
entryType, | ||
fileFormat.createReaderFactory(entryType), | ||
fileFormat.createReaderFactory(entryType, filters), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If we enable the reader filter and the manifest cache, will we miss data from other buckets when reading data from There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If ObjectCache is enabled and push-down withBuckets is used, the problem you mentioned may indeed occur. So I originally planned to add a Filter condition to ObjectCache, but it was too complicated to change and I didn't have so much time to do these things, so I could only push down withBuckets for the time being. Because, in most scenarios, there will be no problem. If it is in flink or spark, I have seen that withBuckets will not be called at all. If it is an olap query and the corresponding bucket is read in segments, the bucket and segment will remain mapped. There will be no problems with the relationship. 如果开启了ObjectCache缓存,有使用了withBuckets的下推,确实可能出现你说的问题。所以我本来打算给ObjectCache增加一个Filter条件,但是改起来太复杂而我没有那么多时间做这些东西,只能暂时先把withBuckets下推做了。因为,大部分场景下都不会出现问题,如果是flink里面或者spark里面,我看了根本就不会调用withBuckets,如果是olap查询,分segment读取对应的bucket,则bucket和segment会保持映射关系,也不会出现问题。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Thanks for your explanation, If we can not handle the push down when the cache enabled, I think we can disable the filter push down when the cache is enabled. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It is recommended to choose the latter between metadata caching and manifest pushdown. The performance of paimon's ObjectCache implementation is very low. After testing, sometimes it is not even as fast as manifest pushdown. I will submit a PR later to fix the performance problem of ObjectCache. 在元数据缓存和manifest下推之间建议选择后者。paimon的ObjectCache实现的性能非常低,经测试有时候甚至比不上manifest下推快。后面我会提交一个pr修复ObjectCache的性能问题。 There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Aitozi This is a scenario that is quite different from mainstream applications in the community. The author's internal analysis engine does not have the ability of a central node, and can only plan by each computing node themselves. Each computing node only cares about its own bucket. Actually, this is more like a manifest cache in the writer node than the current design. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @JingsongLi In the writer node, it could still may need to read more than one bucket entry from the manifest if the parallelism is lower than the bucket number There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @Aitozi It is true, there are problems in this PR's implementation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
fileFormat.createWriterFactory(entryType), | ||
compression, | ||
pathFactory.manifestFileFactory(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -35,9 +35,13 @@ | |
import org.apache.paimon.operation.metrics.ScanStats; | ||
import org.apache.paimon.partition.PartitionPredicate; | ||
import org.apache.paimon.predicate.Predicate; | ||
import org.apache.paimon.predicate.PredicateBuilder; | ||
import org.apache.paimon.schema.SchemaManager; | ||
import org.apache.paimon.schema.TableSchema; | ||
import org.apache.paimon.table.source.ScanMode; | ||
import org.apache.paimon.types.DataType; | ||
import org.apache.paimon.types.IntType; | ||
import org.apache.paimon.types.RowType; | ||
import org.apache.paimon.utils.BiFilter; | ||
import org.apache.paimon.utils.Filter; | ||
import org.apache.paimon.utils.Pair; | ||
|
@@ -79,6 +83,7 @@ public abstract class AbstractFileStoreScan implements FileStoreScan { | |
|
||
private Snapshot specifiedSnapshot = null; | ||
private Filter<Integer> bucketFilter = null; | ||
private Collection<Integer> buckets; | ||
private BiFilter<Integer, Integer> totalAwareBucketFilter = null; | ||
protected ScanMode scanMode = ScanMode.ALL; | ||
private Filter<Integer> levelFilter = null; | ||
|
@@ -127,6 +132,14 @@ public FileStoreScan withPartitionFilter(PartitionPredicate predicate) { | |
@Override | ||
public FileStoreScan withBucket(int bucket) { | ||
this.bucketFilter = i -> i == bucket; | ||
this.buckets = Collections.singletonList(bucket); | ||
return this; | ||
} | ||
|
||
@Override | ||
public FileStoreScan withBuckets(Collection<Integer> buckets) { | ||
this.bucketFilter = buckets::contains; | ||
this.buckets = buckets; | ||
return this; | ||
} | ||
|
||
|
@@ -427,7 +440,7 @@ private List<ManifestEntry> readManifest( | |
@Nullable Filter<ManifestEntry> additionalTFilter) { | ||
List<ManifestEntry> entries = | ||
manifestFileFactory | ||
.create() | ||
.create(createPushDownFilter(buckets)) | ||
.withCacheMetrics( | ||
scanMetrics != null ? scanMetrics.getCacheMetrics() : null) | ||
.read( | ||
|
@@ -471,6 +484,22 @@ private Filter<InternalRow> createCacheRowFilter() { | |
return row -> manifestCacheFilter.test(partitionGetter.apply(row), bucketGetter.apply(row)); | ||
} | ||
|
||
/** | ||
* Read the corresponding entries based on the current required bucket, but push down into file | ||
* format . | ||
*/ | ||
private static List<Predicate> createPushDownFilter(Collection<Integer> buckets) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does the query performance mainly gain from the bucket field push down for the ORC manifest file? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. More than half of the performance improvement comes from the orc pushdown of the manifest, the other part comes from the optimization of OrcFileFormat creation, and the other part comes from the caching of some time-consuming object operations on Scan. 性能提升一多半来自于manifest的orc下推,另外一部分来自于OrcFileFormat创建的优化,还有一部分来自于Scan上部分耗时的对象操作缓存 @Aitozi There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
if (buckets == null || buckets.isEmpty()) { | ||
return null; | ||
} | ||
List<Predicate> predicates = new ArrayList<>(); | ||
PredicateBuilder predicateBuilder = | ||
new PredicateBuilder( | ||
RowType.of(new DataType[] {new IntType()}, new String[] {"_BUCKET"})); | ||
predicates.add(predicateBuilder.in(0, new ArrayList<>(buckets))); | ||
return predicates; | ||
} | ||
|
||
/** | ||
* Read the corresponding entries based on the current required partition and bucket. | ||
* | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you just create a PR for
FileFormatFactory
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@JingsongLi Of course you can, but I’ll change it in a few days. I’ve been a little busy lately.
当然可以,不过过几天再改,最近有点忙