Skip to content

Commit

Permalink
Make IcebergSplitSource async
Browse files Browse the repository at this point in the history
Previously, all IcebergSplitSource scan planning activies happened on
the calling scheduler thread, which meant that all table scan planning
would happen sequentially across table scans. This hurts performance
when queries contain multiple iceberg (or other connector) table scans
that could proceed with split generation in parallel to one another.
  • Loading branch information
pettyjamesm committed Nov 1, 2024
1 parent 80d507e commit 61a79dd
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 48 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.plugin.iceberg;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForIcebergScanPlanning {}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.plugin.iceberg;

import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Binder;
import com.google.inject.Key;
import com.google.inject.Module;
Expand Down Expand Up @@ -57,13 +58,15 @@

import java.util.concurrent.ExecutorService;

import static com.google.common.util.concurrent.MoreExecutors.listeningDecorator;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static com.google.inject.multibindings.Multibinder.newSetBinder;
import static com.google.inject.multibindings.OptionalBinder.newOptionalBinder;
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.airlift.json.JsonCodecBinder.jsonCodecBinder;
import static io.trino.plugin.base.ClosingBinder.closingBinder;
import static java.util.concurrent.Executors.newCachedThreadPool;
import static java.util.concurrent.Executors.newFixedThreadPool;
import static org.weakref.jmx.guice.ExportBinder.newExporter;

Expand Down Expand Up @@ -127,13 +130,22 @@ public void configure(Binder binder)
newOptionalBinder(binder, IcebergFileSystemFactory.class).setDefault().to(DefaultIcebergFileSystemFactory.class).in(Scopes.SINGLETON);
newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(IcebergCacheKeyProvider.class).in(Scopes.SINGLETON);

closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForIcebergSplitManager.class));
closingBinder(binder).registerExecutor(Key.get(ListeningExecutorService.class, ForIcebergSplitManager.class));
closingBinder(binder).registerExecutor(Key.get(ExecutorService.class, ForIcebergScanPlanning.class));
}

@Provides
@Singleton
@ForIcebergSplitManager
public ExecutorService createSplitManagerExecutor(CatalogName catalogName, IcebergConfig config)
public ListeningExecutorService createSplitSourceExecutor(CatalogName catalogName)
{
return listeningDecorator(newCachedThreadPool(daemonThreadsNamed("iceberg-split-source-" + catalogName + "-%s")));
}

@Provides
@Singleton
@ForIcebergScanPlanning
public ExecutorService createScanPlanningExecutor(CatalogName catalogName, IcebergConfig config)
{
if (config.getSplitManagerThreads() == 0) {
return newDirectExecutorService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.plugin.iceberg;

import com.google.common.collect.ImmutableList;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import io.airlift.units.Duration;
import io.trino.filesystem.cache.CachingHostAddressProvider;
Expand Down Expand Up @@ -53,21 +54,24 @@ public class IcebergSplitManager
private final IcebergTransactionManager transactionManager;
private final TypeManager typeManager;
private final IcebergFileSystemFactory fileSystemFactory;
private final ExecutorService executor;
private final ListeningExecutorService splitSourceExecutor;
private final ExecutorService icebergPlanningExecutor;
private final CachingHostAddressProvider cachingHostAddressProvider;

@Inject
public IcebergSplitManager(
IcebergTransactionManager transactionManager,
TypeManager typeManager,
IcebergFileSystemFactory fileSystemFactory,
@ForIcebergSplitManager ExecutorService executor,
@ForIcebergSplitManager ListeningExecutorService splitSourceExecutor,
@ForIcebergScanPlanning ExecutorService icebergPlanningExecutor,
CachingHostAddressProvider cachingHostAddressProvider)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.typeManager = requireNonNull(typeManager, "typeManager is null");
this.fileSystemFactory = requireNonNull(fileSystemFactory, "fileSystemFactory is null");
this.executor = requireNonNull(executor, "executor is null");
this.splitSourceExecutor = requireNonNull(splitSourceExecutor, "splitSourceExecutor is null");
this.icebergPlanningExecutor = requireNonNull(icebergPlanningExecutor, "icebergPlanningExecutor is null");
this.cachingHostAddressProvider = requireNonNull(cachingHostAddressProvider, "cachingHostAddressProvider is null");
}

Expand All @@ -92,7 +96,7 @@ public ConnectorSplitSource getSplits(
Table icebergTable = icebergMetadata.getIcebergTable(session, table.getSchemaTableName());
Duration dynamicFilteringWaitTimeout = getDynamicFilteringWaitTimeout(session);

Scan scan = getScan(icebergMetadata, icebergTable, table, executor);
Scan scan = getScan(icebergMetadata, icebergTable, table, icebergPlanningExecutor);

IcebergSplitSource splitSource = new IcebergSplitSource(
fileSystemFactory,
Expand All @@ -107,7 +111,8 @@ public ConnectorSplitSource getSplits(
typeManager,
table.isRecordScannedFiles(),
getMinimumAssignedSplitWeight(session),
cachingHostAddressProvider);
cachingHostAddressProvider,
splitSourceExecutor);

return new ClassLoaderSafeConnectorSplitSource(splitSource, IcebergSplitManager.class.getClassLoader());
}
Expand Down
Loading

0 comments on commit 61a79dd

Please sign in to comment.