diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/PageProcessorProvider.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/PageProcessorProvider.java similarity index 73% rename from plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/PageProcessorProvider.java rename to plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/PageProcessorProvider.java index cca2f093e9c0..b0b7f93559a0 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/PageProcessorProvider.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/PageProcessorProvider.java @@ -11,11 +11,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package io.trino.plugin.adb.connector.protocol.gpfdist.load.process; +package io.trino.plugin.adb.connector.protocol.gpfdist.load; -import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessor; - -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Queue; public interface PageProcessorProvider { @@ -23,5 +21,7 @@ public interface PageProcessorProvider PageProcessor take(); - ConcurrentLinkedQueue getAll(); + Queue getAll(); + + void clear(); } diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/context/WriteContext.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/context/WriteContext.java index aa6b081375e5..cb8bb8ed4d86 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/context/WriteContext.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/context/WriteContext.java @@ -22,7 +22,7 @@ import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.ContextId; import io.trino.plugin.adb.connector.protocol.gpfdist.metadata.GpfdistLoadMetadata; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.Queue; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -110,7 +110,7 @@ public GpfdistPageProcessorProvider getPageProcessorProvider() @Override public void close() { - ConcurrentLinkedQueue pageProcessors = pageProcessorProvider.getAll(); + Queue pageProcessors = pageProcessorProvider.getAll(); StringBuilder sb = new StringBuilder(); pageProcessors.forEach(processor -> { try { @@ -120,7 +120,7 @@ public void close() sb.append(format("Failed to stop page processor %s. Error: %s;", processor, e.getMessage())); } }); - pageProcessors.clear(); + pageProcessorProvider.clear(); if (!sb.isEmpty()) { throw new RuntimeException(sb.toString()); } diff --git a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/GpfdistPageProcessorProvider.java b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/GpfdistPageProcessorProvider.java index 0f1d2da9330b..f7edae4453cc 100644 --- a/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/GpfdistPageProcessorProvider.java +++ b/plugin/trino-adb/src/main/java/io/trino/plugin/adb/connector/protocol/gpfdist/load/process/GpfdistPageProcessorProvider.java @@ -14,8 +14,10 @@ package io.trino.plugin.adb.connector.protocol.gpfdist.load.process; import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessor; +import io.trino.plugin.adb.connector.protocol.gpfdist.load.PageProcessorProvider; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.LinkedList; +import java.util.Queue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; @@ -26,7 +28,7 @@ public class GpfdistPageProcessorProvider implements PageProcessorProvider { private static final long ADB_SEGMENT_WAIT_TIMEOUT = 60000L; - private final ConcurrentLinkedQueue pageProcessors = new ConcurrentLinkedQueue<>(); + private final Queue pageProcessors = new LinkedList<>(); private final AtomicBoolean isReadyForProcessing = new AtomicBoolean(false); private final ReentrantLock lock = new ReentrantLock(); private final Condition isReadyForProcessingCondition = lock.newCondition(); @@ -80,8 +82,20 @@ public PageProcessor take() } @Override - public ConcurrentLinkedQueue getAll() + public Queue getAll() { return pageProcessors; } + + @Override + public void clear() + { + lock.lock(); + try { + pageProcessors.clear(); + } + finally { + lock.unlock(); + } + } }