Skip to content

Commit

Permalink
Fix ParallelIterable deadlock
Browse files Browse the repository at this point in the history
It was observed that with high concurrency/high workload scenario
cluster deadlocks due to manifest readers waiting for connection from S3 pool.

Specifically, ManifestGroup#plan will create ManifestReader per every ParallelIterable.Task.
These readers will effectively hold onto S3 connection from the pool.
When ParallelIterable queue is full, Task will be tabled for later use.

Consider scenario:
S3 connection pool size=1
approximateMaxQueueSize=1
workerPoolSize=1

ParallelIterable1: starts TaskP1
ParallelIterable1: TaskP1 produces result, queue gets full, TaskP1 is put on hold (holds S3 connection)
ParallelIterable2: starts TaskP2, TaskP2 is scheduled on workerPool but is blocked on S3 connection pool
ParallelIterable1: result gets consumed, TaskP1 is scheduled again
ParallelIterable1: TaskP1 waits for workerPool to be free, but TaskP2 is waiting for TaskP1 to release connection

The fix make sure Task is finished once it's started. This way limited resources like
connection pool are not put on hold. Queue size might exceed strict limits, but it should
still be bounded.

Fixes apache#11768
  • Loading branch information
sopel39 committed Dec 13, 2024
1 parent 540d6a6 commit da723f0
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 36 deletions.
12 changes: 6 additions & 6 deletions core/src/main/java/org/apache/iceberg/util/ParallelIterable.java
Original file line number Diff line number Diff line change
Expand Up @@ -257,17 +257,17 @@ private static class Task<T> implements Supplier<Optional<Task<T>>>, Closeable {
@Override
public Optional<Task<T>> get() {
try {
if (queue.size() >= approximateMaxQueueSize) {
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
return Optional.of(this);
}

if (iterator == null) {
iterator = input.iterator();
}

while (iterator.hasNext()) {
if (queue.size() >= approximateMaxQueueSize) {
// Yield when queue is over the size limit. Task will be resubmitted later and continue
// the work.
return Optional.of(this);
}

T next = iterator.next();
if (closed.get()) {
break;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
import java.util.stream.Stream;
Expand All @@ -39,6 +40,7 @@
import org.apache.iceberg.util.ParallelIterable.ParallelIterator;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;

public class TestParallelIterable {
@Test
Expand Down Expand Up @@ -148,17 +150,15 @@ public void limitQueueSize() {
.collect(ImmutableMultiset.toImmutableMultiset());

int maxQueueSize = 20;
ExecutorService executor = Executors.newCachedThreadPool();
ExecutorService executor = Executors.newSingleThreadExecutor();
ParallelIterable<Integer> parallelIterable =
new ParallelIterable<>(iterables, executor, maxQueueSize);
ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();

Multiset<Integer> actualValues = HashMultiset.create();

while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(maxQueueSize + iterables.size());
assertThat(iterator.queueSize()).as("iterator internal queue size").isLessThanOrEqualTo(100);
actualValues.add(iterator.next());
}

Expand All @@ -171,38 +171,55 @@ public void limitQueueSize() {
}

@Test
public void queueSizeOne() {
List<Iterable<Integer>> iterables =
ImmutableList.of(
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator(),
() -> IntStream.range(0, 100).iterator());
@Timeout(10)
public void noDeadlock() {
ExecutorService executor = Executors.newFixedThreadPool(1);
Semaphore semaphore = new Semaphore(1);

Multiset<Integer> expectedValues =
IntStream.range(0, 100)
.boxed()
.flatMap(i -> Stream.of(i, i, i))
.collect(ImmutableMultiset.toImmutableMultiset());
List<Iterable<Integer>> iterablesA =
ImmutableList.of(
testIterable(
semaphore::acquire, semaphore::release, IntStream.range(0, 100).iterator()));
List<Iterable<Integer>> iterablesB =
ImmutableList.of(
testIterable(
semaphore::acquire, semaphore::release, IntStream.range(200, 300).iterator()));

ExecutorService executor = Executors.newCachedThreadPool();
ParallelIterable<Integer> parallelIterable = new ParallelIterable<>(iterables, executor, 1);
ParallelIterator<Integer> iterator = (ParallelIterator<Integer>) parallelIterable.iterator();
ParallelIterable<Integer> parallelIterableA = new ParallelIterable<>(iterablesA, executor, 1);
ParallelIterable<Integer> parallelIterableB = new ParallelIterable<>(iterablesB, executor, 1);

Multiset<Integer> actualValues = HashMultiset.create();
parallelIterableA.iterator().next();
parallelIterableB.iterator().next();

while (iterator.hasNext()) {
assertThat(iterator.queueSize())
.as("iterator internal queue size")
.isLessThanOrEqualTo(1 + iterables.size());
actualValues.add(iterator.next());
}
executor.shutdownNow();
}

assertThat(actualValues)
.as("multiset of values returned by the iterator")
.isEqualTo(expectedValues);
private <T> CloseableIterable<T> testIterable(
RunnableWithException open, RunnableWithException close, Iterator<T> iterator) {
return new CloseableIterable<T>() {
@Override
public void close() {
try {
close.run();
} catch (Exception e) {
throw new RuntimeException(e);
}
}

@Override
public CloseableIterator<T> iterator() {
try {
open.run();
return CloseableIterator.withClose(iterator);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
};
}

iterator.close();
executor.shutdownNow();
private interface RunnableWithException {
void run() throws Exception;
}

private void queueHasElements(ParallelIterator<Integer> iterator) {
Expand Down

0 comments on commit da723f0

Please sign in to comment.