diff --git a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
index d7221e7d4545..17c45b634856 100644
--- a/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
+++ b/core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
@@ -20,84 +20,117 @@
 import java.io.Closeable;
 import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.util.ArrayDeque;
+import java.util.Deque;
 import java.util.Iterator;
 import java.util.NoSuchElementException;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import org.apache.iceberg.exceptions.RuntimeIOException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.function.Supplier;
 import org.apache.iceberg.io.CloseableGroup;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.io.Closer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ParallelIterable<T> extends CloseableGroup implements CloseableIterable<T> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(ParallelIterable.class);
+
+  // Logic behind default value: ParallelIterable is often used for file planning.
+  // Assuming that a DataFile or DeleteFile is about 500 bytes, a 30k limit uses 14.3 MB of memory.
+  private static final int DEFAULT_MAX_QUEUE_SIZE = 30_000;
+
   private final Iterable<? extends Iterable<T>> iterables;
   private final ExecutorService workerPool;
 
+  // Bound for number of items in the queue to limit memory consumption
+  // even in the case when input iterables are large.
+  private final int approximateMaxQueueSize;
+
   public ParallelIterable(Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
-    this.iterables = iterables;
-    this.workerPool = workerPool;
+    this(iterables, workerPool, DEFAULT_MAX_QUEUE_SIZE);
+  }
+
+  public ParallelIterable(
+      Iterable<? extends Iterable<T>> iterables,
+      ExecutorService workerPool,
+      int approximateMaxQueueSize) {
+    this.iterables = Preconditions.checkNotNull(iterables, "Input iterables cannot be null");
+    this.workerPool = Preconditions.checkNotNull(workerPool, "Worker pool cannot be null");
+    this.approximateMaxQueueSize = approximateMaxQueueSize;
   }
 
   @Override
   public CloseableIterator<T> iterator() {
-    ParallelIterator<T> iter = new ParallelIterator<>(iterables, workerPool);
+    ParallelIterator<T> iter =
+        new ParallelIterator<>(iterables, workerPool, approximateMaxQueueSize);
     addCloseable(iter);
     return iter;
   }
 
   private static class ParallelIterator<T> implements CloseableIterator<T> {
-    private final Iterator<Runnable> tasks;
+    private final Iterator<Task<T>> tasks;
+    private final Deque<Task<T>> yieldedTasks = new ArrayDeque<>();
    private final ExecutorService workerPool;
-    private final Future<?>[] taskFutures;
+    private final CompletableFuture<Optional<Task<T>>>[] taskFutures;
    private final ConcurrentLinkedQueue<T> queue = new ConcurrentLinkedQueue<>();
-    private volatile boolean closed = false;
+    private final int maxQueueSize;
+    private final AtomicBoolean closed = new AtomicBoolean(false);
 
    private ParallelIterator(
-        Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool) {
+        Iterable<? extends Iterable<T>> iterables, ExecutorService workerPool, int maxQueueSize) {
      this.tasks =
          Iterables.transform(
-                  iterables,
-                  iterable ->
-                      (Runnable)
-                          () -> {
-                            try (Closeable ignored =
-                                (iterable instanceof Closeable) ? (Closeable) iterable : () -> {}) {
-                              for (T item : iterable) {
-                                // exit manually because `ConcurrentLinkedQueue` can't be
-                                // interrupted
-                                if (closed) {
-                                  return;
-                                }
-
-                                queue.add(item);
-                              }
-                            } catch (IOException e) {
-                              throw new RuntimeIOException(e, "Failed to close iterable");
-                            }
-                          })
+                  iterables, iterable -> new Task<>(iterable, queue, closed, maxQueueSize))
              .iterator();
      this.workerPool = workerPool;
+      this.maxQueueSize = maxQueueSize;
      // submit 2 tasks per worker at a time
-      this.taskFutures = new Future[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
+      this.taskFutures = new CompletableFuture[2 * ThreadPools.WORKER_THREAD_POOL_SIZE];
    }
 
    @Override
    public void close() {
      // close first, avoid new task submit
-      this.closed = true;
+      this.closed.set(true);
 
-      // cancel background tasks
-      for (Future<?> taskFuture : taskFutures) {
-        if (taskFuture != null && !taskFuture.isDone()) {
-          taskFuture.cancel(true);
+      try (Closer closer = Closer.create()) {
+        synchronized (this) {
+          yieldedTasks.forEach(closer::register);
+          yieldedTasks.clear();
        }
+
+        // cancel background tasks and close continuations if any
+        for (CompletableFuture<Optional<Task<T>>> taskFuture : taskFutures) {
+          if (taskFuture != null) {
+            taskFuture.cancel(true);
+            taskFuture.thenAccept(
+                continuation -> {
+                  if (continuation.isPresent()) {
+                    try {
+                      continuation.get().close();
+                    } catch (IOException e) {
+                      LOG.error("Task close failed", e);
+                    }
+                  }
+                });
+          }
+        }
+
+        // clean queue
+        this.queue.clear();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Close failed", e);
      }
-      // clean queue
-      this.queue.clear();
    }
 
    /**
@@ -107,15 +140,17 @@ public void close() {
     *
     * @return true if there are pending tasks, false otherwise
     */
-    private boolean checkTasks() {
+    private synchronized boolean checkTasks() {
+      Preconditions.checkState(!closed.get(), "Already closed");
      boolean hasRunningTask = false;
 
      for (int i = 0; i < taskFutures.length; i += 1) {
        if (taskFutures[i] == null || taskFutures[i].isDone()) {
          if (taskFutures[i] != null) {
-            // check for task failure and re-throw any exception
+            // check for task failure and re-throw any exception. Enqueue continuation if any.
            try {
-              taskFutures[i].get();
+              Optional<Task<T>> continuation = taskFutures[i].get();
+              continuation.ifPresent(yieldedTasks::addLast);
            } catch (ExecutionException e) {
              if (e.getCause() instanceof RuntimeException) {
                // rethrow a runtime exception
@@ -136,29 +171,29 @@ private boolean checkTasks() {
        }
      }
 
-      return !closed && (tasks.hasNext() || hasRunningTask);
+      return !closed.get() && (tasks.hasNext() || hasRunningTask);
    }
 
-    private Future<?> submitNextTask() {
-      if (!closed && tasks.hasNext()) {
-        return workerPool.submit(tasks.next());
+    private CompletableFuture<Optional<Task<T>>> submitNextTask() {
+      if (!closed.get()) {
+        if (!yieldedTasks.isEmpty()) {
+          return CompletableFuture.supplyAsync(yieldedTasks.removeFirst(), workerPool);
+        } else if (tasks.hasNext()) {
+          return CompletableFuture.supplyAsync(tasks.next(), workerPool);
+        }
      }
      return null;
    }
 
    @Override
    public synchronized boolean hasNext() {
-      Preconditions.checkState(!closed, "Already closed");
-
-      // if the consumer is processing records more slowly than the producers, then this check will
-      // prevent tasks from being submitted. while the producers are running, this will always
-      // return here before running checkTasks. when enough of the tasks are finished that the
-      // consumer catches up, then lots of new tasks will be submitted at once. this behavior is
-      // okay because it ensures that records are not stacking up waiting to be consumed and taking
-      // up memory.
-      //
-      // consumers that process results quickly will periodically exhaust the queue and submit new
-      // tasks when checkTasks runs. fast consumers should not be delayed.
+      Preconditions.checkState(!closed.get(), "Already closed");
+
+      // If the consumer is processing records more slowly than the producers, the producers will
+      // eventually fill the queue and yield, returning continuations. Continuations and new tasks
+      // are started by checkTasks(). The check here prevents us from restarting continuations or
+      // starting new tasks before the queue is emptied. Restarting too early would lead to tasks
+      // yielding very quickly (CPU waste on scheduling).
      if (!queue.isEmpty()) {
        return true;
      }
@@ -192,4 +227,78 @@ public synchronized T next() {
      return queue.poll();
    }
  }
+
+  private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
+    private final Iterable<T> input;
+    private final ConcurrentLinkedQueue<T> queue;
+    private final AtomicBoolean closed;
+    private final int approximateMaxQueueSize;
+
+    private Iterator<T> iterator = null;
+
+    Task(
+        Iterable<T> input,
+        ConcurrentLinkedQueue<T> queue,
+        AtomicBoolean closed,
+        int approximateMaxQueueSize) {
+      this.input = Preconditions.checkNotNull(input, "input cannot be null");
+      this.queue = Preconditions.checkNotNull(queue, "queue cannot be null");
+      this.closed = Preconditions.checkNotNull(closed, "closed cannot be null");
+      this.approximateMaxQueueSize = approximateMaxQueueSize;
+    }
+
+    @Override
+    public Optional<Task<T>> get() {
+      try {
+        if (iterator == null) {
+          iterator = input.iterator();
+        }
+
+        while (iterator.hasNext()) {
+          if (queue.size() >= approximateMaxQueueSize) {
+            // Yield when queue is over the size limit. Task will be resubmitted later and continue
+            // the work.
+            return Optional.of(this);
+          }
+
+          T next = iterator.next();
+          if (closed.get()) {
+            break;
+          }
+
+          queue.add(next);
+        }
+      } catch (Throwable e) {
+        try {
+          close();
+        } catch (IOException closeException) {
+          // self-suppression is not permitted
+          // (e and closeException being the same is unlikely, but possible)
+          if (closeException != e) {
+            e.addSuppressed(closeException);
+          }
+        }
+
+        throw e;
+      }
+
+      try {
+        close();
+      } catch (IOException e) {
+        throw new UncheckedIOException("Close failed", e);
+      }
+
+      // The task is complete. Returning empty means there is no continuation that should be
+      // executed.
+      return Optional.empty();
+    }
+
+    @Override
+    public void close() throws IOException {
+      iterator = null;
+      if (input instanceof Closeable) {
+        ((Closeable) input).close();
+      }
+    }
+  }
 }
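A minimal usage sketch of the bounded queue from the caller's side, assuming only the new public three-argument constructor added above; the class name ParallelIterableExample, the four-thread pool, and the cap of 100 items are illustrative choices, not part of this patch:

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.IntStream;
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.util.ParallelIterable;

public class ParallelIterableExample {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(4);
    // Three lazy sources; in Iceberg these would typically be manifest readers.
    List<Iterable<Integer>> sources =
        Arrays.asList(
            () -> IntStream.range(0, 1_000).iterator(),
            () -> IntStream.range(1_000, 2_000).iterator(),
            () -> IntStream.range(2_000, 3_000).iterator());

    // The third argument caps the internal queue at roughly 100 items; producers that hit
    // the cap yield and are resubmitted by checkTasks() once the consumer catches up.
    ParallelIterable<Integer> items = new ParallelIterable<>(sources, pool, 100);

    long count = 0;
    try (CloseableIterator<Integer> it = items.iterator()) {
      while (it.hasNext()) {
        it.next();
        count++;
      }
    }
    System.out.println("consumed " + count + " items");
    pool.shutdownNow();
  }
}

With the existing two-argument constructor the same code would run with DEFAULT_MAX_QUEUE_SIZE (30,000 items) as the bound.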
diff --git a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
index af9c6ec5212c..4910732f6e35 100644
--- a/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
+++ b/core/src/test/java/org/apache/iceberg/util/TestParallelIterable.java
@@ -24,15 +24,22 @@
 import java.lang.reflect.Field;
 import java.util.Collections;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Queue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.io.CloseableIterator;
+import org.apache.iceberg.relocated.com.google.common.collect.HashMultiset;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMultiset;
 import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
 import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Multiset;
 import org.awaitility.Awaitility;
 import org.junit.jupiter.api.Test;
 
@@ -133,6 +140,47 @@ public CloseableIterator<Integer> iterator() {
        .untilAsserted(() -> assertThat(queue).as("Queue is not empty after cleaning").isEmpty());
  }
 
+  @Test
+  public void limitQueueSize() throws IOException, IllegalAccessException, NoSuchFieldException {
+
+    List<Iterable<Integer>> iterables =
+        ImmutableList.of(
+            () -> IntStream.range(0, 100).iterator(),
+            () -> IntStream.range(0, 100).iterator(),
+            () -> IntStream.range(0, 100).iterator());
+
+    Multiset<Integer> expectedValues =
+        IntStream.range(0, 100)
+            .boxed()
+            .flatMap(i -> Stream.of(i, i, i))
+            .collect(ImmutableMultiset.toImmutableMultiset());
+
+    int maxQueueSize = 20;
+    ExecutorService executor = Executors.newCachedThreadPool();
+    ParallelIterable<Integer> parallelIterable =
+        new ParallelIterable<>(iterables, executor, maxQueueSize);
+    CloseableIterator<Integer> iterator = parallelIterable.iterator();
+    Field queueField = iterator.getClass().getDeclaredField("queue");
+    queueField.setAccessible(true);
+    ConcurrentLinkedQueue<?> queue = (ConcurrentLinkedQueue<?>) queueField.get(iterator);
+
+    Multiset<Integer> actualValues = HashMultiset.create();
+
+    while (iterator.hasNext()) {
+      assertThat(queue)
+          .as("iterator internal queue")
+          .hasSizeLessThanOrEqualTo(maxQueueSize + iterables.size());
+      actualValues.add(iterator.next());
+    }
+
+    assertThat(actualValues)
+        .as("multiset of values returned by the iterator")
+        .isEqualTo(expectedValues);
+
+    iterator.close();
+    executor.shutdownNow();
+  }
+
  private void queueHasElements(CloseableIterator<Integer> iterator, Queue<?> queue) {
    assertThat(iterator.hasNext()).isTrue();
    assertThat(iterator.next()).isNotNull();
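The test above checks the bound end to end through reflection. As a simplified, self-contained illustration of the underlying pattern (a producer that returns itself as a continuation instead of blocking when a shared queue is full), the sketch below is not Iceberg code and deliberately runs each continuation synchronously so the control flow stays visible:

import java.util.Iterator;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.IntStream;

public class YieldingProducerSketch {

  // A producer drains its source into the shared queue, but instead of blocking when the
  // queue reaches the soft limit it returns itself as a continuation to be resubmitted later.
  static class Producer implements Supplier<Optional<Producer>> {
    private final Iterator<Integer> source;
    private final ConcurrentLinkedQueue<Integer> queue;
    private final int maxQueueSize;

    Producer(Iterator<Integer> source, ConcurrentLinkedQueue<Integer> queue, int maxQueueSize) {
      this.source = source;
      this.queue = queue;
      this.maxQueueSize = maxQueueSize;
    }

    @Override
    public Optional<Producer> get() {
      while (source.hasNext()) {
        if (queue.size() >= maxQueueSize) {
          return Optional.of(this); // yield; the caller resubmits this task later
        }
        queue.add(source.next());
      }
      return Optional.empty(); // finished; no continuation to run
    }
  }

  public static void main(String[] args) {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
    Optional<Producer> continuation =
        Optional.of(new Producer(IntStream.range(0, 10_000).iterator(), queue, 100));

    int consumed = 0;
    while (continuation.isPresent() || !queue.isEmpty()) {
      if (continuation.isPresent() && queue.isEmpty()) {
        // Queue drained: resubmit the continuation, the way checkTasks() re-enqueues
        // yielded tasks in the patch above.
        CompletableFuture<Optional<Producer>> future =
            CompletableFuture.supplyAsync(continuation.get(), pool);
        continuation = future.join(); // simplified: wait for this slice of work to finish
      }
      while (queue.poll() != null) {
        consumed++;
      }
    }

    System.out.println("consumed " + consumed + " items"); // prints 10000
    pool.shutdown();
  }
}

In ParallelIterable the resubmission is spread across up to 2 * WORKER_THREAD_POOL_SIZE futures and driven by checkTasks() and submitNextTask(); the sketch collapses that to a single producer.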
diff --git a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
index 4cfb24f62921..2c614677db55 100644
--- a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
+++ b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/data/TestFlinkParquetReader.java
@@ -160,70 +160,207 @@ public void testTwoLevelList() throws IOException {
        new Schema(
            optional(1, "arraybytes", Types.ListType.ofRequired(3, Types.BinaryType.get())),
            optional(2, "topbytes", Types.BinaryType.get()));
-    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
-    File testFile = File.createTempFile("junit", null, temp.toFile());
-    assertThat(testFile.delete()).isTrue();
+    byte[] expectedByte = {0x00, 0x01};
+    validateTwoLevelListConversion(schema, expectedByte);
+  }
 
-    ParquetWriter<GenericRecord> writer =
-        AvroParquetWriter.<GenericRecord>builder(new Path(testFile.toURI()))
-            .withDataModel(GenericData.get())
-            .withSchema(avroSchema)
-            .config("parquet.avro.add-list-element-records", "true")
-            .config("parquet.avro.write-old-list-structure", "true")
-            .build();
+  /**
+   * Validates two-level list conversion through Parquet serialization.
+   *
+   * @param schema the Iceberg schema
+   * @param expectedByte the expected byte array for validation
+   * @throws IOException if an I/O error occurs during file operations
+   */
+  private void validateTwoLevelListConversion(Schema schema, byte[] expectedByte)
+      throws IOException {
+    File testFile = createTempFile();
 
-    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
-    List<ByteBuffer> expectedByteList = Lists.newArrayList();
-    byte[] expectedByte = {0x00, 0x01};
-    ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
-    expectedByteList.add(expectedBinary);
-    recordBuilder.set("arraybytes", expectedByteList);
-    recordBuilder.set("topbytes", expectedBinary);
-    GenericData.Record expectedRecord = recordBuilder.build();
+    try {
+      // Write test data to Parquet file
+      writeTestDataToParquet(testFile, schema, expectedByte);
 
-    writer.write(expectedRecord);
-    writer.close();
+      // Read and validate data from Parquet file
+      readAndValidateTwoLevelList(testFile, schema, expectedByte);
+    } catch (IOException e) {
+      throw new IOException("Failed to validate two-level list conversion", e);
+    }
+  }
+
+  /**
+   * Writes test data with a two-level list structure to a Parquet file.
+   *
+   * @param file the target Parquet file
+   * @param schema the Iceberg schema
+   * @param expectedByte the byte array to write
+   * @throws IOException if the write operation fails
+   */
+  private void writeTestDataToParquet(File file, Schema schema, byte[] expectedByte)
+      throws IOException {
+    org.apache.avro.Schema avroSchema = AvroSchemaUtil.convert(schema.asStruct());
+    ParquetWriter<GenericRecord> writer = null;
+    try {
+      writer =
+          AvroParquetWriter.<GenericRecord>builder(new Path(file.toURI()))
+              .withDataModel(GenericData.get())
+              .withSchema(avroSchema)
+              .config("parquet.avro.add-list-element-records", "true")
+              .config("parquet.avro.write-old-list-structure", "true")
+              .build();
+
+      GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
+      List<ByteBuffer> expectedByteList = Lists.newArrayList();
+      ByteBuffer expectedBinary = ByteBuffer.wrap(expectedByte);
+      expectedByteList.add(expectedBinary);
+      recordBuilder.set("arraybytes", expectedByteList);
+      recordBuilder.set("topbytes", expectedBinary);
+      GenericData.Record expectedRecord = recordBuilder.build();
+
+      writer.write(expectedRecord);
+    } catch (IOException e) {
+      throw new IOException("Failed to write test data to Parquet file: " + file.getAbsolutePath(), e);
+    } finally {
+      if (writer != null) {
+        try {
+          writer.close();
+        } catch (IOException e) {
+          throw new IOException("Failed to close Parquet writer", e);
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads and validates two-level list data from a Parquet file.
+   *
+   * @param file the Parquet file to read
+   * @param schema the Iceberg schema
+   * @param expectedByte the expected byte array for validation
+   * @throws IOException if the read operation fails
+   */
+  private void readAndValidateTwoLevelList(File file, Schema schema, byte[] expectedByte)
+      throws IOException {
    try (CloseableIterable<RowData> reader =
-        Parquet.read(Files.localInput(testFile))
+        Parquet.read(Files.localInput(file))
            .project(schema)
            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
            .build()) {
      Iterator<RowData> rows = reader.iterator();
-      assertThat(rows).hasNext();
+
+      assertThat(rows)
+          .as("Expected at least one row in the Parquet file")
+          .hasNext();
+
      RowData rowData = rows.next();
-      assertThat(rowData.getArray(0).getBinary(0)).isEqualTo(expectedByte);
-      assertThat(rowData.getBinary(1)).isEqualTo(expectedByte);
-      assertThat(rows).isExhausted();
+
+      // Validate array bytes
+      assertThat(rowData.getArray(0).getBinary(0))
+          .as("Array bytes should match expected value")
+          .isEqualTo(expectedByte);
+
+      // Validate top-level bytes
+      assertThat(rowData.getBinary(1))
+          .as("Top-level bytes should match expected value")
+          .isEqualTo(expectedByte);
+
+      // Ensure no more rows
+      assertThat(rows)
+          .as("Expected exactly one row in the Parquet file")
+          .isExhausted();
+    } catch (IOException e) {
+      throw new IOException("Failed to read and validate two-level list from Parquet file", e);
    }
  }
 
+  /**
+   * Creates a temporary file for testing.
+   *
+   * @return a new temporary file
+   * @throws IOException if file creation fails
+   */
+  private File createTempFile() throws IOException {
+    File tempFile = File.createTempFile("junit", null, temp.toFile());
+    if (!tempFile.delete()) {
+      throw new IOException("Failed to delete temporary file: " + tempFile.getAbsolutePath());
+    }
+    return tempFile;
+  }
+
+  /**
+   * Writes records to a Parquet file and validates the read data.
+   *
+   * @param iterable the records to write
+   * @param schema the Iceberg schema
+   * @throws IOException if an I/O error occurs during file operations
+   */
  private void writeAndValidate(Iterable<Record> iterable, Schema schema) throws IOException {
-    File testFile = File.createTempFile("junit", null, temp.toFile());
-    assertThat(testFile.delete()).isTrue();
+    File testFile = createTempFile();
+
+    try {
+      // Write records to Parquet file
+      writeRecordsToParquet(testFile, schema, iterable);
+      // Read and validate records from Parquet file
+      readAndValidateRecords(testFile, schema, iterable);
+    } catch (IOException e) {
+      throw new IOException("Failed to write and validate records", e);
+    }
+  }
+
+  /**
+   * Writes records to a Parquet file.
+   *
+   * @param file the target Parquet file
+   * @param schema the Iceberg schema
+   * @param records the records to write
+   * @throws IOException if the write operation fails
+   */
+  private void writeRecordsToParquet(File file, Schema schema, Iterable<Record> records)
+      throws IOException {
    try (FileAppender<Record> writer =
-        Parquet.write(Files.localOutput(testFile))
+        Parquet.write(Files.localOutput(file))
            .schema(schema)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .build()) {
-      writer.addAll(iterable);
+      writer.addAll(records);
+    } catch (IOException e) {
+      throw new IOException("Failed to write records to Parquet file: " + file.getAbsolutePath(), e);
    }
+  }
 
+  /**
+   * Reads and validates records from a Parquet file.
+   *
+   * @param file the Parquet file to read
+   * @param schema the Iceberg schema
+   * @param expectedRecords the expected records for validation
+   * @throws IOException if the read operation fails
+   */
+  private void readAndValidateRecords(File file, Schema schema, Iterable<Record> expectedRecords)
+      throws IOException {
    try (CloseableIterable<RowData> reader =
-        Parquet.read(Files.localInput(testFile))
+        Parquet.read(Files.localInput(file))
            .project(schema)
            .createReaderFunc(type -> FlinkParquetReaders.buildReader(schema, type))
            .build()) {
-      Iterator<Record> expected = iterable.iterator();
-      Iterator<RowData> rows = reader.iterator();
+      Iterator<Record> expectedIter = expectedRecords.iterator();
+      Iterator<RowData> actualIter = reader.iterator();
      LogicalType rowType = FlinkSchemaUtil.convert(schema);
-      for (int i = 0; i < NUM_RECORDS; i += 1) {
-        assertThat(rows).hasNext();
-        TestHelpers.assertRowData(schema.asStruct(), rowType, expected.next(), rows.next());
+
+      for (int i = 0; i < NUM_RECORDS; i++) {
+        assertThat(actualIter)
+            .as("Expected more rows at index %d", i)
+            .hasNext();
+        TestHelpers.assertRowData(
+            schema.asStruct(), rowType, expectedIter.next(), actualIter.next());
      }
-      assertThat(rows).isExhausted();
+
+      assertThat(actualIter)
+          .as("Expected exactly %d records", NUM_RECORDS)
+          .isExhausted();
+    } catch (IOException e) {
+      throw new IOException("Failed to read and validate records from Parquet file", e);
    }
  }
diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml
index 267d4c2f0da5..2203f18b21ba 100644
--- a/gradle/libs.versions.toml
+++ b/gradle/libs.versions.toml
@@ -70,7 +70,7 @@ nessie = "0.92.1"
 netty-buffer = "4.1.111.Final"
 netty-buffer-compat = "4.1.111.Final"
 object-client-bundle = "3.3.2"
-orc = "1.9.3"
+orc = "1.9.4"
 parquet = "1.13.1"
 pig = "0.17.0"
 roaringbitmap = "1.2.0"