From 6f2fa9b5e911a385d83bf3ba8e30dd2ff9f57a7e Mon Sep 17 00:00:00 2001 From: Kandhare Date: Mon, 27 Jun 2016 16:17:49 -0700 Subject: [PATCH 1/2] Added byteBuffer based APIs to support zerocopy calls for distributedlog. Please review and merge these two changes together --- .../bookkeeper/client/CRC32DigestManager.java | 4 +- .../bookkeeper/client/DigestManager.java | 36 +++++----- .../client/LedgerFragmentReplicator.java | 18 ++--- .../bookkeeper/client/LedgerHandle.java | 66 ++++++++++--------- .../bookkeeper/client/MacDigestManager.java | 16 ++--- .../apache/bookkeeper/client/ClientUtil.java | 4 +- .../bookkeeper/client/LedgerRecoveryTest.java | 2 +- .../client/ParallelLedgerRecoveryTest.java | 2 +- 8 files changed, 78 insertions(+), 70 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java index 9194bf9c..58accb4a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/CRC32DigestManager.java @@ -49,7 +49,7 @@ byte[] getValueAndReset() { } @Override - void update(byte[] data, int offset, int length) { - crc.get().update(data, offset, length); + void update(ByteBuffer data) { + crc.get().update(data); } } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java index 2753680f..920d234c 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java @@ -18,16 +18,16 @@ * limitations under the License. */ -import java.nio.ByteBuffer; -import java.security.GeneralSecurityException; - import org.apache.bookkeeper.client.BKException.BKDigestMatchException; import org.apache.bookkeeper.client.BookKeeper.DigestType; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBufferInputStream; import org.jboss.netty.buffer.ChannelBuffers; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; /** * This class takes an entry, attaches a digest to it and packages it with relevant @@ -45,11 +45,12 @@ abstract class DigestManager { abstract int getMacCodeLength(); - void update(byte[] data) { - update(data, 0, data.length); - } + /*void update(ByteBuffer data) { + update(data, 0, data.limit()); + }*/ - abstract void update(byte[] data, int offset, int length); + // abstract void update(byte[] data, int offset, int length); + abstract void update(ByteBuffer data); abstract byte[] getValueAndReset(); final int macCodeLength; @@ -80,7 +81,8 @@ static DigestManager instantiate(long ledgerId, byte[] passwd, DigestType digest * @return */ - public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, byte[] data, int doffset, int dlength) { + public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAddConfirmed, long length, + ByteBuffer data) { byte[] bufferArray = new byte[METADATA_LENGTH + macCodeLength]; ByteBuffer buffer = ByteBuffer.wrap(bufferArray); @@ -90,8 +92,8 @@ public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAd buffer.putLong(length); buffer.flip(); - update(buffer.array(), 0, METADATA_LENGTH); - update(data, doffset, dlength); + update(buffer); + update(data); byte[] digest = getValueAndReset(); buffer.limit(buffer.capacity()); @@ -99,7 +101,7 @@ public ChannelBuffer computeDigestAndPackageForSending(long entryId, long lastAd buffer.put(digest); buffer.flip(); - return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data, doffset, dlength)); + return ChannelBuffers.wrappedBuffer(ChannelBuffers.wrappedBuffer(buffer), ChannelBuffers.wrappedBuffer(data)); } private void verifyDigest(ChannelBuffer dataReceived) throws BKDigestMatchException { @@ -123,10 +125,14 @@ private void verifyDigest(long entryId, ChannelBuffer dataReceived, boolean skip this.getClass().getName(), dataReceived.readableBytes()); throw new BKDigestMatchException(); } - update(dataReceivedBuffer.array(), dataReceivedBuffer.position(), METADATA_LENGTH); + + dataReceivedBuffer.limit(dataReceivedBuffer.position() + METADATA_LENGTH); + update(dataReceivedBuffer); int offset = METADATA_LENGTH + macCodeLength; - update(dataReceivedBuffer.array(), dataReceivedBuffer.position() + offset, dataReceived.readableBytes() - offset); + dataReceivedBuffer.position(dataReceivedBuffer.position() + offset); + dataReceivedBuffer.limit( dataReceived.readableBytes() - offset); + update(dataReceivedBuffer); digest = getValueAndReset(); for (int i = 0; i < digest.length; i++) { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java index ffabb58f..dee4250a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerFragmentReplicator.java @@ -19,17 +19,6 @@ */ package org.apache.bookkeeper.client; -import java.util.ArrayList; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; import org.apache.bookkeeper.net.BookieSocketAddress; import org.apache.bookkeeper.proto.BookieProtocol; @@ -42,6 +31,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + /** * This is the helper class for replicating the fragments from one bookie to * another @@ -288,7 +282,7 @@ public void readComplete(int rc, LedgerHandle lh, ChannelBuffer toSend = lh.getDigestManager() .computeDigestAndPackageForSending(entryId, lh.getLastAddConfirmed(), entry.getLength(), - data, 0, data.length); + ByteBuffer.wrap(data)); bkc.getBookieClient().addEntry(newBookie, lh.getId(), lh.getLedgerKey(), entryId, toSend, multiWriteCallback, null, BookieProtocol.FLAG_RECOVERY_ADD); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index d700802a..eb3908ea 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -20,23 +20,12 @@ */ package org.apache.bookkeeper.client; -import java.security.GeneralSecurityException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Enumeration; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.RejectedExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; - import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Objects; +import com.google.common.cache.CacheBuilder; +import com.google.common.cache.CacheLoader; +import com.google.common.cache.LoadingCache; +import com.google.common.util.concurrent.RateLimiter; import org.apache.bookkeeper.client.AsyncCallback.AddCallback; import org.apache.bookkeeper.client.AsyncCallback.CloseCallback; import org.apache.bookkeeper.client.AsyncCallback.ReadCallback; @@ -58,10 +47,14 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.util.concurrent.RateLimiter; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; +import java.util.*; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; /** * Ledger handle contains ledger metadata and is used to access the read and @@ -598,7 +591,26 @@ public void asyncAddEntry(final byte[] data, final AddCallback cb, public void asyncAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, final Object ctx) { PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); - doAsyncAddEntry(op, data, offset, length, cb, ctx); + doAsyncAddEntry(op, ByteBuffer.wrap(data, offset, length), cb, ctx); + } + + + /** + * Add entry asynchronously to an open ledger, using a bytebuffer. + * + * @param data + * ByteBuffer to be written + * @param cb + * object implementing callbackinterface + * @param ctx + * some control object + * @throws ArrayIndexOutOfBoundsException if offset or length is negative or + * offset and length sum to a value higher than the length of data. + */ + public void asyncAddEntry(final ByteBuffer data, + final AddCallback cb, final Object ctx) { + PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx); + doAsyncAddEntry(op, data, cb, ctx); } /** @@ -613,17 +625,11 @@ public void asyncAddEntry(final byte[] data, final int offset, final int length, void asyncRecoveryAddEntry(final byte[] data, final int offset, final int length, final AddCallback cb, final Object ctx) { PendingAddOp op = new PendingAddOp(LedgerHandle.this, cb, ctx).enableRecoveryAdd(); - doAsyncAddEntry(op, data, offset, length, cb, ctx); + doAsyncAddEntry(op, ByteBuffer.wrap(data, offset, length), cb, ctx); } - private void doAsyncAddEntry(final PendingAddOp op, final byte[] data, final int offset, final int length, + private void doAsyncAddEntry(final PendingAddOp op, final ByteBuffer data, final AddCallback cb, final Object ctx) { - if (offset < 0 || length < 0 - || (offset + length) > data.length) { - throw new ArrayIndexOutOfBoundsException( - "Invalid values for offset("+offset - +") or length("+length+")"); - } throttler.acquire(); final long entryId; @@ -670,8 +676,8 @@ public String toString() { @Override public void safeRun() { ChannelBuffer toSend = macManager.computeDigestAndPackageForSending( - entryId, getLastAddConfirmed(), currentLength, data, offset, length); - op.initiate(toSend, length); + entryId, getLastAddConfirmed(), currentLength, data ); + op.initiate(toSend, data.limit()); } @Override public String toString() { diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java index 45c7f283..49e2186f 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/MacDigestManager.java @@ -18,15 +18,15 @@ * limitations under the License. */ -import java.security.GeneralSecurityException; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.crypto.Mac; import javax.crypto.spec.SecretKeySpec; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import java.nio.ByteBuffer; +import java.security.GeneralSecurityException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import static com.google.common.base.Charsets.UTF_8; @@ -78,8 +78,8 @@ byte[] getValueAndReset() { } @Override - void update(byte[] data, int offset, int length) { - mac.get().update(data, offset, length); + void update(ByteBuffer data) { + mac.get().update(data); } diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java index dc43b2c3..c9a7f285 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ClientUtil.java @@ -19,6 +19,8 @@ import org.jboss.netty.buffer.ChannelBuffer; +import java.nio.ByteBuffer; + public class ClientUtil { public static ChannelBuffer generatePacket(long ledgerId, long entryId, long lastAddConfirmed, long length, byte[] data) { @@ -29,7 +31,7 @@ public static ChannelBuffer generatePacket(long ledgerId, long entryId, long las long length, byte[] data, int offset, int len) { CRC32DigestManager dm = new CRC32DigestManager(ledgerId); return dm.computeDigestAndPackageForSending(entryId, lastAddConfirmed, length, - data, offset, len); + (ByteBuffer)(ByteBuffer.wrap(data).position( offset).limit(len))); } /** Returns that whether ledger is in open state */ diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java index 0f2ffb11..89b76ff9 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/LedgerRecoveryTest.java @@ -561,7 +561,7 @@ public void testRecoveryOnBookieHandleNotAvailable() throws Exception { length += data.length; ChannelBuffer toSend = lh.macManager.computeDigestAndPackageForSending( - entryId, lac, length, data, 0, data.length); + entryId, lac, length, ByteBuffer.wrap(data)); int bid = (int) (entryId % 5); final CountDownLatch addLatch = new CountDownLatch(1); final AtomicBoolean addSuccess = new AtomicBoolean(false); diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java index bcf7a1ee..0d313ff4 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/ParallelLedgerRecoveryTest.java @@ -350,7 +350,7 @@ public void testRecoveryOnEntryGap() throws Exception { byte[] data = "recovery-on-entry-gap-gap".getBytes(UTF_8); ChannelBuffer toSend = lh.macManager.computeDigestAndPackageForSending( - entryId, lac, lh.getLength() + 100, data, 0, data.length); + entryId, lac, lh.getLength() + 100, ByteBuffer.wrap(data)); final CountDownLatch addLatch = new CountDownLatch(1); final AtomicBoolean addSuccess = new AtomicBoolean(false); LOG.info("Add entry {} with lac = {}", entryId, lac); From c3f49cf5e320ec3a39c1f51844f8680b29a58731 Mon Sep 17 00:00:00 2001 From: Arvind Kandhare Date: Sun, 11 Sep 2016 10:59:13 -0700 Subject: [PATCH 2/2] Took care of the review comments. --- .../org/apache/bookkeeper/client/DigestManager.java | 5 ----- .../org/apache/bookkeeper/client/LedgerHandle.java | 10 +++++++++- 2 files changed, 9 insertions(+), 6 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java index 920d234c..2c7209ac 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/DigestManager.java @@ -45,11 +45,6 @@ abstract class DigestManager { abstract int getMacCodeLength(); - /*void update(ByteBuffer data) { - update(data, 0, data.limit()); - }*/ - - // abstract void update(byte[] data, int offset, int length); abstract void update(ByteBuffer data); abstract byte[] getValueAndReset(); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java index eb3908ea..36f918f1 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java @@ -49,13 +49,21 @@ import java.nio.ByteBuffer; import java.security.GeneralSecurityException; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; + /** * Ledger handle contains ledger metadata and is used to access the read and * write operations to a ledger.