Skip to content
This repository has been archived by the owner on Oct 16, 2024. It is now read-only.

DL-45: DL should allow ByteBuffer based API and should avoid copying of arrays #21

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,18 @@
*/
package com.twitter.distributedlog;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Stopwatch;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
import com.twitter.distributedlog.exceptions.BKTransmitException;
import com.twitter.distributedlog.exceptions.EndOfStreamException;
import com.twitter.distributedlog.exceptions.FlushException;
import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
import com.twitter.distributedlog.exceptions.LockingException;
import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good, just curious, What is the purpose to move this part down? :)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nothing specific :). I think my editor just sorted all the imports.

import com.twitter.distributedlog.exceptions.TransactionIdOutOfOrderException;
import com.twitter.distributedlog.exceptions.WriteCancelledException;
import com.twitter.distributedlog.exceptions.WriteException;
import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
import com.twitter.distributedlog.feature.CoreFeatureKeys;
import com.twitter.distributedlog.injector.FailureInjector;
import com.twitter.distributedlog.injector.RandomDelayFailureInjector;
Expand Down Expand Up @@ -77,9 +68,18 @@
import scala.runtime.AbstractFunction1;
import scala.runtime.BoxedUnit;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;

import static com.google.common.base.Charsets.UTF_8;
import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORDSET_SIZE;
import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;

/**
* BookKeeper Based Log Segment Writer.
Expand Down Expand Up @@ -1081,7 +1081,7 @@ private Future<Integer> transmit()
synchronized (this) {
BKTransmitPacket packet = new BKTransmitPacket(recordSetToTransmit);
packetPrevious = packet;
entryWriter.asyncAddEntry(toSend.getData(), 0, toSend.size(),
entryWriter.asyncAddEntry(toSend.getData(),
this, packet);

if (recordSetToTransmit.hasUserRecords()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,22 @@
*/
package com.twitter.distributedlog;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;

import com.google.common.base.Preconditions;

import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression;
import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException;
import com.twitter.distributedlog.io.CompressionCodec;
import com.twitter.distributedlog.io.CompressionUtils;
import com.twitter.distributedlog.util.BitMaskUtils;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;

import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression;
import com.twitter.distributedlog.io.CompressionCodec;
import com.twitter.distributedlog.io.CompressionUtils;
import com.twitter.distributedlog.util.BitMaskUtils;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;

