Skip to content

Commit

Permalink
HADOOP-19330. S3A: Add LeakReporter; use in S3AInputStream (#7151)
Browse files Browse the repository at this point in the history
If a file is opened for reading through the S3A connector
is not closed, then when garbage collection takes place

* An error message is reported at WARN, including the file name.
* A stack trace of where the stream was created is reported
  at INFO.
* A best-effort attempt is made to release any active HTTPS
  connection.
* The filesystem IOStatistic stream_leaks is incremented.

The intent is to make it easier to identify where streams
are being opened and not closed -as these consume resources
including often HTTPS connections from the connection pool
of limited size.

It MUST NOT be relied on as a way to clean up open
files/streams automatically; some of the normal actions of
the close() method are omitted.

Instead: view the warning messages and IOStatistics as a
sign of a problem, the stack trace as a way of identifying
what application code/library needs to be investigated.

Contributed by Steve Loughran
  • Loading branch information
steveloughran authored Nov 14, 2024
1 parent 2273278 commit 7999db5
Show file tree
Hide file tree
Showing 11 changed files with 700 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.io.Closeable;
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.functional.RunnableRaisingIOE;

import static java.util.Objects.requireNonNull;

/**
* A class to report leaks of streams.
* <p>
* It is created during object creation, and closed during finalization.
* Predicates should be supplied for the {@link #isOpen} probe check if the
* resource is still open, and an operation to actually close the
* target.
*/
public class LeakReporter implements Closeable {

/**
* Name of logger used to report leaks: {@value}.
*/
public static final String RESOURCE_LEAKS_LOG_NAME = "org.apache.hadoop.fs.resource.leaks";

/**
* Special log for leaked streams.
*/
private static final Logger LEAK_LOG =
LoggerFactory.getLogger(RESOURCE_LEAKS_LOG_NAME);

/**
* Format string used to build the thread information: {@value}.
*/
@VisibleForTesting
static final String THREAD_FORMAT =
"; thread: %s; id: %d";

/**
* Re-entrancy check.
*/
private final AtomicBoolean closed = new AtomicBoolean(false);

/**
* Predicate to check if the resource is open.
*/
private final BooleanSupplier isOpen;

/**
* Action to close the resource.
*/
private final RunnableRaisingIOE closeAction;

/**
* Stack trace of object creation; used to
* report of unclosed streams in finalize().
*/
private final IOException leakException;

/**
* Constructor.
* <p>
* Validates the parameters and builds the stack;
* append "; thread: " + thread name.
* @param message error message
* @param isOpen open predicate
* @param closeAction action to close
*/
public LeakReporter(
final String message,
final BooleanSupplier isOpen,
final RunnableRaisingIOE closeAction) {
this.isOpen = requireNonNull(isOpen);
this.closeAction = requireNonNull(closeAction);
// build the warning thread.
// This includes the error string to print, so as to avoid
// constructing objects in finalize().
this.leakException = new IOException(message
+ String.format(THREAD_FORMAT,
Thread.currentThread().getName(),
Thread.currentThread().getId()));
}

/**
* Close the resource.
*/
@Override
public void close() {
try {
if (!closed.getAndSet(true) && isOpen.getAsBoolean()) {
// log a warning with the creation stack
LEAK_LOG.warn(leakException.getMessage());
// The creation stack is logged at INFO, so that
// it is possible to configure logging to print
// the name of files left open, without printing
// the stacks. This is better for production use.

LEAK_LOG.info("stack", leakException);
closeAction.apply();
}
} catch (Exception e) {
LEAK_LOG.info("executing leak cleanup actions", e);
}
}

public IOException getLeakException() {
return leakException;
}

public boolean isClosed() {
return closed.get();
}

@Override
public String toString() {
return "LeakReporter{" +
"closed=" + closed.get() +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,14 @@
@InterfaceStability.Evolving
public final class StreamStatisticNames {

/**
* Count of Stream leaks from an application which
* is not cleaning up correctly.
* Value :{@value}.
*/
public static final String STREAM_LEAKS =
"stream_leaks";

/**
* Count of times the TCP stream was aborted.
* Value: {@value}.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.util.functional;

import java.io.IOException;
import java.io.UncheckedIOException;

/**
* Runnable interface whose {@link #apply()} method may raise
* an IOE.
* The implementation of {@link Runnable#run} invokes this
* and converts any raised IOE into an {@link UncheckedIOException}.
*/
@FunctionalInterface
public interface RunnableRaisingIOE extends Runnable {

/**
* Apply the operation.
* @throws IOException Any IO failure
*/
void apply() throws IOException;

@Override
default void run() {
try {
apply();
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,165 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.hadoop.fs.impl;

import java.util.concurrent.atomic.AtomicInteger;

import org.assertj.core.api.Assertions;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.test.AbstractHadoopTestBase;
import org.apache.hadoop.test.GenericTestUtils;

import static org.apache.hadoop.fs.impl.LeakReporter.THREAD_FORMAT;
import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;

public final class TestLeakReporter extends AbstractHadoopTestBase {

private static final Logger LOG =
LoggerFactory.getLogger(TestLeakReporter.class);

/**
* Count of close calls.
*/
private final AtomicInteger closeCount = new AtomicInteger();

/**
* Big test: creates a reporter, closes it.
* Verifies that the error message and stack traces is printed when
* open, and that the close callback was invoked.
* <p>
* After the first invocation, a second invocation is ignored.
*/
@Test
public void testLeakInvocation() throws Throwable {

final String message = "<message>";
final LeakReporter reporter = new LeakReporter(message,
() -> true,
this::closed);

// store the old thread name and change it,
// so the log test can verify that the old thread name is printed.
String oldName = Thread.currentThread().getName();
Thread.currentThread().setName("thread");
// Capture the logs
GenericTestUtils.LogCapturer logs =
captureLogs(LoggerFactory.getLogger(Logger.ROOT_LOGGER_NAME));
expectClose(reporter, 1);

// check the log
logs.stopCapturing();
final String output = logs.getOutput();
LOG.info("output of leak log is {}", output);

final String threadInfo = String.format(THREAD_FORMAT,
oldName,
Thread.currentThread().getId());
// log auditing
Assertions.assertThat(output)
.describedAs("output from the logs")
.contains("WARN")
.contains(message)
.contains(Thread.currentThread().getName())
.contains(threadInfo)
.contains("TestLeakReporter.testLeakInvocation")
.contains("INFO")
.contains("stack");

// no reentrancy
expectClose(reporter, 1);
}

/**
* Expect the close operation to result in
* a value of the close count to be as expected.
* @param reporter leak reporter
* @param expected expected value after the close
*/
private void expectClose(final LeakReporter reporter, final int expected) {
reporter.close();
assertCloseCount(expected);
}

/**
* Close operation: increments the counter.
*/
private void closed() {
closeCount.incrementAndGet();
}

/**
* When the source is closed, no leak cleanup takes place.
*/
@Test
public void testLeakSkipped() throws Throwable {

final LeakReporter reporter = new LeakReporter("<message>",
() -> false,
this::closed);
expectClose(reporter, 0);
}

/**
* If the probe raises an exception, the exception is swallowed
* and the close action is never invoked.
*/
@Test
public void testProbeFailureSwallowed() throws Throwable {
final LeakReporter reporter = new LeakReporter("<message>",
this::raiseNPE,
this::closed);
expectClose(reporter, 0);
}

/**
* Any exception raised in the close action it is swallowed.
*/
@Test
public void testCloseActionSwallowed() throws Throwable {
final LeakReporter reporter = new LeakReporter("<message>",
() -> true,
this::raiseNPE);
reporter.close();

Assertions.assertThat(reporter.isClosed())
.describedAs("reporter closed)")
.isTrue();
}

/**
* Always raises an NPE.
* @return never
*/
private boolean raiseNPE() {
throw new NullPointerException("oops");
}

/**
* Assert that the value of {@link #closeCount} is as expected.
* @param ex expected.
*/
private void assertCloseCount(final int ex) {
Assertions.assertThat(closeCount.get())
.describedAs("close count")
.isEqualTo(ex);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@
import org.apache.hadoop.fs.statistics.IOStatistics;
import org.apache.hadoop.fs.statistics.IOStatisticsSource;
import org.apache.hadoop.fs.statistics.IOStatisticsContext;
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
import org.apache.hadoop.fs.statistics.impl.IOStatisticsStore;
import org.apache.hadoop.fs.store.LogExactlyOnce;
import org.apache.hadoop.fs.store.audit.AuditEntryPoint;
Expand Down Expand Up @@ -5587,6 +5588,10 @@ public boolean hasPathCapability(final Path path, final String capability)
case AWS_S3_ACCESS_GRANTS_ENABLED:
return s3AccessGrantsEnabled;

// stream leak detection.
case StreamStatisticNames.STREAM_LEAKS:
return !prefetchEnabled;

default:
// is it a performance flag?
if (performanceFlags.hasCapability(capability)) {
Expand Down
Loading

0 comments on commit 7999db5

Please sign in to comment.