[HADOOP-19341] HDFS Client's direct memory leaks with erasure coding enabled #7168

Open
wants to merge 6 commits into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
ElasticByteBufferPool.java

@@ -20,10 +20,6 @@
 import org.apache.hadoop.thirdparty.com.google.common.collect.ComparisonChain;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import java.nio.ByteBuffer;
-import java.util.Map;
-import java.util.TreeMap;
-
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 
@@ -36,7 +32,7 @@
  */
 @InterfaceAudience.Public
 @InterfaceStability.Stable
-public class ElasticByteBufferPool implements ByteBufferPool {
+public abstract class ElasticByteBufferPool implements ByteBufferPool {
   protected static final class Key implements Comparable<Key> {
     private final int capacity;
     private final long insertionTime;
@@ -76,48 +72,6 @@ public int hashCode() {
     }
   }
 
-  private final TreeMap<Key, ByteBuffer> buffers =
-      new TreeMap<Key, ByteBuffer>();
-
-  private final TreeMap<Key, ByteBuffer> directBuffers =
-      new TreeMap<Key, ByteBuffer>();
-
-  private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) {
-    return direct ? directBuffers : buffers;
-  }
-
-  @Override
-  public synchronized ByteBuffer getBuffer(boolean direct, int length) {
-    TreeMap<Key, ByteBuffer> tree = getBufferTree(direct);
-    Map.Entry<Key, ByteBuffer> entry =
-        tree.ceilingEntry(new Key(length, 0));
-    if (entry == null) {
-      return direct ? ByteBuffer.allocateDirect(length) :
-          ByteBuffer.allocate(length);
-    }
-    tree.remove(entry.getKey());
-    entry.getValue().clear();
-    return entry.getValue();
-  }
-
-  @Override
-  public synchronized void putBuffer(ByteBuffer buffer) {
-    buffer.clear();
-    TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect());
-    while (true) {
-      Key key = new Key(buffer.capacity(), System.nanoTime());
-      if (!tree.containsKey(key)) {
-        tree.put(key, buffer);
-        return;
-      }
-      // Buffers are indexed by (capacity, time).
-      // If our key is not unique on the first try, we try again, since the
-      // time will be different. Since we use nanoseconds, it's pretty
-      // unlikely that we'll loop even once, unless the system clock has a
-      // poor granularity.
-    }
-  }
-
  /**
   * Get the size of the buffer pool, for the specified buffer type.
   *
@@ -126,7 +80,5 @@ public synchronized void putBuffer(ByteBuffer buffer) {
   */
  @InterfaceAudience.Private
  @InterfaceStability.Unstable
-  public int size(boolean direct) {
-    return getBufferTree(direct).size();
-  }
+  public abstract int getCurrentBuffersCount(boolean direct);
 }
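For context, here is a minimal sketch (an illustration for this review, not the patch itself and not the real Hadoop implementation) of the idea the new abstract base enables. The removed code above held returned buffers in a TreeMap by strong reference, so pooled direct ByteBuffers could never be garbage-collected; a weak-reference pool such as WeakReferencedElasticByteBufferPool lets the GC reclaim idle buffers.

import java.lang.ref.WeakReference;
import java.nio.ByteBuffer;
import java.util.Map;
import java.util.TreeMap;

// Minimal weak-reference pool sketch. Names and structure are illustrative;
// the real WeakReferencedElasticByteBufferPool keys on (capacity, insertion
// time) and tracks direct and heap buffers separately.
public class WeakBufferPoolSketch {
  private final TreeMap<Integer, WeakReference<ByteBuffer>> directBuffers =
      new TreeMap<>();

  public synchronized ByteBuffer getBuffer(int length) {
    Map.Entry<Integer, WeakReference<ByteBuffer>> entry =
        directBuffers.ceilingEntry(length);
    if (entry != null) {
      directBuffers.remove(entry.getKey());
      ByteBuffer buffer = entry.getValue().get();
      if (buffer != null) {   // null means the GC already reclaimed it
        buffer.clear();
        return buffer;
      }
    }
    return ByteBuffer.allocateDirect(length);
  }

  public synchronized void putBuffer(ByteBuffer buffer) {
    buffer.clear();
    // Only a WeakReference is kept, so once callers drop their references
    // the GC can free the buffer's native memory. The strong-reference
    // TreeMap removed above pinned it forever.
    directBuffers.put(buffer.capacity(), new WeakReference<>(buffer));
  }
}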
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestElasticByteBufferPool.java

Check failure (ASF Cloudbees Jenkins ci-hadoop / Apache Yetus), line 1: asflicense: Missing Apache License

@@ -0,0 +1,35 @@
+package org.apache.hadoop.io;
+
+import org.assertj.core.api.Assertions;
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+public class TestElasticByteBufferPool {
Review comment (Contributor): This just tests WeakReferencedElasticByteBufferPool again through the abstract base class. I think it duplicates the existing coverage; you could instead change all the uses of the base class in TestWeakReferencedElasticByteBufferPool.
+  @Test
+  public void testGarbageCollection() throws Exception {
+    final ElasticByteBufferPool ebbp = new WeakReferencedElasticByteBufferPool();
+    ByteBuffer buffer1 = ebbp.getBuffer(true, 5);
+    ByteBuffer buffer2 = ebbp.getBuffer(true, 10);
+    ByteBuffer buffer3 = ebbp.getBuffer(true, 15);
+
+    // the pool is still empty
+    Assertions.assertThat(ebbp.getCurrentBuffersCount(true)).isEqualTo(0);
+
+    // put two of the buffers back into the pool
+    ebbp.putBuffer(buffer2);
+    ebbp.putBuffer(buffer3);
+    Assertions.assertThat(ebbp.getCurrentBuffersCount(true)).isEqualTo(2);
+
+    // drop our references so the buffers can be garbage-collected
+    buffer2 = null;
+    buffer3 = null;
+    System.gc();
+
+    // call getBuffer() to trigger the cleanup of cleared references
+    ByteBuffer buffer4 = ebbp.getBuffer(true, 10);
+
+    // the pool should be empty: buffer2 and buffer3 have been garbage-collected
+    Assertions.assertThat(ebbp.getCurrentBuffersCount(true)).isEqualTo(0);
+  }
+}
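To see the leak this patch addresses, note that an equivalent test against the pre-patch strong-reference pool could never observe the count dropping back to zero. A hypothetical demonstration (this compiles only against pre-patch hadoop-common, where ElasticByteBufferPool was still a concrete class):

import java.nio.ByteBuffer;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.ElasticByteBufferPool;

// Hypothetical: with the old strong-reference pool, every buffer ever
// returned stays pooled, so direct memory usage only grows.
public class DirectMemoryLeakDemo {
  public static void main(String[] args) {
    ByteBufferPool pool = new ElasticByteBufferPool(); // pre-patch concrete class
    for (int i = 1; i <= 1000; i++) {
      ByteBuffer b = pool.getBuffer(true, i * 1024); // a new, larger size each time
      pool.putBuffer(b);                             // strong reference: never freed
    }
    // Roughly 500 MB of direct memory is now pinned by the pool even though
    // no caller holds these buffers; System.gc() cannot reclaim it.
  }
}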
DFSStripedInputStream.java

@@ -36,7 +36,7 @@
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange;
 import org.apache.hadoop.io.ByteBufferPool;
 
-import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
 
@@ -64,7 +64,7 @@
 @InterfaceAudience.Private
 public class DFSStripedInputStream extends DFSInputStream {
 
-  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();
   private final BlockReaderInfo[] blockReaders;
   private final int cellSize;
   private final short dataBlkNum;
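The DFSStripedOutputStream, StripedBlockWriter, and StripedReconstructor changes below are the same one-line swap. All of these classes share roughly the borrow/return pattern sketched here (simplified; the method and field names mirror the diff context but are illustrative, and the real stripe handling is more involved), which is why a process-wide static pool kept accumulating direct buffers:

import java.nio.ByteBuffer;
import org.apache.hadoop.io.ByteBufferPool;
import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;

// Simplified sketch of the borrow/return pattern shared by the striped
// stream classes: a buffer is borrowed per stripe and returned to the
// shared static pool, at which point the pool becomes its only owner.
public class StripeBufferUsageSketch {
  private static final ByteBufferPool BUFFER_POOL =
      new WeakReferencedElasticByteBufferPool();

  void processOneStripe(int cellSize, short dataBlkNum) {
    ByteBuffer curStripeBuf = BUFFER_POOL.getBuffer(true, cellSize * dataBlkNum);
    try {
      // ... read or encode one stripe through curStripeBuf ...
    } finally {
      curStripeBuf.clear();
      BUFFER_POOL.putBuffer(curStripeBuf);
      // ElasticByteBufferPool kept a strong reference here forever; the
      // weak-reference pool lets the GC reclaim the buffer once this pool
      // reference is the only one left.
    }
  }
}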
DFSStripedOutputStream.java

@@ -37,8 +37,8 @@
 import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.MultipleIOException;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder;
@@ -83,7 +83,7 @@
 @InterfaceAudience.Private
 public class DFSStripedOutputStream extends DFSOutputStream
     implements StreamCapabilities {
-  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();
 
 /**
  * OutputStream level last exception, will be used to indicate the fatal
StripedBlockWriter.java

@@ -32,8 +32,8 @@
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
 import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.token.Token;
 
@@ -69,7 +69,7 @@ class StripedBlockWriter {
   private ByteBuffer targetBuffer;
   private long blockOffset4Target = 0;
   private long seqNo4Target = 0;
-  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();
 
   StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode,
       Configuration conf, ExtendedBlock block,
StripedReconstructor.java

@@ -30,7 +30,7 @@
 import org.apache.hadoop.hdfs.util.StripedBlockUtil;
 import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats;
 import org.apache.hadoop.io.ByteBufferPool;
-import org.apache.hadoop.io.ElasticByteBufferPool;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.io.erasurecode.CodecUtil;
 import org.apache.hadoop.io.erasurecode.ErasureCoderOptions;
 import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder;
@@ -108,7 +108,7 @@ abstract class StripedReconstructor {
   private final ErasureCoderOptions coderOptions;
   private RawErasureDecoder decoder;
   private final ExtendedBlock blockGroup;
-  private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool();
+  private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool();
 
   private final boolean isValidationEnabled;
   private DecodingValidator validator;
(another file)

@@ -555,16 +555,16 @@ public void testCloseDoesNotAllocateNewBuffer() throws Exception {
     final ElasticByteBufferPool ebbp =
         (ElasticByteBufferPool) stream.getBufferPool();
     // first clear existing pool
-    LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: "
-        + ebbp.size(false));
+    LOG.info("Current pool size: direct: " + ebbp.getCurrentBuffersCount(true) + ", indirect: "
+        + ebbp.getCurrentBuffersCount(false));
     emptyBufferPoolForCurrentPolicy(ebbp, true);
     emptyBufferPoolForCurrentPolicy(ebbp, false);
-    final int startSizeDirect = ebbp.size(true);
-    final int startSizeIndirect = ebbp.size(false);
+    final int startSizeDirect = ebbp.getCurrentBuffersCount(true);
+    final int startSizeIndirect = ebbp.getCurrentBuffersCount(false);
     // close should not allocate new buffers in the pool.
     stream.close();
-    assertEquals(startSizeDirect, ebbp.size(true));
-    assertEquals(startSizeIndirect, ebbp.size(false));
+    assertEquals(startSizeDirect, ebbp.getCurrentBuffersCount(true));
+    assertEquals(startSizeIndirect, ebbp.getCurrentBuffersCount(false));
   }
 }
 
@@ -621,10 +621,10 @@ public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe()
   private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp,
       boolean direct) {
     int size;
-    while ((size = ebbp.size(direct)) != 0) {
+    while ((size = ebbp.getCurrentBuffersCount(direct)) != 0) {
       ebbp.getBuffer(direct,
           ecPolicy.getCellSize() * ecPolicy.getNumDataUnits());
-      if (size == ebbp.size(direct)) {
+      if (size == ebbp.getCurrentBuffersCount(direct)) {
         // if getBuffer didn't decrease size, it means the pool for the buffer
         // corresponding to current ecPolicy is empty
         break;
(another file)

@@ -828,7 +828,7 @@ public void interceptFreeBlockReaderBuffer() {

   private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
       boolean direct) {
-    while (bufferPool.size(direct) != 0) {
+    while (bufferPool.getCurrentBuffersCount(direct) != 0) {
       // iterate all ByteBuffers in ElasticByteBufferPool
       ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0);
       Assert.assertEquals(0, byteBuffer.position());
@@ -837,7 +837,7 @@ private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool,
 
   private void emptyBufferPool(ElasticByteBufferPool bufferPool,
       boolean direct) {
-    while (bufferPool.size(direct) != 0) {
+    while (bufferPool.getCurrentBuffersCount(direct) != 0) {
       bufferPool.getBuffer(direct, 0);
     }
   }
(another file)

@@ -43,6 +43,7 @@
 
 import org.apache.hadoop.fs.impl.StoreImplementationUtils;
 import org.apache.hadoop.classification.VisibleForTesting;
+import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool;
 import org.apache.hadoop.util.Preconditions;
 import org.apache.commons.lang3.StringUtils;
 
@@ -239,7 +240,7 @@ void dump() {}
    * back to the queue
    */
   private final ElasticByteBufferPool poolReadyByteBuffers
-      = new ElasticByteBufferPool();
+      = new WeakReferencedElasticByteBufferPool();
 
   /**
    * The blob's block list.