/**
* An enveloped entry written to BookKeeper.
Expand Down Expand Up @@ -112,7 +111,7 @@ public EnvelopedEntry(byte version,
*/
public EnvelopedEntry(byte version,
CompressionCodec.Type compressionType,
byte[] decompressed,
ByteBuffer decompressed,
int length,
StatsLogger statsLogger)
throws InvalidEnvelopedEntryException {
Expand Down Expand Up @@ -141,12 +140,12 @@ public void writeFully(DataOutputStream out) throws IOException {
header.write(out);
// Compress
CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
byte[] compressed = codec.compress(
ByteBuffer compressed = codec.compress(
payloadDecompressed.payload,
0,
payloadDecompressed.length,
compressionStat);
this.payloadCompressed = new Payload(compressed.length, compressed);
this.payloadCompressed = new Payload(compressed.limit(), compressed);
this.compressedEntryBytes.add(payloadCompressed.length);
this.decompressedEntryBytes.add(payloadDecompressed.length);
payloadCompressed.write(out);
Expand All @@ -165,18 +164,18 @@ public void readFully(DataInputStream in) throws IOException {
payloadCompressed.read(in);
// Decompress
CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType);
byte[] decompressed = codec.decompress(
ByteBuffer decompressed = codec.decompress(
payloadCompressed.payload,
0,
payloadCompressed.length,
header.decompressedSize,
decompressionStat);
this.payloadDecompressed = new Payload(decompressed.length, decompressed);
this.payloadDecompressed = new Payload(decompressed.limit(), decompressed);
this.compressedEntryBytes.add(payloadCompressed.length);
this.decompressedEntryBytes.add(payloadDecompressed.length);
}

public byte[] getDecompressedPayload() throws IOException {
public ByteBuffer getDecompressedPayload() throws IOException {
if (!isReady()) {
throw new IOException("Decompressed payload is not initialized");
}
Expand Down Expand Up @@ -245,7 +244,7 @@ private void read(DataInputStream in) throws IOException {

public static class Payload {
private int length = 0;
private byte[] payload = null;
private ByteBuffer payload = null;

// Whether this struct is ready for reading/writing.
private boolean ready = false;
Expand All @@ -254,21 +253,22 @@ public static class Payload {
Payload() {
}

Payload(int length, byte[] payload) {
Payload(int length, ByteBuffer payload) {
this.length = length;
this.payload = payload;
this.ready = true;
}

private void write(DataOutputStream out) throws IOException {
out.writeInt(length);
out.write(payload, 0, length);
out.write(payload.array(), 0, length);
}

private void read(DataInputStream in) throws IOException {
this.length = in.readInt();
this.payload = new byte[length];
in.readFully(payload);
this.payload = ByteBuffer.wrap(new byte[length]);
//TODO: Fix this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what do you want to fix here?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@leighst The existing Stream based implementation in buffer.java involved an avoidable byte array copy. This change request is to fix this. We realized that this can be done in a better way. I/@jiazhai will update the pull request with the latest changes.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will sync with Arvind to get this done.

in.readFully(payload.array());
this.ready = true;
}
}
Expand All @@ -290,7 +290,8 @@ public static InputStream fromInputStream(InputStream src,
src.reset();
EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger);
entry.readFully(new DataInputStream(src));
return new ByteArrayInputStream(entry.getDecompressedPayload());
return new ByteArrayInputStream(entry.getDecompressedPayload().array());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think im missing something here, but it seems like a bad idea to use direct array access since the payload object could have been initialized with a ByteBuffer from anywhere. is this safe?


}

}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
import java.util.List;

Expand Down Expand Up @@ -162,7 +163,7 @@ public synchronized Buffer getBuffer() throws InvalidEnvelopedEntryException, IO
// We can't escape this allocation because things need to be read from one byte array
// and then written to another. This is the destination.
Buffer toSend = new Buffer(buffer.size());
byte[] decompressed = buffer.getData();
ByteBuffer decompressed = buffer.getData();
int length = buffer.size();
EnvelopedEntry entry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
codec,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.twitter.distributedlog.logsegment.LogSegmentEntryWriter;
import org.apache.bookkeeper.client.AsyncCallback;
import org.apache.bookkeeper.client.LedgerHandle;
import java.nio.ByteBuffer;

/**
* Ledger based log segment entry writer.
Expand Down Expand Up @@ -49,9 +50,9 @@ public void asyncClose(AsyncCallback.CloseCallback callback, Object ctx) {
}

@Override
public void asyncAddEntry(byte[] data, int offset, int length,
public void asyncAddEntry(ByteBuffer data,
AsyncCallback.AddCallback callback, Object ctx) {
lh.asyncAddEntry(data, offset, length, callback, ctx);
lh.asyncAddEntry(data, callback, ctx);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.twitter.distributedlog.Entry;
import com.twitter.distributedlog.util.Sizable;
import org.apache.bookkeeper.client.AsyncCallback;
import java.nio.ByteBuffer;

/**
* An interface class to write the enveloped entry (serialized bytes of
Expand Down Expand Up @@ -56,17 +57,13 @@ public interface LogSegmentEntryWriter extends Sizable {
*
* @param data
* data to add
* @param offset
* offset in the data
* @param length
* length of the data
* @param callback
* callback
* @param ctx
* ctx
* @see org.apache.bookkeeper.client.LedgerHandle#asyncAddEntry(
* byte[], int, int, AsyncCallback.AddCallback, Object)
*/
void asyncAddEntry(byte[] data, int offset, int length,
void asyncAddEntry(ByteBuffer data,
AsyncCallback.AddCallback callback, Object ctx);
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,12 @@
*/
package com.twitter.distributedlog;

import com.google.common.base.Optional;
import com.google.common.collect.Lists;
import com.twitter.distributedlog.Entry.Reader;
import com.twitter.distributedlog.Entry.Writer;
import com.twitter.distributedlog.exceptions.LogRecordTooLongException;
import com.twitter.distributedlog.io.Buffer;
import com.twitter.distributedlog.io.CompressionCodec;
import com.twitter.io.Buf;
import com.twitter.util.Await;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
Expand All @@ -38,7 +36,8 @@

import static com.google.common.base.Charsets.UTF_8;
import static com.twitter.distributedlog.LogRecord.MAX_LOGRECORD_SIZE;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;

/**
* Test Case of {@link Entry}
Expand All @@ -58,7 +57,7 @@ public void testEmptyRecordSet() throws Exception {

Buffer buffer = writer.getBuffer();
Entry recordSet = Entry.newBuilder()
.setData(buffer.getData(), 0, buffer.size())
.setData(buffer.getData().array(), 0, buffer.size())
.setLogSegmentInfo(1L, 0L)
.setEntryId(0L)
.build();
Expand Down Expand Up @@ -145,7 +144,7 @@ public void testWriteRecords() throws Exception {

// Test reading from buffer
Entry recordSet = Entry.newBuilder()
.setData(buffer.getData(), 0, buffer.size())
.setData(buffer.getData().array(), 0, buffer.size())
.setLogSegmentInfo(1L, 1L)
.setEntryId(0L)
.build();
Expand Down Expand Up @@ -278,7 +277,7 @@ void verifyReadResult(Buffer data,
DLSN expectedDLSN,
long expectedTxId) throws Exception {
Entry recordSet = Entry.newBuilder()
.setData(data.getData(), 0, data.size())
.setData(data.getData().array(), 0, data.size())
.setLogSegmentInfo(lssn, startSequenceId)
.setEntryId(entryId)
.deserializeRecordSet(deserializeRecordSet)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@
*/
package com.twitter.distributedlog;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

import com.twitter.distributedlog.io.Buffer;
import com.twitter.distributedlog.io.CompressionCodec;
import org.apache.bookkeeper.stats.NullStatsLogger;
Expand All @@ -29,6 +25,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

public class TestEnvelopedEntry {

static final Logger LOG = LoggerFactory.getLogger(TestEnvelopedEntry.class);
Expand All @@ -49,33 +50,33 @@ public void testEnvelope() throws Exception {
byte[] data = getString(false).getBytes();
EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
CompressionCodec.Type.NONE,
data,
ByteBuffer.wrap(data),
data.length,
new NullStatsLogger());
Buffer outBuf = new Buffer(2 * data.length);
writeEntry.writeFully(new DataOutputStream(outBuf));
EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
new NullStatsLogger());
readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData())));
byte[] newData = readEntry.getDecompressedPayload();
Assert.assertEquals("Written data should equal read data", new String(data), new String(newData));
readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData().array())));
ByteBuffer newData = readEntry.getDecompressedPayload();
Assert.assertEquals("Written data should equal read data", new String(data), new String(newData.array()));
}

