-
Notifications
You must be signed in to change notification settings - Fork 74
DL-45: DL should allow ByteBuffer based API and should avoid copying of arrays #21
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,23 +17,22 @@ | |
*/ | ||
package com.twitter.distributedlog; | ||
|
||
import java.io.ByteArrayInputStream; | ||
import java.io.DataInputStream; | ||
import java.io.DataOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
|
||
import com.google.common.base.Preconditions; | ||
|
||
import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression; | ||
import com.twitter.distributedlog.exceptions.InvalidEnvelopedEntryException; | ||
import com.twitter.distributedlog.io.CompressionCodec; | ||
import com.twitter.distributedlog.io.CompressionUtils; | ||
import com.twitter.distributedlog.util.BitMaskUtils; | ||
import org.apache.bookkeeper.stats.Counter; | ||
import org.apache.bookkeeper.stats.OpStatsLogger; | ||
import org.apache.bookkeeper.stats.StatsLogger; | ||
|
||
import com.twitter.distributedlog.annotations.DistributedLogAnnotations.Compression; | ||
import com.twitter.distributedlog.io.CompressionCodec; | ||
import com.twitter.distributedlog.io.CompressionUtils; | ||
import com.twitter.distributedlog.util.BitMaskUtils; | ||
import java.io.ByteArrayInputStream; | ||
import java.io.DataInputStream; | ||
import java.io.DataOutputStream; | ||
import java.io.IOException; | ||
import java.io.InputStream; | ||
import java.nio.ByteBuffer; | ||
|
||
/** | ||
* An enveloped entry written to BookKeeper. | ||
|
@@ -112,7 +111,7 @@ public EnvelopedEntry(byte version, | |
*/ | ||
public EnvelopedEntry(byte version, | ||
CompressionCodec.Type compressionType, | ||
byte[] decompressed, | ||
ByteBuffer decompressed, | ||
int length, | ||
StatsLogger statsLogger) | ||
throws InvalidEnvelopedEntryException { | ||
|
@@ -141,12 +140,12 @@ public void writeFully(DataOutputStream out) throws IOException { | |
header.write(out); | ||
// Compress | ||
CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); | ||
byte[] compressed = codec.compress( | ||
ByteBuffer compressed = codec.compress( | ||
payloadDecompressed.payload, | ||
0, | ||
payloadDecompressed.length, | ||
compressionStat); | ||
this.payloadCompressed = new Payload(compressed.length, compressed); | ||
this.payloadCompressed = new Payload(compressed.limit(), compressed); | ||
this.compressedEntryBytes.add(payloadCompressed.length); | ||
this.decompressedEntryBytes.add(payloadDecompressed.length); | ||
payloadCompressed.write(out); | ||
|
@@ -165,18 +164,18 @@ public void readFully(DataInputStream in) throws IOException { | |
payloadCompressed.read(in); | ||
// Decompress | ||
CompressionCodec codec = CompressionUtils.getCompressionCodec(header.compressionType); | ||
byte[] decompressed = codec.decompress( | ||
ByteBuffer decompressed = codec.decompress( | ||
payloadCompressed.payload, | ||
0, | ||
payloadCompressed.length, | ||
header.decompressedSize, | ||
decompressionStat); | ||
this.payloadDecompressed = new Payload(decompressed.length, decompressed); | ||
this.payloadDecompressed = new Payload(decompressed.limit(), decompressed); | ||
this.compressedEntryBytes.add(payloadCompressed.length); | ||
this.decompressedEntryBytes.add(payloadDecompressed.length); | ||
} | ||
|
||
public byte[] getDecompressedPayload() throws IOException { | ||
public ByteBuffer getDecompressedPayload() throws IOException { | ||
if (!isReady()) { | ||
throw new IOException("Decompressed payload is not initialized"); | ||
} | ||
|
@@ -245,7 +244,7 @@ private void read(DataInputStream in) throws IOException { | |
|
||
public static class Payload { | ||
private int length = 0; | ||
private byte[] payload = null; | ||
private ByteBuffer payload = null; | ||
|
||
// Whether this struct is ready for reading/writing. | ||
private boolean ready = false; | ||
|
@@ -254,21 +253,22 @@ public static class Payload { | |
Payload() { | ||
} | ||
|
||
Payload(int length, byte[] payload) { | ||
Payload(int length, ByteBuffer payload) { | ||
this.length = length; | ||
this.payload = payload; | ||
this.ready = true; | ||
} | ||
|
||
private void write(DataOutputStream out) throws IOException { | ||
out.writeInt(length); | ||
out.write(payload, 0, length); | ||
out.write(payload.array(), 0, length); | ||
} | ||
|
||
private void read(DataInputStream in) throws IOException { | ||
this.length = in.readInt(); | ||
this.payload = new byte[length]; | ||
in.readFully(payload); | ||
this.payload = ByteBuffer.wrap(new byte[length]); | ||
//TODO: Fix this | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. what do you want to fix here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. will sync with Arvind to get this done. |
||
in.readFully(payload.array()); | ||
this.ready = true; | ||
} | ||
} | ||
|
@@ -290,7 +290,8 @@ public static InputStream fromInputStream(InputStream src, | |
src.reset(); | ||
EnvelopedEntry entry = new EnvelopedEntry(version, statsLogger); | ||
entry.readFully(new DataInputStream(src)); | ||
return new ByteArrayInputStream(entry.getDecompressedPayload()); | ||
return new ByteArrayInputStream(entry.getDecompressedPayload().array()); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. i think im missing something here, but it seems like a bad idea to use direct array access since the payload object could have been initialized with a ByteBuffer from anywhere. is this safe? |
||
|
||
} | ||
|
||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This looks good, just curious, What is the purpose to move this part down? :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nothing specific :). I think my editor just sorted all the imports.