diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java index c4c2940622729..6addcd9f62642 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/ElasticByteBufferPool.java @@ -20,10 +20,6 @@ import org.apache.hadoop.thirdparty.com.google.common.collect.ComparisonChain; import org.apache.commons.lang3.builder.HashCodeBuilder; -import java.nio.ByteBuffer; -import java.util.Map; -import java.util.TreeMap; - import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -36,7 +32,7 @@ */ @InterfaceAudience.Public @InterfaceStability.Stable -public class ElasticByteBufferPool implements ByteBufferPool { +public abstract class ElasticByteBufferPool implements ByteBufferPool { protected static final class Key implements Comparable { private final int capacity; private final long insertionTime; @@ -76,48 +72,6 @@ public int hashCode() { } } - private final TreeMap buffers = - new TreeMap(); - - private final TreeMap directBuffers = - new TreeMap(); - - private final TreeMap getBufferTree(boolean direct) { - return direct ? directBuffers : buffers; - } - - @Override - public synchronized ByteBuffer getBuffer(boolean direct, int length) { - TreeMap tree = getBufferTree(direct); - Map.Entry entry = - tree.ceilingEntry(new Key(length, 0)); - if (entry == null) { - return direct ? ByteBuffer.allocateDirect(length) : - ByteBuffer.allocate(length); - } - tree.remove(entry.getKey()); - entry.getValue().clear(); - return entry.getValue(); - } - - @Override - public synchronized void putBuffer(ByteBuffer buffer) { - buffer.clear(); - TreeMap tree = getBufferTree(buffer.isDirect()); - while (true) { - Key key = new Key(buffer.capacity(), System.nanoTime()); - if (!tree.containsKey(key)) { - tree.put(key, buffer); - return; - } - // Buffers are indexed by (capacity, time). - // If our key is not unique on the first try, we try again, since the - // time will be different. Since we use nanoseconds, it's pretty - // unlikely that we'll loop even once, unless the system clock has a - // poor granularity. - } - } - /** * Get the size of the buffer pool, for the specified buffer type. * @@ -126,7 +80,5 @@ public synchronized void putBuffer(ByteBuffer buffer) { */ @InterfaceAudience.Private @InterfaceStability.Unstable - public int size(boolean direct) { - return getBufferTree(direct).size(); - } + public abstract int getCurrentBuffersCount(boolean direct); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java index 6ca380ef0e46b..6d2d701b7e24a 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestMoreWeakReferencedElasticByteBufferPool.java @@ -36,7 +36,7 @@ public class TestMoreWeakReferencedElasticByteBufferPool @Test public void testMixedBuffersInPool() { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); ByteBuffer buffer1 = pool.getBuffer(true, 5); ByteBuffer buffer2 = pool.getBuffer(true, 10); ByteBuffer buffer3 = pool.getBuffer(false, 5); @@ -60,7 +60,7 @@ public void testMixedBuffersInPool() { @Test public void testUnexpectedBufferSizes() throws Exception { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); ByteBuffer buffer1 = pool.getBuffer(true, 0); // try writing a random byte in a 0 length buffer. @@ -84,7 +84,7 @@ public void testUnexpectedBufferSizes() throws Exception { * @param numDirectBuffersExpected expected number of direct buffers. * @param numHeapBuffersExpected expected number of heap buffers. */ - private void assertBufferCounts(WeakReferencedElasticByteBufferPool pool, + private void assertBufferCounts(ElasticByteBufferPool pool, int numDirectBuffersExpected, int numHeapBuffersExpected) { Assertions.assertThat(pool.getCurrentBuffersCount(true)) diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java index 1434010ffa652..01d00fb0fb3f8 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/TestWeakReferencedElasticByteBufferPool.java @@ -53,7 +53,7 @@ public TestWeakReferencedElasticByteBufferPool(String type) { @Test public void testGetAndPutBasic() { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); int bufferSize = 5; ByteBuffer buffer = pool.getBuffer(isDirect, bufferSize); Assertions.assertThat(buffer.isDirect()) @@ -83,7 +83,7 @@ public void testGetAndPutBasic() { @Test public void testPoolingWithDifferentSizes() { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); ByteBuffer buffer = pool.getBuffer(isDirect, 5); ByteBuffer buffer1 = pool.getBuffer(isDirect, 10); ByteBuffer buffer2 = pool.getBuffer(isDirect, 15); @@ -121,7 +121,7 @@ public void testPoolingWithDifferentSizes() { @Test public void testPoolingWithDifferentInsertionTime() { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); ByteBuffer buffer = pool.getBuffer(isDirect, 10); ByteBuffer buffer1 = pool.getBuffer(isDirect, 10); ByteBuffer buffer2 = pool.getBuffer(isDirect, 10); @@ -155,7 +155,7 @@ public void testPoolingWithDifferentInsertionTime() { @Test public void testGarbageCollection() { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); ByteBuffer buffer = pool.getBuffer(isDirect, 5); ByteBuffer buffer1 = pool.getBuffer(isDirect, 10); ByteBuffer buffer2 = pool.getBuffer(isDirect, 15); @@ -187,7 +187,7 @@ public void testGarbageCollection() { @Test public void testWeakReferencesPruning() { - WeakReferencedElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); + ElasticByteBufferPool pool = new WeakReferencedElasticByteBufferPool(); ByteBuffer buffer1 = pool.getBuffer(isDirect, 5); ByteBuffer buffer2 = pool.getBuffer(isDirect, 10); ByteBuffer buffer3 = pool.getBuffer(isDirect, 15); diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java index d6131f8ddeb54..da7b30831c192 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedInputStream.java @@ -36,7 +36,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil.StripeRange; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; @@ -64,7 +64,7 @@ @InterfaceAudience.Private public class DFSStripedInputStream extends DFSInputStream { - private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool(); private final BlockReaderInfo[] blockReaders; private final int cellSize; private final short dataBlkNum; diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java index 8320cc9a40866..f39e9c12e1a6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/DFSStripedOutputStream.java @@ -37,8 +37,8 @@ import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage; import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureEncoder; @@ -83,7 +83,7 @@ @InterfaceAudience.Private public class DFSStripedOutputStream extends DFSOutputStream implements StreamCapabilities { - private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool(); /** * OutputStream level last exception, will be used to indicate the fatal diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java index 24c1d61382275..39ed1d2edd175 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedBlockWriter.java @@ -32,8 +32,8 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.io.ElasticByteBufferPool; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.token.Token; @@ -69,7 +69,7 @@ class StripedBlockWriter { private ByteBuffer targetBuffer; private long blockOffset4Target = 0; private long seqNo4Target = 0; - private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool(); StripedBlockWriter(StripedWriter stripedWriter, DataNode datanode, Configuration conf, ExtendedBlock block, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java index 7acb679200118..7c810eeae2f71 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/erasurecode/StripedReconstructor.java @@ -30,7 +30,7 @@ import org.apache.hadoop.hdfs.util.StripedBlockUtil; import org.apache.hadoop.hdfs.util.StripedBlockUtil.BlockReadStats; import org.apache.hadoop.io.ByteBufferPool; -import org.apache.hadoop.io.ElasticByteBufferPool; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.io.erasurecode.CodecUtil; import org.apache.hadoop.io.erasurecode.ErasureCoderOptions; import org.apache.hadoop.io.erasurecode.rawcoder.RawErasureDecoder; @@ -108,7 +108,7 @@ abstract class StripedReconstructor { private final ErasureCoderOptions coderOptions; private RawErasureDecoder decoder; private final ExtendedBlock blockGroup; - private static final ByteBufferPool BUFFER_POOL = new ElasticByteBufferPool(); + private static final ByteBufferPool BUFFER_POOL = new WeakReferencedElasticByteBufferPool(); private final boolean isValidationEnabled; private DecodingValidator validator; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java index 9d3bdbf4e9520..edaa8c0877b13 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java @@ -555,16 +555,16 @@ public void testCloseDoesNotAllocateNewBuffer() throws Exception { final ElasticByteBufferPool ebbp = (ElasticByteBufferPool) stream.getBufferPool(); // first clear existing pool - LOG.info("Current pool size: direct: " + ebbp.size(true) + ", indirect: " - + ebbp.size(false)); + LOG.info("Current pool size: direct: " + ebbp.getCurrentBuffersCount(true) + ", indirect: " + + ebbp.getCurrentBuffersCount(false)); emptyBufferPoolForCurrentPolicy(ebbp, true); emptyBufferPoolForCurrentPolicy(ebbp, false); - final int startSizeDirect = ebbp.size(true); - final int startSizeIndirect = ebbp.size(false); + final int startSizeDirect = ebbp.getCurrentBuffersCount(true); + final int startSizeIndirect = ebbp.getCurrentBuffersCount(false); // close should not allocate new buffers in the pool. stream.close(); - assertEquals(startSizeDirect, ebbp.size(true)); - assertEquals(startSizeIndirect, ebbp.size(false)); + assertEquals(startSizeDirect, ebbp.getCurrentBuffersCount(true)); + assertEquals(startSizeIndirect, ebbp.getCurrentBuffersCount(false)); } } @@ -621,10 +621,10 @@ public void testReadWhenLastIncompleteCellComeInToDecodeAlignedStripe() private void emptyBufferPoolForCurrentPolicy(ElasticByteBufferPool ebbp, boolean direct) { int size; - while ((size = ebbp.size(direct)) != 0) { + while ((size = ebbp.getCurrentBuffersCount(direct)) != 0) { ebbp.getBuffer(direct, ecPolicy.getCellSize() * ecPolicy.getNumDataUnits()); - if (size == ebbp.size(direct)) { + if (size == ebbp.getCurrentBuffersCount(direct)) { // if getBuffer didn't decrease size, it means the pool for the buffer // corresponding to current ecPolicy is empty break; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java index 2f734c62f420b..920ccdff1fafe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestReconstructStripedFile.java @@ -828,7 +828,7 @@ public void interceptFreeBlockReaderBuffer() { private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool, boolean direct) { - while (bufferPool.size(direct) != 0) { + while (bufferPool.getCurrentBuffersCount(direct) != 0) { // iterate all ByteBuffers in ElasticByteBufferPool ByteBuffer byteBuffer = bufferPool.getBuffer(direct, 0); Assert.assertEquals(0, byteBuffer.position()); @@ -837,7 +837,7 @@ private void assertBufferPoolIsEmpty(ElasticByteBufferPool bufferPool, private void emptyBufferPool(ElasticByteBufferPool bufferPool, boolean direct) { - while (bufferPool.size(direct) != 0) { + while (bufferPool.getCurrentBuffersCount(direct) != 0) { bufferPool.getBuffer(direct, 0); } } diff --git a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java index 4c8d5fb6a5f71..e88469917a087 100644 --- a/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java +++ b/hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azure/BlockBlobAppendStream.java @@ -43,6 +43,7 @@ import org.apache.hadoop.fs.impl.StoreImplementationUtils; import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.io.WeakReferencedElasticByteBufferPool; import org.apache.hadoop.util.Preconditions; import org.apache.commons.lang3.StringUtils; @@ -239,7 +240,7 @@ void dump() {} * back to the queue */ private final ElasticByteBufferPool poolReadyByteBuffers - = new ElasticByteBufferPool(); + = new WeakReferencedElasticByteBufferPool(); /** * The blob's block list.