diff --git a/lib/logstash/inputs/beats.rb b/lib/logstash/inputs/beats.rb index f0e6a19f..acaeb9da 100644 --- a/lib/logstash/inputs/beats.rb +++ b/lib/logstash/inputs/beats.rb @@ -181,7 +181,6 @@ def create_server server.enableSSL(ssl_builder) end - server end diff --git a/lib/logstash/inputs/beats/message_listener.rb b/lib/logstash/inputs/beats/message_listener.rb index 5cddc4f7..ba1e7763 100644 --- a/lib/logstash/inputs/beats/message_listener.rb +++ b/lib/logstash/inputs/beats/message_listener.rb @@ -27,29 +27,22 @@ def initialize(queue, input) end def onNewMessage(ctx, message) - begin - hash = message.getData - ip_address = ip_address(ctx) + hash = message.getData + ip_address = ip_address(ctx) - hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil? - target_field = extract_target_field(hash) + hash['@metadata']['ip_address'] = ip_address unless ip_address.nil? || hash['@metadata'].nil? + target_field = extract_target_field(hash) - if target_field.nil? - event = LogStash::Event.new(hash) - @nocodec_transformer.transform(event) - @queue << event - else - codec(ctx).accept(CodecCallbackListener.new(target_field, - hash, - message.getIdentityStream(), - @codec_transformer, - @queue)) - end - rescue => e - logger.warn("Error handling message #{message}", e) - raise e - ensure - message.release + if target_field.nil? + event = LogStash::Event.new(hash) + @nocodec_transformer.transform(event) + @queue << event + else + codec(ctx).accept(CodecCallbackListener.new(target_field, + hash, + message.getIdentityStream(), + @codec_transformer, + @queue)) end end diff --git a/spec/integration/logstash_forwarder_spec.rb b/spec/integration/logstash_forwarder_spec.rb index 79d3e8fa..fb40c188 100644 --- a/spec/integration/logstash_forwarder_spec.rb +++ b/spec/integration/logstash_forwarder_spec.rb @@ -55,7 +55,6 @@ f.write(events.join("\n") + "\n") end sleep(1) # give some time to the clients to pick up the changes - stop_client end after :each do diff --git a/src/main/java/org/logstash/beats/Batch.java b/src/main/java/org/logstash/beats/Batch.java index 270f1d70..741124b5 100644 --- a/src/main/java/org/logstash/beats/Batch.java +++ b/src/main/java/org/logstash/beats/Batch.java @@ -1,47 +1,48 @@ package org.logstash.beats; -import java.util.ArrayList; -import java.util.List; - -public class Batch { - private byte protocol = Protocol.VERSION_2; - private int batchSize; - private List messages = new ArrayList(); - - public List getMessages() { - return messages; - } - - public void addMessage(Message message) { - message.setBatch(this); - messages.add(message); - } - - public int size() { - return messages.size(); - } - - public void setBatchSize(int size) { - batchSize = size; - } - - public int getBatchSize() { - return batchSize; - } - - public boolean isEmpty() { - return 0 == messages.size(); - } - - public boolean complete() { - return size() == getBatchSize(); - } - - public byte getProtocol() { - return protocol; - } - - public void setProtocol(byte protocol) { - this.protocol = protocol; - } -} \ No newline at end of file +/** + * Interface representing a Batch of {@link Message}. + */ +public interface Batch extends Iterable{ + /** + * Returns the protocol of the sent messages that this batch was constructed from + * @return byte - either '1' or '2' + */ + byte getProtocol(); + + /** + * Number of messages that the batch is expected to contain. + * @return int - number of messages + */ + int getBatchSize(); + + /** + * Set the number of messages that the batch is expected to contain. + * @param batchSize int - number of messages + */ + void setBatchSize(int batchSize); + + /** + * Current number of messages in the batch + * @return int + */ + int size(); + + /** + * Is the batch currently empty? + * @return boolean + */ + boolean isEmpty(); + + /** + * Is the batch complete? + * @return boolean + */ + boolean isComplete(); + + /** + * Release the resources associated with the batch. Consumers of the batch *must* release + * after use. + */ + void release(); +} diff --git a/src/main/java/org/logstash/beats/BeatsHandler.java b/src/main/java/org/logstash/beats/BeatsHandler.java index e483b546..97dde3de 100644 --- a/src/main/java/org/logstash/beats/BeatsHandler.java +++ b/src/main/java/org/logstash/beats/BeatsHandler.java @@ -9,7 +9,7 @@ import java.net.InetSocketAddress; import javax.net.ssl.SSLHandshakeException; -public class BeatsHandler extends SimpleChannelInboundHandler { +public class BeatsHandler extends SimpleChannelInboundHandler { private final static Logger logger = LogManager.getLogger(BeatsHandler.class); public static AttributeKey PROCESSING_BATCH = AttributeKey.valueOf("processing-batch"); private final IMessageListener messageListener; @@ -45,50 +45,28 @@ public void channelInactive(ChannelHandlerContext ctx) throws Exception { @Override public void channelRead0(ChannelHandlerContext ctx, Batch batch) throws Exception { - if(logger.isTraceEnabled()) { - logger.trace(format("Received a new payload")); + if(logger.isDebugEnabled()) { + logger.debug(format("Received a new payload")); } ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).set(true); - for(Message message : batch.getMessages()) { - if(logger.isTraceEnabled()) { - logger.trace(format("Sending a new message for the listener, sequence: " + message.getSequence())); - } - messageListener.onNewMessage(ctx, message); - if(needAck(message)) { - ack(ctx, message); + try { + for (Message message : batch) { + if (logger.isDebugEnabled()) { + logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); + } + messageListener.onNewMessage(ctx, message); + + if (needAck(message)) { + ack(ctx, message); + } } + }finally{ + batch.release(); + ctx.flush(); + ctx.channel().attr(PROCESSING_BATCH).set(false); } - ctx.flush(); - ctx.channel().attr(PROCESSING_BATCH).set(false); - } - - @Override - public void channelRead0(ChannelHandlerContext ctx, NewBatch batch) throws Exception { - if(logger.isDebugEnabled()) { - logger.debug(format("Received a new payload")); - } - - ctx.channel().attr(PROCESSING_BATCH).set(true); - batch.getMessageStream().forEach(e -> { - messageListener.onNewMessage(ctx, e); - if (needAck(e)){ - ack(ctx, e); - } - }); -// for(Message message : batch.getMessages()) { -// if(logger.isDebugEnabled()) { -// logger.debug(format("Sending a new message for the listener, sequence: " + message.getSequence())); -// } -// messageListener.onNewMessage(ctx, message); -// -// if(needAck(message)) { -// ack(ctx, message); -// } -// } - ctx.flush(); - ctx.channel().attr(PROCESSING_BATCH).set(false); } /* @@ -120,11 +98,10 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } private boolean needAck(Message message) { - return message.getSequence() == message.getNewBatch().getBatchSize(); + return message.getSequence() == message.getBatch().getBatchSize(); } private void ack(ChannelHandlerContext ctx, Message message) { - logger.warn(format("Acking message number " + message.getSequence())); if (logger.isTraceEnabled()){ logger.trace(format("Acking message number " + message.getSequence())); } @@ -155,7 +132,7 @@ private String format(String message) { remotehost = remote.getAddress().getHostAddress() + ":" + remote.getPort(); } else{ remotehost = "undefined"; - }; + } return "[local: " + localhost + ", remote: " + remotehost + "] " + message; } diff --git a/src/main/java/org/logstash/beats/BeatsParser.java b/src/main/java/org/logstash/beats/BeatsParser.java index 2d1ebbb2..3e4b6bfe 100644 --- a/src/main/java/org/logstash/beats/BeatsParser.java +++ b/src/main/java/org/logstash/beats/BeatsParser.java @@ -1,8 +1,6 @@ package org.logstash.beats; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufOutputStream; import io.netty.channel.ChannelHandlerContext; @@ -20,8 +18,6 @@ public class BeatsParser extends ByteToMessageDecoder { - private static final int CHUNK_SIZE = 1024; - public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); private final static Logger logger = LogManager.getLogger(BeatsParser.class); private Batch batch; @@ -56,19 +52,17 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t switch (currentState) { case READ_HEADER: { - if (batch == null){ - batch = new Batch(); - } logger.trace("Running: READ_HEADER"); byte currentVersion = in.readByte(); - - if(Protocol.isVersion2(currentVersion)) { - logger.trace("Frame version 2 detected"); - batch.setProtocol(Protocol.VERSION_2); - } else { - logger.trace("Frame version 1 detected"); - batch.setProtocol(Protocol.VERSION_1); + if (batch == null) { + if (Protocol.isVersion2(currentVersion)) { + batch = new V2Batch(); + logger.trace("Frame version 2 detected"); + } else { + logger.trace("Frame version 1 detected"); + batch = new V1Batch(); + } } transition(States.READ_FRAME_TYPE); break; @@ -145,15 +139,13 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t count++; } - Message message = new Message(sequence, dataMap); - batch.addMessage(message); + ((V1Batch)batch).addMessage(message); - if(batch.complete()) { + if (batch.isComplete()){ out.add(batch); batchComplete(); } - transition(States.READ_HEADER); break; @@ -184,9 +176,9 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t ByteBuf buffer = ctx.alloc().buffer(requiredBytes); try ( ByteBufOutputStream buffOutput = new ByteBufOutputStream(buffer); - InflaterOutputStream inflater = new InflaterOutputStream(buffOutput, new Inflater()); + InflaterOutputStream inflater = new InflaterOutputStream(buffOutput, new Inflater()) ) { - ByteBuf bytesRead = in.readBytes(inflater, requiredBytes); + in.readBytes(inflater, requiredBytes); transition(States.READ_HEADER); try { while (buffer.readableBytes() > 0) { @@ -201,13 +193,11 @@ protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) t } case READ_JSON: { logger.trace("Running: READ_JSON"); - - batch.addMessage(new Message(sequence, in.readBytes(requiredBytes))); - if(batch.size() == batch.getBatchSize()) { + ((V2Batch)batch).addMessage(sequence, in, requiredBytes); + if(batch.isComplete()) { if(logger.isTraceEnabled()) { logger.trace("Sending batch size: " + this.batch.size() + ", windowSize: " + batch.getBatchSize() + " , seq: " + sequence); } - out.add(batch); batchComplete(); } @@ -238,7 +228,6 @@ private void batchComplete() { requiredBytes = 0; sequence = 0; batch = null; - logger.warn("batch compete"); } public class InvalidFrameProtocolException extends Exception { diff --git a/src/main/java/org/logstash/beats/KeepAliveHandler.java b/src/main/java/org/logstash/beats/KeepAliveHandler.java index a2dae7e1..cefe5588 100644 --- a/src/main/java/org/logstash/beats/KeepAliveHandler.java +++ b/src/main/java/org/logstash/beats/KeepAliveHandler.java @@ -14,14 +14,13 @@ public class KeepAliveHandler extends ChannelDuplexHandler { @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { - IdleStateEvent e = null; + IdleStateEvent e; if (evt instanceof IdleStateEvent) { e = (IdleStateEvent) evt; if (e.state() == IdleState.WRITER_IDLE) { if (isProcessing(ctx)) { ChannelFuture f = ctx.writeAndFlush(new Ack(Protocol.VERSION_2, 0)); - logger.warn("sending keep alive ack to libbeat"); if (logger.isTraceEnabled()) { logger.trace("sending keep alive ack to libbeat"); f.addListener((ChannelFutureListener) future -> { @@ -51,7 +50,7 @@ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exc } } - public boolean isProcessing(ChannelHandlerContext ctx) { + public boolean isProcessing(ChannelHandlerContext ctx) { return ctx.channel().hasAttr(BeatsHandler.PROCESSING_BATCH) && ctx.channel().attr(BeatsHandler.PROCESSING_BATCH).get(); } } \ No newline at end of file diff --git a/src/main/java/org/logstash/beats/Message.java b/src/main/java/org/logstash/beats/Message.java index 37e1d9a6..b43a1c28 100644 --- a/src/main/java/org/logstash/beats/Message.java +++ b/src/main/java/org/logstash/beats/Message.java @@ -4,11 +4,9 @@ import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufInputStream; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; import java.io.IOException; -import java.util.HashMap; +import java.io.InputStream; import java.util.Map; public class Message implements Comparable { @@ -17,67 +15,66 @@ public class Message implements Comparable { private Map data; private Batch batch; private ByteBuf buffer; - private Logger logger = LogManager.getLogger(Message.class); public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); + /** + * Create a message using a map of key, value pairs + * @param sequence sequence number of the message + * @param map key/value pairs representing the message + */ public Message(int sequence, Map map) { this.sequence = sequence; this.data = map; } + /** + * Create a message using a ByteBuf holding a Json object. + * Note that this ctr is *lazy* - it will not deserialize the Json object until it is needed. + * @param sequence sequence number of the message + * @param buffer {@link ByteBuf} buffer containing Json object + */ public Message(int sequence, ByteBuf buffer){ this.sequence = sequence; this.buffer = buffer; } + /** + * Returns the sequence number of this messsage + * @return + */ public int getSequence() { return sequence; } - public boolean release() { - if (buffer != null){ - return buffer.release(); - } - return true; - } - + /** + * Returns a list of key/value pairs representing the contents of the message. + * Note that this method is lazy if the Message was created using a {@link ByteBuf} + * @return {@link Map} Map of key/value pairs + */ public Map getData(){ if (data == null && buffer != null){ try (ByteBufInputStream byteBufInputStream = new ByteBufInputStream(buffer)){ - data = (Map)MAPPER.readValue(byteBufInputStream, Object.class); + data = MAPPER.readValue((InputStream)byteBufInputStream, Map.class); + buffer = null; } catch (IOException e){ throw new RuntimeException("Unable to parse beats payload ", e); } -// finally{ -// release(); -// } } - return data; } - private NewBatch newBatch; - @Override public int compareTo(Message o) { return Integer.compare(getSequence(), o.getSequence()); } - public Batch getBatch() { + public Batch getBatch(){ return batch; } - public void setBatch(Batch newBatch) { - batch = newBatch; - } - - public NewBatch getNewBatch() { - return newBatch; - } - - public void setNewBatch(NewBatch newnewBatch){ - this.newBatch = newnewBatch; + public void setBatch(Batch batch){ + this.batch = batch; } @@ -89,7 +86,7 @@ public String getIdentityStream() { } private String extractIdentityStream() { - Map beatsData = (HashMap) this.getData().get("beat"); + Map beatsData = (Map)this.getData().get("beat"); if(beatsData != null) { String id = (String) beatsData.get("id"); diff --git a/src/main/java/org/logstash/beats/Server.java b/src/main/java/org/logstash/beats/Server.java index aa2a21b6..df17c814 100644 --- a/src/main/java/org/logstash/beats/Server.java +++ b/src/main/java/org/logstash/beats/Server.java @@ -20,8 +20,6 @@ public class Server { private final static Logger logger = LogManager.getLogger(Server.class); - private static final int DEFAULT_CLIENT_TIMEOUT_SECONDS = 15; - private final int port; private final NioEventLoopGroup workGroup; private final String host; @@ -33,13 +31,12 @@ public class Server { private final int clientInactivityTimeoutSeconds; public Server(String host, int p, int timeout, int threadCount) { - this.host = host; port = p; clientInactivityTimeoutSeconds = timeout; beatsHeandlerThreadCount = threadCount; workGroup = new NioEventLoopGroup(); - } + } public void enableSSL(SslSimpleBuilder builder) { sslBuilder = builder; @@ -47,7 +44,7 @@ public void enableSSL(SslSimpleBuilder builder) { public Server listen() throws InterruptedException { try { - logger.info("Starting server on port: " + this.port); + logger.info("Starting server on port: " + this.port); beatsInitializer = new BeatsInitializer(isSslEnable(), messageListener, clientInactivityTimeoutSeconds, beatsHeandlerThreadCount); @@ -93,8 +90,6 @@ private class BeatsInitializer extends ChannelInitializer { private final String SSL_HANDLER = "ssl-handler"; private final String IDLESTATE_HANDLER = "idlestate-handler"; private final String KEEP_ALIVE_HANDLER = "keep-alive-handler"; - private final String BEATS_PARSER = "beats-parser"; - private final String BEATS_HANDLER = "beats-handler"; private final String BEATS_ACKER = "beats-acker"; diff --git a/src/main/java/org/logstash/beats/V1Batch.java b/src/main/java/org/logstash/beats/V1Batch.java new file mode 100644 index 00000000..3a43bd42 --- /dev/null +++ b/src/main/java/org/logstash/beats/V1Batch.java @@ -0,0 +1,69 @@ +package org.logstash.beats; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; + +/** + * Implementation of {@link Batch} intended for batches constructed from v1 protocol + * + */ +public class V1Batch implements Batch{ + + private int batchSize; + private List messages = new ArrayList<>(); + private byte protocol = Protocol.VERSION_1; + + @Override + public byte getProtocol() { + return protocol; + } + + public void setProtocol(byte protocol){ + this.protocol = protocol; + } + + /** + * Add Message to the batch + * @param message Message to add to the batch + */ + void addMessage(Message message){ + message.setBatch(this); + messages.add(message); + } + + @Override + public Iterator iterator(){ + return messages.iterator(); + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override + public void setBatchSize(int batchSize){ + this.batchSize = batchSize; + } + + @Override + public int size() { + return messages.size(); + } + + @Override + public boolean isEmpty() { + return 0 == messages.size(); + } + + @Override + public boolean isComplete() { + return size() == getBatchSize(); + } + + @Override + public void release() { + //no-op + } +} diff --git a/src/main/java/org/logstash/beats/V2Batch.java b/src/main/java/org/logstash/beats/V2Batch.java new file mode 100644 index 00000000..ffc03374 --- /dev/null +++ b/src/main/java/org/logstash/beats/V2Batch.java @@ -0,0 +1,95 @@ +package org.logstash.beats; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; + +import java.util.Iterator; + +/** + * Implementation of {@link Batch} for the v2 protocol backed by ByteBuf. *must* be released after use. + */ +public class V2Batch implements Batch { + private ByteBuf internalBuffer = PooledByteBufAllocator.DEFAULT.buffer(); + private int written = 0; + private int read = 0; + private static final int SIZE_OF_INT = 4; + private int batchSize; + + public void setProtocol(byte protocol){ + if (protocol != Protocol.VERSION_2){ + throw new IllegalArgumentException("Only version 2 protocol is supported"); + } + } + + @Override + public byte getProtocol() { + return Protocol.VERSION_2; + } + + public Iterator iterator(){ + internalBuffer.resetReaderIndex(); + return new Iterator() { + @Override + public boolean hasNext() { + return read < written; + } + + @Override + public Message next() { + int sequenceNumber = internalBuffer.readInt(); + int readableBytes = internalBuffer.readInt(); + Message message = new Message(sequenceNumber, internalBuffer.slice(internalBuffer.readerIndex(), readableBytes)); + internalBuffer.readerIndex(internalBuffer.readerIndex() + readableBytes); + message.setBatch(V2Batch.this); + read++; + return message; + } + }; + } + + @Override + public int getBatchSize() { + return batchSize; + } + + @Override + public void setBatchSize(final int batchSize) { + this.batchSize = batchSize; + } + + @Override + public int size() { + return written; + } + + @Override + public boolean isEmpty() { + return written == 0; + } + + @Override + public boolean isComplete() { + return written == batchSize; + } + + /** + * Adds a message to the batch, which will be constructed into an actual {@link Message} lazily. + * @param sequenceNumber sequence number of the message within the batch + * @param buffer A ByteBuf pointing to serialized JSon + * @param size size of the serialized Json + */ + void addMessage(int sequenceNumber, ByteBuf buffer, int size) { + written++; + if (internalBuffer.writableBytes() < size + (2 * SIZE_OF_INT)){ + internalBuffer.capacity(internalBuffer.capacity() + size + (2 * SIZE_OF_INT)); + } + internalBuffer.writeInt(sequenceNumber); + internalBuffer.writeInt(size); + buffer.readBytes(internalBuffer, size); + } + + @Override + public void release() { + internalBuffer.release(); + } +} diff --git a/src/test/java/org/logstash/beats/BatchEncoder.java b/src/test/java/org/logstash/beats/BatchEncoder.java index 138a033d..7b70012e 100644 --- a/src/test/java/org/logstash/beats/BatchEncoder.java +++ b/src/test/java/org/logstash/beats/BatchEncoder.java @@ -40,7 +40,7 @@ protected ByteBuf getPayload(ChannelHandlerContext ctx, Batch batch) throws IOEx ByteBuf payload = ctx.alloc().buffer(); // Aggregates the payload that we could decide to compress or not. - for(Message message : batch.getMessages()) { + for(Message message : batch) { if (batch.getProtocol() == Protocol.VERSION_2) { encodeMessageWithJson(payload, message); } else { @@ -55,7 +55,7 @@ private void encodeMessageWithJson(ByteBuf payload, Message message) throws Json payload.writeByte('J'); payload.writeInt(message.getSequence()); - byte[] json = BeatsParser.MAPPER.writeValueAsBytes(message.getData()); + byte[] json = Message.MAPPER.writeValueAsBytes(message.getData()); payload.writeInt(json.length); payload.writeBytes(json); } diff --git a/src/test/java/org/logstash/beats/BeatsHandlerTest.java b/src/test/java/org/logstash/beats/BeatsHandlerTest.java index 5596bdf4..3e06d081 100644 --- a/src/test/java/org/logstash/beats/BeatsHandlerTest.java +++ b/src/test/java/org/logstash/beats/BeatsHandlerTest.java @@ -18,7 +18,7 @@ public class BeatsHandlerTest { private SpyListener spyListener; private BeatsHandler beatsHandler; - private Batch batch; + private V1Batch batch; private class SpyListener implements IMessageListener { private boolean onNewConnectionCalled = false; @@ -79,9 +79,8 @@ public void setup() { Message message1 = new Message(1, new HashMap()); Message message2 = new Message(2, new HashMap()); - batch = new Batch(); + batch = new V1Batch(); batch.setBatchSize(2); - batch.setProtocol(Protocol.VERSION_1); batch.addMessage(message1); batch.addMessage(message2); @@ -120,7 +119,6 @@ public void TestItAckLastMessageFromBatch() { EmbeddedChannel embeddedChannel = new EmbeddedChannel(new BeatsHandler(spyListener)); embeddedChannel.writeInbound(batch); - embeddedChannel.close(); } } diff --git a/src/test/java/org/logstash/beats/BeatsParserTest.java b/src/test/java/org/logstash/beats/BeatsParserTest.java index f8d547d0..54ccbd7e 100644 --- a/src/test/java/org/logstash/beats/BeatsParserTest.java +++ b/src/test/java/org/logstash/beats/BeatsParserTest.java @@ -1,6 +1,8 @@ package org.logstash.beats; import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.embedded.EmbeddedChannel; @@ -11,6 +13,7 @@ import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -20,7 +23,10 @@ public class BeatsParserTest { - private Batch batch; + private V1Batch v1Batch; + private V2Batch byteBufBatch; + public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); + private final int numberOfMessage = 20; @Rule @@ -28,63 +34,94 @@ public class BeatsParserTest { @Before - public void setup() { - this.batch = new Batch(); - this.batch.setProtocol(Protocol.VERSION_2); + public void setup() throws Exception{ + this.v1Batch = new V1Batch(); - for(int i = 0; i < numberOfMessage; i++) { + for(int i = 1; i <= numberOfMessage; i++) { Map map = new HashMap(); map.put("line", "Another world"); map.put("from", "Little big Adventure"); - Message message = new Message(i + 1, map); - this.batch.addMessage(message); + Message message = new Message(i, map); + this.v1Batch.addMessage(message); + } + + this.byteBufBatch = new V2Batch(); + + for(int i = 1; i <= numberOfMessage; i++) { + Map map = new HashMap(); + map.put("line", "Another world"); + map.put("from", "Little big Adventure"); + ByteBuf bytebuf = Unpooled.wrappedBuffer(MAPPER.writeValueAsBytes(map)); + this.byteBufBatch.addMessage(i, bytebuf, bytebuf.readableBytes()); } + } @Test public void testEncodingDecodingJson() { - Batch decodedBatch = decodeBatch(); - assertMessages(this.batch, decodedBatch); + Batch decodedBatch = decodeBatch(v1Batch); + assertMessages(v1Batch, decodedBatch); } @Test public void testCompressedEncodingDecodingJson() { - Batch decodedBatch = decodeCompressedBatch(); - assertMessages(this.batch, decodedBatch); + Batch decodedBatch = decodeCompressedBatch(v1Batch); + assertMessages(v1Batch, decodedBatch); } @Test public void testEncodingDecodingFields() { - this.batch.setProtocol(Protocol.VERSION_1); - Batch decodedBatch = decodeBatch(); - assertMessages(this.batch, decodedBatch); + Batch decodedBatch = decodeBatch(v1Batch); + assertMessages(v1Batch, decodedBatch); } @Test - public void testEncodingDecodingFieldWithUTFCharacters() { - this.batch = new Batch(); - this.batch.setProtocol(Protocol.VERSION_2); + public void testEncodingDecodingFieldWithUTFCharacters() throws Exception { + V2Batch v2Batch = new V2Batch(); + // Generate Data with Keys and String with UTF-8 for(int i = 0; i < numberOfMessage; i++) { + ByteBuf payload = Unpooled.buffer(); + + Map map = new HashMap(); + map.put("étoile", "mystère"); + map.put("from", "ÉeèAççï"); + + byte[] json = MAPPER.writeValueAsBytes(map); + payload.writeBytes(json); + + v2Batch.addMessage(i, payload, payload.readableBytes()); + } + + Batch decodedBatch = decodeBatch(v2Batch); + assertMessages(v2Batch, decodedBatch); + } + + @Test + public void testV1EncodingDecodingFieldWithUTFCharacters() { + V1Batch batch = new V1Batch(); + + // Generate Data with Keys and String with UTF-8 + for(int i = 0; i < numberOfMessage; i++) { + Map map = new HashMap(); map.put("étoile", "mystère"); map.put("from", "ÉeèAççï"); Message message = new Message(i + 1, map); - this.batch.addMessage(message); + batch.addMessage(message); } - Batch decodedBatch = decodeBatch(); - assertMessages(this.batch, decodedBatch); + Batch decodedBatch = decodeBatch(batch); + assertMessages(batch, decodedBatch); } @Test public void testCompressedEncodingDecodingFields() { - this.batch.setProtocol(Protocol.VERSION_1); - Batch decodedBatch = decodeCompressedBatch(); - assertMessages(this.batch, decodedBatch); + Batch decodedBatch = decodeCompressedBatch(v1Batch); + assertMessages(this.v1Batch, decodedBatch); } @Test @@ -160,7 +197,7 @@ private void sendInvalidJSonPayload(int size) throws JsonProcessingException { payload.writeInt(1); payload.writeInt(size); - byte[] json = BeatsParser.MAPPER.writeValueAsBytes(mapData); + byte[] json = MAPPER.writeValueAsBytes(mapData); payload.writeBytes(json); sendPayloadToParser(payload); @@ -174,14 +211,18 @@ private void sendPayloadToParser(ByteBuf payload) { } private void assertMessages(Batch expected, Batch actual) { + assertNotNull(actual); assertEquals(expected.size(), actual.size()); - for(int i=0; i < expected.size(); i++) { - assertEquals(expected.getMessages().get(i).getSequence(), actual.getMessages().get(i).getSequence()); + int i = 0; + Iterator expectedMessages = expected.iterator(); + for(Message actualMessage: actual) { + Message expectedMessage = expectedMessages.next(); + assertEquals(expectedMessage.getSequence(), actualMessage.getSequence()); - Map expectedData = expected.getMessages().get(i).getData(); - Map actualData = actual.getMessages().get(i).getData(); + Map expectedData = expectedMessage.getData(); + Map actualData = actualMessage.getData(); assertEquals(expectedData.size(), actualData.size()); @@ -195,18 +236,18 @@ private void assertMessages(Batch expected, Batch actual) { } } - private Batch decodeCompressedBatch() { + private Batch decodeCompressedBatch(Batch batch) { EmbeddedChannel channel = new EmbeddedChannel(new CompressedBatchEncoder(), new BeatsParser()); - channel.writeOutbound(this.batch); + channel.writeOutbound(batch); Object o = channel.readOutbound(); channel.writeInbound(o); return (Batch) channel.readInbound(); } - private Batch decodeBatch() { + private Batch decodeBatch(Batch batch) { EmbeddedChannel channel = new EmbeddedChannel(new BatchEncoder(), new BeatsParser()); - channel.writeOutbound(this.batch); + channel.writeOutbound(batch); Object o = channel.readOutbound(); channel.writeInbound(o); diff --git a/src/test/java/org/logstash/beats/ServerTest.java b/src/test/java/org/logstash/beats/ServerTest.java index 7d27feaf..37512cdc 100644 --- a/src/test/java/org/logstash/beats/ServerTest.java +++ b/src/test/java/org/logstash/beats/ServerTest.java @@ -1,6 +1,7 @@ package org.logstash.beats; import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; @@ -22,7 +23,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; -import static java.lang.Thread.currentThread; import static java.lang.Thread.sleep; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; @@ -242,7 +242,7 @@ public ChannelFuture connectClient() throws InterruptedException { public void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new BatchEncoder()); - pipeline.addLast(new DummySender()); + pipeline.addLast(new DummyV2Sender()); } } ); @@ -254,9 +254,9 @@ public void initChannel(SocketChannel ch) throws Exception { * A dummy class to send a unique batch to an active server * */ - private class DummySender extends SimpleChannelInboundHandler { + private class DummyV1Sender extends SimpleChannelInboundHandler { public void channelActive(ChannelHandlerContext ctx) { - Batch batch = new Batch(); + V1Batch batch = new V1Batch(); batch.setBatchSize(1); batch.addMessage(new Message(1, Collections.singletonMap("hello", "world"))); @@ -273,6 +273,31 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } } + /** + * A dummy class to send a unique batch to an active server + * + */ + private class DummyV2Sender extends SimpleChannelInboundHandler { + public void channelActive(ChannelHandlerContext ctx) { + V2Batch batch = new V2Batch(); + batch.setBatchSize(1); + ByteBuf contents = V2BatchTest.messageContents(); + batch.addMessage(1, contents, contents.readableBytes()); + + ctx.writeAndFlush(batch); + } + + @Override + protected void channelRead0(ChannelHandlerContext ctx, String msg) throws Exception { + } + + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception { + ctx.close(); + } + } + + /** * Used to assert the number of messages send to the server */ diff --git a/src/test/java/org/logstash/beats/BatchTest.java b/src/test/java/org/logstash/beats/V1BatchTest.java similarity index 68% rename from src/test/java/org/logstash/beats/BatchTest.java rename to src/test/java/org/logstash/beats/V1BatchTest.java index d2e16a36..d8be54c3 100644 --- a/src/test/java/org/logstash/beats/BatchTest.java +++ b/src/test/java/org/logstash/beats/V1BatchTest.java @@ -7,12 +7,12 @@ import static org.junit.Assert.*; -public class BatchTest { - private Batch batch; +public class V1BatchTest { + private V1Batch batch; @Before public void setUp() { - batch = new Batch(); + batch = new V1Batch(); } @Test @@ -30,13 +30,7 @@ public void testSize() { } @Test - public void TestGetDefaultProtocol() { - assertEquals(Protocol.VERSION_2, batch.getProtocol()); - } - - @Test - public void TestGetSetProtocol() { - batch.setProtocol(Protocol.VERSION_1); + public void TestGetProtocol() { assertEquals(Protocol.VERSION_1, batch.getProtocol()); } @@ -46,11 +40,11 @@ public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() { batch.setBatchSize(numberOfEvent); - for(int i = 0; i < numberOfEvent; i++) { - batch.addMessage(new Message(i + 1, new HashMap())); + for(int i = 1; i <= numberOfEvent; i++) { + batch.addMessage(new Message(i, new HashMap())); } - assertTrue(batch.complete()); + assertTrue(batch.isComplete()); } @Test @@ -61,6 +55,6 @@ public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() { batch.addMessage(new Message(1, new HashMap())); - assertFalse(batch.complete()); + assertFalse(batch.isComplete()); } } \ No newline at end of file diff --git a/src/test/java/org/logstash/beats/V2BatchTest.java b/src/test/java/org/logstash/beats/V2BatchTest.java new file mode 100644 index 00000000..45a668a1 --- /dev/null +++ b/src/test/java/org/logstash/beats/V2BatchTest.java @@ -0,0 +1,101 @@ +package org.logstash.beats; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.module.afterburner.AfterburnerModule; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.junit.Test; + +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +public class V2BatchTest { + public final static ObjectMapper MAPPER = new ObjectMapper().registerModule(new AfterburnerModule()); + + @Test + public void testIsEmpty() { + V2Batch batch = new V2Batch(); + assertTrue(batch.isEmpty()); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertFalse(batch.isEmpty()); + } + + @Test + public void testSize() { + V2Batch batch = new V2Batch(); + assertEquals(0, batch.size()); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + assertEquals(1, batch.size()); + } + + @Test + public void TestGetProtocol() { + assertEquals(Protocol.VERSION_2, new V2Batch().getProtocol()); + } + + @Test + public void TestCompleteReturnTrueWhenIReceiveTheSameAmountOfEvent() { + V2Batch batch = new V2Batch(); + int numberOfEvent = 2; + + batch.setBatchSize(numberOfEvent); + + for(int i = 1; i <= numberOfEvent; i++) { + ByteBuf content = messageContents(); + batch.addMessage(i, content, content.readableBytes()); + } + + assertTrue(batch.isComplete()); + } + + @Test + public void testBigBatch() { + V2Batch batch = new V2Batch(); + int size = 4096; + assertEquals(0, batch.size()); + try { + ByteBuf content = messageContents(); + for (int i = 0; i < size; i++) { + batch.addMessage(i, content, content.readableBytes()); + } + assertEquals(size, batch.size()); + int i = 0; + for (Message message : batch) { + assertEquals(message.getSequence(), i++); + } + }finally { + batch.release(); + } + } + + + @Test + public void TestCompleteReturnWhenTheNumberOfEventDoesntMatchBatchSize() { + V2Batch batch = new V2Batch(); + int numberOfEvent = 2; + + batch.setBatchSize(numberOfEvent); + ByteBuf content = messageContents(); + batch.addMessage(1, content, content.readableBytes()); + + assertFalse(batch.isComplete()); + } + + public static ByteBuf messageContents() { + Map test = new HashMap(); + test.put("key", "value"); + try { + byte[] bytes = MAPPER.writeValueAsBytes(test); + return Unpooled.wrappedBuffer(bytes); + } catch (Exception e){ + throw new RuntimeException(e); + } + } +} \ No newline at end of file