Skip to content

Commit

Permalink
Use fast thread local and fix GC nepotism (Fixes eclipse-vertx#4749)
Browse files Browse the repository at this point in the history
  • Loading branch information
franz1981 committed Jun 21, 2023
1 parent 5348750 commit 15f7368
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 32 deletions.
61 changes: 32 additions & 29 deletions src/main/java/io/vertx/core/net/impl/pool/CombinerExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
package io.vertx.core.net.impl.pool;

import io.netty.util.concurrent.FastThreadLocal;
import io.netty.util.internal.PlatformDependent;

import java.util.Queue;
Expand All @@ -27,7 +28,12 @@ public class CombinerExecutor<S> implements Executor<S> {
private final Queue<Action<S>> q = PlatformDependent.newMpscQueue();
private final AtomicInteger s = new AtomicInteger();
private final S state;
private final ThreadLocal<Task> current = new ThreadLocal<>();

protected final class InProgressHead {
Task head;
}

private final FastThreadLocal<InProgressHead> current = new FastThreadLocal<>();

public CombinerExecutor(S state) {
this.state = state;
Expand All @@ -40,50 +46,47 @@ public void submit(Action<S> action) {
return;
}
Task head = null;
Task tail = null;
do {
try {
head = pollAndExecute(head);
final Action<S> a = q.poll();
if (a == null) {
break;
}
Task task = a.execute(state);
if (task != null) {
if (head == null) {
assert tail == null;
tail = task;
head = task;
} else {
tail = tail.next(task);
}
}
} finally {
s.set(0);
}
} while (!q.isEmpty() && s.compareAndSet(0, 1));
if (head != null) {
Task inProgress = current.get();
InProgressHead inProgress = current.get();
if (inProgress == null) {
current.set(head);
inProgress = new InProgressHead();
current.set(inProgress);
try {
while (head != null) {
head.run();
head = head.next;
}
// don't trust tail during this: linkTasksToHead can change it!
head.runNextTasks(inProgress);
} finally {
current.remove();
}
} else {
merge(inProgress, head);
}
}
}

private Task pollAndExecute(Task head) {
Action<S> action;
while ((action = q.poll()) != null) {
Task task = action.execute(state);
if (task != null) {
if (head == null) {
head = task;
} else {
merge(head, task);
}
assert inProgress.head != null;
linkTasksToHead(inProgress.head, head, tail);
}
}
return head;
}

private static void merge(Task head, Task tail) {
Task tmp = tail.prev;
tail.prev = head.prev;
head.prev.next = tail;
head.prev = tmp;
private static void linkTasksToHead(Task head, Task nextHead, Task nextTail) {
Task oldNext = head.replaceNext(nextHead);
nextTail.next(oldNext);
}
}
27 changes: 24 additions & 3 deletions src/main/java/io/vertx/core/net/impl/pool/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,30 @@

public abstract class Task {

Task prev = this;
Task next;
private Task next;

public abstract void run();
public Task replaceNext(Task next) {
Task oldNext = this.next;
this.next = next;
return oldNext;
}

public Task next(Task next) {
this.next = next;
return next;
}

protected final void runNextTasks(CombinerExecutor.InProgressHead head) {
Task task = this;
while (task != null) {
head.head = task;
task.run();
final Task next = task.next;
// help GC :P
task.next = null;
task = next;
}
}

public abstract void run();
}

0 comments on commit 15f7368

Please sign in to comment.