Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: tracking resource more readable #7267

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions server/src/main/java/org/opensearch/tasks/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.ExceptionsHelper;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.common.io.stream.NamedWriteable;
import org.opensearch.core.tasks.TaskId;
import org.opensearch.core.tasks.resourcetracker.ResourceStats;
Expand All @@ -58,6 +57,7 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/**
* Current task information
Expand Down Expand Up @@ -95,7 +95,7 @@

private final Map<Long, List<ThreadResourceInfo>> resourceStats;

private final List<NotifyOnceListener<Task>> resourceTrackingCompletionListeners;
private final List<Consumer<Task>> resourceTrackingCompletionCallback;

/**
* Keeps track of the number of active resource tracking threads for this task. It is initialized to 1 to track
Expand Down Expand Up @@ -139,7 +139,7 @@
long startTimeNanos,
Map<String, String> headers,
ConcurrentHashMap<Long, List<ThreadResourceInfo>> resourceStats,
List<NotifyOnceListener<Task>> resourceTrackingCompletionListeners
List<Consumer<Task>> resourceTrackingCompletionCallback
) {
this.id = id;
this.type = type;
Expand All @@ -150,7 +150,7 @@
this.startTimeNanos = startTimeNanos;
this.headers = headers;
this.resourceStats = resourceStats;
this.resourceTrackingCompletionListeners = resourceTrackingCompletionListeners;
this.resourceTrackingCompletionCallback = resourceTrackingCompletionCallback;
}

/**
Expand Down Expand Up @@ -527,9 +527,9 @@
* Registers a task resource tracking completion listener on this task if resource tracking is still active.
* Returns true on successful subscription, false otherwise.
*/
public boolean addResourceTrackingCompletionListener(NotifyOnceListener<Task> listener) {
public boolean addResourceTrackingCompletionCallback(Consumer<Task> callback) {
if (numActiveResourceTrackingThreads.get() > 0) {
resourceTrackingCompletionListeners.add(listener);
resourceTrackingCompletionCallback.add(callback);
return true;
}

Expand Down Expand Up @@ -564,19 +564,15 @@
int count = numActiveResourceTrackingThreads.decrementAndGet();

if (count == 0) {
List<Exception> listenerExceptions = new ArrayList<>();
resourceTrackingCompletionListeners.forEach(listener -> {
Copy link
Member

@andrross andrross Feb 13, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@fudongyingluck Seems like there is a potential behavior change here. The NotifyOnceListener will only take action on the first invocation, and all subsequent invocations are no-ops. You're removing that behavior by using a basic Consumer. How do you know this won't cause a regression?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for your time. I think the "all subsequent invocations" means the innerOnFailure(). Although there is a little change here, I do not think we should surround the exception layer after layer. The code is very hard to read. And innerOnFailure() actually has another way to complete it. Since I haven't worked on this part recently, this issue will be closed. Thanks for your time again ~

List<Exception> callbackExceptions = new ArrayList<>();
resourceTrackingCompletionCallback.forEach(callback -> {
try {
listener.onResponse(this);
} catch (Exception e1) {
try {
listener.onFailure(e1);
} catch (Exception e2) {
listenerExceptions.add(e2);
}
callback.accept(this);
} catch (Exception e) {
callbackExceptions.add(e);

Check warning on line 572 in server/src/main/java/org/opensearch/tasks/Task.java

View check run for this annotation

Codecov / codecov/patch

server/src/main/java/org/opensearch/tasks/Task.java#L571-L572

Added lines #L571 - L572 were not covered by tests
}
});
ExceptionsHelper.maybeThrowRuntimeAndSuppress(listenerExceptions);
ExceptionsHelper.maybeThrowRuntimeAndSuppress(callbackExceptions);
}

return count;
Expand Down
16 changes: 3 additions & 13 deletions server/src/main/java/org/opensearch/tasks/TaskManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
import org.opensearch.core.Assertions;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.common.unit.ByteSizeValue;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.core.tasks.TaskId;
Expand Down Expand Up @@ -211,18 +210,9 @@ public Task register(String type, String action, TaskAwareRequest request) {
}

if (task.supportsResourceTracking()) {
boolean success = task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
@Override
protected void innerOnResponse(Task task) {
// Stop tracking the task once the last thread has been marked inactive.
if (taskResourceTrackingService.get() != null && task.supportsResourceTracking()) {
taskResourceTrackingService.get().stopTracking(task);
}
}

@Override
protected void innerOnFailure(Exception e) {
ExceptionsHelper.reThrowIfNotNull(e);
boolean success = task.addResourceTrackingCompletionCallback(t -> {
if (taskResourceTrackingService.get() != null) {
taskResourceTrackingService.get().stopTracking(t);
}
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.concurrent.AbstractRunnable;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.core.common.io.stream.StreamInput;
import org.opensearch.core.common.io.stream.StreamOutput;
import org.opensearch.core.tasks.TaskCancelledException;
Expand Down Expand Up @@ -189,16 +188,8 @@ protected void doRun() {
// operationFinishedValidator will be called just after all task threads are marked inactive and
// the task is unregistered.
if (taskTestContext.operationFinishedValidator != null) {
boolean success = task.addResourceTrackingCompletionListener(new NotifyOnceListener<>() {
@Override
protected void innerOnResponse(Task task) {
taskTestContext.operationFinishedValidator.accept(task, threadId.get());
}

@Override
protected void innerOnFailure(Exception e) {
ExceptionsHelper.reThrowIfNotNull(e);
}
boolean success = task.addResourceTrackingCompletionCallback(t -> {
taskTestContext.operationFinishedValidator.accept(task, threadId.get());
});

if (success == false) {
Expand Down
Loading