Skip to content

Commit

Permalink
Improved V2Batch: iterators don't change V2Batch internal state any m…
Browse files Browse the repository at this point in the history
…ore. It now implement Closable, for auto-release.
  • Loading branch information
fbacchella committed Jul 17, 2019
1 parent c9f5488 commit 46318dc
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 50 deletions.
34 changes: 24 additions & 10 deletions src/main/java/org/logstash/beats/V2Batch.java
Original file line number Diff line number Diff line change
@@ -1,17 +1,18 @@
package org.logstash.beats;

import java.io.Closeable;
import java.util.Iterator;
import java.util.NoSuchElementException;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.util.Iterator;

/**
* Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use.
*/
public class V2Batch implements Batch {
public class V2Batch implements Batch, Closeable {
private ByteBuf internalBuffer = PooledByteBufAllocator.DEFAULT.buffer();
private int written = 0;
private int read = 0;
private static final int SIZE_OF_INT = 4;
private int batchSize;
private int highestSequence = -1;
Expand All @@ -27,20 +28,27 @@ public byte getProtocol() {
return Protocol.VERSION_2;
}

public Iterator<Message> iterator(){
internalBuffer.resetReaderIndex();
public Iterator<Message> iterator() {
return new Iterator<Message>() {
private int read = 0;
private ByteBuf readerBuffer = internalBuffer.asReadOnly();
{
readerBuffer.resetReaderIndex();
}
@Override
public boolean hasNext() {
return read < written;
}

@Override
public Message next() {
int sequenceNumber = internalBuffer.readInt();
int readableBytes = internalBuffer.readInt();
Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes));
internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes);
if (read >= written) {
throw new NoSuchElementException();
}
int sequenceNumber = readerBuffer.readInt();
int readableBytes = readerBuffer.readInt();
Message message = new Message(sequenceNumber, readerBuffer.slice(readerBuffer.readerIndex(), readableBytes));
readerBuffer.readerIndex(readerBuffer.readerIndex() + readableBytes);
message.setBatch(V2Batch.this);
read++;
return message;
Expand Down Expand Up @@ -101,4 +109,10 @@ void addMessage(int sequenceNumber, ByteBuf buffer, int size) {
public void release() {
internalBuffer.release();
}

@Override
public void close() {
release();
}

}
77 changes: 37 additions & 40 deletions src/test/java/org/logstash/beats/V2BatchTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,48 +20,49 @@ public class V2BatchTest {

@Test
public void testIsEmpty() {
V2Batch batch = new V2Batch();
assertTrue(batch.isEmpty());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isEmpty());
try (V2Batch batch = new V2Batch()){
assertTrue(batch.isEmpty());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isEmpty());
}
}

@Test
public void testSize() {
V2Batch batch = new V2Batch();
assertEquals(0, batch.size());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertEquals(1, batch.size());
try (V2Batch batch = new V2Batch()) {
assertEquals(0, batch.size());
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertEquals(1, batch.size());
}
}

@Test
public void TestGetProtocol() {
assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol());
public void testGetProtocol() {
try (V2Batch batch = new V2Batch()) {
assertEquals(Protocol.VERSION_2, batch.getProtocol());
}
}

@Test
public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
V2Batch batch = new V2Batch();
int numberOfEvent = 2;

batch.setBatchSize(numberOfEvent);

for(int i = 1; i <= numberOfEvent; i++) {
ByteBuf content = messageContents();
batch.addMessage(i, content, content.readableBytes());
public void testCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() {
try (V2Batch batch = new V2Batch()) {
int numberOfEvent = 2;
batch.setBatchSize(numberOfEvent);
for (int i = 1; i <= numberOfEvent; i++) {
ByteBuf content = messageContents();
batch.addMessage(i, content, content.readableBytes());
}
assertTrue(batch.isComplete());
}

assertTrue(batch.isComplete());
}

@Test
public void testBigBatch() {
V2Batch batch = new V2Batch();
int size = 4096;
assertEquals(0, batch.size());
try {
try (V2Batch batch = new V2Batch()) {
int size = 4096;
assertEquals(0, batch.size());
ByteBuf content = messageContents();
for (int i = 0; i < size; i++) {
batch.addMessage(i, content, content.readableBytes());
Expand All @@ -71,8 +72,6 @@ public void testBigBatch() {
for (Message message : batch) {
assertEquals(message.getSequence(), i++);
}
}finally {
batch.release();
}
}

Expand All @@ -91,17 +90,15 @@ public void testHighSequence(){
assertEquals(startSequenceNumber + numberOfEvent, batch.getHighestSequence());
}


@Test
public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
V2Batch batch = new V2Batch();
int numberOfEvent = 2;

batch.setBatchSize(numberOfEvent);
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());

assertFalse(batch.isComplete());
public void testCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() {
try (V2Batch batch = new V2Batch()) {
int numberOfEvent = 2;
batch.setBatchSize(numberOfEvent);
ByteBuf content = messageContents();
batch.addMessage(1, content, content.readableBytes());
assertFalse(batch.isComplete());
}
}

public static ByteBuf messageContents() {
Expand All @@ -114,4 +111,4 @@ public static ByteBuf messageContents() {
throw new RuntimeException(e);
}
}
}
}

0 comments on commit 46318dc

Please sign in to comment.