@Test(timeout = 20000)
public void testLZ4Compression() throws Exception {
byte[] data = getString(true).getBytes();
EnvelopedEntry writeEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
CompressionCodec.Type.LZ4,
data,
ByteBuffer.wrap(data),
data.length,
new NullStatsLogger());
Buffer outBuf = new Buffer(data.length);
writeEntry.writeFully(new DataOutputStream(outBuf));
Assert.assertTrue(data.length > outBuf.size());
EnvelopedEntry readEntry = new EnvelopedEntry(EnvelopedEntry.CURRENT_VERSION,
new NullStatsLogger());
readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData())));
byte[] newData = readEntry.getDecompressedPayload();
Assert.assertEquals("Written data should equal read data", new String(data), new String(newData));
readEntry.readFully(new DataInputStream(new ByteArrayInputStream(outBuf.getData().array())));
ByteBuffer newData = readEntry.getDecompressedPayload();
Assert.assertEquals("Written data should equal read data", new String(data), new String(newData.array()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ class EnvelopedRecordSetReader implements LogRecordSet.Reader {

if (COMPRESSION_CODEC_LZ4 == codecCode) {
CompressionCodec codec = CompressionUtils.getCompressionCodec(CompressionCodec.Type.LZ4);
byte[] decompressedData = codec.decompress(compressedData, 0, actualDataLen,
ByteBuffer decompressedData = codec.decompress(ByteBuffer.wrap(compressedData), 0, actualDataLen,
originDataLen, NullOpStatsLogger);
this.reader = ByteBuffer.wrap(decompressedData);
this.reader = decompressedData;
} else {
if (originDataLen != actualDataLen) {
throw new IOException("Inconsistent data length found for a non-compressed record set : original = "
Expand Down
Loading