diff --git a/src/main/java/org/logstash/beats/V2Batch.java b/src/main/java/org/logstash/beats/V2Batch.java index 84c529fa..a83681f8 100644 --- a/src/main/java/org/logstash/beats/V2Batch.java +++ b/src/main/java/org/logstash/beats/V2Batch.java @@ -1,17 +1,18 @@ package org.logstash.beats; +import java.io.Closeable; +import java.util.Iterator; +import java.util.NoSuchElementException; + import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; -import java.util.Iterator; - /** * Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use. */ -public class V2Batch implements Batch { +public class V2Batch implements Batch, Closeable { private ByteBuf internalBuffer = PooledByteBufAllocator.DEFAULT.buffer(); private int written = 0; - private int read = 0; private static final int SIZE_OF_INT = 4; private int batchSize; private int highestSequence = -1; @@ -27,9 +28,13 @@ public byte getProtocol() { return Protocol.VERSION_2; } - public Iterator iterator(){ - internalBuffer.resetReaderIndex(); + public Iterator iterator() { return new Iterator() { + private int read = 0; + private ByteBuf readerBuffer = internalBuffer.asReadOnly(); + { + readerBuffer.resetReaderIndex(); + } @Override public boolean hasNext() { return read < written; @@ -37,10 +42,13 @@ public boolean hasNext() { @Override public Message next() { - int sequenceNumber = internalBuffer.readInt(); - int readableBytes = internalBuffer.readInt(); - Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes)); - internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes); + if (read >= written) { + throw new NoSuchElementException(); + } + int sequenceNumber = readerBuffer.readInt(); + int readableBytes = readerBuffer.readInt(); + Message message = new Message(sequenceNumber, readerBuffer.slice(readerBuffer.readerIndex(), readableBytes)); + readerBuffer.readerIndex(readerBuffer.readerIndex() + readableBytes); message.setBatch(V2Batch.this); read++; return message; @@ -101,4 +109,10 @@ void addMessage(int sequenceNumber, ByteBuf buffer, int size) { public void release() { internalBuffer.release(); } + + @Override + public void close() { + release(); + } + } diff --git a/src/test/java/org/logstash/beats/V2BatchTest.java b/src/test/java/org/logstash/beats/V2BatchTest.java index 9de6c629..de7e53cb 100644 --- a/src/test/java/org/logstash/beats/V2BatchTest.java +++ b/src/test/java/org/logstash/beats/V2BatchTest.java @@ -20,48 +20,49 @@ public class V2BatchTest { @Test public void testIsEmpty() { - V2Batch batch = new V2Batch(); - assertTrue(batch.isEmpty()); - ByteBuf content = messageContents(); - batch.addMessage(1, content, content.readableBytes()); - assertFalse(batch.isEmpty()); + try (V2Batch batch = new V2Batch()){ + assertTrue(batch.isEmpty()); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertFalse(batch.isEmpty()); + } } @Test public void testSize() { - V2Batch batch = new V2Batch(); - assertEquals(0, batch.size()); - ByteBuf content = messageContents(); - batch.addMessage(1, content, content.readableBytes()); - assertEquals(1, batch.size()); + try (V2Batch batch = new V2Batch()) { + assertEquals(0, batch.size()); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertEquals(1, batch.size()); + } } @Test - public void TestGetProtocol() { - assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol()); + public void testGetProtocol() { + try (V2Batch batch = new V2Batch()) { + assertEquals(Protocol.VERSION_2, batch.getProtocol()); + } } @Test - public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() { - V2Batch batch = new V2Batch(); - int numberOfEvent = 2; - - batch.setBatchSize(numberOfEvent); - - for(int i = 1; i <= numberOfEvent; i++) { - ByteBuf content = messageContents(); - batch.addMessage(i, content, content.readableBytes()); + public void testCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() { + try (V2Batch batch = new V2Batch()) { + int numberOfEvent = 2; + batch.setBatchSize(numberOfEvent); + for (int i = 1; i <= numberOfEvent; i++) { + ByteBuf content = messageContents(); + batch.addMessage(i, content, content.readableBytes()); + } + assertTrue(batch.isComplete()); } - - assertTrue(batch.isComplete()); } @Test public void testBigBatch() { - V2Batch batch = new V2Batch(); - int size = 4096; - assertEquals(0, batch.size()); - try { + try (V2Batch batch = new V2Batch()) { + int size = 4096; + assertEquals(0, batch.size()); ByteBuf content = messageContents(); for (int i = 0; i < size; i++) { batch.addMessage(i, content, content.readableBytes()); @@ -71,8 +72,6 @@ public void testBigBatch() { for (Message message : batch) { assertEquals(message.getSequence(), i++); } - }finally { - batch.release(); } } @@ -91,17 +90,15 @@ public void testHighSequence(){ assertEquals(startSequenceNumber + numberOfEvent, batch.getHighestSequence()); } - @Test - public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() { - V2Batch batch = new V2Batch(); - int numberOfEvent = 2; - - batch.setBatchSize(numberOfEvent); - ByteBuf content = messageContents(); - batch.addMessage(1, content, content.readableBytes()); - - assertFalse(batch.isComplete()); + public void testCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() { + try (V2Batch batch = new V2Batch()) { + int numberOfEvent = 2; + batch.setBatchSize(numberOfEvent); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertFalse(batch.isComplete()); + } } public static ByteBuf messageContents() { @@ -114,4 +111,4 @@ public static ByteBuf messageContents() { throw new RuntimeException(e); } } -} \ No newline at end of file +}